From 4d39cd154ecc34c010f03090a1e3b1b4d5839b26 Mon Sep 17 00:00:00 2001 From: Michael Black <4128408+MichaelRBlack@users.noreply.github.com> Date: Sat, 4 Apr 2026 07:26:13 -0600 Subject: [PATCH] [v3-2-test] Fix OTel metrics lost in forked task processes (#64703) Reset the OTel SDK's Once() guard on _METER_PROVIDER_SET_ONCE before calling set_meter_provider() in get_otel_logger(). When a forked child process re-initializes Stats (detected via PID mismatch in stats.py), the inherited Once._done = True flag prevents the new MeterProvider from being registered. The child falls back to the parent's stale provider whose PeriodicExportingMetricReader thread is dead after fork, causing task-level metrics like ti.finish to be silently dropped. The fix resets _done and _METER_PROVIDER before each set_meter_provider() call. On first initialization (no fork), _done is already False so this is a no-op. On re-initialization after fork, it allows the new provider to be set correctly. (cherry picked from commit ff77bd2f095ae3169d04024ecf8c444c3123a973) Co-authored-by: Michael Black <4128408+MichaelRBlack@users.noreply.github.com> Closes: #64690 --- .../observability/metrics/otel_logger.py | 16 +++++++ .../observability/metrics/test_otel_logger.py | 43 +++++++++++++++++++ 2 files changed, 59 insertions(+) diff --git a/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py b/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py index 4b5174a3bf3ca..14726e3ecc064 100644 --- a/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py +++ b/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py @@ -433,6 +433,22 @@ def get_otel_logger( ) readers.append(export_to_console) + # Reset the OTel SDK's Once() guard so set_meter_provider() can succeed. + # This is necessary when get_otel_logger() is called after a process fork: + # the parent's _METER_PROVIDER_SET_ONCE._done = True is inherited by the child, + # causing set_meter_provider() to silently fail with "Overriding of current + # MeterProvider is not allowed". The child then uses the parent's stale provider + # whose PeriodicExportingMetricReader thread is dead after fork. + # On first call (no fork), _done is already False so this is a no-op. + # See: https://github.com/apache/airflow/issues/64690 + try: + import opentelemetry.metrics._internal as _metrics_internal + + _metrics_internal._METER_PROVIDER_SET_ONCE._done = False + _metrics_internal._METER_PROVIDER = None + except (ImportError, AttributeError): + pass + metrics.set_meter_provider( MeterProvider( resource=resource, diff --git a/shared/observability/tests/observability/metrics/test_otel_logger.py b/shared/observability/tests/observability/metrics/test_otel_logger.py index c27c372996968..f7b348354d7c9 100644 --- a/shared/observability/tests/observability/metrics/test_otel_logger.py +++ b/shared/observability/tests/observability/metrics/test_otel_logger.py @@ -442,7 +442,50 @@ def test_atexit_flush_on_process_exit(self): f"stderr:\n{proc.stderr}" ) + def test_reinit_after_fork_exports_metrics(self): + """Calling get_otel_logger() twice (simulating post-fork re-init) should still export metrics. + + Reproduces https://github.com/apache/airflow/issues/64690: the OTel SDK's Once() + guard on set_meter_provider() survives fork, preventing the child from setting a + fresh MeterProvider. The fix resets the guard before each set_meter_provider() call. + """ + test_module_name = "tests.observability.metrics.test_otel_logger" + function_call_str = f"import {test_module_name} as m; m.mock_service_run_reinit()" + + proc = subprocess.run( + [sys.executable, "-c", function_call_str], + check=False, + env=os.environ.copy(), + capture_output=True, + text=True, + timeout=20, + ) + + assert proc.returncode == 0, f"Process failed\nstdout:\n{proc.stdout}\nstderr:\n{proc.stderr}" + + assert "post_fork_stat" in proc.stdout, ( + "Expected 'post_fork_stat' in stdout after re-initialization but it wasn't found. " + "This suggests set_meter_provider() failed due to the Once() guard.\n" + f"stdout:\n{proc.stdout}\n" + f"stderr:\n{proc.stderr}" + ) + def mock_service_run(): logger = get_otel_logger(debug=True) logger.incr("my_test_stat") + + +def mock_service_run_reinit(): + """Simulate re-initialization after fork by calling get_otel_logger() twice. + + The first call sets the global MeterProvider and the Once() guard. + The second call simulates what happens in a forked child: stats.py detects + a PID mismatch and calls the factory again. Without the fix, the second + set_meter_provider() silently fails and the child uses a stale provider. + """ + # First init — sets Once._done = True + get_otel_logger(debug=True) + # Second init — simulates post-fork re-initialization + logger = get_otel_logger(debug=True) + logger.incr("post_fork_stat")