@@ -2,6 +2,7 @@ package capabilities
22
33import (
44 "context"
5+ "fmt"
56 "sync"
67 "sync/atomic"
78 "testing"
@@ -58,7 +59,7 @@ func (p *sendProbe) fn(ctx context.Context, te TriggerEvent, workflowId string)
5859 // Optionally fail some sends
5960 if atomic .LoadInt32 (& p .failFirstN ) > 0 {
6061 atomic .AddInt32 (& p .failFirstN , - 1 )
61- return assertErr // sentinel below
62+ return fmt . Errorf ( "test error" )
6263 }
6364 p .mu .Lock ()
6465 p .calls = append (p .calls , struct {
@@ -75,12 +76,6 @@ func (p *sendProbe) count() int {
7576 return len (p .calls )
7677}
7778
78- var assertErr = & temporaryError {"boom" }
79-
80- type temporaryError struct { s string }
81-
82- func (e * temporaryError ) Error () string { return e .s }
83-
8479// lost hook probe
8580type lostProbe struct {
8681 mu sync.Mutex
@@ -107,7 +102,6 @@ func newBase(t *testing.T, store EventStore, send OutboundSend, lost LostHook) *
107102 store : store ,
108103 send : send ,
109104 lost : lost ,
110- // lggr: your test logger if desired
111105 }
112106 return b
113107}
@@ -139,7 +133,7 @@ func TestStart_LoadsAndSendsPersisted(t *testing.T) {
139133 defer cancel ()
140134
141135 require .NoError (t , b .Start (ctx ))
142- t .Cleanup (func () { b .cancel (); b . wg . Wait () })
136+ t .Cleanup (func () { b .Stop () })
143137
144138 // Initial send triggered on Start
145139 require .Eventually (t , func () bool { return probe .count () >= 1 }, 200 * time .Millisecond , 5 * time .Millisecond )
@@ -154,14 +148,14 @@ func TestDeliverEvent_PersistsAndSends(t *testing.T) {
154148 defer cancel ()
155149
156150 require .NoError (t , b .Start (ctx ))
157- t .Cleanup (func () { b .cancel (); b . wg . Wait () })
151+ t .Cleanup (func () { b .Stop () })
158152
159153 te := TriggerEvent {
160154 TriggerType : "trigB" ,
161155 ID : "e2" ,
162156 Payload : & anypb.Any {TypeUrl : "type.googleapis.com/thing" , Value : []byte ("x" )},
163157 }
164- require .NoError (t , b .deliverEvent (ctx , te , []string {"wf1" , "wf2" }))
158+ require .NoError (t , b .DeliverEvent (ctx , te , []string {"wf1" , "wf2" }))
165159
166160 // Persisted twice (two workflows)
167161 recs , _ := store .List (ctx )
@@ -180,14 +174,14 @@ func TestAckEvent_StopsRetransmit(t *testing.T) {
180174 defer cancel ()
181175
182176 require .NoError (t , b .Start (ctx ))
183- t .Cleanup (func () { b .cancel (); b . wg . Wait () })
177+ t .Cleanup (func () { b .Stop () })
184178
185179 te := TriggerEvent {
186180 TriggerType : "trigC" ,
187181 ID : "e3" ,
188182 Payload : & anypb.Any {TypeUrl : "type.googleapis.com/thing" , Value : []byte ("x" )},
189183 }
190- require .NoError (t , b .deliverEvent (ctx , te , []string {"wf1" }))
184+ require .NoError (t , b .DeliverEvent (ctx , te , []string {"wf1" }))
191185 require .Eventually (t , func () bool { return probe .count () >= 1 }, 200 * time .Millisecond , 5 * time .Millisecond )
192186
193187 // Ack and ensure no more sends occur after a couple of retransmit periods
@@ -203,25 +197,25 @@ func TestRetryThenLost_AfterTmax(t *testing.T) {
203197 lostp := & lostProbe {}
204198 b := newBase (t , store , probe .fn , lostp .fn )
205199
206- // tighten timers for the test
207200 b .tRetransmit = 20 * time .Millisecond
208201 b .tMax = 80 * time .Millisecond
209202
210203 ctx , cancel := ctxWithCancel (t )
211204 defer cancel ()
212205
213206 require .NoError (t , b .Start (ctx ))
214- t .Cleanup (func () { b .cancel (); b . wg . Wait () })
207+ t .Cleanup (func () { b .Stop () })
215208
216209 te := TriggerEvent {
217210 TriggerType : "trigD" ,
218211 ID : "e4" ,
219212 Payload : & anypb.Any {TypeUrl : "type.googleapis.com/thing" , Value : []byte ("x" )},
220213 }
221- require .NoError (t , b .deliverEvent (ctx , te , []string {"wf1" }))
214+ require .NoError (t , b .DeliverEvent (ctx , te , []string {"wf1" }))
222215
223216 // Should attempt several sends, then mark lost and delete from store
224217 require .Eventually (t , func () bool { return lostp .count () >= 1 }, 500 * time .Millisecond , 5 * time .Millisecond )
218+ require .True (t , probe .count () == 0 )
225219
226220 // Ensure the record is gone from the store after lost
227221 recs , _ := store .List (ctx )
@@ -237,7 +231,7 @@ func TestTrySendErrorIsIgnoredByCallSites(t *testing.T) {
237231 defer cancel ()
238232
239233 require .NoError (t , b .Start (ctx ))
240- t .Cleanup (func () { b .cancel (); b . wg . Wait () })
234+ t .Cleanup (func () { b .Stop () })
241235
242236 te := TriggerEvent {
243237 TriggerType : "trigE" ,
@@ -246,7 +240,7 @@ func TestTrySendErrorIsIgnoredByCallSites(t *testing.T) {
246240 }
247241 // deliverEvent should not return an error even if the first send fails;
248242 // retransmitLoop will retry later.
249- require .NoError (t , b .deliverEvent (ctx , te , []string {"wf1" }))
243+ require .NoError (t , b .DeliverEvent (ctx , te , []string {"wf1" }))
250244
251245 // Eventually should succeed on a subsequent attempt (after the first forced failure)
252246 require .Eventually (t , func () bool { return probe .count () >= 1 }, 300 * time .Millisecond , 5 * time .Millisecond )
0 commit comments