S3 operations

Driver-side helpers list objects under a bucket prefix and usually write the resulting paths into a YT table for later map/YQL stages. Inside mappers you typically construct ytjobs.s3.client.S3Client from secrets.

Job-side `S3Client` lives in `ytjobs`; see [YT jobs (`ytjobs`)](../reference/ytjobs.md).

Overview

Common workflow:

  1. List or filter object keys under a prefix.
  2. Persist the key list (or small metadata) to a Cypress table.
  3. Downstream stages read that table.

Credentials come from load_secrets(self.deps.configs_dir) and the S3_* keys documented in Secrets.

Quick Start

Minimal Example

Stage (stages/list_s3/stage.py):

from yt_framework.core.pipeline import DebugContext
from yt_framework.core.stage import BaseStage
from yt_framework.operations.s3 import list_s3_files, save_s3_paths_to_table
from yt_framework.utils.env import load_secrets
from ytjobs.s3.client import S3Client

class ListS3Stage(BaseStage):
    def __init__(self, deps, logger):
        super().__init__(deps, logger)
        
        # Create S3 client
        secrets = load_secrets(self.deps.configs_dir)
        self.s3_client = S3Client.create(
            secrets=secrets,
            client_type="download",  # or "upload" for write access
        )
    
    def run(self, debug: DebugContext) -> DebugContext:
        # List files from S3
        paths = list_s3_files(
            s3_client=self.s3_client,
            bucket=self.config.client.input_bucket,
            prefix=self.config.client.input_prefix,
            logger=self.logger,
        )
        
        # Save paths to YT table
        save_s3_paths_to_table(
            yt_client=self.deps.yt_client,
            bucket=self.config.client.input_bucket,
            paths=paths,
            output_table=self.config.client.output_table,
            logger=self.logger,
        )
        
        return debug

Stage config (stages/list_s3/config.yaml):

client:
  input_bucket: my-bucket
  input_prefix: data/2024/
  output_table: //tmp/my_pipeline/s3_paths

Secrets (configs/secrets.env):

S3_ENDPOINT=https://your-s3-endpoint.com
S3_DOWNLOAD_ACCESS_KEY=your-download-access-key
S3_DOWNLOAD_SECRET_KEY=your-download-secret-key
S3_UPLOAD_ACCESS_KEY=your-upload-access-key
S3_UPLOAD_SECRET_KEY=your-upload-secret-key

See Example: 06_s3_integration for a complete example.

S3 Client Setup

Creating S3 Client

Create an S3 client in your stage's __init__ method:

from yt_framework.utils.env import load_secrets
from ytjobs.s3.client import S3Client

class MyStage(BaseStage):
    def __init__(self, deps, logger):
        super().__init__(deps, logger)
        
        # Load secrets
        secrets = load_secrets(self.deps.configs_dir)
        
        # Create S3 client
        self.s3_client = S3Client.create(
            secrets=secrets,
            client_type="download",  # or "upload"
        )

Client types:

  • "download": Read-only access (for listing and downloading)
  • "upload": Write access (for uploading files)

S3 Credentials

S3 credentials are stored in configs/secrets.env:

S3_ENDPOINT=https://your-s3-endpoint.com
S3_DOWNLOAD_ACCESS_KEY=your-download-access-key
S3_DOWNLOAD_SECRET_KEY=your-download-secret-key
S3_UPLOAD_ACCESS_KEY=your-upload-access-key
S3_UPLOAD_SECRET_KEY=your-upload-secret-key

Fields (the endpoint and download keys are required; the upload keys are optional):

  • S3_ENDPOINT: S3 endpoint URL (e.g., https://s3.example.com)
  • S3_DOWNLOAD_ACCESS_KEY: Access key for download operations
  • S3_DOWNLOAD_SECRET_KEY: Secret key for download operations
  • S3_UPLOAD_ACCESS_KEY: Access key for upload operations (optional, if different from download)
  • S3_UPLOAD_SECRET_KEY: Secret key for upload operations (optional, if different from download)
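
As a rough illustration of how these fields could be resolved per client type, here is a minimal sketch. It is not the framework's implementation: parse_env and pick_s3_credentials are hypothetical helpers, and the fallback from upload keys to download keys is an assumption based on the "optional, if different from download" wording above.

```python
from typing import Dict, Tuple


def parse_env(text: str) -> Dict[str, str]:
    """Parse KEY=VALUE lines, skipping blank lines and # comments."""
    secrets: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        secrets[key.strip()] = value.strip()
    return secrets


def pick_s3_credentials(secrets: Dict[str, str], client_type: str) -> Tuple[str, str, str]:
    """Return (endpoint, access_key, secret_key) for "download" or "upload".

    Assumption (hypothetical behavior): missing upload keys fall back to
    the download keys.
    """
    if client_type not in ("download", "upload"):
        raise ValueError(f"unknown client_type: {client_type!r}")
    prefix = f"S3_{client_type.upper()}"
    endpoint = secrets.get("S3_ENDPOINT")
    access = secrets.get(f"{prefix}_ACCESS_KEY") or secrets.get("S3_DOWNLOAD_ACCESS_KEY")
    secret = secrets.get(f"{prefix}_SECRET_KEY") or secrets.get("S3_DOWNLOAD_SECRET_KEY")
    required = {
        "S3_ENDPOINT": endpoint,
        f"{prefix}_ACCESS_KEY": access,
        f"{prefix}_SECRET_KEY": secret,
    }
    missing = [name for name, value in required.items() if not value]
    if missing:
        raise KeyError(f"missing S3 secrets: {', '.join(missing)}")
    return endpoint, access, secret
```

Failing early with the names of the missing keys tends to be easier to debug than a signature error from the S3 endpoint later on.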

Operations

List S3 Files

List files in an S3 bucket with optional filtering:

from yt_framework.operations.s3 import list_s3_files

paths = list_s3_files(
    s3_client=self.s3_client,
    bucket="my-bucket",
    prefix="data/2024/",
    logger=self.logger,
    extension=".json",      # Optional: filter by extension
    max_files=1000,         # Optional: limit results
)

Parameters:

  • s3_client: S3Client instance
  • bucket: S3 bucket name
  • prefix: S3 prefix to filter (required, but can be empty string "" to list all files)
  • logger: Logger instance
  • extension: File extension filter (optional, e.g., ".json")
  • max_files: Maximum number of files to return (optional)

Returns: List of S3 object keys as strings (keys, not full s3:// URLs)

Example:

# List all JSON files in bucket with prefix
paths = list_s3_files(
    s3_client=self.s3_client,
    bucket="my-bucket",
    prefix="data/",
    extension=".json",
    logger=self.logger,
)

# List all files in bucket (no prefix filter - use empty string)
paths = list_s3_files(
    s3_client=self.s3_client,
    bucket="my-bucket",
    prefix="",  # Empty string lists all files
    logger=self.logger,
)

# List first 100 files
paths = list_s3_files(
    s3_client=self.s3_client,
    bucket="my-bucket",
    prefix="data/",
    max_files=100,
    logger=self.logger,
)
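
To make the effect of extension and max_files concrete, the following sketch applies the same two filters to an in-memory list of keys. filter_keys is a hypothetical stand-in, not the body of list_s3_files; whether the real helper filters before or after truncating is an assumption here (this sketch filters first, then caps).

```python
from typing import Iterable, List, Optional


def filter_keys(
    keys: Iterable[str],
    extension: Optional[str] = None,
    max_files: Optional[int] = None,
) -> List[str]:
    """Keep keys ending with `extension`, stopping once `max_files` are kept."""
    selected: List[str] = []
    for key in keys:
        # Stop as soon as the cap is reached so large listings are not fully scanned.
        if max_files is not None and len(selected) >= max_files:
            break
        if extension is not None and not key.endswith(extension):
            continue
        selected.append(key)
    return selected


keys = ["data/a.json", "data/b.csv", "data/c.json", "data/d.json"]
print(filter_keys(keys, extension=".json", max_files=2))
# ['data/a.json', 'data/c.json']
```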

Save S3 Paths to Table

Save S3 file paths to a YT table for processing:

from yt_framework.operations.s3 import save_s3_paths_to_table

save_s3_paths_to_table(
    yt_client=self.deps.yt_client,
    bucket="my-bucket",
    paths=paths,
    output_table="//tmp/my_pipeline/s3_paths",
    logger=self.logger,
)

Parameters:

  • yt_client: YT client instance
  • bucket: S3 bucket name
  • paths: List of S3 paths (from list_s3_files)
  • output_table: YT table path to write
  • logger: Logger instance

Table schema:

Each row of the output table has two columns, bucket and path. An example row:

{
    "bucket": "my-bucket",
    "path": "data/file1.json",  # S3 key (not full s3:// URL)
}
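
The row shape above can be sketched as a pair of small helpers. Both paths_to_rows and row_to_uri are hypothetical names used only for illustration; the actual writing is done by save_s3_paths_to_table.

```python
from typing import Dict, List


def paths_to_rows(bucket: str, paths: List[str]) -> List[Dict[str, str]]:
    """Build one row per S3 key, repeating the bucket name in every row."""
    return [{"bucket": bucket, "path": path} for path in paths]


def row_to_uri(row: Dict[str, str]) -> str:
    """Rebuild a full s3:// URI from a row, since the table stores only the key."""
    return f"s3://{row['bucket']}/{row['path']}"


rows = paths_to_rows("my-bucket", ["data/file1.json", "data/file2.json"])
print(row_to_uri(rows[0]))
# s3://my-bucket/data/file1.json
```

Keeping bucket and key in separate columns means downstream jobs can pass them directly to an S3 client without parsing a URL.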

Example:

# List files
paths = list_s3_files(
    s3_client=self.s3_client,
    bucket="my-bucket",
    prefix="data/",
    logger=self.logger,
)

# Save to table
save_s3_paths_to_table(
    yt_client=self.deps.yt_client,
    bucket="my-bucket",
    paths=paths,
    output_table="//tmp/my_pipeline/s3_paths",
    logger=self.logger,
)

# Process paths from table
rows = list(self.deps.yt_client.read_table("//tmp/my_pipeline/s3_paths"))
for row in rows:
    s3_path = row["path"]
    # Process S3 file...

Complete Example

from yt_framework.core.pipeline import DebugContext
from yt_framework.core.stage import BaseStage
from yt_framework.operations.s3 import list_s3_files, save_s3_paths_to_table
from yt_framework.utils.env import load_secrets
from ytjobs.s3.client import S3Client

class ListS3Stage(BaseStage):
    def __init__(self, deps, logger):
        super().__init__(deps, logger)
        
        # Create S3 client
        secrets = load_secrets(self.deps.configs_dir)
        self.s3_client = S3Client.create(
            secrets=secrets,
            client_type="download",
        )
    
    def run(self, debug: DebugContext) -> DebugContext:
        self.logger.info("Listing files from S3...")
        
        # List files with optional filters
        paths = list_s3_files(
            s3_client=self.s3_client,
            bucket=self.config.client.input_bucket,
            prefix=self.config.client.input_prefix,
            logger=self.logger,
            extension=self.config.client.get("file_extension"),  # Optional
            max_files=self.config.client.get("max_files"),       # Optional
        )
        
        if not paths:
            self.logger.warning("No files found in S3")
            return debug
        
        self.logger.info(f"Found {len(paths)} files")
        
        # Save paths to YT table
        save_s3_paths_to_table(
            yt_client=self.deps.yt_client,
            bucket=self.config.client.input_bucket,
            paths=paths,
            output_table=self.config.client.output_table,
            logger=self.logger,
        )
        
        self.logger.info(f"Saved {len(paths)} paths to {self.config.client.output_table}")
        
        return debug

Configuration (stages/list_s3/config.yaml):

client:
  input_bucket: my-bucket
  input_prefix: data/2024/
  file_extension: .json  # Optional: filter by extension
  max_files: 1000        # Optional: limit results
  output_table: //tmp/my_pipeline/s3_paths
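
Since file_extension and max_files are optional, it can help to validate the client section once before listing. The sketch below uses a plain dict as a stand-in for the stage config object; resolve_client_config and REQUIRED_KEYS are hypothetical names, not part of the framework.

```python
from typing import Any, Dict, Mapping

REQUIRED_KEYS = ("input_bucket", "input_prefix", "output_table")


def resolve_client_config(client: Mapping[str, Any]) -> Dict[str, Any]:
    """Check required keys and fill optional ones with None."""
    missing = [key for key in REQUIRED_KEYS if key not in client]
    if missing:
        raise KeyError(f"missing config keys: {', '.join(missing)}")
    max_files = client.get("max_files")
    if max_files is not None and (not isinstance(max_files, int) or max_files <= 0):
        raise ValueError(f"max_files must be a positive integer, got {max_files!r}")
    return {
        "input_bucket": client["input_bucket"],
        "input_prefix": client["input_prefix"],  # may be "" to list the whole bucket
        "output_table": client["output_table"],
        "file_extension": client.get("file_extension"),
        "max_files": max_files,
    }
```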

Processing S3 Files

After saving S3 paths to a table, you can process them in subsequent stages:

Driver-Side Example

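# stages/process_s3/stage.py
from yt_framework.core.pipeline import DebugContext
from yt_framework.core.stage import BaseStage


def process_s3_file(bucket: str, key: str) -> dict:
    """Placeholder: download the object and return one output row."""
    raise NotImplementedError


class ProcessS3Stage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        # Read S3 paths from table (rows have "bucket" and "path" columns)
        s3_paths = list(self.deps.yt_client.read_table(
            self.config.client.s3_paths_table
        ))

        # Process each S3 file sequentially on the driver
        results = []
        for row in s3_paths:
            # Download and process file
            result = process_s3_file(row["bucket"], row["path"])
            results.append(result)

        # Write results to output table
        self.deps.yt_client.write_table(
            self.config.client.output_table,
            results,
        )

        return debug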

Using Map Operation

For large numbers of files, use a map operation:

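# stages/process_s3/src/mapper.py
import sys
import json
import boto3
from omegaconf import OmegaConf
from ytjobs.config import get_config_path


def process_data(data: bytes):
    """Placeholder: replace with real processing; must return a JSON-serializable value."""
    return len(data)


def main():
    # Stage config, available for any settings process_data needs
    config = OmegaConf.load(get_config_path())
    # Uses boto3's default credential chain; a custom endpoint needs endpoint_url=...
    s3_client = boto3.client("s3")

    for line in sys.stdin:
        row = json.loads(line)
        bucket = row["bucket"]
        # The paths table stores the S3 key in the "path" column (there is no "key" column)
        key = row["path"]

        # Download file from S3
        response = s3_client.get_object(Bucket=bucket, Key=key)
        data = response["Body"].read()

        # Process data
        result = process_data(data)

        # Output one JSON row per input row
        output_row = {
            "path": key,
            "result": result,
        }
        print(json.dumps(output_row), flush=True)


if __name__ == "__main__":
    main()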
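
A mapper like the one above is easier to test locally if the download call is injected. The sketch below is one possible shape under that assumption: run_mapper and the fetch callable are hypothetical, and recording failures in an "error" column (instead of failing the whole job) is a design choice, not something the framework prescribes.

```python
import io
import json
from typing import Callable, Iterable, TextIO


def run_mapper(
    lines: Iterable[str],
    fetch: Callable[[str, str], bytes],
    out: TextIO,
) -> None:
    """Read JSON rows with "bucket" and "path", emit one JSON row per input."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        row = json.loads(line)
        bucket, key = row["bucket"], row["path"]
        try:
            data = fetch(bucket, key)
            output_row = {"path": key, "size": len(data), "error": None}
        except Exception as exc:
            # Record the failure so one bad object does not abort the job.
            output_row = {"path": key, "size": None, "error": str(exc)}
        out.write(json.dumps(output_row) + "\n")


def fake_fetch(bucket: str, key: str) -> bytes:
    """In-memory stand-in for an S3 download, used only for local testing."""
    objects = {("my-bucket", "data/a.json"): b"{}"}
    return objects[(bucket, key)]


buffer = io.StringIO()
run_mapper(['{"bucket": "my-bucket", "path": "data/a.json"}'], fake_fetch, buffer)
print(buffer.getvalue(), end="")
```

In a real job, fetch would wrap the S3 client's download call and the function would be invoked with sys.stdin and sys.stdout.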

Configuration

Stage Configuration

# stages/list_s3/config.yaml
client:
  input_bucket: my-bucket
  input_prefix: data/2024/
  file_extension: .json      # Optional
  max_files: 1000           # Optional
  output_table: //tmp/my_pipeline/s3_paths

Secrets Configuration

# configs/secrets.env
S3_ENDPOINT=https://your-s3-endpoint.com
S3_DOWNLOAD_ACCESS_KEY=your-download-access-key
S3_DOWNLOAD_SECRET_KEY=your-download-secret-key
S3_UPLOAD_ACCESS_KEY=your-upload-access-key
S3_UPLOAD_SECRET_KEY=your-upload-secret-key

Best Practices

  1. Filter early: Use prefix and extension to narrow the listing instead of filtering downstream
  2. Limit results: Use max_files for large buckets
  3. Process in batches: For many files, use map operations
  4. Handle errors: Check for empty results and handle S3 errors
  5. Secure credentials: Never commit secrets to version control
  6. Use appropriate client type: Use "download" for read-only, "upload" for write

Common Patterns

List and Process

# List files
paths = list_s3_files(
    s3_client=self.s3_client,
    bucket="my-bucket",
    prefix="data/",
    logger=self.logger,
)

# Save to table
save_s3_paths_to_table(
    yt_client=self.deps.yt_client,
    bucket="my-bucket",
    paths=paths,
    output_table="//tmp/my_pipeline/s3_paths",
    logger=self.logger,
)

# Process in next stage using map operation

Filter by Extension

# List only JSON files
paths = list_s3_files(
    s3_client=self.s3_client,
    bucket="my-bucket",
    prefix="data/",
    extension=".json",
    logger=self.logger,
)

Limit Results

# List first 100 files
paths = list_s3_files(
    s3_client=self.s3_client,
    bucket="my-bucket",
    prefix="data/",
    max_files=100,
    logger=self.logger,
)

Troubleshooting

Issue: S3 client creation fails

  • Check the S3_* keys in configs/secrets.env
  • Verify the credentials have S3 access
  • Check that S3_ENDPOINT is correct and reachable

Issue: No files found

  • Verify bucket name is correct
  • Check prefix path is correct
  • Ensure files exist in S3

Issue: Permission denied

  • Check IAM permissions for S3 access
  • Verify credentials have read/list permissions
  • Check bucket policy

Issue: Table write fails

  • Verify YT table path is correct
  • Check YT credentials and permissions
  • Ensure output table path exists or can be created

Next Steps