Skip to content

Commit 3e5c46f

Browse files
committed
A365 telemetry: align with SDK 0.1.0, remove use_tenant_island_endpoint
Signed-off-by: afourniernv <afournier@nvidia.com>
1 parent cbe3bee commit 3e5c46f

File tree

3 files changed

+128
-32
lines changed

3 files changed

+128
-32
lines changed

packages/nvidia_nat_a365/src/nat/plugins/a365/telemetry/a365_exporter.py

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ def __init__(
155155
resource_attributes: dict[str, str] | None = None,
156156
auth_provider=None,
157157
token_cache=None,
158+
auth_ref=None,
159+
builder=None,
158160
):
159161
"""Initialize the A365 exporter."""
160162
super().__init__(
@@ -173,8 +175,11 @@ def __init__(
173175
self._cluster_category = cluster_category
174176
self._use_s2s_endpoint = use_s2s_endpoint
175177
self._suppress_invoke_agent_input = suppress_invoke_agent_input
176-
self._auth_provider = auth_provider # For proactive token refresh
177-
self._token_cache = token_cache # For updating cached tokens
178+
self._auth_provider = auth_provider
179+
self._token_cache = token_cache
180+
self._auth_ref = auth_ref
181+
self._builder = builder
182+
self._auth_resolve_lock = asyncio.Lock()
178183

179184
# SDK requires token_resolver to be non-None, so if None is passed, SDK will raise ValueError
180185
self._a365_exporter = Agent365Exporter(
@@ -188,6 +193,52 @@ def __init__(
188193
f"tenant_id={tenant_id}, cluster={cluster_category}"
189194
)
190195

196+
async def _resolve_auth_once(self) -> None:
197+
"""Resolve auth provider and fill token cache on first export (lazy).
198+
199+
Telemetry is built in __aenter__ before auth exists; by first export,
200+
populate_builder has run so we can resolve here. Keeps core unchanged.
201+
"""
202+
if self._auth_provider is not None or self._auth_ref is None or self._builder is None:
203+
return
204+
if self._token_cache is None:
205+
return
206+
async with self._auth_resolve_lock:
207+
if self._auth_provider is not None:
208+
return
209+
try:
210+
auth_provider = await self._builder.get_auth_provider(self._auth_ref)
211+
from nat.builder.context import Context
212+
user_id = Context.get().user_id
213+
auth_result = await auth_provider.authenticate(user_id=user_id)
214+
if not auth_result.credentials:
215+
raise A365AuthenticationError("No credentials available from auth provider")
216+
from nat.data_models.authentication import BearerTokenCred, HeaderCred
217+
from nat.authentication.interfaces import AUTHORIZATION_HEADER
218+
token = None
219+
for cred in auth_result.credentials:
220+
if isinstance(cred, BearerTokenCred):
221+
token = cred.token.get_secret_value()
222+
break
223+
if isinstance(cred, HeaderCred) and cred.name == AUTHORIZATION_HEADER:
224+
hv = cred.value.get_secret_value()
225+
token = hv[7:] if hv.startswith("Bearer ") else hv
226+
break
227+
if token is None:
228+
raise A365AuthenticationError(
229+
f"No bearer token in credentials. "
230+
f"Types: {[type(c).__name__ for c in auth_result.credentials]}"
231+
)
232+
self._token_cache.update_token(token, auth_result.token_expires_at)
233+
self._auth_provider = auth_provider
234+
except Exception as e:
235+
logger.error(
236+
f"Failed to resolve auth on first export (agent_id={self._agent_id}, "
237+
f"tenant_id={self._tenant_id}): {e}",
238+
exc_info=True,
239+
)
240+
raise
241+
191242
async def _refresh_token_if_needed(self) -> None:
192243
"""Refresh token proactively if it's expiring soon.
193244
@@ -261,6 +312,7 @@ async def export_otel_spans(self, spans: list[OtelSpan]) -> None:
261312
if not spans:
262313
return
263314

315+
await self._resolve_auth_once()
264316
await self._refresh_token_if_needed()
265317

266318
try:

packages/nvidia_nat_a365/src/nat/plugins/a365/telemetry/register.py

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -76,21 +76,32 @@ def __init__(self, token: str, expires_at: datetime | None):
7676
self._token = token
7777
self._expires_at = expires_at
7878

79+
def _expires_at_utc(self) -> datetime | None:
80+
"""Return expiration as timezone-aware UTC for comparison.
81+
82+
If naive, assume local time (e.g. from provider using datetime.now() + delta).
83+
"""
84+
if self._expires_at is None:
85+
return None
86+
if self._expires_at.tzinfo is not None:
87+
return self._expires_at
88+
local_tz = datetime.now().astimezone().tzinfo
89+
return self._expires_at.replace(tzinfo=local_tz).astimezone(timezone.utc)
90+
7991
def get_token(self) -> str | None:
8092
"""Get cached token if still valid (with 5 minute buffer).
8193
8294
Returns:
8395
Token string if valid, None if expired or expiring soon
8496
"""
8597
with self._lock:
86-
if self._expires_at:
98+
expires_utc = self._expires_at_utc()
99+
if expires_utc is not None:
87100
buffer_time = datetime.now(timezone.utc) + timedelta(minutes=5)
88-
if self._expires_at > buffer_time:
101+
if expires_utc > buffer_time:
89102
return self._token
90103
return None
91-
else:
92-
# No expiration info, assume token is valid
93-
return self._token
104+
return self._token
94105

95106
def update_token(self, token: str, expires_at: datetime | None) -> None:
96107
"""Update cached token.
@@ -113,10 +124,11 @@ def is_expiring_soon(self, buffer_minutes: int = 5) -> bool:
113124
True if token expires within buffer time, False otherwise
114125
"""
115126
with self._lock:
116-
if self._expires_at is None:
127+
expires_utc = self._expires_at_utc()
128+
if expires_utc is None:
117129
return False
118130
buffer_time = datetime.now(timezone.utc) + timedelta(minutes=buffer_minutes)
119-
return self._expires_at <= buffer_time
131+
return expires_utc <= buffer_time
120132

121133

122134
async def _create_token_resolver_from_auth_ref(
@@ -195,25 +207,35 @@ async def a365_telemetry_exporter(config: A365TelemetryExporter, builder: Builde
195207
196208
Integrates A365's Agent365Exporter with NAT's telemetry system to send
197209
OpenTelemetry spans to Microsoft Agent 365 backend endpoints.
210+
211+
Auth is resolved lazily on first export (not at build time) because
212+
telemetry exporters are built in WorkflowBuilder.__aenter__ before
213+
auth providers exist. This keeps core unchanged; the plugin handles
214+
the timing constraint locally.
198215
"""
199216
from nat.plugins.a365.telemetry.a365_exporter import A365OtelExporter
200217

201-
token_resolver_callable, auth_provider, token_cache = await _create_token_resolver_from_auth_ref(
202-
config.token_resolver, builder
203-
)
218+
# Defer auth: do not call get_auth_provider here (not available yet in __aenter__).
219+
token_cache = _TokenCache(None, None)
220+
221+
def token_resolver(agent_id: str, tenant_id: str) -> str | None:
222+
"""Sync callable for SDK; returns cached token (filled on first export)."""
223+
return token_cache.get_token()
204224

205225
logger.info(
206226
f"A365 telemetry exporter initialized for agent_id={config.agent_id}, "
207227
f"tenant_id={config.tenant_id}, cluster={config.cluster_category}, "
208-
f"token_resolver=configured (auth_provider='{config.token_resolver}')"
228+
f"token_resolver=deferred (auth resolved on first export)"
209229
)
210230

211231
exporter = A365OtelExporter(
212232
agent_id=config.agent_id,
213233
tenant_id=config.tenant_id,
214-
token_resolver=token_resolver_callable,
215-
auth_provider=auth_provider, # Pass auth provider for proactive refresh
216-
token_cache=token_cache, # Pass token cache for updating tokens
234+
token_resolver=token_resolver,
235+
auth_provider=None,
236+
token_cache=token_cache,
237+
auth_ref=config.token_resolver,
238+
builder=builder,
217239
cluster_category=config.cluster_category,
218240
use_s2s_endpoint=config.use_s2s_endpoint,
219241
suppress_invoke_agent_input=config.suppress_invoke_agent_input,

packages/nvidia_nat_a365/tests/telemetry/test_registration_integration.py

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -76,16 +76,32 @@ async def test_registration_creates_exporter_successfully(
7676

7777
with patch("nat.plugins.a365.telemetry.a365_exporter.Agent365Exporter"):
7878
async with a365_telemetry_exporter(config, mock_builder) as exporter:
79-
# Verify exporter was created
79+
# Lazy init: auth not resolved at build time (exporter built in __aenter__ before auth exists)
8080
assert exporter is not None
8181
assert exporter._agent_id == "test-agent-123"
8282
assert exporter._tenant_id == "test-tenant-456"
8383
assert exporter._token_resolver is not None
84-
assert exporter._auth_provider == mock_auth_provider
84+
assert exporter._auth_provider is None
85+
assert exporter._auth_ref == config.token_resolver
86+
assert exporter._builder is mock_builder
8587
assert exporter._token_cache is not None
86-
87-
# Verify auth provider was resolved
88+
mock_builder.get_auth_provider.assert_not_called()
89+
# Resolve on first export (minimal span-like object for conversion)
90+
mock_span = Mock()
91+
mock_span.attributes = {}
92+
mock_span.events = []
93+
mock_span.links = []
94+
mock_span.name = "test"
95+
mock_span.kind = 1
96+
mock_span.start_time = 0
97+
mock_span.end_time = 0
98+
mock_span.status = None
99+
mock_span.instrumentation_scope = None
100+
mock_span.resource = None
101+
mock_span.parent = None
102+
await exporter.export_otel_spans([mock_span])
88103
mock_builder.get_auth_provider.assert_called_once_with(config.token_resolver)
104+
assert exporter._auth_provider is mock_auth_provider
89105

90106
@pytest.mark.asyncio
91107
async def test_registration_passes_all_config_to_exporter(
@@ -124,42 +140,48 @@ async def test_registration_passes_all_config_to_exporter(
124140
async def test_registration_handles_auth_provider_resolution_failure(
125141
self, config, mock_builder
126142
):
127-
"""Test that registration handles auth provider resolution failure."""
143+
"""Test that auth provider resolution failure is raised on first export (lazy)."""
128144
mock_builder.get_auth_provider.side_effect = ValueError("Auth provider not found")
129145

130-
with pytest.raises(ValueError, match="Auth provider not found"):
131-
async with a365_telemetry_exporter(config, mock_builder):
132-
pass
146+
with patch("nat.plugins.a365.telemetry.a365_exporter.Agent365Exporter"):
147+
async with a365_telemetry_exporter(config, mock_builder) as exporter:
148+
mock_span = Mock()
149+
with pytest.raises(ValueError, match="Auth provider not found"):
150+
await exporter.export_otel_spans([mock_span])
133151

134152
@pytest.mark.asyncio
135153
async def test_registration_handles_authentication_failure(
136154
self, config, mock_builder, mock_auth_provider
137155
):
138-
"""Test that registration handles authentication failure."""
156+
"""Test that authentication failure is raised on first export (lazy)."""
139157
mock_auth_provider.authenticate.side_effect = A365AuthenticationError("Auth failed")
140158

141159
with patch("nat.builder.context.Context") as mock_context_class:
142160
mock_context_class.get.return_value.user_id = "test_user"
143161

144-
with pytest.raises(A365AuthenticationError, match="Auth failed"):
145-
async with a365_telemetry_exporter(config, mock_builder):
146-
pass
162+
with patch("nat.plugins.a365.telemetry.a365_exporter.Agent365Exporter"):
163+
async with a365_telemetry_exporter(config, mock_builder) as exporter:
164+
mock_span = Mock()
165+
with pytest.raises(A365AuthenticationError, match="Auth failed"):
166+
await exporter.export_otel_spans([mock_span])
147167

148168
@pytest.mark.asyncio
149169
async def test_registration_handles_no_credentials(
150170
self, config, mock_builder, mock_auth_provider
151171
):
152-
"""Test that registration handles case when auth provider returns no credentials."""
172+
"""Test that no credentials is raised on first export (lazy)."""
153173
auth_result = Mock(spec=AuthResult)
154174
auth_result.credentials = []
155175
mock_auth_provider.authenticate.return_value = auth_result
156176

157177
with patch("nat.builder.context.Context") as mock_context_class:
158178
mock_context_class.get.return_value.user_id = "test_user"
159179

160-
with pytest.raises(A365AuthenticationError, match="No credentials available"):
161-
async with a365_telemetry_exporter(config, mock_builder):
162-
pass
180+
with patch("nat.plugins.a365.telemetry.a365_exporter.Agent365Exporter"):
181+
async with a365_telemetry_exporter(config, mock_builder) as exporter:
182+
mock_span = Mock()
183+
with pytest.raises(A365AuthenticationError, match="No credentials available"):
184+
await exporter.export_otel_spans([mock_span])
163185

164186
@pytest.mark.asyncio
165187
async def test_registration_is_context_manager(self, config, mock_builder, mock_auth_provider):

0 commit comments

Comments
 (0)