The Kindling Framework provides comprehensive logging and distributed tracing capabilities to enable full observability of data pipelines. These features help with debugging, performance monitoring, and operational insights. Tracing semantics have been intentionally similar to the OpenTelemetry framework. Logging semantics have been designed to be semantically similar to log4j.
The logging system in Kindling is built on top of Spark's native logging capabilities, with extensions for structured logging and context tracking.
Abstract interface for obtaining loggers.
class PythonLoggerProvider(ABC):
@abstractmethod
def get_logger(self, name: str):
"""Get a logger instance with the specified name"""
passDefault implementation of PythonLoggerProvider that creates SparkLogger instances.
@GlobalInjector.singleton_autobind()
class SparkLoggerProvider(PythonLoggerProvider):
def get_logger(self, name: str):
return SparkLogger(name)A wrapper around Spark's Log4j logger with additional functionality:
- Log level filtering
- Structured log formatting
- Trace context inclusion
- Pattern-based logging
class SparkLogger:
def __init__(self, name: str, baselogger = None):
# Initialize logger with name
pass
def debug(self, msg: str):
# Log at debug level
pass
def info(self, msg: str):
# Log at info level
pass
def warn(self, msg: str):
# Log at warning level
pass
def error(self, msg: str):
# Log at error level
pass
def with_pattern(self, pattern: str):
# Set custom logging pattern
pass# Get a logger through dependency injection
@inject
def __init__(self, lp: PythonLoggerProvider):
self.logger = lp.get_logger("my_component")
# Log at different levels
self.logger.debug("Detailed diagnostic information")
self.logger.info("Normal operational information")
self.logger.warn("Warning condition")
self.logger.error("Error condition")# Create a logger with custom pattern
logger = lp.get_logger("custom_logger").with_pattern(
"%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m"
)The tracing system provides distributed tracing capabilities to track operations across multiple components, including capturing timing, errors, and context information.
The tracing system is built upon the existing Spark event infrastructure using a custom event emitter. Events are emitted based on the creation of Microsoft's custom event class that is part of the Fabric and Synapse spark runtime and passing the custom event to the underlying Spark emitter.
class CustomEventEmitter(ABC):
@abstractmethod
def emit_custom_event(self,
component: str,
operation: str,
details: dict,
eventId: str,
traceId: uuid ) -> None:
passDefault implementation of CustomEventEmitter based on Microsoft Synapse runtime components that emits to Spark's event ingestion.
@GlobalInjector.singleton_autobind()
class AzureEventEmitter(CustomEventEmitter):
# Implementation of event emission
passProvider for creating and managing trace spans.
class SparkTraceProvider(ABC):
@abstractmethod
def span(self, component: str, operation: str, details: Optional[Dict] = None, reraise: bool = False):
"""Create a new trace span"""
pass
@abstractmethod
def get_current_trace_id(self) -> Optional[str]:
"""Get the current trace ID"""
passRepresents a single operation within a trace.
@dataclass
class SparkSpan:
component: str
operation: str
details: Dict = field(default_factory=dict)
parent_id: Optional[str] = None
span_id: str = field(default_factory=lambda: str(uuid.uuid4()))
start_time: datetime = field(default_factory=datetime.now)
end_time: Optional[datetime] = None
error: Optional[Exception] = None# Get the trace provider
trace_provider = GlobalInjector.get(SparkTraceProvider)
# Create a span around an operation
with trace_provider.span(
component="DataTransformation",
operation="ValidateCustomerData",
details={"source": "CRM", "records": 1000}
) as span:
# Your operation code here
# Span automatically captures timing and errors
passwith trace_provider.span(component="Pipeline", operation="ProcessData") as outer_span:
# Do some initial work
with trace_provider.span(component="Validation", operation="ValidateSchema") as inner_span:
# Validation logic
pass
with trace_provider.span(component="Transformation", operation="CleanData") as inner_span:
# Transformation logic
passThe framework provides an MDC (Mapped Diagnostic Context) feature for enriching logs with trace context:
with mdc_context(
component="DataLoader",
operation="LoadCustomers",
trace_id=str(uuid.uuid4())
):
# All logs within this context will include these attributes
logger.info("Loading customer data")Configure log levels through Spark configuration:
spark.sparkContext.setLogLevel("INFO") # Set Spark's log level
# Or through the framework's configuration system
config.set("LOG_LEVEL", "DEBUG")-
Consistent Component Names: Use consistent naming conventions for components and operations.
-
Appropriate Log Levels: Use the appropriate log level for each message:
- DEBUG: Detailed diagnostic information
- INFO: Normal operational messages
- WARN: Warning conditions
- ERROR: Error conditions
-
Structured Details: Include structured details in traces for easier analysis:
details = { "entity_id": entity.entityid, "record_count": df.count(), "execution_id": execution_id }
-
Trace Correlation: Use trace IDs to correlate operations across components.
-
Error Handling: Ensure errors are properly captured in traces:
with trace_provider.span(component="Processing", operation="TransformData", reraise=True): # With reraise=True, exceptions will be captured in the trace and then re-thrown