Skip to content
This repository was archived by the owner on Aug 30, 2025. It is now read-only.

Commit e6cef69

Browse files
authored
Fixes PII detect not syncing more tables than the max concurrency limit (#3527)
1 parent 98a7618 commit e6cef69

1 file changed

Lines changed: 108 additions & 78 deletions

File tree

  • worker/pkg/workflows/ee/piidetect/workflows/job

worker/pkg/workflows/ee/piidetect/workflows/job/workflow.go

Lines changed: 108 additions & 78 deletions
Original file line number | Diff line number | Diff line change
@@ -242,110 +242,140 @@ func orchestrateTables(
242242
userPrompt string,
243243
logger log.Logger,
244244
) (*piidetect_job_activities.JobPiiDetectReport, error) {
245-
workselector := workflow.NewNamedSelector(ctx, "job_pii_detect")
246-
247245
maxConcurrency := getTablePiiDetectMaxConcurrency()
248-
inFlightLimiter := workflow.NewSemaphore(ctx, int64(maxConcurrency))
249246

250247
tableWf := piidetect_table_workflow.New()
251248
wfInfo := workflow.GetInfo(ctx)
252249

253250
tableResultKeys := []*piidetect_job_activities.TableReport{}
254251
mu := workflow.NewMutex(ctx)
255252

256-
processTable := func(table piidetect_job_activities.TableIdentifierWithFingerprint, previousReport *piidetect_job_activities.TableReport) error {
257-
if err := inFlightLimiter.Acquire(ctx, 1); err != nil {
258-
return fmt.Errorf("unable to acquire semaphore: %w", err)
259-
}
260-
var previousResultsKey *mgmtv1alpha1.RunContextKey
261-
if previousReport != nil {
262-
previousResultsKey = previousReport.ReportKey
263-
}
264-
workselector.AddFuture(
265-
workflow.ExecuteChildWorkflow(
266-
workflow.WithChildOptions(
267-
ctx,
268-
workflow.ChildWorkflowOptions{
269-
WorkflowID: workflow_shared.BuildChildWorkflowId(
270-
wfInfo.WorkflowExecution.ID,
271-
fmt.Sprintf("%s.%s", table.Schema, table.Table),
272-
workflow.Now(ctx),
273-
),
274-
RetryPolicy: &temporal.RetryPolicy{
275-
MaximumAttempts: 1,
276-
},
277-
WorkflowRunTimeout: 5 * time.Minute,
278-
}),
279-
tableWf.TablePiiDetect,
280-
&piidetect_table_workflow.TablePiiDetectRequest{
281-
AccountId: accountId,
282-
JobId: jobId,
283-
ConnectionId: connectionId,
284-
TableSchema: table.Schema,
285-
TableName: table.Table,
286-
ParentExecutionId: &wfInfo.WorkflowExecution.ID,
287-
ShouldSampleData: shouldSampleData,
288-
UserPrompt: userPrompt,
289-
PreviousResultsKey: previousResultsKey,
290-
},
291-
),
292-
func(f workflow.Future) {
293-
var wfResult *piidetect_table_workflow.TablePiiDetectResponse
294-
err := f.Get(ctx, &wfResult)
295-
inFlightLimiter.Release(1)
253+
previousReportsMap := make(
254+
map[piidetect_job_activities.TableIdentifier]*piidetect_job_activities.TableReport,
255+
)
256+
for _, report := range tablesToScanResp.PreviousReports {
257+
previousReportsMap[piidetect_job_activities.TableIdentifier{Schema: report.TableSchema, Table: report.TableName}] = report
258+
}
259+
260+
logger.Debug("starting table processing")
261+
logger.Debug("total tables to process", "count", len(tablesToScanResp.Tables))
262+
263+
// tableWork is one unit of queued work: a table to scan plus its previous report (nil if none exists)
264+
type tableWork struct {
265+
table piidetect_job_activities.TableIdentifierWithFingerprint
266+
previousReport *piidetect_job_activities.TableReport
267+
}
268+
269+
// Use a buffered channel as a work queue
270+
workQueue := workflow.NewBufferedChannel(ctx, len(tablesToScanResp.Tables))
271+
272+
// Queue all work items
273+
for _, table := range tablesToScanResp.Tables {
274+
previousReport := previousReportsMap[table.TableIdentifier]
275+
workQueue.Send(ctx, tableWork{table: table, previousReport: previousReport})
276+
logger.Debug("queued table", "schema", table.Schema, "table", table.Table)
277+
}
278+
workQueue.Close()
279+
280+
// Channel to track completion: each worker sends exactly one value on it when it exits
281+
completionChannel := workflow.NewChannel(ctx)
282+
activeWorkers := 0
283+
284+
// Start maxConcurrency workers via workflow.Go; each drains the work queue until it is closed and empty
285+
for i := 0; i < maxConcurrency; i++ {
286+
activeWorkers++
287+
workflow.Go(ctx, func(ctx workflow.Context) {
288+
defer func() {
289+
completionChannel.Send(ctx, true)
290+
}()
291+
292+
for {
293+
var work tableWork
294+
more := workQueue.Receive(ctx, &work)
295+
if !more {
296+
logger.Debug("worker exiting, no more work", "workerIndex", i)
297+
return // Channel closed, no more work
298+
}
299+
300+
logger.Debug("worker processing table", "workerIndex", i, "table", work.table.Table, "schema", work.table.Schema)
301+
302+
var previousResultsKey *mgmtv1alpha1.RunContextKey
303+
if work.previousReport != nil {
304+
previousResultsKey = work.previousReport.ReportKey
305+
}
306+
307+
// Execute child workflow synchronously within this goroutine
308+
var wfResult piidetect_table_workflow.TablePiiDetectResponse
309+
err := workflow.ExecuteChildWorkflow(
310+
workflow.WithChildOptions(
311+
ctx,
312+
workflow.ChildWorkflowOptions{
313+
WorkflowID: workflow_shared.BuildChildWorkflowId(
314+
wfInfo.WorkflowExecution.ID,
315+
fmt.Sprintf("%s.%s", work.table.Schema, work.table.Table),
316+
workflow.Now(ctx),
317+
),
318+
RetryPolicy: &temporal.RetryPolicy{
319+
MaximumAttempts: 1,
320+
},
321+
WorkflowRunTimeout: 5 * time.Minute,
322+
}),
323+
tableWf.TablePiiDetect,
324+
&piidetect_table_workflow.TablePiiDetectRequest{
325+
AccountId: accountId,
326+
JobId: jobId,
327+
ConnectionId: connectionId,
328+
TableSchema: work.table.Schema,
329+
TableName: work.table.Table,
330+
ParentExecutionId: &wfInfo.WorkflowExecution.ID,
331+
ShouldSampleData: shouldSampleData,
332+
UserPrompt: userPrompt,
333+
PreviousResultsKey: previousResultsKey,
334+
},
335+
).Get(ctx, &wfResult)
336+
296337
if err != nil {
297-
logger.Error("activity did not complete", "err", err)
298-
return
338+
logger.Error("child workflow did not complete",
339+
"table", work.table.Table,
340+
"schema", work.table.Schema,
341+
"err", err)
342+
continue
299343
}
344+
300345
logger.Debug(
301346
"table pii detect completed",
302-
"table",
303-
table.Table,
304-
"schema",
305-
table.Schema,
347+
"table", work.table.Table,
348+
"schema", work.table.Schema,
306349
)
350+
351+
// Store result
307352
err = mu.Lock(ctx)
308353
if err != nil {
309354
logger.Error(
310355
"unable to lock mutex after table pii detect completed",
311-
"err",
312-
err,
356+
"err", err,
313357
)
314-
return
358+
continue
315359
}
316-
defer mu.Unlock()
317360
tableResultKeys = append(tableResultKeys, &piidetect_job_activities.TableReport{
318-
TableSchema: table.Schema,
319-
TableName: table.Table,
320-
ScanFingerprint: table.Fingerprint,
361+
TableSchema: work.table.Schema,
362+
TableName: work.table.Table,
363+
ScanFingerprint: work.table.Fingerprint,
321364
ReportKey: wfResult.ResultKey,
322365
})
323-
},
324-
)
325-
return nil
326-
}
327-
328-
previousReportsMap := make(
329-
map[piidetect_job_activities.TableIdentifier]*piidetect_job_activities.TableReport,
330-
)
331-
for _, report := range tablesToScanResp.PreviousReports {
332-
previousReportsMap[piidetect_job_activities.TableIdentifier{Schema: report.TableSchema, Table: report.TableName}] = report
333-
}
334-
335-
for _, table := range tablesToScanResp.Tables {
336-
previousReport := previousReportsMap[table.TableIdentifier]
337-
if err := processTable(table, previousReport); err != nil {
338-
return nil, err
339-
}
366+
mu.Unlock()
367+
}
368+
})
340369
}
341370

342-
logger.Debug("waiting for all table pii detect workflows to complete")
343-
344-
for range tablesToScanResp.Tables {
345-
workselector.Select(ctx)
371+
// Wait for all workers to complete
372+
for i := 0; i < activeWorkers; i++ {
373+
var completed bool
374+
completionChannel.Receive(ctx, &completed)
375+
logger.Debug("worker completed", "remaining", activeWorkers-i-1)
346376
}
347377

348-
logger.Debug("all tables processed")
378+
logger.Debug("all tables processed", "total_processed", len(tableResultKeys))
349379
return &piidetect_job_activities.JobPiiDetectReport{
350380
SuccessfulTableReports: tableResultKeys,
351381
}, nil

0 commit comments

Comments
 (0)