Skip to content

Commit 6d46cba

Browse files
feat(bigframes): Support loading avro, orc data (#16555)
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/google-cloud-python/issues) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) Fixes #<issue_number_goes_here> 🦕
1 parent ef62df4 commit 6d46cba

File tree

4 files changed

+247
-3
lines changed

4 files changed

+247
-3
lines changed

packages/bigframes/bigframes/pandas/__init__.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@
101101
_read_gbq_colab,
102102
from_glob_path,
103103
read_arrow,
104+
read_avro,
104105
read_csv,
105106
read_gbq,
106107
read_gbq_function,
@@ -109,6 +110,7 @@
109110
read_gbq_query,
110111
read_gbq_table,
111112
read_json,
113+
read_orc,
112114
read_pandas,
113115
read_parquet,
114116
read_pickle,
@@ -446,8 +448,9 @@ def reset_session():
446448
get_dummies,
447449
merge,
448450
qcut,
449-
read_csv,
450451
read_arrow,
452+
read_avro,
453+
read_csv,
451454
read_gbq,
452455
_read_gbq_colab,
453456
read_gbq_function,
@@ -456,6 +459,7 @@ def reset_session():
456459
read_gbq_query,
457460
read_gbq_table,
458461
read_json,
462+
read_orc,
459463
read_pandas,
460464
read_parquet,
461465
read_pickle,
@@ -481,8 +485,9 @@ def reset_session():
481485
"get_dummies",
482486
"merge",
483487
"qcut",
484-
"read_csv",
485488
"read_arrow",
489+
"read_avro",
490+
"read_csv",
486491
"read_gbq",
487492
"_read_gbq_colab",
488493
"read_gbq_function",
@@ -491,6 +496,7 @@ def reset_session():
491496
"read_gbq_query",
492497
"read_gbq_table",
493498
"read_json",
499+
"read_orc",
494500
"read_pandas",
495501
"read_parquet",
496502
"read_pickle",

packages/bigframes/bigframes/pandas/io/api.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,21 @@ def read_arrow(pa_table: pa.Table) -> bigframes.dataframe.DataFrame:
9494
return session.read_arrow(pa_table=pa_table)
9595

9696

