Skip to content

Commit dd7e2a6

Browse files
[v3-2-test] Fix OTel metrics lost in forked task processes (#64703) (#64720)
Reset the OTel SDK's Once() guard on _METER_PROVIDER_SET_ONCE before calling set_meter_provider() in get_otel_logger(). When a forked child process re-initializes Stats (detected via PID mismatch in stats.py), the inherited Once._done = True flag prevents the new MeterProvider from being registered. The child falls back to the parent's stale provider whose PeriodicExportingMetricReader thread is dead after fork, causing task-level metrics like ti.finish to be silently dropped. The fix resets _done and _METER_PROVIDER before each set_meter_provider() call. On first initialization (no fork), _done is already False so this is a no-op. On re-initialization after fork, it allows the new provider to be set correctly. (cherry picked from commit ff77bd2) Closes: #64690 Co-authored-by: Michael Black <4128408+MichaelRBlack@users.noreply.github.com>
1 parent 9765bfc commit dd7e2a6

File tree

2 files changed

+59
-0
lines changed

2 files changed

+59
-0
lines changed

shared/observability/src/airflow_shared/observability/metrics/otel_logger.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,22 @@ def get_otel_logger(
433433
)
434434
readers.append(export_to_console)
435435

436+
# Reset the OTel SDK's Once() guard so set_meter_provider() can succeed.
437+
# This is necessary when get_otel_logger() is called after a process fork:
438+
# the parent's _METER_PROVIDER_SET_ONCE._done = True is inherited by the child,
439+
# causing set_meter_provider() to silently fail with "Overriding of current
440+
# MeterProvider is not allowed". The child then uses the parent's stale provider
441+
# whose PeriodicExportingMetricReader thread is dead after fork.
442+
# On first call (no fork), _done is already False so this is a no-op.
443+
# See: https://github.com/apache/airflow/issues/64690
444+
try:
445+
import opentelemetry.metrics._internal as _metrics_internal
446+
447+
_metrics_internal._METER_PROVIDER_SET_ONCE._done = False
448+
_metrics_internal._METER_PROVIDER = None
449+
except (ImportError, AttributeError):
450+
pass
451+
436452
metrics.set_meter_provider(
437453
MeterProvider(
438454
resource=resource,

shared/observability/tests/observability/metrics/test_otel_logger.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -442,7 +442,50 @@ def test_atexit_flush_on_process_exit(self):
442442
f"stderr:\n{proc.stderr}"
443443
)
444444

445+
def test_reinit_after_fork_exports_metrics(self):
446+
"""Calling get_otel_logger() twice (simulating post-fork re-init) should still export metrics.
447+
448+
Reproduces https://github.com/apache/airflow/issues/64690: the OTel SDK's Once()
449+
guard on set_meter_provider() survives fork, preventing the child from setting a
450+
fresh MeterProvider. The fix resets the guard before each set_meter_provider() call.
451+
"""
452+
test_module_name = "tests.observability.metrics.test_otel_logger"
453+
function_call_str = f"import {test_module_name} as m; m.mock_service_run_reinit()"
454+
455+
proc = subprocess.run(
456+
[sys.executable, "-c", function_call_str],
457+
check=False,
458+
env=os.environ.copy(),
459+
capture_output=True,
460+
text=True,
461+
timeout=20,
462+
)
463+
464+
assert proc.returncode == 0, f"Process failed\nstdout:\n{proc.stdout}\nstderr:\n{proc.stderr}"
465+
466+
assert "post_fork_stat" in proc.stdout, (
467+
"Expected 'post_fork_stat' in stdout after re-initialization but it wasn't found. "
468+
"This suggests set_meter_provider() failed due to the Once() guard.\n"
469+
f"stdout:\n{proc.stdout}\n"
470+
f"stderr:\n{proc.stderr}"
471+
)
472+
445473

446474
def mock_service_run():
447475
logger = get_otel_logger(debug=True)
448476
logger.incr("my_test_stat")
477+
478+
479+
def mock_service_run_reinit():
480+
"""Simulate re-initialization after fork by calling get_otel_logger() twice.
481+
482+
The first call sets the global MeterProvider and the Once() guard.
483+
The second call simulates what happens in a forked child: stats.py detects
484+
a PID mismatch and calls the factory again. Without the fix, the second
485+
set_meter_provider() silently fails and the child uses a stale provider.
486+
"""
487+
# First init — sets Once._done = True
488+
get_otel_logger(debug=True)
489+
# Second init — simulates post-fork re-initialization
490+
logger = get_otel_logger(debug=True)
491+
logger.incr("post_fork_stat")

0 commit comments

Comments
 (0)