-
Notifications
You must be signed in to change notification settings - Fork 4
Feat: add new feature to handle reading multiple parquet dataset #257
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -28,6 +28,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from aodn_cloud_optimised.lib.s3Tools import ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| create_fileset, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| delete_objects_in_prefix, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| discover_parquet_datasets, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| prefix_exists, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| split_s3_path, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -264,39 +265,41 @@ def preprocess_data_parquet( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self, parquet_fp | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) -> Generator[Tuple[pd.DataFrame, xr.Dataset], None, None]: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Preprocesses a parquet file using pyarrow and converts it into an xarray Dataset based on the dataset configuration. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Preprocesses a parquet file or discovers and concatenates multiple parquet datasets. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Args: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| parquet_fp (str or s3fs.core.S3File): File path or s3fs object of the parquet file to be processed. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| If discover_parquet_datasets is True in the matched config, this should be the parent folder. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Yields: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Tuple[pd.DataFrame, xr.Dataset]: A generator yielding a tuple containing the processed pandas DataFrame | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| and its corresponding xarray Dataset. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| This method reads a parquet file(`parquet_fp`) using pyarrow.parquet `read_table` function. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| This method reads parquet file(s) using pyarrow and converts to pandas DataFrame and xarray Dataset. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| The resultin DataFrame (`df`) is then converted into an xarray Dataset using `xr.Dataset.from_dataframe()`. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| When discover_parquet_datasets=True: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| - Discovers all parquet datasets/files in the parent folder | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| - Reads each source individually | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| - Concatenates all tables using PyArrow (with schema promotion for compatibility) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| - Returns a single merged DataFrame and Dataset | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # TODO: Document `pq.read_table` options | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| The resulting DataFrame is converted into an xarray Dataset using `xr.Dataset.from_dataframe()`. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| The method also uses the 'schema' from the dataset configuration to assign attributes to variables in the | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| xarray Dataset. Each variable's attributes are extracted from the 'schema' and assigned to the Dataset variable's | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| attributes. The 'type' attribute from the `pyarrow_schema` is removed from the Dataset variables' attributes since it | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| is considered unnecessary. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| If a variable in the Dataset is not found in the schema, an error is logged. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| The method uses the 'schema' from the dataset configuration to assign attributes to variables in the | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| xarray Dataset. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Notes: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Ensure that the config schema includes a column named "index" of type int64. When the internal conversions | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| occur between xarray, pandas and pyarrow, an "index" column is added to the pyarrow table. Rather than | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| detect when "index" should not have been added, it is easier to add "index" as an expected column that is | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| added by the cloud optimisation process. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Ensure that the config schema includes a column named "index" of type int64. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| key_path = getattr(parquet_fp, "path", None) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| full_path = key_path if key_path.startswith("s3://") else f"s3://{key_path}" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| full_path = ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| key_path | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if key_path and key_path.startswith("s3://") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| else f"s3://{key_path}" if key_path else str(parquet_fp) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # matching the parquet file with the correct config in the paths array | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Match the parquet file with the correct config in the paths array | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| matched_cfg = None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for path_cfg in self.dataset_config["run_settings"]["paths"]: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| s3_uri = path_cfg.get("s3_uri", "").rstrip("/") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -308,24 +311,116 @@ def preprocess_data_parquet( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| raise ValueError(f"No matching path configuration found for {full_path}") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| partitioning = matched_cfg.get("partitioning", None) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| discover_mode = matched_cfg.get("discover_parquet_datasets", False) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| match partitioning: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # reading as a single Parquet file | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| table = pq.read_table(parquet_fp) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Check if discovery mode is enabled | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if discover_mode: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.logger.info( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| f"{self.uuid_log}: Discovery mode enabled - discovering parquet sources in {full_path}" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case "hive": | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| key_prefix = parquet_fp.path # S3File objects have `.path` attribute | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| table = pds.dataset( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| key_prefix, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| format="parquet", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Discover all parquet sources in the folder | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| discovered_sources = discover_parquet_datasets( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| s3_uri=full_path, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| partitioning=partitioning, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| filesystem=self.s3_fs_output, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ).to_table() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case _: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| raise ValueError( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| f"Partitioning value {partitioning} is not yet supported" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| bucket_raw=self.dataset_config["run_settings"].get( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "bucket_raw_default_name" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| s3_fs_opts=( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.s3_fs_output.storage_options | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if hasattr(self.s3_fs_output, "storage_options") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| else None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| except ValueError as e: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.logger.error(f"{self.uuid_log}: Discovery failed: {e}") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| raise | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.logger.info( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| f"{self.uuid_log}: Discovered {len(discovered_sources)} parquet source(s), reading and concatenating..." | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Read all discovered sources and concatenate | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| tables = [] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for idx, source_uri in enumerate(discovered_sources): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.logger.debug( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| f"{self.uuid_log}: Reading source {idx+1}/{len(discovered_sources)}: {source_uri}" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Remove s3:// prefix when using filesystem parameter | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # PyArrow expects paths relative to the filesystem | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| source_path = source_uri.replace("s3://", "") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if partitioning == "hive": | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Read hive-partitioned dataset | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| table = pds.dataset( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| source_path, # Path without s3:// prefix | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| format="parquet", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| partitioning=partitioning, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| filesystem=self.s3_fs_output, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ).to_table() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Read single parquet file | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| table = pq.read_table(source_path, filesystem=self.s3_fs_output) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| tables.append(table) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.logger.debug( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| f"{self.uuid_log}: Successfully read {source_uri} ({len(table)} rows)" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| except Exception as e: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.logger.error( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| f"{self.uuid_log}: Failed to read {source_uri}: {e}" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| raise | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Concatenate all tables with schema promotion for compatibility | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.logger.info( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| f"{self.uuid_log}: Concatenating {len(tables)} table(s)..." | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| combined_table = pa.concat_tables(tables, promote=True) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.logger.info( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| f"{self.uuid_log}: Concatenation successful - total rows: {len(combined_table)}" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| except pa.ArrowInvalid as e: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.logger.error( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| f"{self.uuid_log}: Schema incompatibility detected during concatenation: {e}" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Log schema details for debugging | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for idx, table in enumerate(tables): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.logger.error( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| f"{self.uuid_log}: Schema for source {idx+1}: {table.schema}" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| raise ValueError( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| f"Cannot concatenate parquet sources due to incompatible schemas: {e}" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) from e | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| table = combined_table | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Original single-source logic | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| match partitioning: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # reading as a single Parquet file | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| table = pq.read_table(parquet_fp) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case "hive": | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| key_prefix = ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| parquet_fp.path | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) # S3File objects have `.path` attribute | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| table = pds.dataset( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| key_prefix, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| format="parquet", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| partitioning=partitioning, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| filesystem=self.s3_fs_output, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ).to_table() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case _: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| raise ValueError( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| f"Partitioning value {partitioning} is not yet supported" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| df = table.to_pandas() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| df = df.drop(columns=self.drop_variables, errors="ignore") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -360,18 +455,47 @@ def preprocess_data( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| If `fp` ends with ".nc", it delegates to `self.preprocess_data_netcdf(fp)`. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Elif `fp` ends with ".csv", it delegates to `self.preprocess_data_csv(fp)`. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Elif `fp` ends with ".parquet", it delegates to `self.preprocess_data_parquet(fp)`. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Elif `fp` is a folder path (ends with `/` or has no suffix) and matches a parquet discovery config, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| it delegates to `self.preprocess_data_parquet(fp)`. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Else raises a NotImplementedError | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Raises: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| NotImplementedError: Where the file type is not yet implemented | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Extract file suffix | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if isinstance(fp, str): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| file_path = fp | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| file_suffix = pathlib.Path(fp).suffix | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| elif isinstance(fp, s3fs.core.S3File): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| file_path = fp.path | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| file_suffix = pathlib.Path(fp.path).suffix | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Match preprocess method | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| file_path = str(fp) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| file_suffix = "" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Check if this is a parquet discovery case (folder path or .parquet directory) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # by checking if it matches a parquet config with discover_parquet_datasets=True | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| is_parquet_discovery = False | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if not file_suffix or file_path.endswith("/") or ".parquet" in file_path: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for path_cfg in self.dataset_config["run_settings"]["paths"]: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if path_cfg.get("type") == "parquet" and path_cfg.get( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "discover_parquet_datasets", False | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| s3_uri = path_cfg.get("s3_uri", "").rstrip("/") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Normalize both paths for comparison (remove s3:// prefix if present) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| normalized_file_path = file_path.replace("s3://", "").rstrip("/") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| normalized_s3_uri = s3_uri.replace("s3://", "").rstrip("/") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| normalized_file_path.startswith(normalized_s3_uri) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| or normalized_file_path == normalized_s3_uri | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+479
to
+490
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if not file_suffix or file_path.endswith("/") or ".parquet" in file_path: | |
| for path_cfg in self.dataset_config["run_settings"]["paths"]: | |
| if path_cfg.get("type") == "parquet" and path_cfg.get( | |
| "discover_parquet_datasets", False | |
| ): | |
| s3_uri = path_cfg.get("s3_uri", "").rstrip("/") | |
| # Normalize both paths for comparison (remove s3:// prefix if present) | |
| normalized_file_path = file_path.replace("s3://", "").rstrip("/") | |
| normalized_s3_uri = s3_uri.replace("s3://", "").rstrip("/") | |
| if ( | |
| normalized_file_path.startswith(normalized_s3_uri) | |
| or normalized_file_path == normalized_s3_uri | |
| # Only treat directory-like paths (no suffix or explicit trailing slash) as discovery inputs. | |
| if not file_suffix or file_path.endswith("/"): | |
| for path_cfg in self.dataset_config["run_settings"]["paths"]: | |
| if path_cfg.get("type") == "parquet" and path_cfg.get( | |
| "discover_parquet_datasets", False | |
| ): | |
| s3_uri = path_cfg.get("s3_uri", "").rstrip("/") | |
| if not s3_uri: | |
| continue | |
| # Normalize both paths for comparison (remove s3:// prefix if present) | |
| normalized_file_path = file_path.replace("s3://", "").rstrip("/") | |
| normalized_s3_uri = s3_uri.replace("s3://", "").rstrip("/") | |
| is_directory_like = not file_suffix or file_path.endswith("/") | |
| if ( | |
| normalized_file_path == normalized_s3_uri | |
| or ( | |
| is_directory_like | |
| and normalized_file_path.startswith( | |
| normalized_s3_uri + "/" | |
| ) | |
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discovery is executed via
discover_parquet_datasets(...), but the S3FS options passed here are derived fromself.s3_fs_output.storage_options(and otherwiseNone). Sincediscover_parquet_datasets()creates a brand newS3FileSystem, this can cause discovery to run against the wrong endpoint/credentials compared to the handler’s existingself.s3_fs_output(e.g., missing Motoendpoint_url). Consider changing the discovery API to accept an existing filesystem (preferred) and passself.s3_fs_outputdirectly, so listing and subsequent reads use the same backend config.