97+
def read_avro(
98+
path: str | IO["bytes"],
99+
*,
100+
engine: str = "auto",
101+
) -> bigframes.dataframe.DataFrame:
102+
return global_session.with_default_session(
103+
bigframes.session.Session.read_avro,
104+
path,
105+
engine=engine,
106+
)
107+
108+
109+
read_avro.__doc__ = inspect.getdoc(bigframes.session.Session.read_avro)
110+
111+
97112
def read_csv(
98113
filepath_or_buffer: str | IO["bytes"],
99114
*,
@@ -514,6 +529,23 @@ def read_gbq_table(
514529
read_gbq_table.__doc__ = inspect.getdoc(bigframes.session.Session.read_gbq_table)
515530

516531

532+
def read_orc(
533+
path: str | IO["bytes"],
534+
*,
535+
engine: str = "auto",
536+
write_engine: constants.WriteEngineType = "default",
537+
) -> bigframes.dataframe.DataFrame:
538+
return global_session.with_default_session(
539+
bigframes.session.Session.read_orc,
540+
path,
541+
engine=engine,
542+
write_engine=write_engine,
543+
)
544+
545+
546+
read_orc.__doc__ = inspect.getdoc(bigframes.session.Session.read_orc)
547+
548+
517549
@typing.overload
518550
def read_pandas(
519551
pandas_dataframe: pandas.DataFrame,

packages/bigframes/bigframes/session/__init__.py

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1344,7 +1344,7 @@ def read_parquet(
13441344
"The provided path contains a wildcard character (*), which is not "
13451345
"supported by the current engine. To read files from wildcard paths, "
13461346
"please use the 'bigquery' engine by setting `engine='bigquery'` in "
1347-
"your configuration."
1347+
"the function call."
13481348
)
13491349

13501350
read_parquet_kwargs: Dict[str, Any] = {}
@@ -1360,6 +1360,87 @@ def read_parquet(
13601360
)
13611361
return self._read_pandas(pandas_obj, write_engine=write_engine)
13621362

1363+
def read_orc(
1364+
self,
1365+
path: str | IO["bytes"],
1366+
*,
1367+
engine: str = "auto",
1368+
write_engine: constants.WriteEngineType = "default",
1369+
) -> dataframe.DataFrame:
1370+
"""Load an ORC file to a BigQuery DataFrames DataFrame.
1371+
1372+
Args:
1373+
path (str or IO):
1374+
The path or buffer to the ORC file. Can be a local path or Google Cloud Storage URI.
1375+
engine (str, default "auto"):
1376+
The engine used to read the file. Supported values: `auto`, `bigquery`, `pyarrow`.
1377+
write_engine (str, default "default"):
1378+
The write engine used to persist the data to BigQuery if needed.
1379+
1380+
Returns:
1381+
bigframes.pandas.DataFrame:
1382+
A new DataFrame representing the data from the ORC file.
1383+
"""
1384+
bigframes.session.validation.validate_engine_compatibility(
1385+
engine=engine,
1386+
write_engine=write_engine,
1387+
)
1388+
if engine == "bigquery":
1389+
job_config = bigquery.LoadJobConfig()
1390+
job_config.source_format = bigquery.SourceFormat.ORC
1391+
job_config.labels = {"bigframes-api": "read_orc"}
1392+
table_id = self._loader.load_file(path, job_config=job_config)
1393+
return self._loader.read_gbq_table(table_id)
1394+
elif engine in ("auto", "pyarrow"):
1395+
if isinstance(path, str) and "*" in path:
1396+
raise ValueError(
1397+
"The provided path contains a wildcard character (*), which is not "
1398+
"supported by the current engine. To read files from wildcard paths, "
1399+
"please use the 'bigquery' engine by setting `engine='bigquery'` in "
1400+
"your configuration."
1401+
)
1402+
1403+
read_orc_kwargs: Dict[str, Any] = {}
1404+
if not pandas.__version__.startswith("1."):
1405+
read_orc_kwargs["dtype_backend"] = "pyarrow"
1406+
1407+
pandas_obj = pandas.read_orc(path, **read_orc_kwargs)
1408+
return self._read_pandas(pandas_obj, write_engine=write_engine)
1409+
else:
1410+
raise ValueError(
1411+
f"Unsupported engine: {repr(engine)}. Supported values: 'auto', 'bigquery', 'pyarrow'."
1412+
)
1413+
1414+
def read_avro(
1415+
self,
1416+
path: str | IO["bytes"],
1417+
*,
1418+
engine: str = "auto",
1419+
) -> dataframe.DataFrame:
1420+
"""Load an Avro file to a BigQuery DataFrames DataFrame.
1421+
1422+
Args:
1423+
path (str or IO):
1424+
The path or buffer to the Avro file. Can be a local path or Google Cloud Storage URI.
1425+
engine (str, default "auto"):
1426+
The engine used to read the file. Only `bigquery` is supported for Avro.
1427+
1428+
Returns:
1429+
bigframes.pandas.DataFrame:
1430+
A new DataFrame representing the data from the Avro file.
1431+
"""
1432+
if engine not in ("auto", "bigquery"):
1433+
raise ValueError(
1434+
f"Unsupported engine: {repr(engine)}. Supported values: 'auto', 'bigquery'."
1435+
)
1436+
1437+
job_config = bigquery.LoadJobConfig()
1438+
job_config.use_avro_logical_types = True
1439+
job_config.source_format = bigquery.SourceFormat.AVRO
1440+
job_config.labels = {"bigframes-api": "read_avro"}
1441+
table_id = self._loader.load_file(path, job_config=job_config)
1442+
return self._loader.read_gbq_table(table_id)
1443+
13631444
def read_json(
13641445
self,
13651446
path_or_buf: str | IO["bytes"],

packages/bigframes/tests/system/small/test_session.py

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1935,6 +1935,131 @@ def test_read_parquet_gcs(
19351935
bigframes.testing.utils.assert_frame_equal(pd_df_in, pd_df_out)
19361936

19371937

1938+
@pytest.mark.parametrize(
1939+
("engine", "filename"),
1940+
(
1941+
pytest.param(
1942+
"bigquery",
1943+
"000000000000.orc",
1944+
id="bigquery",
1945+
),
1946+
pytest.param(
1947+
"auto",
1948+
"000000000000.orc",
1949+
id="auto",
1950+
),
1951+
pytest.param(
1952+
"pyarrow",
1953+
"000000000000.orc",
1954+
id="pyarrow",
1955+
),
1956+
pytest.param(
1957+
"bigquery",
1958+
"*.orc",
1959+
id="bigquery_wildcard",
1960+
),
1961+
pytest.param(
1962+
"auto",
1963+
"*.orc",
1964+
id="auto_wildcard",
1965+
marks=pytest.mark.xfail(
1966+
raises=ValueError,
1967+
),
1968+
),
1969+
),
1970+
)
1971+
def test_read_orc_gcs(
1972+
session: bigframes.Session, scalars_dfs, gcs_folder, engine, filename
1973+
):
1974+
pytest.importorskip(
1975+
"pandas",
1976+
minversion="2.0.0",
1977+
reason="pandas<2 does not handle nullable int columns well",
1978+
)
1979+
scalars_df, _ = scalars_dfs
1980+
write_path = gcs_folder + test_read_orc_gcs.__name__ + "000000000000.orc"
1981+
read_path = gcs_folder + test_read_orc_gcs.__name__ + filename
1982+
1983+
df_in: bigframes.dataframe.DataFrame = scalars_df.copy()
1984+
df_in = df_in.drop(
1985+
columns=[
1986+
"geography_col",
1987+
"time_col",
1988+
"datetime_col",
1989+
"duration_col",
1990+
"timestamp_col",
1991+
]
1992+
)
1993+
df_write = df_in.reset_index(drop=False)
1994+
df_write.index.name = f"ordering_id_{random.randrange(1_000_000)}"
1995+
df_write.to_orc(write_path)
1996+
1997+
df_out = (
1998+
session.read_orc(read_path, engine=engine)
1999+
.set_index(df_write.index.name)
2000+
.sort_index()
2001+
.set_index(typing.cast(str, df_in.index.name))
2002+
)
2003+
2004+
assert df_out.size != 0
2005+
pd_df_in = df_in.to_pandas()
2006+
pd_df_out = df_out.to_pandas()
2007+
bigframes.testing.utils.assert_frame_equal(pd_df_in, pd_df_out)
2008+
2009+
2010+
@pytest.mark.parametrize(
2011+
("engine", "filename"),
2012+
(
2013+
pytest.param(
2014+
"bigquery",
2015+
"000000000000.avro",
2016+
id="bigquery",
2017+
),
2018+
pytest.param(
2019+
"bigquery",
2020+
"*.avro",
2021+
id="bigquery_wildcard",
2022+
),
2023+
),
2024+
)
2025+
def test_read_avro_gcs(
2026+
session: bigframes.Session, scalars_dfs, gcs_folder, engine, filename
2027+
):
2028+
scalars_df, _ = scalars_dfs
2029+
write_uri = gcs_folder + test_read_avro_gcs.__name__ + "*.avro"
2030+
read_uri = gcs_folder + test_read_avro_gcs.__name__ + filename
2031+
2032+
df_in: bigframes.dataframe.DataFrame = scalars_df.copy()
2033+
# datetime round-trips back as str in avro
2034+
df_in = df_in.drop(columns=["geography_col", "duration_col", "datetime_col"])
2035+
df_write = df_in.reset_index(drop=False)
2036+
index_name = f"ordering_id_{random.randrange(1_000_000)}"
2037+
df_write.index.name = index_name
2038+
2039+
# Create a BigQuery table
2040+
table_id = df_write.to_gbq()
2041+
2042+
# Extract to GCS as Avro
2043+
client = session.bqclient
2044+
extract_job_config = bigquery.ExtractJobConfig()
2045+
extract_job_config.destination_format = "AVRO"
2046+
extract_job_config.use_avro_logical_types = True
2047+
2048+
client.extract_table(table_id, write_uri, job_config=extract_job_config).result()
2049+
2050+
df_out = (
2051+
session.read_avro(read_uri, engine=engine)
2052+
.set_index(index_name)
2053+
.sort_index()
2054+
.set_index(typing.cast(str, df_in.index.name))
2055+
)
2056+
2057+
assert df_out.size != 0
2058+
pd_df_in = df_in.to_pandas()
2059+
pd_df_out = df_out.to_pandas()
2060+
bigframes.testing.utils.assert_frame_equal(pd_df_in, pd_df_out)
2061+
2062+
19382063
@pytest.mark.parametrize(
19392064
"compression",
19402065
[

0 commit comments

Comments
 (0)