@@ -21,6 +21,7 @@ package pulsar
2121
2222import (
2323 "context"
24+ "errors"
2425 "fmt"
2526 "net/http"
2627 "runtime"
@@ -148,6 +149,7 @@ func testTopicMigrate(
148149 // Signals both producer and consumer have processed `messageCountBeforeUnload` messages
149150 wgSendAndReceiveMessages := sync.WaitGroup {}
150151 wgSendAndReceiveMessages .Add (2 )
152+ errCh := make (chan error , 1 )
151153
152154 // Producer
153155 go func () {
@@ -159,6 +161,7 @@ func testTopicMigrate(
159161 }
160162
161163 pm := ProducerMessage {Payload : []byte (fmt .Sprintf ("hello-%d" , i ))}
164+ retryStarted := time .Now ()
162165
163166 for true {
164167 ctx , cancel := context .WithTimeout (context .Background (), 1 * time .Second )
@@ -167,6 +170,20 @@ func testTopicMigrate(
167170 if err == nil {
168171 break
169172 }
173+ if errors .Is (err , ErrTopicTerminated ) || errors .Is (err , ErrProducerClosed ) {
174+ select {
175+ case errCh <- fmt .Errorf ("producer became terminal during migration at message %d: %w" , i , err ):
176+ default :
177+ }
178+ return
179+ }
180+ if time .Since (retryStarted ) > 30 * time .Second {
181+ select {
182+ case errCh <- fmt .Errorf ("producer send retry exceeded 30s at message %d: %w" , i , err ):
183+ default :
184+ }
185+ return
186+ }
170187 time .Sleep (1000 * time .Millisecond )
171188 }
172189
@@ -185,6 +202,7 @@ func testTopicMigrate(
185202 wgUnload .Wait ()
186203 }
187204
205+ retryStarted := time .Now ()
188206 for true {
189207 ctx , cancel := context .WithTimeout (context .Background (), 1 * time .Second )
190208 m , err := consumer .Receive (ctx )
@@ -195,6 +213,13 @@ func testTopicMigrate(
195213 break
196214 }
197215 }
216+ if time .Since (retryStarted ) > 30 * time .Second {
217+ select {
218+ case errCh <- fmt .Errorf ("consumer receive/ack retry exceeded 30s at message %d: %w" , i , err ):
219+ default :
220+ }
221+ return
222+ }
198223 time .Sleep (100 * time .Millisecond )
199224 }
200225
@@ -205,7 +230,27 @@ func testTopicMigrate(
205230 }()
206231
207232 // Unload the bundle, triggering the producers and consumers to reconnect to the specified broker.
208- wgSendAndReceiveMessages .Wait ()
233+ waitWithError := func (wg * sync.WaitGroup , stage string ) bool {
234+ doneCh := make (chan struct {})
235+ go func () {
236+ wg .Wait ()
237+ close (doneCh )
238+ }()
239+
240+ select {
241+ case <- doneCh :
242+ return true
243+ case err := <- errCh :
244+ req .NoError (err , stage )
245+ return false
246+ case <- time .After (90 * time .Second ):
247+ req .FailNow (stage + " timeout" )
248+ return false
249+ }
250+ }
251+ if ! waitWithError (& wgSendAndReceiveMessages , "waiting for pre-unload send/receive" ) {
252+ return
253+ }
209254
210255 topicMigrationURL = fmt .Sprintf (
211256 "/admin/v2/clusters/%s/migrate?migrated=true" , cluster )
@@ -248,5 +293,7 @@ func testTopicMigrate(
248293 return req .Equal (dstTopicBrokerURL , topicBrokerURL )
249294 })
250295
251- wgRoutines .Wait ()
296+ if ! waitWithError (& wgRoutines , "waiting for producer/consumer routines" ) {
297+ return
298+ }
252299}
0 commit comments