Skip to content

Feat: add new feature to handle reading multiple parquet dataset#257

Open
lbesnard wants to merge 3 commits into
mainfrom
MultipleParquetHandler
Open

Feat: add new feature to handle reading multiple parquet dataset#257
lbesnard wants to merge 3 commits into
mainfrom
MultipleParquetHandler

Conversation

@lbesnard

@lbesnard lbesnard commented Mar 4, 2026

Copy link
Copy Markdown
Collaborator

No description provided.

Copilot AI review requested due to automatic review settings March 4, 2026 04:40
@lbesnard

lbesnard commented Mar 4, 2026

Copy link
Copy Markdown
Collaborator Author

Plan: Support Multiple Parquet Datasets/Files in a Folder

Problem Statement

Currently, the aodn_cloud_optimised library supports pointing to:

  • A single parquet file (with type="parquet", no partitioning)
  • A single hive-partitioned dataset folder (with type="parquet", partitioning="hive")

Example current usage:

{
  "type": "parquet",
  "partitioning": "hive",
  "s3_uri": "s3://aodn-cloud-optimised/wave_buoy_realtime_nonqc.parquet"
}

This reads ONE hive-partitioned dataset at that path.

The feature request is to extend s3_uri to point to a folder containing multiple parquet sources:

  1. Folder with multiple hive-partitioned datasets (subdirectories):

    s3://bucket/my-data/
      ├── dataset1.parquet/  (hive partitioned: site_name=X/)
      ├── dataset2.parquet/  (hive partitioned: site_name=Y/)
      └── dataset3.parquet/  (hive partitioned: site_name=Z/)
    
  2. Folder with multiple flat .parquet files (non-hive):

    s3://bucket/my-data/
      ├── file1.parquet
      ├── file2.parquet
      └── file3.parquet
    

All discovered parquet datasets/files should be read and concatenated into a single output parquet dataset.

Current Implementation Analysis

Current Architecture

  1. PathConfig Model (generic_cloud_optimised_creation.py, lines 70-235):

    • s3_uri: points to ONE parquet source (file or hive dataset folder)
    • type: can be "files", "parquet", or "zarr"
    • partitioning: optional, "hive" for hive-partitioned datasets
  2. collect_files() function (lines 1252-1334):

    • For type="parquet": returns [s3_uri] - a single path
    • This path is treated as one "file" to process
  3. GenericParquetHandler.preprocess_data_parquet() (lines 263-345):

    • Takes parquet_fp - the path from collect_files
    • Matches it to a PathConfig by checking full_path.startswith(s3_uri)
    • Based on partitioning:
      • None: reads single file with pq.read_table(parquet_fp)
      • "hive": reads entire hive dataset with pds.dataset(key_prefix, partitioning="hive")
    • Returns ONE DataFrame per call

How It Currently Works

Hive Example: s3://bucket/dataset.parquet/ contains site_name=X/, site_name=Y/, etc.

  • collect_files() returns ["s3://bucket/dataset.parquet"]
  • preprocess_data_parquet() called once with that path
  • pds.dataset() reads ALL hive partitions automatically
  • Result: one big DataFrame with all data

Non-Hive Example: s3://bucket/file.parquet

  • collect_files() returns ["s3://bucket/file.parquet"]
  • preprocess_data_parquet() called once
  • pq.read_table() reads the single file
  • Result: one DataFrame

Proposed Solution

Approach: Auto-Discover Parquet Sources in a Folder

Keep the current s3_uri field but enhance its behavior:

Current behavior:

  • s3_uri points to ONE parquet file or ONE hive-partitioned dataset folder

New behavior:

  • s3_uri can point to a parent folder containing multiple parquet sources
  • Add new field discover_parquet_datasets: bool = False to PathConfig
  • When discover_parquet_datasets=True:
    • Non-hive mode (partitioning=None): List all .parquet files at the first level (non-recursive)
    • Hive mode (partitioning="hive"): List all subdirectories ending with .parquet (each is a hive dataset)
    • Read each discovered source and concatenate them

