This repository was archived by the owner on Nov 12, 2025. It is now read-only.

Commit 10dcc1f

use schema in template and add table verification
1 parent 8ad4fb2 commit 10dcc1f

2 files changed: 92 additions & 69 deletions

google/cloud/bigquery_storage_v1/writer.py

Lines changed: 7 additions & 5 deletions
@@ -72,11 +72,13 @@ def _process_request_template(
     # The protobuf payload will be decoded as proto2 on the server side. The
     # schema is also specified as proto2. Hence we must clear proto3-only
     # features. This works since proto2 and proto3 are binary-compatible.
-    proto_descriptor = template_copy.proto_rows.writer_schema.proto_descriptor
-    for field in proto_descriptor.field:
-        field.ClearField("oneof_index")
-        field.ClearField("proto3_optional")
-    proto_descriptor.ClearField("oneof_decl")
+    oneof_field = template_copy._pb.WhichOneof("rows")
+    if oneof_field == "proto_rows":
+        proto_descriptor = template_copy.proto_rows.writer_schema.proto_descriptor
+        for field in proto_descriptor.field:
+            field.ClearField("oneof_index")
+            field.ClearField("proto3_optional")
+        proto_descriptor.ClearField("oneof_decl")
 
     return template_copy
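For context, a minimal sketch (not part of the commit) of why the new `WhichOneof` guard matters: only a template carrying protocol-buffer rows has a proto descriptor to scrub, while an Arrow-based template (used by the sample below) does not. It assumes the `rows` oneof name used in the new code and a library version that includes `ArrowData`:

from google.cloud.bigquery_storage_v1 import types as gapic_types

# A proto-rows template takes the descriptor-scrubbing branch...
proto_template = gapic_types.AppendRowsRequest()
proto_template.proto_rows = gapic_types.AppendRowsRequest.ProtoData()
assert proto_template._pb.WhichOneof("rows") == "proto_rows"

# ...while an arrow-rows template skips it, having no proto descriptor.
arrow_template = gapic_types.AppendRowsRequest()
arrow_template.arrow_rows = gapic_types.AppendRowsRequest.ArrowData()
assert arrow_template._pb.WhichOneof("rows") == "arrow_rows"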

samples/pyarrow/append_rows_with_arrow.py

Lines changed: 85 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -24,43 +24,76 @@
2424
from google.cloud.bigquery_storage_v1.writer import AppendRowsStream
2525

2626

27+
TABLE_LENGTH = 1000
28+
29+
BQ_SCHEMA = [
30+
bigquery.SchemaField("bool_col", enums.SqlTypeNames.BOOLEAN),
31+
bigquery.SchemaField("int64_col", enums.SqlTypeNames.INT64),
32+
bigquery.SchemaField("float64_col", enums.SqlTypeNames.FLOAT64),
33+
bigquery.SchemaField("numeric_col", enums.SqlTypeNames.NUMERIC),
34+
bigquery.SchemaField("bignumeric_col", enums.SqlTypeNames.BIGNUMERIC),
35+
bigquery.SchemaField("string_col", enums.SqlTypeNames.STRING),
36+
bigquery.SchemaField("bytes_col", enums.SqlTypeNames.BYTES),
37+
bigquery.SchemaField("date_col", enums.SqlTypeNames.DATE),
38+
bigquery.SchemaField("datetime_col", enums.SqlTypeNames.DATETIME),
39+
bigquery.SchemaField("time_col", enums.SqlTypeNames.TIME),
40+
bigquery.SchemaField("timestamp_col", enums.SqlTypeNames.TIMESTAMP),
41+
bigquery.SchemaField("geography_col", enums.SqlTypeNames.GEOGRAPHY),
42+
bigquery.SchemaField(
43+
"range_date_col", enums.SqlTypeNames.RANGE, range_element_type="DATE"
44+
),
45+
bigquery.SchemaField(
46+
"range_datetime_col",
47+
enums.SqlTypeNames.RANGE,
48+
range_element_type="DATETIME",
49+
),
50+
bigquery.SchemaField(
51+
"range_timestamp_col",
52+
enums.SqlTypeNames.RANGE,
53+
range_element_type="TIMESTAMP",
54+
),
55+
]
56+
57+
PYARROW_SCHEMA = pa.schema(
58+
[
59+
pa.field("bool_col", pa.bool_()),
60+
pa.field("int64_col", pa.int64()),
61+
pa.field("float64_col", pa.float64()),
62+
pa.field("numeric_col", pa.decimal128(38, scale=9)),
63+
pa.field("bignumeric_col", pa.decimal256(76, scale=38)),
64+
pa.field("string_col", pa.string()),
65+
pa.field("bytes_col", pa.binary()),
66+
pa.field("date_col", pa.date32()),
67+
pa.field("datetime_col", pa.timestamp("us")),
68+
pa.field("time_col", pa.time64("us")),
69+
pa.field("timestamp_col", pa.timestamp("us")),
70+
pa.field("geography_col", pa.string()),
71+
pa.field(
72+
"range_date_col",
73+
pa.struct([("start", pa.date32()), ("end", pa.date32())]),
74+
),
75+
pa.field(
76+
"range_datetime_col",
77+
pa.struct([("start", pa.timestamp("us")), ("end", pa.timestamp("us"))]),
78+
),
79+
pa.field(
80+
"range_timestamp_col",
81+
pa.struct([("start", pa.timestamp("us")), ("end", pa.timestamp("us"))]),
82+
),
83+
]
84+
)
85+
86+
2787
def bqstorage_write_client():
2888
from google.cloud import bigquery_storage_v1
2989

3090
return bigquery_storage_v1.BigQueryWriteClient()
3191

3292

3393
def make_table(project_id, dataset_id, bq_client):
34-
schema = [
35-
bigquery.SchemaField("bool_col", enums.SqlTypeNames.BOOLEAN),
36-
bigquery.SchemaField("int64_col", enums.SqlTypeNames.INT64),
37-
bigquery.SchemaField("float64_col", enums.SqlTypeNames.FLOAT64),
38-
bigquery.SchemaField("numeric_col", enums.SqlTypeNames.NUMERIC),
39-
bigquery.SchemaField("bignumeric_col", enums.SqlTypeNames.BIGNUMERIC),
40-
bigquery.SchemaField("string_col", enums.SqlTypeNames.STRING),
41-
bigquery.SchemaField("bytes_col", enums.SqlTypeNames.BYTES),
42-
bigquery.SchemaField("date_col", enums.SqlTypeNames.DATE),
43-
bigquery.SchemaField("datetime_col", enums.SqlTypeNames.DATETIME),
44-
bigquery.SchemaField("time_col", enums.SqlTypeNames.TIME),
45-
bigquery.SchemaField("timestamp_col", enums.SqlTypeNames.TIMESTAMP),
46-
bigquery.SchemaField("geography_col", enums.SqlTypeNames.GEOGRAPHY),
47-
bigquery.SchemaField(
48-
"range_date_col", enums.SqlTypeNames.RANGE, range_element_type="DATE"
49-
),
50-
bigquery.SchemaField(
51-
"range_datetime_col",
52-
enums.SqlTypeNames.RANGE,
53-
range_element_type="DATETIME",
54-
),
55-
bigquery.SchemaField(
56-
"range_timestamp_col",
57-
enums.SqlTypeNames.RANGE,
58-
range_element_type="TIMESTAMP",
59-
),
60-
]
6194
table_id = "append_rows_w_arrow_test"
6295
table_id_full = f"{project_id}.{dataset_id}.{table_id}"
63-
bq_table = bigquery.Table(table_id_full, schema=schema)
96+
bq_table = bigquery.Table(table_id_full, schema=BQ_SCHEMA)
6497
created_table = bq_client.create_table(bq_table)
6598

6699
return created_table
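Since `BQ_SCHEMA` and `PYARROW_SCHEMA` are now maintained in parallel, one quick consistency check is to compare field names; this is an illustrative sketch, not code from the commit:

# Both schema constants should list the same columns in the same order.
assert [field.name for field in BQ_SCHEMA] == PYARROW_SCHEMA.names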
@@ -71,14 +104,19 @@ def create_stream(bqstorage_write_client, table):
     request_template = gapic_types.AppendRowsRequest()
     request_template.write_stream = stream_name
 
+    # Add schema to the template.
+    arrow_data = gapic_types.AppendRowsRequest.ArrowData()
+    arrow_data.writer_schema.serialized_schema = PYARROW_SCHEMA.serialize().to_pybytes()
+    request_template.arrow_rows = arrow_data
+
     append_rows_stream = AppendRowsStream(
         bqstorage_write_client,
         request_template,
     )
     return append_rows_stream
 
 
-def generate_write_request_with_pyarrow(row_num=10):
+def generate_write_request_with_pyarrow(num_rows=TABLE_LENGTH):
     date_1 = datetime.date(2020, 10, 1)
     date_2 = datetime.date(2021, 10, 1)
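The `serialized_schema` bytes placed in the template are the Arrow IPC encoding of the schema. A small round-trip sketch using public PyArrow APIs (illustrative, not part of the commit):

import pyarrow as pa

schema = pa.schema([pa.field("int64_col", pa.int64())])
serialized = schema.serialize().to_pybytes()  # same bytes the template carries
assert pa.ipc.read_schema(pa.py_buffer(serialized)).equals(schema)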

@@ -94,7 +132,7 @@ def generate_write_request_with_pyarrow(row_num=10):
 
     # Pandas Dataframe.
     rows = []
-    for i in range(row_num):
+    for i in range(num_rows):
         row = {
             "bool_col": True,
             "int64_col": i,
@@ -116,57 +154,40 @@ def generate_write_request_with_pyarrow(row_num=10):
     df = pd.DataFrame(rows)
 
     # Dataframe to PyArrow Table.
-    schema = pa.schema(
-        [
-            pa.field("bool_col", pa.bool_()),
-            pa.field("int64_col", pa.int64()),
-            pa.field("float64_col", pa.float64()),
-            pa.field("numeric_col", pa.decimal128(38, scale=9)),
-            pa.field("bignumeric_col", pa.decimal256(76, scale=38)),
-            pa.field("string_col", pa.string()),
-            pa.field("bytes_col", pa.binary()),
-            pa.field("date_col", pa.date32()),
-            pa.field("datetime_col", pa.timestamp("us")),
-            pa.field("time_col", pa.time64("us")),
-            pa.field("timestamp_col", pa.timestamp("us")),
-            pa.field("geography_col", pa.string()),
-            pa.field(
-                "range_date_col",
-                pa.struct([("start", pa.date32()), ("end", pa.date32())]),
-            ),
-            pa.field(
-                "range_datetime_col",
-                pa.struct([("start", pa.timestamp("us")), ("end", pa.timestamp("us"))]),
-            ),
-            pa.field(
-                "range_timestamp_col",
-                pa.struct([("start", pa.timestamp("us")), ("end", pa.timestamp("us"))]),
-            ),
-        ]
-    )
-    table = pa.Table.from_pandas(df, schema=schema)
+    table = pa.Table.from_pandas(df, schema=PYARROW_SCHEMA)
 
     # Construct request.
     request = gapic_types.AppendRowsRequest()
-    arrow_data = gapic_types.AppendRowsRequest.ArrowData()
-    arrow_data.writer_schema.serialized_schema = table.schema.serialize().to_pybytes()
-    arrow_data.rows.serialized_record_batch = (
+    request.arrow_rows.rows.serialized_record_batch = (
         table.to_batches()[0].serialize().to_pybytes()
     )
-    request.arrow_rows = arrow_data
     return request
 
 
 def append_rows(bqstorage_write_client, table):
     append_rows_stream = create_stream(bqstorage_write_client, table)
-    request = generate_write_request_with_pyarrow(row_num=20)
+    request = generate_write_request_with_pyarrow(num_rows=TABLE_LENGTH)
 
     response_future = append_rows_stream.send(request)
     print(response_future.result())
 
 
+def verify_table(client, table):
+    bq_table = client.get_table(table)
+
+    # Verify table schema.
+    assert bq_table.schema == BQ_SCHEMA
+
+    # Verify table size.
+    query = client.query(f"SELECT COUNT(1) FROM `{bq_table}`;")
+    query_result = query.result().to_dataframe()
+    assert query_result.iloc[0, 0] == TABLE_LENGTH
+
+
 def main(project_id, dataset):
     write_client = bqstorage_write_client()
     bq_client = bigquery.Client()
     table = make_table(project_id, dataset.dataset_id, bq_client)
+
     append_rows(write_client, table)
+    verify_table(bq_client, table)
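Note that the rewritten request construction still serializes only `table.to_batches()[0]`. A hedged sketch of a hypothetical helper (not in the commit) that would emit one request per record batch, reusing the sample's `gapic_types` import:

def generate_requests_for_batches(table):
    # One AppendRowsRequest per record batch, since
    # serialized_record_batch holds a single batch.
    for batch in table.to_batches():
        request = gapic_types.AppendRowsRequest()
        request.arrow_rows.rows.serialized_record_batch = (
            batch.serialize().to_pybytes()
        )
        yield request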
