11package org .wildfly .extras .a2a .server .apps .jakarta ;
22
3+ import java .io .IOException ;
4+ import java .io .PrintWriter ;
35import java .util .Enumeration ;
46import java .util .HashMap ;
57import java .util .Map ;
8+ import java .util .concurrent .CompletableFuture ;
69import java .util .concurrent .Executor ;
710import java .util .concurrent .Flow ;
11+ import java .util .concurrent .atomic .AtomicLong ;
812
913import jakarta .enterprise .inject .Instance ;
1014import jakarta .inject .Inject ;
1115import jakarta .servlet .http .HttpServletRequest ;
16+ import jakarta .servlet .http .HttpServletResponse ;
1217import jakarta .ws .rs .Consumes ;
1318import jakarta .ws .rs .GET ;
1419import jakarta .ws .rs .POST ;
1520import jakarta .ws .rs .Path ;
1621import jakarta .ws .rs .Produces ;
1722import jakarta .ws .rs .core .Context ;
23+ import jakarta .ws .rs .core .HttpHeaders ;
1824import jakarta .ws .rs .core .MediaType ;
1925import jakarta .ws .rs .core .Response ;
2026import jakarta .ws .rs .core .SecurityContext ;
2127import jakarta .ws .rs .ext .ExceptionMapper ;
2228import jakarta .ws .rs .ext .Provider ;
23- import jakarta .ws .rs .sse .Sse ;
24- import jakarta .ws .rs .sse .SseEventSink ;
29+ import jakarta .ws .rs .ext .Providers ;
2530
2631import com .fasterxml .jackson .core .JsonParseException ;
2732import com .fasterxml .jackson .databind .JsonMappingException ;
33+ import com .fasterxml .jackson .databind .ObjectMapper ;
2834import io .a2a .server .ExtendedAgentCard ;
2935import io .a2a .server .ServerCallContext ;
3036import io .a2a .server .auth .UnauthenticatedUser ;
4046import io .a2a .spec .InvalidParamsError ;
4147import io .a2a .spec .InvalidParamsJsonMappingException ;
4248import io .a2a .spec .InvalidRequestError ;
43- import io .a2a .spec .JSONErrorResponse ;
4449import io .a2a .spec .JSONParseError ;
4550import io .a2a .spec .JSONRPCError ;
4651import io .a2a .spec .JSONRPCErrorResponse ;
@@ -108,19 +113,51 @@ public JSONRPCResponse<?> handleNonStreamingRequests(
108113
109114 /**
110115 * Handles incoming POST requests to the main A2A endpoint that involve Server-Sent Events (SSE).
111- * Dispatches the request to the appropriate JSON-RPC handler method and returns the response .
116+ * Uses custom SSE response handling to avoid JAX-RS SSE compatibility issues with async publishers .
112117 */
113118 @ POST
114119 @ Consumes (MediaType .APPLICATION_JSON )
115120 @ Produces (MediaType .SERVER_SENT_EVENTS )
116121 public void handleStreamingRequests (
117- StreamingJSONRPCRequest <?> request , @ Context SseEventSink sseEventSink ,
118- @ Context Sse sse , @ Context HttpServletRequest httpRequest ,
119- @ Context SecurityContext securityContext ) {
122+ StreamingJSONRPCRequest <?> request ,
123+ @ Context HttpServletResponse response ,
124+ @ Context HttpServletRequest httpRequest ,
125+ @ Context SecurityContext securityContext ,
126+ @ Context Providers providers ) throws IOException {
127+
120128 ServerCallContext context = createCallContext (httpRequest , securityContext );
121- LOGGER .debug ("Handling streaming request" );
122- executor .execute (() -> processStreamingRequest (request , sseEventSink , sse , context ));
123- LOGGER .debug ("Submitted streaming request for async processing" );
129+ LOGGER .debug ("Handling streaming request with custom SSE response" );
130+
131+ // Set SSE headers manually for proper streaming
132+ response .setContentType (MediaType .SERVER_SENT_EVENTS );
133+ response .setCharacterEncoding ("UTF-8" );
134+ response .setHeader (HttpHeaders .CACHE_CONTROL , "no-cache" );
135+
136+ // Get the ObjectMapper from JAX-RS context
137+ ObjectMapper objectMapper = providers .getContextResolver (ObjectMapper .class , MediaType .APPLICATION_JSON_TYPE )
138+ .getContext (JSONRPCResponse .class );
139+ if (objectMapper == null ) {
140+ // Fallback to properly configured ObjectMapper if context resolver doesn't provide one
141+ objectMapper = new ObjectMapper ();
142+ objectMapper .registerModule (new com .fasterxml .jackson .datatype .jsr310 .JavaTimeModule ());
143+ objectMapper .disable (com .fasterxml .jackson .databind .SerializationFeature .WRITE_DATES_AS_TIMESTAMPS );
144+ }
145+
146+ // Get the publisher synchronously to avoid connection closure issues
147+ Flow .Publisher <? extends JSONRPCResponse <?>> publisher = createStreamingPublisher (request , context );
148+ LOGGER .debug ("Created streaming publisher: {}" , publisher );
149+
150+ if (publisher != null ) {
151+ // Handle the streaming response with custom SSE formatting
152+ LOGGER .debug ("Handling custom SSE response for publisher: {}" , publisher );
153+ handleCustomSSEResponse (publisher , response , objectMapper );
154+ } else {
155+ // Handle unsupported request types
156+ LOGGER .debug ("Unsupported streaming request type: {}" , request .getClass ().getSimpleName ());
157+ response .sendError (HttpServletResponse .SC_BAD_REQUEST , "Unsupported streaming request type" );
158+ }
159+
160+ LOGGER .debug ("Completed streaming request processing" );
124161 }
125162
126163 /**
@@ -159,26 +196,42 @@ private JSONRPCResponse<?> processNonStreamingRequest(NonStreamingJSONRPCRequest
159196 }
160197 }
161198
162- private void processStreamingRequest (StreamingJSONRPCRequest <?> request , SseEventSink sseEventSink , Sse sse ,
163- ServerCallContext context ) {
164- Flow .Publisher <? extends JSONRPCResponse <?>> publisher ;
199+ /**
200+ * Creates a streaming publisher for the given request.
201+ * This method runs synchronously to avoid connection closure issues.
202+ */
203+ private Flow .Publisher <? extends JSONRPCResponse <?>> createStreamingPublisher (StreamingJSONRPCRequest <?> request ,
204+ ServerCallContext context ) {
165205 if (request instanceof SendStreamingMessageRequest req ) {
166- publisher = jsonRpcHandler .onMessageSendStream (req , context );
167- handleStreamingResponse (publisher , sseEventSink , sse );
206+ return jsonRpcHandler .onMessageSendStream (req , context );
168207 } else if (request instanceof TaskResubscriptionRequest req ) {
169- publisher = jsonRpcHandler .onResubscribeToTask (req , context );
170- handleStreamingResponse (publisher , sseEventSink , sse );
208+ return jsonRpcHandler .onResubscribeToTask (req , context );
209+ } else {
210+ return null ; // Unsupported request type
171211 }
172212 }
173213
174- private void handleStreamingResponse (Flow .Publisher <? extends JSONRPCResponse <?>> publisher , SseEventSink sseEventSink , Sse sse ) {
214+ /**
215+ * Handles the streaming response using custom SSE formatting.
216+ * This approach avoids JAX-RS SSE compatibility issues with async publishers.
217+ */
218+ private void handleCustomSSEResponse (Flow .Publisher <? extends JSONRPCResponse <?>> publisher ,
219+ HttpServletResponse response , ObjectMapper objectMapper ) throws IOException {
220+
221+ PrintWriter writer = response .getWriter ();
222+ AtomicLong eventId = new AtomicLong (0 );
223+ CompletableFuture <Void > streamingComplete = new CompletableFuture <>();
224+
175225 publisher .subscribe (new Flow .Subscriber <JSONRPCResponse <?>>() {
226+ @ SuppressWarnings ("unused" ) // Stored for potential future use (e.g., cancellation)
176227 private Flow .Subscription subscription ;
177228
178229 @ Override
179230 public void onSubscribe (Flow .Subscription subscription ) {
231+ LOGGER .debug ("Custom SSE subscriber onSubscribe called" );
180232 this .subscription = subscription ;
181233 subscription .request (Long .MAX_VALUE );
234+
182235 // Notify tests that we are subscribed
183236 Runnable runnable = streamingIsSubscribedRunnable ;
184237 if (runnable != null ) {
@@ -188,26 +241,57 @@ public void onSubscribe(Flow.Subscription subscription) {
188241
189242 @ Override
190243 public void onNext (JSONRPCResponse <?> item ) {
191-
192- sseEventSink .send (sse .newEventBuilder ()
193- .mediaType (MediaType .APPLICATION_JSON_TYPE )
194- .data (item )
195- .build ());
244+ LOGGER .debug ("Custom SSE subscriber onNext called with item: {}" , item );
245+ try {
246+ // Format as proper SSE event (matching Quarkus format)
247+ String jsonData = objectMapper .writeValueAsString (item );
248+ long id = eventId .getAndIncrement ();
249+
250+ writer .write ("data: " + jsonData + "\n " );
251+ writer .write ("id: " + id + "\n " );
252+ writer .write ("\n " ); // Empty line to complete the event
253+ writer .flush ();
254+
255+ LOGGER .debug ("Custom SSE event sent successfully with id: {}" , id );
256+ } catch (Exception e ) {
257+ LOGGER .error ("Error writing SSE event: {}" , e .getMessage (), e );
258+ onError (e );
259+ }
196260 }
197261
198262 @ Override
199263 public void onError (Throwable throwable ) {
200- // TODO
201- sseEventSink .close ();
264+ LOGGER .debug ("Custom SSE subscriber onError called: {}" , throwable .getMessage (), throwable );
265+ try {
266+ writer .close ();
267+ } catch (Exception e ) {
268+ LOGGER .error ("Error closing writer: {}" , e .getMessage (), e );
269+ }
270+ streamingComplete .completeExceptionally (throwable );
202271 }
203272
204273 @ Override
205274 public void onComplete () {
206- sseEventSink .close ();
275+ LOGGER .debug ("Custom SSE subscriber onComplete called" );
276+ try {
277+ writer .close ();
278+ } catch (Exception e ) {
279+ LOGGER .error ("Error closing writer: {}" , e .getMessage (), e );
280+ }
281+ streamingComplete .complete (null );
207282 }
208283 });
284+
285+ try {
286+ // Wait for streaming to complete before method returns
287+ streamingComplete .get ();
288+ } catch (Exception e ) {
289+ LOGGER .error ("Error waiting for streaming completion: {}" , e .getMessage (), e );
290+ throw new IOException ("Streaming failed" , e );
291+ }
209292 }
210293
294+
211295 private JSONRPCResponse <?> generateErrorResponse (JSONRPCRequest <?> request , JSONRPCError error ) {
212296 return new JSONRPCErrorResponse (request .getId (), error );
213297 }
0 commit comments