@@ -71,6 +71,7 @@ public class BrowserEmulatorClient {
7171 private ConcurrentHashMap <String , AtomicBoolean > participantConnecting = new ConcurrentHashMap <>();
7272 private Set <String > participantReconnecting = new CopyOnWriteArraySet <>();
7373 private ConcurrentHashMap <String , Calendar > userDisconnectTimestamps = new ConcurrentHashMap <>();
74+ private Set <String > processingDisconnects = ConcurrentHashMap .newKeySet ();
7475
7576 private CreateParticipantResponse lastErrorReconnectingResponse ;
7677
@@ -102,7 +103,7 @@ public void clean() {
102103
103104 public void addDisconnectTimestamp (String userId , String sessionId ) {
104105 String key = userId + "-" + sessionId ;
105- userDisconnectTimestamps .put (key , Calendar .getInstance ());
106+ userDisconnectTimestamps .putIfAbsent (key , Calendar .getInstance ());
106107 }
107108
108109 public Map <String , Calendar > getPerUserDisconnectTimestamps () {
@@ -182,20 +183,34 @@ public void addClientFailure(String workerUrl, String participant, String sessio
182183 public void addClientFailure (String workerUrl , String participant , String session , boolean waitForConnection ,
183184 boolean reconnect ) {
184185 if (this .isClean .get ()) {
185- // Test finished
186186 return ;
187187 }
188+ String user = participant + "-" + session ;
189+ ConcurrentHashMap <String , Role > workerRoles = this .clientRoles .get (workerUrl );
190+ if (workerRoles == null ) {
191+ log .debug ("Worker {} has no roles map, ignoring disconnect for {}" , workerUrl , user );
192+ return ;
193+ }
194+ if (!workerRoles .containsKey (user )) {
195+ log .debug ("Worker {} doesn't have participant {}, ignoring disconnect" , workerUrl , user );
196+ return ;
197+ }
198+ if (!processingDisconnects .add (user )) {
199+ log .debug ("Already processing disconnect for {}, ignoring duplicate" , user );
200+ return ;
201+ }
202+ log .info ("Processing disconnect for {} on worker {}" , user , workerUrl );
188203 log .debug ("Adding client failure for participant {} in session {}" , participant , session );
189204 log .debug ("Wait for connection: {}" , waitForConnection );
190- while (waitForConnection && this .participantConnecting .get (participant + "-" + session ).get ()) {
205+ AtomicBoolean isConnecting = this .participantConnecting .get (user );
206+ while (waitForConnection && isConnecting != null && isConnecting .get ()) {
191207 if (endOfTest .get ()) {
192208 return ;
193209 }
194210 sleeper .sleep (WAIT_S , null );
195211 }
196212 ConcurrentHashMap <String , AtomicInteger > failures = this .clientFailures .computeIfAbsent (workerUrl ,
197213 key -> new ConcurrentHashMap <>());
198- String user = participant + "-" + session ;
199214 AtomicInteger currentFailures = failures .computeIfAbsent (user , key -> new AtomicInteger (0 ));
200215 int newFailures = currentFailures .incrementAndGet ();
201216 log .error ("Participant {} in session {} failed {} times" , participant , session , newFailures );
@@ -212,8 +227,8 @@ public void addClientFailure(String workerUrl, String participant, String sessio
212227 log .debug ("Stop reconnecting participant {} in session {}" , participant , session );
213228 this .lastErrorReconnectingResponse = new CreateParticipantResponse ()
214229 .setResponseOk (false )
215- .setStopReason ("Participant " + participant + " in session " + session + " failed "
216- + newFailures + " times " );
230+ .setStopReason ("Participant " + participant + "- " + session + " failed after "
231+ + newFailures + " retries " );
217232 }
218233 }
219234 }
@@ -248,34 +263,39 @@ private void reconnect(String workerUrl, String participant, String session) {
248263 private void afterDisconnect (String workerUrl , String participant , String session ) {
249264 log .debug ("After disconnect user {} session {} in {}" , participant , session , workerUrl );
250265 String user = participant + "-" + session ;
251- ConcurrentHashMap <String , Role > workerRoles = this .clientRoles .get (workerUrl );
252- if (workerRoles == null ) {
253- // The connect request hasn't finished yet, wait for it
254- log .debug ("Worker roles is null for {} in session {} in {}. Waiting ..." , participant , session , workerUrl );
255- sleeper .sleep (WAIT_S , null );
256- this .afterDisconnect (workerUrl , participant , session );
257- return ;
258- }
259- Role role = workerRoles .get (user );
260- // get user number from participant removing prefix
261- int userNumber = Integer .parseInt (participant .replace (loadTestConfig .getUserNamePrefix (), "" ));
262- // get session number from session removing prefix
263- int sessionNumber = Integer .parseInt (session .replace (loadTestConfig .getSessionNamePrefix (), "" ));
264- CreateParticipantResponse response = null ;
265- if (role .equals (Role .PUBLISHER )) {
266- response = this .createPublisher (workerUrl , userNumber , sessionNumber , this .participantTestCases .get (user ));
267- } else {
268- response = this .createSubscriber (workerUrl , userNumber , sessionNumber , this .participantTestCases .get (user ));
269- }
270- if (response .isResponseOk ()) {
271- this .participantReconnecting .remove (user );
272- } else {
273- this .lastErrorReconnectingResponse = response ;
274- log .error ("Response status is not 200 OK. Exit" );
266+ try {
267+ ConcurrentHashMap <String , Role > workerRoles = this .clientRoles .get (workerUrl );
268+ if (workerRoles == null ) {
269+ log .debug ("Worker roles is null for {} in session {} in {}. Waiting ..." , participant , session , workerUrl );
270+ sleeper .sleep (WAIT_S , null );
271+ this .afterDisconnect (workerUrl , participant , session );
272+ return ;
273+ }
274+ Role role = workerRoles .get (user );
275+ if (role == null ) {
276+ log .warn ("Role is null for {} in session {} in {}. This worker may not have this participant." , participant , session , workerUrl );
277+ return ;
278+ }
279+ int userNumber = Integer .parseInt (participant .replace (loadTestConfig .getUserNamePrefix (), "" ));
280+ int sessionNumber = Integer .parseInt (session .replace (loadTestConfig .getSessionNamePrefix (), "" ));
281+ CreateParticipantResponse response = null ;
282+ if (role .equals (Role .PUBLISHER )) {
283+ response = this .createPublisher (workerUrl , userNumber , sessionNumber , this .participantTestCases .get (user ));
284+ } else {
285+ response = this .createSubscriber (workerUrl , userNumber , sessionNumber , this .participantTestCases .get (user ));
286+ }
287+ if (response .isResponseOk ()) {
288+ this .participantReconnecting .remove (user );
289+ } else {
290+ this .lastErrorReconnectingResponse = response ;
291+ log .error ("Response status is not 200 OK. Exit" );
292+ }
293+ } finally {
294+ processingDisconnects .remove (user );
275295 }
276296 }
277297
278- private HttpResponse <String > disconnectUser (String workerUrl , String participant , String session ) {
298+ private HttpResponse <String > disconnectUser (String workerUrl , String participant , String session ) {
279299 try {
280300 log .info ("Deleting participant {} from worker {}" , participant , workerUrl );
281301 Map <String , String > headers = new HashMap <>();
@@ -447,20 +467,23 @@ private CreateParticipantResponse createParticipant(String workerUrl, int userNu
447467 log .error ("Participant {} in session {} failed {} times" , userId , sessionId , failures );
448468 sleeper .sleep (WAIT_S , null );
449469 if (!loadTestConfig .isRetryMode () || isResponseLimitReached (failures ) || endOfTest .get ()) {
470+ String reason = "Participant " + userId + "-" + sessionId + " failed after "
471+ + failures + " retries" ;
450472 // Set lastErrorReconnectingResponse to trigger test termination
451473 this .lastErrorReconnectingResponse = new CreateParticipantResponse ()
452474 .setResponseOk (false )
453- .setStopReason ("Participant " + userId + " in session " + sessionId + " failed "
454- + failures + " times" );
455- return cpr .setResponseOk (false );
475+ .setStopReason (reason );
476+ // Also set stopReason on the returned response to avoid race condition
477+ // where getLastResponse() may return this object before lastErrorReconnectingResponse is visible
478+ return cpr .setResponseOk (false ).setStopReason (reason );
456479 }
457480 log .warn ("Retrying" );
458481 return this .createParticipant (workerUrl , userNumber , sessionNumber , testCase , role );
459482 } else {
460483 this .participantConnecting .get (user ).set (false );
461484 this .saveParticipantData (workerUrl , testCase .isTeaching () ? Role .PUBLISHER : role );
462485 }
463- return processResponse (response );
486+ return processResponse (response , workerUrl );
464487 } catch (Exception e ) {
465488 CreateParticipantErrorContext ctx = new CreateParticipantErrorContext (
466489 new UserInfo (workerUrl , userNumber , sessionNumber , role , userId , sessionId ), testCase , cpr );
@@ -520,7 +543,7 @@ private CreateParticipantResponse createExternalRecordingParticipant(String work
520543 return okResponse ;
521544 }
522545
523- private CreateParticipantResponse processResponse (HttpResponse <String > response ) {
546+ private CreateParticipantResponse processResponse (HttpResponse <String > response , String workerUrl ) {
524547 CreateParticipantResponse cpr = new CreateParticipantResponse ();
525548 if (response != null && response .statusCode () == HTTP_STATUS_OK ) {
526549 JsonObject jsonResponse = jsonUtils .getJson (response .body ());
@@ -538,7 +561,8 @@ private CreateParticipantResponse processResponse(HttpResponse<String> response)
538561 return cpr .setResponseOk (true ).setConnectionId (connectionId )
539562 .setUserId (userId ).setSessionId (sessionId )
540563 .setWorkerCpuPct (workerCpuPct ).setStreamsInWorker (streamsInWorker )
541- .setParticipantsInWorker (participantsInWorker );
564+ .setParticipantsInWorker (participantsInWorker )
565+ .setWorkerUrl (workerUrl );
542566 }
543567 if (response == null ) {
544568 log .error ("Error. Response is null" );
0 commit comments