Exposed source window to AssignContext in window_mapping_fn for side inputs #36722
Changes from 5 commits
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,6 +30,7 @@ | |
| from typing import Union | ||
|
|
||
| import pytest | ||
| from unittest import mock | ||
|
|
||
| import apache_beam as beam | ||
| from apache_beam.testing.synthetic_pipeline import SyntheticSDFAsSource | ||
|
|
@@ -41,6 +42,7 @@ | |
| from apache_beam.transforms import Map | ||
| from apache_beam.transforms import trigger | ||
| from apache_beam.transforms import window | ||
| from apache_beam.transforms import sideinputs | ||
| from apache_beam.utils.timestamp import Timestamp | ||
|
|
||
|
|
||
|
|
@@ -489,6 +491,32 @@ def process( | |
| assert_that(results, equal_to([(num_records, expected_fingerprint)])) | ||
| pipeline.run() | ||
|
|
||
| def test_default_window_mapping_fn_source_window(self): | ||
| """Test that the default window mapping function will propagate the | ||
| source window when attempting to assign context. | ||
| """ | ||
| class StringIDWindow(window.BoundedWindow): | ||
| """A window defined by an arbitrary string ID.""" | ||
| def __init__(self, window_id: str): | ||
| super().__init__(self._getTimestampFromProto()) | ||
| self.id = window_id | ||
|
|
||
| class StringIDWindows(window.NonMergingWindowFn): | ||
| """ A windowing function that assigns each element a window with ID.""" | ||
| def assign( | ||
| self, assign_context: window.WindowFn.AssignContext | ||
| ) -> Iterable[StringIDWindow]: | ||
|
Contributor
Would you mind adjusting this type hint? It looks like this is causing linting to fail: https://github.com/apache/beam/actions/runs/21918758108/job/63292969740?pr=36722
Contributor
Author
Updated to
Contributor
Looks like linting checks are still failing - https://github.com/apache/beam/actions/runs/22365640103/job/64730757498?pr=36722 - could you please take a look? You should be able to reproduce locally by running |
||
| if assign_context.element is None: | ||
| return [assign_context.window] | ||
| return [StringIDWindow(str(assign_context.element))] | ||
|
|
||
| mapping_fn = sideinputs.default_window_mapping_fn(StringIDWindows()) | ||
| source_window = StringIDWindows().assign( | ||
| window.WindowFn.AssignContext(Timestamp(10), element='element'))[0] | ||
| bounded_window = mapping_fn(source_window) | ||
| assert bounded_window is not None | ||
| assert bounded_window.id == 'element' | ||
|
|
||
|
|
||
| if __name__ == '__main__': | ||
| logging.getLogger().setLevel(logging.DEBUG) | ||
|
|
||
Looks like this is failing in CI with:
Could you please take a look?
Added the required dummy `_getTimestampFromProto` and `get_window_coder` methods for the string ID window examples. The test should pass now.
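For readers following the thread, here is a minimal sketch of the behavior the new test checks: a window mapping function that passes the source window into the assign context, so a window function with no element to inspect can fall back to that window. Every name below (`SketchWindow`, `SketchAssignContext`, `SketchStringIDWindows`, `sketch_window_mapping_fn`) is a hypothetical stand-in written with the standard library only. None of it is Beam's actual `window` or `sideinputs` code, and it assumes the mapping builds its context from the source window's maximum timestamp.

```python
from dataclasses import dataclass
from typing import Any, Callable, List, Optional


@dataclass(frozen=True)
class SketchWindow:
    """Hypothetical stand-in for a window identified by a string ID."""
    id: str
    end: int = 0

    def max_timestamp(self) -> int:
        return self.end


@dataclass
class SketchAssignContext:
    """Hypothetical stand-in for an assign context.

    The `window` field models the source window that this PR exposes.
    """
    timestamp: int
    element: Optional[Any] = None
    window: Optional[SketchWindow] = None


class SketchStringIDWindows:
    """Assigns each element a window named after the element."""
    def assign(self, ctx: SketchAssignContext) -> List[Optional[SketchWindow]]:
        # With no element (the side-input mapping case), reuse the
        # source window carried on the context.
        if ctx.element is None:
            return [ctx.window]
        return [SketchWindow(str(ctx.element))]


def sketch_window_mapping_fn(
    window_fn: SketchStringIDWindows
) -> Callable[[SketchWindow], Optional[SketchWindow]]:
    """Builds a mapping that forwards the source window to `assign`."""
    def map_via_end(source_window: SketchWindow) -> Optional[SketchWindow]:
        ctx = SketchAssignContext(
            source_window.max_timestamp(), window=source_window)
        return window_fn.assign(ctx)[-1]

    return map_via_end


window_fn = SketchStringIDWindows()
source_window = window_fn.assign(
    SketchAssignContext(10, element='element'))[0]
mapped_window = sketch_window_mapping_fn(window_fn)(source_window)
```

In this sketch, the mapped window is the source window itself, which mirrors the `bounded_window.id == 'element'` assertion in the test under review. If the context did not carry the source window, `assign` would have nothing to return for the element-less case.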