Skip to content

Commit 7d87b02

Browse files
committed
feat: Add handling of context cancellation
1 parent 362598b commit 7d87b02

2 files changed

Lines changed: 51 additions & 2 deletions

File tree

scheduler/resolvers/resolvers.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,7 @@ import (
99
"github.com/cloudquery/plugin-sdk/v4/caser"
1010
"github.com/cloudquery/plugin-sdk/v4/scheduler/metrics"
1111
"github.com/cloudquery/plugin-sdk/v4/schema"
12-
"github.com/getsentry/sentry-go"
1312
"github.com/rs/zerolog"
14-
"github.com/thoas/go-funk"
1513
)
1614

1715
func resolveColumn(ctx context.Context, logger zerolog.Logger, m *metrics.Metrics, selector metrics.Selector, client schema.ClientMeta, resource *schema.Resource, column schema.Column, c *caser.Caser) {
@@ -82,6 +80,11 @@ func ResolveResourcesChunk(ctx context.Context, logger zerolog.Logger, m *metric
8280
filtered := make([]*schema.Resource, 0, len(resources))
8381
for _, resource := range resources {
8482
if err := table.PreResourceResolver(ctx, client, resource); err != nil {
83+
if ctx.Err() != nil {
84+
tableLogger.Error().Err(err).Msg("pre resource resolver failed, context cancelled")
85+
m.AddErrors(ctx, 1, selector)
86+
return nil
87+
}
8588
tableLogger.Error().Err(err).Msg("pre resource resolver failed")
8689
m.AddErrors(ctx, 1, selector)
8790
continue

scheduler/resolvers/resolvers_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,3 +94,49 @@ func TestResolveResourcesChunk_PreResourceResolverPartialFailure(t *testing.T) {
9494
})
9595
}
9696
}
97+
98+
// TestResolveResourcesChunk_PreResourceResolverContextCancelled verifies that
99+
// once the context is cancelled, the chunk is dropped immediately with a single
100+
// error instead of emitting one error per remaining resource.
101+
func TestResolveResourcesChunk_PreResourceResolverContextCancelled(t *testing.T) {
102+
ctx, cancel := context.WithCancel(context.Background())
103+
defer cancel()
104+
105+
calls := 0
106+
table := &schema.Table{
107+
Name: "test_table",
108+
PreResourceResolver: func(_ context.Context, _ schema.ClientMeta, _ *schema.Resource) error {
109+
calls++
110+
if calls == 2 {
111+
cancel()
112+
return errors.New("pre resource resolver boom")
113+
}
114+
return nil
115+
},
116+
Columns: []schema.Column{
117+
{
118+
Name: "test_column",
119+
Type: arrow.PrimitiveTypes.Int64,
120+
Resolver: func(_ context.Context, _ schema.ClientMeta, resource *schema.Resource, c schema.Column) error {
121+
return resource.Set(c.Name, int64(resource.Item.(int)))
122+
},
123+
},
124+
},
125+
}
126+
127+
client := testClient{}
128+
m := metrics.NewMetrics()
129+
m.InitWithClients(table, []schema.ClientMeta{client})
130+
131+
chunk := []any{0, 1, 2, 3, 4}
132+
logger := zerolog.New(zerolog.NewTestWriter(t))
133+
134+
resources := ResolveResourcesChunk(ctx, logger, m, table, client, nil, chunk, caser.New())
135+
136+
require.Empty(t, resources, "cancelled chunk should not return resources")
137+
require.Equal(t, 2, calls, "resolver should not be called for resources after cancellation")
138+
139+
selector := m.NewSelector(client.ID(), table.Name)
140+
require.Equal(t, uint64(1), m.GetErrors(selector), "cancellation should be counted as a single error, not one per remaining resource")
141+
require.Equal(t, uint64(0), m.GetResources(selector), "no resources should be counted for a cancelled chunk")
142+
}

0 commit comments

Comments
 (0)