Skip to content

Commit f16c647

Browse files
committed
fix: add inserted at to all the olap payload reads
1 parent 0accd6c commit f16c647

2 files changed

Lines changed: 109 additions & 36 deletions

File tree

api/v1/server/run/run.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import (
5050
"github.com/hatchet-dev/hatchet/api/v1/server/middleware/telemetry"
5151
"github.com/hatchet-dev/hatchet/api/v1/server/oas/gen"
5252
"github.com/hatchet-dev/hatchet/pkg/config/server"
53+
"github.com/hatchet-dev/hatchet/pkg/repository"
5354
"github.com/hatchet-dev/hatchet/pkg/repository/sqlcv1"
5455
)
5556

@@ -469,7 +470,10 @@ func (t *APIServer) registerSpec(g *echo.Group, spec *openapi3.T) (*populator.Po
469470
return nil, "", err
470471
}
471472

472-
payload, err := t.config.V1.OLAP().ReadPayload(timeoutCtx, v1Event.TenantID, v1Event.ExternalID)
473+
payload, err := t.config.V1.OLAP().ReadPayload(timeoutCtx, v1Event.TenantID, repository.ReadOLAPPayloadOpts{
474+
ExternalId: v1Event.ExternalID,
475+
InsertedAt: v1Event.SeenAt,
476+
})
473477

