Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/transforms/sideinputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ def default_window_mapping_fn(
def map_via_end(source_window: window.BoundedWindow) -> window.BoundedWindow:
return list(
target_window_fn.assign(
window.WindowFn.AssignContext(source_window.max_timestamp())))[-1]
window.WindowFn.AssignContext(
source_window.max_timestamp(), window=source_window)))[-1]

return map_via_end

Expand Down
28 changes: 28 additions & 0 deletions sdks/python/apache_beam/transforms/sideinputs_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from typing import Union

import pytest
from unittest import mock

import apache_beam as beam
from apache_beam.testing.synthetic_pipeline import SyntheticSDFAsSource
Expand All @@ -41,6 +42,7 @@
from apache_beam.transforms import Map
from apache_beam.transforms import trigger
from apache_beam.transforms import window
from apache_beam.transforms import sideinputs
from apache_beam.utils.timestamp import Timestamp


Expand Down Expand Up @@ -489,6 +491,32 @@ def process(
assert_that(results, equal_to([(num_records, expected_fingerprint)]))
pipeline.run()

def test_default_window_mapping_fn_source_window(self):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this is failing in CI with:

<testcase classname="apache_beam.transforms.sideinputs_test.SideInputsTest" name="test_default_window_mapping_fn_source_window" time="0.001">
<failure message="TypeError: Can't instantiate abstract class StringIDWindows with abstract method get_window_coder">self = <apache_beam.transforms.sideinputs_test.SideInputsTest testMethod=test_default_window_mapping_fn_source_window> def test_default_window_mapping_fn_source_window(self): """Test that the default window mapping function will propagate the source window when attempting to assign context. """ class StringIDWindow(window.BoundedWindow): """A window defined by an arbitrary string ID.""" def __init__(self, window_id: str): super().__init__(self._getTimestampFromProto()) self.id = window_id class StringIDWindows(window.NonMergingWindowFn): """ A windowing function that assigns each element a window with ID.""" def assign( self, assign_context: window.WindowFn.AssignContext ) -> Iterable[StringIDWindow]: if assign_context.element is None: return [assign_context.window] return [StringIDWindow(str(assign_context.element))] > mapping_fn = sideinputs.default_window_mapping_fn(StringIDWindows()) E TypeError: Can't instantiate abstract class StringIDWindows with abstract method get_window_coder apache_beam/transforms/sideinputs_test.py:513: TypeError</failure>
</testcase>

Could you please take a look?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the required dummy _getTimestampFromProto and get_window_coder methods for the string id window examples.

Test should pass now.

"""Test that the default window mapping function will propagate the
source window when attempting to assign context.
"""
class StringIDWindow(window.BoundedWindow):
"""A window defined by an arbitrary string ID."""
def __init__(self, window_id: str):
super().__init__(self._getTimestampFromProto())
self.id = window_id

class StringIDWindows(window.NonMergingWindowFn):
""" A windowing function that assigns each element a window with ID."""
def assign(
self, assign_context: window.WindowFn.AssignContext
) -> Iterable[StringIDWindow]:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you mind adjusting this type hint? It looks like this is causing linting to fail: https://github.com/apache/beam/actions/runs/21918758108/job/63292969740?pr=36722

apache_beam/transforms/sideinputs_test.py:514: error: List item 0 has incompatible type "BoundedWindow | None"; expected "StringIDWindow"  [list-item]

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to window.BoundedWindow

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like linting checks are still failing - https://github.com/apache/beam/actions/runs/22365640103/job/64730757498?pr=36722 - could you please take a look?

You should be able to reproduce locally by running ./gradlew :pythonLintPreCommit

if assign_context.element is None:
return [assign_context.window]
return [StringIDWindow(str(assign_context.element))]

mapping_fn = sideinputs.default_window_mapping_fn(StringIDWindows())
source_window = StringIDWindows().assign(
window.WindowFn.AssignContext(Timestamp(10), element='element'))[0]
bounded_window = mapping_fn(source_window)
assert bounded_window is not None
assert bounded_window.id == 'element'


if __name__ == '__main__':
logging.getLogger().setLevel(logging.DEBUG)
Expand Down
Loading