|
17 | 17 |
|
18 | 18 | def main(project_id="your-project-id", snapshot_millis=0): |
19 | 19 | # [START bigquerystorage_quickstart] |
20 | | - from google.cloud import bigquery_storage_v1beta1 |
| 20 | + from google.cloud import bigquery_storage_v1 |
21 | 21 |
|
22 | 22 | # TODO(developer): Set the project_id variable. |
23 | 23 | # project_id = 'your-project-id' |
24 | 24 | # |
25 | 25 | # The read session is created in this project. This project can be |
26 | 26 | # different from that which contains the table. |
27 | 27 |
|
28 | | - client = bigquery_storage_v1beta1.BigQueryStorageClient() |
| 28 | + client = bigquery_storage_v1.BigQueryReadClient() |
29 | 29 |
|
30 | 30 | # This example reads baby name data from the public datasets. |
31 | | - table_ref = bigquery_storage_v1beta1.types.TableReference() |
32 | | - table_ref.project_id = "bigquery-public-data" |
33 | | - table_ref.dataset_id = "usa_names" |
34 | | - table_ref.table_id = "usa_1910_current" |
| 31 | + table = "projects/{}/datasets/{}/tables/{}".format( |
| 32 | + "bigquery-public-data", "usa_names", "usa_1910_current" |
| 33 | + ) |
| 34 | + |
| 35 | + requested_session = bigquery_storage_v1.types.ReadSession() |
| 36 | + requested_session.table = table |
| 37 | + # This API can also deliver data serialized in Apache Arrow format. |
| 38 | + # This example leverages Apache Avro. |
| 39 | + requested_session.data_format = bigquery_storage_v1.enums.DataFormat.AVRO |
35 | 40 |
|
36 | 41 | # We limit the output columns to a subset of those allowed in the table, |
37 | 42 | # and set a simple filter to only report names from the state of |
38 | 43 | # Washington (WA). |
39 | | - read_options = bigquery_storage_v1beta1.types.TableReadOptions() |
40 | | - read_options.selected_fields.append("name") |
41 | | - read_options.selected_fields.append("number") |
42 | | - read_options.selected_fields.append("state") |
43 | | - read_options.row_restriction = 'state = "WA"' |
| 44 | + requested_session.read_options.selected_fields.append("name") |
| 45 | + requested_session.read_options.selected_fields.append("number") |
| 46 | + requested_session.read_options.selected_fields.append("state") |
| 47 | + requested_session.read_options.row_restriction = 'state = "WA"' |
44 | 48 |
|
45 | 49 | # Set a snapshot time if it's been specified. |
46 | 50 | modifiers = None |
47 | 51 | if snapshot_millis > 0: |
48 | | - modifiers = bigquery_storage_v1beta1.types.TableModifiers() |
49 | | - modifiers.snapshot_time.FromMilliseconds(snapshot_millis) |
| 52 | + requested_session.table_modifiers.snapshot_time.FromMilliseconds( |
| 53 | + snapshot_millis |
| 54 | + ) |
50 | 55 |
|
51 | 56 | parent = "projects/{}".format(project_id) |
52 | 57 | session = client.create_read_session( |
53 | | - table_ref, |
54 | 58 | parent, |
55 | | - table_modifiers=modifiers, |
56 | | - read_options=read_options, |
57 | | - # This API can also deliver data serialized in Apache Arrow format. |
58 | | - # This example leverages Apache Avro. |
59 | | - format_=bigquery_storage_v1beta1.enums.DataFormat.AVRO, |
60 | | - # We use a LIQUID strategy in this example because we only read from a |
61 | | - # single stream. Consider BALANCED if you're consuming multiple streams |
62 | | - # concurrently and want more consistent stream sizes. |
63 | | - sharding_strategy=(bigquery_storage_v1beta1.enums.ShardingStrategy.LIQUID), |
64 | | - ) # API request. |
65 | | - |
66 | | - # We'll use only a single stream for reading data from the table. Because |
67 | | - # of dynamic sharding, this will yield all the rows in the table. However, |
68 | | - # if you wanted to fan out multiple readers you could do so by having a |
69 | | - # reader process each individual stream. |
70 | | - reader = client.read_rows( |
71 | | - bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[0]) |
| 59 | + requested_session, |
| 60 | + # We'll use only a single stream for reading data from the table. However, |
| 61 | + # if you wanted to fan out multiple readers you could do so by having a |
| 62 | + # reader process each individual stream. |
| 63 | + max_stream_count=1, |
72 | 64 | ) |
| 65 | + reader = client.read_rows(session.streams[0].name) |
73 | 66 |
|
74 | 67 | # The read stream contains blocks of Avro-encoded bytes. The rows() method |
75 | 68 | # uses the fastavro library to parse these blocks as an interable of Python |
|
0 commit comments