Skip to content

Commit da5fdce

Browse files
execution: consistently recover in goroutines (#703)
Signed-off-by: Michael Hoffmann <mhoffmann@cloudflare.com>
1 parent eae5060 commit da5fdce

5 files changed

Lines changed: 35 additions & 15 deletions

File tree

engine/engine_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5623,7 +5623,7 @@ func TestEngineRecoversFromPanic(t *testing.T) {
56235623
testutil.Ok(t, err)
56245624

56255625
r := q.Exec(ctx)
5626-
testutil.Assert(t, r.Err.Error() == "unexpected error: panic!")
5626+
testutil.Assert(t, r.Err.Error() == "unexpected panic: panic!")
56275627
})
56285628

56295629
t.Run("range", func(t *testing.T) {
@@ -5633,7 +5633,7 @@ func TestEngineRecoversFromPanic(t *testing.T) {
56335633
testutil.Ok(t, err)
56345634

56355635
r := q.Exec(ctx)
5636-
testutil.Assert(t, r.Err.Error() == "unexpected error: panic!")
5636+
testutil.Assert(t, r.Err.Error() == "unexpected panic: panic!")
56375637
})
56385638
}
56395639

execution/binary/scalar.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/thanos-io/promql-engine/query"
1515
"github.com/thanos-io/promql-engine/warnings"
1616

17+
"github.com/efficientgo/core/errors"
1718
"github.com/prometheus/prometheus/model/histogram"
1819
"github.com/prometheus/prometheus/model/labels"
1920
"github.com/prometheus/prometheus/promql/parser"
@@ -91,12 +92,17 @@ func (o *scalarOperator) Next(ctx context.Context, buf []model.StepVector) (int,
9192
var lhsN int
9293
var lerrChan = make(chan error, 1)
9394
go func() {
95+
defer func() {
96+
if r := recover(); r != nil {
97+
lerrChan <- errors.Newf("unexpected panic: %v", r)
98+
}
99+
close(lerrChan)
100+
}()
94101
var err error
95102
lhsN, err = o.lhs.Next(ctx, o.lhsBuf)
96103
if err != nil {
97104
lerrChan <- err
98105
}
99-
close(lerrChan)
100106
}()
101107

102108
rhsN, rerr := o.rhs.Next(ctx, o.rhsBuf)

execution/binary/vector.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,12 +106,17 @@ func (o *vectorOperator) Next(ctx context.Context, buf []model.StepVector) (int,
106106
var lhsN int
107107
var lerrChan = make(chan error, 1)
108108
go func() {
109+
defer func() {
110+
if r := recover(); r != nil {
111+
lerrChan <- errors.Newf("unexpected panic: %v", r)
112+
}
113+
close(lerrChan)
114+
}()
109115
var err error
110116
lhsN, err = o.lhs.Next(ctx, o.lhsBuf)
111117
if err != nil {
112118
lerrChan <- err
113119
}
114-
close(lerrChan)
115120
}()
116121

117122
rhsN, rerr := o.rhs.Next(ctx, o.rhsBuf)
@@ -153,12 +158,17 @@ func (o *vectorOperator) init(ctx context.Context) error {
153158
var highCardSide []labels.Labels
154159
var errChan = make(chan error, 1)
155160
go func() {
161+
defer func() {
162+
if r := recover(); r != nil {
163+
errChan <- errors.Newf("unexpected panic: %v", r)
164+
}
165+
close(errChan)
166+
}()
156167
var err error
157168
highCardSide, err = o.lhs.Series(ctx)
158169
if err != nil {
159170
errChan <- err
160171
}
161-
close(errChan)
162172
}()
163173

164174
lowCardSide, err := o.rhs.Series(ctx)

execution/exchange/coalesce.go

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,11 @@ func (c *coalesce) Next(ctx context.Context, buf []model.StepVector) (int, error
124124
c.wg.Add(1)
125125
go func(opIdx int, o model.VectorOperator) {
126126
defer c.wg.Done()
127+
defer func() {
128+
if r := recover(); r != nil {
129+
errChan <- errors.Newf("unexpected panic: %v", r)
130+
}
131+
}()
127132

128133
n, err := o.Next(ctx, c.tempBufs[opIdx])
129134
if err != nil {
@@ -221,16 +226,9 @@ func (c *coalesce) loadSeries(ctx context.Context) error {
221226
go func(i int) {
222227
defer wg.Done()
223228
defer func() {
224-
e := recover()
225-
if e == nil {
226-
return
229+
if r := recover(); r != nil {
230+
errChan <- errors.Newf("unexpected panic: %v", r)
227231
}
228-
229-
switch err := e.(type) {
230-
case error:
231-
errChan <- errors.Wrapf(err, "unexpected error")
232-
}
233-
234232
}()
235233
series, err := c.operators[i].Series(ctx)
236234
if err != nil {

execution/exchange/concurrent.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/thanos-io/promql-engine/execution/telemetry"
1313
"github.com/thanos-io/promql-engine/query"
1414

15+
"github.com/efficientgo/core/errors"
1516
"github.com/prometheus/prometheus/model/labels"
1617
)
1718

@@ -118,7 +119,12 @@ func (c *concurrencyOperator) Next(ctx context.Context, buf []model.StepVector)
118119
}
119120

120121
func (c *concurrencyOperator) pull(ctx context.Context) {
121-
defer close(c.buffer)
122+
defer func() {
123+
if r := recover(); r != nil {
124+
c.buffer <- maybeStepVector{err: errors.Newf("unexpected panic: %v", r)}
125+
}
126+
close(c.buffer)
127+
}()
122128

123129
for {
124130
select {

0 commit comments

Comments
 (0)