This repository was archived by the owner on Apr 1, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 68
Expand file tree
/
Copy pathread_gbq_query.py
More file actions
95 lines (81 loc) · 3.19 KB
/
read_gbq_query.py
File metadata and controls
95 lines (81 loc) · 3.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Private helpers for implementing read_gbq_query."""
from __future__ import annotations
from typing import Optional
from google.cloud import bigquery
import google.cloud.bigquery.table
import numpy
import pandas
import pyarrow
from bigframes import dataframe
from bigframes.core import local_data
import bigframes.core as core
import bigframes.core.blocks as blocks
import bigframes.core.guid
import bigframes.core.schema as schemata
import bigframes.session
def create_dataframe_from_query_job_stats(
    query_job: Optional[bigquery.QueryJob], *, session: bigframes.session.Session
) -> dataframe.DataFrame:
    """Convert a QueryJob into a DataFrame with key statistics about the query.

    Any changes you make here, please try to keep in sync with pandas-gbq.
    """
    # When there is no job (e.g. jobless query path), report "unknown" for
    # every statistic rather than failing.
    if query_job:
        stats = {
            "statement_type": [query_job.statement_type],
            "job_id": [query_job.job_id],
            "location": [query_job.location],
        }
    else:
        stats = {
            "statement_type": ["unknown"],
            "job_id": ["unknown"],
            "location": ["unknown"],
        }
    return dataframe.DataFrame(data=pandas.DataFrame(stats), session=session)
def create_dataframe_from_row_iterator(
    rows: google.cloud.bigquery.table.RowIterator, *, session: bigframes.session.Session
) -> dataframe.DataFrame:
    """Convert a RowIterator into a DataFrame wrapping a LocalNode.

    This allows us to create a DataFrame from query results, even in the
    'jobless' case where there's no destination table.
    """
    arrow_table = rows.to_arrow()

    # TODO(tswast): Use array_value.promote_offsets() instead once that node is
    # supported by the local engine.
    offsets_col_name = bigframes.core.guid.generate_guid()
    arrow_table = arrow_table.append_column(
        pyarrow.field(offsets_col_name, pyarrow.int64()),
        [numpy.arange(arrow_table.num_rows)],
    )

    # Call the ManagedArrowTable constructor directly: the output of
    # to_arrow() is treated as the source of truth for canonical formats,
    # since it comes either from the BQ Storage Read API or has been shaped
    # by google-cloud-bigquery to match that API's output.
    bq_fields = list(rows.schema)
    bq_fields.append(bigquery.SchemaField(offsets_col_name, "INTEGER"))
    managed_table = local_data.ManagedArrowTable(
        arrow_table,
        schemata.ArraySchema.from_bq_schema(bq_fields),
    )
    managed_table.validate()

    # Hide the synthetic offsets column behind the block's index; only the
    # original schema columns are exposed as values.
    value_columns = [field.name for field in rows.schema]
    array_value = core.ArrayValue.from_managed(managed_table, session)
    block = blocks.Block(
        array_value,
        (offsets_col_name,),
        value_columns,
        (None,),
    )
    return dataframe.DataFrame(block)