[FLINK-40040][cdc-runtime] Wrap coordinator failure in SerializedThrowable across the RPC boundary #4456

Open
Zhile wants to merge 1 commit into apache:master from Zhile:FLINK-40040

Zhile commented Jul 1, 2026

What is the purpose of the change

Schema-evolution coordination responses are dropped when the failure's cause is a class that exists only in the user classloader (e.g. com.mysql.cj.exceptions.ConnectionIsClosedException). flink-rpc-akka deserializes the coordination response with an isolated classloader and throws ClassNotFoundException, so the response never reaches SchemaOperator. SchemaOperator then blocks on responseFuture.get(rpcTimeout) and fails with a misleading TimeoutException, which turns a transient error into a restart loop that only a full job restart recovers from.
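
To illustrate why a dropped response surfaces as a timeout rather than as the real error, here is a minimal sketch using only java.util.concurrent. LostResponseSketch and await are hypothetical names, and the futures merely stand in for the coordination response; this is not the SchemaOperator code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration: what a caller blocked on a response future observes.
class LostResponseSketch {

    static String await(CompletableFuture<String> responseFuture, long timeoutMillis) {
        try {
            return "response: " + responseFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The response never arrived, so the real cause is invisible here.
            return "TimeoutException";
        } catch (ExecutionException e) {
            // The failure was delivered, so the real cause is available.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        // Case 1: the failure response was dropped in transit; the future is never completed.
        CompletableFuture<String> dropped = new CompletableFuture<>();
        System.out.println(await(dropped, 100));

        // Case 2: the failure response was delivered and completes the future exceptionally.
        CompletableFuture<String> delivered = new CompletableFuture<>();
        delivered.completeExceptionally(new IllegalStateException("connection is closed"));
        System.out.println(await(delivered, 100));
    }
}
```

In the first case the caller can only report that it waited too long, which matches the misleading TimeoutException described above.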

Observed in production (Flink CDC 3.6.0 on Flink 1.20.3):

ERROR org.apache.pekko.remote.Remoting - com.mysql.cj.exceptions.ConnectionIsClosedException
java.lang.ClassNotFoundException: com.mysql.cj.exceptions.ConnectionIsClosedException
    at org.apache.pekko.util.ClassLoaderObjectInputStream.resolveClass(...)
    at ...MiscMessageSerializer.deserializeStatusFailure(...)

JIRA: https://issues.apache.org/jira/browse/FLINK-40040

Brief change log

  • SchemaRegistry.failJob now wraps the cause in a SerializedThrowable (idempotent: a cause that is already a SerializedThrowable is not re-wrapped) before calling handleUnrecoverableError (which completes pending coordination responses exceptionally) and context.failJob. SerializedThrowable carries the original exception as bytes plus a stringified stack trace, so the receiving side can deserialize it without the original class and still see the real cause.
  • SchemaRegistry.runInEventLoop's catch block now routes through failJob instead of duplicating the fail path, so every exit shares the wrapping.
  • Applies to both the regular and distributed SchemaCoordinator, since both complete pending response futures via the shared failJob -> handleUnrecoverableError path.
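
The idempotent wrapping can be sketched roughly as follows. PortableFailure is a hypothetical, simplified stand-in for Flink's SerializedThrowable, and wrapForRpc is a hypothetical helper; neither is the code in this PR, and the real SchemaRegistry.failJob is not reproduced here.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical stand-in for SerializedThrowable (not the real Flink class).
// Its fields use only JDK types, so any classloader can deserialize it.
final class PortableFailure extends Exception {
    private static final long serialVersionUID = 1L;

    private final byte[] serializedOriginal; // original exception as bytes, or null
    private final String fullStackTrace;     // stringified stack trace of the original

    PortableFailure(Throwable original) {
        // No cause is passed to super, so the original class is not part of the object graph.
        super(original.getClass().getName() + ": " + original.getMessage());
        this.serializedOriginal = toBytes(original);
        StringWriter text = new StringWriter();
        PrintWriter writer = new PrintWriter(text);
        original.printStackTrace(writer);
        writer.flush();
        this.fullStackTrace = text.toString();
    }

    byte[] getSerializedOriginal() { return serializedOriginal; }

    String getFullStackTrace() { return fullStackTrace; }

    private static byte[] toBytes(Throwable t) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(t);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            return null; // original is not serializable: keep only the text form
        }
    }
}

final class FailJobSketch {

    // Idempotent: an already wrapped failure is returned unchanged.
    static Throwable wrapForRpc(Throwable failure) {
        return (failure instanceof PortableFailure) ? failure : new PortableFailure(failure);
    }

    public static void main(String[] args) {
        Throwable once = wrapForRpc(new IllegalStateException("connection is closed"));
        Throwable twice = wrapForRpc(once);
        System.out.println(once == twice);
        System.out.println(once.getMessage());
    }
}
```

Wrapping at a single choke point means every caller that routes through it gets the same treatment, which is the motivation given for sending the runInEventLoop catch block through failJob.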

Verifying this change

Added SchemaRegistryFailJobSerializationTest:

  • failJobWrapsExceptionIntoSerializedThrowable asserts the failure delivered to the coordinator context is a SerializedThrowable.
  • wrappedFailureSurvivesIsolatedClassloaderButRawOneDoesNot shows that a raw exception whose cause is a classloader-isolated type fails to deserialize (ClassNotFoundException), while the SerializedThrowable-wrapped one deserializes and preserves the real cause text.
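
The second check can be approximated in plain Java as below. This is a sketch under stated assumptions, not the test added in this PR: DriverOnlyException, WrappedFailure, IsolatedInputStream and IsolationSketch are all hypothetical names, and the isolated classloader is imitated by an ObjectInputStream that refuses to resolve one class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.UncheckedIOException;

// Hypothetical exception standing in for a class only the user classloader can see.
class DriverOnlyException extends RuntimeException {
    private static final long serialVersionUID = 1L;
    DriverOnlyException(String message) { super(message); }
}

// Hypothetical stand-in for SerializedThrowable: original as bytes plus stack text.
class WrappedFailure extends Exception {
    private static final long serialVersionUID = 1L;
    private final byte[] originalBytes;
    private final String stackText;

    WrappedFailure(Throwable original) {
        super(original.toString()); // no cause set: only JDK types are serialized
        this.originalBytes = IsolationSketch.serialize(original);
        StringWriter text = new StringWriter();
        PrintWriter writer = new PrintWriter(text);
        original.printStackTrace(writer);
        writer.flush();
        this.stackText = text.toString();
    }

    String getStackText() { return stackText; }
}

// Imitates an isolated classloader by refusing to resolve DriverOnlyException.
class IsolatedInputStream extends ObjectInputStream {
    IsolatedInputStream(InputStream in) throws IOException { super(in); }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc)
            throws IOException, ClassNotFoundException {
        if (desc.getName().equals(DriverOnlyException.class.getName())) {
            throw new ClassNotFoundException(desc.getName());
        }
        return super.resolveClass(desc);
    }
}

class IsolationSketch {
    static byte[] serialize(Object value) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns the deserialized object, or null if a class could not be resolved.
    static Object tryDeserializeIsolated(byte[] bytes) {
        try (ObjectInputStream in = new IsolatedInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (ClassNotFoundException e) {
            return null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Throwable raw = new RuntimeException(
                "table discovery failed", new DriverOnlyException("connection is closed"));
        System.out.println("raw survives: " + (tryDeserializeIsolated(serialize(raw)) != null));
        Object wrapped = tryDeserializeIsolated(serialize(new WrappedFailure(raw)));
        System.out.println("wrapped survives: " + (wrapped != null));
    }
}
```

The wrapped form survives because the unresolvable class appears only inside a byte array and a string, neither of which triggers class resolution on the receiving side.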

Does this pull request potentially affect one of the following parts

  • Dependencies (does it add or upgrade a dependency): no
  • Public API / SDK: no
  • Serializers / state: no (wraps only the failure carried over the coordinator RPC channel; no state format change)

…wable across RPC boundary

SchemaRegistry.failJob now wraps the failure cause into a SerializedThrowable
before it crosses the operator-coordinator RPC boundary, and runInEventLoop
routes through failJob so all exits share the wrapping.

Otherwise an exception whose class only lives in the user classloader (e.g. the
MySQL driver's com.mysql.cj.exceptions.ConnectionIsClosedException raised during
table discovery) fails to deserialize on the receiving side, where flink-rpc-akka
uses an isolated classloader, with ClassNotFoundException. The coordination
response is then lost and the SchemaOperator request stalls until rpcTimeout,
failing with a misleading TimeoutException and turning a transient error into a
restart loop that only a full job restart recovers from.

yuxiqian commented Jul 1, 2026

Member

Could you please check the failing test cases?

yuxiqian self-requested a review July 1, 2026 14:29