Skip to content

Commit 19add94

Browse files
committed
Enforce MilliInstant as the logical type for Timestamp in ReadFromJDBC.
1 parent 7f64dec commit 19add94

1 file changed

Lines changed: 15 additions & 2 deletions

File tree

sdks/python/apache_beam/io/jdbc.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686

8787
# pytype: skip-file
8888

89+
import contextlib
8990
import datetime
9091
import typing
9192

@@ -257,6 +258,17 @@ def __init__(
257258
)
258259

259260

261+
@contextlib.contextmanager
262+
def enforce_millis_instant_for_timestamp():
263+
old_registry = LogicalType._known_logical_types
264+
LogicalType._known_logical_types = old_registry.copy()
265+
try:
266+
LogicalType.register_logical_type(MillisInstant)
267+
yield
268+
finally:
269+
LogicalType._known_logical_types = old_registry
270+
271+
260272
class ReadFromJdbc(ExternalTransform):
261273
"""A PTransform which reads Rows from the specified database via JDBC.
262274
@@ -352,8 +364,9 @@ def __init__(
352364

353365
dataSchema = None
354366
if schema is not None:
355-
# Convert Python schema to Beam Schema proto
356-
schema_proto = typing_to_runner_api(schema).row_type.schema
367+
with enforce_millis_instant_for_timestamp():
368+
# Convert Python schema to Beam Schema proto
369+
schema_proto = typing_to_runner_api(schema).row_type.schema
357370
# Serialize the proto to bytes for transmission
358371
dataSchema = schema_proto.SerializeToString()
359372

0 commit comments

Comments
 (0)