Skip to content

Commit b7262c3

Browse files
authored
Fix a logical type issue about JdbcDateType and JdbcTimeType (#35243)
* Fix a logical type issue about JdbcDateType * Fix typo and also fix the logical class for java time. * Get rid of the workaround on logical type registration. Trigger tests. * Fix lints.
1 parent 9a057b6 commit b7262c3

5 files changed

Lines changed: 24 additions & 45 deletions

File tree

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run.",
3-
"modification": 12
3+
"modification": 13
44
}
55

sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@
3636
from apache_beam.testing.test_pipeline import TestPipeline
3737
from apache_beam.testing.util import assert_that
3838
from apache_beam.testing.util import equal_to
39-
from apache_beam.typehints.schemas import LogicalType
40-
from apache_beam.typehints.schemas import MillisInstant
4139
from apache_beam.utils.timestamp import Timestamp
4240

4341
# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
@@ -255,10 +253,6 @@ def test_xlang_jdbc_write_read(self, database):
255253
classpath=config['classpath'],
256254
))
257255

258-
# Register MillisInstant logical type to override the mapping from Timestamp
259-
# originally handled by MicrosInstant.
260-
LogicalType.register_logical_type(MillisInstant)
261-
262256
with TestPipeline() as p:
263257
p.not_use_test_runner_api = True
264258
result = (
@@ -355,10 +349,6 @@ def custom_row_equals(expected, actual):
355349
classpath=config['classpath'],
356350
))
357351

358-
# Register MillisInstant logical type to override the mapping from Timestamp
359-
# originally handled by MicrosInstant.
360-
LogicalType.register_logical_type(MillisInstant)
361-
362352
# Run read pipeline with custom schema
363353
with TestPipeline() as p:
364354
p.not_use_test_runner_api = True

sdks/python/apache_beam/io/jdbc.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ def __init__(self, argument=""):
401401

402402
@classmethod
403403
def representation_type(cls) -> type:
404-
return Timestamp
404+
return MillisInstant
405405

406406
@classmethod
407407
def urn(cls):
@@ -417,7 +417,6 @@ def to_representation_type(self, value: datetime.date) -> Timestamp:
417417
value, datetime.datetime.min.time(), tzinfo=datetime.timezone.utc))
418418

419419
def to_language_type(self, value: Timestamp) -> datetime.date:
420-
421420
return value.to_utc_datetime().date()
422421

423422
@classmethod
@@ -445,7 +444,7 @@ def __init__(self, argument=""):
445444

446445
@classmethod
447446
def representation_type(cls) -> type:
448-
return Timestamp
447+
return MillisInstant
449448

450449
@classmethod
451450
def urn(cls):
@@ -463,7 +462,6 @@ def to_representation_type(self, value: datetime.date) -> Timestamp:
463462
tzinfo=datetime.timezone.utc))
464463

465464
def to_language_type(self, value: Timestamp) -> datetime.date:
466-
467465
return value.to_utc_datetime().time()
468466

469467
@classmethod

sdks/python/apache_beam/typehints/schemas.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,10 @@ def typing_to_runner_api(self, type_: type) -> schema_pb2.FieldType:
335335
array_type=schema_pb2.ArrayType(element_type=element_type))
336336

337337
try:
338-
logical_type = LogicalType.from_typing(type_)
338+
if LogicalType.is_known_logical_type(type_):
339+
logical_type = type_
340+
else:
341+
logical_type = LogicalType.from_typing(type_)
339342
except ValueError:
340343
# Unknown type, just treat it like Any
341344
return schema_pb2.FieldType(
@@ -669,7 +672,7 @@ def add(self, urn, logical_type):
669672
def get_logical_type_by_urn(self, urn):
670673
return self.by_urn.get(urn, None)
671674

672-
def get_urn_by_logial_type(self, logical_type):
675+
def get_urn_by_logical_type(self, logical_type):
673676
return self.by_logical_type.get(logical_type, None)
674677

675678
def get_logical_type_by_language_type(self, representation_type):
@@ -808,6 +811,11 @@ def from_runner_api(cls, logical_type_proto):
808811
return logical_type()
809812
return logical_type(argument)
810813

814+
@classmethod
815+
def is_known_logical_type(cls, logical_type):
816+
return cls._known_logical_types.get_urn_by_logical_type(
817+
logical_type) is not None
818+
811819

812820
class NoArgumentLogicalType(LogicalType[LanguageT, RepresentationT, None]):
813821
@classmethod

sdks/python/apache_beam/yaml/main.py

Lines changed: 11 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
#
1717

1818
import argparse
19-
import contextlib
2019
import json
2120
import os
2221
import sys
@@ -27,8 +26,6 @@
2726
import apache_beam as beam
2827
from apache_beam.io.filesystems import FileSystems
2928
from apache_beam.transforms import resources
30-
from apache_beam.typehints.schemas import LogicalType
31-
from apache_beam.typehints.schemas import MillisInstant
3229
from apache_beam.yaml import yaml_testing
3330
from apache_beam.yaml import yaml_transform
3431
from apache_beam.yaml import yaml_utils
@@ -136,25 +133,12 @@ def _pipeline_spec_from_args(known_args):
136133
return pipeline_yaml
137134

138135

139-
@contextlib.contextmanager
140-
def _fix_xlang_instant_coding():
141-
# Scoped workaround for https://github.com/apache/beam/issues/28151.
142-
old_registry = LogicalType._known_logical_types
143-
LogicalType._known_logical_types = old_registry.copy()
144-
try:
145-
LogicalType.register_logical_type(MillisInstant)
146-
yield
147-
finally:
148-
LogicalType._known_logical_types = old_registry
149-
150-
151136
def run(argv=None):
152137
options, constructor, display_data = build_pipeline_components_from_argv(argv)
153-
with _fix_xlang_instant_coding():
154-
with beam.Pipeline(options=options, display_data=display_data) as p:
155-
print('Building pipeline...')
156-
constructor(p)
157-
print('Running pipeline...')
138+
with beam.Pipeline(options=options, display_data=display_data) as p:
139+
print('Building pipeline...')
140+
constructor(p)
141+
print('Running pipeline...')
158142

159143

160144
def run_tests(argv=None, exit=True):
@@ -185,14 +169,13 @@ def run_tests(argv=None, exit=True):
185169
"If you haven't added a set of tests yet, you can get started by "
186170
'running your pipeline with the --create_test flag enabled.')
187171

188-
with _fix_xlang_instant_coding():
189-
tests = [
190-
yaml_testing.YamlTestCase(
191-
pipeline_spec, test_spec, options, known_args.fix_tests)
192-
for test_spec in test_specs
193-
]
194-
suite = unittest.TestSuite(tests)
195-
result = unittest.TextTestRunner().run(suite)
172+
tests = [
173+
yaml_testing.YamlTestCase(
174+
pipeline_spec, test_spec, options, known_args.fix_tests)
175+
for test_spec in test_specs
176+
]
177+
suite = unittest.TestSuite(tests)
178+
result = unittest.TextTestRunner().run(suite)
196179

197180
if known_args.fix_tests or known_args.create_test:
198181
update_tests(known_args, pipeline_yaml, pipeline_spec, options, tests)

0 commit comments

Comments
 (0)