Feat: add new feature to handle reading multiple parquet dataset#257
Feat: add new feature to handle reading multiple parquet dataset#257lbesnard wants to merge 3 commits into
Conversation
Plan: Support Multiple Parquet Datasets/Files in a FolderProblem StatementCurrently, the
Example current usage: {
"type": "parquet",
"partitioning": "hive",
"s3_uri": "s3://aodn-cloud-optimised/wave_buoy_realtime_nonqc.parquet"
}This reads ONE hive-partitioned dataset at that path. The feature request is to extend
All discovered parquet datasets/files should be read and concatenated into a single output parquet dataset. Current Implementation AnalysisCurrent Architecture
How It Currently WorksHive Example:
Non-Hive Example:
Proposed SolutionApproach: Auto-Discover Parquet Sources in a FolderKeep the current Current behavior:
New behavior:
Benefits:
Implementation Changes1. Pydantic Config ChangesPathConfig Model (
2. Processing Logic Changescollect_files() function (lines 1252-1334):
GenericParquetHandler.preprocess_data_parquet() (lines 263-345):
3. Discovery Logic ImplementationAdd new helper function def discover_parquet_datasets(
s3_uri: str,
partitioning: Optional[str],
s3_fs: s3fs.S3FileSystem
) -> List[str]:
"""Discover parquet datasets or files in a folder.
Args:
s3_uri: S3 path to folder containing parquet sources
partitioning: "hive" for hive datasets, None for flat files
s3_fs: S3 filesystem instance
Returns:
List of S3 paths to parquet datasets/files
"""
# Parse s3_uri to get bucket and prefix
# List contents at first level
# Filter based on partitioning mode
# Return discovered pathsLogic:
4. Concatenation LogicKey Question: How does the current framework handle multiple inputs? Looking at the code flow:
Current behavior: When processing multiple NetCDF files (type="files"), each file creates its own output. The framework doesn't auto-concatenate. For this feature: We need the discovered parquet sources to be concatenated into ONE output. Two options: Option A - Concatenate in preprocess_data_parquet():
Option B - Modify collect_files() to return synthetic wrapper:
Decision: Go with Option A - simpler and cleaner. Modified def preprocess_data_parquet(self, parquet_fp):
# Match config
matched_cfg = find_matching_config(parquet_fp)
# Check if this is a discovery scenario
if matched_cfg.get("discover_parquet_datasets", False):
# This parquet_fp might be the parent folder
# OR it's one of the discovered datasets
# We need to identify and handle appropriately
# Read ALL discovered datasets and concatenate
tables = []
for source_path in discovered_paths:
table = read_parquet_source(source_path, partitioning)
tables.append(table)
# Concatenate tables
combined_table = pa.concat_tables(tables)
df = combined_table.to_pandas()
else:
# Current single-source logic
...5. Documentation Updates6. Test UpdatesUpdate/create tests:
Revised Implementation TodosCore Changes
Documentation
Testing
Implementation DetailsDiscovery Logic Pseudocodedef discover_parquet_datasets(
s3_uri: str,
partitioning: Optional[str],
bucket_raw: Optional[str],
s3_client_opts: Optional[dict] = None,
) -> List[str]:
# Initialize S3 filesystem
s3_fs = s3fs.S3FileSystem(**s3_client_opts or {})
# Parse URI
if s3_uri.startswith("s3://"):
parsed = urlparse(s3_uri)
bucket = parsed.netloc
prefix = parsed.path.lstrip("/")
else:
bucket = bucket_raw
prefix = s3_uri
full_prefix = f"{bucket}/{prefix}".rstrip("/")
# List contents (first level only)
try:
entries = s3_fs.ls(full_prefix, detail=True)
except FileNotFoundError:
raise ValueError(f"Path not found: s3://{full_prefix}")
discovered = []
if partitioning == "hive":
# Find directories ending with .parquet
for entry in entries:
if entry['type'] == 'directory':
name = entry['name'].split('/')[-1]
if name.endswith('.parquet'):
discovered.append(f"s3://{entry['name']}")
else:
# Find files ending with .parquet
for entry in entries:
if entry['type'] == 'file':
name = entry['name'].split('/')[-1]
if name.endswith('.parquet'):
discovered.append(f"s3://{entry['name']}")
if not discovered:
raise ValueError(
f"No parquet {'datasets' if partitioning=='hive' else 'files'} "
f"found in s3://{full_prefix}"
)
return discoveredConcatenation StrategyWhen
Problem: If Solution:
Updated collect_files logic: elif dataset_type == "parquet":
if path_cfg.discover_parquet_datasets:
# Return only the parent folder
# Handler will do the discovery and concatenation
return [s3_uri]
else:
# Current behavior
return [s3_uri]Updated preprocess_data_parquet logic: def preprocess_data_parquet(self, parquet_fp):
matched_cfg = find_matching_config(parquet_fp)
partitioning = matched_cfg.get("partitioning", None)
# Check discovery mode
if matched_cfg.get("discover_parquet_datasets", False):
# Discover all parquet sources
sources = discover_parquet_datasets(
parquet_fp.path, # parent folder
partitioning,
...
)
# Read and concatenate all sources
tables = []
for source_uri in sources:
if partitioning == "hive":
table = pds.dataset(
source_uri,
format="parquet",
partitioning="hive",
filesystem=self.s3_fs_output
).to_table()
else:
table = pq.read_table(source_uri, filesystem=self.s3_fs_output)
tables.append(table)
# Concatenate with schema merge
combined_table = pa.concat_tables(tables, promote=True)
df = combined_table.to_pandas()
else:
# Existing single-source logic
...
# Rest of the method (schema validation, xarray conversion, etc.)Notes and ConsiderationsSchema Consistency
Performance
Discovery Scope
Partitioning Consistency
Clear Existing Data
Year Range and Filters
Error Handling
Breaking ChangesNone - This is a backwards-compatible additive feature:
Migration GuideNot needed for existing users. New users who want to merge multiple parquet sources:
Example ConfigsExample 1: Multiple Hive-Partitioned DatasetsS3 Structure: Config: {
"run_settings": {
"paths": [
{
"type": "parquet",
"partitioning": "hive",
"s3_uri": "s3://my-bucket/historical-wave-data/",
"discover_parquet_datasets": true
}
]
}
}Result: All three years of data merged into one output parquet dataset. Example 2: Multiple Flat Parquet FilesS3 Structure: Config: {
"run_settings": {
"paths": [
{
"type": "parquet",
"s3_uri": "s3://my-bucket/monthly-exports/",
"discover_parquet_datasets": true
}
]
}
}Result: All monthly files merged into one output parquet dataset. Example 3: Backward Compatible (No Discovery)Existing Config (unchanged): {
"run_settings": {
"paths": [
{
"type": "parquet",
"partitioning": "hive",
"s3_uri": "s3://aodn-cloud-optimised/wave_buoy_realtime_nonqc.parquet"
}
]
}
}Result: Works exactly as before, processes single hive-partitioned dataset. |
There was a problem hiding this comment.
Pull request overview
Adds a “parquet discovery” mode that allows the parquet handler to discover multiple parquet datasets/files under a parent S3 folder and merge them into a single cloud-optimised parquet output.
Changes:
- Introduces
discover_parquet_datasets()ins3Tools.pyto list parquet datasets/files under a folder (hive vs flat). - Updates
GenericParquetHandlerto support discovery mode and concatenate multiple parquet sources before converting to pandas/xarray. - Adds documentation and a new integration-style test plus a dataset config resource to exercise discovery mode.
Reviewed changes
Copilot reviewed 5 out of 9 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
aodn_cloud_optimised/lib/s3Tools.py |
Adds parquet source discovery helper (hive/flat listing). |
aodn_cloud_optimised/lib/GenericParquetHandler.py |
Enables discovery-mode preprocessing and concatenation logic. |
aodn_cloud_optimised/bin/generic_cloud_optimised_creation.py |
Extends config schema/docs to include discover_parquet_datasets flag. |
docs/development/dataset-configuration.rst |
Documents single parquet vs discovery-mode parquet configuration. |
test_aodn_cloud_optimised/test_parquet_discovery.py |
Adds tests for discovery mode and backward compatibility. |
test_aodn_cloud_optimised/resources/wave_buoy_realtime_from_parquet.json |
Adds test dataset config for parquet discovery mode. |
test_aodn_cloud_optimised/resources/BOM_20250201_CAPE-SORELL_RT_WAVE-PARAMETERS_monthly.nc |
Adds a NetCDF test fixture used by the new discovery integration test. |
You can also share your feedback on Copilot code review. Take the survey.
| import copy | ||
| import json | ||
| import os | ||
| import unittest | ||
|
|
||
| import boto3 | ||
| import pandas as pd | ||
| import pyarrow as pa | ||
| import s3fs | ||
| from moto import mock_aws | ||
| from moto.moto_server.threaded_moto_server import ThreadedMotoServer | ||
|
|
||
| from aodn_cloud_optimised.lib.config import load_dataset_config | ||
| from aodn_cloud_optimised.lib.GenericParquetHandler import GenericHandler | ||
| from aodn_cloud_optimised.lib.s3Tools import ( | ||
| discover_parquet_datasets, | ||
| get_free_local_port, | ||
| ) |
There was a problem hiding this comment.
Several imports appear unused in this test module (copy, pyarrow as pa, and discover_parquet_datasets). Removing them will keep the test lean and avoid flake8/pyflakes failures if linting is enabled.
| def test_discover_parquet_datasets_hive_mode(self): | ||
| """Test discover_parquet_datasets() function in hive mode""" | ||
|
|
||
| # Create multiple hive-partitioned datasets | ||
| parent_folder = f"s3://{self.BUCKET_OPTIMISED_NAME}/test_discovery_hive/" | ||
|
|
||
| # Create fake parquet datasets (just create the directories) | ||
| for dataset_name in [ | ||
| "data_2020.parquet", | ||
| "data_2021.parquet", | ||
| "data_2022.parquet", | ||
| ]: | ||
| dataset_path = f"{parent_folder}{dataset_name}/site_name=A/" | ||
| self.s3.put_object( | ||
| Bucket=self.BUCKET_OPTIMISED_NAME, | ||
| Key=dataset_path.replace(f"s3://{self.BUCKET_OPTIMISED_NAME}/", "") | ||
| + "_metadata", | ||
| Body=b"fake parquet metadata", | ||
| ) | ||
|
|
||
| # Test discovery via the handler (integration test rather than unit test) | ||
| # The discover_parquet_datasets function is tested indirectly through the handler | ||
| # Verify the files were created | ||
| result = self.s3.list_objects_v2( | ||
| Bucket=self.BUCKET_OPTIMISED_NAME, Prefix="test_discovery_hive/" | ||
| ) | ||
|
|
||
| # Should have 3 datasets created | ||
| self.assertIn("Contents", result) | ||
| self.assertEqual(len(result["Contents"]), 3) | ||
|
|
||
| def test_discover_parquet_datasets_flat_mode(self): | ||
| """Test discover_parquet_datasets() function for flat parquet files""" | ||
|
|
||
| parent_folder = f"s3://{self.BUCKET_OPTIMISED_NAME}/test_discovery_flat/" | ||
|
|
||
| # Create fake flat parquet files | ||
| for file_name in ["january.parquet", "february.parquet", "march.parquet"]: | ||
| file_path = f"{parent_folder}{file_name}" | ||
| self.s3.put_object( | ||
| Bucket=self.BUCKET_OPTIMISED_NAME, | ||
| Key=file_path.replace(f"s3://{self.BUCKET_OPTIMISED_NAME}/", ""), | ||
| Body=b"fake parquet content", | ||
| ) | ||
|
|
||
| # Verify files were created | ||
| result = self.s3.list_objects_v2( | ||
| Bucket=self.BUCKET_OPTIMISED_NAME, Prefix="test_discovery_flat/" | ||
| ) | ||
|
|
||
| self.assertIn("Contents", result) | ||
| self.assertEqual(len(result["Contents"]), 3) |
There was a problem hiding this comment.
These tests create objects in S3 but never actually call discover_parquet_datasets() (or invoke the handler in discovery mode) and therefore don’t validate the new discovery behavior. Consider asserting on the discovered source list (e.g., the returned URIs in hive vs flat modes) and/or adding a failure assertion for empty/nonexistent folders using the discovery function itself.
| { | ||
| "run_settings": { | ||
| "paths": [ | ||
| { | ||
| "type": "parquet", | ||
| "partitioning": "hive", // or null for flat parquet files | ||
| "s3_uri": "s3://bucket/dataset.parquet", | ||
| "filter": [], | ||
| "year_range": [] | ||
| } |
There was a problem hiding this comment.
The JSON example includes a // comment ("partitioning": "hive", // ...), which is not valid JSON and can mislead users copying the snippet. Consider removing the inline comment and instead explain the null option in surrounding prose (or show two separate valid JSON examples).
| Args: | ||
| s3_uri: S3 path to folder containing parquet sources (can be full s3:// URI or relative path) | ||
| partitioning: "hive" for hive-partitioned datasets, None for flat parquet files | ||
| bucket_raw: Required if s3_uri is not a full S3 URI | ||
| s3_client_opts: Optional dict with boto3 S3 client options | ||
|
|
There was a problem hiding this comment.
Parameter s3_client_opts is described as “boto3 S3 client options”, but it’s passed directly into s3fs.S3FileSystem(**...). This mismatch is confusing and makes it easy to pass the wrong shape of options. Please rename/re-document this parameter as S3FS options (e.g., s3_fs_opts matching s3fs.S3FileSystem kwargs) or accept an already-constructed s3fs.S3FileSystem instead.
| bucket = bucket_raw | ||
| prefix = s3_uri | ||
|
|
||
| prefix = str(PurePosixPath(prefix)) # normalize path |
There was a problem hiding this comment.
PurePosixPath(prefix) will turn an empty prefix into '.', producing bucket/. and breaking discovery for URIs like s3://bucket or s3://bucket/. Consider special-casing empty prefixes (or '.') so the normalized prefix stays empty before building full_path.
| prefix = str(PurePosixPath(prefix)) # normalize path | |
| # Normalize path, but preserve empty/"." prefixes so bucket root stays "bucket" | |
| if prefix: | |
| normalized_prefix = str(PurePosixPath(prefix)) | |
| prefix = "" if normalized_prefix == "." else normalized_prefix | |
| else: | |
| prefix = "" |
| try: | ||
| discovered_sources = discover_parquet_datasets( | ||
| s3_uri=full_path, | ||
| partitioning=partitioning, | ||
| filesystem=self.s3_fs_output, | ||
| ).to_table() | ||
| case _: | ||
| raise ValueError( | ||
| f"Partitioning value {partitioning} is not yet supported" | ||
| bucket_raw=self.dataset_config["run_settings"].get( | ||
| "bucket_raw_default_name" | ||
| ), | ||
| s3_client_opts=( | ||
| self.s3_fs_output.storage_options | ||
| if hasattr(self.s3_fs_output, "storage_options") | ||
| else None | ||
| ), | ||
| ) |
There was a problem hiding this comment.
Discovery is executed via discover_parquet_datasets(...), but the S3FS options passed here are derived from self.s3_fs_output.storage_options (and otherwise None). Since discover_parquet_datasets() creates a brand new S3FileSystem, this can cause discovery to run against the wrong endpoint/credentials compared to the handler’s existing self.s3_fs_output (e.g., missing Moto endpoint_url). Consider changing the discovery API to accept an existing filesystem (preferred) and pass self.s3_fs_output directly, so listing and subsequent reads use the same backend config.
| if not file_suffix or file_path.endswith("/") or ".parquet" in file_path: | ||
| for path_cfg in self.dataset_config["run_settings"]["paths"]: | ||
| if path_cfg.get("type") == "parquet" and path_cfg.get( | ||
| "discover_parquet_datasets", False | ||
| ): | ||
| s3_uri = path_cfg.get("s3_uri", "").rstrip("/") | ||
| # Normalize both paths for comparison (remove s3:// prefix if present) | ||
| normalized_file_path = file_path.replace("s3://", "").rstrip("/") | ||
| normalized_s3_uri = s3_uri.replace("s3://", "").rstrip("/") | ||
| if ( | ||
| normalized_file_path.startswith(normalized_s3_uri) | ||
| or normalized_file_path == normalized_s3_uri |
There was a problem hiding this comment.
The discovery detection uses normalized_file_path.startswith(normalized_s3_uri), which will treat any parquet file inside the configured folder as a discovery input. If a caller passes a single parquet file path under that folder, the handler will attempt discovery using the file path as the parent and likely fail. Consider tightening this check to only trigger when the input path equals the configured discovery folder (after normalization), and/or when the input has no filename component (ends with /).
| if not file_suffix or file_path.endswith("/") or ".parquet" in file_path: | |
| for path_cfg in self.dataset_config["run_settings"]["paths"]: | |
| if path_cfg.get("type") == "parquet" and path_cfg.get( | |
| "discover_parquet_datasets", False | |
| ): | |
| s3_uri = path_cfg.get("s3_uri", "").rstrip("/") | |
| # Normalize both paths for comparison (remove s3:// prefix if present) | |
| normalized_file_path = file_path.replace("s3://", "").rstrip("/") | |
| normalized_s3_uri = s3_uri.replace("s3://", "").rstrip("/") | |
| if ( | |
| normalized_file_path.startswith(normalized_s3_uri) | |
| or normalized_file_path == normalized_s3_uri | |
| # Only treat directory-like paths (no suffix or explicit trailing slash) as discovery inputs. | |
| if not file_suffix or file_path.endswith("/"): | |
| for path_cfg in self.dataset_config["run_settings"]["paths"]: | |
| if path_cfg.get("type") == "parquet" and path_cfg.get( | |
| "discover_parquet_datasets", False | |
| ): | |
| s3_uri = path_cfg.get("s3_uri", "").rstrip("/") | |
| if not s3_uri: | |
| continue | |
| # Normalize both paths for comparison (remove s3:// prefix if present) | |
| normalized_file_path = file_path.replace("s3://", "").rstrip("/") | |
| normalized_s3_uri = s3_uri.replace("s3://", "").rstrip("/") | |
| is_directory_like = not file_suffix or file_path.endswith("/") | |
| if ( | |
| normalized_file_path == normalized_s3_uri | |
| or ( | |
| is_directory_like | |
| and normalized_file_path.startswith( | |
| normalized_s3_uri + "/" | |
| ) | |
| ) |
|
|
||
| # Should have data from all 3 years (2023, 2024, 2025) | ||
| self.assertGreater(len(merged_df), 0) | ||
| print(f"✅ Merged dataset contains {len(merged_df)} rows from 3 yearly sources") |
There was a problem hiding this comment.
Unit tests should avoid print(...) noise in normal runs; it makes CI output harder to scan and doesn’t contribute to assertions. Please remove this print (or emit via the test logger only when debugging).
| print(f"✅ Merged dataset contains {len(merged_df)} rows from 3 yearly sources") |
| # Initialize S3 filesystem | ||
| s3_fs = s3fs.S3FileSystem(**(s3_client_opts or {})) | ||
|
|
There was a problem hiding this comment.
discover_parquet_datasets() always constructs a new s3fs.S3FileSystem, which can ignore the caller’s configured endpoint/credentials/session (e.g., Moto server endpoint in tests, custom AWS profiles, VPC endpoints). To avoid discovery working against a different S3 backend than the subsequent reads, consider accepting an optional s3_fs argument and using that for ls() (and only creating a new filesystem when one isn’t provided).
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #257 +/- ##
==========================================
- Coverage 64.88% 64.66% -0.23%
==========================================
Files 29 30 +1
Lines 5252 5479 +227
==========================================
+ Hits 3408 3543 +135
- Misses 1844 1936 +92 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
|
|
||
| if partitioning == "hive": |
There was a problem hiding this comment.
Pretty sure you can pass all this off to pyarrow.dataset.dataset and pyarrow.fs.S3FileSystem
eg for a hive partitioned dataset:
FILE_SYSTEM = pyarrow.fs.S3FileSystem(
region="ap-southeast-2",
anonymous=True,
)
# --- Dataset Connection ---
# Create a PyArrow Dataset handle for the AMSA source.
# This performs a lazy connection; data is not loaded into memory yet.
ds = pyarrow.dataset.dataset(
# Here `pyarrow` resolves the directory input to a multi fragment dataset
source="data-uplift-public/stored/datauplift/amsa/dataset",
filesystem=FILE_SYSTEM,
)eg for a non partitioned dataset
# Construct the anonymous file system responsible for reading from the public S3 bucket
FILE_SYSTEM = pyarrow.fs.S3FileSystem(
region="ap-southeast-2",
anonymous=True,
)
# Create the dataset connection
# By convention, datasets are labelled `ds`
ds = pyarrow.dataset.dataset(
# Here `pyarrow` resolves the file input to a single fragment dataset
source="data-uplift-public/stored/datauplift/seabird/seabird.parquet",
filesystem=FILE_SYSTEM,
)
No description provided.