Benefits:

  • ✅ Backward compatible (default is False)
  • ✅ Explicit opt-in via new flag
  • ✅ No breaking changes to existing configs
  • ✅ Keeps s3_uri as single string

Implementation Changes

1. Pydantic Config Changes

PathConfig Model (generic_cloud_optimised_creation.py, lines 70-235):

  • Add field: discover_parquet_datasets: bool = False
  • Add docstring: "If True, discover and process all parquet datasets/files in the s3_uri folder"
  • Validation: discover_parquet_datasets only valid when type="parquet"

2. Processing Logic Changes

collect_files() function (lines 1252-1334):

  • When type="parquet":
    • Check if discover_parquet_datasets=True
    • If False (default): return [s3_uri] as before (backward compatible)
    • If True:
      • List contents of s3_uri using s3_fs.ls(s3_uri, detail=False)
      • Filter based on partitioning setting:
        • partitioning="hive": Find all subdirectories ending with .parquet
        • partitioning=None: Find all files ending with .parquet (first level only)
      • Return list of discovered paths

GenericParquetHandler.preprocess_data_parquet() (lines 263-345):

  • When multiple parquet sources discovered:
    • Current implementation already handles each path individually
    • Each call reads one source and returns a DataFrame
    • The framework will call this method multiple times (once per discovered path)
    • Need to verify: Does the current publishing logic concatenate multiple DataFrames?
    • If not: Need to add concatenation logic in the handler or modify the publishing step

3. Discovery Logic Implementation

Add new helper function discover_parquet_datasets():

def discover_parquet_datasets(
    s3_uri: str,
    partitioning: Optional[str],
    s3_fs: s3fs.S3FileSystem
) -> List[str]:
    """Discover parquet datasets or files in a folder.
    
    Args:
        s3_uri: S3 path to folder containing parquet sources
        partitioning: "hive" for hive datasets, None for flat files
        s3_fs: S3 filesystem instance
        
    Returns:
        List of S3 paths to parquet datasets/files
    """
    # Parse s3_uri to get bucket and prefix
    # List contents at first level
    # Filter based on partitioning mode
    # Return discovered paths

Logic:

  • For partitioning="hive":

    • List directories at first level
    • Filter those ending with .parquet
    • Each represents a complete hive-partitioned dataset
  • For partitioning=None:

    • List files at first level (not recursive)
    • Filter those ending with .parquet
    • Each is a standalone parquet file

4. Concatenation Logic

Key Question: How does the current framework handle multiple inputs?

Looking at the code flow:

  1. collect_files() returns list of paths
  2. Each path processed individually by handler
  3. preprocess_data_parquet() yields (df, ds) tuples
  4. publish_cloud_optimised() writes the output

Current behavior: When processing multiple NetCDF files (type="files"), each file creates its own output. The framework doesn't auto-concatenate.

For this feature: We need the discovered parquet sources to be concatenated into ONE output.

Two options:

Option A - Concatenate in preprocess_data_parquet():

  • Modify to detect when discover_parquet_datasets=True
  • Read ALL discovered sources
  • Concatenate DataFrames before yielding
  • Yield once with the merged data

Option B - Modify collect_files() to return synthetic wrapper:

  • Create a special marker/wrapper that indicates "process these together"
  • Handler detects the marker and reads all sources

Decision: Go with Option A - simpler and cleaner.

Modified preprocess_data_parquet() logic:

def preprocess_data_parquet(self, parquet_fp):
    # Match config
    matched_cfg = find_matching_config(parquet_fp)
    
    # Check if this is a discovery scenario
    if matched_cfg.get("discover_parquet_datasets", False):
        # This parquet_fp might be the parent folder
        # OR it's one of the discovered datasets
        # We need to identify and handle appropriately
        
        # Read ALL discovered datasets and concatenate
        tables = []
        for source_path in discovered_paths:
            table = read_parquet_source(source_path, partitioning)
            tables.append(table)
        
        # Concatenate tables
        combined_table = pa.concat_tables(tables)
        df = combined_table.to_pandas()
    else:
        # Current single-source logic
        ...

