Skip to content

Commit f031291

Browse files
committed
fix: Restore build, fix shuffle-queue deep relation drop, add relation cascade tests
1 parent 7d87b02 commit f031291

4 files changed

Lines changed: 128 additions & 3 deletions

File tree

scheduler/queue/scheduler.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,10 @@ func (d *Scheduler) Sync(ctx context.Context, tableClients []WorkUnit, resolvedR
119119
default:
120120
item := queue.Pop()
121121

122-
// There is work to do
122+
// There is work to do. Mark it active before handing it off so the
123+
// idle check below cannot fire while the item is in flight to a worker.
123124
if item != nil {
125+
activeWorkSignal.Add()
124126
jobs <- item
125127
continue
126128
}

scheduler/queue/worker.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,9 @@ type worker struct {
3737

3838
func (w *worker) work(ctx context.Context, activeWorkSignal *activeWorkSignal) {
3939
for j := range w.jobs {
40-
activeWorkSignal.Add()
41-
40+
// the work unit was already marked active by the dispatcher before it was
41+
// handed off, so the dispatcher can never observe an idle state while a
42+
// job is in flight between the queue and a worker
4243
w.resolveTable(ctx, j.Table, j.Client, j.Parent)
4344

4445
activeWorkSignal.Done()

scheduler/resolvers/resolvers.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ import (
99
"github.com/cloudquery/plugin-sdk/v4/caser"
1010
"github.com/cloudquery/plugin-sdk/v4/scheduler/metrics"
1111
"github.com/cloudquery/plugin-sdk/v4/schema"
12+
"github.com/getsentry/sentry-go"
1213
"github.com/rs/zerolog"
14+
"github.com/thoas/go-funk"
1315
)
1416

1517
func resolveColumn(ctx context.Context, logger zerolog.Logger, m *metrics.Metrics, selector metrics.Selector, client schema.ClientMeta, resource *schema.Resource, column schema.Column, c *caser.Caser) {
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package scheduler
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
8+
"github.com/apache/arrow-go/v18/arrow"
9+
"github.com/apache/arrow-go/v18/arrow/array"
10+
"github.com/cloudquery/plugin-sdk/v4/message"
11+
"github.com/cloudquery/plugin-sdk/v4/schema"
12+
"github.com/rs/zerolog"
13+
"github.com/stretchr/testify/require"
14+
)
15+
16+
// TestSchedulerPreResourceResolverPartialFailureWithRelations verifies that a
17+
// PreResourceResolver failure only drops the failing resource and its own
18+
// subtree, at every level of a parent -> child1 -> child2 hierarchy:
19+
// - parent "p2" fails: p2 and all its descendants are dropped, the other
20+
// parents and their descendants survive
21+
// - child1 "p1/c0" fails: only that row and its child2 descendants are
22+
// dropped, sibling "p1/c1" and its descendants survive
23+
func TestSchedulerPreResourceResolverPartialFailureWithRelations(t *testing.T) {
24+
for _, strategy := range AllStrategies {
25+
t.Run(strategy.String(), func(t *testing.T) {
26+
nameColumn := schema.Column{
27+
Name: "name",
28+
Type: arrow.BinaryTypes.String,
29+
Resolver: func(_ context.Context, _ schema.ClientMeta, resource *schema.Resource, c schema.Column) error {
30+
return resource.Set(c.Name, resource.Item.(string))
31+
},
32+
}
33+
34+
child2 := &schema.Table{
35+
Name: "test_child2",
36+
Resolver: func(_ context.Context, _ schema.ClientMeta, parent *schema.Resource, res chan<- any) error {
37+
res <- []string{parent.Item.(string) + "/g0"}
38+
return nil
39+
},
40+
Columns: []schema.Column{nameColumn},
41+
}
42+
43+
child1 := &schema.Table{
44+
Name: "test_child1",
45+
Resolver: func(_ context.Context, _ schema.ClientMeta, parent *schema.Resource, res chan<- any) error {
46+
p := parent.Item.(string)
47+
res <- []string{p + "/c0", p + "/c1"}
48+
return nil
49+
},
50+
PreResourceResolver: func(_ context.Context, _ schema.ClientMeta, resource *schema.Resource) error {
51+
if resource.Item.(string) == "p1/c0" {
52+
return errors.New("child1 pre resource resolver boom")
53+
}
54+
return nil
55+
},
56+
Columns: []schema.Column{nameColumn},
57+
Relations: schema.Tables{child2},
58+
}
59+
60+
parentTable := &schema.Table{
61+
Name: "test_parent",
62+
Resolver: func(_ context.Context, _ schema.ClientMeta, _ *schema.Resource, res chan<- any) error {
63+
// a single slice so all parents land in one chunk, like one API page
64+
res <- []string{"p0", "p1", "p2", "p3", "p4"}
65+
return nil
66+
},
67+
PreResourceResolver: func(_ context.Context, _ schema.ClientMeta, resource *schema.Resource) error {
68+
if resource.Item.(string) == "p2" {
69+
return errors.New("parent pre resource resolver boom")
70+
}
71+
return nil
72+
},
73+
Columns: []schema.Column{nameColumn},
74+
Relations: schema.Tables{child1},
75+
}
76+
77+
tables := schema.Tables{parentTable}
78+
c := testExecutionClient{}
79+
sc := NewScheduler(
80+
WithLogger(zerolog.New(zerolog.NewTestWriter(t)).Level(zerolog.DebugLevel)),
81+
WithStrategy(strategy),
82+
)
83+
msgs := make(chan message.SyncMessage, 500)
84+
require.NoError(t, sc.Sync(context.Background(), &c, tables, msgs))
85+
close(msgs)
86+
87+
var messages message.SyncMessages
88+
for msg := range msgs {
89+
messages = append(messages, msg)
90+
}
91+
92+
collect := func(tb *schema.Table) []string {
93+
values := []string{}
94+
for _, rec := range messages.GetInserts().GetRecordsForTable(tb) {
95+
idx := rec.Schema().FieldIndices("name")[0]
96+
col := rec.Column(idx).(*array.String)
97+
for i := 0; i < col.Len(); i++ {
98+
values = append(values, col.Value(i))
99+
}
100+
}
101+
return values
102+
}
103+
104+
require.ElementsMatch(t,
105+
[]string{"p0", "p1", "p3", "p4"},
106+
collect(parentTable),
107+
"only the failing parent should be dropped")
108+
109+
require.ElementsMatch(t,
110+
[]string{"p0/c0", "p0/c1", "p1/c1", "p3/c0", "p3/c1", "p4/c0", "p4/c1"},
111+
collect(child1),
112+
"children of surviving parents should sync, except the failing child; no children of the dropped parent")
113+
114+
require.ElementsMatch(t,
115+
[]string{"p0/c0/g0", "p0/c1/g0", "p1/c1/g0", "p3/c0/g0", "p3/c1/g0", "p4/c0/g0", "p4/c1/g0"},
116+
collect(child2),
117+
"grandchildren should only sync under surviving child1 rows")
118+
})
119+
}
120+
}

0 commit comments

Comments
 (0)