Skip to content

Commit 99a72bc

Browse files
authored
Use registered type for Row (#38108)
* Use registered type for Row * Introduce register_row to register with both coder and schema registry Save schema registry id->type mapping * Allow decorator usage; document register_row is preferred
1 parent 0d1d247 commit 99a72bc

11 files changed

Lines changed: 103 additions & 33 deletions
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run",
3-
"modification": 1
3+
"modification": 16
44
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run",
3-
"modification": 1
3+
"modification": 17
44
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run",
3-
"modification": 1
3+
"modification": 3
44
}

sdks/python/apache_beam/coders/row_coder_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,13 @@
6060
("one_more_field", typing.Optional[str])])
6161

6262

63+
@coders_registry.register_row
6364
class People(typing.NamedTuple):
6465
primary: Person
6566
partner: typing.Optional[Person]
6667

6768

68-
coders_registry.register_coder(Person, RowCoder)
69-
coders_registry.register_coder(People, RowCoder)
69+
coders_registry.register_row(Person)
7070

7171

7272
class RowCoderTest(unittest.TestCase):

sdks/python/apache_beam/coders/typecoders.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,19 @@ def _normalize_typehint_type(typehint_type):
124124
def register_coder(
125125
self, typehint_type: Any,
126126
typehint_coder_class: Type[coders.Coder]) -> None:
127+
"""
128+
Register a user type with a coder.
129+
130+
Typical usage::
131+
132+
class MyCustomType:
133+
pass
134+
135+
coders.registry.register_coder(MyCustomType, MyCustomCoder)
136+
137+
To register a supported user type (data class or named tuple) with Beam Row,
138+
use :meth:`register_row` instead, as it registers both coder and schema.
139+
"""
127140
if not isinstance(typehint_coder_class, type):
128141
raise TypeError(
129142
'Coder registration requires a coder class object. '
@@ -133,6 +146,34 @@ def register_coder(
133146
self._register_coder_internal(
134147
self._normalize_typehint_type(typehint_type), typehint_coder_class)
135148

149+
def register_row(self, typehint_type: type[Any]) -> type[Any]:
150+
"""
151+
Register a user type with a Beam Row.
152+
153+
This registers the type with a RowCoder and register its schema.
154+
155+
Register a dataclass::
156+
157+
@coders.registry.register_row
158+
@dataclass
159+
class MyDataClass:
160+
id: int
161+
name: str
162+
163+
Register a named tuple::
164+
165+
coders.registry.register_row(MyNamedTuple)
166+
"""
167+
from apache_beam.coders import RowCoder
168+
from apache_beam.typehints.schemas import typing_to_runner_api
169+
170+
# Register with row coder
171+
self.register_coder(typehint_type, RowCoder)
172+
# This call generated a schema id for the type and register it with
173+
# schema registry
174+
typing_to_runner_api(typehint_type)
175+
return typehint_type
176+
136177
def get_coder(self, typehint: Any) -> coders.Coder:
137178
if typehint and typehint.__module__ == '__main__':
138179
# See https://github.com/apache/beam/issues/21541

sdks/python/apache_beam/internal/cloudpickle_pickler.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,20 +256,27 @@ def dump_session(file_path):
256256
# dump supported Beam Registries (currently only logical type registry)
257257
from apache_beam.coders import typecoders
258258
from apache_beam.typehints import schemas
259+
from apache_beam.typehints.schema_registry import SCHEMA_REGISTRY
259260

260261
with _pickle_lock, open(file_path, 'wb') as file:
261262
coder_reg = typecoders.registry.get_custom_type_coder_tuples()
262263
logical_type_reg = schemas.LogicalType._known_logical_types.copy_custom()
264+
schema_reg = SCHEMA_REGISTRY.get_registered_typings()
263265

264266
pickler = cloudpickle.CloudPickler(file)
265267
# TODO(https://github.com/apache/beam/issues/18500) add file system registry
266268
# once implemented
267-
pickler.dump({"coder": coder_reg, "logical_type": logical_type_reg})
269+
pickler.dump({
270+
"coder": coder_reg,
271+
"logical_type": logical_type_reg,
272+
"schema": schema_reg
273+
})
268274

269275

270276
def load_session(file_path):
271277
from apache_beam.coders import typecoders
272278
from apache_beam.typehints import schemas
279+
from apache_beam.typehints.schema_registry import SCHEMA_REGISTRY
273280

274281
with _pickle_lock, open(file_path, 'rb') as file:
275282
registries = cloudpickle.load(file)
@@ -284,3 +291,7 @@ def load_session(file_path):
284291
schemas.LogicalType._known_logical_types.load(registries["logical_type"])
285292
else:
286293
_LOGGER.warning('No logical type registry found in saved session')
294+
if "schema" in registries:
295+
SCHEMA_REGISTRY.load_registered_typings(registries["schema"])
296+
else:
297+
_LOGGER.warning('No schema registry found in saved session')

sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
import apache_beam as beam
3232
from apache_beam import coders
33+
from apache_beam.io import jdbc
3334
from apache_beam.io.jdbc import ReadFromJdbc
3435
from apache_beam.io.jdbc import WriteToJdbc
3536
from apache_beam.options.pipeline_options import StandardOptions
@@ -64,7 +65,7 @@
6465
("f_timestamp", Timestamp), ("f_decimal", Decimal),
6566
("f_date", datetime.date), ("f_time", datetime.time)],
6667
)
67-
coders.registry.register_coder(JdbcTestRow, coders.RowCoder)
68+
coders.registry.register_row(JdbcTestRow)
6869

