Skip to content

Commit 3ecbcfc

Browse files
committed
initial commit
1 parent 39c53a0 commit 3ecbcfc

3 files changed

Lines changed: 21 additions & 3 deletions

File tree

internal/operation/operation.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ func (o *SerialOperation) RunOrContinue(l *zerolog.Logger) {
131131

132132
func (o *SerialOperation) Run(l *zerolog.Logger) {
133133
if !o.setRunning(true, l) {
134-
l.Info().Msg(fmt.Sprintf("SerialOperation %s skipped because it is already running, last run: %s ", o.operationId, o.lastRun))
134+
l.Error().Msg(fmt.Sprintf("SerialOperation %s skipped because it is already running, last run: %s ", o.operationId, o.lastRun))
135135
return
136136
}
137137

internal/services/controllers/task/process_reassignments.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ func (tc *TasksControllerImpl) processTaskReassignments(ctx context.Context, ten
2222
tenantIdUUID := uuid.MustParse(tenantId)
2323

2424
res, shouldContinue, err := tc.repov1.Tasks().ProcessTaskReassignments(ctx, tenantIdUUID)
25-
25+
if !shouldContinue {
26+
tc.l.Error().Msg(fmt.Sprintf("Task reassignment stopping: %e", err))
27+
}
2628
if err != nil {
2729
return false, fmt.Errorf("could not list step runs to reassign for tenant %s: %w", tenantId, err)
2830
}

pkg/repository/task.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -841,7 +841,15 @@ func (r *TaskRepositoryImpl) failTasksTx(ctx context.Context, tx sqlcv1.DBTX, te
841841
if err != nil {
842842
return nil, err
843843
}
844-
844+
if len(internalFailureRetries) < len(internalFailureTaskIds) {
845+
for i, taskId := range internalFailureTaskIds {
846+
r.l.Warn().
847+
Int64("task_id", taskId).
848+
Int32("retry_count", internalFailureTaskRetryCounts[i]).
849+
Str("tenant_id", tenantId.String()).
850+
Msg("FailTaskInternalFailure skipped task - possible stale v1_task_runtime row with mismatched retry_count")
851+
}
852+
}
845853
for _, task := range internalFailureRetries {
846854
retriedTasks = append(retriedTasks, RetriedTask{
847855
TaskIdInsertedAtRetryCount: &TaskIdInsertedAtRetryCount{
@@ -1319,6 +1327,9 @@ func (r *TaskRepositoryImpl) ProcessTaskReassignments(ctx context.Context, tenan
13191327
}
13201328

13211329
if len(toReassign) == 0 {
1330+
r.l.Error().
1331+
Str("tenant_id", tenantId.String()).
1332+
Msg("could not find any tasks to reassign")
13221333
return &FailTasksResponse{
13231334
FinalizedTaskResponse: &FinalizedTaskResponse{
13241335
ReleasedTasks: make([]*sqlcv1.ReleaseTasksRow, 0),
@@ -1332,6 +1343,11 @@ func (r *TaskRepositoryImpl) ProcessTaskReassignments(ctx context.Context, tenan
13321343
failOpts := make([]FailTaskOpts, 0, len(toReassign))
13331344

13341345
for _, task := range toReassign {
1346+
r.l.Warn().
1347+
Int64("task_id", task.ID).
1348+
Int32("retry_count", task.RetryCount).
1349+
Str("tenant_id", tenantId.String()).
1350+
Msg("found task to reassign")
13351351
failOpts = append(failOpts, FailTaskOpts{
13361352
TaskIdInsertedAtRetryCount: &TaskIdInsertedAtRetryCount{
13371353
Id: task.ID,

0 commit comments

Comments
 (0)