[FLINK-37730] Improve exception recording ts initialization + 2.0 compatibility #983
    lastExceptionTs =
            EventUtils.findLastJobExceptionTsFromK8s(
                            ctx.getKubernetesClient(), resource)
                    .orElse(Instant.now().minus(MAX_K8S_EVENT_AGE));
Suggested change:
    - .orElse(Instant.now().minus(MAX_K8S_EVENT_AGE));
    + .orElse(k8sExpirationTs);
Good catch, I cleaned up / simplified the duplicated code in the method in a new commit, please check :)
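The suggestion above amounts to reusing an already computed expiration timestamp instead of recomputing `Instant.now().minus(MAX_K8S_EVENT_AGE)` at the fallback site. A minimal sketch of that pattern follows; the class and method names are hypothetical and not taken from the operator code, and the Kubernetes lookup is represented only by its `Optional<Instant>` result.

```java
import java.time.Instant;
import java.util.Optional;

public class ExceptionTsFallback {

    /**
     * Hypothetical helper: returns the timestamp found in Kubernetes events,
     * or the shared expiration timestamp when no event was found.
     */
    static Instant resolveLastExceptionTs(
            Optional<Instant> lastEventTs, Instant k8sExpirationTs) {
        // Reuse the single precomputed cutoff rather than calling Instant.now() again.
        return lastEventTs.orElse(k8sExpirationTs);
    }
}
```

Computing the cutoff once keeps every comparison in the method relative to the same instant.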
Thanks @gyfora for the PR!

I tested this PR on a dev env yesterday, and it all works (against Flink 1.19).
    if (maxJobExceptionTs.isBefore(k8sExpirationTs)) {
        // If the last job exception was a long time ago, then there is no point in
        // checking in k8s.
        lastExceptionTs = maxJobExceptionTs;
Any reason for this optimization? It complicates the code by adding another setting, and it requires the user to tune yet another setting. There is no harm in calling out to the k8s API regularly to fetch events.
There is no config for this (nothing to tune), and the optimization can be very important when the operator starts up: the cache is empty at that point, so without it the operator would fetch events for every single job. In most cases this filter eliminates those calls entirely, which greatly reduces the API server load at startup.
Fair point. The value is hardcoded. We would only query for the jobs with exceptions, but those could still amount to quite a few jobs.
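The short-circuit discussed in this thread can be sketched as follows. This is an illustration under stated assumptions, not the operator's actual method: `LastExceptionTsInit` and `initLastExceptionTs` are hypothetical names, and the `Supplier` stands in for the `EventUtils.findLastJobExceptionTsFromK8s(...)` call so the example runs without a cluster.

```java
import java.time.Instant;
import java.util.Optional;
import java.util.function.Supplier;

public class LastExceptionTsInit {

    /**
     * Hypothetical sketch of the initialization logic.
     *
     * @param maxJobExceptionTs newest exception timestamp reported by the job
     * @param k8sExpirationTs   cutoff before which Kubernetes events are assumed expired
     * @param k8sLookup         stand-in for the Kubernetes event query
     */
    static Instant initLastExceptionTs(
            Instant maxJobExceptionTs,
            Instant k8sExpirationTs,
            Supplier<Optional<Instant>> k8sLookup) {
        if (maxJobExceptionTs.isBefore(k8sExpirationTs)) {
            // The newest exception is older than any event still retained,
            // so the API server call is skipped entirely.
            return maxJobExceptionTs;
        }
        // Otherwise ask Kubernetes, falling back to the cutoff if nothing is found.
        return k8sLookup.get().orElse(k8sExpirationTs);
    }
}
```

With an empty cache at startup, only jobs whose newest exception falls inside the retention window reach the lookup, which is the load reduction described above.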
What is the purpose of the change
Initialize the last triggered event timestamp correctly from Kubernetes events and fix Flink 2.0 compatibility.
Verifying this change
Manually verified (Flink 1.18-2.0) + Unit tests
Does this pull request potentially affect one of the following parts:
CustomResourceDescriptors: no