5. Documentation Updates

6. Test Updates

Update/create tests:

  • Test PathConfig validation with discover_parquet_datasets flag
  • Test discover_parquet_datasets() helper function
    • Hive mode: discovers .parquet directories
    • Non-hive mode: discovers .parquet files
  • Test collect_files() with discovery enabled
  • Test GenericParquetHandler concatenation
  • Test schema consistency when merging
  • Test backward compatibility (default False)

Revised Implementation Todos

Core Changes

  1. add-discovery-flag-to-pathconfig: Add discover_parquet_datasets: bool = False field to PathConfig model with validation
  2. implement-discovery-helper: Create discover_parquet_datasets() helper function to list parquet sources in a folder
  3. update-collect-files: Modify collect_files() to call discovery helper when flag is True
  4. update-parquet-handler-matching: Update matching logic in preprocess_data_parquet() to handle discovered datasets
  5. implement-concatenation: Modify preprocess_data_parquet() to read and concatenate multiple sources when discovery is enabled
  6. handle-schema-merge: Add schema compatibility checking and merging using PyArrow

Documentation

  1. update-usage-docs: Add examples to docs/usage.rst showing the discover_parquet_datasets feature
  2. update-module-docs: Update docstrings and module documentation

Testing

  1. create-unit-tests: Create unit tests for discovery logic and validation
  2. test-concatenation: Test DataFrame concatenation with schema merging
  3. test-backward-compat: Ensure existing configs still work (flag defaults to False)
  4. verify-all-tests: Run full test suite

Implementation Details

Discovery Logic Pseudocode

def discover_parquet_datasets(
    s3_uri: str,
    partitioning: Optional[str],
    bucket_raw: Optional[str],
    s3_client_opts: Optional[dict] = None,
) -> List[str]:
    # Initialize S3 filesystem
    s3_fs = s3fs.S3FileSystem(**s3_client_opts or {})
    
    # Parse URI
    if s3_uri.startswith("s3://"):
        parsed = urlparse(s3_uri)
        bucket = parsed.netloc
        prefix = parsed.path.lstrip("/")
    else:
        bucket = bucket_raw
        prefix = s3_uri
    
    full_prefix = f"{bucket}/{prefix}".rstrip("/")
    
    # List contents (first level only)
    try:
        entries = s3_fs.ls(full_prefix, detail=True)
    except FileNotFoundError:
        raise ValueError(f"Path not found: s3://{full_prefix}")
    
    discovered = []
    
    if partitioning == "hive":
        # Find directories ending with .parquet
        for entry in entries:
            if entry['type'] == 'directory':
                name = entry['name'].split('/')[-1]
                if name.endswith('.parquet'):
                    discovered.append(f"s3://{entry['name']}")
    else:
        # Find files ending with .parquet
        for entry in entries:
            if entry['type'] == 'file':
                name = entry['name'].split('/')[-1]
                if name.endswith('.parquet'):
                    discovered.append(f"s3://{entry['name']}")
    
    if not discovered:
        raise ValueError(
            f"No parquet {'datasets' if partitioning=='hive' else 'files'} "
            f"found in s3://{full_prefix}"
        )
    
    return discovered

Concatenation Strategy

When discover_parquet_datasets=True, the handler should:

  1. First call to preprocess_data_parquet():
    • Detect that discovery mode is enabled
    • Check if we're being called with the parent folder or a discovered dataset
    • If parent folder: discover all sources, read each, concatenate
    • If discovered dataset: this is coming from collect_files returning multiple paths

Problem: If collect_files() returns multiple paths, the framework calls the handler multiple times. We need ALL data in ONE output, not separate outputs.

