You can call run_map, run_vanilla, YQL helpers, and similar functions multiple times inside a single BaseStage.run() when the operations share one stage directory and one merged config.yaml subtree.
- Operations execute in the order you call them.
- They reuse the same self.context / upload bundle for that stage.
- Each operation reads its own block under client.operations.<name>.
Use separate stages instead when you want a different enabled_stages ordering, different configs on disk, or clearer failure isolation.
Run a map operation followed by a vanilla validation:
from yt_framework.core.pipeline import DebugContext
from yt_framework.core.stage import BaseStage
from yt_framework.operations.command_ops.map import run_map
from yt_framework.operations.command_ops.vanilla import run_vanilla
class ProcessAndValidateStage(BaseStage):
def run(self, debug: DebugContext) -> DebugContext:
# Step 1: Process data with map operation
success = run_map(
context=self.context,
operation_config=self.config.client.operations.process,
)
if not success:
raise RuntimeError("Process operation failed")
# Step 2: Validate with vanilla operation
success = run_vanilla(
context=self.context,
operation_config=self.config.client.operations.validate,
)
if not success:
raise RuntimeError("Validate operation failed")
return debugConfiguration:
# stages/process_and_validate/config.yaml
client:
  operations:
    # Read by run_map via self.config.client.operations.process
    process:
      input_table: //tmp/my_pipeline/input
      output_table: //tmp/my_pipeline/processed
      resources:
        pool: default
        memory_limit_gb: 4
        cpu_limit: 2
    # Read by run_vanilla via self.config.client.operations.validate
    validate:
      resources:
        pool: default
        memory_limit_gb: 2
        cpu_limit: 1

See Example: 09_multiple_operations for a complete example.
Process data with map, then validate with vanilla:
class ProcessAndValidateStage(BaseStage):
def run(self, debug: DebugContext) -> DebugContext:
# Process
success = run_map(
context=self.context,
operation_config=self.config.client.operations.process,
)
if not success:
raise RuntimeError("Process failed")
# Validate
success = run_vanilla(
context=self.context,
operation_config=self.config.client.operations.validate,
)
if not success:
raise RuntimeError("Validation failed")
return debugRun multiple map operations in sequence:
class MultiMapStage(BaseStage):
def run(self, debug: DebugContext) -> DebugContext:
# First map: Transform data
success = run_map(
context=self.context,
operation_config=self.config.client.operations.transform,
)
if not success:
raise RuntimeError("Transform failed")
# Second map: Enrich data
success = run_map(
context=self.context,
operation_config=self.config.client.operations.enrich,
)
if not success:
raise RuntimeError("Enrich failed")
# Third map: Aggregate data
success = run_map(
context=self.context,
operation_config=self.config.client.operations.aggregate,
)
if not success:
raise RuntimeError("Aggregate failed")
return debugCombine setup, processing, and cleanup:
class FullPipelineStage(BaseStage):
def run(self, debug: DebugContext) -> DebugContext:
# Setup
success = run_vanilla(
context=self.context,
operation_config=self.config.client.operations.setup,
)
if not success:
raise RuntimeError("Setup failed")
# Process
success = run_map(
context=self.context,
operation_config=self.config.client.operations.process,
)
if not success:
raise RuntimeError("Process failed")
# Cleanup
success = run_vanilla(
context=self.context,
operation_config=self.config.client.operations.cleanup,
)
if not success:
raise RuntimeError("Cleanup failed")
return debugDefine multiple operations in stage config:
# stages/multi_ops/config.yaml
client:
  operations:
    # First operation
    process:
      input_table: //tmp/my_pipeline/input
      output_table: //tmp/my_pipeline/processed
      resources:
        pool: default
        memory_limit_gb: 4
        cpu_limit: 2
    # Second operation
    validate:
      resources:
        pool: default
        memory_limit_gb: 2
        cpu_limit: 1
    # Third operation
    aggregate:
      input_table: //tmp/my_pipeline/processed  # output_table of `process`
      output_table: //tmp/my_pipeline/aggregated
      resources:
        pool: default
        memory_limit_gb: 8
        cpu_limit: 4

Use descriptive names for operations:
# `...` marks fields elided for brevity
client:
  operations:
    transform_data:  # Clear operation name
      input_table: ...
    validate_results:  # Clear operation name
      resources: ...
    aggregate_output:  # Clear operation name
      input_table: ...

# stages/process_and_validate/stage.py
from yt_framework.core.pipeline import DebugContext
from yt_framework.core.stage import BaseStage
from yt_framework.operations.command_ops.map import run_map
from yt_framework.operations.command_ops.vanilla import run_vanilla
from yt_framework.utils.logging import log_header
class ProcessAndValidateStage(BaseStage):
    """Map over the input table, log the output row count, then run a vanilla validation."""

    def run(self, debug: DebugContext) -> DebugContext:
        # Step 1: Process operation
        log_header(self.logger, "Process", "Running map operation")
        success = run_map(
            context=self.context,
            operation_config=self.config.client.operations.process,
        )
        if not success:
            raise RuntimeError("Process operation failed")

        # Query the output table only after the map reported success.
        output_table = self.config.client.operations.process.output_table
        row_count = self.deps.yt_client.row_count(output_table)
        self.logger.info(f"Process operation completed: {row_count} rows processed")

        # Step 2: Validate operation
        log_header(self.logger, "Validate", "Running vanilla operation")
        success = run_vanilla(
            context=self.context,
            operation_config=self.config.client.operations.validate,
        )
        if not success:
            raise RuntimeError("Validate operation failed")
        self.logger.info("Validate operation completed")

        return debug

