Skip to content

Commit e05b415

Browse files
authored
Merge pull request #61 from kabir/align-main
Align with main, main testsuite passing
2 parents 9448644 + 34fec0b commit e05b415

10 files changed

Lines changed: 185 additions & 62 deletions

File tree

.github/workflows/run-tck.yml

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,12 @@ on:
1313

1414
env:
1515
# Ref (branch/tag/SHA) of the TCK
16-
TCK_VERSION: main
16+
TCK_VERSION: 1.0-dev
1717
# Tells uv to not need a venv, and instead use system
1818
UV_SYSTEM_PYTHON: 1
19+
# Env vars to configure SUT hosts
20+
SUT_HOST: localhost:8080
21+
SUT_GRPC_HOST: localhost:9555
1922

2023
# Only run the latest job
2124
concurrency:
@@ -83,13 +86,14 @@ jobs:
8386
working-directory: a2a-tck
8487
- name: Start the WildFly SUT
8588
run: |
86-
SUT_JSONRPC_URL=http://localhost:8080 SUT_REST_URL=http://localhost:8080 SUT_GRPC_URL=http://localhost:9555 mvn wildfly:start -B -Dversion.sdk=${SDK_VERSION} -pl tck -Dstartup-timeout=120 -Dwildfly.serverArgs="--stability=preview"
89+
mvn wildfly:start -B -Dversion.sdk=${SDK_VERSION} -pl tck -Dstartup-timeout=120 -Dwildfly.serverArgs="--stability=preview"
8790
- name: Run TCK
88-
env:
89-
SUT_JSONRPC_URL: http://localhost:8080
90-
TCK_STREAMING_TIMEOUT: 4.0
91+
# Might not be needed any longer
92+
# env:
93+
# TCK_STREAMING_TIMEOUT: 4.0
9194
run: |
92-
./run_tck.py --sut-url ${SUT_JSONRPC_URL} --category all --transports jsonrpc,grpc,rest --compliance-report report.json
95+
set -o pipefail
96+
uv run ./run_tck.py --sut-host http://${{ env.SUT_HOST }} -v 2>&1 | tee tck-output.log
9397
working-directory: a2a-tck
9498
- name: Shutdown the WildFly SUT
9599
run: |

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ In the `tck/pom.xml` we enable both JSON-RPC and gRPC, and have the following de
4646
* `org.wildfly.a2a:a2a-java-sdk-server-jakarta-grpc` - this is the dependency for **gRPC** support.
4747
* We exclude the gRPC core libraries (`io.grpc` and `com.google.protobuf:protobuf-java`). This is because when deploying to WildFly with gRPC support, the server is provisioned with the WildFly gRPC feature-pack, which already provides these libraries. Including them in the `.war` would lead to conflicts. If you only want to support JSON-RPC, you can omit this dependency.
4848
* `jakarta.ws.rs:jakarta.ws.rs-api` - this is not part of the dependencies brought in via the A2A dependencies but is needed to compile the TCK module. Since it is provided by WildFly, we make the scope `provided` so it is not included in the `.war`.
49-
* `org.a2aproject.sdk:a2a-tck-server` - this is the application, which contains the `AgentExecutor` and `AgentCard` implementations for the TCK. In your case, they will most likely be implemented in the project you use to create the `.war`.
49+
* `org.a2aproject.sdk:a2a-java-sdk-tck-sut` - this is the application, which contains the `AgentExecutor` and `AgentCard` implementations for the TCK. In your case, they will most likely be implemented in the project you use to create the `.war`.
5050
* In this case we exclude all transitive dependencies, since we are doing the main dependency management via the `org.wildfly.a2a:a2a-java-sdk-jakarta-jsonrpc` and `org.wildfly.a2a:a2a-java-sdk-server-jakarta-grpc` dependencies.
5151