Solution:

  • When discover_parquet_datasets=True, collect_files() should return only the parent folder once
  • The handler detects discovery mode and handles multiple sources internally
  • This way we get ONE output with concatenated data

Updated collect_files logic:

elif dataset_type == "parquet":
    if path_cfg.discover_parquet_datasets:
        # Return only the parent folder
        # Handler will do the discovery and concatenation
        return [s3_uri]
    else:
        # Current behavior
        return [s3_uri]

Updated preprocess_data_parquet logic:

def preprocess_data_parquet(self, parquet_fp):
    matched_cfg = find_matching_config(parquet_fp)
    partitioning = matched_cfg.get("partitioning", None)
    
    # Check discovery mode
    if matched_cfg.get("discover_parquet_datasets", False):
        # Discover all parquet sources
        sources = discover_parquet_datasets(
            parquet_fp.path,  # parent folder
            partitioning,
            ...
        )
        
        # Read and concatenate all sources
        tables = []
        for source_uri in sources:
            if partitioning == "hive":
                table = pds.dataset(
                    source_uri,
                    format="parquet", 
                    partitioning="hive",
                    filesystem=self.s3_fs_output
                ).to_table()
            else:
                table = pq.read_table(source_uri, filesystem=self.s3_fs_output)
            tables.append(table)
        
        # Concatenate with schema merge
        combined_table = pa.concat_tables(tables, promote=True)
        df = combined_table.to_pandas()
    else:
        # Existing single-source logic
        ...
    
    # Rest of the method (schema validation, xarray conversion, etc.)

Notes and Considerations

Schema Consistency

  • When merging multiple parquet sources, schemas must be compatible
  • Use PyArrow's concat_tables() with promote=True to handle compatible schema differences
  • Log warnings if schemas differ but are promotable
  • Error if schemas are fundamentally incompatible

Performance

  • Reading multiple parquet datasets has linear cost (N datasets = N reads)
  • Concatenation happens in memory - consider memory limits for very large datasets
  • Each hive dataset is still read efficiently with predicate pushdown
  • Consider chunked processing if memory becomes an issue (future enhancement)

Discovery Scope

  • Discovery is non-recursive - only looks at first level in s3_uri folder
  • For hive mode: finds directories ending with .parquet
  • For non-hive mode: finds files ending with .parquet
  • Empty folders are skipped

Partitioning Consistency

  • All discovered sources must use the same partitioning scheme
  • Cannot mix hive and non-hive parquet in the same discovery
  • The partitioning field applies to all discovered datasets

Clear Existing Data

  • clear_existing_data works as before - clears the output location
  • Does NOT affect input parquet sources
  • Discovery happens fresh on each run (no caching)

Year Range and Filters

  • Already validated: year_range and filter not allowed when type="parquet"
  • This remains unchanged for discovery mode
  • Filtering can be done at query time using partition columns

Error Handling

  • If no parquet datasets/files found: raise clear error
  • If schema incompatibility: raise error with details
  • If S3 path doesn't exist: raise clear error
  • Log each discovered source for transparency

Breaking Changes

None - This is a backwards-compatible additive feature:

  • New field discover_parquet_datasets defaults to False
  • Existing configs with s3_uri continue to work exactly as before
  • No changes to output format or behavior for single sources
  • Opt-in behavior via explicit flag

Migration Guide

Not needed for existing users. New users who want to merge multiple parquet sources:

  1. Set discover_parquet_datasets: true in PathConfig
  2. Point s3_uri to parent folder containing parquet datasets/files
  3. Ensure all sources have compatible schemas
  4. For hive mode: each subdirectory ending with .parquet will be merged
  5. For non-hive mode: each file ending with .parquet at first level will be merged

Example Configs

Example 1: Multiple Hive-Partitioned Datasets

S3 Structure:

s3://my-bucket/historical-wave-data/
  ├── 2020_wave_data.parquet/
  │   ├── site_name=Albany/
  │   └── site_name=Brighton/
  ├── 2021_wave_data.parquet/
  │   ├── site_name=Albany/
  │   └── site_name=Brighton/
  └── 2022_wave_data.parquet/
      ├── site_name=Albany/
      └── site_name=Brighton/

