@@ -44,7 +44,9 @@ func TestDurableWorkflow(t *testing.T) {
4444
4545 id := uniqueID ()
4646
47- time .Sleep (time .Duration (sleepTime + 10 ) * time .Second )
47+ // Wait for the run to start, then let the internal SleepFor(sleepTime) finish before pushing the event.
48+ pollUntilRunStatus (t , ctx , sharedClient , ref .RunId , string (rest .V1TaskStatusRUNNING ))
49+ time .Sleep (time .Duration (sleepTime + 3 ) * time .Second )
4850
4951 err = sharedClient .Events ().Push (ctx , eventKey , AwaitedEvent {ID : id })
5052 require .NoError (t , err )
@@ -68,15 +70,21 @@ func TestDurableSleepCancelReplay(t *testing.T) {
6870 ref , err := testWaitForSleepTwice .RunNoWait (ctx , EmptyInput {})
6971 require .NoError (t , err )
7072
71- time . Sleep ( time . Duration ( sleepTime / 2 ) * time . Second )
73+ pollUntilRunStatus ( t , ctx , sharedClient , ref . RunId , string ( rest . V1TaskStatusRUNNING ) )
7274
7375 _ , err = sharedClient .Runs ().Cancel (ctx , rest.V1CancelTaskRequest {
7476 ExternalIds : toUUIDs (ref .RunId ),
7577 })
7678 require .NoError (t , err )
7779
78- // Wait for cancellation
79- time .Sleep (2 * time .Second )
80+ // Wait for cancellation to propagate before replaying.
81+ pollUntil (t , ctx , func () (bool , error ) {
82+ status , err := sharedClient .Runs ().GetStatus (ctx , ref .RunId )
83+ if err != nil {
84+ return false , err
85+ }
86+ return * status == rest .V1TaskStatusCANCELLED , nil
87+ })
8088
8189 replayStart := time .Now ()
8290 _ , err = sharedClient .Runs ().Replay (ctx , rest.V1ReplayTaskRequest {
@@ -92,8 +100,8 @@ func TestDurableSleepCancelReplay(t *testing.T) {
92100 err = result .TaskOutput ("wait-for-sleep-twice" ).Into (& output )
93101 require .NoError (t , err )
94102
95- assert .Less (t , output ["runtime" ], float64 (sleepTime ))
96- assert .LessOrEqual (t , replayElapsed , float64 (sleepTime ))
103+ assert .Less (t , output ["runtime" ], float64 (sleepTime )+ timingTolerance )
104+ assert .LessOrEqual (t , replayElapsed , float64 (sleepTime )+ timingTolerance )
97105}
98106
99107func TestDurableChildSpawn (t * testing.T ) {
@@ -122,6 +130,7 @@ func TestDurableChildBulkSpawn(t *testing.T) {
122130 require .NoError (t , err )
123131 outputs , ok := m ["child_outputs" ].([]any )
124132 require .True (t , ok , "expected child_outputs to be an array" )
133+
125134 assert .GreaterOrEqual (t , len (outputs ), n - 1 )
126135 assert .LessOrEqual (t , len (outputs ), n )
127136
@@ -145,7 +154,9 @@ func TestDurableSleepEventSpawnReplay(t *testing.T) {
145154 ref , err := testDurableSleepEventSpawn .RunNoWait (ctx , EmptyInput {})
146155 require .NoError (t , err )
147156
148- time .Sleep (time .Duration (sleepTime + 5 ) * time .Second )
157+ // Wait for the run to start, then let the internal SleepFor(sleepTime) finish before pushing the event.
158+ pollUntilRunStatus (t , ctx , sharedClient , ref .RunId , string (rest .V1TaskStatusRUNNING ))
159+ time .Sleep (time .Duration (sleepTime + 3 ) * time .Second )
149160 err = sharedClient .Events ().Push (ctx , eventKey , map [string ]string {"test" : "test" })
150161 require .NoError (t , err )
151162
@@ -173,7 +184,7 @@ func TestDurableSleepEventSpawnReplay(t *testing.T) {
173184 replayChild , ok := rm ["child_output" ].(map [string ]any )
174185 require .True (t , ok )
175186 assert .Equal (t , "hello from child 1" , replayChild ["message" ])
176- assert .Less (t , replayElapsed , float64 (sleepTime ))
187+ assert .Less (t , replayElapsed , float64 (sleepTime )+ timingTolerance )
177188}
178189
179190func TestDurableCompletedReplay (t * testing.T ) {
@@ -207,8 +218,8 @@ func TestDurableCompletedReplay(t *testing.T) {
207218 var replayOutput map [string ]float64
208219 err = replayResult .TaskOutput ("wait-for-sleep-twice" ).Into (& replayOutput )
209220 require .NoError (t , err )
210- assert .Less (t , replayOutput ["runtime" ], float64 (sleepTime ))
211- assert .Less (t , elapsed , float64 (sleepTime ))
221+ assert .Less (t , replayOutput ["runtime" ], float64 (sleepTime )+ timingTolerance )
222+ assert .Less (t , elapsed , float64 (sleepTime )+ timingTolerance )
212223}
213224
214225func TestDurableSpawnDAG (t * testing.T ) {
@@ -306,7 +317,7 @@ func TestDurableReplayReset(t *testing.T) {
306317 durations := []float64 {resetOutput .Sleep1Duration , resetOutput .Sleep2Duration , resetOutput .Sleep3Duration }
307318 for i , d := range durations {
308319 if int64 (i + 1 ) < nodeID {
309- assert .Less (t , d , float64 (replayResetSleepTime ))
320+ assert .Less (t , d , float64 (replayResetSleepTime )+ timingTolerance )
310321 } else {
311322 assert .GreaterOrEqual (t , d , float64 (replayResetSleepTime ))
312323 }
@@ -369,7 +380,7 @@ func TestDurableBranchingOffBranch(t *testing.T) {
369380 err = resetResult2 .TaskOutput ("durable-replay-reset" ).Into (& resetOutput2 )
370381 require .NoError (t , err )
371382
372- assert .Less (t , resetOutput2 .Sleep1Duration , float64 (replayResetSleepTime ))
383+ assert .Less (t , resetOutput2 .Sleep1Duration , float64 (replayResetSleepTime )+ timingTolerance )
373384 assert .GreaterOrEqual (t , resetOutput2 .Sleep2Duration , float64 (replayResetSleepTime ))
374385 assert .GreaterOrEqual (t , resetOutput2 .Sleep3Duration , float64 (replayResetSleepTime ))
375386 assert .GreaterOrEqual (t , resetElapsed2 , float64 (2 * replayResetSleepTime ))
@@ -407,7 +418,7 @@ func TestDurableMemoizationViaReplay(t *testing.T) {
407418 require .NoError (t , err )
408419
409420 assert .GreaterOrEqual (t , duration1 , float64 (sleepTime ))
410- assert .Less (t , duration2 , 1.0 )
421+ assert .Less (t , duration2 , 1.1 )
411422 assert .Equal (t , output1 .Message , output2 .Message )
412423}
413424
0 commit comments