
Commit fdc2a54

Authored by tvaron3 and Copilot
feat(cosmos/workloads): add performance metrics collection for DR drill testing (#46271)
* fix(workloads): use concurrent operations in r_w_q_workload, set 1M throughput
  - Uncomment concurrent upsert/read/query calls
  - Remove manual timing counters and log_request_counts
  - Set THROUGHPUT to 1000000 in workload_configs.py
  - Keep CIRCUIT_BREAKER_ENABLED = False (PPCB disabled)

* feat(workloads): add performance metrics collection for DR drill testing
  Add a performance metrics library that reports PerfResult documents to a Cosmos DB results account, matching the Rust perf tool schema exactly so both SDKs feed the same ADX → Grafana pipeline.
  New files:
  - perf_stats.py: Thread-safe latency histogram with sorted-list percentile calculation and atomic drain_all() for consistent summary+error snapshots
  - perf_config.py: All config from environment variables (RESULTS_COSMOS_URI, PERF_REPORT_INTERVAL=300s, perfdb/perfresults defaults)
  - perf_reporter.py: Background daemon thread that drains Stats every 5 min and upserts PerfResult documents via sync CosmosClient with AAD auth
  Modified files:
  - workload_configs.py: All configs now driven by environment variables
  - workload_utils.py: Added timed operation wrappers with error tracking (CosmosHttpResponseError status_code/sub_status extraction); only successful operations record latency, to avoid polluting percentiles
  - All *_workload.py files: Integrated Stats + PerfReporter with try/finally lifecycle management
  Key design decisions:
  - Sorted-list percentiles (no hdrhistogram native dependency)
  - psutil for CPU/memory with /proc fallback on Linux
  - Cached psutil.Process() instance for accurate CPU readings
  - CosmosClient stored and closed properly to avoid resource leaks
  - sdk_language='python', sdk_version from azure.cosmos.__version__
  - PPCB disabled by default
  - All reporter errors caught and logged as warnings (never crash the workload)

* fix(workloads): require psutil, remove /proc fallback code
  psutil is now a hard import (not optional). Removed all /proc/meminfo and /proc/self/status fallback parsing — if psutil is not installed, the import fails immediately rather than silently degrading.

* refactor(workloads): unify to single workload.py with env-var config
  - Single workload.py replaces 6 operation-specific files
  - WORKLOAD_OPERATIONS env var controls which ops run (read,write,query)
  - WORKLOAD_USE_PROXY env var enables Envoy proxy routing
  - WORKLOAD_USE_SYNC env var enables the sync client
  - Validate operation names at import time with a clear error
  - Replace manual sorted-list percentiles with hdrhistogram (O(1) record/query)
  - Fixed memory usage (~40KB per histogram vs unbounded list growth)

* refactor(workloads): delete old workload files replaced by unified workload.py
  Removed: r_workload.py, w_workload.py, r_proxy_workload.py, w_proxy_workload.py, r_w_q_workload.py, r_w_q_proxy_workload.py, r_w_q_sync_workload.py — all replaced by workload.py with the WORKLOAD_OPERATIONS and WORKLOAD_USE_PROXY env vars. Kept: r_w_q_with_incorrect_client_workload.py (special test case)

* feat(workloads): add WORKLOAD_SKIP_CLOSE to simulate unclosed clients
  Replaces r_w_q_with_incorrect_client_workload.py with an env var: WORKLOAD_SKIP_CLOSE=true creates the client without a context manager, simulating applications that don't properly close the Cosmos client.

* perf(workloads): use perf_counter_ns for higher precision timing
  Switch from time.perf_counter() * 1000 to time.perf_counter_ns() / 1_000_000 for nanosecond precision without floating-point multiplication artifacts.

* refactor(workloads): remove run_workloads.sh and dev.md
  Infra/orchestration scripts belong in the cosmos-sdk-copilot-toolkit repo, not in the SDK repo. Workload code (workload.py, perf_*, workload_utils.py) stays here.

* fix(workloads): correct hdrhistogram import — module is hdrh, not hdrhistogram
  The pip package is 'hdrhistogram' but the Python module is 'hdrh'. Import changed from 'from hdrhistogram import HdrHistogram' to 'from hdrh.histogram import HdrHistogram'.

* fix(workloads): use get_mean_value() for the hdrh API

* style(workloads): fix black formatting and setup_env.sh references

* feat(workloads): add config_multi_write_enabled to PerfResult
  Reports COSMOS_USE_MULTIPLE_WRITABLE_LOCATIONS in the config snapshot so it's visible in the Grafana dashboard and queryable from Kusto.

* fix(workloads): define the multi_write variable in perf_reporter
  The variable was used but never defined — caused pylint E0602.

* fix(workloads): address review findings — lazy imports, session close, histogram clamp, safe parsing

* fix(workloads): add perfdb, perfresults, ppcb, hdrh to the cosmos cspell dictionary
  Move cspell words to sdk/cosmos/azure-cosmos/cspell.json instead of the root .vscode/cspell.json to keep changes within the cosmos folder scope.

* fix(workloads): use RESULTS_COSMOS_KEY when available, fall back to DefaultAzureCredential

* feat(workloads): add config_proxy_enabled and config_skip_close to PerfResult

* refactor(workloads): remove setup_env.sh — moved to cosmos-sdk-copilot-toolkit

* refactor(workloads): deduplicate timed operations with shared helpers
  Extract _extra_kwargs(), _timed_call(), and _timed_call_async() to eliminate duplicated excluded_locations branching and timing/error recording across 6 operation functions.

* fix(workloads): address review — public import, sync+proxy fail-fast, flush lock
  1. Use the public azure.core.pipeline.transport import for AioHttpTransport
  2. Fail fast with RuntimeError if WORKLOAD_USE_SYNC + WORKLOAD_USE_PROXY
  3. Add a threading.Lock around _flush(); skip the final flush if the thread is still alive

* fix(workloads): re-enable WorkloadLoggerFilter to reduce log noise

* fix(workloads): rename coro to coroutine/awaitable to fix cspell errors

* ci: retrigger pipeline

* feat(workloads): add WORKLOAD_NUM_CLIENTS for multi-client memory profiling
  Allows spawning multiple CosmosClient instances in a single process via the WORKLOAD_NUM_CLIENTS env var (default: 1). Useful for reproducing memory scaling issues with many client instances.

* fix(workloads): cap error buffer at 2000, fix sync query variable capture
  - perf_stats: Replace the unbounded _errors list with deque(maxlen=2000); errors at this level are minimal due to SDK internal retries.
  - workload_utils: Capture the loop variable in sync query_items with def _do_query(ri=random_item) to match the async pattern.

* fix(workloads): address review — shared reporter, safe ints, consolidated helpers
  1. Share Stats + PerfReporter across multi-client tasks (single workload_id)
  2. Remove drain_summaries/drain_errors (use drain_all only)
  3. Revert the THROUGHPUT default to 100K (set COSMOS_THROUGHPUT=1000000 for DR drills)
  4. Use _safe_int for all int env vars in workload_configs.py
  5. Add try/except to sync reporter.stop() matching the async path
  6. Move the circuit breaker env var from create_logger to workload_configs
  7. Consolidate the duplicated client constructor
  8. Add a TIMESTAMP schema compatibility comment
  9. Consolidate _safe_int_env into perf_config.py

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
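The sync query variable-capture fix above addresses Python's late-binding closures: a callback defined inside a loop sees the loop variable's final value unless the current value is frozen as a default argument, which is exactly the `def _do_query(ri=random_item)` pattern the commit applies. A minimal self-contained illustration (names here are illustrative, not from the workload code):

```python
items = ["a", "b", "c"]

# Late binding: every closure reads `item` after the loop finishes,
# so all of them see the final value "c".
late = [lambda: item for item in items]

# Default-argument capture: each closure freezes the value of `item`
# at definition time, matching the _do_query(ri=random_item) pattern.
frozen = [lambda current=item: current for item in items]

print([f() for f in late])    # ['c', 'c', 'c']
print([f() for f in frozen])  # ['a', 'b', 'c']
```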
1 parent bec9bd8 commit fdc2a54

18 files changed: 887 additions & 585 deletions

sdk/cosmos/azure-cosmos/cspell.json
Lines changed: 9 additions & 0 deletions
```json
{
  "ignoreWords": [
    "perfdb",
    "perfresults",
    "ppcb",
    "hdrh",
    "hdrhistogram"
  ]
}
```

sdk/cosmos/azure-cosmos/tests/workloads/dev.md

Lines changed: 0 additions & 45 deletions
This file was deleted.
perf_config.py (new file)
Lines changed: 50 additions & 0 deletions

```python
# The MIT License (MIT)
# Copyright (c) Microsoft Corporation. All rights reserved.
"""Performance reporting configuration from environment variables."""

import os
import subprocess
import uuid


def _get_git_sha() -> str:
    """Get the current git commit SHA, or 'unknown' if unavailable."""
    try:
        result = subprocess.run(
            ["git", "rev-parse", "--short", "HEAD"],
            capture_output=True,
            text=True,
            timeout=5,
        )
        if result.returncode == 0:
            return result.stdout.strip()
    except Exception:
        pass
    return "unknown"


def _safe_int_env(name: str, default: int) -> int:
    """Read an integer from an environment variable with a fallback default."""
    return _safe_int(os.environ.get(name, str(default)), default)


def _safe_int(value: object, default: int) -> int:
    """Coerce a value to int, returning the default on failure."""
    try:
        return int(value)
    except (ValueError, TypeError):
        return default


def get_perf_config() -> dict:
    """Build performance reporter configuration from environment variables."""
    return {
        "enabled": os.environ.get("PERF_ENABLED", "true").lower() == "true",
        "results_endpoint": os.environ.get("RESULTS_COSMOS_URI", ""),
        "results_database": os.environ.get("RESULTS_COSMOS_DATABASE", "perfdb"),
        "results_container": os.environ.get("RESULTS_COSMOS_CONTAINER", "perfresults"),
        "report_interval": _safe_int(os.environ.get("PERF_REPORT_INTERVAL", "300"), 300),
        "workload_id": os.environ.get("PERF_WORKLOAD_ID", str(uuid.uuid4())),
        "commit_sha": os.environ.get("PERF_COMMIT_SHA", _get_git_sha()),
    }
```
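The `_safe_int` fallback means a malformed environment value degrades to the default instead of crashing the workload at startup. A minimal self-contained sketch of the same parse-with-fallback pattern (the helper is re-implemented inline here for the demo):

```python
import os


def _safe_int(value, default):
    # Mirrors the perf_config helper: fall back instead of raising.
    try:
        return int(value)
    except (ValueError, TypeError):
        return default


# A malformed value silently degrades to the default of 300 seconds.
os.environ["PERF_REPORT_INTERVAL"] = "five minutes"
print(_safe_int(os.environ.get("PERF_REPORT_INTERVAL", "300"), 300))  # 300

# A well-formed value is parsed normally.
os.environ["PERF_REPORT_INTERVAL"] = "60"
print(_safe_int(os.environ.get("PERF_REPORT_INTERVAL", "300"), 300))  # 60

# An unset variable uses the string default, which parses to 300.
del os.environ["PERF_REPORT_INTERVAL"]
print(_safe_int(os.environ.get("PERF_REPORT_INTERVAL", "300"), 300))  # 300
```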
perf_reporter.py (new file)
Lines changed: 253 additions & 0 deletions

```python
# The MIT License (MIT)
# Copyright (c) Microsoft Corporation. All rights reserved.
"""Background reporter that drains Stats and upserts PerfResult documents to Cosmos DB."""

import logging
import os
import socket
import threading
import uuid
from datetime import datetime, timezone

import psutil

from perf_config import _safe_int_env
from perf_stats import Stats

logger = logging.getLogger(__name__)


def _get_sdk_version() -> str:
    """Get the azure-cosmos SDK version string."""
    try:
        from azure.cosmos import __version__

        return __version__
    except Exception:
        return "unknown"


def _get_cpu_percent(process) -> float:
    """Get current process CPU percent."""
    try:
        return process.cpu_percent(interval=None)
    except Exception:
        return 0.0


def _get_memory_bytes(process) -> int:
    """Get current process RSS in bytes."""
    try:
        return process.memory_info().rss
    except Exception:
        return 0


def _get_system_cpu_percent() -> float:
    """Get system-wide CPU percent."""
    try:
        return psutil.cpu_percent(interval=None)
    except Exception:
        return 0.0


def _get_system_memory() -> tuple:
    """Get system total and used memory in bytes."""
    try:
        mem = psutil.virtual_memory()
        return mem.total, mem.used
    except Exception:
        return 0, 0


class PerfReporter:
    """Background reporter that upserts PerfResult documents to Cosmos DB.

    Uses a daemon thread with a sync CosmosClient. The reporter drains
    Stats at the configured interval and upserts one PerfResult document
    per operation. All errors are caught and logged — the workload is
    never affected.
    """

    def __init__(self, stats: Stats, config: dict):
        self._stats = stats
        self._config = config
        self._stop_event = threading.Event()
        self._thread = None
        self._flush_lock = threading.Lock()
        self._client = None
        self._container = None
        self._hostname = socket.gethostname()
        self._sdk_version = _get_sdk_version()
        self._process = psutil.Process()

    def start(self):
        """Start the background reporting thread (daemon)."""
        self._thread = threading.Thread(
            target=self._run, daemon=True, name="perf-reporter"
        )
        self._thread.start()
        logger.info(
            "PerfReporter started (interval=%ds, workload_id=%s)",
            self._config["report_interval"],
            self._config["workload_id"],
        )

    def stop(self):
        """Stop the reporter and flush final results."""
        self._stop_event.set()
        if self._thread and self._thread.is_alive():
            self._thread.join(timeout=30)
        # Final flush — only if background thread has stopped to avoid concurrent writes
        if self._thread and self._thread.is_alive():
            logger.warning(
                "PerfReporter thread still alive after join timeout, skipping final flush"
            )
        else:
            try:
                with self._flush_lock:
                    self._ensure_container()
                    self._flush()
            except Exception as e:
                logger.warning("PerfReporter final flush failed: %s", e)
        if self._client:
            try:
                self._client.close()
            except Exception:
                pass
        logger.info("PerfReporter stopped")

    def _run(self):
        """Reporter loop: wait for interval, then flush."""
        try:
            self._ensure_container()
        except Exception as e:
            logger.warning("PerfReporter failed to initialize Cosmos client: %s", e)
            return

        # Prime psutil CPU counters (first call always returns 0)
        _get_cpu_percent(self._process)
        _get_system_cpu_percent()

        while not self._stop_event.wait(timeout=self._config["report_interval"]):
            try:
                with self._flush_lock:
                    self._flush()
            except Exception as e:
                logger.warning("PerfReporter flush failed: %s", e)

    def _ensure_container(self):
        """Lazily create the sync CosmosClient and get the container reference."""
        if self._container is not None:
            return

        from azure.cosmos import CosmosClient

        endpoint = self._config["results_endpoint"]
        if not endpoint:
            raise ValueError("RESULTS_COSMOS_URI is not set")

        key = os.environ.get("RESULTS_COSMOS_KEY", "")
        if key:
            credential = key
        else:
            from azure.identity import DefaultAzureCredential

            credential = DefaultAzureCredential()

        self._client = CosmosClient(endpoint, credential)
        db = self._client.get_database_client(self._config["results_database"])
        self._container = db.get_container_client(self._config["results_container"])

    def _flush(self):
        """Drain stats and upsert PerfResult + ErrorResult documents."""
        if self._container is None:
            return

        now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
        cpu = _get_cpu_percent(self._process)
        mem = _get_memory_bytes(self._process)
        sys_cpu = _get_system_cpu_percent()
        sys_total, sys_used = _get_system_memory()

        concurrency = _safe_int_env("COSMOS_CONCURRENT_REQUESTS", 100)
        preferred = os.environ.get("COSMOS_PREFERRED_LOCATIONS", "")
        excluded = os.environ.get("COSMOS_CLIENT_EXCLUDED_LOCATIONS", "")
        ppcb = (
            os.environ.get("AZURE_COSMOS_ENABLE_CIRCUIT_BREAKER", "false").lower()
            == "true"
        )
        multi_write = (
            os.environ.get("COSMOS_USE_MULTIPLE_WRITABLE_LOCATIONS", "false").lower()
            == "true"
        )
        proxy_enabled = os.environ.get("WORKLOAD_USE_PROXY", "false").lower() == "true"
        skip_close = os.environ.get("WORKLOAD_SKIP_CLOSE", "false").lower() == "true"

        summaries, errors = self._stats.drain_all()
        for s in summaries:
            doc = {
                "id": str(uuid.uuid4()),
                "partition_key": str(uuid.uuid4()),
                "workload_id": self._config["workload_id"],
                "commit_sha": self._config["commit_sha"],
                "hostname": self._hostname,
                "TIMESTAMP": now,  # ALL_CAPS for Rust SDK PerfResults schema compatibility
                "operation": s["operation"],
                "count": s["count"],
                "errors": s["errors"],
                "min_ms": round(s["min_ms"], 3),
                "max_ms": round(s["max_ms"], 3),
                "mean_ms": round(s["mean_ms"], 3),
                "p50_ms": round(s["p50_ms"], 3),
                "p90_ms": round(s["p90_ms"], 3),
                "p99_ms": round(s["p99_ms"], 3),
                "cpu_percent": round(cpu, 1),
                "memory_bytes": mem,
                "system_cpu_percent": round(sys_cpu, 1),
                "system_total_memory_bytes": sys_total,
                "system_used_memory_bytes": sys_used,
                "sdk_language": "python",
                "sdk_version": self._sdk_version,
                "config_concurrency": concurrency,
                "config_application_region": preferred,
                "config_excluded_regions": excluded,
                "config_ppcb_enabled": ppcb,
                "config_multi_write_enabled": multi_write,
                "config_proxy_enabled": proxy_enabled,
                "config_skip_close": skip_close,
            }
            try:
                self._container.upsert_item(doc)
            except Exception as e:
                logger.warning(
                    "PerfReporter upsert failed for %s: %s", s["operation"], e
                )

        for err in errors:
            doc = {
                "id": str(uuid.uuid4()),
                "partition_key": str(uuid.uuid4()),
                "workload_id": self._config["workload_id"],
                "commit_sha": self._config["commit_sha"],
                "hostname": self._hostname,
                "TIMESTAMP": now,  # ALL_CAPS for Rust SDK PerfResults schema compatibility
                "operation": err["operation"],
                "error_message": err["error_message"][:2000],
                "source_message": err["source_message"][:4000],
                "sdk_language": "python",
                "error_status_code": err.get("error_status_code"),
                "error_sub_status_code": err.get("error_sub_status_code"),
            }
            try:
                self._container.upsert_item(doc)
            except Exception as e:
                logger.warning("PerfReporter error upsert failed: %s", e)


def _safe_int_env(name: str, default: int) -> int:
    """Read an integer from an environment variable with a fallback default."""
    try:
        return int(os.environ.get(name, str(default)))
    except (ValueError, TypeError):
        return default
```
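The reporter loop's `while not self._stop_event.wait(timeout=interval)` idiom sleeps between flushes but wakes immediately when `stop()` sets the event, so shutdown never waits out a full 5-minute interval. A stripped-down, self-contained sketch of the same pattern (the `flush` callback and intervals here are illustrative, not the real reporter):

```python
import threading
import time

flushes = []


def reporter_loop(stop: threading.Event, interval: float):
    # Event.wait returns False on timeout (time to flush) and
    # True once the event is set (time to exit the loop).
    while not stop.wait(timeout=interval):
        flushes.append(time.monotonic())  # stand-in for a real flush


stop = threading.Event()
t = threading.Thread(target=reporter_loop, args=(stop, 0.02), daemon=True)
t.start()

time.sleep(0.25)   # let several flush intervals elapse
stop.set()         # wakes the pending wait() immediately
t.join(timeout=1)

print("flushes:", len(flushes))
print("thread alive:", t.is_alive())  # False: loop exited promptly on stop
```

The same structure also explains why `stop()` in the reporter joins the thread before the final flush: once the loop has exited, no background flush can race with the foreground one.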