474478
if err != nil {
475479
return nil, "", err

pkg/repository/olap.go

Lines changed: 104 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ type OLAPRepository interface {
274274
CreateIncomingWebhookValidationFailureLogs(ctx context.Context, tenantId uuid.UUID, opts []CreateIncomingWebhookFailureLogOpts) error
275275
StoreCELEvaluationFailures(ctx context.Context, tenantId uuid.UUID, failures []CELEvaluationFailure) error
276276
PutPayloads(ctx context.Context, tx sqlcv1.DBTX, tenantId uuid.UUID, putPayloadOpts ...StoreOLAPPayloadOpts) (map[uuid.UUID]ExternalPayloadLocationKey, error)
277-
ReadPayload(ctx context.Context, tenantId uuid.UUID, externalId uuid.UUID) ([]byte, error)
277+
ReadPayload(ctx context.Context, tenantId uuid.UUID, opt ReadOLAPPayloadOpts) ([]byte, error)
278278

279279
AnalyzeOLAPTables(ctx context.Context) error
280280
OffloadPayloads(ctx context.Context, tenantId uuid.UUID, payloads []OffloadPayloadOpts) error
@@ -571,7 +571,10 @@ func (r *OLAPRepositoryImpl) ReadWorkflowRun(ctx context.Context, workflowRunExt
571571
return nil, err
572572
}
573573

574-
inputPayload, err := r.ReadPayload(ctx, row.TenantID, row.ExternalID)
574+
inputPayload, err := r.ReadPayload(ctx, row.TenantID, ReadOLAPPayloadOpts{
575+
ExternalId: row.ExternalID,
576+
InsertedAt: row.InsertedAt,
577+
})
575578

576579
if err != nil {
577580
return nil, err
@@ -580,7 +583,10 @@ func (r *OLAPRepositoryImpl) ReadWorkflowRun(ctx context.Context, workflowRunExt
580583
var outputPayload []byte
581584

582585
if row.OutputEventExternalID != nil {
583-
outputPayload, err = r.ReadPayload(ctx, row.TenantID, *row.OutputEventExternalID)
586+
outputPayload, err = r.ReadPayload(ctx, row.TenantID, ReadOLAPPayloadOpts{
587+
ExternalId: *row.OutputEventExternalID,
588+
InsertedAt: row.InsertedAt,
589+
})
584590

585591
if err != nil {
586592
return nil, err
@@ -630,6 +636,7 @@ func (r *OLAPRepositoryImpl) ReadTaskRunData(ctx context.Context, tenantId uuid.
630636
}
631637

632638
var workflowRunId uuid.UUID
639+
var workflowRunInsertedAt pgtype.Timestamptz
633640

634641
if taskRun.DagID.Valid {
635642
dagId := taskRun.DagID.Int64
@@ -643,17 +650,26 @@ func (r *OLAPRepositoryImpl) ReadTaskRunData(ctx context.Context, tenantId uuid.
643650
if err != nil {
644651
return nil, emptyUUID, err
645652
}
653+
654+
workflowRunInsertedAt = dagInsertedAt
646655
} else {
647656
workflowRunId = taskRun.ExternalID
657+
workflowRunInsertedAt = taskRun.InsertedAt
648658
}
649659

650-
externalIds := []uuid.UUID{workflowRunId}
660+
retrievePayloadOpts := []ReadOLAPPayloadOpts{{
661+
ExternalId: workflowRunId,
662+
InsertedAt: workflowRunInsertedAt,
663+
}}
651664

652665
if taskRun.OutputEventExternalID != nil {
653-
externalIds = append(externalIds, *taskRun.OutputEventExternalID)
666+
retrievePayloadOpts = append(retrievePayloadOpts, ReadOLAPPayloadOpts{
667+
ExternalId: *taskRun.OutputEventExternalID,
668+
InsertedAt: taskRun.InsertedAt,
669+
})
654670
}
655671

656-
payloads, err := r.readPayloads(ctx, r.readPool, tenantId, externalIds...)
672+
payloads, err := r.readPayloads(ctx, r.readPool, tenantId, retrievePayloadOpts...)
657673

658674
if err != nil {
659675
return nil, emptyUUID, err
@@ -825,16 +841,22 @@ func (r *OLAPRepositoryImpl) ListTasks(ctx context.Context, tenantId uuid.UUID,
825841
payloads := make(map[uuid.UUID][]byte)
826842

827843
if opts.IncludePayloads {
828-
externalIds := make([]uuid.UUID, 0)
844+
retrievePayloadOpts := make([]ReadOLAPPayloadOpts, 0)
829845
for _, task := range tasksWithData {
830-
externalIds = append(externalIds, task.ExternalID)
846+
retrievePayloadOpts = append(retrievePayloadOpts, ReadOLAPPayloadOpts{
847+
ExternalId: task.ExternalID,
848+
InsertedAt: task.InsertedAt,
849+
})
831850

832851
if task.OutputEventExternalID != nil {
833-
externalIds = append(externalIds, *task.OutputEventExternalID)
852+
retrievePayloadOpts = append(retrievePayloadOpts, ReadOLAPPayloadOpts{
853+
ExternalId: *task.OutputEventExternalID,
854+
InsertedAt: task.InsertedAt,
855+
})
834856
}
835857
}
836858

837-
payloads, err = r.readPayloads(ctx, tx, tenantId, externalIds...)
859+
payloads, err = r.readPayloads(ctx, tx, tenantId, retrievePayloadOpts...)
838860

839861
if err != nil {
840862
return nil, 0, err
@@ -922,16 +944,22 @@ func (r *OLAPRepositoryImpl) ListTasksByDAGId(ctx context.Context, tenantId uuid
922944
payloads := make(map[uuid.UUID][]byte)
923945

924946
if includePayloads {
925-
externalIds := make([]uuid.UUID, 0)
947+
retrievePayloadOpts := make([]ReadOLAPPayloadOpts, 0)
926948
for _, task := range tasksWithData {
927-
externalIds = append(externalIds, task.ExternalID)
949+
retrievePayloadOpts = append(retrievePayloadOpts, ReadOLAPPayloadOpts{
950+
ExternalId: task.ExternalID,
951+
InsertedAt: task.InsertedAt,
952+
})
928953

929954
if task.OutputEventExternalID != nil {
930-
externalIds = append(externalIds, *task.OutputEventExternalID)
955+
retrievePayloadOpts = append(retrievePayloadOpts, ReadOLAPPayloadOpts{
956+
ExternalId: *task.OutputEventExternalID,
957+
InsertedAt: task.InsertedAt,
958+
})
931959
}
932960
}
933961

934-
payloads, err = r.readPayloads(ctx, tx, tenantId, externalIds...)
962+
payloads, err = r.readPayloads(ctx, tx, tenantId, retrievePayloadOpts...)
935963

936964
if err != nil {
937965
return nil, taskIdToDagExternalId, err
@@ -1004,17 +1032,22 @@ func (r *OLAPRepositoryImpl) ListTasksByIdAndInsertedAt(ctx context.Context, ten
10041032
payloads := make(map[uuid.UUID][]byte)
10051033

10061034
if includePayloads {
1007-
externalIds := make([]uuid.UUID, 0)
1035+
retrievePayloadOpts := make([]ReadOLAPPayloadOpts, 0)
10081036
for _, task := range tasksWithData {
1009-
externalIds = append(externalIds, task.ExternalID)
1037+
retrievePayloadOpts = append(retrievePayloadOpts, ReadOLAPPayloadOpts{
1038+
ExternalId: task.ExternalID,
1039+
InsertedAt: task.InsertedAt,
1040+
})
10101041

10111042
if task.OutputEventExternalID != nil {
1012-
1013-
externalIds = append(externalIds, *task.OutputEventExternalID)
1043+
retrievePayloadOpts = append(retrievePayloadOpts, ReadOLAPPayloadOpts{
1044+
ExternalId: *task.OutputEventExternalID,
1045+
InsertedAt: task.InsertedAt,
1046+
})
10141047
}
10151048
}
10161049

1017-
payloads, err = r.readPayloads(ctx, tx, tenantId, externalIds...)
1050+
payloads, err = r.readPayloads(ctx, tx, tenantId, retrievePayloadOpts...)
10181051

10191052
if err != nil {
10201053
return nil, err
@@ -1147,7 +1180,7 @@ func (r *OLAPRepositoryImpl) ListWorkflowRuns(ctx context.Context, tenantId uuid
11471180
runIdsWithDAGs := make([]int64, 0)
11481181
runInsertedAtsWithDAGs := make([]pgtype.Timestamptz, 0)
11491182
idsInsertedAts := make([]IdInsertedAt, 0, len(workflowRunIds))
1150-
externalIdsForPayloads := make([]uuid.UUID, 0)
1183+
retrievePayloadOpts := make([]ReadOLAPPayloadOpts, 0)
11511184

11521185
for _, row := range workflowRunIds {
11531186
if row.Kind == sqlcv1.V1RunKindDAG {
@@ -1178,10 +1211,16 @@ func (r *OLAPRepositoryImpl) ListWorkflowRuns(ctx context.Context, tenantId uuid
11781211
externalId := dag.ExternalID.String()
11791212

11801213
dagsToPopulated[externalId] = dag
1181-
externalIdsForPayloads = append(externalIdsForPayloads, dag.ExternalID)
1214+
retrievePayloadOpts = append(retrievePayloadOpts, ReadOLAPPayloadOpts{
1215+
ExternalId: dag.ExternalID,
1216+
InsertedAt: dag.InsertedAt,
1217+
})
11821218

11831219
if dag.OutputEventExternalID != nil {
1184-
externalIdsForPayloads = append(externalIdsForPayloads, *dag.OutputEventExternalID)
1220+
retrievePayloadOpts = append(retrievePayloadOpts, ReadOLAPPayloadOpts{
1221+
ExternalId: *dag.OutputEventExternalID,
1222+
InsertedAt: dag.InsertedAt,
1223+
})
11851224
}
11861225
}
11871226

@@ -1197,10 +1236,16 @@ func (r *OLAPRepositoryImpl) ListWorkflowRuns(ctx context.Context, tenantId uuid
11971236
externalId := task.ExternalID.String()
11981237
tasksToPopulated[externalId] = task
11991238

1200-
externalIdsForPayloads = append(externalIdsForPayloads, task.ExternalID)
1239+
retrievePayloadOpts = append(retrievePayloadOpts, ReadOLAPPayloadOpts{
1240+
ExternalId: task.ExternalID,
1241+
InsertedAt: task.InsertedAt,
1242+
})
12011243

12021244
if task.OutputEventExternalID != nil {
1203-
externalIdsForPayloads = append(externalIdsForPayloads, *task.OutputEventExternalID)
1245+
retrievePayloadOpts = append(retrievePayloadOpts, ReadOLAPPayloadOpts{
1246+
ExternalId: *task.OutputEventExternalID,
1247+
InsertedAt: task.InsertedAt,
1248+
})
12041249
}
12051250
}
12061251

@@ -1221,7 +1266,7 @@ func (r *OLAPRepositoryImpl) ListWorkflowRuns(ctx context.Context, tenantId uuid
12211266
externalIdToPayload := make(map[uuid.UUID][]byte)
12221267

12231268
if opts.IncludePayloads {
1224-
externalIdToPayload, err = r.readPayloads(ctx, r.readPool, tenantId, externalIdsForPayloads...)
1269+
externalIdToPayload, err = r.readPayloads(ctx, r.readPool, tenantId, retrievePayloadOpts...)
12251270

12261271
if err != nil {
12271272
return nil, 0, err
@@ -1446,13 +1491,16 @@ func (r *OLAPRepositoryImpl) ListTaskRunEventsByWorkflowRunId(ctx context.Contex
14461491
return nil, err
14471492
}
14481493

1449-
externalIds := make([]uuid.UUID, len(rows))
1494+
retrievePayloadOpts := make([]ReadOLAPPayloadOpts, len(rows))
14501495

14511496
for i, row := range rows {
1452-
externalIds[i] = row.EventExternalID
1497+
retrievePayloadOpts[i] = ReadOLAPPayloadOpts{
1498+
ExternalId: row.EventExternalID,
1499+
InsertedAt: row.EventTimestamp,
1500+
}
14531501
}
14541502

1455-
payloads, err := r.readPayloads(ctx, r.readPool, tenantId, externalIds...)
1503+
payloads, err := r.readPayloads(ctx, r.readPool, tenantId, retrievePayloadOpts...)
14561504

14571505
if err != nil {
14581506
return nil, err
@@ -2582,7 +2630,10 @@ func (r *OLAPRepositoryImpl) GetEventWithPayload(ctx context.Context, externalId
25822630
return nil, err
25832631
}
25842632

2585-
payload, err := r.ReadPayload(ctx, tenantId, event.ExternalID)
2633+
payload, err := r.ReadPayload(ctx, tenantId, ReadOLAPPayloadOpts{
2634+
ExternalId: event.ExternalID,
2635+
InsertedAt: event.SeenAt,
2636+
})
25862637

25872638
if err != nil {
25882639
return nil, fmt.Errorf("error reading event payload: %v", err)
@@ -2689,10 +2740,15 @@ func (r *OLAPRepositoryImpl) ListEvents(ctx context.Context, opts sqlcv1.ListEve
26892740
}
26902741

26912742
eventExternalIds := make([]uuid.UUID, len(events))
2743+
readPayloadOpts := make([]ReadOLAPPayloadOpts, len(events))
26922744
minSeenAt := sqlchelpers.TimestamptzFromTime(time.Now())
26932745

26942746
for i, event := range events {
26952747
eventExternalIds[i] = event.ExternalID
2748+
readPayloadOpts = append(readPayloadOpts, ReadOLAPPayloadOpts{
2749+
ExternalId: event.ExternalID,
2750+
InsertedAt: event.SeenAt,
2751+
})
26962752

26972753
if event.SeenAt.Time.Before(minSeenAt.Time) {
26982754
minSeenAt = event.SeenAt
@@ -2710,7 +2766,7 @@ func (r *OLAPRepositoryImpl) ListEvents(ctx context.Context, opts sqlcv1.ListEve
27102766
return nil, nil, fmt.Errorf("error populating event data: %v", err)
27112767
}
27122768

2713-
externalIdToPayload, err := r.readPayloads(ctx, r.readPool, opts.Tenantid, eventExternalIds...)
2769+
externalIdToPayload, err := r.readPayloads(ctx, r.readPool, opts.Tenantid, readPayloadOpts...)
27142770

27152771
if err != nil {
27162772
return nil, nil, fmt.Errorf("error reading event payloads: %v", err)
@@ -2991,23 +3047,36 @@ func (r *OLAPRepositoryImpl) PutPayloads(ctx context.Context, tx sqlcv1.DBTX, te
29913047
return externalIdToKey, nil
29923048
}
29933049

2994-
func (r *OLAPRepositoryImpl) ReadPayload(ctx context.Context, tenantId uuid.UUID, externalId uuid.UUID) ([]byte, error) {
2995-
payloads, err := r.readPayloads(ctx, r.readPool, tenantId, externalId)
3050+
type ReadOLAPPayloadOpts struct {
3051+
ExternalId uuid.UUID
3052+
InsertedAt pgtype.Timestamptz
3053+
}
3054+
3055+
func (r *OLAPRepositoryImpl) ReadPayload(ctx context.Context, tenantId uuid.UUID, opt ReadOLAPPayloadOpts) ([]byte, error) {
3056+
payloads, err := r.readPayloads(ctx, r.readPool, tenantId, opt)
29963057

29973058
if err != nil {
29983059
return nil, err
29993060
}
30003061

3001-
payload, exists := payloads[externalId]
3062+
payload, exists := payloads[opt.ExternalId]
30023063

30033064
if !exists {
3004-
r.l.Debug().Ctx(ctx).Msgf("payload for external ID %s not found", externalId.String())
3065+
r.l.Debug().Ctx(ctx).Msgf("payload for external ID %s not found", opt.ExternalId.String())
30053066
}
30063067

30073068
return payload, nil
30083069
}
30093070

3010-
func (r *OLAPRepositoryImpl) readPayloads(ctx context.Context, tx sqlcv1.DBTX, tenantId uuid.UUID, externalIds ...uuid.UUID) (map[uuid.UUID][]byte, error) {
3071+
func (r *OLAPRepositoryImpl) readPayloads(ctx context.Context, tx sqlcv1.DBTX, tenantId uuid.UUID, opts ...ReadOLAPPayloadOpts) (map[uuid.UUID][]byte, error) {
3072+
externalIds := make([]uuid.UUID, len(opts))
3073+
insertedAts := make([]pgtype.Timestamptz, len(opts))
3074+
3075+
for i, opt := range opts {
3076+
externalIds[i] = opt.ExternalId
3077+
insertedAts[i] = opt.InsertedAt
3078+
}
3079+
30113080
payloads, err := r.queries.ReadPayloadsOLAP(ctx, tx, sqlcv1.ReadPayloadsOLAPParams{
30123081
Tenantid: tenantId,
30133082
Externalids: externalIds,

0 commit comments

Comments
 (0)