Skip to content

Commit 70b6fff

Browse files
committed
fix(sdk): regenerate service.instance.id post-fork in MeterProvider and TracerProvider
When gunicorn (or any prefork server) forks workers, all workers inherit the same Resource from the master process, including the same service.instance.id. The SDK already restarts background threads post-fork (PeriodicExportingMetricReader, BatchProcessor) but never updates the resource identity. This causes metric collisions in OTLP backends where multiple workers exporting with the same resource identity result in incorrect aggregation instead of correct summation. Register an os.register_at_fork(after_in_child=...) hook on both MeterProvider and TracerProvider that replaces service.instance.id with a fresh UUID in each forked worker, ensuring distinct resource identities without any user configuration. Resource.merge() is used so all other resource attributes are preserved. WeakMethod is used for the hook reference, consistent with the existing pattern in PeriodicExportingMetricReader and BatchProcessor. Fixes: #4390 Related: #3885
1 parent 28b6852 commit 70b6fff

File tree

4 files changed

+289
-0
lines changed

4 files changed

+289
-0
lines changed

opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import os
1516
import weakref
1617
from atexit import register, unregister
1718
from logging import getLogger
1819
from os import environ
20+
from uuid import uuid4
1921
from threading import Lock
2022
from time import time_ns
2123
from typing import Optional, Sequence
@@ -456,6 +458,12 @@ def __init__(
456458
self._shutdown_once = Once()
457459
self._shutdown = False
458460

461+
if hasattr(os, "register_at_fork"):
462+
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
463+
os.register_at_fork(
464+
after_in_child=lambda: weak_reinit()() # pylint: disable=unnecessary-lambda
465+
)
466+
459467
for metric_reader in self._sdk_config.metric_readers:
460468
with self._all_metric_readers_lock:
461469
if metric_reader in self._all_metric_readers:
@@ -471,6 +479,22 @@ def __init__(
471479
self._measurement_consumer.collect
472480
)
473481

482+
def _at_fork_reinit(self) -> None:
    """Refresh ``service.instance.id`` in the SDK resource after a fork.

    Prefork servers (e.g. gunicorn) fork workers that all inherit the
    master's Resource, so every worker would export metrics under the
    same ``service.instance.id`` and collide on resource identity in the
    backend. This hook runs post-fork in each child and swaps in a fresh
    UUID; ``Resource.merge`` is used so all other resource attributes
    are preserved unchanged.
    """
    fresh_identity = Resource({"service.instance.id": str(uuid4())})
    self._sdk_config.resource = self._sdk_config.resource.merge(
        fresh_identity
    )
474498
def force_flush(self, timeout_millis: float = 10_000) -> bool:
475499
deadline_ns = time_ns() + timeout_millis * 10**6
476500

opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import traceback
2525
import typing
2626
import weakref
27+
from uuid import uuid4
2728
from dataclasses import dataclass
2829
from functools import lru_cache
2930
from os import environ
@@ -1366,6 +1367,28 @@ def __init__(
13661367
_tracer_configurator or _default_tracer_configurator
13671368
)
13681369

1370+
if hasattr(os, "register_at_fork"):
1371+
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
1372+
os.register_at_fork(
1373+
after_in_child=lambda: weak_reinit()() # pylint: disable=unnecessary-lambda
1374+
)
1375+
1376+
def _at_fork_reinit(self) -> None:
    """Refresh ``service.instance.id`` in the provider resource after a fork.

    Prefork servers (e.g. gunicorn) fork workers that all inherit the
    master's Resource, so every worker would export telemetry under the
    same ``service.instance.id`` and collide on resource identity in the
    backend. This hook runs post-fork in each child and swaps in a fresh
    UUID; ``Resource.merge`` is used so all other resource attributes
    are preserved unchanged.
    """
    fresh_identity = Resource({"service.instance.id": str(uuid4())})
    self._resource = self._resource.merge(fresh_identity)
1391+
13691392
def _set_tracer_configurator(
13701393
self, *, tracer_configurator: _TracerConfiguratorT
13711394
):
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
# pylint: disable=protected-access
16+
17+
import multiprocessing
18+
import os
19+
import unittest
20+
from platform import system
21+
22+
from opentelemetry.sdk.metrics import MeterProvider
23+
from opentelemetry.sdk.resources import Resource
24+
25+
_fork_ctx = multiprocessing.get_context("fork") if system() != "Windows" else None
26+
27+
28+
@unittest.skipUnless(
    hasattr(os, "fork"),
    "needs *nix",
)
class TestMeterProviderFork(unittest.TestCase):
    """Post-fork resource identity behavior of MeterProvider.

    The SDK registers an ``os.register_at_fork`` hook that gives every
    forked worker a fresh ``service.instance.id`` so workers do not
    collide on resource identity in the exporting backend.
    """

    def _collect_from_child(self, target):
        """Fork one child running ``target(conn)`` and return what it sends.

        Fails fast instead of hanging when the child dies before writing:
        the parent closes its copy of the child pipe end (so EOF becomes
        visible), polls with a timeout before ``recv()``, joins with a
        timeout, and asserts a clean exit code.
        """
        parent_conn, child_conn = _fork_ctx.Pipe()
        process = _fork_ctx.Process(target=target, args=(child_conn,))
        process.start()
        # Parent must drop its reference to the child end, otherwise a
        # crashed child never produces EOF and recv() could block forever.
        child_conn.close()
        try:
            self.assertTrue(
                parent_conn.poll(30), "child did not report within 30s"
            )
            value = parent_conn.recv()
        finally:
            process.join(timeout=30)
        self.assertEqual(process.exitcode, 0)
        return value

    def test_at_fork_reinit_changes_service_instance_id(self):
        """_at_fork_reinit should assign a new service.instance.id."""
        resource = Resource({"service.instance.id": "original-id"})
        provider = MeterProvider(resource=resource)

        original_id = provider._sdk_config.resource.attributes.get(
            "service.instance.id"
        )
        self.assertEqual(original_id, "original-id")

        provider._at_fork_reinit()

        new_id = provider._sdk_config.resource.attributes.get(
            "service.instance.id"
        )
        self.assertNotEqual(new_id, "original-id")
        self.assertIsNotNone(new_id)

    def test_at_fork_reinit_preserves_other_resource_attributes(self):
        """_at_fork_reinit should not affect other resource attributes."""
        resource = Resource(
            {
                "service.name": "my-service",
                "service.instance.id": "original-id",
                "deployment.environment": "production",
            }
        )
        provider = MeterProvider(resource=resource)

        provider._at_fork_reinit()

        attrs = provider._sdk_config.resource.attributes
        self.assertEqual(attrs.get("service.name"), "my-service")
        self.assertEqual(attrs.get("deployment.environment"), "production")

    def test_fork_produces_unique_service_instance_ids(self):
        """Each forked worker should get a distinct service.instance.id."""
        provider = MeterProvider()

        parent_id = provider._sdk_config.resource.attributes.get(
            "service.instance.id"
        )

        def child(conn):
            conn.send(
                provider._sdk_config.resource.attributes.get(
                    "service.instance.id"
                )
            )
            conn.close()

        child_id = self._collect_from_child(child)

        # Child must have a different service.instance.id than the parent.
        self.assertNotEqual(parent_id, child_id)
        self.assertIsNotNone(child_id)

    def test_multiple_forks_produce_unique_service_instance_ids(self):
        """Each of N forked workers should have a distinct service.instance.id."""
        provider = MeterProvider()

        def child(conn):
            conn.send(
                provider._sdk_config.resource.attributes.get(
                    "service.instance.id"
                )
            )
            conn.close()

        # Sequential forks: each child gets its own pipe and is fully
        # reaped before the next starts, so there is nothing to race.
        ids = {self._collect_from_child(child) for _ in range(4)}

        # All 4 workers should have distinct IDs.
        self.assertEqual(len(ids), 4)
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
# pylint: disable=protected-access
16+
17+
import multiprocessing
18+
import os
19+
import unittest
20+
from platform import system
21+
22+
from opentelemetry.sdk.resources import Resource
23+
from opentelemetry.sdk.trace import TracerProvider
24+
25+
_fork_ctx = multiprocessing.get_context("fork") if system() != "Windows" else None
26+
27+
28+
@unittest.skipUnless(
    hasattr(os, "fork"),
    "needs *nix",
)
class TestTracerProviderFork(unittest.TestCase):
    """Post-fork resource identity behavior of TracerProvider.

    The SDK registers an ``os.register_at_fork`` hook that gives every
    forked worker a fresh ``service.instance.id`` so workers do not
    collide on resource identity in the exporting backend.
    """

    def _collect_from_child(self, target):
        """Fork one child running ``target(conn)`` and return what it sends.

        Fails fast instead of hanging when the child dies before writing:
        the parent closes its copy of the child pipe end (so EOF becomes
        visible), polls with a timeout before ``recv()``, joins with a
        timeout, and asserts a clean exit code.
        """
        parent_conn, child_conn = _fork_ctx.Pipe()
        process = _fork_ctx.Process(target=target, args=(child_conn,))
        process.start()
        # Parent must drop its reference to the child end, otherwise a
        # crashed child never produces EOF and recv() could block forever.
        child_conn.close()
        try:
            self.assertTrue(
                parent_conn.poll(30), "child did not report within 30s"
            )
            value = parent_conn.recv()
        finally:
            process.join(timeout=30)
        self.assertEqual(process.exitcode, 0)
        return value

    def test_at_fork_reinit_changes_service_instance_id(self):
        """_at_fork_reinit should assign a new service.instance.id."""
        resource = Resource({"service.instance.id": "original-id"})
        provider = TracerProvider(resource=resource)

        original_id = provider._resource.attributes.get("service.instance.id")
        self.assertEqual(original_id, "original-id")

        provider._at_fork_reinit()

        new_id = provider._resource.attributes.get("service.instance.id")
        self.assertNotEqual(new_id, "original-id")
        self.assertIsNotNone(new_id)

    def test_at_fork_reinit_preserves_other_resource_attributes(self):
        """_at_fork_reinit should not affect other resource attributes."""
        resource = Resource(
            {
                "service.name": "my-service",
                "service.instance.id": "original-id",
                "deployment.environment": "production",
            }
        )
        provider = TracerProvider(resource=resource)

        provider._at_fork_reinit()

        attrs = provider._resource.attributes
        self.assertEqual(attrs.get("service.name"), "my-service")
        self.assertEqual(attrs.get("deployment.environment"), "production")

    def test_fork_produces_unique_service_instance_ids(self):
        """Each forked worker should get a distinct service.instance.id."""
        provider = TracerProvider()

        parent_id = provider._resource.attributes.get("service.instance.id")

        def child(conn):
            conn.send(
                provider._resource.attributes.get("service.instance.id")
            )
            conn.close()

        child_id = self._collect_from_child(child)

        self.assertNotEqual(parent_id, child_id)
        self.assertIsNotNone(child_id)

    def test_multiple_forks_produce_unique_service_instance_ids(self):
        """Each of N forked workers should have a distinct service.instance.id."""
        provider = TracerProvider()

        def child(conn):
            conn.send(
                provider._resource.attributes.get("service.instance.id")
            )
            conn.close()

        # Sequential forks: each child gets its own pipe and is fully
        # reaped before the next starts, so there is nothing to race.
        ids = {self._collect_from_child(child) for _ in range(4)}

        self.assertEqual(len(ids), 4)

0 commit comments

Comments
 (0)