Skip to content

Commit 1eb59a6

Browse files
committed
refactor to remove TaskWithRuntime
1 parent 40ec2d9 commit 1eb59a6

1 file changed

Lines changed: 7 additions & 17 deletions

File tree

internal/services/dispatcher/dispatcher.go

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -509,8 +509,8 @@ func (d *DispatcherImpl) handleTaskBulkAssignedTask(ctx context.Context, msg *ms
509509
// load the step runs from the database
510510
taskIds := make([]int64, 0)
511511

512-
for _, ids := range innerMsg.WorkerIdToTaskIds {
513-
taskIds = append(taskIds, ids...)
512+
for _, tasks := range innerMsg.WorkerIdToTaskIds {
513+
taskIds = append(taskIds, tasks...)
514514
}
515515

516516
taskIdToData, err := d.populateTaskData(ctx, requeue, msg.TenantID, taskIds)
@@ -520,13 +520,11 @@ func (d *DispatcherImpl) handleTaskBulkAssignedTask(ctx context.Context, msg *ms
520520
continue
521521
}
522522

523-
for workerId, ids := range innerMsg.WorkerIdToTaskIds {
524-
if len(ids) == 0 {
525-
continue
526-
}
523+
for workerId, taskIds := range innerMsg.WorkerIdToTaskIds {
524+
workerId := workerId
527525

528526
outerEg.Go(func() error {
529-
return d.sendTasksToWorker(ctx, requeue, msg.TenantID, workerId, ids, taskIdToData)
527+
return d.sendTasksToWorker(ctx, requeue, msg.TenantID, workerId, taskIds, taskIdToData)
530528
})
531529
}
532530
}
@@ -680,12 +678,7 @@ func (d *DispatcherImpl) populateTaskData(
680678
}
681679
}
682680

683-
bulkV1Tasks := make([]*sqlcv1.V1Task, len(bulkDatas))
684-
for i, task := range bulkDatas {
685-
bulkV1Tasks[i] = task
686-
}
687-
688-
parentDataMap, err := d.repov1.Tasks().ListTaskParentOutputs(ctx, tenantId, bulkV1Tasks)
681+
parentDataMap, err := d.repov1.Tasks().ListTaskParentOutputs(ctx, tenantId, bulkDatas)
689682

690683
if err != nil {
691684
for _, task := range bulkDatas {
@@ -1144,10 +1137,7 @@ func (d *DispatcherImpl) handleTaskCancelled(ctx context.Context, msg *msgqueue.
11441137
task, ok := taskIdsToTasks[msg.TaskId]
11451138

11461139
if !ok {
1147-
d.l.Warn().Ctx(ctx).Msgf("task %d not found in retry counts", msg.TaskId)
1148-
continue
1149-
}
1150-
if task == nil {
1140+
d.l.Warn().Ctx(ctx).Msgf("task %d not found", msg.TaskId)
11511141
continue
11521142
}
11531143

0 commit comments

Comments
 (0)