Skip to content

Commit 5b64c2a

Browse files
authored
Fix metrics response (#1176)
1 parent 0726c46 commit 5b64c2a

8 files changed

Lines changed: 86 additions & 31 deletions

File tree

container/grafana/dashboards/karapace.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,7 @@
501501
},
502502
"disableTextWrap": false,
503503
"editorMode": "builder",
504-
"expr": "karapace_schema_reader_subjects_total",
504+
"expr": "karapace_schema_reader_subjects_total{exported_job=\"karapace-schema-registry\"}",
505505
"fullMetaSearch": false,
506506
"includeNullMetadata": true,
507507
"instant": false,
@@ -574,7 +574,7 @@
574574
},
575575
"disableTextWrap": false,
576576
"editorMode": "builder",
577-
"expr": "karapace_schema_reader_schemas_total",
577+
"expr": "karapace_schema_reader_schemas_total{exported_job=\"karapace-schema-registry\"}",
578578
"fullMetaSearch": false,
579579
"includeNullMetadata": true,
580580
"instant": false,

container/opentelemetry/collector-config.yaml

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,8 @@ exporters:
1414
endpoint: jaeger:4317
1515
tls:
1616
insecure: true
17-
otlphttp/prometheus:
18-
endpoint: http://prometheus:9090/api/v1/otlp
19-
tls:
20-
insecure: true
17+
prometheus:
18+
endpoint: "0.0.0.0:8889"
2119

2220
service:
2321
pipelines:
@@ -26,4 +24,4 @@ service:
2624
exporters: [otlp]
2725
metrics:
2826
receivers: [otlp]
29-
exporters: [otlphttp/prometheus]
27+
exporters: [prometheus]

container/prometheus/prometheus.yml

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,18 @@ rule_files:
1414
scrape_configs:
1515
- job_name: karapace-registry
1616
metrics_path: /metrics
17+
scheme: https
18+
tls_config:
19+
ca_file: /opt/prometheus/certs/rootCA.pem
1720
static_configs:
1821
- targets:
19-
- karapace-registry:8081
22+
- karapace-schema-registry:8081
2023

2124
- job_name: karapace-rest
2225
metrics_path: /metrics
2326
static_configs:
2427
- targets:
25-
- karapace-rest:8082
28+
- karapace-rest-proxy:8082
2629

2730
- job_name: statsd-exporter
2831
metrics_path: /metrics
@@ -33,6 +36,7 @@ scrape_configs:
3336
- statsd-exporter:9102
3437

3538
- job_name: opentelemetry-collector
39+
metrics_path: /metrics
3640
static_configs:
3741
- targets:
38-
- opentelemetry-collector:8888
42+
- opentelemetry-collector:8889

src/karapace/api/routers/metrics.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from fastapi import APIRouter, Depends, Response
88
from karapace.api.container import SchemaRegistryContainer
99
from karapace.core.instrumentation.prometheus import PrometheusInstrumentation
10+
from karapace.rapu import HTTPResponse
1011

