@@ -29,23 +29,33 @@ protected void internalShutdown() {
2929 requestor .close ();
3030 }
3131
32+ private static boolean getFallback (FDv2Requestor .FDv2PayloadResponse response ) {
33+ if (response != null && response .getHeaders () != null ) {
34+ String headerValue = response .getHeaders ().get (HeaderConstants .FDV1_FALLBACK .getHeaderName ());
35+ return headerValue != null && headerValue .equalsIgnoreCase ("true" );
36+ }
37+ // if(ex != null) {
38+ // if(ex instanceof HttpErrorException) {
39+ // ((HttpErrors.HttpErrorException) ex).
40+ // }
41+ // }
42+
43+ return false ;
44+ }
45+
46+ private static String getEnvironmentId (FDv2Requestor .FDv2PayloadResponse response ) {
47+ if (response != null && response .getHeaders () != null ) {
48+ return response .getHeaders ().get (HeaderConstants .ENVIRONMENT_ID .getHeaderName ());
49+ }
50+ return null ;
51+ }
52+
3253 protected CompletableFuture <FDv2SourceResult > poll (Selector selector , boolean oneShot ) {
3354 return requestor .Poll (selector ).handle (((pollingResponse , ex ) -> {
55+ boolean fdv1Fallback = getFallback (pollingResponse );
56+ String environmentId = getEnvironmentId (pollingResponse );
3457 if (ex != null ) {
35- if (ex instanceof HttpErrors .HttpErrorException ) {
36- HttpErrors .HttpErrorException e = (HttpErrors .HttpErrorException ) ex ;
37- DataSourceStatusProvider .ErrorInfo errorInfo = DataSourceStatusProvider .ErrorInfo .fromHttpError (e .getStatus ());
38- // Errors without an HTTP status are recoverable. If there is a status, then we check if the error
39- // is recoverable.
40- boolean recoverable = e .getStatus () <= 0 || isHttpErrorRecoverable (e .getStatus ());
41- logger .error ("Polling request failed with HTTP error: {}" , e .getStatus ());
42- // For a one-shot request all errors are terminal.
43- if (oneShot ) {
44- return FDv2SourceResult .terminalError (errorInfo );
45- } else {
46- return recoverable ? FDv2SourceResult .interrupted (errorInfo ) : FDv2SourceResult .terminalError (errorInfo );
47- }
48- } else if (ex instanceof IOException ) {
58+ if (ex instanceof IOException ) {
4959 IOException e = (IOException ) ex ;
5060 logger .error ("Polling request failed with network error: {}" , e .toString ());
5161 DataSourceStatusProvider .ErrorInfo info = new DataSourceStatusProvider .ErrorInfo (
@@ -54,7 +64,7 @@ protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean on
5464 e .toString (),
5565 new Date ().toInstant ()
5666 );
57- return oneShot ? FDv2SourceResult .terminalError (info ) : FDv2SourceResult .interrupted (info );
67+ return oneShot ? FDv2SourceResult .terminalError (info , fdv1Fallback ) : FDv2SourceResult .interrupted (info , fdv1Fallback );
5868 } else if (ex instanceof SerializationException ) {
5969 SerializationException e = (SerializationException ) ex ;
6070 logger .error ("Polling request received malformed data: {}" , e .toString ());
@@ -64,7 +74,7 @@ protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean on
6474 e .toString (),
6575 new Date ().toInstant ()
6676 );
67- return oneShot ? FDv2SourceResult .terminalError (info ) : FDv2SourceResult .interrupted (info );
77+ return oneShot ? FDv2SourceResult .terminalError (info , fdv1Fallback ) : FDv2SourceResult .interrupted (info , fdv1Fallback );
6878 }
6979 String msg = ex .toString ();
7080 logger .error ("Polling request failed with an unknown error: {}" , msg );
@@ -74,17 +84,30 @@ protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean on
7484 msg ,
7585 new Date ().toInstant ()
7686 );
77- return oneShot ? FDv2SourceResult .terminalError (info ) : FDv2SourceResult .interrupted (info );
87+ return oneShot ? FDv2SourceResult .terminalError (info , fdv1Fallback ) : FDv2SourceResult .interrupted (info , fdv1Fallback );
7888 }
79- // A null polling response indicates that we received a 304, which means nothing has changed.
80- if (pollingResponse == null ) {
89+ // If we get a 304, then that means nothing has changed.
90+ if (pollingResponse . getStatusCode () == 304 ) {
8191 return FDv2SourceResult .changeSet (
8292 new DataStoreTypes .ChangeSet <>(DataStoreTypes .ChangeSetType .None ,
8393 Selector .EMPTY ,
8494 null ,
85- // TODO: Implement environment ID support.
86- null
87- ));
95+ null // Header derived values will have been handled on initial response.
96+ ),
97+ // Headers would have been processed from the initial response.
98+ false );
99+ }
100+ if (!pollingResponse .isSuccess ()) {
101+ int statusCode = pollingResponse .getStatusCode ();
102+ boolean recoverable = statusCode <= 0 || isHttpErrorRecoverable (statusCode );
103+ DataSourceStatusProvider .ErrorInfo errorInfo = DataSourceStatusProvider .ErrorInfo .fromHttpError (statusCode );
104+ logger .error ("Polling request failed with HTTP error: {}" , statusCode );
105+ // For a one-shot request all errors are terminal.
106+ if (oneShot ) {
107+ return FDv2SourceResult .terminalError (errorInfo , fdv1Fallback );
108+ } else {
109+ return recoverable ? FDv2SourceResult .interrupted (errorInfo , fdv1Fallback ) : FDv2SourceResult .terminalError (errorInfo , fdv1Fallback );
110+ }
88111 }
89112 FDv2ProtocolHandler handler = new FDv2ProtocolHandler ();
90113 for (FDv2Event event : pollingResponse .getEvents ()) {
@@ -96,10 +119,9 @@ protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean on
96119 DataStoreTypes .ChangeSet <DataStoreTypes .ItemDescriptor > converted = FDv2ChangeSetTranslator .toChangeSet (
97120 ((FDv2ProtocolHandler .FDv2ActionChangeset ) res ).getChangeset (),
98121 logger ,
99- // TODO: Implement environment ID support.
100- null
122+ environmentId
101123 );
102- return FDv2SourceResult .changeSet (converted );
124+ return FDv2SourceResult .changeSet (converted , fdv1Fallback );
103125 } catch (Exception e ) {
104126 // TODO: Do we need to be more specific about the exception type here?
105127 DataSourceStatusProvider .ErrorInfo info = new DataSourceStatusProvider .ErrorInfo (
@@ -108,7 +130,7 @@ protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean on
108130 e .toString (),
109131 new Date ().toInstant ()
110132 );
111- return oneShot ? FDv2SourceResult .terminalError (info ) : FDv2SourceResult .interrupted (info );
133+ return oneShot ? FDv2SourceResult .terminalError (info , fdv1Fallback ) : FDv2SourceResult .interrupted (info , fdv1Fallback );
112134 }
113135 case ERROR : {
114136 FDv2ProtocolHandler .FDv2ActionError error = ((FDv2ProtocolHandler .FDv2ActionError ) res );
@@ -117,10 +139,10 @@ protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean on
117139 0 ,
118140 error .getReason (),
119141 new Date ().toInstant ());
120- return oneShot ? FDv2SourceResult .terminalError (info ) : FDv2SourceResult .interrupted (info );
142+ return oneShot ? FDv2SourceResult .terminalError (info , fdv1Fallback ) : FDv2SourceResult .interrupted (info , fdv1Fallback );
121143 }
122144 case GOODBYE :
123- return FDv2SourceResult .goodbye (((FDv2ProtocolHandler .FDv2ActionGoodbye ) res ).getReason ());
145+ return FDv2SourceResult .goodbye (((FDv2ProtocolHandler .FDv2ActionGoodbye ) res ).getReason (), fdv1Fallback );
124146 case NONE :
125147 break ;
126148 case INTERNAL_ERROR : {
@@ -141,7 +163,7 @@ protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean on
141163 0 ,
142164 "Internal error occurred during polling" ,
143165 new Date ().toInstant ());
144- return oneShot ? FDv2SourceResult .terminalError (info ) : FDv2SourceResult .interrupted (info );
166+ return oneShot ? FDv2SourceResult .terminalError (info , fdv1Fallback ) : FDv2SourceResult .interrupted (info , fdv1Fallback );
145167 }
146168 }
147169 }
@@ -152,7 +174,7 @@ protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean on
152174 "Unexpected end of polling response" ,
153175 new Date ().toInstant ()
154176 );
155- return oneShot ? FDv2SourceResult .terminalError (info ) : FDv2SourceResult .interrupted (info );
177+ return oneShot ? FDv2SourceResult .terminalError (info , fdv1Fallback ) : FDv2SourceResult .interrupted (info , fdv1Fallback );
156178 }));
157179 }
158180}
0 commit comments