Skip to content

Commit 802231f

Browse files
feat(bigframes): Support loading avro, orc data
1 parent 614a3d0 commit 802231f

File tree

4 files changed

+244
-0
lines changed

4 files changed

+244
-0
lines changed

packages/bigframes/bigframes/pandas/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@
101101
_read_gbq_colab,
102102
from_glob_path,
103103
read_arrow,
104+
read_avro,
104105
read_csv,
105106
read_gbq,
106107
read_gbq_function,
@@ -109,6 +110,7 @@
109110
read_gbq_query,
110111
read_gbq_table,
111112
read_json,
113+
read_orc,
112114
read_pandas,
113115
read_parquet,
114116
read_pickle,
@@ -459,6 +461,8 @@ def reset_session():
459461
read_pandas,
460462
read_parquet,
461463
read_pickle,
464+
read_orc,
465+
read_avro,
462466
remote_function,
463467
to_datetime,
464468
to_timedelta,
@@ -494,6 +498,8 @@ def reset_session():
494498
"read_pandas",
495499
"read_parquet",
496500
"read_pickle",
501+
"read_orc",
502+
"read_avro",
497503
"remote_function",
498504
"to_datetime",
499505
"to_timedelta",

packages/bigframes/bigframes/pandas/io/api.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -589,6 +589,38 @@ def read_parquet(
589589
read_parquet.__doc__ = inspect.getdoc(bigframes.session.Session.read_parquet)
590590

591591

592+
def read_orc(
    path: str | IO["bytes"],
    *,
    engine: str = "auto",
    write_engine: constants.WriteEngineType = "default",
) -> bigframes.dataframe.DataFrame:
    # Thin pandas-style wrapper: forward everything to the default
    # session's Session.read_orc implementation.
    session_method = bigframes.session.Session.read_orc
    return global_session.with_default_session(
        session_method,
        path,
        engine=engine,
        write_engine=write_engine,
    )


# Reuse the Session-level docstring so the module-level API stays in sync.
read_orc.__doc__ = inspect.getdoc(bigframes.session.Session.read_orc)
607+
608+
609+
def read_avro(
    path: str | IO["bytes"],
    *,
    engine: str = "auto",
) -> bigframes.dataframe.DataFrame:
    # Thin pandas-style wrapper: forward everything to the default
    # session's Session.read_avro implementation.
    session_method = bigframes.session.Session.read_avro
    return global_session.with_default_session(
        session_method,
        path,
        engine=engine,
    )


# Reuse the Session-level docstring so the module-level API stays in sync.
read_avro.__doc__ = inspect.getdoc(bigframes.session.Session.read_avro)
622+
623+
592624
def read_gbq_function(
593625
function_name: str,
594626
is_row_processor: bool = False,

packages/bigframes/bigframes/session/__init__.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1360,6 +1360,87 @@ def read_parquet(
13601360
)
13611361
return self._read_pandas(pandas_obj, write_engine=write_engine)
13621362

1363+
def read_orc(
    self,
    path: str | IO["bytes"],
    *,
    engine: str = "auto",
    write_engine: constants.WriteEngineType = "default",
) -> dataframe.DataFrame:
    """Load an ORC file to a BigQuery DataFrames DataFrame.

    Args:
        path (str or IO):
            The path or buffer to the ORC file. Can be a local path or Google Cloud Storage URI.
        engine (str, default "auto"):
            The engine used to read the file. Supported values: `auto`, `bigquery`, `pyarrow`.
        write_engine (str, default "default"):
            The write engine used to persist the data to BigQuery if needed.

    Returns:
        bigframes.dataframe.DataFrame:
            A new DataFrame representing the data from the ORC file.
    """
    # Reject engine/write_engine combinations that cannot work together
    # before doing any work.
    bigframes.session.validation.validate_engine_compatibility(
        engine=engine,
        write_engine=write_engine,
    )

    # Guard: anything outside the supported engine set is an error.
    if engine not in ("auto", "bigquery", "pyarrow"):
        raise ValueError(
            f"Unsupported engine: {repr(engine)}. Supported values: 'auto', 'bigquery', 'pyarrow'."
        )

    if engine == "bigquery":
        # Let BigQuery load the file(s) directly; wildcard URIs work here.
        job_config = bigquery.LoadJobConfig()
        job_config.source_format = bigquery.SourceFormat.ORC
        job_config.labels = {"bigframes-api": "read_orc"}
        table_id = self._loader.load_file(path, job_config=job_config)
        return self._loader.read_gbq_table(table_id)

    # "auto" / "pyarrow": read locally via pandas, which cannot expand
    # wildcard paths.
    if isinstance(path, str) and "*" in path:
        raise ValueError(
            "The provided path contains a wildcard character (*), which is not "
            "supported by the current engine. To read files from wildcard paths, "
            "please use the 'bigquery' engine by setting `engine='bigquery'` in "
            "your configuration."
        )

    reader_kwargs: Dict[str, Any] = {}
    if not pandas.__version__.startswith("1."):
        # pandas >= 2 supports Arrow-backed dtypes, which round-trip
        # ORC data with better fidelity than the NumPy defaults.
        reader_kwargs["dtype_backend"] = "pyarrow"

    local_frame = pandas.read_orc(path, **reader_kwargs)
    return self._read_pandas(local_frame, write_engine=write_engine)
1413+
1414+
def read_avro(
    self,
    path: str | IO["bytes"],
    *,
    engine: str = "auto",
) -> dataframe.DataFrame:
    """Load an Avro file to a BigQuery DataFrames DataFrame.

    Args:
        path (str or IO):
            The path or buffer to the Avro file. Can be a local path or Google Cloud Storage URI.
        engine (str, default "auto"):
            The engine used to read the file. Only `bigquery` is supported for Avro.

    Returns:
        bigframes.dataframe.DataFrame:
            A new DataFrame representing the data from the Avro file.
    """
    # There is no local pandas reader path for Avro, so anything other
    # than the BigQuery engine ("auto" resolves to it) is rejected.
    if engine not in ("auto", "bigquery"):
        raise ValueError(
            f"Unsupported engine: {repr(engine)}. Supported values: 'auto', 'bigquery'."
        )

    load_config = bigquery.LoadJobConfig()
    # Map Avro logical types (e.g. timestamp-micros) to the matching
    # BigQuery types rather than their raw underlying primitives.
    load_config.use_avro_logical_types = True
    load_config.source_format = bigquery.SourceFormat.AVRO
    load_config.labels = {"bigframes-api": "read_avro"}
    destination_table = self._loader.load_file(path, job_config=load_config)
    return self._loader.read_gbq_table(destination_table)
1443+
13631444
def read_json(
13641445
self,
13651446
path_or_buf: str | IO["bytes"],

packages/bigframes/tests/system/small/test_session.py

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1932,6 +1932,131 @@ def test_read_parquet_gcs(
19321932
bigframes.testing.utils.assert_frame_equal(pd_df_in, pd_df_out)
19331933

19341934

1935+
@pytest.mark.parametrize(
    ("engine", "filename"),
    (
        pytest.param("bigquery", "000000000000.orc", id="bigquery"),
        pytest.param("auto", "000000000000.orc", id="auto"),
        pytest.param("pyarrow", "000000000000.orc", id="pyarrow"),
        pytest.param("bigquery", "*.orc", id="bigquery_wildcard"),
        pytest.param(
            "auto",
            "*.orc",
            id="auto_wildcard",
            # The pandas-backed engines reject wildcard paths.
            marks=pytest.mark.xfail(raises=ValueError),
        ),
    ),
)
def test_read_orc_gcs(
    session: bigframes.Session, scalars_dfs, gcs_folder, engine, filename
):
    pytest.importorskip(
        "pandas",
        minversion="2.0.0",
        reason="pandas<2 does not handle nullable int columns well",
    )
    scalars_df, _ = scalars_dfs
    write_path = gcs_folder + test_read_orc_gcs.__name__ + "000000000000.orc"
    read_path = gcs_folder + test_read_orc_gcs.__name__ + filename

    # Drop column types that do not round-trip through ORC cleanly.
    source_df: bigframes.dataframe.DataFrame = scalars_df.copy()
    source_df = source_df.drop(
        columns=[
            "geography_col",
            "time_col",
            "datetime_col",
            "duration_col",
            "timestamp_col",
        ]
    )

    # Keep the original index as a column and tag the export with a
    # randomized ordering column so rows can be re-sorted after the read.
    exported_df = source_df.reset_index(drop=False)
    exported_df.index.name = f"ordering_id_{random.randrange(1_000_000)}"
    exported_df.to_orc(write_path)

    roundtrip_df = (
        session.read_orc(read_path, engine=engine)
        .set_index(exported_df.index.name)
        .sort_index()
        .set_index(typing.cast(str, source_df.index.name))
    )

    assert roundtrip_df.size != 0
    bigframes.testing.utils.assert_frame_equal(
        source_df.to_pandas(), roundtrip_df.to_pandas()
    )
2005+
2006+
2007+
@pytest.mark.parametrize(
    ("engine", "filename"),
    (
        pytest.param("bigquery", "000000000000.avro", id="bigquery"),
        pytest.param("bigquery", "*.avro", id="bigquery_wildcard"),
    ),
)
def test_read_avro_gcs(
    session: bigframes.Session, scalars_dfs, gcs_folder, engine, filename
):
    scalars_df, _ = scalars_dfs
    write_uri = gcs_folder + test_read_avro_gcs.__name__ + "*.avro"
    read_uri = gcs_folder + test_read_avro_gcs.__name__ + filename

    source_df: bigframes.dataframe.DataFrame = scalars_df.copy()
    # datetime round-trips back as str in avro
    source_df = source_df.drop(
        columns=["geography_col", "duration_col", "datetime_col"]
    )

    # Keep the original index as a column and tag the export with a
    # randomized ordering column so rows can be re-sorted after the read.
    exported_df = source_df.reset_index(drop=False)
    ordering_col = f"ordering_id_{random.randrange(1_000_000)}"
    exported_df.index.name = ordering_col

    # Create a BigQuery table
    table_id = exported_df.to_gbq()

    # Extract to GCS as Avro
    bq_client = session.bqclient
    extract_config = bigquery.ExtractJobConfig()
    extract_config.destination_format = "AVRO"
    extract_config.use_avro_logical_types = True
    bq_client.extract_table(table_id, write_uri, job_config=extract_config).result()

    roundtrip_df = (
        session.read_avro(read_uri, engine=engine)
        .set_index(ordering_col)
        .sort_index()
        .set_index(typing.cast(str, source_df.index.name))
    )

    assert roundtrip_df.size != 0
    bigframes.testing.utils.assert_frame_equal(
        source_df.to_pandas(), roundtrip_df.to_pandas()
    )
2058+
2059+
19352060
@pytest.mark.parametrize(
19362061
"compression",
19372062
[

0 commit comments

Comments
 (0)