5252
If you are deploying to WildFly and want to use gRPC, you will also need to provision the server with the gRPC feature pack. You can see how this is done in the `wildfly-maven-plugin` configuration in the `tck/pom.xml`. Since the gRPC subsystem and feature pack are currently at the `preview` stability level, you will need to start the server with the `--stability=preview` argument.
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package org.wildfly.extras.a2a.server.apps.grpc;
2+
3+
import org.a2aproject.sdk.common.A2AHeaders;
4+
import org.a2aproject.sdk.transport.grpc.context.GrpcContextKeys;
5+
6+
import io.grpc.Context;
7+
import io.grpc.Contexts;
8+
import io.grpc.Metadata;
9+
import io.grpc.ServerCall;
10+
import io.grpc.ServerCallHandler;
11+
import io.grpc.ServerInterceptor;
12+
13+
/**
14+
* gRPC server interceptor that extracts A2A protocol headers from request
15+
* metadata and stores them in the gRPC {@link Context} for use by
16+
* {@link org.a2aproject.sdk.transport.grpc.handler.GrpcHandler}.
17+
*
18+
* <p>WildFly's gRPC subsystem discovers {@link ServerInterceptor} implementations
19+
* in the deployment and applies them automatically.
20+
*/
21+
public class A2AExtensionsInterceptor implements ServerInterceptor {
22+
23+
private static final Metadata.Key<String> VERSION_KEY = Metadata.Key.of(
24+
A2AHeaders.A2A_VERSION.toLowerCase(), Metadata.ASCII_STRING_MARSHALLER);
25+
private static final Metadata.Key<String> EXTENSIONS_KEY = Metadata.Key.of(
26+
A2AHeaders.A2A_EXTENSIONS.toLowerCase(), Metadata.ASCII_STRING_MARSHALLER);
27+
28+
@Override
29+
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
30+
ServerCall<ReqT, RespT> call, Metadata metadata, ServerCallHandler<ReqT, RespT> next) {
31+
String version = metadata.get(VERSION_KEY);
32+
String extensions = metadata.get(EXTENSIONS_KEY);
33+
34+
Context context = Context.current()
35+
.withValue(GrpcContextKeys.METADATA_KEY, metadata)
36+
.withValue(GrpcContextKeys.GRPC_METHOD_NAME_KEY, call.getMethodDescriptor().getFullMethodName())
37+
.withValue(GrpcContextKeys.METHOD_NAME_KEY,
38+
GrpcContextKeys.METHOD_MAPPING.get(call.getMethodDescriptor().getBareMethodName()));
39+
40+
if (version != null) {
41+
context = context.withValue(GrpcContextKeys.VERSION_HEADER_KEY, version);
42+
}
43+
if (extensions != null) {
44+
context = context.withValue(GrpcContextKeys.EXTENSIONS_HEADER_KEY, extensions);
45+
}
46+
47+
return Contexts.interceptCall(context, call, metadata, next);
48+
}
49+
}

impl/grpc/src/main/java/org/wildfly/extras/a2a/server/apps/grpc/WildFlyGrpcHandler.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.util.concurrent.Executor;
44
import java.util.concurrent.Flow;
5+
import java.util.function.Supplier;
56

67
import org.a2aproject.sdk.jsonrpc.common.wrappers.ListTasksResult;
78
import org.a2aproject.sdk.server.ServerCallContext;
@@ -173,5 +174,13 @@ public void onDeleteTaskPushNotificationConfig(DeleteTaskPushNotificationConfigP
173174
return null;
174175
});
175176
}
177+
178+
@Override
179+
public void validateRequestedTask(String requestedTaskId) throws A2AError {
180+
withDeploymentClassLoader(() -> {
181+
delegate.validateRequestedTask(requestedTaskId);
182+
return null;
183+
});
184+
}
176185
}
177186
}

impl/jsonrpc/src/main/java/org/wildfly/extras/a2a/server/apps/jsonrpc/A2AServerResource.java

