Skip to content

Commit 353352a

Browse files
committed
fix: fix exporter register
1 parent f05bd89 commit 353352a

6 files changed

Lines changed: 111 additions & 62 deletions

File tree

tests/test_tracing.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,5 +112,6 @@ async def test_tracing_with_apmplus_global_provider():
112112
# init OpentelemetryTracer
113113
tracer = OpentelemetryTracer(exporters=exporters)
114114

115-
# apmplus exporter won't init again, so there are cozeloop, tls, in_memory exporter
116-
assert len(tracer.exporters) == 3 # with extra 1 built-in exporters
115+
# apmplus exporter won't init again, so there are cozeloop, tls, and in_memory exporter
116+
# Note: APMPlusExporter still exists in the list but isn't registered with the tracer provider
117+
assert len(tracer.exporters) == 4 # with extra 1 built-in exporters

veadk/agent.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -287,17 +287,23 @@ def _prepare_tracers(self):
287287
if enable_apmplus_tracer and not any(
288288
isinstance(e, APMPlusExporter) for e in exporters
289289
):
290-
self.tracers[0].exporters.append(APMPlusExporter()) # type: ignore
290+
exporter = APMPlusExporter()
291+
self.tracers[0].exporters.append(exporter) # type: ignore
292+
exporter.register()
291293
logger.info("Enable APMPlus exporter by env.")
292294

293295
if enable_cozeloop_tracer and not any(
294296
isinstance(e, CozeloopExporter) for e in exporters
295297
):
296-
self.tracers[0].exporters.append(CozeloopExporter()) # type: ignore
298+
exporter = CozeloopExporter()
299+
self.tracers[0].exporters.append(exporter) # type: ignore
300+
exporter.register()
297301
logger.info("Enable CozeLoop exporter by env.")
298302

299303
if enable_tls_tracer and not any(isinstance(e, TLSExporter) for e in exporters):
300-
self.tracers[0].exporters.append(TLSExporter()) # type: ignore
304+
exporter = TLSExporter()
305+
self.tracers[0].exporters.append(exporter) # type: ignore
306+
exporter.register()
301307
logger.info("Enable TLS exporter by env.")
302308

303309
logger.debug(

veadk/tracing/telemetry/exporters/apmplus_exporter.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,39 @@ def model_post_init(self, context: Any) -> None:
535535
resource_attributes=self.resource_attributes,
536536
)
537537

538+
@override
539+
def register(self) -> None:
540+
"""Register the APMPlus exporter with the global tracer provider.
541+
542+
This method ensures compatibility with automatic instrumentation by
543+
checking if there are any existing APMPlus span processors before
544+
registering the exporter. If there are existing ones, it skips
545+
registration to avoid conflicts.
546+
"""
547+
# Check for existing APMPlus span processors to ensure compatibility with automatic instrumentation
548+
from opentelemetry import trace
549+
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SimpleSpanProcessor
550+
551+
# Get the global tracer provider
552+
tracer_provider = trace.get_tracer_provider()
553+
554+
# Check if there are any existing span processors that are APMPlus related
555+
span_processors = tracer_provider._active_span_processor._span_processors
556+
557+
# TODO: Compatible with one-agent collector
558+
has_existing_apmplus = any(
559+
isinstance(p, (BatchSpanProcessor, SimpleSpanProcessor))
560+
and hasattr(p.span_exporter, "_endpoint")
561+
and "apmplus" in p.span_exporter._endpoint
562+
for p in span_processors
563+
)
564+
565+
if not has_existing_apmplus:
566+
# Call the base class register method to handle resource attributes and processor registration
567+
super().register()
568+
else:
569+
logger.info("APMPlusExporter: Skipping registration as APMPlus processor already exists.")
570+
538571
@override
539572
def export(self) -> None:
540573
"""Force immediate export of pending telemetry data to APMPlus.

veadk/tracing/telemetry/exporters/base_exporter.py

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,26 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from opentelemetry.sdk.trace import SpanProcessor
15+
from opentelemetry import trace
16+
from opentelemetry.sdk.resources import Resource
17+
from opentelemetry.sdk.trace import SpanProcessor, TracerProvider
1618
from opentelemetry.sdk.trace.export import SpanExporter
1719
from pydantic import BaseModel, ConfigDict, Field
1820

21+
def _update_resource_attributions(
22+
provider: TracerProvider, resource_attributes: dict
23+
) -> None:
24+
"""Update the resource attributes of a TracerProvider instance.
25+
26+
This function merges new resource attributes with the existing ones in the
27+
provider, allowing dynamic configuration of telemetry metadata.
28+
29+
Args:
30+
provider: The TracerProvider instance to update
31+
resource_attributes: Dictionary of attributes to merge with existing resources
32+
"""
33+
provider._resource = provider._resource.merge(Resource.create(resource_attributes))
34+
1935

2036
class BaseExporter(BaseModel):
2137
"""Abstract base class for OpenTelemetry span exporters in VeADK tracing system.
@@ -33,6 +49,28 @@ class BaseExporter(BaseModel):
3349

3450
_exporter: SpanExporter | None = None
3551
processor: SpanProcessor | None = None
52+
_registered: bool = False
53+
54+
def register(self) -> None:
55+
"""Register the exporter with the global tracer provider.
56+
57+
This method will automatically get the global tracer provider
58+
and register the exporter's span processor with it.
59+
The registration Dprocess will only be executed once.
60+
"""
61+
if self._registered:
62+
return
63+
64+
tracer_provider = trace.get_tracer_provider()
65+
# Update resource attributes if any
66+
if self.resource_attributes:
67+
_update_resource_attributions(tracer_provider, self.resource_attributes)
68+
69+
# Add processor to tracer provider if exists
70+
if self.processor:
71+
tracer_provider.add_span_processor(self.processor)
72+
73+
self._registered = True
3674

