Skip to content

Commit 7498cfb

Browse files
committed
Revert apache#35243
1 parent 33ea938 commit 7498cfb

5 files changed

Lines changed: 45 additions & 24 deletions

File tree

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run.",
3-
"modification": 13
3+
"modification": 12
44
}
55

sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
from apache_beam.testing.test_pipeline import TestPipeline
3737
from apache_beam.testing.util import assert_that
3838
from apache_beam.testing.util import equal_to
39+
from apache_beam.typehints.schemas import LogicalType
40+
from apache_beam.typehints.schemas import MillisInstant
3941
from apache_beam.utils.timestamp import Timestamp
4042

4143
# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
@@ -253,6 +255,10 @@ def test_xlang_jdbc_write_read(self, database):
253255
classpath=config['classpath'],
254256
))
255257

258+
# Register MillisInstant logical type to override the mapping from Timestamp
259+
# originally handled by MicrosInstant.
260+
LogicalType.register_logical_type(MillisInstant)
261+
256262
with TestPipeline() as p:
257263
p.not_use_test_runner_api = True
258264
result = (
@@ -349,6 +355,10 @@ def custom_row_equals(expected, actual):
349355
classpath=config['classpath'],
350356
))
351357

358+
# Register MillisInstant logical type to override the mapping from Timestamp
359+
# originally handled by MicrosInstant.
360+
LogicalType.register_logical_type(MillisInstant)
361+
352362
# Run read pipeline with custom schema
353363
with TestPipeline() as p:
354364
p.not_use_test_runner_api = True

sdks/python/apache_beam/io/jdbc.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ def __init__(self, argument=""):
401401

402402
@classmethod
403403
def representation_type(cls) -> type:
404-
return MillisInstant
404+
return Timestamp
405405

406406
@classmethod
407407
def urn(cls):
@@ -417,6 +417,7 @@ def to_representation_type(self, value: datetime.date) -> Timestamp:
417417
value, datetime.datetime.min.time(), tzinfo=datetime.timezone.utc))
418418

419419
def to_language_type(self, value: Timestamp) -> datetime.date:
420+
420421
return value.to_utc_datetime().date()
421422

422423
@classmethod
@@ -444,7 +445,7 @@ def __init__(self, argument=""):
444445

445446
@classmethod
446447
def representation_type(cls) -> type:
447-
return MillisInstant
448+
return Timestamp
448449

449450
@classmethod
450451
def urn(cls):
@@ -462,6 +463,7 @@ def to_representation_type(self, value: datetime.date) -> Timestamp:
462463
tzinfo=datetime.timezone.utc))
463464

464465
def to_language_type(self, value: Timestamp) -> datetime.date:
466+
465467
return value.to_utc_datetime().time()
466468

467469
@classmethod

sdks/python/apache_beam/typehints/schemas.py

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -335,10 +335,7 @@ def typing_to_runner_api(self, type_: type) -> schema_pb2.FieldType:
335335
array_type=schema_pb2.ArrayType(element_type=element_type))
336336

337337
try:
338-
if LogicalType.is_known_logical_type(type_):
339-
logical_type = type_
340-
else:
341-
logical_type = LogicalType.from_typing(type_)
338+
logical_type = LogicalType.from_typing(type_)
342339
except ValueError:
343340
# Unknown type, just treat it like Any
344341
return schema_pb2.FieldType(
@@ -672,7 +669,7 @@ def add(self, urn, logical_type):
672669
def get_logical_type_by_urn(self, urn):
673670
return self.by_urn.get(urn, None)
674671

675-
def get_urn_by_logical_type(self, logical_type):
672+
def get_urn_by_logial_type(self, logical_type):
676673
return self.by_logical_type.get(logical_type, None)
677674

678675
def get_logical_type_by_language_type(self, representation_type):
@@ -811,11 +808,6 @@ def from_runner_api(cls, logical_type_proto):
811808
return logical_type()
812809
return logical_type(argument)
813810

814-
@classmethod
815-
def is_known_logical_type(cls, logical_type):
816-
return cls._known_logical_types.get_urn_by_logical_type(
817-
logical_type) is not None
818-
819811

820812
class NoArgumentLogicalType(LogicalType[LanguageT, RepresentationT, None]):
821813
@classmethod

sdks/python/apache_beam/yaml/main.py

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#
1717

1818
import argparse
19+
import contextlib
1920
import json
2021
import os
2122
import sys
@@ -26,6 +27,8 @@
2627
import apache_beam as beam
2728
from apache_beam.io.filesystems import FileSystems
2829
from apache_beam.transforms import resources
30+
from apache_beam.typehints.schemas import LogicalType
31+
from apache_beam.typehints.schemas import MillisInstant
2932
from apache_beam.yaml import yaml_testing
3033
from apache_beam.yaml import yaml_transform
3134
from apache_beam.yaml import yaml_utils
@@ -133,12 +136,25 @@ def _pipeline_spec_from_args(known_args):
133136
return pipeline_yaml
134137

135138

139+
@contextlib.contextmanager
140+
def _fix_xlang_instant_coding():
141+
# Scoped workaround for https://github.com/apache/beam/issues/28151.
142+
old_registry = LogicalType._known_logical_types
143+
LogicalType._known_logical_types = old_registry.copy()
144+
try:
145+
LogicalType.register_logical_type(MillisInstant)
146+
yield
147+
finally:
148+
LogicalType._known_logical_types = old_registry
149+
150+
136151
def run(argv=None):
137152
options, constructor, display_data = build_pipeline_components_from_argv(argv)
138-
with beam.Pipeline(options=options, display_data=display_data) as p:
139-
print('Building pipeline...')
140-
constructor(p)
141-
print('Running pipeline...')
153+
with _fix_xlang_instant_coding():
154+
with beam.Pipeline(options=options, display_data=display_data) as p:
155+
print('Building pipeline...')
156+
constructor(p)
157+
print('Running pipeline...')
142158

143159

144160
def run_tests(argv=None, exit=True):
@@ -169,13 +185,14 @@ def run_tests(argv=None, exit=True):
169185
"If you haven't added a set of tests yet, you can get started by "
170186
'running your pipeline with the --create_test flag enabled.')
171187

172-
tests = [
173-
yaml_testing.YamlTestCase(
174-
pipeline_spec, test_spec, options, known_args.fix_tests)
175-
for test_spec in test_specs
176-
]
177-
suite = unittest.TestSuite(tests)
178-
result = unittest.TextTestRunner().run(suite)
188+
with _fix_xlang_instant_coding():
189+
tests = [
190+
yaml_testing.YamlTestCase(
191+
pipeline_spec, test_spec, options, known_args.fix_tests)
192+
for test_spec in test_specs
193+
]
194+
suite = unittest.TestSuite(tests)
195+
result = unittest.TextTestRunner().run(suite)
179196

180197
if known_args.fix_tests or known_args.create_test:
181198
update_tests(known_args, pipeline_yaml, pipeline_spec, options, tests)

0 commit comments

Comments
 (0)