Skip to content

Commit cba923c

Browse files
authored
fix: Don't exit early if a single PreResource Resolver fails (#2517)
<!-- Explain what problem this PR addresses --> ---
1 parent 19dea2a commit cba923c

6 files changed

Lines changed: 282 additions & 7 deletions

File tree

scheduler/queue/active_work_signal.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@ import (
1616
//
1717
// Use it like this:
1818
//
19-
// - When a worker picks up a task, call `Add()` (like a WaitGroup)
20-
// - When a worker finishes a task, call `Done()` (like a WaitGroup)
19+
// - When the dispatcher takes a task off the queue to hand it to a worker, call `Add()` (like a WaitGroup).
20+
// Marking the task active before the handoff ensures the idle check cannot fire while
21+
// the task is in flight between the queue and a worker.
22+
// - When a worker finishes a task, call `Done()` (like a WaitGroup)
2123
//
2224
// - If the queue is empty, check `IsIdle()` to check if no workers are active.
2325
// - If workers are still active, call `Wait()` to block until state changes.
@@ -35,7 +37,7 @@ func newActiveWorkSignal() *activeWorkSignal {
3537
}
3638
}
3739

38-
// Add means a worker has started working on a task.
40+
// Add means the dispatcher has taken a task off the queue for a worker.
3941
//
4042
// Wake up the work queuing goroutine.
4143
func (s *activeWorkSignal) Add() {

scheduler/queue/scheduler.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,10 @@ func (d *Scheduler) Sync(ctx context.Context, tableClients []WorkUnit, resolvedR
119119
default:
120120
item := queue.Pop()
121121

122-
// There is work to do
122+
// There is work to do. Mark it active before handing it off so the
123+
// idle check below cannot fire while the item is in flight to a worker.
123124
if item != nil {
125+
activeWorkSignal.Add()
124126
jobs <- item
125127
continue
126128
}

scheduler/queue/worker.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,9 @@ type worker struct {
3737

3838
func (w *worker) work(ctx context.Context, activeWorkSignal *activeWorkSignal) {
3939
for j := range w.jobs {
40-
activeWorkSignal.Add()
41-
40+
// the work unit was already marked active by the dispatcher before it was
41+
// handed off, so the dispatcher can never observe an idle state while a
42+
// job is in flight between the queue and a worker
4243
w.resolveTable(ctx, j.Table, j.Client, j.Parent)
4344

4445
activeWorkSignal.Done()

scheduler/resolvers/resolvers.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,13 +79,21 @@ func ResolveResourcesChunk(ctx context.Context, logger zerolog.Logger, m *metric
7979
}
8080

8181
if table.PreResourceResolver != nil {
82+
filtered := resources[:0]
8283
for _, resource := range resources {
8384
if err := table.PreResourceResolver(ctx, client, resource); err != nil {
85+
if ctx.Err() != nil {
86+
tableLogger.Error().Err(err).Msg("pre resource resolver failed, context cancelled")
87+
m.AddErrors(ctx, 1, selector)
88+
return nil
89+
}
8490
tableLogger.Error().Err(err).Msg("pre resource resolver failed")
8591
m.AddErrors(ctx, 1, selector)
86-
return nil
92+
continue
8793
}
94+
filtered = append(filtered, resource)
8895
}
96+
resources = filtered
8997
}
9098
for _, resource := range resources {
9199
for _, column := range table.Columns {
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
package resolvers
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
8+
"github.com/apache/arrow-go/v18/arrow"
9+
"github.com/cloudquery/plugin-sdk/v4/caser"
10+
"github.com/cloudquery/plugin-sdk/v4/scheduler/metrics"
11+
"github.com/cloudquery/plugin-sdk/v4/schema"
12+
"github.com/rs/zerolog"
13+
"github.com/stretchr/testify/require"
14+
)
15+
16+
type testClient struct{}
17+
18+
func (testClient) ID() string { return "test" }
19+
20+
var _ schema.ClientMeta = testClient{}
21+
22+
// TestResolveResourcesChunk_PreResourceResolverPartialFailure verifies that a
23+
// PreResourceResolver error on a single resource only drops that resource from
24+
// the batch, while the remaining resources are still resolved and returned.
25+
func TestResolveResourcesChunk_PreResourceResolverPartialFailure(t *testing.T) {
26+
for _, tc := range []struct {
27+
name string
28+
failItems map[int]bool
29+
expectedItems []int
30+
}{
31+
{
32+
name: "no failures keeps all resources",
33+
failItems: nil,
34+
expectedItems: []int{0, 1, 2, 3, 4},
35+
},
36+
{
37+
name: "single failure drops only that resource",
38+
failItems: map[int]bool{2: true},
39+
expectedItems: []int{0, 1, 3, 4},
40+
},
41+
{
42+
name: "multiple failures drop only the failing resources",
43+
failItems: map[int]bool{0: true, 3: true},
44+
expectedItems: []int{1, 2, 4},
45+
},
46+
{
47+
name: "all failures drop the whole batch but do not panic",
48+
failItems: map[int]bool{0: true, 1: true, 2: true, 3: true, 4: true},
49+
expectedItems: []int{},
50+
},
51+
} {
52+
t.Run(tc.name, func(t *testing.T) {
53+
table := &schema.Table{
54+
Name: "test_table",
55+
PreResourceResolver: func(_ context.Context, _ schema.ClientMeta, resource *schema.Resource) error {
56+
if tc.failItems[resource.Item.(int)] {
57+
return errors.New("pre resource resolver boom")
58+
}
59+
return nil
60+
},
61+
Columns: []schema.Column{
62+
{
63+
Name: "test_column",
64+
Type: arrow.PrimitiveTypes.Int64,
65+
Resolver: func(_ context.Context, _ schema.ClientMeta, resource *schema.Resource, c schema.Column) error {
66+
return resource.Set(c.Name, int64(resource.Item.(int)))
67+
},
68+
},
69+
},
70+
}
71+
72+
client := testClient{}
73+
m := metrics.NewMetrics()
74+
m.InitWithClients(table, []schema.ClientMeta{client})
75+
76+
chunk := []any{0, 1, 2, 3, 4}
77+
logger := zerolog.New(zerolog.NewTestWriter(t))
78+
79+
resources := ResolveResourcesChunk(context.Background(), logger, m, table, client, nil, chunk, caser.New())
80+
81+
gotItems := make([]int, len(resources))
82+
for i, r := range resources {
83+
gotItems[i] = r.Item.(int)
84+
// surviving resources should have been fully resolved through the column resolvers
85+
col := r.Get("test_column")
86+
require.True(t, col.IsValid(), "surviving resource should have its column resolved")
87+
require.Equal(t, int64(r.Item.(int)), col.Get(), "resolved column value should match the item")
88+
}
89+
require.ElementsMatch(t, tc.expectedItems, gotItems)
90+
91+
selector := m.NewSelector(client.ID(), table.Name)
92+
require.Equal(t, uint64(len(tc.failItems)), m.GetErrors(selector), "expected one error per failing resource")
93+
require.Equal(t, uint64(len(tc.expectedItems)), m.GetResources(selector), "only surviving resources should be counted")
94+
})
95+
}
96+
}
97+
98+
// TestResolveResourcesChunk_PreResourceResolverContextCancelled verifies that
99+
// once the context is cancelled, the chunk is dropped immediately with a single
100+
// error instead of emitting one error per remaining resource.
101+
func TestResolveResourcesChunk_PreResourceResolverContextCancelled(t *testing.T) {
102+
ctx, cancel := context.WithCancel(context.Background())
103+
defer cancel()
104+
105+
calls := 0
106+
table := &schema.Table{
107+
Name: "test_table",
108+
PreResourceResolver: func(_ context.Context, _ schema.ClientMeta, _ *schema.Resource) error {
109+
calls++
110+
if calls == 2 {
111+
cancel()
112+
return errors.New("pre resource resolver boom")
113+
}
114+
return nil
115+
},
116+
Columns: []schema.Column{
117+
{
118+
Name: "test_column",
119+
Type: arrow.PrimitiveTypes.Int64,
120+
Resolver: func(_ context.Context, _ schema.ClientMeta, resource *schema.Resource, c schema.Column) error {
121+
return resource.Set(c.Name, int64(resource.Item.(int)))
122+
},
123+
},
124+
},
125+
}
126+
127+
client := testClient{}
128+
m := metrics.NewMetrics()
129+
m.InitWithClients(table, []schema.ClientMeta{client})
130+
131+
chunk := []any{0, 1, 2, 3, 4}
132+
logger := zerolog.New(zerolog.NewTestWriter(t))
133+
134+
resources := ResolveResourcesChunk(ctx, logger, m, table, client, nil, chunk, caser.New())
135+
136+
require.Empty(t, resources, "cancelled chunk should not return resources")
137+
require.Equal(t, 2, calls, "resolver should not be called for resources after cancellation")
138+
139+
selector := m.NewSelector(client.ID(), table.Name)
140+
require.Equal(t, uint64(1), m.GetErrors(selector), "cancellation should be counted as a single error, not one per remaining resource")
141+
require.Equal(t, uint64(0), m.GetResources(selector), "no resources should be counted for a cancelled chunk")
142+
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package scheduler
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
8+
"github.com/apache/arrow-go/v18/arrow"
9+
"github.com/apache/arrow-go/v18/arrow/array"
10+
"github.com/cloudquery/plugin-sdk/v4/message"
11+
"github.com/cloudquery/plugin-sdk/v4/schema"
12+
"github.com/rs/zerolog"
13+
"github.com/stretchr/testify/require"
14+
)
15+
16+
// TestSchedulerPreResourceResolverPartialFailureWithRelations verifies that a
17+
// PreResourceResolver failure only drops the failing resource and its own
18+
// subtree, at every level of a parent -> child1 -> child2 hierarchy:
19+
// - parent "p2" fails: p2 and all its descendants are dropped, the other
20+
// parents and their descendants survive
21+
// - child1 "p1/c0" fails: only that row and its child2 descendants are
22+
// dropped, sibling "p1/c1" and its descendants survive
23+
func TestSchedulerPreResourceResolverPartialFailureWithRelations(t *testing.T) {
24+
for _, strategy := range AllStrategies {
25+
t.Run(strategy.String(), func(t *testing.T) {
26+
nameColumn := schema.Column{
27+
Name: "name",
28+
Type: arrow.BinaryTypes.String,
29+
Resolver: func(_ context.Context, _ schema.ClientMeta, resource *schema.Resource, c schema.Column) error {
30+
return resource.Set(c.Name, resource.Item.(string))
31+
},
32+
}
33+
34+
child2 := &schema.Table{
35+
Name: "test_child2",
36+
Resolver: func(_ context.Context, _ schema.ClientMeta, parent *schema.Resource, res chan<- any) error {
37+
res <- []string{parent.Item.(string) + "/g0"}
38+
return nil
39+
},
40+
Columns: []schema.Column{nameColumn},
41+
}
42+
43+
child1 := &schema.Table{
44+
Name: "test_child1",
45+
Resolver: func(_ context.Context, _ schema.ClientMeta, parent *schema.Resource, res chan<- any) error {
46+
p := parent.Item.(string)
47+
res <- []string{p + "/c0", p + "/c1"}
48+
return nil
49+
},
50+
PreResourceResolver: func(_ context.Context, _ schema.ClientMeta, resource *schema.Resource) error {
51+
if resource.Item.(string) == "p1/c0" {
52+
return errors.New("child1 pre resource resolver boom")
53+
}
54+
return nil
55+
},
56+
Columns: []schema.Column{nameColumn},
57+
Relations: schema.Tables{child2},
58+
}
59+
60+
parentTable := &schema.Table{
61+
Name: "test_parent",
62+
Resolver: func(_ context.Context, _ schema.ClientMeta, _ *schema.Resource, res chan<- any) error {
63+
// a single slice so all parents land in one chunk, like one API page
64+
res <- []string{"p0", "p1", "p2", "p3", "p4"}
65+
return nil
66+
},
67+
PreResourceResolver: func(_ context.Context, _ schema.ClientMeta, resource *schema.Resource) error {
68+
if resource.Item.(string) == "p2" {
69+
return errors.New("parent pre resource resolver boom")
70+
}
71+
return nil
72+
},
73+
Columns: []schema.Column{nameColumn},
74+
Relations: schema.Tables{child1},
75+
}
76+
77+
tables := schema.Tables{parentTable}
78+
c := testExecutionClient{}
79+
sc := NewScheduler(
80+
WithLogger(zerolog.New(zerolog.NewTestWriter(t)).Level(zerolog.DebugLevel)),
81+
WithStrategy(strategy),
82+
)
83+
msgs := make(chan message.SyncMessage, 500)
84+
require.NoError(t, sc.Sync(context.Background(), &c, tables, msgs))
85+
close(msgs)
86+
87+
var messages message.SyncMessages
88+
for msg := range msgs {
89+
messages = append(messages, msg)
90+
}
91+
92+
collect := func(tb *schema.Table) []string {
93+
values := []string{}
94+
for _, rec := range messages.GetInserts().GetRecordsForTable(tb) {
95+
idx := rec.Schema().FieldIndices("name")[0]
96+
col := rec.Column(idx).(*array.String)
97+
for i := 0; i < col.Len(); i++ {
98+
values = append(values, col.Value(i))
99+
}
100+
}
101+
return values
102+
}
103+
104+
require.ElementsMatch(t,
105+
[]string{"p0", "p1", "p3", "p4"},
106+
collect(parentTable),
107+
"only the failing parent should be dropped")
108+
109+
require.ElementsMatch(t,
110+
[]string{"p0/c0", "p0/c1", "p1/c1", "p3/c0", "p3/c1", "p4/c0", "p4/c1"},
111+
collect(child1),
112+
"children of surviving parents should sync, except the failing child; no children of the dropped parent")
113+
114+
require.ElementsMatch(t,
115+
[]string{"p0/c0/g0", "p0/c1/g0", "p1/c1/g0", "p3/c0/g0", "p3/c1/g0", "p4/c0/g0", "p4/c1/g0"},
116+
collect(child2),
117+
"grandchildren should only sync under surviving child1 rows")
118+
})
119+
}
120+
}

0 commit comments

Comments
 (0)