Skip to content

Commit 633d4a2

Browse files
authored
Refactor: Pass insertedAt to OLAP payload reads (#3960)
* chore: rm unused method * fix: add inserted at to all the olap payload reads * fix: bugs * fix: pass correct insertedAt through
1 parent 39c53a0 commit 633d4a2

4 files changed

Lines changed: 141 additions & 50 deletions

File tree

api/v1/server/run/run.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import (
5050
"github.com/hatchet-dev/hatchet/api/v1/server/middleware/telemetry"
5151
"github.com/hatchet-dev/hatchet/api/v1/server/oas/gen"
5252
"github.com/hatchet-dev/hatchet/pkg/config/server"
53+
"github.com/hatchet-dev/hatchet/pkg/repository"
5354
"github.com/hatchet-dev/hatchet/pkg/repository/sqlcv1"
5455
)
5556

@@ -469,7 +470,10 @@ func (t *APIServer) registerSpec(g *echo.Group, spec *openapi3.T) (*populator.Po
469470
return nil, "", err
470471
}
471472

472-
payload, err := t.config.V1.OLAP().ReadPayload(timeoutCtx, v1Event.TenantID, v1Event.ExternalID)
473+
payload, err := t.config.V1.OLAP().ReadPayload(timeoutCtx, v1Event.TenantID, repository.ReadOLAPPayloadOpts{
474+
ExternalId: v1Event.ExternalID,
475+
InsertedAt: v1Event.SeenAt,
476+
})
473477

474478
if err != nil {
475479
return nil, "", err

pkg/repository/olap.go

Lines changed: 102 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -274,8 +274,7 @@ type OLAPRepository interface {
274274
CreateIncomingWebhookValidationFailureLogs(ctx context.Context, tenantId uuid.UUID, opts []CreateIncomingWebhookFailureLogOpts) error
275275
StoreCELEvaluationFailures(ctx context.Context, tenantId uuid.UUID, failures []CELEvaluationFailure) error
276276
PutPayloads(ctx context.Context, tx sqlcv1.DBTX, tenantId uuid.UUID, putPayloadOpts ...StoreOLAPPayloadOpts) (map[uuid.UUID]ExternalPayloadLocationKey, error)
277-
ReadPayload(ctx context.Context, tenantId uuid.UUID, externalId uuid.UUID) ([]byte, error)
278-
ReadPayloads(ctx context.Context, tenantId uuid.UUID, externalIds ...uuid.UUID) (map[uuid.UUID][]byte, error)
277+
ReadPayload(ctx context.Context, tenantId uuid.UUID, opt ReadOLAPPayloadOpts) ([]byte, error)
279278

280279
AnalyzeOLAPTables(ctx context.Context) error
281280
OffloadPayloads(ctx context.Context, tenantId uuid.UUID, payloads []OffloadPayloadOpts) error
@@ -572,7 +571,10 @@ func (r *OLAPRepositoryImpl) ReadWorkflowRun(ctx context.Context, workflowRunExt
572571
return nil, err
573572
}
574573

575-
inputPayload, err := r.ReadPayload(ctx, row.TenantID, row.ExternalID)
574+
inputPayload, err := r.ReadPayload(ctx, row.TenantID, ReadOLAPPayloadOpts{
575+
ExternalId: row.ExternalID,
576+
InsertedAt: row.InsertedAt,
577+
})
576578

577579
if err != nil {
578580
return nil, err
@@ -581,7 +583,10 @@ func (r *OLAPRepositoryImpl) ReadWorkflowRun(ctx context.Context, workflowRunExt
581583
var outputPayload []byte
582584

583585
if row.OutputEventExternalID != nil {
584-
outputPayload, err = r.ReadPayload(ctx, row.TenantID, *row.OutputEventExternalID)
586+
outputPayload, err = r.ReadPayload(ctx, row.TenantID, ReadOLAPPayloadOpts{
587+
ExternalId: *row.OutputEventExternalID,
588+
InsertedAt: row.OutputEventInsertedAt,
589+
})
585590

586591
if err != nil {
587592
return nil, err
@@ -631,6 +636,7 @@ func (r *OLAPRepositoryImpl) ReadTaskRunData(ctx context.Context, tenantId uuid.
631636
}
632637

633638
var workflowRunId uuid.UUID
639+
var workflowRunInsertedAt pgtype.Timestamptz
634640

635641
if taskRun.DagID.Valid {
636642
dagId := taskRun.DagID.Int64
@@ -644,17 +650,26 @@ func (r *OLAPRepositoryImpl) ReadTaskRunData(ctx context.Context, tenantId uuid.
644650
if err != nil {
645651
return nil, emptyUUID, err
646652
}
653+
654+
workflowRunInsertedAt = dagInsertedAt
647655
} else {
648656
workflowRunId = taskRun.ExternalID
657+
workflowRunInsertedAt = taskRun.InsertedAt
649658
}
650659

651-
externalIds := []uuid.UUID{workflowRunId}
660+
retrievePayloadOpts := []ReadOLAPPayloadOpts{{
661+
ExternalId: workflowRunId,
662+
InsertedAt: workflowRunInsertedAt,
663+
}}
652664

653665
if taskRun.OutputEventExternalID != nil {
654-
externalIds = append(externalIds, *taskRun.OutputEventExternalID)
666+
retrievePayloadOpts = append(retrievePayloadOpts, ReadOLAPPayloadOpts{
667+
ExternalId: *taskRun.OutputEventExternalID,
668+
InsertedAt: taskRun.OutputEventInsertedAt,
669+
})
655670
}
656671

657-
payloads, err := r.readPayloads(ctx, r.readPool, tenantId, externalIds...)
672+
payloads, err := r.readPayloads(ctx, r.readPool, tenantId, retrievePayloadOpts...)
658673

659674
if err != nil {
660675
return nil, emptyUUID, err
@@ -703,6 +718,7 @@ func (r *OLAPRepositoryImpl) ReadTaskRunData(ctx context.Context, tenantId uuid.
703718
ErrorMessage: taskRun.ErrorMessage,
704719
RetryCount: taskRun.RetryCount,
705720
OutputEventExternalID: taskRun.OutputEventExternalID,
721+
OutputEventInsertedAt: taskRun.OutputEventInsertedAt,
706722
IsStandalone: taskRun.IsStandalone,
707723
IsDurable: taskRun.IsDurable,
708724
},
@@ -826,16 +842,22 @@ func (r *OLAPRepositoryImpl) ListTasks(ctx context.Context, tenantId uuid.UUID,
826842
payloads := make(map[uuid.UUID][]byte)
827843

828844
if opts.IncludePayloads {
829-
externalIds := make([]uuid.UUID, 0)
845+
retrievePayloadOpts := make([]ReadOLAPPayloadOpts, 0)
830846
for _, task := range tasksWithData {
831-
externalIds = append(externalIds, task.ExternalID)
847+
retrievePayloadOpts = append(retrievePayloadOpts, ReadOLAPPayloadOpts{
848+
ExternalId: task.ExternalID,
849+
InsertedAt: task.InsertedAt,
850+
})
832851

833852
if task.OutputEventExternalID != nil {
834-
externalIds = append(externalIds, *task.OutputEventExternalID)
853+
retrievePayloadOpts = append(retrievePayloadOpts, ReadOLAPPayloadOpts{
854+
ExternalId: *task.OutputEventExternalID,
855+
InsertedAt: task.OutputEventInsertedAt,
856+
})
835857
}
836858
}
837859

838-
payloads, err = r.readPayloads(ctx, tx, tenantId, externalIds...)
860+
payloads, err = r.readPayloads(ctx, tx, tenantId, retrievePayloadOpts...)
839861

840862
if err != nil {
841863
return nil, 0, err
@@ -923,16 +945,22 @@ func (r *OLAPRepositoryImpl) ListTasksByDAGId(ctx context.Context, tenantId uuid
923945
payloads := make(map[uuid.UUID][]byte)
924946

925947
if includePayloads {
926-
externalIds := make([]uuid.UUID, 0)
948+
retrievePayloadOpts := make([]ReadOLAPPayloadOpts, 0)
927949
for _, task := range tasksWithData {
928-
externalIds = append(externalIds, task.ExternalID)
950+
retrievePayloadOpts = append(retrievePayloadOpts, ReadOLAPPayloadOpts{
951+
ExternalId: task.ExternalID,
952+
InsertedAt: task.InsertedAt,
953+
})
929954

930955
if task.OutputEventExternalID != nil {
931-
externalIds = append(externalIds, *task.OutputEventExternalID)
956+
retrievePayloadOpts = append(retrievePayloadOpts, ReadOLAPPayloadOpts{
957+
ExternalId: *task.OutputEventExternalID,
958+
InsertedAt: task.OutputEventInsertedAt,
959+
})
932960
}
933961
}
934962

935-
payloads, err = r.readPayloads(ctx, tx, tenantId, externalIds...)
963+
payloads, err = r.readPayloads(ctx, tx, tenantId, retrievePayloadOpts...)
936964

937965
if err != nil {
938966
return nil, taskIdToDagExternalId, err
@@ -1005,17 +1033,22 @@ func (r *OLAPRepositoryImpl) ListTasksByIdAndInsertedAt(ctx context.Context, ten
10051033
payloads := make(map[uuid.UUID][]byte)
10061034

10071035
if includePayloads {
1008-
externalIds := make([]uuid.UUID, 0)
1036+
retrievePayloadOpts := make([]ReadOLAPPayloadOpts, 0)
10091037
for _, task := range tasksWithData {
1010-
externalIds = append(externalIds, task.ExternalID)
1038+
retrievePayloadOpts = append(retrievePayloadOpts, ReadOLAPPayloadOpts{
1039+
ExternalId: task.ExternalID,
1040+
InsertedAt: task.InsertedAt,
1041+
})
10111042

10121043
if task.OutputEventExternalID != nil {
1013-
1014-
externalIds = append(externalIds, *task.OutputEventExternalID)
1044+
retrievePayloadOpts = append(retrievePayloadOpts, ReadOLAPPayloadOpts{
1045+
ExternalId: *task.OutputEventExternalID,
1046+
InsertedAt: task.OutputEventInsertedAt,
1047+
})
10151048
}
10161049
}
10171050

1018-
payloads, err = r.readPayloads(ctx, tx, tenantId, externalIds...)
1051+
payloads, err = r.readPayloads(ctx, tx, tenantId, retrievePayloadOpts...)
10191052

10201053
if err != nil {
10211054
return nil, err
@@ -1148,7 +1181,7 @@ func (r *OLAPRepositoryImpl) ListWorkflowRuns(ctx context.Context, tenantId uuid
11481181
runIdsWithDAGs := make([]int64, 0)
11491182
runInsertedAtsWithDAGs := make([]pgtype.Timestamptz, 0)
11501183
idsInsertedAts := make([]IdInsertedAt, 0, len(workflowRunIds))
1151-
externalIdsForPayloads := make([]uuid.UUID, 0)
1184+
retrievePayloadOpts := make([]ReadOLAPPayloadOpts, 0)
11521185

11531186
for _, row := range workflowRunIds {
11541187
if row.Kind == sqlcv1.V1RunKindDAG {
@@ -1179,10 +1212,16 @@ func (r *OLAPRepositoryImpl) ListWorkflowRuns(ctx context.Context, tenantId uuid
11791212
externalId := dag.ExternalID.String()
11801213

11811214
dagsToPopulated[externalId] = dag
1182-
externalIdsForPayloads = append(externalIdsForPayloads, dag.ExternalID)
1215+
retrievePayloadOpts = append(retrievePayloadOpts, ReadOLAPPayloadOpts{
1216+
ExternalId: dag.ExternalID,
1217+
InsertedAt: dag.InsertedAt,
1218+
})
11831219

11841220
if dag.OutputEventExternalID != nil {
1185-
externalIdsForPayloads = append(externalIdsForPayloads, *dag.OutputEventExternalID)
1221+
retrievePayloadOpts = append(retrievePayloadOpts, ReadOLAPPayloadOpts{
1222+
ExternalId: *dag.OutputEventExternalID,
1223+
InsertedAt: dag.OutputEventInsertedAt,
1224+
})
11861225
}
11871226
}
11881227

@@ -1198,10 +1237,16 @@ func (r *OLAPRepositoryImpl) ListWorkflowRuns(ctx context.Context, tenantId uuid
11981237
externalId := task.ExternalID.String()
11991238
tasksToPopulated[externalId] = task
12001239

1201-
externalIdsForPayloads = append(externalIdsForPayloads, task.ExternalID)
1240+
retrievePayloadOpts = append(retrievePayloadOpts, ReadOLAPPayloadOpts{
1241+
ExternalId: task.ExternalID,
1242+
InsertedAt: task.InsertedAt,
1243+
})
12021244

12031245
if task.OutputEventExternalID != nil {
1204-
externalIdsForPayloads = append(externalIdsForPayloads, *task.OutputEventExternalID)
1246+
retrievePayloadOpts = append(retrievePayloadOpts, ReadOLAPPayloadOpts{
1247+
ExternalId: *task.OutputEventExternalID,
1248+
InsertedAt: task.OutputEventInsertedAt,
1249+
})
12051250
}
12061251
}
12071252

@@ -1222,7 +1267,7 @@ func (r *OLAPRepositoryImpl) ListWorkflowRuns(ctx context.Context, tenantId uuid
12221267
externalIdToPayload := make(map[uuid.UUID][]byte)
12231268

12241269
if opts.IncludePayloads {
1225-
externalIdToPayload, err = r.readPayloads(ctx, r.readPool, tenantId, externalIdsForPayloads...)
1270+
externalIdToPayload, err = r.readPayloads(ctx, r.readPool, tenantId, retrievePayloadOpts...)
12261271

12271272
if err != nil {
12281273
return nil, 0, err
@@ -1447,13 +1492,16 @@ func (r *OLAPRepositoryImpl) ListTaskRunEventsByWorkflowRunId(ctx context.Contex
14471492
return nil, err
14481493
}
14491494

1450-
externalIds := make([]uuid.UUID, len(rows))
1495+
retrievePayloadOpts := make([]ReadOLAPPayloadOpts, len(rows))
14511496

14521497
for i, row := range rows {
1453-
externalIds[i] = row.EventExternalID
1498+
retrievePayloadOpts[i] = ReadOLAPPayloadOpts{
1499+
ExternalId: row.EventExternalID,
1500+
InsertedAt: row.EventTimestamp,
1501+
}
14541502
}
14551503

1456-
payloads, err := r.readPayloads(ctx, r.readPool, tenantId, externalIds...)
1504+
payloads, err := r.readPayloads(ctx, r.readPool, tenantId, retrievePayloadOpts...)
14571505

14581506
if err != nil {
14591507
return nil, err
@@ -2583,7 +2631,10 @@ func (r *OLAPRepositoryImpl) GetEventWithPayload(ctx context.Context, externalId
25832631
return nil, err
25842632
}
25852633

2586-
payload, err := r.ReadPayload(ctx, tenantId, event.ExternalID)
2634+
payload, err := r.ReadPayload(ctx, tenantId, ReadOLAPPayloadOpts{
2635+
ExternalId: event.ExternalID,
2636+
InsertedAt: event.SeenAt,
2637+
})
25872638

25882639
if err != nil {
25892640
return nil, fmt.Errorf("error reading event payload: %v", err)
@@ -2690,10 +2741,15 @@ func (r *OLAPRepositoryImpl) ListEvents(ctx context.Context, opts sqlcv1.ListEve
26902741
}
26912742

26922743
eventExternalIds := make([]uuid.UUID, len(events))
2744+
readPayloadOpts := make([]ReadOLAPPayloadOpts, len(events))
26932745
minSeenAt := sqlchelpers.TimestamptzFromTime(time.Now())
26942746

26952747
for i, event := range events {
26962748
eventExternalIds[i] = event.ExternalID
2749+
readPayloadOpts[i] = ReadOLAPPayloadOpts{
2750+
ExternalId: event.ExternalID,
2751+
InsertedAt: event.SeenAt,
2752+
}
26972753

26982754
if event.SeenAt.Time.Before(minSeenAt.Time) {
26992755
minSeenAt = event.SeenAt
@@ -2711,7 +2767,7 @@ func (r *OLAPRepositoryImpl) ListEvents(ctx context.Context, opts sqlcv1.ListEve
27112767
return nil, nil, fmt.Errorf("error populating event data: %v", err)
27122768
}
27132769

2714-
externalIdToPayload, err := r.readPayloads(ctx, r.readPool, opts.Tenantid, eventExternalIds...)
2770+
externalIdToPayload, err := r.readPayloads(ctx, r.readPool, opts.Tenantid, readPayloadOpts...)
27152771

27162772
if err != nil {
27172773
return nil, nil, fmt.Errorf("error reading event payloads: %v", err)
@@ -2992,27 +3048,34 @@ func (r *OLAPRepositoryImpl) PutPayloads(ctx context.Context, tx sqlcv1.DBTX, te
29923048
return externalIdToKey, nil
29933049
}
29943050

2995-
func (r *OLAPRepositoryImpl) ReadPayload(ctx context.Context, tenantId uuid.UUID, externalId uuid.UUID) ([]byte, error) {
2996-
payloads, err := r.readPayloads(ctx, r.readPool, tenantId, externalId)
3051+
type ReadOLAPPayloadOpts struct {
3052+
ExternalId uuid.UUID
3053+
InsertedAt pgtype.Timestamptz
3054+
}
3055+
3056+
func (r *OLAPRepositoryImpl) ReadPayload(ctx context.Context, tenantId uuid.UUID, opt ReadOLAPPayloadOpts) ([]byte, error) {
3057+
payloads, err := r.readPayloads(ctx, r.readPool, tenantId, opt)
29973058

29983059
if err != nil {
29993060
return nil, err
30003061
}
30013062

3002-
payload, exists := payloads[externalId]
3063+
payload, exists := payloads[opt.ExternalId]
30033064

30043065
if !exists {
3005-
r.l.Debug().Ctx(ctx).Msgf("payload for external ID %s not found", externalId.String())
3066+
r.l.Debug().Ctx(ctx).Msgf("payload for external ID %s not found", opt.ExternalId.String())
30063067
}
30073068

30083069
return payload, nil
30093070
}
30103071

3011-
func (r *OLAPRepositoryImpl) ReadPayloads(ctx context.Context, tenantId uuid.UUID, externalIds ...uuid.UUID) (map[uuid.UUID][]byte, error) {
3012-
return r.readPayloads(ctx, r.readPool, tenantId, externalIds...)
3013-
}
3072+
func (r *OLAPRepositoryImpl) readPayloads(ctx context.Context, tx sqlcv1.DBTX, tenantId uuid.UUID, opts ...ReadOLAPPayloadOpts) (map[uuid.UUID][]byte, error) {
3073+
externalIds := make([]uuid.UUID, len(opts))
3074+
3075+
for i, opt := range opts {
3076+
externalIds[i] = opt.ExternalId
3077+
}
30143078

3015-
func (r *OLAPRepositoryImpl) readPayloads(ctx context.Context, tx sqlcv1.DBTX, tenantId uuid.UUID, externalIds ...uuid.UUID) (map[uuid.UUID][]byte, error) {
30163079
payloads, err := r.queries.ReadPayloadsOLAP(ctx, tx, sqlcv1.ReadPayloadsOLAPParams{
30173080
Tenantid: tenantId,
30183081
Externalids: externalIds,

0 commit comments

Comments
 (0)