1112
metrics_router = APIRouter(
1213
prefix=PrometheusInstrumentation.METRICS_ENDPOINT_PATH,
@@ -20,4 +21,10 @@
2021
async def metrics(
2122
prometheus: PrometheusInstrumentation = Depends(Provide[SchemaRegistryContainer.karapace_container.prometheus]),
2223
) -> Response:
23-
return Response(content=await prometheus.serve_metrics(), media_type=prometheus.CONTENT_TYPE_LATEST)
24+
try:
25+
await prometheus.serve_metrics()
26+
except HTTPResponse as ex:
27+
# FastAPI needs to extract the body and return a Response
28+
# generate_latest() returns bytes, so ex.body is already bytes
29+
content = ex.body if isinstance(ex.body, bytes) else ex.body.encode("utf-8")
30+
return Response(content=content, media_type=prometheus.CONTENT_TYPE_LATEST)

src/karapace/core/instrumentation/prometheus.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@
1010

1111
from aiohttp.web import middleware, Request, Response
1212
from collections.abc import Awaitable, Callable
13-
from karapace.rapu import RestApp
13+
from http import HTTPStatus
14+
from karapace.rapu import HTTPResponse, RestApp
1415
from prometheus_client import CollectorRegistry, Counter, Gauge, generate_latest, Histogram
15-
from typing import Final
16+
from typing import Final, NoReturn
1617

1718
import logging
1819
import time
@@ -70,8 +71,14 @@ def setup_metrics(cls, *, app: RestApp) -> None:
7071
app.app[cls.karapace_http_requests_in_progress] = cls.karapace_http_requests_in_progress
7172

7273
@classmethod
73-
async def serve_metrics(cls) -> bytes:
74-
return generate_latest(cls.registry)
74+
async def serve_metrics(cls) -> NoReturn:
75+
metrics_data = generate_latest(cls.registry)
76+
# Raise HTTPResponse for RestApp compatibility
77+
raise HTTPResponse(
78+
body=metrics_data,
79+
status=HTTPStatus.OK,
80+
content_type=cls.CONTENT_TYPE_LATEST,
81+
)
7582

7683
@classmethod
7784
@middleware

src/karapace/core/schema_reader.py

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,8 @@ def _is_ready(self) -> bool:
367367
if ready:
368368
self.max_messages_to_process = MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP
369369
LOG.info("Ready in %s seconds", time.monotonic() - self.start_time)
370+
# Initialize metrics with current database state when becoming ready
371+
self._initialize_metrics()
370372
return ready
371373

372374
def highest_offset(self) -> int:
@@ -499,6 +501,33 @@ def _update_is_ready_flag(self) -> None:
499501
with self._ready_lock:
500502
self._ready = new_ready_flag
501503

504+
def _update_schema_gauges(self) -> tuple[int, int, int, int]:
505+
"""Update all schema-related gauge metrics with current database state.
506+
507+
Returns:
508+
Tuple of (num_subjects, num_schemas, live_versions, soft_deleted_versions)
509+
"""
510+
num_schemas = self.database.num_schemas()
511+
num_subjects = self.database.num_subjects()
512+
live_versions, soft_deleted_versions = self.database.num_schema_versions()
513+
514+
self.stats.set_schemas_num_total(value=num_schemas)
515+
self.stats.set_subjects_num_total(value=num_subjects)
516+
self.stats.set_schema_versions_num_total(live_versions=live_versions, soft_deleted_versions=soft_deleted_versions)
517+
518+
return num_subjects, num_schemas, live_versions, soft_deleted_versions
519+
520+
def _initialize_metrics(self) -> None:
521+
"""Initialize metrics with current database state."""
522+
num_subjects, num_schemas, live_versions, soft_deleted_versions = self._update_schema_gauges()
523+
LOG.info(
524+
"Metrics initialized: subjects=%s, schemas=%s, live_versions=%s, soft_deleted_versions=%s",
525+
num_subjects,
526+
num_schemas,
527+
live_versions,
528+
soft_deleted_versions,
529+
)
530+
502531
def _report_schema_metrics(
503532
self,
504533
schema_records_processed_keymode_canonical: int,
@@ -510,20 +539,9 @@ def _report_schema_metrics(
510539
with_deprecated_key=schema_records_processed_keymode_deprecated_karapace,
511540
)
512541

513-
# Update following gauges only if there is a possibility of a change.
514-
records_processed = bool(
515-
schema_records_processed_keymode_canonical or schema_records_processed_keymode_deprecated_karapace
516-
)
517-
if records_processed:
518-
num_schemas = self.database.num_schemas()
519-
num_subjects = self.database.num_subjects()
520-
self.stats.set_schemas_num_total(value=num_schemas)
521-
self.stats.set_subjects_num_total(value=num_subjects)
522-
523-
live_versions, soft_deleted_versions = self.database.num_schema_versions()
524-
self.stats.set_schema_versions_num_total(
525-
live_versions=live_versions, soft_deleted_versions=soft_deleted_versions
526-
)
542+
# Always update gauges to reflect current database state
543+
# This ensures metrics stay accurate even if no new records are processed
544+
self._update_schema_gauges()
527545

528546
def _handle_msg_config(self, key: dict, value: dict | None) -> None:
529547
subject = key.get("subject")

src/karapace/core/stats.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ def __init__(self, *, config: Config, meter: Meter) -> None:
3737
self.sentry_client: Final = get_sentry_client(sentry_config=(config.sentry or None))
3838
self._meter = meter
3939

40+
LOG.info("Initializing StatsClient with tags: %s", self._tags)
41+
4042
# Supports labels for keymode
4143
self._schema_records_processed_counter: Final[Counter] = self._meter.get_meter().create_counter(
4244
name=METRIC_SCHEMA_TOPIC_RECORDS_PROCESSED_COUNT,
@@ -67,12 +69,20 @@ def schema_records_processed(self, *, with_canonical_key: int, with_deprecated_k
6769
)
6870

6971
def set_schemas_num_total(self, *, value: int) -> None:
72+
LOG.debug("Setting schemas gauge to %s with attributes %s", value, self._tags)
7073
self._total_schemas_gauge.set(amount=value, attributes=self._tags)
7174

7275
def set_subjects_num_total(self, *, value: int) -> None:
76+
LOG.debug("Setting subjects gauge to %s with attributes %s", value, self._tags)
7377
self._total_subjects_gauge.set(amount=value, attributes=self._tags)
7478

7579
def set_schema_versions_num_total(self, *, live_versions: int, soft_deleted_versions: int) -> None:
80+
LOG.debug(
81+
"Setting schema versions gauge: live=%s, soft_deleted=%s with attributes %s",
82+
live_versions,
83+
soft_deleted_versions,
84+
self._tags,
85+
)
7686
self._schema_versions_gauge.set(amount=live_versions, attributes={"state": "live", **self._tags})
7787
self._schema_versions_gauge.set(amount=soft_deleted_versions, attributes={"state": "soft_deleted", **self._tags})
7888

tests/unit/instrumentation/test_prometheus.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
"""
77

88
import logging
9+
from http import HTTPStatus
910
from unittest.mock import AsyncMock, MagicMock, call, patch
1011

1112
import aiohttp.web
@@ -14,7 +15,7 @@
1415
from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram
1516

1617
from karapace.core.instrumentation.prometheus import PrometheusInstrumentation
17-
from karapace.rapu import RestApp
18+
from karapace.rapu import HTTPResponse, RestApp
1819

1920

2021
# Simple request object implementing mapping semantics for START_TIME used by tests
@@ -130,9 +131,19 @@ def test_setup_metrics(self, caplog: LogCaptureFixture, prometheus: PrometheusIn
130131

131132
@patch("karapace.core.instrumentation.prometheus.generate_latest")
132133
async def test_serve_metrics(self, generate_latest: MagicMock, prometheus: PrometheusInstrumentation) -> None:
133-
await prometheus.serve_metrics()
134+
mock_metrics_data = b"# HELP test_metric Test metric\n# TYPE test_metric counter\ntest_metric 1\n"
135+
generate_latest.return_value = mock_metrics_data
136+
137+
with pytest.raises(HTTPResponse) as exc_info:
138+
await prometheus.serve_metrics()
139+
134140
generate_latest.assert_called_once_with(prometheus.registry)
135141

142+
# Verify HTTPResponse has correct attributes
143+
assert exc_info.value.status == HTTPStatus.OK
144+
assert exc_info.value.headers.get("Content-Type") == prometheus.CONTENT_TYPE_LATEST
145+
assert exc_info.value.body == mock_metrics_data
146+
136147
@patch("karapace.core.instrumentation.prometheus.time")
137148
async def test_http_request_metrics_middleware(
138149
self,

0 commit comments

Comments
 (0)