Skip to content
This repository was archived by the owner on Apr 1, 2026. It is now read-only.

Commit 8b5bca7

Browse files
feat: Support loading avro, orc data
1 parent 44e0ffd commit 8b5bca7

File tree

5 files changed

+241
-0
lines changed

5 files changed

+241
-0
lines changed

bigframes/pandas/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@
9797
_read_gbq_colab,
9898
from_glob_path,
9999
read_arrow,
100+
read_avro,
100101
read_csv,
101102
read_gbq,
102103
read_gbq_function,
@@ -105,6 +106,7 @@
105106
read_gbq_query,
106107
read_gbq_table,
107108
read_json,
109+
read_orc,
108110
read_pandas,
109111
read_parquet,
110112
read_pickle,
@@ -461,6 +463,8 @@ def reset_session():
461463
read_pandas,
462464
read_parquet,
463465
read_pickle,
466+
read_orc,
467+
read_avro,
464468
remote_function,
465469
to_datetime,
466470
to_timedelta,
@@ -496,6 +500,8 @@ def reset_session():
496500
"read_pandas",
497501
"read_parquet",
498502
"read_pickle",
503+
"read_orc",
504+
"read_avro",
499505
"remote_function",
500506
"to_datetime",
501507
"to_timedelta",

bigframes/pandas/io/api.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -600,6 +600,40 @@ def read_parquet(
600600
read_parquet.__doc__ = inspect.getdoc(bigframes.session.Session.read_parquet)
601601

602602

603+
def read_orc(
    path: str | IO["bytes"],
    *,
    engine: str = "auto",
    write_engine: constants.WriteEngineType = "default",
) -> bigframes.dataframe.DataFrame:
    # Module-level convenience shim: resolve the default (global) session and
    # delegate to Session.read_orc with the caller's options forwarded
    # unchanged.
    forwarded = {"engine": engine, "write_engine": write_engine}
    return global_session.with_default_session(
        bigframes.session.Session.read_orc,
        path,
        **forwarded,
    )


# Reuse the Session method's docstring so both entry points stay in sync.
read_orc.__doc__ = inspect.getdoc(bigframes.session.Session.read_orc)
618+
619+
620+
def read_avro(
    path: str | IO["bytes"],
    *,
    engine: str = "bigquery",
    write_engine: constants.WriteEngineType = "default",
) -> bigframes.dataframe.DataFrame:
    # Module-level convenience wrapper: delegates to Session.read_avro on the
    # default (global) session.
    # NOTE(review): this forwards `write_engine`, but Session.read_avro as
    # defined in bigframes/session/__init__.py takes only `engine` — as
    # written, this call raises TypeError (unexpected keyword argument
    # 'write_engine') at runtime. Confirm/align the Session signature.
    # NOTE(review): the default engine here is "bigquery" while the Session
    # method defaults to "default"; both values are accepted by the Session
    # method, but consider aligning the defaults.
    return global_session.with_default_session(
        bigframes.session.Session.read_avro,
        path,
        engine=engine,
        write_engine=write_engine,
    )


# Reuse the Session-level docstring so help(bigframes.pandas.read_avro)
# matches help(Session.read_avro).
read_avro.__doc__ = inspect.getdoc(bigframes.session.Session.read_avro)
635+
636+
603637
def read_gbq_function(
604638
function_name: str,
605639
is_row_processor: bool = False,

bigframes/session/__init__.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1369,6 +1369,87 @@ def read_parquet(
13691369
)
13701370
return self._read_pandas(pandas_obj, write_engine=write_engine)
13711371

1372+
def read_orc(
    self,
    path: str | IO["bytes"],
    *,
    engine: str = "auto",
    write_engine: constants.WriteEngineType = "default",
) -> dataframe.DataFrame:
    """Load an ORC file to a BigQuery DataFrames DataFrame.

    Args:
        path (str or IO):
            The path or buffer to the ORC file. Can be a local path or Google Cloud Storage URI.
        engine (str, default "auto"):
            The engine used to read the file. Supported values: `auto`, `bigquery`, `pyarrow`.
        write_engine (str, default "default"):
            The write engine used to persist the data to BigQuery if needed.

    Returns:
        bigframes.dataframe.DataFrame:
            A new DataFrame representing the data from the ORC file.
    """
    bigframes.session.validation.validate_engine_compatibility(
        engine=engine,
        write_engine=write_engine,
    )

    # Reject unknown engines up front (guard clause); the original reached
    # this raise through an if/elif/else chain with identical behavior.
    if engine not in ("auto", "bigquery", "pyarrow"):
        raise ValueError(
            f"Unsupported engine: {repr(engine)}. Supported values: 'auto', 'bigquery', 'pyarrow'."
        )

    if engine == "bigquery":
        # Server-side path: BigQuery ingests the ORC file(s) directly via a
        # load job, then we wrap the resulting table.
        load_config = bigquery.LoadJobConfig()
        load_config.source_format = bigquery.SourceFormat.ORC
        load_config.labels = {"bigframes-api": "read_orc"}
        destination_table = self._loader.load_file(path, job_config=load_config)
        return self._loader.read_gbq_table(destination_table)

    # Client-side path ("auto" / "pyarrow"): pandas reads the file locally.
    # Wildcard paths only make sense server-side, so refuse them here.
    if isinstance(path, str) and "*" in path:
        raise ValueError(
            "The provided path contains a wildcard character (*), which is not "
            "supported by the current engine. To read files from wildcard paths, "
            "please use the 'bigquery' engine by setting `engine='bigquery'` in "
            "your configuration."
        )

    pandas_kwargs: Dict[str, Any] = {}
    if not pandas.__version__.startswith("1."):
        # pandas >= 2.0 supports arrow-backed dtypes, which round-trip best
        # into BigQuery DataFrames.
        pandas_kwargs["dtype_backend"] = "pyarrow"

    local_frame = pandas.read_orc(path, **pandas_kwargs)
    return self._read_pandas(local_frame, write_engine=write_engine)
1422+
1423+
def read_avro(
    self,
    path: str | IO["bytes"],
    *,
    engine: str = "default",
    write_engine: constants.WriteEngineType = "default",
) -> dataframe.DataFrame:
    """Load an Avro file to a BigQuery DataFrames DataFrame.

    Args:
        path (str or IO):
            The path or buffer to the Avro file. Can be a local path or Google Cloud Storage URI.
        engine (str, default "default"):
            The engine used to read the file. Only `bigquery` is supported for Avro.
        write_engine (str, default "default"):
            Accepted for signature parity with the other ``read_*`` loaders;
            the module-level ``bigframes.pandas.read_avro`` wrapper forwards
            it (previously this raised ``TypeError`` because the keyword did
            not exist here). Avro is only loaded server-side, so the value is
            currently unused.

    Returns:
        bigframes.dataframe.DataFrame:
            A new DataFrame representing the data from the Avro file.

    Raises:
        ValueError: If ``engine`` is not one of ``default`` or ``bigquery``.
    """
    if engine not in ("default", "bigquery"):
        raise ValueError(
            f"Unsupported engine: {repr(engine)}. Supported values: 'default', 'bigquery'."
        )

    job_config = bigquery.LoadJobConfig()
    # Map Avro logical types (timestamp-*, decimal, date, ...) to the
    # corresponding BigQuery types instead of their raw Avro primitives.
    job_config.use_avro_logical_types = True
    job_config.source_format = bigquery.SourceFormat.AVRO
    job_config.labels = {"bigframes-api": "read_avro"}
    table_id = self._loader.load_file(path, job_config=job_config)
    return self._loader.read_gbq_table(table_id)
1452+
13721453
def read_json(
13731454
self,
13741455
path_or_buf: str | IO["bytes"],

test_types.orc

406 Bytes
Binary file not shown.

tests/system/small/test_session.py

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1931,6 +1931,126 @@ def test_read_parquet_gcs(
19311931
bigframes.testing.utils.assert_frame_equal(pd_df_in, pd_df_out)
19321932

19331933

1934+
@pytest.mark.parametrize(
    ("engine", "filename"),
    (
        pytest.param(
            "bigquery",
            "000000000000.orc",
            id="bigquery",
        ),
        pytest.param(
            "auto",
            "000000000000.orc",
            id="auto",
        ),
        pytest.param(
            "pyarrow",
            "000000000000.orc",
            id="pyarrow",
        ),
        pytest.param(
            "bigquery",
            "*.orc",
            id="bigquery_wildcard",
        ),
        # Wildcard paths are only supported by the "bigquery" engine, so the
        # client-side engines are expected to raise ValueError.
        pytest.param(
            "auto",
            "*.orc",
            id="auto_wildcard",
            marks=pytest.mark.xfail(
                raises=ValueError,
            ),
        ),
    ),
)
def test_read_orc_gcs(
    session: bigframes.Session, scalars_dfs, gcs_folder, engine, filename
):
    """Round-trip: write scalars to an ORC file on GCS, read it back with
    Session.read_orc under each engine, and compare to the original frame."""
    scalars_df, _ = scalars_dfs
    # One fixed output file; `filename` varies only on the read side so the
    # wildcard cases still match the file written here.
    write_path = gcs_folder + test_read_orc_gcs.__name__ + "000000000000.orc"
    read_path = gcs_folder + test_read_orc_gcs.__name__ + filename

    df_in: bigframes.dataframe.DataFrame = scalars_df.copy()
    # Drop columns whose types do not round-trip through ORC.
    df_in = df_in.drop(
        columns=[
            "geography_col",
            "time_col",
            "datetime_col",
            "duration_col",
            "timestamp_col",
        ]
    )
    # Preserve the original index as a column under a randomized name so the
    # read-back frame can be re-ordered deterministically.
    df_write = df_in.reset_index(drop=False)
    df_write.index.name = f"ordering_id_{random.randrange(1_000_000)}"
    df_write.to_orc(write_path)

    df_out = (
        session.read_orc(read_path, engine=engine)
        # Restore a deterministic row order, then the original index.
        .set_index(df_write.index.name)
        .sort_index()
        .set_index(typing.cast(str, df_in.index.name))
    )

    assert df_out.size != 0
    pd_df_in = df_in.to_pandas()
    pd_df_out = df_out.to_pandas()
    bigframes.testing.utils.assert_frame_equal(pd_df_in, pd_df_out)
1999+
2000+
2001+
@pytest.mark.parametrize(
    ("engine", "filename"),
    (
        pytest.param(
            "bigquery",
            "000000000000.avro",
            id="bigquery",
        ),
        pytest.param(
            "bigquery",
            "*.avro",
            id="bigquery_wildcard",
        ),
    ),
)
def test_read_avro_gcs(
    session: bigframes.Session, scalars_dfs, gcs_folder, engine, filename
):
    """Round-trip: extract scalars to Avro on GCS via a BigQuery extract job,
    read it back with Session.read_avro, and compare to the original frame."""
    scalars_df, _ = scalars_dfs
    # Extract jobs require a wildcard destination URI; the read side narrows
    # to a concrete shard name or keeps the wildcard per the parametrization.
    write_uri = gcs_folder + test_read_avro_gcs.__name__ + "*.avro"
    read_uri = gcs_folder + test_read_avro_gcs.__name__ + filename

    df_in: bigframes.dataframe.DataFrame = scalars_df.copy()
    # datetime round-trips back as str in avro
    df_in = df_in.drop(columns=["geography_col", "duration_col", "datetime_col"])
    # Preserve the original index under a randomized column name so the
    # read-back frame can be re-ordered deterministically.
    df_write = df_in.reset_index(drop=False)
    index_name = f"ordering_id_{random.randrange(1_000_000)}"
    df_write.index.name = index_name

    # Create a BigQuery table
    table_id = df_write.to_gbq()

    # Extract to GCS as Avro
    client = session.bqclient
    extract_job_config = bigquery.ExtractJobConfig()
    extract_job_config.destination_format = "AVRO"
    # Keep timestamps/decimals as Avro logical types, matching the
    # use_avro_logical_types setting on the read path.
    extract_job_config.use_avro_logical_types = True

    client.extract_table(table_id, write_uri, job_config=extract_job_config).result()

    df_out = (
        session.read_avro(read_uri, engine=engine)
        # Restore a deterministic row order, then the original index.
        .set_index(index_name)
        .sort_index()
        .set_index(typing.cast(str, df_in.index.name))
    )

    assert df_out.size != 0
    pd_df_in = df_in.to_pandas()
    pd_df_out = df_out.to_pandas()
    bigframes.testing.utils.assert_frame_equal(pd_df_in, pd_df_out)
2052+
2053+
19342054
@pytest.mark.parametrize(
19352055
"compression",
19362056
[

0 commit comments

Comments
 (0)