Config:

{
  "run_settings": {
    "paths": [
      {
        "type": "parquet",
        "partitioning": "hive",
        "s3_uri": "s3://my-bucket/historical-wave-data/",
        "discover_parquet_datasets": true
      }
    ]
  }
}

Result: All three years of data merged into one output parquet dataset.

Example 2: Multiple Flat Parquet Files

S3 Structure:

s3://my-bucket/monthly-exports/
  ├── january.parquet
  ├── february.parquet
  ├── march.parquet
  └── april.parquet

Config:

{
  "run_settings": {
    "paths": [
      {
        "type": "parquet",
        "s3_uri": "s3://my-bucket/monthly-exports/",
        "discover_parquet_datasets": true
      }
    ]
  }
}

Result: All monthly files merged into one output parquet dataset.

Example 3: Backward Compatible (No Discovery)

Existing Config (unchanged):

{
  "run_settings": {
    "paths": [
      {
        "type": "parquet",
        "partitioning": "hive",
        "s3_uri": "s3://aodn-cloud-optimised/wave_buoy_realtime_nonqc.parquet"
      }
    ]
  }
}

Result: Works exactly as before, processes single hive-partitioned dataset.

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a “parquet discovery” mode that allows the parquet handler to discover multiple parquet datasets/files under a parent S3 folder and merge them into a single cloud-optimised parquet output.

Changes:

  • Introduces discover_parquet_datasets() in s3Tools.py to list parquet datasets/files under a folder (hive vs flat).
  • Updates GenericParquetHandler to support discovery mode and concatenate multiple parquet sources before converting to pandas/xarray.
  • Adds documentation and a new integration-style test plus a dataset config resource to exercise discovery mode.

Reviewed changes

Copilot reviewed 5 out of 9 changed files in this pull request and generated 9 comments.

Show a summary per file
File Description
aodn_cloud_optimised/lib/s3Tools.py Adds parquet source discovery helper (hive/flat listing).
aodn_cloud_optimised/lib/GenericParquetHandler.py Enables discovery-mode preprocessing and concatenation logic.
aodn_cloud_optimised/bin/generic_cloud_optimised_creation.py Extends config schema/docs to include discover_parquet_datasets flag.
docs/development/dataset-configuration.rst Documents single parquet vs discovery-mode parquet configuration.
test_aodn_cloud_optimised/test_parquet_discovery.py Adds tests for discovery mode and backward compatibility.
test_aodn_cloud_optimised/resources/wave_buoy_realtime_from_parquet.json Adds test dataset config for parquet discovery mode.
test_aodn_cloud_optimised/resources/BOM_20250201_CAPE-SORELL_RT_WAVE-PARAMETERS_monthly.nc Adds a NetCDF test fixture used by the new discovery integration test.

You can also share your feedback on Copilot code review. Take the survey.

Comment on lines +1 to +18
import copy
import json
import os
import unittest

import boto3
import pandas as pd
import pyarrow as pa
import s3fs
from moto import mock_aws
from moto.moto_server.threaded_moto_server import ThreadedMotoServer

from aodn_cloud_optimised.lib.config import load_dataset_config
from aodn_cloud_optimised.lib.GenericParquetHandler import GenericHandler
from aodn_cloud_optimised.lib.s3Tools import (
discover_parquet_datasets,
get_free_local_port,
)

Copilot AI Mar 4, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Several imports appear unused in this test module (copy, pyarrow as pa, and discover_parquet_datasets). Removing them will keep the test lean and avoid flake8/pyflakes failures if linting is enabled.

Copilot uses AI. Check for mistakes.
Comment on lines +274 to +325
def test_discover_parquet_datasets_hive_mode(self):
"""Test discover_parquet_datasets() function in hive mode"""

