Skip to content

Commit 00c1f10

Browse files
authored
[yaml] - add nullable field test for readFromBigQuery and update create core logic (#35692)
* add more test coverage for nullable field for BQ * yapf changes * change else statement to field types instead of row * remove error handling as not needed for now * fix lint issues * comment out failing create transforms * address comments * add core create UT * update core logic * fix lint * fix brittle test * update multiple type logic * revert changes to assign timestamps and validate * switch to typehints * updated core tests with typehints
1 parent 6b91e5c commit 00c1f10

4 files changed

Lines changed: 172 additions & 11 deletions

File tree

sdks/python/apache_beam/transforms/core.py

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import traceback
3030
import types
3131
import typing
32+
from collections import defaultdict
3233
from itertools import dropwhile
3334

3435
from apache_beam import coders
@@ -3962,9 +3963,41 @@ def to_runner_api_parameter(self, context):
39623963
def infer_output_type(self, unused_input_type):
39633964
if not self.values:
39643965
return typehints.Any
3965-
return typehints.Union[[
3966-
trivial_inference.instance_to_type(v) for v in self.values
3967-
]]
3966+
3967+
# No field data - just use default Union.
3968+
if not hasattr(self.values[0], 'as_dict'):
3969+
return typehints.Union[[
3970+
trivial_inference.instance_to_type(v) for v in self.values
3971+
]]
3972+
3973+
first_fields = self.values[0].as_dict().keys()
3974+
3975+
# Save field types for each field
3976+
field_types_by_field = defaultdict(set)
3977+
for row in self.values:
3978+
row_dict = row.as_dict()
3979+
for field in first_fields:
3980+
field_types_by_field[field].add(
3981+
trivial_inference.instance_to_type(row_dict.get(field)))
3982+
3983+
# Determine the appropriate type for each field
3984+
final_fields = []
3985+
for field in first_fields:
3986+
field_types = field_types_by_field[field]
3987+
non_none_types = {t for t in field_types if t is not type(None)}
3988+
3989+
if len(non_none_types) > 1:
3990+
final_type = typehints.Union[tuple(non_none_types)]
3991+
elif len(non_none_types) == 1 and len(field_types) == 1:
3992+
final_type = non_none_types.pop()
3993+
elif len(non_none_types) == 1 and len(field_types) == 2:
3994+
final_type = typehints.Optional[non_none_types.pop()]
3995+
else:
3996+
raise TypeError("No types found for field %s", field)
3997+
3998+
final_fields.append((field, final_type))
3999+
4000+
return row_type.RowTypeConstraint.from_fields(final_fields)
39684001

39694002
def get_output_type(self):
39704003
return (

sdks/python/apache_beam/transforms/core_test.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from apache_beam.testing.util import equal_to
3232
from apache_beam.transforms.window import FixedWindows
3333
from apache_beam.typehints import TypeCheckError
34+
from apache_beam.typehints import row_type
3435
from apache_beam.typehints import typehints
3536

3637
RETURN_NONE_PARTIAL_WARNING = "No iterator is returned"
@@ -322,6 +323,36 @@ def test_typecheck_with_default(self):
322323
| beam.Map(lambda s: s.upper()).with_input_types(str))
323324

324325

326+
class CreateInferOutputSchemaTest(unittest.TestCase):
327+
def test_multiple_types_for_field(self):
328+
output_type = beam.Create([beam.Row(a=1),
329+
beam.Row(a='foo')]).infer_output_type(None)
330+
self.assertEqual(
331+
output_type,
332+
row_type.RowTypeConstraint.from_fields([
333+
('a', typehints.Union[int, str])
334+
]))
335+
336+
def test_single_type_for_field(self):
337+
output_type = beam.Create([beam.Row(a=1),
338+
beam.Row(a=2)]).infer_output_type(None)
339+
self.assertEqual(
340+
output_type, row_type.RowTypeConstraint.from_fields([('a', int)]))
341+
342+
def test_optional_type_for_field(self):
343+
output_type = beam.Create([beam.Row(a=1),
344+
beam.Row(a=None)]).infer_output_type(None)
345+
self.assertEqual(
346+
output_type,
347+
row_type.RowTypeConstraint.from_fields([('a', typehints.Optional[int])
348+
]))
349+
350+
def test_none_type_for_field_raises_error(self):
351+
with self.assertRaisesRegex(TypeError,
352+
"('No types found for field %s', 'a')"):
353+
beam.Create([beam.Row(a=None), beam.Row(a=None)]).infer_output_type(None)
354+
355+
325356
if __name__ == '__main__':
326357
logging.getLogger().setLevel(logging.INFO)
327358
unittest.main()

sdks/python/apache_beam/yaml/extended_tests/databases/bigquery.yaml

Lines changed: 71 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,20 @@
1616
#
1717

1818
fixtures:
19-
- name: BQ_TABLE
19+
- name: BQ_TABLE_0
2020
type: "apache_beam.yaml.integration_tests.temp_bigquery_table"
2121
config:
2222
project: "apache-beam-testing"
23-
- name: TEMP_DIR
23+
- name: TEMP_DIR_0
24+
# Need distributed filesystem to be able to read and write from a container.
25+
type: "apache_beam.yaml.integration_tests.gcs_temp_dir"
26+
config:
27+
bucket: "gs://temp-storage-for-end-to-end-tests/temp-it"
28+
- name: BQ_TABLE_1
29+
type: "apache_beam.yaml.integration_tests.temp_bigquery_table"
30+
config:
31+
project: "apache-beam-testing"
32+
- name: TEMP_DIR_1
2433
# Need distributed filesystem to be able to read and write from a container.
2534
type: "apache_beam.yaml.integration_tests.gcs_temp_dir"
2635
config:
@@ -38,17 +47,17 @@ pipelines:
3847
- {label: "389a", rank: 2}
3948
- type: WriteToBigQuery
4049
config:
41-
table: "{BQ_TABLE}"
50+
table: "{BQ_TABLE_0}"
4251
options:
4352
project: "apache-beam-testing"
44-
temp_location: "{TEMP_DIR}"
53+
temp_location: "{TEMP_DIR_0}"
4554

4655
- pipeline:
4756
type: chain
4857
transforms:
4958
- type: ReadFromBigQuery
5059
config:
51-
table: "{BQ_TABLE}"
60+
table: "{BQ_TABLE_0}"
5261
- type: AssertEqual
5362
config:
5463
elements:
@@ -57,14 +66,14 @@ pipelines:
5766
- {label: "389a", rank: 2}
5867
options:
5968
project: "apache-beam-testing"
60-
temp_location: "{TEMP_DIR}"
69+
temp_location: "{TEMP_DIR_0}"
6170

6271
- pipeline:
6372
type: chain
6473
transforms:
6574
- type: ReadFromBigQuery
6675
config:
67-
table: "{BQ_TABLE}"
76+
table: "{BQ_TABLE_0}"
6877
fields: ["label"]
6978
row_restriction: "rank > 0"
7079
- type: AssertEqual
@@ -74,4 +83,58 @@ pipelines:
7483
- {label: "389a"}
7584
options:
7685
project: "apache-beam-testing"
77-
temp_location: "{TEMP_DIR}"
86+
temp_location: "{TEMP_DIR_0}"
87+
88+
# ----------------------------------------------------------------------------
89+
90+
# New write to verify row restriction based on Timestamp and nullability
91+
- pipeline:
92+
type: chain
93+
transforms:
94+
- type: Create
95+
config:
96+
elements:
97+
- {label: "4a", rank: 3, timestamp: "2024-07-14 00:00:00 UTC"}
98+
- {label: "5a", rank: 4}
99+
- {label: "6a", rank: 5, timestamp: "2024-07-14T02:00:00.123Z"}
100+
- type: WriteToBigQuery
101+
config:
102+
table: "{BQ_TABLE_1}"
103+
104+
# New read from BQ to verify row restriction with nullable field and filter
105+
# out nullable record
106+
- pipeline:
107+
type: chain
108+
transforms:
109+
- type: ReadFromBigQuery
110+
config:
111+
table: "{BQ_TABLE_1}"
112+
fields: ["label","rank","timestamp"]
113+
row_restriction: "TIMESTAMP(timestamp) <= TIMESTAMP_SUB('2025-07-14 04:00:00', INTERVAL 4 HOUR)"
114+
- type: AssertEqual
115+
config:
116+
elements:
117+
- {label: "4a", rank: 3, timestamp: "2024-07-14 00:00:00 UTC"}
118+
- {label: "6a", rank: 5,timestamp: "2024-07-14T02:00:00.123Z"}
119+
options:
120+
project: "apache-beam-testing"
121+
temp_location: "{TEMP_DIR_1}"
122+
123+
# New read from BQ to verify row restriction with nullable field and keep
124+
# nullable record
125+
- pipeline:
126+
type: chain
127+
transforms:
128+
- type: ReadFromBigQuery
129+
config:
130+
table: "{BQ_TABLE_1}"
131+
fields: ["timestamp", "label", "rank"]
132+
row_restriction: "timestamp is NULL"
133+
- type: AssertEqual
134+
config:
135+
elements:
136+
- {label: "5a", rank: 4}
137+
options:
138+
project: "apache-beam-testing"
139+
temp_location: "{TEMP_DIR_1}"
140+

sdks/python/apache_beam/yaml/tests/create.yaml

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,3 +81,37 @@ pipelines:
8181
- {sdk: MapReduce, year: 2004}
8282
- {sdk: Flume}
8383
- {sdk: MillWheel, year: 2008}
84+
85+
# Simple Create with explicit null value
86+
- pipeline:
87+
type: chain
88+
transforms:
89+
- type: Create
90+
config:
91+
elements:
92+
- {sdk: MapReduce, year: 2004}
93+
- {sdk: Flume, year: null}
94+
- {sdk: MillWheel, year: 2008}
95+
- type: AssertEqual
96+
config:
97+
elements:
98+
- {sdk: MapReduce, year: 2004}
99+
- {sdk: Flume, year: null}
100+
- {sdk: MillWheel, year: 2008}
101+
102+
# Simple Create with explicit null values for the entire record
103+
- pipeline:
104+
type: chain
105+
transforms:
106+
- type: Create
107+
config:
108+
elements:
109+
- {sdk: MapReduce, year: 2004}
110+
- {sdk: null, year: null}
111+
- {sdk: MillWheel, year: 2008}
112+
- type: AssertEqual
113+
config:
114+
elements:
115+
- {sdk: MapReduce, year: 2004}
116+
- {}
117+
- {sdk: MillWheel, year: 2008}

0 commit comments

Comments
 (0)