8 changes: 4 additions & 4 deletions .github/instructions/*.instructions.md
@@ -1,14 +1,14 @@
# GitHub Copilot Instructions for aodn_cloud_optimised

## Core Architecture & Documentation
- **Sphinx Documentation:** The source of truth for documentation is in the `docs/` folder.
- **Sphinx Documentation:** The source of truth for documentation is in the `docs/` folder.
- Whenever a PR modifies core library logic in `aodn_cloud_optimised/lib` or `aodn_cloud_optimised/bin`, ensure corresponding changes are suggested for the `.rst` or `.md` files in `docs/`. Have a good read of the docs already available. Don't hesitate to suggest other changes, maybe not directly related (in another section)
- If a new feature is added, prompt the user to add a new section in the documentation and provide what to add and where in the doc.
- Check for "Breaking Changes", flag it immediately and insist on a "Migration Guide" entry in the docs.

## Notebooks & Datasets
- **Notebooks Folder:** This repository uses Jupyter Notebooks (`notebooks/`) for demonstration and data processing.
- If a PR updates configuration for a new dataset (under `aodn_cloud_optimised/config/dataset`), especially for new year_range or new paths of data added , verify that the notebook examples are updated to reflect the new config. They have similar filenames.
- If a PR updates configuration for a new dataset (under `aodn_cloud_optimised/config/dataset`), especially for new year_range or new paths of data added , verify that the notebook examples are updated to reflect the new config. They have similar filenames.
- **Configs**:
- under `aodn_cloud_optimised/config/dataset`, check the values of clear_existing_data and force_previous_parquet_deletion and that the user understands their implications (explained in the docs and code).
- disregard the empty NetCDF files in that directory, they're only here for a dataset template, coordination size ...
@@ -17,9 +17,9 @@
- Don't suggest values for the tags in the AWS config part. But suggest to update it and get it from our metadata expert

## Dependency Management (pyproject.toml)
- **Pinning Policy:** Be extremely strict with `pyproject.toml`.
- **Pinning Policy:** Be extremely strict with `pyproject.toml`.
- Check the comments/notes within `pyproject.toml` regarding version pinning.
- Do not suggest unpinned dependencies (e.g., `package = "*"`) unless specifically allowed by the internal notes.
- Do not suggest unpinned dependencies (e.g., `package = "*"`) unless specifically allowed by the internal notes.
- If a dependency is updated, remind the user to check for compatibility with the pinned versions of core scientific libraries (numpy, xarray, etc.).

## Code Quality & Style
26 changes: 24 additions & 2 deletions aodn_cloud_optimised/bin/generic_cloud_optimised_creation.py
@@ -42,7 +42,11 @@
load_dataset_config,
load_variable_from_config,
)
from aodn_cloud_optimised.lib.s3Tools import boto3_from_opts_dict, s3_ls
from aodn_cloud_optimised.lib.s3Tools import (
boto3_from_opts_dict,
discover_parquet_datasets,
s3_ls,
)
from aodn_cloud_optimised.lib.schema import get_pyarrow_type_map

logger = logging.getLogger(__name__)
@@ -76,6 +80,7 @@ class PathConfig(BaseModel):
partitioning: Optional, used only for Parquet datasets (e.g., "hive").
filter: List of regex patterns to filter files (only valid for type="files").
year_range: Optional Year filter: None, one year, or a two-year inclusive range, or a list of exclusive years to process. (only valid for type="files")
discover_parquet_datasets: If True, discover and process multiple parquet datasets/files in the s3_uri folder. Only valid when type='parquet'.

"""

@@ -96,6 +101,16 @@ class PathConfig(BaseModel):
default=None,
description="Must be None (no filtering), a single year [YYYY], a two-year range [YYYY, YYYY], or a list of exclusive years to process [YYYY, YYYY, YYYY]",
)
discover_parquet_datasets: bool = Field(
default=False,
description=(
"If True, discover and process all parquet datasets/files in the s3_uri folder. "
"For hive partitioning: discovers subdirectories ending with .parquet. "
"For non-hive: discovers files ending with .parquet at first level. "
"All discovered sources are read and concatenated into a single output. "
"Only valid when type='parquet'."
),
)

@field_validator("year_range", mode="after")
def validate_year_range(cls, v: Optional[List[int]]) -> Optional[List[int]]:
@@ -207,6 +222,12 @@ def validate_cross_fields(cls, values):
"type must be defined as 'zarr' in run_settings.paths config if ingesting a zarr dataset."
)

# Validate discover_parquet_datasets only valid for parquet type
if values.discover_parquet_datasets and dataset_type != "parquet":
raise ValueError(
"discover_parquet_datasets can only be True when type='parquet'"
)

if dataset_type == "parquet":
if values.filter:
raise ValueError("filter must not be defined when type='parquet'")
@@ -1260,7 +1281,8 @@ def collect_files(

Supports:
- 'files': lists and filters regular files (e.g., NetCDF, CSV)
- 'parquet': handles both single Parquet files and Hive-partitioned datasets
- 'parquet': returns the s3_uri path. If discover_parquet_datasets=True,
the handler will discover and process multiple parquet sources internally.
- 'zarr': returns the Zarr store path directly

Args:
188 changes: 156 additions & 32 deletions aodn_cloud_optimised/lib/GenericParquetHandler.py
@@ -28,6 +28,7 @@
from aodn_cloud_optimised.lib.s3Tools import (
create_fileset,
delete_objects_in_prefix,
discover_parquet_datasets,
prefix_exists,
split_s3_path,
)
@@ -264,39 +265,41 @@ def preprocess_data_parquet(
self, parquet_fp
) -> Generator[Tuple[pd.DataFrame, xr.Dataset], None, None]:
"""
Preprocesses a parquet file using pyarrow and converts it into an xarray Dataset based on the dataset configuration.
Preprocesses a parquet file or discovers and concatenates multiple parquet datasets.

Args:
parquet_fp (str or s3fs.core.S3File): File path or s3fs object of the parquet file to be processed.
If discover_parquet_datasets is True in the matched config, this should be the parent folder.

Yields:
Tuple[pd.DataFrame, xr.Dataset]: A generator yielding a tuple containing the processed pandas DataFrame
and its corresponding xarray Dataset.

This method reads a parquet file(`parquet_fp`) using pyarrow.parquet `read_table` function.
This method reads parquet file(s) using pyarrow and converts to pandas DataFrame and xarray Dataset.

The resultin DataFrame (`df`) is then converted into an xarray Dataset using `xr.Dataset.from_dataframe()`.
When discover_parquet_datasets=True:
- Discovers all parquet datasets/files in the parent folder
- Reads each source individually
- Concatenates all tables using PyArrow (with schema promotion for compatibility)
- Returns a single merged DataFrame and Dataset

# TODO: Document `pq.read_table` options
The resulting DataFrame is converted into an xarray Dataset using `xr.Dataset.from_dataframe()`.

The method also uses the 'schema' from the dataset configuration to assign attributes to variables in the
xarray Dataset. Each variable's attributes are extracted from the 'schema' and assigned to the Dataset variable's
attributes. The 'type' attribute from the `pyarrow_schema` is removed from the Dataset variables' attributes since it
is considered unnecessary.

If a variable in the Dataset is not found in the schema, an error is logged.
The method uses the 'schema' from the dataset configuration to assign attributes to variables in the
xarray Dataset.

Notes:
Ensure that the config schema includes a column named "index" of type int64. When the internal conversions
occur between xarray, pandas and pyarrow, an "index" column is added to the pyarrow table. Rather than
detect when "index" should not have been added, it is easier to add "index" as an expected column that is
added by the cloud optimisation process.
Ensure that the config schema includes a column named "index" of type int64.
"""

key_path = getattr(parquet_fp, "path", None)
full_path = key_path if key_path.startswith("s3://") else f"s3://{key_path}"
full_path = (
key_path
if key_path and key_path.startswith("s3://")
else f"s3://{key_path}" if key_path else str(parquet_fp)
)

# matching the parquet file with the correct config in the paths array
# Match the parquet file with the correct config in the paths array
matched_cfg = None
for path_cfg in self.dataset_config["run_settings"]["paths"]:
s3_uri = path_cfg.get("s3_uri", "").rstrip("/")
@@ -308,24 +311,116 @@ def preprocess_data_parquet(
raise ValueError(f"No matching path configuration found for {full_path}")

partitioning = matched_cfg.get("partitioning", None)
discover_mode = matched_cfg.get("discover_parquet_datasets", False)

match partitioning:
case None:
# reading as a single Parquet file
table = pq.read_table(parquet_fp)
# Check if discovery mode is enabled
if discover_mode:
self.logger.info(
f"{self.uuid_log}: Discovery mode enabled - discovering parquet sources in {full_path}"
)

case "hive":
key_prefix = parquet_fp.path # S3File objects have `.path` attribute
table = pds.dataset(
key_prefix,
format="parquet",
# Discover all parquet sources in the folder
try:
discovered_sources = discover_parquet_datasets(
s3_uri=full_path,
partitioning=partitioning,
filesystem=self.s3_fs_output,
).to_table()
case _:
raise ValueError(
f"Partitioning value {partitioning} is not yet supported"
bucket_raw=self.dataset_config["run_settings"].get(
"bucket_raw_default_name"
),
s3_fs_opts=(
self.s3_fs_output.storage_options
if hasattr(self.s3_fs_output, "storage_options")
else None
),
)
Comment on lines +323 to 335

Copilot AI Mar 4, 2026


Discovery is executed via discover_parquet_datasets(...), but the S3FS options passed here are derived from self.s3_fs_output.storage_options (and otherwise None). Since discover_parquet_datasets() creates a brand new S3FileSystem, this can cause discovery to run against the wrong endpoint/credentials compared to the handler’s existing self.s3_fs_output (e.g., missing Moto endpoint_url). Consider changing the discovery API to accept an existing filesystem (preferred) and pass self.s3_fs_output directly, so listing and subsequent reads use the same backend config.
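
One way to read this suggestion is sketched below: a discovery helper that takes the caller's filesystem object rather than building a new one, so listing and reading share the same endpoint and credentials. The name `discover_parquet_sources`, its signature, and the `InMemoryFS` stand-in are hypothetical and are not the PR's `discover_parquet_datasets`. The only assumption about the real filesystem is an fsspec-style `ls(path, detail=True)` call returning dicts with `name` and `type` keys.

```python
from typing import Any, Optional


def discover_parquet_sources(
    fs: Any, s3_uri: str, partitioning: Optional[str] = None
) -> list[str]:
    """List first-level parquet sources under s3_uri using an existing filesystem.

    Hypothetical helper: `fs` is any object with an fsspec-style
    ls(path, detail=True) method, for example the handler's own filesystem.
    """
    prefix = s3_uri.removeprefix("s3://").rstrip("/")
    # Hive datasets are directories named *.parquet; otherwise look for files.
    wanted_type = "directory" if partitioning == "hive" else "file"

    sources = []
    for entry in fs.ls(prefix, detail=True):
        name = entry["name"].rstrip("/")
        if entry["type"] == wanted_type and name.endswith(".parquet"):
            sources.append(f"s3://{name}")

    if not sources:
        raise ValueError(f"No parquet sources found under {s3_uri}")
    return sorted(sources)


class InMemoryFS:
    """Illustrative stand-in for a real filesystem; it only implements ls()."""

    def __init__(self, entries):
        self._entries = entries

    def ls(self, path, detail=True):
        # Returns every stored entry below `path`; the demo data is first-level only.
        return [e for e in self._entries if e["name"].startswith(path + "/")]


fs = InMemoryFS(
    [
        {"name": "bucket/data/a.parquet", "type": "file"},
        {"name": "bucket/data/b.parquet", "type": "directory"},
        {"name": "bucket/data/readme.txt", "type": "file"},
    ]
)
flat = discover_parquet_sources(fs, "s3://bucket/data/")
hive = discover_parquet_sources(fs, "s3://bucket/data", partitioning="hive")
```

Because the helper never constructs a filesystem itself, a test that points the handler at a mock endpoint would exercise discovery against that same mock.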

except ValueError as e:
self.logger.error(f"{self.uuid_log}: Discovery failed: {e}")
raise

self.logger.info(
f"{self.uuid_log}: Discovered {len(discovered_sources)} parquet source(s), reading and concatenating..."
)

# Read all discovered sources and concatenate
tables = []
for idx, source_uri in enumerate(discovered_sources):
self.logger.debug(
f"{self.uuid_log}: Reading source {idx+1}/{len(discovered_sources)}: {source_uri}"
)

try:
# Remove s3:// prefix when using filesystem parameter
# PyArrow expects paths relative to the filesystem
source_path = source_uri.replace("s3://", "")

if partitioning == "hive":
# Read hive-partitioned dataset
table = pds.dataset(
source_path, # Path without s3:// prefix
format="parquet",
partitioning=partitioning,
filesystem=self.s3_fs_output,
).to_table()
else:
# Read single parquet file
table = pq.read_table(source_path, filesystem=self.s3_fs_output)

tables.append(table)
self.logger.debug(
f"{self.uuid_log}: Successfully read {source_uri} ({len(table)} rows)"
)

except Exception as e:
self.logger.error(
f"{self.uuid_log}: Failed to read {source_uri}: {e}"
)
raise

# Concatenate all tables with schema promotion for compatibility
try:
self.logger.info(
f"{self.uuid_log}: Concatenating {len(tables)} table(s)..."
)
combined_table = pa.concat_tables(tables, promote=True)
self.logger.info(
f"{self.uuid_log}: Concatenation successful - total rows: {len(combined_table)}"
)
except pa.ArrowInvalid as e:
self.logger.error(
f"{self.uuid_log}: Schema incompatibility detected during concatenation: {e}"
)
# Log schema details for debugging
for idx, table in enumerate(tables):
self.logger.error(
f"{self.uuid_log}: Schema for source {idx+1}: {table.schema}"
)
raise ValueError(
f"Cannot concatenate parquet sources due to incompatible schemas: {e}"
) from e

table = combined_table

else:
# Original single-source logic
match partitioning:
case None:
# reading as a single Parquet file
table = pq.read_table(parquet_fp)

case "hive":
key_prefix = (
parquet_fp.path
) # S3File objects have `.path` attribute
table = pds.dataset(
key_prefix,
format="parquet",
partitioning=partitioning,
filesystem=self.s3_fs_output,
).to_table()
case _:
raise ValueError(
f"Partitioning value {partitioning} is not yet supported"
)

df = table.to_pandas()
df = df.drop(columns=self.drop_variables, errors="ignore")
@@ -360,18 +455,47 @@ def preprocess_data(
If `fp` ends with ".nc", it delegates to `self.preprocess_data_netcdf(fp)`.
Elif `fp` ends with ".csv", it delegates to `self.preprocess_data_csv(fp)`.
Elif `fp` ends with ".parquet", it delegates to `self.preprocess_data_parquet(fp)`.
Elif `fp` is a folder path (ends with `/` or has no suffix) and matches a parquet discovery config,
it delegates to `self.preprocess_data_parquet(fp)`.
Else raises a NotImplementedError

Raises:
NotImplementedError: Where the file type is not yet implemented
"""
# Extract file suffix
if isinstance(fp, str):
file_path = fp
file_suffix = pathlib.Path(fp).suffix
elif isinstance(fp, s3fs.core.S3File):
file_path = fp.path
file_suffix = pathlib.Path(fp.path).suffix

# Match preprocess method
else:
file_path = str(fp)
file_suffix = ""

# Check if this is a parquet discovery case (folder path or .parquet directory)
# by checking if it matches a parquet config with discover_parquet_datasets=True
is_parquet_discovery = False
if not file_suffix or file_path.endswith("/") or ".parquet" in file_path:
for path_cfg in self.dataset_config["run_settings"]["paths"]:
if path_cfg.get("type") == "parquet" and path_cfg.get(
"discover_parquet_datasets", False
):
s3_uri = path_cfg.get("s3_uri", "").rstrip("/")
# Normalize both paths for comparison (remove s3:// prefix if present)
normalized_file_path = file_path.replace("s3://", "").rstrip("/")
normalized_s3_uri = s3_uri.replace("s3://", "").rstrip("/")
if (
normalized_file_path.startswith(normalized_s3_uri)
or normalized_file_path == normalized_s3_uri
Comment on lines +479 to +490

Copilot AI Mar 4, 2026


The discovery detection uses normalized_file_path.startswith(normalized_s3_uri), which will treat any parquet file inside the configured folder as a discovery input. If a caller passes a single parquet file path under that folder, the handler will attempt discovery using the file path as the parent and likely fail. Consider tightening this check to only trigger when the input path equals the configured discovery folder (after normalization), and/or when the input has no filename component (ends with /).

Suggested change
if not file_suffix or file_path.endswith("/") or ".parquet" in file_path:
for path_cfg in self.dataset_config["run_settings"]["paths"]:
if path_cfg.get("type") == "parquet" and path_cfg.get(
"discover_parquet_datasets", False
):
s3_uri = path_cfg.get("s3_uri", "").rstrip("/")
# Normalize both paths for comparison (remove s3:// prefix if present)
normalized_file_path = file_path.replace("s3://", "").rstrip("/")
normalized_s3_uri = s3_uri.replace("s3://", "").rstrip("/")
if (
normalized_file_path.startswith(normalized_s3_uri)
or normalized_file_path == normalized_s3_uri
# Only treat directory-like paths (no suffix or explicit trailing slash) as discovery inputs.
if not file_suffix or file_path.endswith("/"):
for path_cfg in self.dataset_config["run_settings"]["paths"]:
if path_cfg.get("type") == "parquet" and path_cfg.get(
"discover_parquet_datasets", False
):
s3_uri = path_cfg.get("s3_uri", "").rstrip("/")
if not s3_uri:
continue
# Normalize both paths for comparison (remove s3:// prefix if present)
normalized_file_path = file_path.replace("s3://", "").rstrip("/")
normalized_s3_uri = s3_uri.replace("s3://", "").rstrip("/")
is_directory_like = not file_suffix or file_path.endswith("/")
if (
normalized_file_path == normalized_s3_uri
or (
is_directory_like
and normalized_file_path.startswith(
normalized_s3_uri + "/"
)
)
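
The tightened check can also be expressed as a small standalone predicate, which makes the edge cases easy to test in isolation. This is a sketch only: `is_discovery_input` and `_normalise` are hypothetical names, not functions from the PR, and the rule encoded is the one described in the comment above (exact match on the configured folder, or a directory-like path beneath it).

```python
from pathlib import PurePosixPath


def _normalise(path: str) -> str:
    """Drop the s3:// scheme and any trailing slash so paths compare cleanly."""
    return path.removeprefix("s3://").rstrip("/")


def is_discovery_input(file_path: str, discovery_uri: str) -> bool:
    """Return True only when file_path should trigger parquet discovery.

    Hypothetical helper: matches the configured folder itself, or a
    directory-like path (trailing slash or no suffix) underneath it.
    """
    target = _normalise(discovery_uri)
    if not target:
        # An empty s3_uri would otherwise match every path.
        return False

    candidate = _normalise(file_path)
    if candidate == target:
        return True

    directory_like = file_path.endswith("/") or not PurePosixPath(candidate).suffix
    # Appending "/" stops "bucket/data_other" from matching "bucket/data".
    return directory_like and candidate.startswith(target + "/")
```

With this rule, a single file such as `s3://bucket/data/file.parquet` falls through to the ordinary suffix-based dispatch instead of being treated as a discovery folder.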

):
is_parquet_discovery = True
break

if is_parquet_discovery:
return self.preprocess_data_parquet(fp)

# Match preprocess method by suffix
match file_suffix.lower():
case ".nc":
return self.preprocess_data_netcdf(fp)