@@ -176,56 +176,66 @@ public void handleStreamingRequests(
176176 ServerCallContext context = createCallContext (httpRequest , securityContext );
177177 LOGGER .debug ("Handling streaming request with custom SSE response" );
178178
179- // Set SSE headers manually for proper streaming
179+ // Parse and validate before committing to SSE response format.
180+ // Validation errors (e.g. terminal task) must be returned as plain
181+ // JSON-RPC error responses, not SSE events.
182+ A2ARequest <?> request = null ;
183+ try {
184+ request = JSONRPCUtils .parseRequestBody (body , null );
185+ validateStreamingRequest ((StreamingJSONRPCRequest <?>) request );
186+ } catch (A2AError e ) {
187+ LOGGER .debug ("A2AError validating streaming request: {}" , e .getMessage ());
188+ sendJsonRpcError (response , request != null ? request .getId () : null , e );
189+ return ;
190+ } catch (InvalidParamsJsonMappingException e ) {
191+ LOGGER .warn ("Invalid params in streaming request: {}" , e .getMessage ());
192+ sendJsonRpcError (response , e .getId (), new InvalidParamsError (null , e .getMessage (), null ));
193+ return ;
194+ } catch (MethodNotFoundJsonMappingException e ) {
195+ LOGGER .warn ("Method not found in streaming request: {}" , e .getMessage ());
196+ sendJsonRpcError (response , e .getId (), new MethodNotFoundError (null , e .getMessage (), null ));
197+ return ;
198+ } catch (IdJsonMappingException e ) {
199+ LOGGER .warn ("Invalid request ID in streaming request: {}" , e .getMessage ());
200+ sendJsonRpcError (response , e .getId (), new InvalidRequestError (null , e .getMessage (), null ));
201+ return ;
202+ } catch (JsonMappingException e ) {
203+ LOGGER .warn ("JSON mapping error in streaming request: {}" , e .getMessage (), e );
204+ sendJsonRpcError (response , null , new InvalidRequestError (null , e .getMessage (), null ));
205+ return ;
206+ } catch (JsonSyntaxException e ) {
207+ LOGGER .warn ("JSON syntax error in streaming request: {}" , e .getMessage ());
208+ sendJsonRpcError (response , null , new JSONParseError (e .getMessage ()));
209+ return ;
210+ } catch (JsonProcessingException e ) {
211+ LOGGER .warn ("JSON processing error in streaming request: {}" , e .getMessage ());
212+ sendJsonRpcError (response , null , new JSONParseError (e .getMessage ()));
213+ return ;
214+ } catch (Throwable e ) {
215+ LOGGER .error ("Unexpected error processing streaming request: {}" , e .getMessage (), e );
216+ sendJsonRpcError (response , null , new InternalError (e .getMessage ()));
217+ return ;
218+ }
219+
220+ // Validation passed — now commit to SSE response format
180221 response .setContentType (MediaType .SERVER_SENT_EVENTS );
181222 response .setCharacterEncoding ("UTF-8" );
182223 response .setHeader (HttpHeaders .CACHE_CONTROL , "no-cache" );
183224
184- A2ARequest <?> request = null ;
185225 try {
186- // Parse the request body
187- request = JSONRPCUtils .parseRequestBody (body , null );
188-
189- // Get the publisher synchronously to avoid connection closure issues
190226 Flow .Publisher <? extends A2AResponse <?>> publisher = createStreamingPublisher ((StreamingJSONRPCRequest <?>) request , context );
191227 LOGGER .debug ("Created streaming publisher: {}" , publisher );
192228
193229 if (publisher != null ) {
194- // Handle the streaming response with custom SSE formatting
195230 LOGGER .debug ("Handling custom SSE response for publisher: {}" , publisher );
196231 handleCustomSSEResponse (publisher , response , context );
197232 } else {
198- // Handle unsupported request types
199233 LOGGER .debug ("Unsupported streaming request type: {}" , request .getClass ().getSimpleName ());
200234 response .sendError (HttpServletResponse .SC_BAD_REQUEST , "Unsupported streaming request type" );
201235 }
202- } catch (MethodNotFoundJsonMappingException e ) {
203- LOGGER .warn ("Method not found in streaming request: {}" , e .getMessage ());
204- sendErrorSSE (response , e .getId (), new MethodNotFoundError ());
205- } catch (InvalidParamsJsonMappingException e ) {
206- LOGGER .warn ("Invalid params in streaming request: {}" , e .getMessage ());
207- sendErrorSSE (response , e .getId (), new InvalidParamsError ());
208- } catch (IdJsonMappingException e ) {
209- LOGGER .warn ("Invalid request ID in streaming request: {}" , e .getMessage ());
210- sendErrorSSE (response , e .getId (), new InvalidRequestError ());
211- } catch (JsonMappingException e ) {
212- LOGGER .warn ("JSON mapping error in streaming request: {}" , e .getMessage (), e );
213- // Check if this is a parse error wrapped in a mapping exception
214- if (e .getCause () instanceof JsonProcessingException ) {
215- sendErrorSSE (response , null , new JSONParseError ());
216- } else {
217- // Otherwise it's an invalid request (valid JSON but doesn't match schema)
218- sendErrorSSE (response , null , new InvalidRequestError ());
219- }
220- } catch (JsonSyntaxException e ) {
221- LOGGER .warn ("JSON syntax error in streaming request: {}" , e .getMessage ());
222- sendErrorSSE (response , null , new JSONParseError ());
223- } catch (JsonProcessingException e ) {
224- LOGGER .warn ("JSON processing error in streaming request: {}" , e .getMessage ());
225- sendErrorSSE (response , null , new JSONParseError ());
226236 } catch (A2AError e ) {
227237 LOGGER .debug ("A2AError in streaming request: {}" , e .getMessage ());
228- sendErrorSSE (response , request != null ? request .getId () : null , e );
238+ sendErrorSSE (response , request .getId (), e );
229239 } catch (Throwable e ) {
230240 LOGGER .error ("Unexpected error processing streaming request: {}" , e .getMessage (), e );
231241 sendErrorSSE (response , null , new InternalError (e .getMessage ()));
@@ -293,6 +303,20 @@ private A2AResponse<?> processNonStreamingRequest(NonStreamingJSONRPCRequest<?>
293303 }
294304 }
295305
306+ /**
307+ * Validates a streaming request before entering SSE mode.
308+ * Throws A2AError if the task is in a terminal state or not found.
309+ * This must be called before setting SSE headers so that errors
310+ * are returned as plain JSON-RPC error responses, not SSE events.
311+ */
312+ private void validateStreamingRequest (StreamingJSONRPCRequest <?> request ) throws A2AError {
313+ if (request instanceof SendStreamingMessageRequest req ) {
314+ jsonRpcHandler .validateRequestedTask (req .getParams ().message ().taskId ());
315+ } else if (request instanceof SubscribeToTaskRequest req ) {
316+ jsonRpcHandler .validateRequestedTask (req .getParams ().id ());
317+ }
318+ }
319+
296320 /**
297321 * Creates a streaming publisher for the given request.
298322 * This method runs synchronously to avoid connection closure issues.
@@ -415,6 +439,23 @@ private A2AResponse<?> generateErrorResponse(A2ARequest<?> request, A2AError err
415439 return new A2AErrorResponse (request .getId (), error );
416440 }
417441
442+ /**
443+ * Sends a plain JSON-RPC error response (Content-Type: application/json).
444+ * Used for pre-streaming validation errors that should not be sent as SSE.
445+ */
446+ private void sendJsonRpcError (HttpServletResponse response , Object id , A2AError error ) {
447+ try {
448+ A2AErrorResponse errorResponse = new A2AErrorResponse (id , error );
449+ String jsonData = serializeResponse (errorResponse );
450+ response .setStatus (HttpServletResponse .SC_OK );
451+ response .setContentType (org .a2aproject .sdk .common .MediaType .APPLICATION_JSON );
452+ response .getWriter ().write (jsonData );
453+ response .getWriter ().flush ();
454+ } catch (Exception e ) {
455+ LOGGER .error ("Error sending JSON-RPC error response: {}" , e .getMessage (), e );
456+ }
457+ }
458+
418459 /**
419460 * Sends an error response as a Server-Sent Event.
420461 */
0 commit comments