# stages/process_and_validate/config.yaml
job:
  multiplier: 2  # read by src/mapper.py as config.job.multiplier
client:
  operations:
    process:
      input_table: //tmp/my_pipeline/input
      output_table: //tmp/my_pipeline/processed
      resources:
        pool: default
        memory_limit_gb: 4
        cpu_limit: 2
        job_count: 2
    validate:
      resources:
        pool: default
        memory_limit_gb: 2
        cpu_limit: 1

# stages/process_and_validate/src/mapper.py
import sys
import json
from omegaconf import OmegaConf
from ytjobs.config import get_config_path


def main():
    """Stream JSON rows from stdin and emit them with ``value`` scaled by ``job.multiplier``."""
    config = OmegaConf.load(get_config_path())
    multiplier = config.job.multiplier
    for line in sys.stdin:
        # Skip blank lines instead of letting json.loads raise on them.
        if not line.strip():
            continue
        row = json.loads(line)
        output_row = {
            "id": row["id"],
            "value": row["value"] * multiplier,
        }
        # No per-row flush: buffered stdout is flushed when the process exits normally.
        print(json.dumps(output_row))


if __name__ == "__main__":
    main()

# stages/process_and_validate/src/vanilla.py
import logging
from yt_framework.utils.env import load_secrets
from ytjobs.logging.logger import get_logger
from ytjobs.config import get_config_path
from omegaconf import OmegaConf
def main():
logger = get_logger("validate", level=logging.INFO)
config = OmegaConf.load(get_config_path())
# Validate processed data
# (In real scenario, read from table and validate)
logger.info("Validation completed successfully")
if __name__ == "__main__":
main()Check each operation:
success = run_map(...)
if not success:
raise RuntimeError("Operation failed")Provide context:
try:
success = run_map(...)
if not success:
raise RuntimeError("Map operation failed")
except Exception as e:
self.logger.error(f"Error in map operation: {e}")
raiseOrder matters:
- Operations run sequentially, in the order you call them
- Later operations may depend on the outputs of earlier ones
- Make sure each operation's input tables are produced before it runs
Example:
# Correct order: transform -> aggregate
run_map(transform_config) # First
run_map(aggregate_config) # Second (uses transform output)Allocate resources appropriately:
client:
  operations:
    process:
      resources:
        memory_limit_gb: 8  # More memory for processing
        cpu_limit: 4
    validate:
      resources:
        memory_limit_gb: 2  # Less memory for validation
        cpu_limit: 1

Log operation progress:
self.logger.info("Starting process operation...")
success = run_map(...)
self.logger.info("Process operation completed")
self.logger.info("Starting validate operation...")
success = run_vanilla(...)
self.logger.info("Validate operation completed")The examples above chain run_map and run_vanilla. The same sequential pattern applies to other entry points:
- YQL: call join_tables_request, filter_table_request, and related helpers on self.deps.yt_client with request objects from yt_framework.yt.clients.yql.yql_requests — see YQL operations.
- Map-reduce / reduce: use run_map_reduce or run_reduce from yt_framework.operations.command_ops.map_reduce with self.context and self.config.client.operations.* — see TypedJob map-reduce and Command mode.
- Sort: use run_sort from yt_framework.operations.command_ops.sort — see Sort operations.
You can mix these in one run() method as long as each step’s inputs (tables, configs) match the outputs of the steps before it.
class PipelineStage(BaseStage):
def run(self, debug: DebugContext) -> DebugContext:
# Setup
run_vanilla(setup_config)
# Process
run_map(process_config)
# Validate
run_vanilla(validate_config)
# Cleanup
run_vanilla(cleanup_config)
return debugclass TransformChainStage(BaseStage):
def run(self, debug: DebugContext) -> DebugContext:
# Transform 1
run_map(transform1_config)
# Transform 2
run_map(transform2_config)
# Transform 3
run_map(transform3_config)
return debugclass ProcessWithValidationStage(BaseStage):
def run(self, debug: DebugContext) -> DebugContext:
# Process
run_map(process_config)
# Validate
run_vanilla(validate_config)
# If validation fails, operation raises exception
return debug- Check first operation completed successfully
- Verify that the output table from the first operation exists
- Check table paths are correct
- Review operation logs
- Operations run sequentially in code order
- Check operation calls are in correct sequence
- Don't rely on parallel execution: operations in one run() do not overlap
- Ensure sufficient resources for all operations
- Check pool availability
- Review resource allocation
- Learn about Map Operations
- Explore Vanilla Operations
- Check out Example: 09_multiple_operations for a complete example