Throttle reads on data channel when writes on output channel are backpressured. #38422
tvalentyn wants to merge 1 commit into
    def _put_to_send_queue(self, element):
      # type: (Union[beam_fn_api_pb2.Elements.Data, beam_fn_api_pb2.Elements.Timers]) -> None
      element_size = element.ByteSize()
This might be expensive, but we could use _element_weight from the other PR.
Just throttling the input doesn't handle the case where a single element produces a large amount of data (for example, if the input is a filename and the file is loaded and output). For that reason, and since throttling might unnecessarily delay the work if the queue clears before the work triggers an output, I think it might be better to just let it process and block. Instead, maybe we could modify the lull tracking to detect that it is blocking on output and modify the message?
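To make the "let it process and block" alternative concrete, here is a minimal sketch of a send queue bounded by total bytes, where `put()` blocks the producer until the consumer drains enough data. `ByteBoundedQueue` is a hypothetical name and is not code from this PR or from Beam; `len(payload)` stands in for `element.ByteSize()`.

```python
import collections
import threading


class ByteBoundedQueue:
    """Hypothetical FIFO queue bounded by total payload bytes, not item count."""

    def __init__(self, max_bytes):
        self._max_bytes = max_bytes
        self._items = collections.deque()
        self._bytes = 0
        self._cond = threading.Condition()

    def put(self, payload, timeout=None):
        """Blocks until the payload fits. Returns False if the wait timed out."""
        size = len(payload)  # stand-in for element.ByteSize()

        def fits():
            # Always admit into an empty queue so that a single oversized
            # payload cannot block the producer forever.
            return self._bytes == 0 or self._bytes + size <= self._max_bytes

        with self._cond:
            if not self._cond.wait_for(fits, timeout):
                return False
            self._items.append(payload)
            self._bytes += size
            self._cond.notify_all()
            return True

    def get(self, timeout=None):
        """Blocks until an item is available, then frees its bytes."""
        with self._cond:
            if not self._cond.wait_for(lambda: len(self._items) > 0, timeout):
                raise TimeoutError("no item available")
            payload = self._items.popleft()
            self._bytes -= len(payload)
            self._cond.notify_all()
            return payload
```

With this shape, a single input element that fans out into many large outputs is slowed down at the point where it writes, regardless of how reads were throttled.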
One option is to track writes to the queue that took longer than X seconds and emit a message every Y seconds saying that the queue is delayed because it holds too many (or too large) elements. If we do nothing, there would probably still be a tell-tale sign: queue.put() somewhere in the stack trace.
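A rough sketch of that idea, assuming a wrapper around the send queue: time each `put()`, count the ones slower than X seconds, and log at most once every Y seconds. `SlowPutMonitor`, `slow_put_secs`, and `log_interval_secs` are hypothetical names for illustration, not part of Beam.

```python
import logging
import threading
import time

_LOGGER = logging.getLogger(__name__)


class SlowPutMonitor:
    """Hypothetical wrapper that reports puts blocked on a full send queue."""

    def __init__(self, send_queue, slow_put_secs=5.0, log_interval_secs=60.0,
                 clock=time.monotonic):
        self._queue = send_queue
        self._slow_put_secs = slow_put_secs          # "X" in the comment above
        self._log_interval_secs = log_interval_secs  # "Y" in the comment above
        self._clock = clock
        self._lock = threading.Lock()
        self._last_log_time = None
        self.slow_puts = 0
        self.warnings_logged = 0

    def put(self, element):
        start = self._clock()
        self._queue.put(element)  # blocks while the underlying queue is full
        elapsed = self._clock() - start
        if elapsed < self._slow_put_secs:
            return
        with self._lock:
            self.slow_puts += 1
            now = self._clock()
            # Rate-limit the message to one per log interval.
            if (self._last_log_time is not None and
                    now - self._last_log_time < self._log_interval_secs):
                return
            self._last_log_time = now
            self.warnings_logged += 1
        _LOGGER.warning(
            'Write to the output queue was blocked for %.1f seconds; the '
            'queue may hold too many or too large elements (%d slow writes '
            'so far).', elapsed, self.slow_puts)
```

Wrapping a standard `queue.Queue(maxsize=...)` would be enough to try this out, since only `put()` is required of the wrapped object. The `clock` parameter exists only so the timing logic can be tested without sleeping.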