Driver-side helpers list buckets or prefixes and often write the resulting paths into YT tables for later map or YQL stages. Inside mappers, you typically construct `ytjobs.s3.client.S3Client` from secrets.
Job-side `S3Client` lives in `ytjobs`; see [YT jobs (`ytjobs`)](../reference/ytjobs.md).
Common workflow:
- List or filter object keys under a prefix.
- Persist the key list (or small metadata) to a Cypress table.
- Downstream stages read that table.

Credentials come from `load_secrets(self.deps.configs_dir)` and the `S3_*` keys documented in Secrets.
Stage (`stages/list_s3/stage.py`):

```python
from yt_framework.core.pipeline import DebugContext
from yt_framework.core.stage import BaseStage
from yt_framework.operations.s3 import list_s3_files, save_s3_paths_to_table
from yt_framework.utils.env import load_secrets
from ytjobs.s3.client import S3Client


class ListS3Stage(BaseStage):
    def __init__(self, deps, logger):
        super().__init__(deps, logger)

        # Create S3 client
        secrets = load_secrets(self.deps.configs_dir)
        self.s3_client = S3Client.create(
            secrets=secrets,
            client_type="download",  # or "upload" for write access
        )

    def run(self, debug: DebugContext) -> DebugContext:
        # List files from S3
        paths = list_s3_files(
            s3_client=self.s3_client,
            bucket=self.config.client.input_bucket,
            prefix=self.config.client.input_prefix,
            logger=self.logger,
        )

        # Save paths to YT table
        save_s3_paths_to_table(
            yt_client=self.deps.yt_client,
            bucket=self.config.client.input_bucket,
            paths=paths,
            output_table=self.config.client.output_table,
            logger=self.logger,
        )

        return debug
```

Stage config (`stages/list_s3/config.yaml`):
```yaml
client:
  input_bucket: my-bucket
  input_prefix: data/2024/
  output_table: //tmp/my_pipeline/s3_paths
```

Secrets (`configs/secrets.env`):
```bash
S3_ENDPOINT=https://your-s3-endpoint.com
S3_DOWNLOAD_ACCESS_KEY=your-download-access-key
S3_DOWNLOAD_SECRET_KEY=your-download-secret-key
S3_UPLOAD_ACCESS_KEY=your-upload-access-key
S3_UPLOAD_SECRET_KEY=your-upload-secret-key
```

See Example: 06_s3_integration for a complete example.
Create an S3 client in your stage's `__init__` method:

```python
from yt_framework.core.stage import BaseStage
from yt_framework.utils.env import load_secrets
from ytjobs.s3.client import S3Client


class MyStage(BaseStage):
    def __init__(self, deps, logger):
        super().__init__(deps, logger)

        # Load secrets
        secrets = load_secrets(self.deps.configs_dir)

        # Create S3 client
        self.s3_client = S3Client.create(
            secrets=secrets,
            client_type="download",  # or "upload"
        )
```

Client types:

- `"download"`: Read-only access (for listing and downloading)
- `"upload"`: Write access (for uploading files)
S3 credentials are stored in `configs/secrets.env`:

```bash
S3_ENDPOINT=https://your-s3-endpoint.com
S3_DOWNLOAD_ACCESS_KEY=your-download-access-key
S3_DOWNLOAD_SECRET_KEY=your-download-secret-key
S3_UPLOAD_ACCESS_KEY=your-upload-access-key
S3_UPLOAD_SECRET_KEY=your-upload-secret-key
```

Required fields:

- `S3_ENDPOINT`: S3 endpoint URL (e.g., `https://s3.example.com`)
- `S3_DOWNLOAD_ACCESS_KEY`: Access key for download operations
- `S3_DOWNLOAD_SECRET_KEY`: Secret key for download operations
- `S3_UPLOAD_ACCESS_KEY`: Access key for upload operations (optional, if different from download)
- `S3_UPLOAD_SECRET_KEY`: Secret key for upload operations (optional, if different from download)
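To make the relationship between the client type and the secret keys concrete, here is a minimal sketch of how credentials could be picked from the loaded secrets. `resolve_s3_credentials` is a hypothetical helper, not part of `yt_framework` or `ytjobs`, and the fallback from upload keys to download keys is only one reading of "optional, if different from download"; the real `S3Client.create` may behave differently.

```python
from typing import Dict, Tuple

# Keys that the field list above marks as always needed.
REQUIRED_KEYS = ("S3_ENDPOINT", "S3_DOWNLOAD_ACCESS_KEY", "S3_DOWNLOAD_SECRET_KEY")


def resolve_s3_credentials(secrets: Dict[str, str], client_type: str) -> Tuple[str, str, str]:
    """Hypothetical helper: return (endpoint, access_key, secret_key)."""
    if client_type not in ("download", "upload"):
        raise ValueError(f"Unknown client_type: {client_type!r}")

    missing = [key for key in REQUIRED_KEYS if not secrets.get(key)]
    if missing:
        raise KeyError(f"Missing required secrets: {', '.join(missing)}")

    prefix = "S3_UPLOAD" if client_type == "upload" else "S3_DOWNLOAD"
    # Assumption: when upload keys are absent, the download keys are reused.
    access_key = secrets.get(f"{prefix}_ACCESS_KEY") or secrets["S3_DOWNLOAD_ACCESS_KEY"]
    secret_key = secrets.get(f"{prefix}_SECRET_KEY") or secrets["S3_DOWNLOAD_SECRET_KEY"]
    return secrets["S3_ENDPOINT"], access_key, secret_key
```

Failing early on a missing key tends to give a clearer error than a rejected S3 request later in the stage.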
List files in an S3 bucket with optional filtering:

```python
from yt_framework.operations.s3 import list_s3_files

paths = list_s3_files(
    s3_client=self.s3_client,
    bucket="my-bucket",
    prefix="data/2024/",
    logger=self.logger,
    extension=".json",  # Optional: filter by extension
    max_files=1000,  # Optional: limit results
)
```

Parameters:

- `s3_client`: `S3Client` instance
- `bucket`: S3 bucket name
- `prefix`: S3 prefix to filter by (required; pass an empty string `""` to list all files)
- `logger`: Logger instance
- `extension`: File extension filter (optional, e.g., `".json"`)
- `max_files`: Maximum number of files to return (optional)

Returns: List of S3 paths (strings)
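The interplay of `prefix`, `extension`, and `max_files` can be illustrated on a plain list of keys. The sketch below is not the implementation of `list_s3_files`; `filter_keys` is a hypothetical function, and it assumes that the extension is matched as a suffix and that `max_files` caps the result after filtering.

```python
from typing import Iterable, List, Optional


def filter_keys(
    keys: Iterable[str],
    prefix: str,
    extension: Optional[str] = None,
    max_files: Optional[int] = None,
) -> List[str]:
    """Hypothetical model of the filters: prefix match, suffix match, then a cap."""
    selected: List[str] = []
    for key in keys:
        # Stop as soon as the cap is reached (also covers max_files=0).
        if max_files is not None and len(selected) >= max_files:
            break
        # An empty prefix matches every key.
        if not key.startswith(prefix):
            continue
        if extension is not None and not key.endswith(extension):
            continue
        selected.append(key)
    return selected
```

Under these assumptions, `max_files` limits the number of matching keys returned, not the number of keys scanned.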
Example:

```python
# List all JSON files in bucket with prefix
paths = list_s3_files(
    s3_client=self.s3_client,
    bucket="my-bucket",
    prefix="data/",
    extension=".json",
    logger=self.logger,
)

# List all files in bucket (no prefix filter - use empty string)
paths = list_s3_files(
    s3_client=self.s3_client,
    bucket="my-bucket",
    prefix="",  # Empty string lists all files
    logger=self.logger,
)

# List first 100 files
paths = list_s3_files(
    s3_client=self.s3_client,
    bucket="my-bucket",
    prefix="data/",
    max_files=100,
    logger=self.logger,
)
```

Save S3 file paths to a YT table for processing:
```python
from yt_framework.operations.s3 import save_s3_paths_to_table

save_s3_paths_to_table(
    yt_client=self.deps.yt_client,
    bucket="my-bucket",
    paths=paths,
    output_table="//tmp/my_pipeline/s3_paths",
    logger=self.logger,
)
```

Parameters:

- `yt_client`: YT client instance
- `bucket`: S3 bucket name
- `paths`: List of S3 paths (from `list_s3_files`)
- `output_table`: YT table path to write
- `logger`: Logger instance
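As a rough mental model of how `bucket` and `paths` relate to the written rows, each path can be thought of as one row that carries the bucket name alongside the key. `paths_to_rows` below is a hypothetical helper for illustration only; the framework function may add columns or write rows differently.

```python
from typing import Dict, Iterable, List


def paths_to_rows(bucket: str, paths: Iterable[str]) -> List[Dict[str, str]]:
    """Hypothetical sketch: one row per S3 key, with the bucket repeated on every row."""
    return [{"bucket": bucket, "path": path} for path in paths]
```

Repeating the bucket on every row keeps each row self-describing, so a downstream mapper needs nothing but the row itself to locate the object.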
Table schema:

Each row of the output table has the following shape:

```python
{
    "bucket": "my-bucket",
    "path": "data/file1.json",  # S3 key (not full s3:// URL)
}
```

Example:
```python
# List files
paths = list_s3_files(
    s3_client=self.s3_client,
    bucket="my-bucket",
    prefix="data/",
    logger=self.logger,
)

# Save to table
save_s3_paths_to_table(
    yt_client=self.deps.yt_client,
    bucket="my-bucket",
    paths=paths,
    output_table="//tmp/my_pipeline/s3_paths",
    logger=self.logger,
)

# Process paths from table
rows = list(self.deps.yt_client.read_table("//tmp/my_pipeline/s3_paths"))
for row in rows:
    s3_path = row["path"]
    # Process S3 file...
```

Complete stage (`stages/list_s3/stage.py`):

```python
from yt_framework.core.pipeline import DebugContext
from yt_framework.core.stage import BaseStage
from yt_framework.operations.s3 import list_s3_files, save_s3_paths_to_table
from yt_framework.utils.env import load_secrets
from ytjobs.s3.client import S3Client


class ListS3Stage(BaseStage):
    def __init__(self, deps, logger):
        super().__init__(deps, logger)

        # Create S3 client
        secrets = load_secrets(self.deps.configs_dir)
        self.s3_client = S3Client.create(
            secrets=secrets,
            client_type="download",
        )

    def run(self, debug: DebugContext) -> DebugContext:
        self.logger.info("Listing files from S3...")

        # List files with optional filters
        paths = list_s3_files(
            s3_client=self.s3_client,
            bucket=self.config.client.input_bucket,
            prefix=self.config.client.input_prefix,
            logger=self.logger,
            extension=self.config.client.get("file_extension"),  # Optional
            max_files=self.config.client.get("max_files"),  # Optional
        )

        if not paths:
            self.logger.warning("No files found in S3")
            return debug

        self.logger.info(f"Found {len(paths)} files")

        # Save paths to YT table
        save_s3_paths_to_table(
            yt_client=self.deps.yt_client,
            bucket=self.config.client.input_bucket,
            paths=paths,
            output_table=self.config.client.output_table,
            logger=self.logger,
        )

        self.logger.info(f"Saved {len(paths)} paths to {self.config.client.output_table}")
        return debug
```

Configuration (`stages/list_s3/config.yaml`):
```yaml
client:
  input_bucket: my-bucket
  input_prefix: data/2024/
  file_extension: .json  # Optional: filter by extension
  max_files: 1000  # Optional: limit results
  output_table: //tmp/my_pipeline/s3_paths
```

After saving S3 paths to a table, you can process them in subsequent stages:
```python
# stages/process_s3/stage.py
from yt_framework.core.pipeline import DebugContext
from yt_framework.core.stage import BaseStage


class ProcessS3Stage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        # Read S3 paths from table
        s3_paths = list(self.deps.yt_client.read_table(
            self.config.client.s3_paths_table
        ))

        # Process each S3 file
        results = []
        for row in s3_paths:
            s3_path = row["path"]
            # Download and process file
            # (process_s3_file is a placeholder for your own function)
            result = process_s3_file(s3_path)
            results.append(result)

        # Write results to output table
        self.deps.yt_client.write_table(
            self.config.client.output_table,
            results,
        )

        return debug
```

For large numbers of files, use a map operation:
```python
# stages/process_s3/src/mapper.py
import sys
import json

import boto3
from omegaconf import OmegaConf
from ytjobs.config import get_config_path


def process_data(data: bytes):
    """Placeholder: replace with your own processing logic."""
    return len(data)


def main():
    config = OmegaConf.load(get_config_path())
    # Assumes boto3 can find credentials and the endpoint in the job environment
    s3_client = boto3.client("s3")

    for line in sys.stdin:
        row = json.loads(line)
        bucket = row["bucket"]
        key = row["path"]  # the "path" column holds the S3 key

        # Download file from S3
        response = s3_client.get_object(Bucket=bucket, Key=key)
        data = response["Body"].read()

        # Process data
        result = process_data(data)

        # Output result
        output_row = {
            "path": key,
            "result": result,
        }
        print(json.dumps(output_row), flush=True)


if __name__ == "__main__":
    main()
```

Stage config (`stages/list_s3/config.yaml`):
```yaml
client:
  input_bucket: my-bucket
  input_prefix: data/2024/
  file_extension: .json  # Optional
  max_files: 1000  # Optional
  output_table: //tmp/my_pipeline/s3_paths
```

Secrets (`configs/secrets.env`):
```bash
S3_ENDPOINT=https://your-s3-endpoint.com
S3_DOWNLOAD_ACCESS_KEY=your-download-access-key
S3_DOWNLOAD_SECRET_KEY=your-download-secret-key
S3_UPLOAD_ACCESS_KEY=your-upload-access-key
S3_UPLOAD_SECRET_KEY=your-upload-secret-key
```

Best practices:

- Filter early: Use `prefix` and `extension` to narrow the listing
- Limit results: Use `max_files` for large buckets
- Process in batches: For many files, use map operations
- Handle errors: Check for empty results and handle S3 errors
- Secure credentials: Never commit secrets to version control
- Use appropriate client type: Use `"download"` for read-only, `"upload"` for write
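The "handle errors" advice can be sketched for a mapper that reads `bucket`/`path` rows as JSON lines. The code below is an illustration under assumptions: `process_rows` and the `fetch` callable are hypothetical names, `fetch` stands in for whatever download call your job uses, and the `size` and `error` output columns are invented for the example.

```python
import json
from typing import Callable, Iterable, Iterator


def process_rows(lines: Iterable[str], fetch: Callable[[str, str], bytes]) -> Iterator[str]:
    """Yield one JSON line per input row, recording failures instead of raising."""
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank input lines
        row = json.loads(line)
        out = {"bucket": row["bucket"], "path": row["path"]}
        try:
            data = fetch(row["bucket"], row["path"])
            out["size"] = len(data)
            out["error"] = None
        except Exception as exc:  # broad on purpose: one bad object should not kill the job
            out["size"] = None
            out["error"] = f"{type(exc).__name__}: {exc}"
        yield json.dumps(out)
```

In a real mapper you would iterate over `sys.stdin` and print each yielded line. Keeping failed rows in the output with an `error` value makes it possible to retry only those keys in a later stage.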
Common patterns:

```python
# List files
paths = list_s3_files(
    s3_client=self.s3_client,
    bucket="my-bucket",
    prefix="data/",
    logger=self.logger,
)

# Save to table
save_s3_paths_to_table(
    yt_client=self.deps.yt_client,
    bucket="my-bucket",
    paths=paths,
    output_table="//tmp/my_pipeline/s3_paths",
    logger=self.logger,
)

# Process in next stage using map operation
```

```python
# List only JSON files
paths = list_s3_files(
    s3_client=self.s3_client,
    bucket="my-bucket",
    prefix="data/",
    extension=".json",
    logger=self.logger,
)
```

```python
# List first 100 files
paths = list_s3_files(
    s3_client=self.s3_client,
    bucket="my-bucket",
    prefix="data/",
    max_files=100,
    logger=self.logger,
)
```

Troubleshooting:

Authentication fails:

- Check AWS credentials in `secrets.env`
- Verify credentials have S3 access
- Check AWS region is correct

No files found:

- Verify bucket name is correct
- Check prefix path is correct
- Ensure files exist in S3

Access denied:

- Check IAM permissions for S3 access
- Verify credentials have read/list permissions
- Check bucket policy

Writing the YT table fails:

- Verify YT table path is correct
- Check YT credentials and permissions
- Ensure output table path exists or can be created
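Some of these checks can be run before the stage touches S3 or YT at all. The following is a small sketch of such a preflight check; `preflight_issues` is a hypothetical helper, and its rules are heuristics assumed for illustration (a bare bucket name, a prefix without a leading slash, an absolute YT path starting with `//`), not validation performed by the framework.

```python
from typing import List


def preflight_issues(bucket: str, prefix: str, output_table: str) -> List[str]:
    """Hypothetical config check: return human-readable problems, empty if none."""
    issues: List[str] = []
    if not bucket:
        issues.append("bucket is empty")
    elif bucket.startswith("s3://") or "/" in bucket:
        issues.append("bucket should be a bare name, without scheme or path")
    if prefix.startswith("/"):
        issues.append("prefix starts with '/', which rarely matches S3 keys")
    if not output_table.startswith("//"):
        issues.append("output_table should be an absolute YT path starting with '//'")
    return issues
```

Logging the returned list at the start of `run` gives a quick hint when a listing comes back empty because of a typo in the config.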
Next steps:

- Learn about Map Operations for processing S3 files
- Explore Configuration for secrets management
- Check out Example: 06_s3_integration for a complete example