[FLINK-37730][Job Manager] Expose JM exception as K8s exceptions#978
Conversation
Thank you! |
It'd be great to catch and turn every job exception into a k8s event, not just for terminal job failures. It'd simplify collecting historical diagnostic data before an actual crash occurs. |
@morhidi Sorry I do not understand this. I am not checking for only terminal job failures. I am checking for all the failures, when the job is not in one of the terminal states. |
nm I miss-read it at first glance |
| public static boolean createIfNotExists( | ||
| KubernetesClient client, | ||
| HasMetadata target, |
There was a problem hiding this comment.
Why can't we call createWithAnnotationsIfNotExists() from this method to avoid code duplication?
There was a problem hiding this comment.
I had thought about it but I did not do it because of the event time. In our case we had decided to set the exception time as event time, but I am not aware of how should it happen for other k8s events, so I kept them separate with the cost of duplicated code.
cc @gyfora
gyfora
left a comment
There was a problem hiding this comment.
I think this is looking pretty good now, I added a few minor comments still
gyfora
left a comment
There was a problem hiding this comment.
Looks good! As a future followup we could think about reducing the number of REST API calls we make to fetch exceptions.
At the moment this is done on every step but based on the job details that we get in previous steps in the observers we may be able to deduct that the job did not fail since the last time we checked so exceptions do not need to be queried.
If you could open a follow up ticket for that I think that would be nice :)
|
You need to regenerate the docs: |
|
I hit the following error while running locally: So something seems to be off with the time handling |
| } | ||
| } | ||
| ctx.getExceptionCacheEntry().setJobId(currentJobId); | ||
| ctx.getExceptionCacheEntry().setLastTimestamp(now.toEpochMilli()); |
There was a problem hiding this comment.
Shouldn't this be the max exception timestamp? It could happen that there are job exceptions between getting it in the rest api and emitting them here and those would be missed if we set a higher timestamp based on now
| String stacktrace = exception.getStacktrace(); | ||
| if (stacktrace != null && !stacktrace.isBlank()) { | ||
| String[] lines = stacktrace.split("\n"); | ||
| eventMessage.append("\n\nStacktrace (truncated):\n"); |

What is the purpose of the change
(For example: This pull request adds a new feature to periodically create and maintain savepoints through the
FlinkDeploymentcustom resource.)This pull requests adds a new feature to periodically check for job exceptions using the FLINK REST API for getting the exceptions and raise them as kubernetes events. This feature will be helpful for monitoring systems that want to do a post processing on the job exceptions.
This is ONLY introduced for Application mode and NOT Session mode.
Brief change log
(for example:)
SYSTEM_ADVANCEDconfig for configuring the max number of exceptions reported and max lenght for stacktrace (defaults are 5 and 10 respectively)Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Apart from the unit tests in this PR, this was tested using two simulations:
sql-test: Good running job, exception simulated by manually killing the TM
sql-test-failing: Job that has exception in the open method, repeatedly fails.
Both the exceptions were produced one at a time, and both simultaneously.
Does this pull request potentially affect one of the following parts:
CustomResourceDescriptors: (yes / no)Documentation