Multiple operations in one stage

A single BaseStage.run() can call run_map, run_vanilla, the YQL helpers, and similar entry points several times. This fits when the operations share one stage directory and one merged config.yaml subtree.

Overview

  • Operations execute in the order you call them.
  • They reuse the same self.context / upload bundle for that stage.
  • Each operation reads its own block under client.operations.<name>.

Use separate stages instead when you want different enabled_stages ordering, different configs on disk, or clearer failure isolation.
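
As a rough illustration of the per-operation lookup described in the overview bullets, the sketch below uses a plain dictionary in place of the merged stage config. `get_operation_block` is a hypothetical helper written for this example, not part of yt_framework; in a real stage the same blocks are reached as attributes such as self.config.client.operations.process.

```python
# Sketch only: a plain dict stands in for the merged stage config.
# get_operation_block is a hypothetical helper, not a yt_framework API.

def get_operation_block(config: dict, name: str) -> dict:
    """Return the block stored under client.operations.<name>."""
    operations = config.get("client", {}).get("operations", {})
    if name not in operations:
        available = ", ".join(sorted(operations)) or "none"
        raise KeyError(f"No operation '{name}' in config (available: {available})")
    return operations[name]


stage_config = {
    "client": {
        "operations": {
            "process": {
                "input_table": "//tmp/my_pipeline/input",
                "output_table": "//tmp/my_pipeline/processed",
            },
            "validate": {"resources": {"pool": "default"}},
        }
    }
}

# Each call reads only its own block; the two blocks never mix.
process_block = get_operation_block(stage_config, "process")
validate_block = get_operation_block(stage_config, "validate")
print(process_block["output_table"])
print(sorted(validate_block))
```

A misspelled operation name then fails early with the list of names that do exist, rather than surfacing later as a missing attribute.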

Quick Start

Map Then Vanilla

Run a map operation followed by a vanilla validation:

from yt_framework.core.pipeline import DebugContext
from yt_framework.core.stage import BaseStage
from yt_framework.operations.command_ops.map import run_map
from yt_framework.operations.command_ops.vanilla import run_vanilla

class ProcessAndValidateStage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        # Step 1: Process data with map operation
        success = run_map(
            context=self.context,
            operation_config=self.config.client.operations.process,
        )
        if not success:
            raise RuntimeError("Process operation failed")
        
        # Step 2: Validate with vanilla operation
        success = run_vanilla(
            context=self.context,
            operation_config=self.config.client.operations.validate,
        )
        if not success:
            raise RuntimeError("Validate operation failed")
        
        return debug

Configuration:

# stages/process_and_validate/config.yaml
client:
  operations:
    process:
      input_table: //tmp/my_pipeline/input
      output_table: //tmp/my_pipeline/processed
      resources:
        pool: default
        memory_limit_gb: 4
        cpu_limit: 2
    validate:
      resources:
        pool: default
        memory_limit_gb: 2
        cpu_limit: 1

See Example: 09_multiple_operations for a complete example.

Use Cases

Process and Validate

Process data with map, then validate with vanilla:

class ProcessAndValidateStage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        # Process
        success = run_map(
            context=self.context,
            operation_config=self.config.client.operations.process,
        )
        if not success:
            raise RuntimeError("Process failed")
        
        # Validate
        success = run_vanilla(
            context=self.context,
            operation_config=self.config.client.operations.validate,
        )
        if not success:
            raise RuntimeError("Validation failed")
        
        return debug

Multiple Map Operations

Run multiple map operations in sequence:

class MultiMapStage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        # First map: Transform data
        success = run_map(
            context=self.context,
            operation_config=self.config.client.operations.transform,
        )
        if not success:
            raise RuntimeError("Transform failed")
        
        # Second map: Enrich data
        success = run_map(
            context=self.context,
            operation_config=self.config.client.operations.enrich,
        )
        if not success:
            raise RuntimeError("Enrich failed")
        
        # Third map: Aggregate data
        success = run_map(
            context=self.context,
            operation_config=self.config.client.operations.aggregate,
        )
        if not success:
            raise RuntimeError("Aggregate failed")
        
        return debug
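
The repeated call-then-check blocks above can also be folded into a loop. The following is a minimal sketch under one assumption: each step is a zero-argument callable that returns True on success, mirroring the boolean that run_map returns in the examples. `run_sequence` and the lambda steps are hypothetical stand-ins so the snippet runs without a cluster.

```python
from typing import Callable, List, Tuple

Step = Tuple[str, Callable[[], bool]]


def run_sequence(steps: List[Step]) -> List[str]:
    """Run steps in list order; raise on the first one that reports failure."""
    completed: List[str] = []
    for name, step in steps:
        if not step():
            raise RuntimeError(f"{name} failed (completed before it: {completed})")
        completed.append(name)
    return completed


# Stand-ins for run_map calls; in a real stage each lambda would wrap
# run_map(context=self.context, operation_config=...).
steps = [
    ("transform", lambda: True),
    ("enrich", lambda: True),
    ("aggregate", lambda: True),
]
print(run_sequence(steps))
```

Because the loop stops at the first failure, later steps never see a table that an earlier step failed to write.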

Setup, Process, Cleanup

Combine setup, processing, and cleanup:

class FullPipelineStage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        # Setup
        success = run_vanilla(
            context=self.context,
            operation_config=self.config.client.operations.setup,
        )
        if not success:
            raise RuntimeError("Setup failed")
        
        # Process
        success = run_map(
            context=self.context,
            operation_config=self.config.client.operations.process,
        )
        if not success:
            raise RuntimeError("Process failed")
        
        # Cleanup
        success = run_vanilla(
            context=self.context,
            operation_config=self.config.client.operations.cleanup,
        )
        if not success:
            raise RuntimeError("Cleanup failed")
        
        return debug
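
In the stage above, cleanup is skipped whenever the process step raises. If cleanup should run even after a failed process step, one possible variation is sketched below. It is not taken from the framework: `run_with_cleanup` and `record` are hypothetical names, and the callables stand in for run_vanilla and run_map. Whether this suits a given stage depends on whether cleanup is safe after a partial run.

```python
from typing import Callable, List


def run_with_cleanup(
    setup: Callable[[], bool],
    process: Callable[[], bool],
    cleanup: Callable[[], bool],
) -> None:
    """Run setup and process; attempt cleanup whenever setup has succeeded."""
    if not setup():
        raise RuntimeError("Setup failed")
    try:
        if not process():
            raise RuntimeError("Process failed")
    except Exception:
        cleanup()  # best effort; the original error is re-raised below
        raise
    if not cleanup():
        raise RuntimeError("Cleanup failed")


events: List[str] = []


def record(name: str, ok: bool) -> Callable[[], bool]:
    """Build a stand-in step that records its name and returns a fixed status."""
    def step() -> bool:
        events.append(name)
        return ok
    return step


try:
    run_with_cleanup(record("setup", True), record("process", False), record("cleanup", True))
except RuntimeError as exc:
    print(exc, events)
```

The cleanup call in the except branch deliberately ignores its own status so that the process error, not a secondary cleanup error, reaches the caller.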

Configuration

Multiple Operation Configs

Define each operation as its own block in the stage config:

# stages/multi_ops/config.yaml
client:
  operations:
    # First operation
    process:
      input_table: //tmp/my_pipeline/input
      output_table: //tmp/my_pipeline/processed
      resources:
        pool: default
        memory_limit_gb: 4
        cpu_limit: 2
    
    # Second operation
    validate:
      resources:
        pool: default
        memory_limit_gb: 2
        cpu_limit: 1
    
    # Third operation
    aggregate:
      input_table: //tmp/my_pipeline/processed
      output_table: //tmp/my_pipeline/aggregated
      resources:
        pool: default
        memory_limit_gb: 8
        cpu_limit: 4

Operation Naming

Use descriptive names for operations:

client:
  operations:
    transform_data:      # Clear operation name
      input_table: ...
    validate_results:    # Clear operation name
      resources: ...
    aggregate_output:    # Clear operation name
      input_table: ...

Complete Example

Stage Code

# stages/process_and_validate/stage.py
from yt_framework.core.pipeline import DebugContext
from yt_framework.core.stage import BaseStage
from yt_framework.operations.command_ops.map import run_map
from yt_framework.operations.command_ops.vanilla import run_vanilla
from yt_framework.utils.logging import log_header

class ProcessAndValidateStage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        # Step 1: Process operation
        log_header(self.logger, "Process", "Running map operation")
        
        success = run_map(
            context=self.context,
            operation_config=self.config.client.operations.process,
        )
        if not success:
            raise RuntimeError("Process operation failed")
        
        output_table = self.config.client.operations.process.output_table
        row_count = self.deps.yt_client.row_count(output_table)
        self.logger.info(f"Process operation completed: {row_count} rows processed")
        
        # Step 2: Validate operation
        log_header(self.logger, "Validate", "Running vanilla operation")
        
        success = run_vanilla(
            context=self.context,
            operation_config=self.config.client.operations.validate,
        )
        if not success:
            raise RuntimeError("Validate operation failed")
        
        self.logger.info("Validate operation completed")
        
        return debug

Stage Configuration

# stages/process_and_validate/config.yaml
job:
  multiplier: 2

client:
  operations:
    process:
      input_table: //tmp/my_pipeline/input
      output_table: //tmp/my_pipeline/processed
      resources:
        pool: default
        memory_limit_gb: 4
        cpu_limit: 2
        job_count: 2
    
    validate:
      resources:
        pool: default
        memory_limit_gb: 2
        cpu_limit: 1

Mapper Script

# stages/process_and_validate/src/mapper.py
import sys
import json
from omegaconf import OmegaConf
from ytjobs.config import get_config_path

def main():
    config = OmegaConf.load(get_config_path())
    multiplier = config.job.multiplier
    
    for line in sys.stdin:
        row = json.loads(line)
        output_row = {
            "id": row["id"],
            "value": row["value"] * multiplier,
        }
        print(json.dumps(output_row), flush=True)

if __name__ == "__main__":
    main()
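
The mapper's per-row logic can be exercised locally before it goes to a cluster. The sketch below is a hypothetical refactor for testing only: `run_mapper` takes the input and output streams and the multiplier as parameters instead of reading sys.stdin and the job config, so it needs neither omegaconf nor ytjobs.

```python
import io
import json
from typing import TextIO


def run_mapper(source: TextIO, sink: TextIO, multiplier: int) -> None:
    """Read JSON rows from source and write the scaled rows to sink."""
    for line in source:
        row = json.loads(line)
        output_row = {"id": row["id"], "value": row["value"] * multiplier}
        sink.write(json.dumps(output_row) + "\n")


# Two in-memory rows stand in for the job's stdin.
source = io.StringIO('{"id": 1, "value": 10}\n{"id": 2, "value": 5}\n')
sink = io.StringIO()
run_mapper(source, sink, multiplier=2)
print(sink.getvalue(), end="")
```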

Vanilla Script

# stages/process_and_validate/src/vanilla.py
import logging
from yt_framework.utils.env import load_secrets
from ytjobs.logging.logger import get_logger
from ytjobs.config import get_config_path
from omegaconf import OmegaConf

def main():
    logger = get_logger("validate", level=logging.INFO)
    config = OmegaConf.load(get_config_path())
    
    # Validate processed data
    # (In real scenario, read from table and validate)
    logger.info("Validation completed successfully")

if __name__ == "__main__":
    main()

Best Practices

Error Handling

Check each operation:

success = run_map(...)
if not success:
    raise RuntimeError("Operation failed")

Provide context:

try:
    success = run_map(...)
    if not success:
        raise RuntimeError("Map operation failed")
except Exception as e:
    self.logger.error(f"Error in map operation: {e}")
    raise
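
When a stage runs several operations, it helps if the error names the one that failed. The sketch below is one way to do that, assuming each operation is wrapped in a zero-argument callable that returns a boolean; `run_checked` is a hypothetical helper, not a framework function.

```python
import logging
from typing import Callable

logger = logging.getLogger("stage")


def run_checked(name: str, operation: Callable[[], bool]) -> None:
    """Run one operation and raise an error that names it on failure."""
    try:
        ok = operation()
    except Exception as exc:
        logger.error("Operation %s raised: %s", name, exc)
        # "from exc" keeps the original traceback attached to the new error.
        raise RuntimeError(f"Operation '{name}' raised an exception") from exc
    if not ok:
        logger.error("Operation %s returned a failure status", name)
        raise RuntimeError(f"Operation '{name}' reported failure")


# Stand-in for run_map(...); a real stage would pass a lambda wrapping the call.
run_checked("process", lambda: True)
```

Keeping the status check outside the try block separates the two failure modes: an exception inside the operation versus a clean return of False.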

Operation Ordering

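Order matters:

  • Operations run sequentially, in the order the calls appear in run()
  • Later operations often read tables that earlier ones wrote
  • Make sure each operation's input table is the table the previous step produced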

Example:

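# Correct order: transform -> aggregate
ops = self.config.client.operations
run_map(context=self.context, operation_config=ops.transform)  # First
run_map(context=self.context, operation_config=ops.aggregate)  # Second (reads the transform output)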
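
The data-flow requirement can be checked against the config before anything runs. A minimal sketch, assuming the operation blocks use the input_table and output_table keys shown in this document; `check_table_chain` is a hypothetical helper, and a plain dict stands in for client.operations. It compares adjacent steps only and skips any pair where one side has no table.

```python
from typing import Dict, List


def check_table_chain(operations: Dict[str, dict], order: List[str]) -> List[str]:
    """Return a message for every step whose input is not the previous output."""
    problems: List[str] = []
    for previous, current in zip(order, order[1:]):
        produced = operations[previous].get("output_table")
        consumed = operations[current].get("input_table")
        # Steps without tables (for example a vanilla step) are skipped.
        if produced is None or consumed is None:
            continue
        if produced != consumed:
            problems.append(
                f"{current} reads {consumed}, but {previous} writes {produced}"
            )
    return problems


operations = {
    "transform": {
        "input_table": "//tmp/my_pipeline/input",
        "output_table": "//tmp/my_pipeline/processed",
    },
    "aggregate": {
        "input_table": "//tmp/my_pipeline/processed",
        "output_table": "//tmp/my_pipeline/aggregated",
    },
}
print(check_table_chain(operations, ["transform", "aggregate"]))
```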

Resource Management

Allocate resources appropriately:

client:
  operations:
    process:
      resources:
        memory_limit_gb: 8  # More memory for processing
        cpu_limit: 4
    validate:
      resources:
        memory_limit_gb: 2  # Less memory for validation
        cpu_limit: 1

Logging

Log operation progress:

self.logger.info("Starting process operation...")
success = run_map(...)
self.logger.info("Process operation completed")

self.logger.info("Starting validate operation...")
success = run_vanilla(...)
self.logger.info("Validate operation completed")
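
The start and completion messages can be paired automatically with a context manager, which also avoids logging "completed" after a step that failed. This is a sketch using only the standard library; `log_step` is a hypothetical helper, and in a stage you would pass self.logger as the second argument.

```python
import logging
import time
from contextlib import contextmanager
from typing import Iterator

logger = logging.getLogger("stage")


@contextmanager
def log_step(name: str, log: logging.Logger = logger) -> Iterator[None]:
    """Log the start, the outcome, and the elapsed time of one operation."""
    log.info("Starting %s operation...", name)
    started = time.monotonic()
    try:
        yield
    except Exception:
        log.error("%s operation failed after %.1fs", name, time.monotonic() - started)
        raise
    log.info("%s operation completed in %.1fs", name, time.monotonic() - started)


logging.basicConfig(level=logging.INFO)
with log_step("process"):
    success = True  # stand-in for run_map(...)
    if not success:
        raise RuntimeError("Process operation failed")
```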

YQL, map-reduce, and sort

The examples above chain run_map and run_vanilla. The same sequential pattern applies to other entry points:

  • YQL: call join_tables_request, filter_table_request, and related helpers on self.deps.yt_client with request objects from yt_framework.yt.clients.yql.yql_requests—see YQL operations.
  • Map-reduce / reduce: use run_map_reduce or run_reduce from yt_framework.operations.command_ops.map_reduce with self.context and self.config.client.operations.*—see TypedJob map-reduce and Command mode.
  • Sort: use run_sort from yt_framework.operations.command_ops.sort—see Sort operations.

You can mix these in one run() method as long as each step’s inputs (tables, configs) match the previous output.

Common Patterns

Pipeline Pattern

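class PipelineStage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        ops = self.config.client.operations

        # Setup
        if not run_vanilla(context=self.context, operation_config=ops.setup):
            raise RuntimeError("Setup failed")

        # Process
        if not run_map(context=self.context, operation_config=ops.process):
            raise RuntimeError("Process failed")

        # Validate
        if not run_vanilla(context=self.context, operation_config=ops.validate):
            raise RuntimeError("Validation failed")

        # Cleanup
        if not run_vanilla(context=self.context, operation_config=ops.cleanup):
            raise RuntimeError("Cleanup failed")

        return debug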

Transform Chain Pattern

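class TransformChainStage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        ops = self.config.client.operations

        # Transform 1
        if not run_map(context=self.context, operation_config=ops.transform1):
            raise RuntimeError("Transform 1 failed")

        # Transform 2
        if not run_map(context=self.context, operation_config=ops.transform2):
            raise RuntimeError("Transform 2 failed")

        # Transform 3
        if not run_map(context=self.context, operation_config=ops.transform3):
            raise RuntimeError("Transform 3 failed")

        return debug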

Validation Pattern

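class ProcessWithValidationStage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        ops = self.config.client.operations

        # Process
        if not run_map(context=self.context, operation_config=ops.process):
            raise RuntimeError("Process failed")

        # Validate: a failed validation raises and stops the stage here
        if not run_vanilla(context=self.context, operation_config=ops.validate):
            raise RuntimeError("Validation failed")

        return debug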

Troubleshooting

Issue: Second operation fails

  • Check first operation completed successfully
  • Verify output table from first operation exists
  • Check table paths are correct
  • Review operation logs
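
The table checks in this list can be automated before the second operation starts. The sketch below assumes nothing about the client API: `missing_tables` is a hypothetical helper that takes the existence check as a callable, so in a stage you would pass whatever existence check your YT client provides. A set of paths stands in for the cluster here.

```python
from typing import Callable, Iterable, List


def missing_tables(paths: Iterable[str], exists: Callable[[str], bool]) -> List[str]:
    """Return the paths for which the existence check is false."""
    return [path for path in paths if not exists(path)]


# A set of paths stands in for the cluster in this sketch.
fake_cluster = {"//tmp/my_pipeline/input"}
required = ["//tmp/my_pipeline/input", "//tmp/my_pipeline/processed"]
missing = missing_tables(required, exists=lambda path: path in fake_cluster)
if missing:
    print("Missing tables:", ", ".join(missing))
```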

Issue: Operations run out of order

  • Operations run sequentially, in the order the calls appear in run()
  • Check that the calls are written in the intended sequence
  • Verify that nothing in the stage launches the operations in parallel

Issue: Resource conflicts

  • Ensure sufficient resources for all operations
  • Check pool availability
  • Review resource allocation

Next Steps