Skip to content

Commit d3377d1

Browse files
committed
refactor file format configs into private helpers, updated error message
1 parent 9fe594c commit d3377d1

1 file changed

Lines changed: 94 additions & 74 deletions

File tree

python/lib/sift_client/resources/data_imports.py

Lines changed: 94 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -120,12 +120,7 @@ async def import_from_path(
120120
if isinstance(
121121
config, (ParquetFlatDatasetImportConfig, ParquetSingleChannelPerRowImportConfig)
122122
):
123-
if config.footer_offset == 0 and config.footer_length == 0:
124-
footer_bytes, footer_offset = await run_sync_function(
125-
lambda: extract_parquet_footer(path)
126-
)
127-
config.footer_offset = footer_offset
128-
config.footer_length = len(footer_bytes)
123+
await _prepare_parquet_config(config, path)
129124

130125
if show_progress is None:
131126
show_progress = resolve_show_progress(is_sync=getattr(self, "_is_sync", False))
@@ -191,23 +186,7 @@ async def detect_config(
191186
if not path.is_file():
192187
raise FileNotFoundError(f"File not found: {file_path}")
193188

194-
ext = path.suffix.lower()
195-
if ext in (".parquet", ".pqt"):
196-
if data_type is None:
197-
raise ValueError(
198-
"Parquet files require 'data_type' to be specified. "
199-
"Use DataTypeKey.PARQUET_FLATDATASET or DataTypeKey.PARQUET_SINGLE_CHANNEL_PER_ROW."
200-
)
201-
data_type_key = data_type
202-
elif data_type is not None:
203-
data_type_key = data_type
204-
else:
205-
if ext not in EXTENSION_TO_DATA_TYPE_KEY:
206-
raise ValueError(
207-
f"Unsupported file extension '{ext}'. "
208-
f"Supported: {', '.join(sorted(EXTENSION_TO_DATA_TYPE_KEY))}"
209-
)
210-
data_type_key = EXTENSION_TO_DATA_TYPE_KEY[ext]
189+
data_type_key = _resolve_data_type_key(path.suffix.lower(), data_type)
211190

212191
is_parquet = data_type_key in (
213192
DataTypeKey.PARQUET_FLATDATASET,
@@ -234,58 +213,99 @@ def _read_sample() -> bytes:
234213
response = await self._low_level_client.detect_config(sample, data_type_key.value)
235214

236215
if response.HasField("csv_config"):
237-
csv_config = CsvImportConfig._from_proto(response.csv_config)
238-
# Filter out the time column from data_columns to avoid overlap.
239-
time_col = csv_config.time_column.column
240-
csv_config.data_columns = [
241-
dc for dc in csv_config.data_columns if dc.column != time_col
242-
]
243-
if not csv_config.data_columns:
244-
raise ValueError(f"No data columns detected in '{path.name}'.")
245-
return csv_config
216+
return _parse_csv_detect_response(response.csv_config, path.name)
246217

247218
if response.HasField("parquet_config"):
248-
proto = response.parquet_config
249-
if proto.HasField("flat_dataset"):
250-
parquet_config = ParquetFlatDatasetImportConfig._from_proto(
251-
proto, footer_offset=footer_offset, footer_length=footer_length
252-
)
253-
# Filter out the time column from data_columns to avoid overlap.
254-
time_path = parquet_config.time_column.path
255-
if time_path:
256-
parquet_config.data_columns = [
257-
dc for dc in parquet_config.data_columns if dc.path != time_path
258-
]
259-
else:
260-
# The backend only detects arrow timestamp types. Fall back to
261-
# an integer column whose name starts with "time".
262-
_integer_types = {
263-
ChannelDataType.INT_32,
264-
ChannelDataType.INT_64,
265-
ChannelDataType.UINT_32,
266-
ChannelDataType.UINT_64,
267-
}
268-
match = None
269-
for dc in parquet_config.data_columns:
270-
if dc.data_type in _integer_types and dc.name.lower().startswith("time"):
271-
match = dc
272-
break
273-
if match is not None:
274-
parquet_config.time_column = ParquetTimeColumn(path=match.path)
275-
parquet_config.data_columns = [
276-
c for c in parquet_config.data_columns if c.path != match.path
277-
]
278-
if not parquet_config.time_column.path:
279-
raise ValueError(
280-
f"No time column detected in '{path.name}'. "
281-
"Set the time column manually on the config before importing."
282-
)
283-
if not parquet_config.data_columns:
284-
raise ValueError(f"No data columns detected in '{path.name}'.")
285-
return parquet_config
286-
elif proto.HasField("single_channel_per_row"):
287-
return ParquetSingleChannelPerRowImportConfig._from_proto(
288-
proto, footer_offset=footer_offset, footer_length=footer_length
289-
)
219+
return _parse_parquet_detect_response(
220+
response.parquet_config, path.name, footer_offset, footer_length
221+
)
290222

291223
raise ValueError("Server returned an empty DetectConfig response.")
224+
225+
226+
def _resolve_data_type_key(ext: str, data_type: DataTypeKey | None) -> DataTypeKey:
227+
"""Resolve the data type key from file extension and explicit override."""
228+
if ext in (".parquet", ".pqt"):
229+
if data_type is None:
230+
raise ValueError(
231+
"Parquet files require 'data_type' to be specified. "
232+
"Use DataTypeKey.PARQUET_FLATDATASET or DataTypeKey.PARQUET_SINGLE_CHANNEL_PER_ROW."
233+
)
234+
return data_type
235+
if data_type is not None:
236+
return data_type
237+
if ext not in EXTENSION_TO_DATA_TYPE_KEY:
238+
raise ValueError(
239+
f"Unsupported file extension '{ext}'. "
240+
f"Supported: {', '.join(sorted(EXTENSION_TO_DATA_TYPE_KEY))}. "
241+
"You can also specify 'data_type' explicitly using a DataTypeKey value."
242+
)
243+
return EXTENSION_TO_DATA_TYPE_KEY[ext]
244+
245+
246+
def _parse_csv_detect_response(proto, filename: str) -> CsvImportConfig:
247+
"""Parse a CSV DetectConfig response into a config."""
248+
csv_config = CsvImportConfig._from_proto(proto)
249+
time_col = csv_config.time_column.column
250+
csv_config.data_columns = [dc for dc in csv_config.data_columns if dc.column != time_col]
251+
if not csv_config.data_columns:
252+
raise ValueError(f"No data columns detected in '{filename}'.")
253+
return csv_config
254+
255+
256+
def _parse_parquet_detect_response(
257+
proto, filename: str, footer_offset: int, footer_length: int
258+
) -> ParquetFlatDatasetImportConfig | ParquetSingleChannelPerRowImportConfig:
259+
"""Parse a Parquet DetectConfig response into a config."""
260+
if proto.HasField("flat_dataset"):
261+
parquet_config = ParquetFlatDatasetImportConfig._from_proto(
262+
proto, footer_offset=footer_offset, footer_length=footer_length
263+
)
264+
time_path = parquet_config.time_column.path
265+
if time_path:
266+
parquet_config.data_columns = [
267+
dc for dc in parquet_config.data_columns if dc.path != time_path
268+
]
269+
else:
270+
# The backend only detects arrow timestamp types. Fall back to
271+
# an integer column whose name starts with "time".
272+
_integer_types = {
273+
ChannelDataType.INT_32,
274+
ChannelDataType.INT_64,
275+
ChannelDataType.UINT_32,
276+
ChannelDataType.UINT_64,
277+
}
278+
match = None
279+
for dc in parquet_config.data_columns:
280+
if dc.data_type in _integer_types and dc.name.lower().startswith("time"):
281+
match = dc
282+
break
283+
if match is not None:
284+
parquet_config.time_column = ParquetTimeColumn(path=match.path)
285+
parquet_config.data_columns = [
286+
c for c in parquet_config.data_columns if c.path != match.path
287+
]
288+
if not parquet_config.time_column.path:
289+
raise ValueError(
290+
f"No time column detected in '{filename}'. "
291+
"Set the time column manually on the config before importing."
292+
)
293+
if not parquet_config.data_columns:
294+
raise ValueError(f"No data columns detected in '{filename}'.")
295+
return parquet_config
296+
elif proto.HasField("single_channel_per_row"):
297+
return ParquetSingleChannelPerRowImportConfig._from_proto(
298+
proto, footer_offset=footer_offset, footer_length=footer_length
299+
)
300+
raise ValueError(f"Unsupported parquet layout in DetectConfig response for '{filename}'.")
301+
302+
303+
async def _prepare_parquet_config(
304+
config: ParquetFlatDatasetImportConfig | ParquetSingleChannelPerRowImportConfig,
305+
path: Path,
306+
) -> None:
307+
"""Populate parquet footer fields on the config if not already set."""
308+
if config.footer_offset == 0 and config.footer_length == 0:
309+
footer_bytes, footer_offset = await run_sync_function(lambda: extract_parquet_footer(path))
310+
config.footer_offset = footer_offset
311+
config.footer_length = len(footer_bytes)

0 commit comments

Comments
 (0)