Skip to content

Commit 8a4328a

Browse files
committed
fix: use global coder registry; Pipeline has no coder_registry attr
beam_PreCommit_Python_Coverage CI on PR 38724 surfaced ``AttributeError: 'Pipeline' object has no attribute 'coder_registry'`` from the previous commit's attempt to register the source-declared output coder against a pipeline-scoped registry. Beam's Python ``Pipeline`` has no such attribute today; the global ``coders.registry`` is the only available knob, matching the pattern used by ``BoundedSource`` at iobase.py:938. Reverting to ``coders.registry.register_coder``. The cross-pipeline side effect that the pipeline-scoped attempt was trying to avoid (a registration persists for the process lifetime and may affect concurrent pipelines that use the same element type) is now documented as a known limitation tracked under #19137 W2. 42/42 unbounded_source_test, 16/16 iobase_test.
1 parent 0753268 commit 8a4328a

1 file changed

Lines changed: 10 additions & 7 deletions

File tree

sdks/python/apache_beam/io/unbounded_source.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -779,26 +779,29 @@ def expand(self, pbegin):
779779
_ReadFromUnboundedSourceDoFn(self._poll_interval_seconds)))
780780
# Wire the source's declared output coder onto the output PCollection.
781781
# Setting ``element_type`` alone is not enough: the runner derives the
782-
# PCollection's coder via ``coder_registry.get_coder(element_type)``,
782+
# PCollection's coder via ``coders.registry.get_coder(element_type)``,
783783
# which may resolve to a registry default that does NOT match the
784784
# source's declared coder (silently downgrading custom coders to pickle).
785-
# Register against the pipeline-specific ``coder_registry`` rather than
786-
# the global ``coders.registry`` so the registration does not leak
787-
# across pipelines running in the same process.
785+
# We register against the process-global ``coders.registry`` -- Beam's
786+
# Python ``Pipeline`` has no pipeline-scoped coder registry today, so
787+
# the global registry is the only available knob. This matches the
788+
# pattern used for ``BoundedSource`` at iobase.py:938. The cross-
789+
# pipeline side effect (registrations persist for the process lifetime
790+
# and may affect concurrent pipelines that use the same element type)
791+
# is a documented limitation tracked under #19137 W2.
788792
try:
789793
type_hint = output_coder.to_type_hint()
790794
except NotImplementedError:
791795
type_hint = None
792796
if type_hint is not None:
793797
try:
794-
pbegin.pipeline.coder_registry.register_coder(
795-
type_hint, type(output_coder))
798+
coders.registry.register_coder(type_hint, type(output_coder))
796799
except Exception: # pylint: disable=broad-except
797800
# Some coder classes refuse class-only registration (e.g. coders
798801
# parameterised by non-default constructor args). The element_type
799802
# below still flows through the registry's standard lookup; users
800803
# with parameterised coders must register their coder explicitly
801-
# via ``pipeline.coder_registry.register_coder`` before pipeline
804+
# via ``coders.registry.register_coder`` before pipeline
802805
# construction.
803806
_LOGGER.warning(
804807
'Could not register %s for element type %s; users must register '

0 commit comments

Comments
 (0)