6970
CustomSchemaRow = typing.NamedTuple(
7071
"CustomSchemaRow",
@@ -82,11 +83,17 @@
8283
("renamed_time", datetime.time),
8384
],
8485
)
85-
coders.registry.register_coder(CustomSchemaRow, coders.RowCoder)
86+
87+
# Need to put inside enforce_millis_instant_for_timestamp context to align
88+
# with the same setup in ReadFromJdbc.__init__. Remove once Beam moved to
89+
# micros instant for timestamp
90+
# Alternatively, use coders.registry.register_coder(CustomSchemaRow, RowCoder)
91+
with jdbc.enforce_millis_instant_for_timestamp():
92+
coders.registry.register_row(CustomSchemaRow)
8693

8794
SimpleRow = typing.NamedTuple(
8895
"SimpleRow", [("id", int), ("name", str), ("value", float)])
89-
coders.registry.register_coder(SimpleRow, coders.RowCoder)
96+
coders.registry.register_row(SimpleRow)
9097

9198

9299
@pytest.mark.uses_gcp_java_expansion_service

sdks/python/apache_beam/io/jdbc.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,7 @@ def enforce_millis_instant_for_timestamp():
264264
LogicalType._known_logical_types = old_registry.copy()
265265
try:
266266
LogicalType.register_logical_type(MillisInstant)
267+
LogicalType.register_logical_type(JdbcDateType)
267268
yield
268269
finally:
269270
LogicalType._known_logical_types = old_registry

sdks/python/apache_beam/typehints/row_type_test.py

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,18 @@ class RowTypeTest(unittest.TestCase):
3333
@staticmethod
3434
def _check_key_type_and_count(x) -> int:
3535
key_type = type(x[0])
36-
if not row_type._user_type_is_generated(key_type):
37-
raise RuntimeError("Expect type after GBK to be generated user type")
36+
if row_type._user_type_is_generated(key_type):
37+
raise RuntimeError("Type after GBK not preserved, get generated type")
38+
if not hasattr(key_type, row_type._BEAM_SCHEMA_ID):
39+
raise RuntimeError("Type after GBK missing Beam schema ID")
3840

3941
return len(x[1])
4042

4143
def test_group_by_key_namedtuple(self):
4244
MyNamedTuple = typing.NamedTuple(
4345
"MyNamedTuple", [("id", int), ("name", str)])
4446

45-
beam.coders.typecoders.registry.register_coder(
46-
MyNamedTuple, beam.coders.RowCoder)
47+
beam.coders.typecoders.registry.register_row(MyNamedTuple)
4748

4849
def generate(num: int):
4950
for i in range(100):
@@ -67,8 +68,7 @@ class MyDataClass:
6768
id: int
6869
name: str
6970

70-
beam.coders.typecoders.registry.register_coder(
71-
MyDataClass, beam.coders.RowCoder)
71+
beam.coders.typecoders.registry.register_row(MyDataClass)
7272

7373
def generate(num: int):
7474
for i in range(100):
@@ -120,10 +120,8 @@ class DataClassInt:
120120
class DataClassStr(DataClassInt):
121121
name: str
122122

123-
beam.coders.typecoders.registry.register_coder(
124-
DataClassInt, beam.coders.RowCoder)
125-
beam.coders.typecoders.registry.register_coder(
126-
DataClassStr, beam.coders.RowCoder)
123+
beam.coders.typecoders.registry.register_row(DataClassInt)
124+
beam.coders.typecoders.registry.register_row(DataClassStr)
127125

128126
def generate(num: int):
129127
for i in range(10):

sdks/python/apache_beam/typehints/schema_registry.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
class SchemaTypeRegistry(object):
2727
def __init__(self):
2828
self.by_id = {}
29-
self.by_typing = {}
29+
self.by_typing = {} # currently not used
3030

3131
def generate_new_id(self):
3232
for _ in range(100):
@@ -43,6 +43,15 @@ def add(self, typing, schema):
4343
if schema.id:
4444
self.by_id[schema.id] = (typing, schema)
4545

46+
def load_registered_typings(self, by_id):
47+
for id, typing in by_id.items():
48+
if id not in self.by_id:
49+
self.by_id[id] = (typing, None)
50+
51+
def get_registered_typings(self):
52+
# Used by save_main_session, as pb2.schema isn't picklable
53+
return {k: v[0] for k, v in self.by_id.items()}
54+
4655
def get_typing_by_id(self, unique_id):
4756
if not unique_id:
4857
return None

0 commit comments

Comments
 (0)