A map job reads an input table, streams each row through your `mapper.py` (rows are grouped into YT tasks in prod), and writes JSON lines to an output table. Use it when the transform is row-local in Python and does not fit YQL cleanly.
**When map fits**
Reach for map when each output row (or zero rows) depends only on one input row and you need arbitrary Python. Prefer YQL for set-style SQL operations.
Behavior:
- Stdin/stdout JSON lines (one object per line); flush after each printed row.
- Prod: many tasks; dev: a single local subprocess and a sandbox under `.dev/`.
- Requires `input_table`, `output_table`, and `resources` in YAML.
**Mapper Script Requirements**
Your mapper script must read from stdin and write to stdout. Each line is a JSON-encoded row. Make sure to flush output after each row.
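The stdin/stdout contract above can be sketched with the standard library alone. This is a minimal illustration, not the framework's implementation: `map_row` and `run_mapper` are hypothetical names, and the streams are passed in as arguments so the loop can be exercised without a real job.

```python
import io
import json
from typing import Iterable, Optional, TextIO


def map_row(row: dict) -> Optional[dict]:
    """Hypothetical row-local transform: return a dict to emit, or None to drop the row."""
    if row.get("value") is None:
        return None  # zero output rows for this input row
    return {"id": row["id"], "processed_value": row["value"] * 2}


def run_mapper(lines: Iterable[str], out: TextIO) -> int:
    """Read JSON lines, write JSON lines, flush after every row; return rows written."""
    written = 0
    for line in lines:
        if not line.strip():
            continue  # ignore blank lines
        result = map_row(json.loads(line))
        if result is not None:
            out.write(json.dumps(result) + "\n")
            out.flush()
            written += 1
    return written


# In a real mapper script the call would be: run_mapper(sys.stdin, sys.stdout)
demo_in = ['{"id": 1, "value": 21}\n', '{"id": 2, "value": null}\n']
demo_out = io.StringIO()
rows_written = run_mapper(demo_in, demo_out)
```

Keeping the transform in a plain function that takes one row and returns one row (or nothing) makes the row-local property explicit and lets you unit-test it without YT.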
Stage (`stages/process_data/stage.py`):

```python
from yt_framework.core.pipeline import DebugContext
from yt_framework.core.stage import BaseStage
from yt_framework.operations.command_ops.map import run_map


class ProcessDataStage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        success = run_map(
            context=self.context,
            operation_config=self.config.client.operations.map,
        )
        if not success:
            raise RuntimeError("Map operation failed")
        return debug
```

Stage config (`stages/process_data/config.yaml`):
```yaml
client:
  operations:
    map:
      input_table: //tmp/my_pipeline/input
      output_table: //tmp/my_pipeline/output
      resources:
        pool: default
        memory_limit_gb: 4
        cpu_limit: 2
        job_count: 2
```

Mapper script (`stages/process_data/src/mapper.py`):
```python
#!/usr/bin/env python3
import sys
import json

from omegaconf import OmegaConf
from ytjobs.config import get_config_path


def main():
    # Load configuration
    config = OmegaConf.load(get_config_path())

    # Process each input row
    for line in sys.stdin:
        row = json.loads(line)

        # Transform row
        output_row = {
            "id": row["id"],
            "processed_value": row["value"] * 2,
        }

        # Write output row
        print(json.dumps(output_row), flush=True)


if __name__ == "__main__":
    main()
```

See Example: 04_map_operation for a complete example.
(append-output)=
Use append: true under client.operations.map when mapper rows should be appended to an existing output table rather than replacing it.
On the cluster, the output table must already exist and incoming rows must match its schema (including typed columns). In dev mode, if the output .jsonl already exists, mapper stdout is appended after the current lines.
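The dev-mode behavior described above amounts to opening the output `.jsonl` in append mode instead of truncating it. A minimal sketch of that difference follows; the helper names and the file name are hypothetical and this is not the framework's own code.

```python
import json
import os
import tempfile
from typing import Iterable, List


def write_jsonl(path: str, rows: Iterable[dict], append: bool) -> None:
    """Write rows as JSON lines; append=True keeps existing lines, False replaces them."""
    mode = "a" if append else "w"
    with open(path, mode, encoding="utf-8") as f:
        for row in rows:
            f.write(json.dumps(row) + "\n")


def read_jsonl(path: str) -> List[dict]:
    """Read every non-empty JSON line back into a list of dicts."""
    with open(path, encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]


with tempfile.TemporaryDirectory() as tmp:
    out_path = os.path.join(tmp, "output.jsonl")  # hypothetical file name
    write_jsonl(out_path, [{"id": 1}], append=False)
    write_jsonl(out_path, [{"id": 2}], append=True)   # existing line is kept
    appended = read_jsonl(out_path)
    write_jsonl(out_path, [{"id": 3}], append=False)  # existing lines are replaced
    replaced = read_jsonl(out_path)
```

Note that the sketch does no schema checking; on the cluster the schema match is enforced, so a row that appends fine in dev can still be rejected in prod. In the stage config, the flag sits next to the table paths: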
```yaml
client:
  operations:
    map:
      input_table: //tmp/my_pipeline/input
      output_table: //tmp/my_pipeline/output
      append: true
      resources:
        pool: default
```

The mapper script (`src/mapper.py`) processes the rows of the input table:
- Input: One JSON object per line via stdin
- Output: One JSON object per line to stdout
Example:
```python
import sys
import json


def process_row(row: dict) -> dict:
    return row  # replace with your transform


for line in sys.stdin:
    row = json.loads(line)
    processed = process_row(row)
    print(json.dumps(processed), flush=True)
```

Access stage configuration in mapper script:
```python
from omegaconf import OmegaConf
from ytjobs.config import get_config_path

config = OmegaConf.load(get_config_path())

# Access job config
multiplier = config.job.multiplier
prefix = config.job.prefix

# Access client config (read-only)
input_table = config.client.operations.map.input_table
```

Stage config (`stages/my_stage/config.yaml`):
```yaml
job:
  multiplier: 2
  prefix: "processed_"

client:
  operations:
    map:
      input_table: //tmp/my_pipeline/input
      output_table: //tmp/my_pipeline/output
```

Handle errors gracefully:
```python
import sys
import json
import traceback


def process_row(row: dict) -> dict:
    return row  # replace with your transform


for line in sys.stdin:
    try:
        row = json.loads(line)
        output_row = process_row(row)
        print(json.dumps(output_row), flush=True)
    except Exception as e:
        # Log the error to stderr (stdout carries data) and skip the row
        print(f"Error processing row: {e}", file=sys.stderr)
        traceback.print_exc(file=sys.stderr)
        continue
```

Important: Failed jobs will cause the operation to fail if `max_failed_job_count` is exceeded.
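Skipping every bad row silently can hide a systematic problem. One possible refinement, sketched below, is a per-job budget of bad rows: skip up to a limit, then raise. The names `run_with_budget`, `TooManyBadRows`, and `max_bad_rows` are hypothetical and live only in your mapper; the budget is separate from `max_failed_job_count`, which counts failed jobs rather than rows. The sketch assumes that an uncaught exception makes the mapper process exit with a non-zero code.

```python
import io
import json
from typing import Callable, Iterable, TextIO


class TooManyBadRows(RuntimeError):
    """Raised once the number of skipped rows exceeds the budget."""


def run_with_budget(
    lines: Iterable[str],
    out: TextIO,
    err: TextIO,
    process_row: Callable[[dict], dict],
    max_bad_rows: int,
) -> int:
    """Process JSON lines, skipping bad rows; return how many rows were skipped."""
    bad = 0
    for line_no, line in enumerate(lines, start=1):
        try:
            result = process_row(json.loads(line))
            out.write(json.dumps(result) + "\n")
            out.flush()
        except Exception as exc:
            bad += 1
            err.write(f"line {line_no}: {exc!r}\n")  # diagnostics never go to stdout
            if bad > max_bad_rows:
                raise TooManyBadRows(
                    f"{bad} bad rows exceed budget of {max_bad_rows}"
                ) from exc
    return bad


demo_lines = ['{"id": 1}\n', "not json\n", '{"id": 2}\n']
demo_out, demo_err = io.StringIO(), io.StringIO()
skipped = run_with_budget(demo_lines, demo_out, demo_err, lambda r: r, max_bad_rows=1)
```

In a real mapper you would pass `sys.stdin`, `sys.stdout`, and `sys.stderr` as the three streams.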
Use YT logging utilities:
```python
import sys
import json
import logging

from ytjobs.logging.logger import get_logger

logger = get_logger("mapper", level=logging.INFO)

for line in sys.stdin:
    row = json.loads(line)
    logger.info(f"Processing row {row['id']}")
    # Process row...
```

Logs appear in YT operation logs (prod mode) or the `.dev/` directory (dev mode).
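Logging every row gets expensive on large tables. The sketch below logs a progress line every N rows instead. It uses the standard `logging` module rather than `ytjobs.logging`, so that it runs anywhere; `make_logger`, `process`, and `log_every` are hypothetical names, and the handler is pointed at an explicit stream because stdout is reserved for output rows.

```python
import io
import json
import logging
from typing import Iterable, TextIO


def make_logger(stream: TextIO) -> logging.Logger:
    """Build a logger that writes to the given stream (sys.stderr in a real mapper)."""
    logger = logging.getLogger("mapper_sketch")
    logger.setLevel(logging.INFO)
    logger.handlers.clear()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
    logger.addHandler(handler)
    logger.propagate = False  # keep messages out of the root logger
    return logger


def process(lines: Iterable[str], out: TextIO, logger: logging.Logger, log_every: int) -> int:
    """Pass rows through unchanged, logging one progress line every log_every rows."""
    count = 0
    for count, line in enumerate(lines, start=1):
        row = json.loads(line)
        out.write(json.dumps(row) + "\n")
        out.flush()
        if count % log_every == 0:
            logger.info("processed %d rows (last id=%s)", count, row.get("id"))
    return count


log_buf, data_buf = io.StringIO(), io.StringIO()
demo_rows = [json.dumps({"id": i}) + "\n" for i in range(1, 6)]
total = process(demo_rows, data_buf, make_logger(log_buf), log_every=2)
```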
```yaml
client:
  operations:
    map:
      input_table: //tmp/my_pipeline/input
      output_table: //tmp/my_pipeline/output
      resources:
        pool: default
        memory_limit_gb: 4
        cpu_limit: 2
        job_count: 2
```

- `input_table`: YT path to input table
- `output_table`: YT path to output table
- `resources`: Resource configuration
```yaml
resources:
  pool: default          # YT pool name
  pool_tree: null        # Optional: pool tree
  memory_limit_gb: 4     # Memory per job (GB)
  cpu_limit: 2           # CPU cores per job
  job_count: 2           # Number of parallel jobs
  gpu_limit: 0           # GPU count (default: 0)
  user_slots: null       # Optional: user slots limit
```

Resources (rule of thumb):
- Raise `memory_limit_gb` when a single row plus model weights no longer fits.
- `cpu_limit` helps per-task throughput; `job_count` spreads rows across tasks.
- `gpu_limit` > 0 only works with an image that actually exposes GPUs to Python.
Max failed jobs:
```yaml
client:
  operations:
    map:
      input_table: //tmp/my_pipeline/input
      output_table: //tmp/my_pipeline/output
      max_failed_job_count: 1  # Fail operation after N failed jobs
      resources:
        # ...
```

Max row weight:
max_row_weight defaults to 128M for map operations (that is also the maximum the cluster accepts). Override it per operation with a value at or below 128M:
```yaml
client:
  operations:
    map:
      input_table: //tmp/my_pipeline/input
      output_table: //tmp/my_pipeline/output
      max_row_weight: 64M
      resources:
        # ...
```

Custom Docker:
```yaml
client:
  operations:
    map:
      input_table: //tmp/my_pipeline/input
      output_table: //tmp/my_pipeline/output
      resources:
        docker_image: my-registry/my-image:latest
        # ...
```

See Docker Guide for details.
Checkpoints:
```yaml
client:
  operations:
    map:
      input_table: //tmp/my_pipeline/input
      output_table: //tmp/my_pipeline/output
      checkpoint:
        checkpoint_base: //tmp/my_pipeline/checkpoints
        local_checkpoint_path: /path/to/local/model.pth
      resources:
        # ...
```

See Checkpoints Guide for details.
Use run_map() function:
```python
from yt_framework.operations.command_ops.map import run_map

success = run_map(
    context=self.context,
    operation_config=self.config.client.operations.map,
)
```

- Code upload: Code is packaged and uploaded to YT (prod mode)
- Job creation: YT creates multiple jobs based on `job_count`
- Row distribution: Input table rows are distributed across jobs
- Execution: Each job runs `mapper.py` for its assigned rows
- Output collection: Results are written to output table
- Completion: Operation completes when all jobs finish
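To build intuition for the row distribution step, the sketch below splits a list of rows into `job_count` contiguous chunks of near-equal size. This is only an illustration under the assumption of an even split: the real partitioning is decided by YT, and `split_rows` is a hypothetical helper, not a framework function.

```python
from typing import List, Sequence


def split_rows(rows: Sequence[dict], job_count: int) -> List[List[dict]]:
    """Split rows into job_count contiguous chunks whose sizes differ by at most one."""
    if job_count < 1:
        raise ValueError("job_count must be at least 1")
    base, extra = divmod(len(rows), job_count)
    chunks: List[List[dict]] = []
    start = 0
    for i in range(job_count):
        size = base + (1 if i < extra else 0)  # the first `extra` chunks get one more row
        chunks.append(list(rows[start:start + size]))
        start += size
    return chunks


demo_rows = [{"id": i} for i in range(5)]
demo_chunks = split_rows(demo_rows, job_count=2)
chunk_sizes = [len(c) for c in demo_chunks]
```

Because each job sees only its own chunk, a mapper cannot rely on seeing all rows or on any global ordering.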
In dev mode:
- Runs sequentially (single job)
- Creates sandbox directory: `.dev/sandbox_<input>-><output>/`
- Input table copied to sandbox
- Mapper script executed locally
- Output written to `.dev/<output>.jsonl`
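The dev-mode steps above boil down to piping JSON lines through the mapper in one local subprocess. A rough sketch of that idea with the standard library follows; it is not the framework's runner, and `MAPPER_SOURCE` and `run_local` are hypothetical names. The mapper is passed inline via `python -c` so the example needs no files.

```python
import json
import subprocess
import sys
from typing import List

# Inline stand-in for a mapper.py file.
MAPPER_SOURCE = """
import sys, json
for line in sys.stdin:
    row = json.loads(line)
    print(json.dumps({"id": row["id"], "doubled": row["value"] * 2}), flush=True)
"""


def run_local(mapper_source: str, rows: List[dict]) -> List[dict]:
    """Feed rows to the mapper on stdin and parse what it prints on stdout."""
    payload = "".join(json.dumps(r) + "\n" for r in rows)
    proc = subprocess.run(
        [sys.executable, "-c", mapper_source],
        input=payload,
        capture_output=True,
        text=True,
        check=True,  # a non-zero exit code raises CalledProcessError
    )
    return [json.loads(line) for line in proc.stdout.splitlines() if line.strip()]


local_result = run_local(MAPPER_SOURCE, [{"id": 1, "value": 2}, {"id": 2, "value": 5}])
```

A harness like this is also handy in unit tests: it exercises the real stdin/stdout path of a mapper without touching a cluster.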
Run multiple map operations in one stage:
```python
from yt_framework.core.pipeline import DebugContext
from yt_framework.core.stage import BaseStage
from yt_framework.operations.command_ops.map import run_map


class ProcessAndValidateStage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        # First map operation
        success = run_map(
            context=self.context,
            operation_config=self.config.client.operations.process,
        )
        if not success:
            raise RuntimeError("Process failed")

        # Second map operation
        success = run_map(
            context=self.context,
            operation_config=self.config.client.operations.validate,
        )
        if not success:
            raise RuntimeError("Validate failed")

        return debug
```

See Example: 09_multiple_operations for details.
For GPU workloads:
- Custom Docker: Create Docker image with GPU support
- GPU resources: Set `gpu_limit: 1` or higher
- GPU code: Use GPU libraries in mapper script
Example:
```yaml
client:
  operations:
    map:
      resources:
        docker_image: my-registry/gpu-image:latest
        gpu_limit: 1
        memory_limit_gb: 16
```

See Example: video_gpu for a GPU processing example.
Load ML models from checkpoints:
```python
import os

from ytjobs.config import get_config_path
from omegaconf import OmegaConf

# Checkpoint file is available in sandbox
checkpoint_file = os.environ.get("CHECKPOINT_FILE")
if checkpoint_file:
    # Load model from checkpoint.
    # load_model() is a placeholder for your own loading code.
    model = load_model(checkpoint_file)
```

See Checkpoints Guide for details.
- Keep transforms deterministic for the same input row.
- Decide whether a bad row should skip, fail the task, or poison the whole op (`max_failed_job_count`).
- Size memory from peak RSS you observe in dev, not from guesses.
- Log row ids sparingly; high-volume logs hurt both dev and prod.
- Run dev mode on a slice of production schema before widening `job_count` in prod.
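For the memory-sizing point, one way to observe peak RSS in dev is the standard `resource` module (Unix only). The sketch below reads the process's peak resident set size and turns it into a candidate `memory_limit_gb`. The helper names and the 1.5x headroom factor are assumptions for illustration, not framework recommendations, and rounding up to whole gigabytes is a choice made here.

```python
import math
import resource
import sys


def peak_rss_mb() -> float:
    """Peak resident set size of this process in megabytes."""
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    # ru_maxrss is reported in kilobytes on Linux and in bytes on macOS.
    divisor = 1024 * 1024 if sys.platform == "darwin" else 1024
    return peak / divisor


def suggest_memory_limit_gb(peak_mb: float, headroom: float = 1.5) -> int:
    """Round peak usage times a headroom factor (an assumption) up to whole GB."""
    return max(1, math.ceil(peak_mb * headroom / 1024))


# Call this at the end of a dev run, after the mapper loop has finished.
observed_mb = peak_rss_mb()
suggested_gb = suggest_memory_limit_gb(observed_mb)
```

Measure on rows that are representative of production; a dev slice with unusually small rows will understate the peak.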
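```python
import json
import sys

# Transform: build a new row from each input row.
# transform() stands in for your own function.
for line in sys.stdin:
    row = json.loads(line)
    output_row = {
        "id": row["id"],
        "transformed": transform(row["data"]),
    }
    print(json.dumps(output_row), flush=True)
```

```python
import json
import sys

# Filter: emit only rows that pass should_include(), your own predicate.
for line in sys.stdin:
    row = json.loads(line)
    if should_include(row):
        print(json.dumps(row), flush=True)
```

```python
import json
import sys

# Enrich: keep all input fields and add a computed one.
# compute() stands in for your own function.
for line in sys.stdin:
    row = json.loads(line)
    enriched = {
        **row,
        "computed_field": compute(row),
    }
    print(json.dumps(enriched), flush=True)
```

| Symptom | Checks |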
|---|---|
| Op fails at start | Mapper entrypoint path, import errors in bundle, missing input table |
| Sparse row failures | max_failed_job_count, stderr in failing task |
| Slow wall clock | job_count too low for data size, CPU-bound Python, remote I/O inside mapper |
| OOM kills | Raise memory_limit_gb, shrink per-row allocations, stream instead of buffering |
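For the "remote I/O inside mapper" check, one common mitigation is to cache repeated lookups so each distinct key is fetched once per job. The sketch below uses `functools.lru_cache`; `slow_lookup` is a hypothetical stand-in for a remote call, and whether caching is safe depends on how stale the looked-up data may be.

```python
import functools
from typing import Dict, Iterable, Iterator

CALLS = {"count": 0}  # counts how often the slow path is actually taken


def slow_lookup(key: str) -> str:
    """Stand-in for a remote call (HTTP request, database query, and so on)."""
    CALLS["count"] += 1
    return key.upper()


@functools.lru_cache(maxsize=1024)
def cached_lookup(key: str) -> str:
    """Memoized wrapper: repeated keys are served from memory."""
    return slow_lookup(key)


def enrich(rows: Iterable[Dict]) -> Iterator[Dict]:
    """Yield each row with a looked-up field added, one row at a time."""
    for row in rows:
        yield {**row, "region_name": cached_lookup(row["region"])}


demo_rows = [
    {"id": 1, "region": "eu"},
    {"id": 2, "region": "us"},
    {"id": 3, "region": "eu"},
]
enriched = list(enrich(demo_rows))
```

The cache is per process, so each job warms its own copy; `maxsize` bounds the memory it can take.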