Skip to content

Commit d60223d

Browse files
authored
fix(engine): re-enable external id dupe check, add flag to enable dynamic window size (hatchet-dev#4050)
* feat: add env var to toggle dynamic window size estimation
* fix: re-enable duped external id check, but only run it on the first pass of the job
* fix: rm old code that never fires
* fix: timeout
* fix: rollback properly
1 parent 19382ef commit d60223d

5 files changed

Lines changed: 172 additions & 92 deletions

File tree

internal/services/controllers/olap/controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1249,7 +1249,7 @@ func (oc *OLAPControllerImpl) processPayloadExternalCutovers(ctx context.Context
12491249
oc.l.Debug().Ctx(ctx).Msgf("payload external cutover: processing external cutover payloads")
12501250

12511251
p := oc.repo.Payloads()
1252-
err := oc.repo.OLAP().ProcessOLAPPayloadCutovers(ctx, p.ExternalStoreEnabled(), p.InlineStoreTTL(), p.ExternalCutoverBatchSize(), p.ExternalCutoverNumConcurrentOffloads())
1252+
err := oc.repo.OLAP().ProcessOLAPPayloadCutovers(ctx, p.ExternalStoreEnabled(), p.InlineStoreTTL(), p.ExternalCutoverBatchSize(), p.ExternalCutoverNumConcurrentOffloads(), p.EnableWindowSizeOptimization())
12531253

12541254
if err != nil {
12551255
span.RecordError(err)

pkg/config/loader/loader.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,7 @@ func (c *ConfigLoader) InitDataLayer() (res *database.Layer, err error) {
342342
ExternalCutoverBatchSize: scf.PayloadStore.ExternalCutoverBatchSize,
343343
ExternalCutoverNumConcurrentOffloads: scf.PayloadStore.ExternalCutoverNumConcurrentOffloads,
344344
InlineStoreTTL: &inlineStoreTTL,
345+
EnableWindowSizeOptimization: scf.PayloadStore.EnableWindowSizeOptimization,
345346
}
346347

347348
statusUpdateOpts := repov1.StatusUpdateBatchSizeLimits{

pkg/config/server/server.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -708,6 +708,7 @@ type PayloadStoreConfig struct {
708708
ExternalCutoverBatchSize int32 `mapstructure:"externalCutoverBatchSize" json:"externalCutoverBatchSize,omitempty" default:"1000"`
709709
ExternalCutoverNumConcurrentOffloads int32 `mapstructure:"externalCutoverNumConcurrentOffloads" json:"externalCutoverNumConcurrentOffloads,omitempty" default:"10"`
710710
InlineStoreTTLDays int32 `mapstructure:"inlineStoreTTLDays" json:"inlineStoreTTLDays,omitempty" default:"2"`
711+
EnableWindowSizeOptimization bool `mapstructure:"enableWindowSizeOptimization" json:"enableWindowSizeOptimization,omitempty" default:"true"`
711712
}
712713

713714
func (c *ServerConfig) HasService(name string) bool {
@@ -989,6 +990,7 @@ func BindAllEnv(v *viper.Viper) {
989990
_ = v.BindEnv("payloadStore.externalCutoverBatchSize", "SERVER_PAYLOAD_STORE_EXTERNAL_CUTOVER_BATCH_SIZE")
990991
_ = v.BindEnv("payloadStore.externalCutoverNumConcurrentOffloads", "SERVER_PAYLOAD_STORE_EXTERNAL_CUTOVER_NUM_CONCURRENT_OFFLOADS")
991992
_ = v.BindEnv("payloadStore.inlineStoreTTLDays", "SERVER_PAYLOAD_STORE_INLINE_STORE_TTL_DAYS")
993+
_ = v.BindEnv("payloadStore.enableWindowSizeOptimization", "SERVER_PAYLOAD_STORE_ENABLE_WINDOW_SIZE_OPTIMIZATION")
992994

993995
// cron operations options
994996
_ = v.BindEnv("cronOperations.taskAnalyzeCronInterval", "SERVER_CRON_OPERATIONS_TASK_ANALYZE_CRON_INTERVAL")

pkg/repository/olap.go

Lines changed: 83 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ type OLAPRepository interface {
283283

284284
ListWorkflowRunExternalIds(ctx context.Context, tenantId uuid.UUID, opts ListWorkflowRunOpts) ([]uuid.UUID, error)
285285

286-
ProcessOLAPPayloadCutovers(ctx context.Context, externalStoreEnabled bool, inlineStoreTTL *time.Duration, externalCutoverBatchSize, externalCutoverNumConcurrentOffloads int32) error
286+
ProcessOLAPPayloadCutovers(ctx context.Context, externalStoreEnabled bool, inlineStoreTTL *time.Duration, externalCutoverBatchSize, externalCutoverNumConcurrentOffloads int32, enableWindowSizeOptimization bool) error
287287

288288
CountOLAPTempTableSizeForDAGStatusUpdates(ctx context.Context) (int64, error)
289289
CountOLAPTempTableSizeForTaskStatusUpdates(ctx context.Context) (int64, error)
@@ -3070,19 +3070,9 @@ func (r *OLAPRepositoryImpl) readPayloads(ctx context.Context, tx sqlcv1.DBTX, t
30703070
key := ExternalPayloadLocationKey(payload.ExternalLocationKey.String)
30713071
var retrieveFromExternalOpt RetrieveFromExternalOpts
30723072

3073-
if strings.HasSuffix(string(key), ".index") {
3074-
retrieveFromExternalOpt = RetrieveFromExternalOpts{
3075-
Method: RetrieveFromExternalByIndexFile,
3076-
ByIndexFile: &RetrieveFromExternalByIndexFileOpt{
3077-
IndexFileKey: ExternalIndexFileLocationKey(key),
3078-
ExternalId: payload.ExternalID,
3079-
},
3080-
}
3081-
} else {
3082-
retrieveFromExternalOpt = RetrieveFromExternalOpts{
3083-
Method: RetrieveFromExternalByKey,
3084-
ByKey: &RetrieveFromExternalByKeyOpt{Key: key},
3085-
}
3073+
retrieveFromExternalOpt = RetrieveFromExternalOpts{
3074+
Method: RetrieveFromExternalByKey,
3075+
ByKey: &RetrieveFromExternalByKeyOpt{Key: key},
30863076
}
30873077

30883078
externalIdToRetrieveFromExternalOpt[payload.ExternalID] = retrieveFromExternalOpt
@@ -3421,7 +3411,7 @@ func (p *OLAPRepositoryImpl) OptimizeOLAPPayloadWindowSize(ctx context.Context,
34213411
)
34223412
}
34233413

3424-
func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context, processId uuid.UUID, partitionDate PartitionDate, lastExternalId uuid.UUID, externalCutoverBatchSize, externalCutoverNumConcurrentOffloads int32) (*OLAPCutoverBatchOutcome, error) {
3414+
func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context, processId uuid.UUID, partitionDate PartitionDate, lastExternalId uuid.UUID, externalCutoverBatchSize, externalCutoverNumConcurrentOffloads int32, enableWindowSizeOptimization bool) (*OLAPCutoverBatchOutcome, error) {
34253415
ctx, span := telemetry.NewSpan(ctx, "OLAPRepository.processOLAPPayloadCutoverBatch")
34263416
defer span.End()
34273417

@@ -3433,19 +3423,23 @@ func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context,
34333423

34343424
defer rollback()
34353425

3436-
windowSizePtr, err := p.OptimizeOLAPPayloadWindowSize(
3437-
ctx,
3438-
tx,
3439-
partitionDate,
3440-
externalCutoverBatchSize*externalCutoverNumConcurrentOffloads,
3441-
lastExternalId,
3442-
)
3426+
windowSize := externalCutoverBatchSize * externalCutoverNumConcurrentOffloads
34433427

3444-
if err != nil {
3445-
return nil, fmt.Errorf("failed to optimize olap payload window size: %w", err)
3446-
}
3428+
if enableWindowSizeOptimization {
3429+
windowSizePtr, err := p.OptimizeOLAPPayloadWindowSize(
3430+
ctx,
3431+
tx,
3432+
partitionDate,
3433+
windowSize,
3434+
lastExternalId,
3435+
)
34473436

3448-
windowSize := *windowSizePtr
3437+
if err != nil {
3438+
return nil, fmt.Errorf("failed to optimize olap payload window size: %w", err)
3439+
}
3440+
3441+
windowSize = *windowSizePtr
3442+
}
34493443

34503444
payloadRanges, err := p.queries.CreateOLAPPayloadRangeChunks(ctx, tx, sqlcv1.CreateOLAPPayloadRangeChunksParams{
34513445
Chunksize: externalCutoverBatchSize,
@@ -3659,7 +3653,7 @@ func (p *OLAPRepositoryImpl) prepareCutoverTableJob(ctx context.Context, process
36593653
}, nil
36603654
}
36613655

3662-
func (p *OLAPRepositoryImpl) processSinglePartition(ctx context.Context, processId uuid.UUID, partitionDate PartitionDate, inlineStoreTTL *time.Duration, externalCutoverBatchSize, externalCutoverNumConcurrentOffloads int32) error {
3656+
func (p *OLAPRepositoryImpl) processSinglePartition(ctx context.Context, processId uuid.UUID, partitionDate PartitionDate, inlineStoreTTL *time.Duration, externalCutoverBatchSize, externalCutoverNumConcurrentOffloads int32, enableWindowSizeOptimization bool) error {
36633657
ctx, span := telemetry.NewSpan(ctx, "olap_repository.processSinglePartition")
36643658
defer span.End()
36653659

@@ -3673,36 +3667,78 @@ func (p *OLAPRepositoryImpl) processSinglePartition(ctx context.Context, process
36733667
return nil
36743668
}
36753669

3676-
// connStatementTimeout := 5 * 60 * 1000 // 5 minutes
3670+
// if the job is running for the first time, check that there aren't any duplicate external ids before proceeding
3671+
if jobMeta.LastExternalId == uuid.Nil {
3672+
connStatementTimeout := 15 * 60 * 1000 // 15 minutes
36773673

3678-
// conn, release, err := sqlchelpers.AcquireConnectionWithStatementTimeout(ctx, p.pool, p.l, connStatementTimeout)
3674+
conn, release, err := sqlchelpers.AcquireConnectionWithStatementTimeout(ctx, p.pool, p.l, connStatementTimeout)
36793675

3680-
// if err != nil {
3681-
// return fmt.Errorf("failed to acquire connection with statement timeout: %w", err)
3682-
// }
3676+
if err != nil {
3677+
return fmt.Errorf("failed to acquire connection with statement timeout: %w", err)
3678+
}
3679+
3680+
defer release()
3681+
3682+
stopLeaseExtension := make(chan struct{})
3683+
leaseExtensionDone := make(chan struct{})
3684+
3685+
go func() {
3686+
defer close(leaseExtensionDone)
3687+
3688+
ticker := time.NewTicker(30 * time.Second)
3689+
defer ticker.Stop()
36833690

3684-
// defer release()
3691+
for {
3692+
select {
3693+
case <-stopLeaseExtension:
3694+
return
3695+
case <-ticker.C:
3696+
leaseTx, leaseCommit, leaseRollback, txErr := sqlchelpers.PrepareTx(ctx, p.pool, p.l)
36853697

3686-
// duplicatedExternalIds, err := p.ValidateNoDuplicateOLAPExternalIds(ctx, conn, partitionDate)
3698+
if txErr != nil {
3699+
p.l.Error().Err(txErr).Msg("failed to prepare transaction for lease extension during duplicate check")
3700+
continue
3701+
}
36873702

3688-
// if err != nil {
3689-
// return fmt.Errorf("failed to validate no duplicate external ids: %w", err)
3690-
// }
3703+
_, txErr = p.acquireOrExtendJobLease(ctx, leaseTx, processId, partitionDate, jobMeta.LastExternalId)
36913704

3692-
// if len(duplicatedExternalIds) > 0 {
3693-
// var duplicatedIds []string
3705+
if txErr != nil {
3706+
leaseRollback()
3707+
p.l.Error().Err(txErr).Msg("failed to extend lease during duplicate check")
3708+
continue
3709+
}
36943710

3695-
// for _, row := range duplicatedExternalIds {
3696-
// duplicatedIds = append(duplicatedIds, row.ExternalId.String())
3697-
// }
3711+
if txErr = leaseCommit(ctx); txErr != nil {
3712+
leaseRollback()
3713+
p.l.Error().Err(txErr).Msg("failed to commit lease extension during duplicate check")
3714+
}
3715+
}
3716+
}
3717+
}()
36983718

3699-
// return fmt.Errorf("found duplicate external ids in partition %s. Sampled ids: %s", partitionDate.String(), strings.Join(duplicatedIds, ", "))
3700-
// }
3719+
duplicatedExternalIds, err := p.ValidateNoDuplicateOLAPExternalIds(ctx, conn, partitionDate)
3720+
close(stopLeaseExtension)
3721+
<-leaseExtensionDone
3722+
3723+
if err != nil {
3724+
return fmt.Errorf("failed to validate no duplicate external ids: %w", err)
3725+
}
3726+
3727+
if len(duplicatedExternalIds) > 0 {
3728+
var duplicatedIds []string
3729+
3730+
for _, row := range duplicatedExternalIds {
3731+
duplicatedIds = append(duplicatedIds, row.ExternalId.String())
3732+
}
3733+
3734+
return fmt.Errorf("found duplicate external ids in partition %s. Sampled ids: %s", partitionDate.String(), strings.Join(duplicatedIds, ", "))
3735+
}
3736+
}
37013737

37023738
lastExternalId := jobMeta.LastExternalId
37033739

37043740
for {
3705-
outcome, err := p.processOLAPPayloadCutoverBatch(ctx, processId, partitionDate, lastExternalId, externalCutoverBatchSize, externalCutoverNumConcurrentOffloads)
3741+
outcome, err := p.processOLAPPayloadCutoverBatch(ctx, processId, partitionDate, lastExternalId, externalCutoverBatchSize, externalCutoverNumConcurrentOffloads, enableWindowSizeOptimization)
37063742

37073743
if err != nil {
37083744
return fmt.Errorf("failed to process payload cutover batch: %w", err)
@@ -3751,7 +3787,7 @@ func (p *OLAPRepositoryImpl) createOLAPIndexBlock(ctx context.Context, tx pgx.Tx
37513787
})
37523788
}
37533789

3754-
func (p *OLAPRepositoryImpl) ProcessOLAPPayloadCutovers(ctx context.Context, externalStoreEnabled bool, inlineStoreTTL *time.Duration, externalCutoverBatchSize, externalCutoverNumConcurrentOffloads int32) error {
3790+
func (p *OLAPRepositoryImpl) ProcessOLAPPayloadCutovers(ctx context.Context, externalStoreEnabled bool, inlineStoreTTL *time.Duration, externalCutoverBatchSize, externalCutoverNumConcurrentOffloads int32, enableWindowSizeOptimization bool) error {
37553791
if !externalStoreEnabled {
37563792
return nil
37573793
}
@@ -3778,7 +3814,7 @@ func (p *OLAPRepositoryImpl) ProcessOLAPPayloadCutovers(ctx context.Context, ext
37783814

37793815
for _, partition := range partitions {
37803816
p.l.Info().Ctx(ctx).Str("partition", partition.PartitionName).Msg("processing payload cutover for partition")
3781-
err = p.processSinglePartition(ctx, processId, PartitionDate(partition.PartitionDate), inlineStoreTTL, externalCutoverBatchSize, externalCutoverNumConcurrentOffloads)
3817+
err = p.processSinglePartition(ctx, processId, PartitionDate(partition.PartitionDate), inlineStoreTTL, externalCutoverBatchSize, externalCutoverNumConcurrentOffloads, enableWindowSizeOptimization)
37823818

37833819
if err != nil {
37843820
return fmt.Errorf("failed to process partition %s: %w", partition.PartitionName, err)

0 commit comments

Comments
 (0)