# Create multiple hive-partitioned datasets
parent_folder = f"s3://{self.BUCKET_OPTIMISED_NAME}/test_discovery_hive/"

# Create fake parquet datasets (just create the directories)
for dataset_name in [
"data_2020.parquet",
"data_2021.parquet",
"data_2022.parquet",
]:
dataset_path = f"{parent_folder}{dataset_name}/site_name=A/"
self.s3.put_object(
Bucket=self.BUCKET_OPTIMISED_NAME,
Key=dataset_path.replace(f"s3://{self.BUCKET_OPTIMISED_NAME}/", "")
+ "_metadata",
Body=b"fake parquet metadata",
)

# Test discovery via the handler (integration test rather than unit test)
# The discover_parquet_datasets function is tested indirectly through the handler
# Verify the files were created
result = self.s3.list_objects_v2(
Bucket=self.BUCKET_OPTIMISED_NAME, Prefix="test_discovery_hive/"
)

# Should have 3 datasets created
self.assertIn("Contents", result)
self.assertEqual(len(result["Contents"]), 3)

def test_discover_parquet_datasets_flat_mode(self):
"""Test discover_parquet_datasets() function for flat parquet files"""

parent_folder = f"s3://{self.BUCKET_OPTIMISED_NAME}/test_discovery_flat/"

# Create fake flat parquet files
for file_name in ["january.parquet", "february.parquet", "march.parquet"]:
file_path = f"{parent_folder}{file_name}"
self.s3.put_object(
Bucket=self.BUCKET_OPTIMISED_NAME,
Key=file_path.replace(f"s3://{self.BUCKET_OPTIMISED_NAME}/", ""),
Body=b"fake parquet content",
)

# Verify files were created
result = self.s3.list_objects_v2(
Bucket=self.BUCKET_OPTIMISED_NAME, Prefix="test_discovery_flat/"
)

self.assertIn("Contents", result)
self.assertEqual(len(result["Contents"]), 3)

Copilot AI Mar 4, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests create objects in S3 but never actually call discover_parquet_datasets() (or invoke the handler in discovery mode) and therefore don’t validate the new discovery behavior. Consider asserting on the discovered source list (e.g., the returned URIs in hive vs flat modes) and/or adding a failure assertion for empty/nonexistent folders using the discovery function itself.