3775
def export(self) -> None:
3876
"""Force export of telemetry data."""

veadk/tracing/telemetry/exporters/inmemory_exporter.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
from typing import Sequence
1616

17+
from opentelemetry import trace
1718
from opentelemetry.context import (
1819
_SUPPRESS_INSTRUMENTATION_KEY,
1920
attach,
@@ -246,3 +247,18 @@ def __init__(self, name: str = "inmemory_exporter") -> None:
246247

247248
self._exporter = _InMemoryExporter()
248249
self.processor = _InMemorySpanProcessor(self._exporter)
250+
251+
@override
252+
def register(self) -> None:
253+
"""Register the in-memory exporter with the global tracer provider.
254+
255+
Ensures the in-memory exporter's processor is added at the beginning
256+
of the span processors list to record all spans.
257+
"""
258+
tracer_provider = trace.get_tracer_provider()
259+
260+
# Ensure the in-memory exporter processor is added at index 0
261+
# because we use this to record all spans
262+
tracer_provider._active_span_processor._span_processors = (
263+
self.processor,
264+
) + tracer_provider._active_span_processor._span_processors

veadk/tracing/telemetry/opentelemetry_tracer.py

Lines changed: 11 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
from typing_extensions import override
2828

2929
from veadk.tracing.base_tracer import BaseTracer
30-
from veadk.tracing.telemetry.exporters.apmplus_exporter import APMPlusExporter
3130
from veadk.tracing.telemetry.exporters.base_exporter import BaseExporter
3231
from veadk.tracing.telemetry.exporters.inmemory_exporter import InMemoryExporter
3332
from veadk.utils.logger import get_logger
@@ -37,21 +36,6 @@
3736
logger = get_logger(__name__)
3837

3938

40-
def _update_resource_attributions(
41-
provider: TracerProvider, resource_attributes: dict
42-
) -> None:
43-
"""Update the resource attributes of a TracerProvider instance.
44-
45-
This function merges new resource attributes with the existing ones in the
46-
provider, allowing dynamic configuration of telemetry metadata.
47-
48-
Args:
49-
provider: The TracerProvider instance to update
50-
resource_attributes: Dictionary of attributes to merge with existing resources
51-
"""
52-
provider._resource = provider._resource.merge(Resource.create(resource_attributes))
53-
54-
5539
class OpentelemetryTracer(BaseModel, BaseTracer):
5640
"""OpenTelemetry-based tracer implementation for comprehensive agent observability.
5741
@@ -152,41 +136,27 @@ def _init_global_tracer_provider(self) -> None:
152136
duplicate exporter detection and in-memory span collection setup.
153137
"""
154138
# set provider anyway, then get global provider
139+
# set if not exist
155140
trace_api.set_tracer_provider(
156141
trace_sdk.TracerProvider(
157142
span_limits=SpanLimits(
158143
max_attributes=4096,
159144
)
160145
)
161146
)
162-
global_tracer_provider: TracerProvider = trace_api.get_tracer_provider() # type: ignore
163-
164-
span_processors = global_tracer_provider._active_span_processor._span_processors
165-
have_apmplus_exporter = any(
166-
isinstance(p, (BatchSpanProcessor, SimpleSpanProcessor))
167-
and hasattr(p.span_exporter, "_endpoint")
168-
and "apmplus" in p.span_exporter._endpoint
169-
for p in span_processors
170-
)
171147

172-
if have_apmplus_exporter:
173-
self.exporters = [
174-
e for e in self.exporters if not isinstance(e, APMPlusExporter)
175-
]
148+
# add in-memory exporter to exporters list
149+
self._inmemory_exporter = InMemoryExporter()
150+
self.exporters.append(self._inmemory_exporter)
176151

152+
# Call each exporter's register method to register with the global tracer provider
153+
# This allows each exporter to handle its own configuration and registration logic
177154
for exporter in self.exporters:
178-
processor = exporter.processor
179-
resource_attributes = exporter.resource_attributes
180-
181-
if resource_attributes:
182-
_update_resource_attributions(
183-
global_tracer_provider, resource_attributes
184-
)
185-
186-
if processor:
187-
global_tracer_provider.add_span_processor(processor)
188-
self._processors.append(processor)
189-
155+
exporter.register()
156+
157+
# Add processor to internal list if exists
158+
if exporter.processor:
159+
self._processors.append(exporter.processor)
190160
logger.debug(
191161
f"Add span processor for exporter `{exporter.__class__.__name__}` to OpentelemetryTracer."
192162
)
@@ -195,21 +165,6 @@ def _init_global_tracer_provider(self) -> None:
195165
f"Add span processor for exporter `{exporter.__class__.__name__}` to OpentelemetryTracer failed."
196166
)
197167

198-
self._inmemory_exporter = InMemoryExporter()
199-
if self._inmemory_exporter.processor:
200-
# make sure the in memory exporter processor is added at index 0
201-
# because we use this to record all spans
202-
global_tracer_provider._active_span_processor._span_processors = (
203-
self._inmemory_exporter.processor,
204-
) + global_tracer_provider._active_span_processor._span_processors
205-
206-
self._processors.append(self._inmemory_exporter.processor)
207-
self.exporters.append(self._inmemory_exporter)
208-
else:
209-
logger.warning(
210-
"InMemoryExporter processor is not initialized, cannot add to OpentelemetryTracer."
211-
)
212-
213168
logger.info(
214169
f"Init OpentelemetryTracer with {len(self._processors)} exporter(s)."
215170
)

0 commit comments

Comments
 (0)