Skip to content

Commit 718d378

Browse files
committed
Save schema registry id->type mapping
1 parent 1e00d27 commit 718d378

File tree

2 files changed

+22
-2
lines changed

2 files changed

+22
-2
lines changed

sdks/python/apache_beam/internal/cloudpickle_pickler.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,20 +256,27 @@ def dump_session(file_path):
256256
# dump supported Beam Registries (currently only logical type registry)
257257
from apache_beam.coders import typecoders
258258
from apache_beam.typehints import schemas
259+
from apache_beam.typehints.schema_registry import SCHEMA_REGISTRY
259260

260261
with _pickle_lock, open(file_path, 'wb') as file:
261262
coder_reg = typecoders.registry.get_custom_type_coder_tuples()
262263
logical_type_reg = schemas.LogicalType._known_logical_types.copy_custom()
264+
schema_reg = SCHEMA_REGISTRY.get_registered_typings()
263265

264266
pickler = cloudpickle.CloudPickler(file)
265267
# TODO(https://github.com/apache/beam/issues/18500) add file system registry
266268
# once implemented
267-
pickler.dump({"coder": coder_reg, "logical_type": logical_type_reg})
269+
pickler.dump({
270+
"coder": coder_reg,
271+
"logical_type": logical_type_reg,
272+
"schema": schema_reg
273+
})
268274

269275

270276
def load_session(file_path):
271277
from apache_beam.coders import typecoders
272278
from apache_beam.typehints import schemas
279+
from apache_beam.typehints.schema_registry import SCHEMA_REGISTRY
273280

274281
with _pickle_lock, open(file_path, 'rb') as file:
275282
registries = cloudpickle.load(file)
@@ -284,3 +291,7 @@ def load_session(file_path):
284291
schemas.LogicalType._known_logical_types.load(registries["logical_type"])
285292
else:
286293
_LOGGER.warning('No logical type registry found in saved session')
294+
if "schema" in registries:
295+
SCHEMA_REGISTRY.load_registered_typings(registries["schema"])
296+
else:
297+
_LOGGER.warning('No schema registry found in saved session')

sdks/python/apache_beam/typehints/schema_registry.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
class SchemaTypeRegistry(object):
2727
def __init__(self):
2828
self.by_id = {}
29-
self.by_typing = {}
29+
self.by_typing = {} # currently not used
3030

3131
def generate_new_id(self):
3232
for _ in range(100):
@@ -43,6 +43,15 @@ def add(self, typing, schema):
4343
if schema.id:
4444
self.by_id[schema.id] = (typing, schema)
4545

46+
def load_registered_typings(self, by_id):
  """Merge a saved ``schema id -> typing`` mapping into this registry.

  This is the loading counterpart of ``get_registered_typings``.

  Args:
    by_id: dict mapping a schema id to the typing registered under it.

  Ids that are already present in ``self.by_id`` keep their existing
  ``(typing, schema)`` entry. New ids are stored with ``None`` in the schema
  slot, because only the typing is available from the saved mapping.
  """
  for schema_id, registered_typing in by_id.items():
    # setdefault only inserts when the id is missing, so entries that were
    # registered locally (and carry a real schema) are never overwritten.
    self.by_id.setdefault(schema_id, (registered_typing, None))
50+
51+
def get_registered_typings(self):
  """Return a new dict mapping each registered schema id to its typing.

  Only the first element of every ``self.by_id`` entry (the typing) is
  returned; the schema element is dropped.

  Returns:
    dict of ``{schema id: typing}`` built from the current registry contents.
  """
  # Used by save_main_session, as pb2.schema isn't picklable
  return {schema_id: entry[0] for schema_id, entry in self.by_id.items()}
54+
4655
def get_typing_by_id(self, unique_id):
4756
if not unique_id:
4857
return None

0 commit comments

Comments
 (0)