Copilot uses AI. Check for mistakes.
Comment on lines +624 to +633
{
"run_settings": {
"paths": [
{
"type": "parquet",
"partitioning": "hive", // or null for flat parquet files
"s3_uri": "s3://bucket/dataset.parquet",
"filter": [],
"year_range": []
}

Copilot AI Mar 4, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The JSON example includes a // comment ("partitioning": "hive", // ...), which is not valid JSON and can mislead users copying the snippet. Consider removing the inline comment and instead explain the null option in surrounding prose (or show two separate valid JSON examples).

Copilot uses AI. Check for mistakes.
Comment on lines +349 to +354
Args:
s3_uri: S3 path to folder containing parquet sources (can be full s3:// URI or relative path)
partitioning: "hive" for hive-partitioned datasets, None for flat parquet files
bucket_raw: Required if s3_uri is not a full S3 URI
s3_client_opts: Optional dict with boto3 S3 client options

Copilot AI Mar 4, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Parameter s3_client_opts is described as “boto3 S3 client options”, but it’s passed directly into s3fs.S3FileSystem(**...). This mismatch is confusing and makes it easy to pass the wrong shape of options. Please rename/re-document this parameter as S3FS options (e.g., s3_fs_opts matching s3fs.S3FileSystem kwargs) or accept an already-constructed s3fs.S3FileSystem instead.

Copilot uses AI. Check for mistakes.
bucket = bucket_raw
prefix = s3_uri

prefix = str(PurePosixPath(prefix)) # normalize path

Copilot AI Mar 4, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PurePosixPath(prefix) will turn an empty prefix into '.', producing bucket/. and breaking discovery for URIs like s3://bucket or s3://bucket/. Consider special-casing empty prefixes (or '.') so the normalized prefix stays empty before building full_path.

Suggested change
prefix = str(PurePosixPath(prefix)) # normalize path
# Normalize path, but preserve empty/"." prefixes so bucket root stays "bucket"
if prefix:
normalized_prefix = str(PurePosixPath(prefix))
prefix = "" if normalized_prefix == "." else normalized_prefix
else:
prefix = ""

Copilot uses AI. Check for mistakes.
Comment on lines +323 to 335
try:
discovered_sources = discover_parquet_datasets(
s3_uri=full_path,
partitioning=partitioning,
filesystem=self.s3_fs_output,
).to_table()
case _:
raise ValueError(
f"Partitioning value {partitioning} is not yet supported"
bucket_raw=self.dataset_config["run_settings"].get(
"bucket_raw_default_name"
),
s3_client_opts=(
self.s3_fs_output.storage_options
if hasattr(self.s3_fs_output, "storage_options")
else None
),
)

Copilot AI Mar 4, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discovery is executed via discover_parquet_datasets(...), but the S3FS options passed here are derived from self.s3_fs_output.storage_options (and otherwise None). Since discover_parquet_datasets() creates a brand new S3FileSystem, this can cause discovery to run against the wrong endpoint/credentials compared to the handler’s existing self.s3_fs_output (e.g., missing Moto endpoint_url). Consider changing the discovery API to accept an existing filesystem (preferred) and pass self.s3_fs_output directly, so listing and subsequent reads use the same backend config.

Copilot uses AI. Check for mistakes.
Comment on lines +479 to +490
if not file_suffix or file_path.endswith("/") or ".parquet" in file_path:
for path_cfg in self.dataset_config["run_settings"]["paths"]:
if path_cfg.get("type") == "parquet" and path_cfg.get(
"discover_parquet_datasets", False
):
s3_uri = path_cfg.get("s3_uri", "").rstrip("/")
# Normalize both paths for comparison (remove s3:// prefix if present)
normalized_file_path = file_path.replace("s3://", "").rstrip("/")
normalized_s3_uri = s3_uri.replace("s3://", "").rstrip("/")
if (
normalized_file_path.startswith(normalized_s3_uri)
or normalized_file_path == normalized_s3_uri

Copilot AI Mar 4, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The discovery detection uses normalized_file_path.startswith(normalized_s3_uri), which will treat any parquet file inside the configured folder as a discovery input. If a caller passes a single parquet file path under that folder, the handler will attempt discovery using the file path as the parent and likely fail. Consider tightening this check to only trigger when the input path equals the configured discovery folder (after normalization), and/or when the input has no filename component (ends with /).

Suggested change
if not file_suffix or file_path.endswith("/") or ".parquet" in file_path:
for path_cfg in self.dataset_config["run_settings"]["paths"]:
if path_cfg.get("type") == "parquet" and path_cfg.get(
"discover_parquet_datasets", False
):
s3_uri = path_cfg.get("s3_uri", "").rstrip("/")
# Normalize both paths for comparison (remove s3:// prefix if present)
normalized_file_path = file_path.replace("s3://", "").rstrip("/")
normalized_s3_uri = s3_uri.replace("s3://", "").rstrip("/")
if (
normalized_file_path.startswith(normalized_s3_uri)
or normalized_file_path == normalized_s3_uri
# Only treat directory-like paths (no suffix or explicit trailing slash) as discovery inputs.
if not file_suffix or file_path.endswith("/"):
for path_cfg in self.dataset_config["run_settings"]["paths"]:
if path_cfg.get("type") == "parquet" and path_cfg.get(
"discover_parquet_datasets", False
):
s3_uri = path_cfg.get("s3_uri", "").rstrip("/")
if not s3_uri:
continue
# Normalize both paths for comparison (remove s3:// prefix if present)
normalized_file_path = file_path.replace("s3://", "").rstrip("/")
normalized_s3_uri = s3_uri.replace("s3://", "").rstrip("/")
is_directory_like = not file_suffix or file_path.endswith("/")
if (
normalized_file_path == normalized_s3_uri
or (
is_directory_like
and normalized_file_path.startswith(
normalized_s3_uri + "/"
)
)

Copilot uses AI. Check for mistakes.

# Should have data from all 3 years (2023, 2024, 2025)
self.assertGreater(len(merged_df), 0)
print(f"✅ Merged dataset contains {len(merged_df)} rows from 3 yearly sources")

Copilot AI Mar 4, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unit tests should avoid print(...) noise in normal runs; it makes CI output harder to scan and doesn’t contribute to assertions. Please remove this print (or emit via the test logger only when debugging).

Suggested change
print(f"✅ Merged dataset contains {len(merged_df)} rows from 3 yearly sources")

Copilot uses AI. Check for mistakes.
Comment on lines +361 to +363
# Initialize S3 filesystem
s3_fs = s3fs.S3FileSystem(**(s3_client_opts or {}))

Copilot AI Mar 4, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

discover_parquet_datasets() always constructs a new s3fs.S3FileSystem, which can ignore the caller’s configured endpoint/credentials/session (e.g., Moto server endpoint in tests, custom AWS profiles, VPC endpoints). To avoid discovery working against a different S3 backend than the subsequent reads, consider accepting an optional s3_fs argument and using that for ls() (and only creating a new filesystem when one isn’t provided).

Copilot uses AI. Check for mistakes.
@codecov-commenter

codecov-commenter commented Mar 4, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 59.24370% with 97 lines in your changes missing coverage. Please review.
✅ Project coverage is 64.66%. Comparing base (7205022) to head (23bdb3b).

Files with missing lines Patch % Lines
aodn_cloud_optimised/lib/s3Tools.py 7.40% 50 Missing ⚠️
aodn_cloud_optimised/lib/GenericParquetHandler.py 21.05% 45 Missing ⚠️
..._optimised/bin/generic_cloud_optimised_creation.py 75.00% 1 Missing ⚠️
...est_aodn_cloud_optimised/test_parquet_discovery.py 99.18% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #257      +/-   ##
==========================================
- Coverage   64.88%   64.66%   -0.23%     
==========================================
  Files          29       30       +1     
  Lines        5252     5479     +227     
==========================================
+ Hits         3408     3543     +135     
- Misses       1844     1936      +92     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Comment on lines +391 to +392

if partitioning == "hive":

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty sure you can pass all this off to pyarrow.dataset.dataset and pyarrow.fs.S3FileSystem

eg for a hive partitioned dataset:

FILE_SYSTEM = pyarrow.fs.S3FileSystem(
    region="ap-southeast-2", 
    anonymous=True,
)

# --- Dataset Connection ---
# Create a PyArrow Dataset handle for the AMSA source.
# This performs a lazy connection; data is not loaded into memory yet.
ds = pyarrow.dataset.dataset(

    # Here `pyarrow` resolves the directory input to a multi fragment dataset
    source="data-uplift-public/stored/datauplift/amsa/dataset",
    filesystem=FILE_SYSTEM,
)

eg for a non partitioned dataset

# Construct the anonymous file system responsible for reading from the public S3 bucket
FILE_SYSTEM = pyarrow.fs.S3FileSystem(
    region="ap-southeast-2", 
    anonymous=True,
)

# Create the dataset connection
# By convention, datasets are labelled `ds`
ds = pyarrow.dataset.dataset(

    # Here `pyarrow` resolves the file input to a single fragment dataset 
    source="data-uplift-public/stored/datauplift/seabird/seabird.parquet",
    filesystem=FILE_SYSTEM,
)

@lbesnard lbesnard closed this Apr 24, 2026
@lbesnard lbesnard reopened this Apr 24, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature Request] - Handle multiple parquet hived partition as an input dataset

4 participants