Skip to content

Commit e7f925b

Browse files
fix: intermittent app insights event loss in serverless env (#1371)
1 parent f47405a commit e7f925b

File tree

4 files changed

+161
-20
lines changed

4 files changed

+161
-20
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "uipath"
3-
version = "2.9.10"
3+
version = "2.9.11"
44
description = "Python SDK and CLI for UiPath Platform, enabling programmatic interaction with automation services, process management, and deployment tools."
55
readme = { file = "README.md", content-type = "text/markdown" }
66
requires-python = ">=3.11"

src/uipath/telemetry/_track.py

Lines changed: 108 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,29 +41,47 @@
4141
from applicationinsights import ( # type: ignore[import-untyped]
4242
TelemetryClient as AppInsightsTelemetryClient,
4343
)
44+
from applicationinsights.channel import ( # type: ignore[import-untyped]
45+
SynchronousQueue,
46+
SynchronousSender,
47+
TelemetryChannel,
48+
)
4449

4550
_HAS_APPINSIGHTS = True
4651
except ImportError:
4752
_HAS_APPINSIGHTS = False
4853
AppInsightsTelemetryClient = None
54+
SynchronousSender = None
55+
SynchronousQueue = None
56+
TelemetryChannel = None
4957

5058

51-
def _parse_connection_string(connection_string: str) -> Optional[str]:
52-
"""Parse Azure Application Insights connection string to get instrumentation key.
59+
def _parse_connection_string(
60+
connection_string: str,
61+
) -> Optional[Dict[str, str]]:
62+
"""Parse Azure Application Insights connection string.
5363
5464
Args:
5565
connection_string: The full connection string from Azure.
5666
5767
Returns:
58-
The instrumentation key if found, None otherwise.
68+
Dict with 'InstrumentationKey' and optionally 'IngestionEndpoint',
69+
or None if InstrumentationKey is not found.
5970
"""
6071
try:
61-
parts = {}
72+
parts: Dict[str, str] = {}
6273
for part in connection_string.split(";"):
6374
if "=" in part:
6475
key, value = part.split("=", 1)
6576
parts[key] = value
66-
return parts.get("InstrumentationKey")
77+
ikey = parts.get("InstrumentationKey")
78+
if not ikey:
79+
return None
80+
result: Dict[str, str] = {"InstrumentationKey": ikey}
81+
ingestion = parts.get("IngestionEndpoint")
82+
if ingestion:
83+
result["IngestionEndpoint"] = ingestion
84+
return result
6785
except Exception:
6886
return None
6987

@@ -136,6 +154,59 @@ def _get_attributes(record: LogRecord) -> Mapping[str, AnyValue]:
136154
return attributes
137155

138156

157+
# When the optional ``applicationinsights`` package is missing, the module's
# ImportError fallback sets ``SynchronousSender`` to None. Subclassing None
# raises TypeError at import time, so fall back to ``object`` to keep this
# module importable without the package.
_DiagnosticSenderBase: Any = (
    SynchronousSender if SynchronousSender is not None else object
)


class _DiagnosticSender(_DiagnosticSenderBase):
    """SynchronousSender that logs HTTP failures the base SDK silently discards."""

    def send(self, data_to_send: Any) -> None:
        """Send telemetry data with diagnostic logging.

        The base SDK silently discards HTTP 400 responses and swallows all
        other network errors. This override adds WARNING-level logs so
        silent data loss becomes visible in logs.

        Args:
            data_to_send: The telemetry items to post; each item must
                provide a ``write()`` method returning a JSON-serializable
                object.

        Outcome per response:
            * 2xx: items are considered delivered.
            * HTTP 400: items are discarded (payload rejected) and a
              warning is logged.
            * Any other HTTP status or exception: a warning is logged and
              the items are put back on the queue.
        """
        import json as _json
        import urllib.request as _request
        from urllib.error import HTTPError

        request_payload = _json.dumps([item.write() for item in data_to_send])
        request = _request.Request(
            self._service_endpoint_uri,
            bytearray(request_payload, "utf-8"),
            {
                "Accept": "application/json",
                "Content-Type": "application/json; charset=utf-8",
            },
        )
        try:
            response = _request.urlopen(request, timeout=self._timeout)
            try:
                status_code = response.getcode()
            finally:
                # Release the underlying connection; it was previously leaked.
                response.close()
            if 200 <= status_code < 300:
                return
            # Non-2xx that did not raise: still a failed delivery, so make it
            # visible before re-queuing.
            _logger.warning(
                "AppInsights send: HTTP %d (%d items re-queued)",
                status_code,
                len(data_to_send),
            )
        except HTTPError as e:
            if e.getcode() == 400:
                _logger.warning(
                    "AppInsights send: HTTP 400 — payload rejected (%d items discarded)",
                    len(data_to_send),
                )
                return
            _logger.warning(
                "AppInsights send: HTTP %d (%d items re-queued)",
                e.getcode(),
                len(data_to_send),
            )
        except Exception as e:
            _logger.warning("AppInsights send: %s (%s)", type(e).__name__, e)

        # Re-queue unsent data
        for data in data_to_send:
            self._queue.put(data)
208+
209+
139210
class _AppInsightsEventClient:
140211
"""Application Insights SDK client for sending custom events.
141212
@@ -168,12 +239,25 @@ def _initialize() -> None:
168239
return
169240

170241
try:
171-
instrumentation_key = _parse_connection_string(connection_string)
172-
if not instrumentation_key:
242+
parsed = _parse_connection_string(connection_string)
243+
if not parsed:
173244
return
174245

246+
instrumentation_key = parsed["InstrumentationKey"]
247+
ingestion_endpoint = parsed.get("IngestionEndpoint")
248+
249+
# Build custom channel: DiagnosticSender → SynchronousQueue → TelemetryChannel
250+
if ingestion_endpoint:
251+
endpoint_url = ingestion_endpoint.rstrip("/") + "/v2/track"
252+
else:
253+
endpoint_url = None # SDK default
254+
255+
sender = _DiagnosticSender(service_endpoint_uri=endpoint_url)
256+
queue = SynchronousQueue(sender)
257+
channel = TelemetryChannel(queue=queue)
258+
175259
_AppInsightsEventClient._client = AppInsightsTelemetryClient(
176-
instrumentation_key
260+
instrumentation_key, telemetry_channel=channel
177261
)
178262

179263
# Set application version
@@ -222,6 +306,18 @@ def flush() -> None:
222306
if _AppInsightsEventClient._client:
223307
try:
224308
_AppInsightsEventClient._client.flush()
309+
# Check if items remain after flush (indicates send failure)
310+
try:
311+
remaining = (
312+
_AppInsightsEventClient._client.channel.queue._queue.qsize()
313+
)
314+
if remaining > 0:
315+
_logger.warning(
316+
"AppInsights flush: %d items still in queue after flush",
317+
remaining,
318+
)
319+
except Exception:
320+
pass
225321
except Exception as e:
226322
# Log but don't raise - telemetry should never break the main application
227323
_logger.warning(f"Failed to flush telemetry events: {e}")
@@ -308,6 +404,10 @@ def track_event(
308404

309405
try:
310406
_AppInsightsEventClient.track_event(name, properties)
407+
# Safety net: register atexit flush so events are sent even if
408+
# the caller never explicitly flushes (e.g. serverless containers).
409+
# Idempotent — only registers once.
410+
_AppInsightsEventClient.register_atexit_flush()
311411
except Exception as e:
312412
# Log but don't raise - telemetry should never break the main application
313413
_logger.warning(f"Failed to track event '{name}': {e}")

tests/telemetry/test_track.py

Lines changed: 51 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,18 @@ def test_parse_valid_connection_string(self):
2727

2828
result = _parse_connection_string(connection_string)
2929

30-
assert result == "test-key-123"
30+
assert result == {
31+
"InstrumentationKey": "test-key-123",
32+
"IngestionEndpoint": "https://example.com/",
33+
}
3134

3235
def test_parse_connection_string_only_instrumentation_key(self):
3336
"""Test parsing connection string with only InstrumentationKey."""
3437
connection_string = "InstrumentationKey=simple-key"
3538

3639
result = _parse_connection_string(connection_string)
3740

38-
assert result == "simple-key"
41+
assert result == {"InstrumentationKey": "simple-key"}
3942

4043
def test_parse_connection_string_missing_instrumentation_key(self):
4144
"""Test parsing connection string without InstrumentationKey."""
@@ -68,7 +71,7 @@ def test_parse_connection_string_with_special_chars_in_value(self):
6871

6972
result = _parse_connection_string(connection_string)
7073

71-
assert result == "key=with=equals"
74+
assert result == {"InstrumentationKey": "key=with=equals"}
7275

7376

7477
class TestAppInsightsEventClient:
@@ -96,14 +99,17 @@ def test_initialize_no_connection_string(self):
9699
assert _AppInsightsEventClient._initialized is True
97100
assert _AppInsightsEventClient._client is None
98101

102+
@patch("uipath.telemetry._track.TelemetryChannel")
103+
@patch("uipath.telemetry._track.SynchronousQueue")
104+
@patch("uipath.telemetry._track._DiagnosticSender")
99105
@patch("uipath.telemetry._track._HAS_APPINSIGHTS", True)
100106
@patch("uipath.telemetry._track.AppInsightsTelemetryClient")
101107
@patch(
102108
"uipath.telemetry._track._CONNECTION_STRING",
103109
"InstrumentationKey=builtin-key;IngestionEndpoint=https://example.com/",
104110
)
105111
def test_initialize_falls_back_to_builtin_connection_string(
106-
self, mock_client_class
112+
self, mock_client_class, mock_sender_class, mock_queue_class, mock_channel_class
107113
):
108114
"""Test initialization uses _CONNECTION_STRING when env var is not set."""
109115
mock_client = MagicMock()
@@ -116,7 +122,12 @@ def test_initialize_falls_back_to_builtin_connection_string(
116122

117123
assert _AppInsightsEventClient._initialized is True
118124
assert _AppInsightsEventClient._client is mock_client
119-
mock_client_class.assert_called_once_with("builtin-key")
125+
mock_sender_class.assert_called_once_with(
126+
service_endpoint_uri="https://example.com/v2/track"
127+
)
128+
mock_client_class.assert_called_once_with(
129+
"builtin-key", telemetry_channel=mock_channel_class.return_value
130+
)
120131

121132
@patch("uipath.telemetry._track._HAS_APPINSIGHTS", False)
122133
def test_initialize_no_appinsights_package(self):
@@ -126,9 +137,14 @@ def test_initialize_no_appinsights_package(self):
126137
assert _AppInsightsEventClient._initialized is True
127138
assert _AppInsightsEventClient._client is None
128139

140+
@patch("uipath.telemetry._track.TelemetryChannel")
141+
@patch("uipath.telemetry._track.SynchronousQueue")
142+
@patch("uipath.telemetry._track._DiagnosticSender")
129143
@patch("uipath.telemetry._track._HAS_APPINSIGHTS", True)
130144
@patch("uipath.telemetry._track.AppInsightsTelemetryClient")
131-
def test_initialize_creates_client(self, mock_client_class):
145+
def test_initialize_creates_client(
146+
self, mock_client_class, mock_sender_class, mock_queue_class, mock_channel_class
147+
):
132148
"""Test that initialization creates Application Insights client."""
133149
mock_client = MagicMock()
134150
mock_client_class.return_value = mock_client
@@ -145,11 +161,19 @@ def test_initialize_creates_client(self, mock_client_class):
145161

146162
assert _AppInsightsEventClient._initialized is True
147163
assert _AppInsightsEventClient._client is mock_client
148-
mock_client_class.assert_called_once_with("test-key")
164+
mock_sender_class.assert_called_once_with(
165+
service_endpoint_uri="https://example.com/v2/track"
166+
)
167+
mock_client_class.assert_called_once_with(
168+
"test-key", telemetry_channel=mock_channel_class.return_value
169+
)
149170

171+
@patch("uipath.telemetry._track._DiagnosticSender")
150172
@patch("uipath.telemetry._track._HAS_APPINSIGHTS", True)
151173
@patch("uipath.telemetry._track.AppInsightsTelemetryClient")
152-
def test_initialize_invalid_connection_string(self, mock_client_class):
174+
def test_initialize_invalid_connection_string(
175+
self, mock_client_class, mock_sender_class
176+
):
153177
"""Test initialization with invalid connection string."""
154178
with patch.dict(
155179
os.environ,
@@ -276,16 +300,28 @@ def test_track_event_disabled(self, mock_is_enabled):
276300

277301
mock_track.assert_not_called()
278302

303+
@patch.object(_AppInsightsEventClient, "register_atexit_flush")
279304
@patch.object(_TelemetryClient, "_is_enabled", return_value=True)
280305
@patch.object(_AppInsightsEventClient, "track_event")
281-
def test_track_event_enabled(self, mock_track, mock_is_enabled):
306+
def test_track_event_enabled(self, mock_track, mock_is_enabled, mock_atexit):
282307
"""Test that track_event calls AppInsightsEventClient when enabled."""
283308
properties = {"key": "value"}
284309

285310
_TelemetryClient.track_event("test_event", properties)
286311

287312
mock_track.assert_called_once_with("test_event", properties)
288313

314+
@patch.object(_AppInsightsEventClient, "register_atexit_flush")
315+
@patch.object(_TelemetryClient, "_is_enabled", return_value=True)
316+
@patch.object(_AppInsightsEventClient, "track_event")
317+
def test_track_event_registers_atexit_handler(
318+
self, mock_track, mock_is_enabled, mock_atexit
319+
):
320+
"""Test that track_event registers atexit flush handler."""
321+
_TelemetryClient.track_event("test_event", {"key": "value"})
322+
323+
mock_atexit.assert_called_once()
324+
289325

290326
class TestPublicFunctions:
291327
"""Test public telemetry functions."""
@@ -488,9 +524,14 @@ def test_flush_handles_exception(self):
488524
# Should not raise exception
489525
_AppInsightsEventClient.flush()
490526

527+
@patch("uipath.telemetry._track.TelemetryChannel")
528+
@patch("uipath.telemetry._track.SynchronousQueue")
529+
@patch("uipath.telemetry._track._DiagnosticSender")
491530
@patch("uipath.telemetry._track._HAS_APPINSIGHTS", True)
492531
@patch("uipath.telemetry._track.AppInsightsTelemetryClient")
493-
def test_initialize_handles_exception(self, mock_client_class):
532+
def test_initialize_handles_exception(
533+
self, mock_client_class, mock_sender_class, mock_queue_class, mock_channel_class
534+
):
494535
"""Test that initialization handles exceptions."""
495536
mock_client_class.side_effect = Exception("Init error")
496537

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)