Skip to content

Commit 3256bf0

Browse files
committed
fix(sdk): regenerate service.instance.id post-fork in MeterProvider and TracerProvider
When a prefork server (e.g. gunicorn) forks workers, all workers inherit the same Resource from the master process, including the same service.instance.id. Register an os.register_at_fork(after_in_child=...) hook on both MeterProvider and TracerProvider that replaces service.instance.id with a fresh UUID in each forked worker, ensuring distinct resource identities without any user configuration. Resource.merge() preserves all other resource attributes. WeakMethod is used for the hook reference, consistent with the existing pattern in PeriodicExportingMetricReader and BatchSpanProcessor. Fixes: #4390 Related: #3885
1 parent 4b7334a commit 3256bf0

File tree

5 files changed

+289
-0
lines changed

5 files changed

+289
-0
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1212

1313
## Unreleased
1414

15+
- `opentelemetry-sdk`: Add `service.instance.id` to default resource so every process gets a unique instance identity at startup
16+
([#5000](https://github.com/open-telemetry/opentelemetry-python/pull/5000))
17+
- `opentelemetry-sdk`: Regenerate `service.instance.id` post-fork in `MeterProvider` and `TracerProvider` to ensure distinct resource identities across prefork workers
18+
([#5000](https://github.com/open-telemetry/opentelemetry-python/pull/5000))
1519
- `opentelemetry-sdk`: Add file configuration support with YAML/JSON loading, environment variable substitution, and schema validation against the vendored OTel config JSON schema
1620
([#4898](https://github.com/open-telemetry/opentelemetry-python/pull/4898))
1721
- Fix intermittent CI failures in `getting-started` and `tracecontext` jobs caused by GitHub git CDN SHA propagation lag by installing contrib packages from the already-checked-out local copy instead of a second git clone

opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,15 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import os
1516
import weakref
1617
from atexit import register, unregister
1718
from logging import getLogger
1819
from os import environ
1920
from threading import Lock
2021
from time import time_ns
2122
from typing import Optional, Sequence
23+
from uuid import uuid4
2224

2325
# This kind of import is needed to avoid Sphinx errors.
2426
import opentelemetry.sdk.metrics
@@ -456,6 +458,12 @@ def __init__(
456458
self._shutdown_once = Once()
457459
self._shutdown = False
458460

461+
if hasattr(os, "register_at_fork"):
462+
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
463+
os.register_at_fork(
464+
after_in_child=lambda: weak_reinit()() # pylint: disable=unnecessary-lambda
465+
)
466+
459467
for metric_reader in self._sdk_config.metric_readers:
460468
with self._all_metric_readers_lock:
461469
if metric_reader in self._all_metric_readers:
@@ -471,6 +479,22 @@ def __init__(
471479
self._measurement_consumer.collect
472480
)
473481

482+
def _at_fork_reinit(self) -> None:
483+
"""Update the resource with a new unique service.instance.id after a fork.
484+
485+
When gunicorn (or any other prefork server) forks workers, all workers
486+
inherit the same Resource, including the same service.instance.id. This
487+
causes metric collisions in backends like Datadog where multiple workers
488+
exporting with the same resource identity result in last-write-wins
489+
instead of correct aggregation.
490+
491+
This hook runs post-fork in each worker and replaces service.instance.id
492+
with a fresh UUID, ensuring each worker is a distinct instance.
493+
"""
494+
self._sdk_config.resource = self._sdk_config.resource.merge(
495+
Resource({"service.instance.id": str(uuid4())})
496+
)
497+
474498
def force_flush(self, timeout_millis: float = 10_000) -> bool:
475499
deadline_ns = time_ns() + timeout_millis * 10**6
476500

opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
Type,
4343
Union,
4444
)
45+
from uuid import uuid4
4546
from warnings import filterwarnings
4647

4748
from typing_extensions import deprecated
@@ -1366,6 +1367,28 @@ def __init__(
13661367
_tracer_configurator or _default_tracer_configurator
13671368
)
13681369

1370+
if hasattr(os, "register_at_fork"):
1371+
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
1372+
os.register_at_fork(
1373+
after_in_child=lambda: weak_reinit()() # pylint: disable=unnecessary-lambda
1374+
)
1375+
1376+
def _at_fork_reinit(self) -> None:
1377+
"""Update the resource with a new unique service.instance.id after a fork.
1378+
1379+
When gunicorn (or any other prefork server) forks workers, all workers
1380+
inherit the same Resource, including the same service.instance.id. This
1381+
causes metric collisions in backends like Datadog where multiple workers
1382+
exporting with the same resource identity result in last-write-wins
1383+
instead of correct aggregation.
1384+
1385+
This hook runs post-fork in each worker and replaces service.instance.id
1386+
with a fresh UUID, ensuring each worker is a distinct instance.
1387+
"""
1388+
self._resource = self._resource.merge(
1389+
Resource({"service.instance.id": str(uuid4())})
1390+
)
1391+
13691392
def _set_tracer_configurator(
13701393
self, *, tracer_configurator: _TracerConfiguratorT
13711394
):
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
# pylint: disable=protected-access
16+
17+
import multiprocessing
18+
import os
19+
import unittest
20+
from platform import system
21+
22+
from opentelemetry.sdk.metrics import MeterProvider
23+
from opentelemetry.sdk.resources import Resource
24+
25+
_fork_ctx = (
26+
multiprocessing.get_context("fork") if system() != "Windows" else None
27+
)
28+
29+
30+
@unittest.skipUnless(
31+
hasattr(os, "fork"),
32+
"needs *nix",
33+
)
34+
class TestMeterProviderFork(unittest.TestCase):
35+
def test_at_fork_reinit_changes_service_instance_id(self):
36+
"""_at_fork_reinit should assign a new service.instance.id."""
37+
resource = Resource({"service.instance.id": "original-id"})
38+
provider = MeterProvider(resource=resource)
39+
40+
original_id = provider._sdk_config.resource.attributes.get(
41+
"service.instance.id"
42+
)
43+
self.assertEqual(original_id, "original-id")
44+
45+
provider._at_fork_reinit()
46+
47+
new_id = provider._sdk_config.resource.attributes.get(
48+
"service.instance.id"
49+
)
50+
self.assertNotEqual(new_id, "original-id")
51+
self.assertIsNotNone(new_id)
52+
53+
def test_at_fork_reinit_preserves_other_resource_attributes(self):
54+
"""_at_fork_reinit should not affect other resource attributes."""
55+
resource = Resource(
56+
{
57+
"service.name": "my-service",
58+
"service.instance.id": "original-id",
59+
"deployment.environment": "production",
60+
}
61+
)
62+
provider = MeterProvider(resource=resource)
63+
64+
provider._at_fork_reinit()
65+
66+
attrs = provider._sdk_config.resource.attributes
67+
self.assertEqual(attrs.get("service.name"), "my-service")
68+
self.assertEqual(attrs.get("deployment.environment"), "production")
69+
70+
def test_fork_produces_unique_service_instance_ids(self):
71+
"""Each forked worker should get a distinct service.instance.id."""
72+
provider = MeterProvider()
73+
74+
parent_id = provider._sdk_config.resource.attributes.get(
75+
"service.instance.id"
76+
)
77+
self.assertIsNotNone(parent_id)
78+
79+
def child(conn):
80+
child_id = provider._sdk_config.resource.attributes.get(
81+
"service.instance.id"
82+
)
83+
conn.send(child_id)
84+
conn.close()
85+
86+
parent_conn, child_conn = _fork_ctx.Pipe()
87+
process = _fork_ctx.Process(target=child, args=(child_conn,))
88+
process.start()
89+
child_id = parent_conn.recv()
90+
process.join()
91+
92+
# Child should have a different service.instance.id than parent
93+
self.assertNotEqual(parent_id, child_id)
94+
self.assertIsNotNone(child_id)
95+
96+
def test_multiple_forks_produce_unique_service_instance_ids(self):
97+
"""Each of N forked workers should have a distinct service.instance.id."""
98+
provider = MeterProvider()
99+
100+
def child(conn):
101+
child_id = provider._sdk_config.resource.attributes.get(
102+
"service.instance.id"
103+
)
104+
conn.send(child_id)
105+
conn.close()
106+
107+
ids = set()
108+
processes = []
109+
conns = []
110+
111+
for _ in range(4):
112+
parent_conn, child_conn = _fork_ctx.Pipe()
113+
process = _fork_ctx.Process(target=child, args=(child_conn,))
114+
processes.append(process)
115+
conns.append(parent_conn)
116+
process.start()
117+
118+
for conn in conns:
119+
ids.add(conn.recv())
120+
121+
for process in processes:
122+
process.join()
123+
124+
# All 4 workers should have distinct IDs
125+
self.assertEqual(len(ids), 4)
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
# pylint: disable=protected-access
16+
17+
import multiprocessing
18+
import os
19+
import unittest
20+
from platform import system
21+
22+
from opentelemetry.sdk.resources import Resource
23+
from opentelemetry.sdk.trace import TracerProvider
24+
25+
_fork_ctx = (
26+
multiprocessing.get_context("fork") if system() != "Windows" else None
27+
)
28+
29+
30+
@unittest.skipUnless(
31+
hasattr(os, "fork"),
32+
"needs *nix",
33+
)
34+
class TestTracerProviderFork(unittest.TestCase):
35+
def test_at_fork_reinit_changes_service_instance_id(self):
36+
"""_at_fork_reinit should assign a new service.instance.id."""
37+
resource = Resource({"service.instance.id": "original-id"})
38+
provider = TracerProvider(resource=resource)
39+
40+
original_id = provider._resource.attributes.get("service.instance.id")
41+
self.assertEqual(original_id, "original-id")
42+
43+
provider._at_fork_reinit()
44+
45+
new_id = provider._resource.attributes.get("service.instance.id")
46+
self.assertNotEqual(new_id, "original-id")
47+
self.assertIsNotNone(new_id)
48+
49+
def test_at_fork_reinit_preserves_other_resource_attributes(self):
50+
"""_at_fork_reinit should not affect other resource attributes."""
51+
resource = Resource(
52+
{
53+
"service.name": "my-service",
54+
"service.instance.id": "original-id",
55+
"deployment.environment": "production",
56+
}
57+
)
58+
provider = TracerProvider(resource=resource)
59+
60+
provider._at_fork_reinit()
61+
62+
attrs = provider._resource.attributes
63+
self.assertEqual(attrs.get("service.name"), "my-service")
64+
self.assertEqual(attrs.get("deployment.environment"), "production")
65+
66+
def test_fork_produces_unique_service_instance_ids(self):
67+
"""Each forked worker should get a distinct service.instance.id."""
68+
provider = TracerProvider()
69+
70+
parent_id = provider._resource.attributes.get("service.instance.id")
71+
self.assertIsNotNone(parent_id)
72+
73+
def child(conn):
74+
child_id = provider._resource.attributes.get("service.instance.id")
75+
conn.send(child_id)
76+
conn.close()
77+
78+
parent_conn, child_conn = _fork_ctx.Pipe()
79+
process = _fork_ctx.Process(target=child, args=(child_conn,))
80+
process.start()
81+
child_id = parent_conn.recv()
82+
process.join()
83+
84+
self.assertNotEqual(parent_id, child_id)
85+
self.assertIsNotNone(child_id)
86+
87+
def test_multiple_forks_produce_unique_service_instance_ids(self):
88+
"""Each of N forked workers should have a distinct service.instance.id."""
89+
provider = TracerProvider()
90+
91+
def child(conn):
92+
child_id = provider._resource.attributes.get("service.instance.id")
93+
conn.send(child_id)
94+
conn.close()
95+
96+
ids = set()
97+
processes = []
98+
conns = []
99+
100+
for _ in range(4):
101+
parent_conn, child_conn = _fork_ctx.Pipe()
102+
process = _fork_ctx.Process(target=child, args=(child_conn,))
103+
processes.append(process)
104+
conns.append(parent_conn)
105+
process.start()
106+
107+
for conn in conns:
108+
ids.add(conn.recv())
109+
110+
for process in processes:
111+
process.join()
112+
113+
self.assertEqual(len(ids), 4)

0 commit comments

Comments
 (0)