Lines changed: 78 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -176,56 +176,66 @@ public void handleStreamingRequests(
176176
ServerCallContext context = createCallContext(httpRequest, securityContext);
177177
LOGGER.debug("Handling streaming request with custom SSE response");
178178

179-
// Set SSE headers manually for proper streaming
179+
// Parse and validate before committing to SSE response format.
180+
// Validation errors (e.g. terminal task) must be returned as plain
181+
// JSON-RPC error responses, not SSE events.
182+
A2ARequest<?> request = null;
183+
try {
184+
request = JSONRPCUtils.parseRequestBody(body, null);
185+
validateStreamingRequest((StreamingJSONRPCRequest<?>) request);
186+
} catch (A2AError e) {
187+
LOGGER.debug("A2AError validating streaming request: {}", e.getMessage());
188+
sendJsonRpcError(response, request != null ? request.getId() : null, e);
189+
return;
190+
} catch (InvalidParamsJsonMappingException e) {
191+
LOGGER.warn("Invalid params in streaming request: {}", e.getMessage());
192+
sendJsonRpcError(response, e.getId(), new InvalidParamsError(null, e.getMessage(), null));
193+
return;
194+
} catch (MethodNotFoundJsonMappingException e) {
195+
LOGGER.warn("Method not found in streaming request: {}", e.getMessage());
196+
sendJsonRpcError(response, e.getId(), new MethodNotFoundError(null, e.getMessage(), null));
197+
return;
198+
} catch (IdJsonMappingException e) {
199+
LOGGER.warn("Invalid request ID in streaming request: {}", e.getMessage());
200+
sendJsonRpcError(response, e.getId(), new InvalidRequestError(null, e.getMessage(), null));
201+
return;
202+
} catch (JsonMappingException e) {
203+
LOGGER.warn("JSON mapping error in streaming request: {}", e.getMessage(), e);
204+
sendJsonRpcError(response, null, new InvalidRequestError(null, e.getMessage(), null));
205+
return;
206+
} catch (JsonSyntaxException e) {
207+
LOGGER.warn("JSON syntax error in streaming request: {}", e.getMessage());
208+
sendJsonRpcError(response, null, new JSONParseError(e.getMessage()));
209+
return;
210+
} catch (JsonProcessingException e) {
211+
LOGGER.warn("JSON processing error in streaming request: {}", e.getMessage());
212+
sendJsonRpcError(response, null, new JSONParseError(e.getMessage()));
213+
return;
214+
} catch (Throwable e) {
215+
LOGGER.error("Unexpected error processing streaming request: {}", e.getMessage(), e);
216+
sendJsonRpcError(response, null, new InternalError(e.getMessage()));
217+
return;
218+
}
219+
220+
// Validation passed — now commit to SSE response format
180221
response.setContentType(MediaType.SERVER_SENT_EVENTS);
181222
response.setCharacterEncoding("UTF-8");
182223
response.setHeader(HttpHeaders.CACHE_CONTROL, "no-cache");
183224

184-
A2ARequest<?> request = null;
185225
try {
186-
// Parse the request body
187-
request = JSONRPCUtils.parseRequestBody(body, null);
188-
189-
// Get the publisher synchronously to avoid connection closure issues
190226
Flow.Publisher<? extends A2AResponse<?>> publisher = createStreamingPublisher((StreamingJSONRPCRequest<?>) request, context);
191227
LOGGER.debug("Created streaming publisher: {}", publisher);
192228

193229
if (publisher != null) {
194-
// Handle the streaming response with custom SSE formatting
195230
LOGGER.debug("Handling custom SSE response for publisher: {}", publisher);
196231
handleCustomSSEResponse(publisher, response, context);
197232
} else {
198-
// Handle unsupported request types
199233
LOGGER.debug("Unsupported streaming request type: {}", request.getClass().getSimpleName());
200234
response.sendError(HttpServletResponse.SC_BAD_REQUEST, "Unsupported streaming request type");
201235
}
202-
} catch (MethodNotFoundJsonMappingException e) {
203-
LOGGER.warn("Method not found in streaming request: {}", e.getMessage());
204-
sendErrorSSE(response, e.getId(), new MethodNotFoundError());
205-
} catch (InvalidParamsJsonMappingException e) {
206-
LOGGER.warn("Invalid params in streaming request: {}", e.getMessage());
207-
sendErrorSSE(response, e.getId(), new InvalidParamsError());
208-
} catch (IdJsonMappingException e) {
209-
LOGGER.warn("Invalid request ID in streaming request: {}", e.getMessage());
210-
sendErrorSSE(response, e.getId(), new InvalidRequestError());
211-
} catch (JsonMappingException e) {
212-
LOGGER.warn("JSON mapping error in streaming request: {}", e.getMessage(), e);
213-
// Check if this is a parse error wrapped in a mapping exception
214-
if (e.getCause() instanceof JsonProcessingException) {
215-
sendErrorSSE(response, null, new JSONParseError());
216-
} else {
217-
// Otherwise it's an invalid request (valid JSON but doesn't match schema)
218-
sendErrorSSE(response, null, new InvalidRequestError());
219-
}
220-
} catch (JsonSyntaxException e) {
221-
LOGGER.warn("JSON syntax error in streaming request: {}", e.getMessage());
222-
sendErrorSSE(response, null, new JSONParseError());
223-
} catch (JsonProcessingException e) {
224-
LOGGER.warn("JSON processing error in streaming request: {}", e.getMessage());
225-
sendErrorSSE(response, null, new JSONParseError());
226236
} catch (A2AError e) {
227237
LOGGER.debug("A2AError in streaming request: {}", e.getMessage());
228-
sendErrorSSE(response, request != null ? request.getId() : null, e);
238+
sendErrorSSE(response, request.getId(), e);
229239
} catch (Throwable e) {
230240
LOGGER.error("Unexpected error processing streaming request: {}", e.getMessage(), e);
231241
sendErrorSSE(response, null, new InternalError(e.getMessage()));
@@ -293,6 +303,20 @@ private A2AResponse<?> processNonStreamingRequest(NonStreamingJSONRPCRequest<?>
293303
}
294304
}
295305

306+
/**
307+
* Validates a streaming request before entering SSE mode.
308+
* Throws A2AError if the task is in a terminal state or not found.
309+
* This must be called before setting SSE headers so that errors
310+
* are returned as plain JSON-RPC error responses, not SSE events.
311+
*/
312+
private void validateStreamingRequest(StreamingJSONRPCRequest<?> request) throws A2AError {
313+
if (request instanceof SendStreamingMessageRequest req) {
314+
jsonRpcHandler.validateRequestedTask(req.getParams().message().taskId());
315+
} else if (request instanceof SubscribeToTaskRequest req) {
316+
jsonRpcHandler.validateRequestedTask(req.getParams().id());
317+
}
318+
}
319+
296320
/**
297321
* Creates a streaming publisher for the given request.
298322
* This method runs synchronously to avoid connection closure issues.
@@ -415,6 +439,23 @@ private A2AResponse<?> generateErrorResponse(A2ARequest<?> request, A2AError err
415439
return new A2AErrorResponse(request.getId(), error);
416440
}
417441

442+
/**
443+
* Sends a plain JSON-RPC error response (Content-Type: application/json).
444+
* Used for pre-streaming validation errors that should not be sent as SSE.
445+
*/
446+
private void sendJsonRpcError(HttpServletResponse response, Object id, A2AError error) {
447+
try {
448+
A2AErrorResponse errorResponse = new A2AErrorResponse(id, error);
449+
String jsonData = serializeResponse(errorResponse);
450+
response.setStatus(HttpServletResponse.SC_OK);
451+
response.setContentType(org.a2aproject.sdk.common.MediaType.APPLICATION_JSON);
452+
response.getWriter().write(jsonData);
453+
response.getWriter().flush();
454+
} catch (Exception e) {
455+
LOGGER.error("Error sending JSON-RPC error response: {}", e.getMessage(), e);
456+
}
457+
}
458+
418459
/**
419460
* Sends an error response as a Server-Sent Event.
420461
*/
@@ -465,7 +506,7 @@ public String getUsername() {
465506
Map<String, String> headers = new HashMap<>();
466507
for (Enumeration<String> headerNames = request.getHeaderNames(); headerNames.hasMoreElements() ; ) {
467508
String name = headerNames.nextElement();
468-
headers.put(name, headers.get(name));
509+
headers.put(name, request.getHeader(name));
469510
}
470511

471512
state.put(HEADERS_KEY, headers);
@@ -478,8 +519,9 @@ public String getUsername() {
478519
Set<String> requestedExtensions = A2AExtensions.getRequestedExtensions(extensionHeaderValues);
479520
state.put(TENANT_KEY, extractTenant(request));
480521
state.put(TRANSPORT_KEY, TransportProtocol.JSONRPC);
481-
482-
return new ServerCallContext(user, state, requestedExtensions);
522+
523+
String requestedVersion = request.getHeader(A2AHeaders.A2A_VERSION);
524+
return new ServerCallContext(user, state, requestedExtensions, requestedVersion);
483525
} else {
484526
CallContextFactory builder = callContextFactory.get();
485527
return builder.build(request);

impl/rest/src/main/java/org/wildfly/extras/a2a/server/apps/rest/A2ARestServerResource.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,7 @@ public void sendMessageStreaming(String body, @Context HttpServletRequest httpRe
124124
}
125125
} finally {
126126
if (error != null) {
127-
httpResponse.setHeader(CONTENT_TYPE, APPLICATION_JSON);
128-
httpResponse.sendError(error.getStatusCode(), error.getBody());
127+
sendErrorResponse(httpResponse, error);
129128
} else {
130129
handleCustomSSEResponse(streamingResponse.getPublisher(), httpResponse, context);
131130
}
@@ -149,8 +148,7 @@ public void resubscribeTask(@PathParam("taskId") String taskId, @Context HttpSer
149148
}
150149
} finally {
151150
if (error != null) {
152-
httpResponse.setHeader(CONTENT_TYPE, APPLICATION_JSON);
153-
httpResponse.sendError(error.getStatusCode(), error.getBody());
151+
sendErrorResponse(httpResponse, error);
154152
} else {
155153
handleCustomSSEResponse(streamingResponse.getPublisher(), httpResponse, context);
156154
}
@@ -420,6 +418,13 @@ public Response deleteTaskPushNotificationConfiguration(@PathParam("taskId") Str
420418
}
421419
}
422420

421+
private void sendErrorResponse(HttpServletResponse httpResponse, RestHandler.HTTPRestResponse error) throws IOException {
422+
httpResponse.setStatus(error.getStatusCode());
423+
httpResponse.setHeader(CONTENT_TYPE, error.getContentType());
424+
httpResponse.getWriter().write(error.getBody());
425+
httpResponse.getWriter().flush();
426+
}
427+
423428
/**
424429
* Handles the streaming response using custom SSE formatting.
425430
* This approach avoids JAX-RS SSE compatibility issues with async publishers.
@@ -428,10 +433,15 @@ public Response deleteTaskPushNotificationConfiguration(@PathParam("taskId") Str
428433
private void handleCustomSSEResponse(Flow.Publisher<String> publisher,
429434
HttpServletResponse response,
430435
ServerCallContext context) throws IOException {
436+
response.setHeader(CONTENT_TYPE, MediaType.SERVER_SENT_EVENTS);
437+
response.setHeader("Cache-Control", "no-cache");
438+
response.setHeader("X-Accel-Buffering", "no");
439+
431440
CompletableFuture<Void> streamingComplete = new CompletableFuture<>();
432441
try (PrintWriter writer = response.getWriter()) {
442+
writer.write(": SSE stream started\n\n");
443+
writer.flush();
433444
publisher.subscribe(new SSESubscriber(streamingComplete, writer, context));
434-
// Wait for streaming to complete before method returns
435445
streamingComplete.get();
436446
} catch (Exception e) {
437447
LOGGER.error("Error waiting for streaming completion: {}", e.getMessage(), e);
@@ -485,7 +495,8 @@ public String getUsername() {
485495
extensionHeaderValues.add(en.nextElement());
486496
}
487497
Set<String> requestedExtensions = A2AExtensions.getRequestedExtensions(extensionHeaderValues);
488-
return new ServerCallContext(user, state, requestedExtensions);
498+
String requestedVersion = request.getHeader(A2AHeaders.A2A_VERSION);
499+
return new ServerCallContext(user, state, requestedExtensions, requestedVersion);
489500
} else {
490501
CallContextFactory builder = callContextFactory.get();
491502
return builder.build(request);

pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
<properties>
4646
<jboss.home>${project.build.directory}${file.separator}wildfly</jboss.home>
4747
<version.wildfly>39.0.1.Final</version.wildfly>
48-
<version.sdk>1.0.0.Beta1-SNAPSHOT</version.sdk>
48+
<version.sdk>1.0.0.Beta2-SNAPSHOT</version.sdk>
4949

5050
<version.hamcrest>2.2</version.hamcrest>
5151
<version.junit>5.12.2</version.junit>
@@ -127,7 +127,7 @@
127127
</dependency>
128128
<dependency>
129129
<groupId>org.a2aproject.sdk</groupId>
130-
<artifactId>a2a-tck-server</artifactId>
130+
<artifactId>a2a-java-sdk-tck-sut</artifactId>
131131
<version>${version.sdk}</version>
132132
</dependency>
133133
<dependency>
@@ -220,8 +220,8 @@
220220
<module>impl/jsonrpc</module>
221221
<module>impl/grpc</module>
222222
<module>impl/rest</module>
223-
<module>tck</module>
224223
<module>tests</module>
224+
<module>tck</module>
225225
</modules>
226226
<profiles>
227227
<profile>

0 commit comments

Comments
 (0)