@@ -438,4 +438,181 @@ public void selectorRefetchedOnReconnection() throws Exception {
438438 synchronizer .close ();
439439 }
440440 }
441+
442+ @ Test
443+ public void errorEventFromServer () throws Exception {
444+ String errorEvent = makeEvent ("error" , "{\" id\" :\" error-123\" ,\" reason\" :\" some server error\" }" );
445+ String serverIntent = makeEvent ("server-intent" , "{\" payloads\" :[{\" id\" :\" payload-1\" ,\" target\" :100,\" intentCode\" :\" xfer-full\" ,\" reason\" :\" payload-missing\" }]}" );
446+ String payloadTransferred = makeEvent ("payload-transferred" , "{\" state\" :\" (p:payload-1:100)\" ,\" version\" :100}" );
447+
448+ try (HttpServer server = HttpServer .start (Handlers .all (
449+ Handlers .SSE .start (),
450+ Handlers .SSE .event (errorEvent ),
451+ Handlers .SSE .event (serverIntent ),
452+ Handlers .SSE .event (payloadTransferred ),
453+ Handlers .SSE .leaveOpen ()))) {
454+
455+ HttpProperties httpProperties = toHttpProperties (clientContext ("sdk-key" , baseConfig ().build ()).getHttp ());
456+ SelectorSource selectorSource = mockSelectorSource ();
457+
458+ StreamingSynchronizerImpl synchronizer = new StreamingSynchronizerImpl (
459+ httpProperties ,
460+ server .getUri (),
461+ "/stream" ,
462+ testLogger ,
463+ selectorSource
464+ );
465+
466+ // Error event should be logged but not queued, so we should get the changeset
467+ CompletableFuture <FDv2SourceResult > resultFuture = synchronizer .next ();
468+ FDv2SourceResult result = resultFuture .get (5 , TimeUnit .SECONDS );
469+
470+ assertNotNull (result );
471+ assertEquals (FDv2SourceResult .ResultType .CHANGE_SET , result .getResultType ());
472+ assertNotNull (result .getChangeSet ());
473+
474+ synchronizer .close ();
475+ }
476+ }
477+
478+ @ Test
479+ public void selectorWithVersionOnly () throws Exception {
480+ String serverIntent = makeEvent ("server-intent" , "{\" payloads\" :[{\" id\" :\" payload-1\" ,\" target\" :100,\" intentCode\" :\" xfer-full\" ,\" reason\" :\" payload-missing\" }]}" );
481+ String payloadTransferred = makeEvent ("payload-transferred" , "{\" state\" :\" (p:payload-1:100)\" ,\" version\" :100}" );
482+
483+ try (HttpServer server = HttpServer .start (Handlers .all (
484+ Handlers .SSE .start (),
485+ Handlers .SSE .event (serverIntent ),
486+ Handlers .SSE .event (payloadTransferred ),
487+ Handlers .SSE .leaveOpen ()))) {
488+
489+ HttpProperties httpProperties = toHttpProperties (clientContext ("sdk-key" , baseConfig ().build ()).getHttp ());
490+
491+ SelectorSource selectorSource = mock (SelectorSource .class );
492+ when (selectorSource .getSelector ()).thenReturn (Selector .make (75 , null ));
493+
494+ StreamingSynchronizerImpl synchronizer = new StreamingSynchronizerImpl (
495+ httpProperties ,
496+ server .getUri (),
497+ "/stream" ,
498+ testLogger ,
499+ selectorSource
500+ );
501+
502+ CompletableFuture <FDv2SourceResult > resultFuture = synchronizer .next ();
503+ FDv2SourceResult result = resultFuture .get (5 , TimeUnit .SECONDS );
504+
505+ assertNotNull (result );
506+ assertEquals (FDv2SourceResult .ResultType .CHANGE_SET , result .getResultType ());
507+
508+ // Verify the request had version but not state parameter
509+ assertEquals (1 , server .getRecorder ().count ());
510+ RequestInfo request = server .getRecorder ().requireRequest ();
511+ assertThat (request .getQuery (), containsString ("version=75" ));
512+ // State should not be present (or if present, not have an actual state value)
513+
514+ synchronizer .close ();
515+ }
516+ }
517+
518+ @ Test
519+ public void selectorWithEmptyState () throws Exception {
520+ String serverIntent = makeEvent ("server-intent" , "{\" payloads\" :[{\" id\" :\" payload-1\" ,\" target\" :100,\" intentCode\" :\" xfer-full\" ,\" reason\" :\" payload-missing\" }]}" );
521+ String payloadTransferred = makeEvent ("payload-transferred" , "{\" state\" :\" (p:payload-1:100)\" ,\" version\" :100}" );
522+
523+ try (HttpServer server = HttpServer .start (Handlers .all (
524+ Handlers .SSE .start (),
525+ Handlers .SSE .event (serverIntent ),
526+ Handlers .SSE .event (payloadTransferred ),
527+ Handlers .SSE .leaveOpen ()))) {
528+
529+ HttpProperties httpProperties = toHttpProperties (clientContext ("sdk-key" , baseConfig ().build ()).getHttp ());
530+
531+ SelectorSource selectorSource = mock (SelectorSource .class );
532+ when (selectorSource .getSelector ()).thenReturn (Selector .make (80 , "" ));
533+
534+ StreamingSynchronizerImpl synchronizer = new StreamingSynchronizerImpl (
535+ httpProperties ,
536+ server .getUri (),
537+ "/stream" ,
538+ testLogger ,
539+ selectorSource
540+ );
541+
542+ CompletableFuture <FDv2SourceResult > resultFuture = synchronizer .next ();
543+ FDv2SourceResult result = resultFuture .get (5 , TimeUnit .SECONDS );
544+
545+ assertNotNull (result );
546+ assertEquals (FDv2SourceResult .ResultType .CHANGE_SET , result .getResultType ());
547+
548+ // Verify the request had version but not state parameter (empty string shouldn't add state)
549+ assertEquals (1 , server .getRecorder ().count ());
550+ RequestInfo request = server .getRecorder ().requireRequest ();
551+ assertThat (request .getQuery (), containsString ("version=80" ));
552+
553+ synchronizer .close ();
554+ }
555+ }
556+
557+ @ Test
558+ public void closeCalledMultipleTimes () throws Exception {
559+ try (HttpServer server = HttpServer .start (Handlers .all (
560+ Handlers .SSE .start (),
561+ Handlers .hang ()))) {
562+
563+ HttpProperties httpProperties = toHttpProperties (clientContext ("sdk-key" , baseConfig ().build ()).getHttp ());
564+ SelectorSource selectorSource = mockSelectorSource ();
565+
566+ StreamingSynchronizerImpl synchronizer = new StreamingSynchronizerImpl (
567+ httpProperties ,
568+ server .getUri (),
569+ "/stream" ,
570+ testLogger ,
571+ selectorSource
572+ );
573+
574+ // Call close multiple times - should not throw exceptions
575+ synchronizer .close ();
576+ synchronizer .close ();
577+ synchronizer .close ();
578+
579+ // next() should still return shutdown
580+ FDv2SourceResult result = synchronizer .next ().get (1 , TimeUnit .SECONDS );
581+ assertNotNull (result );
582+ assertEquals (FDv2SourceResult .ResultType .STATUS , result .getResultType ());
583+ assertEquals (FDv2SourceResult .State .SHUTDOWN , result .getStatus ().getState ());
584+ }
585+ }
586+
587+ @ Test
588+ public void invalidEventStructureCausesInterrupt () throws Exception {
589+ // Event with missing required fields - should cause protocol handler to fail
590+ String badEventStructure = makeEvent ("put-object" , "{}" );
591+
592+ try (HttpServer server = HttpServer .start (Handlers .all (
593+ Handlers .SSE .start (),
594+ Handlers .SSE .event (badEventStructure ),
595+ Handlers .SSE .leaveOpen ()))) {
596+
597+ HttpProperties httpProperties = toHttpProperties (clientContext ("sdk-key" , baseConfig ().build ()).getHttp ());
598+ SelectorSource selectorSource = mockSelectorSource ();
599+
600+ StreamingSynchronizerImpl synchronizer = new StreamingSynchronizerImpl (
601+ httpProperties ,
602+ server .getUri (),
603+ "/stream" ,
604+ testLogger ,
605+ selectorSource
606+ );
607+
608+ CompletableFuture <FDv2SourceResult > resultFuture = synchronizer .next ();
609+ FDv2SourceResult result = resultFuture .get (5 , TimeUnit .SECONDS );
610+
611+ assertNotNull (result );
612+ assertEquals (FDv2SourceResult .ResultType .STATUS , result .getResultType ());
613+ assertEquals (FDv2SourceResult .State .INTERRUPTED , result .getStatus ().getState ());
614+
615+ synchronizer .close ();
616+ }
617+ }
441618}
0 commit comments