Skip to content

Commit 94922d0

Browse files
onewland and claude committed
feat(lw-deletions): Use PartMutation metric for backpressure signal
Switch the lw-deletions consumer's backpressure signal from "number of incomplete mutations" (`system.mutations WHERE is_done=0`) to the `PartMutation` metric from `system.metrics`. This gives a real-time measurement of how many parts are actively being mutated, which correlates directly with CPU usage.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 26ecf4d commit 94922d0

4 files changed

Lines changed: 49 additions & 29 deletions

File tree

snuba/lw_deletions/strategy.py

Lines changed: 10 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -42,7 +42,7 @@
4242
from snuba.web.delete_query import (
4343
TooManyOngoingMutationsError,
4444
_execute_query,
45-
_num_ongoing_mutations,
45+
_num_parts_currently_mutating,
4646
)
4747

4848
TPayload = TypeVar("TPayload")
@@ -322,23 +322,23 @@ def _execute_single_delete(
322322
time.sleep(delay_ms / 1000.0)
323323

324324
def _check_ongoing_mutations(self, skip_throttle: bool = False) -> None:
325-
max_ongoing_mutations = typing.cast(
325+
max_parts_mutating = typing.cast(
326326
int,
327327
get_int_config(
328-
"max_ongoing_mutations_for_delete",
329-
default=settings.MAX_ONGOING_MUTATIONS_FOR_DELETE,
328+
"max_parts_mutating_for_delete",
329+
default=settings.MAX_PARTS_MUTATING_FOR_DELETE,
330330
),
331331
)
332332

333333
# Fast path: local counter already exceeds limit, no need to query CH
334-
if self.__local_inflight_count > max_ongoing_mutations:
334+
if self.__local_inflight_count > max_parts_mutating:
335335
now = time.time()
336336
if (
337337
self.__last_ongoing_mutations_check is not None
338338
and now - self.__last_ongoing_mutations_check < 1.0
339339
):
340340
raise TooManyOngoingMutationsError(
341-
f"local inflight count {self.__local_inflight_count} exceeds max {max_ongoing_mutations}"
341+
f"local inflight count {self.__local_inflight_count} exceeds max {max_parts_mutating}"
342342
)
343343
# Fall through to reconcile with CH
344344

@@ -352,14 +352,14 @@ def _check_ongoing_mutations(self, skip_throttle: bool = False) -> None:
352352
"ongoing mutations check is throttled to once per second"
353353
)
354354
start = time.time()
355-
ongoing_mutations = _num_ongoing_mutations(self.__storage.get_cluster(), self.__tables)
355+
parts_mutating = _num_parts_currently_mutating(self.__storage.get_cluster())
356356
self.__last_ongoing_mutations_check = time.time()
357357
# Reconcile: trust CH as source of truth for completions
358-
self.__local_inflight_count = ongoing_mutations
358+
self.__local_inflight_count = parts_mutating
359359
self.__metrics.timing("ongoing_mutations_query_ms", (time.time() - start) * 1000)
360-
if ongoing_mutations > max_ongoing_mutations:
360+
if parts_mutating > max_parts_mutating:
361361
raise TooManyOngoingMutationsError(
362-
f"{ongoing_mutations} mutations for {self.__tables} table(s) is above max ongoing mutations: {max_ongoing_mutations} "
362+
f"{parts_mutating} parts mutating is above max: {max_parts_mutating} "
363363
)
364364

365365
def close(self) -> None:

snuba/settings/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -469,6 +469,7 @@ class RedisClusters(TypedDict):
469469
VALIDATE_DATASET_YAMLS_ON_STARTUP = False
470470

471471
MAX_ONGOING_MUTATIONS_FOR_DELETE = 5
472+
MAX_PARTS_MUTATING_FOR_DELETE = 20
472473
LW_DELETES_PARTITION_TRACKING_TTL = 3600
473474
LW_DELETES_PER_SUBMIT_BUDGET = 5
474475
SNQL_DISABLED_DATASETS: set[str] = set([])

snuba/web/delete_query.py

Lines changed: 19 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -203,6 +203,25 @@ def _num_ongoing_mutations(cluster: ClickhouseCluster, tables: Sequence[str]) ->
203203
)
204204

205205

206+
def _num_parts_currently_mutating(cluster: ClickhouseCluster) -> int:
207+
"""
208+
Returns the number of parts currently being mutated across the cluster.
209+
Uses the PartMutation metric from system.metrics, which directly correlates
210+
with CPU usage from ongoing mutations.
211+
"""
212+
if cluster.is_single_node():
213+
query = "SELECT value FROM system.metrics WHERE metric = 'PartMutation'"
214+
else:
215+
query = f"""
216+
SELECT max(value)
217+
FROM clusterAllReplicas('{cluster.get_clickhouse_cluster_name()}', 'system', metrics)
218+
WHERE metric = 'PartMutation'
219+
"""
220+
return int(
221+
cluster.get_query_connection(ClickhouseClientSettings.QUERY).execute(query).results[0][0]
222+
)
223+
224+
206225
def deletes_are_enabled() -> bool:
207226
return bool(get_config("storage_deletes_enabled", 1))
208227

tests/lw_deletions/test_lw_deletions.py

Lines changed: 19 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -59,7 +59,7 @@ def generate_message() -> Iterator[Message[KafkaPayload]]:
5959
i += 1
6060

6161

62-
@patch("snuba.lw_deletions.strategy._num_ongoing_mutations", return_value=1)
62+
@patch("snuba.lw_deletions.strategy._num_parts_currently_mutating", return_value=1)
6363
@patch("snuba.lw_deletions.strategy._execute_query")
6464
@pytest.mark.redis_db
6565
def test_multiple_batches_strategies(mock_execute: Mock, mock_num_mutations: Mock) -> None:
@@ -85,7 +85,7 @@ def test_multiple_batches_strategies(mock_execute: Mock, mock_num_mutations: Moc
8585
assert commit_step.submit.call_count == 2
8686

8787

88-
@patch("snuba.lw_deletions.strategy._num_ongoing_mutations", return_value=1)
88+
@patch("snuba.lw_deletions.strategy._num_parts_currently_mutating", return_value=1)
8989
@patch("snuba.lw_deletions.strategy._execute_query")
9090
@pytest.mark.redis_db
9191
def test_clickhouse_settings(mock_execute: Mock, mock_num_mutations: Mock) -> None:
@@ -125,7 +125,7 @@ def test_clickhouse_settings(mock_execute: Mock, mock_num_mutations: Mock) -> No
125125
assert commit_step.submit.call_count == 2
126126

127127

128-
@patch("snuba.lw_deletions.strategy._num_ongoing_mutations", return_value=1)
128+
@patch("snuba.lw_deletions.strategy._num_parts_currently_mutating", return_value=1)
129129
@patch("snuba.lw_deletions.strategy._execute_query")
130130
@pytest.mark.redis_db
131131
def test_single_batch(mock_execute: Mock, mock_num_mutations: Mock) -> None:
@@ -160,16 +160,16 @@ def test_single_batch(mock_execute: Mock, mock_num_mutations: Mock) -> None:
160160
assert commit_step.submit.call_count == 1
161161

162162

163-
@patch("snuba.lw_deletions.strategy._num_ongoing_mutations", return_value=10)
163+
@patch("snuba.lw_deletions.strategy._num_parts_currently_mutating", return_value=100)
164164
@patch("snuba.lw_deletions.strategy._execute_query")
165165
@pytest.mark.redis_db
166166
def test_too_many_mutations(mock_execute: Mock, mock_num_mutations: Mock) -> None:
167167
"""
168168
Before we execute the DELETE FROM query, we check to see how many
169-
ongoing mutations there are.If there are more ongoing mutations than
170-
the max allows, we raise MessageRejected and back pressure is applied.
169+
parts are currently mutating. If there are more than the max allows,
170+
we raise MessageRejected and back pressure is applied.
171171
172-
The max is 5 (the default) and our mocked ongoing mutations is 10.
172+
The max is 20 (the default) and our mocked parts mutating is 100.
173173
"""
174174
commit_step = Mock()
175175
metrics = Mock()
@@ -221,7 +221,7 @@ def _make_single_message(
221221
)
222222

223223

224-
@patch("snuba.lw_deletions.strategy._num_ongoing_mutations", return_value=1)
224+
@patch("snuba.lw_deletions.strategy._num_parts_currently_mutating", return_value=1)
225225
@patch("snuba.lw_deletions.strategy._execute_query")
226226
@patch.object(
227227
FormatQuery,
@@ -275,7 +275,7 @@ def test_split_by_partition_enabled(mock_execute: Mock, mock_num_mutations: Mock
275275
assert len(increment_calls) == 3
276276

277277

278-
@patch("snuba.lw_deletions.strategy._num_ongoing_mutations", return_value=1)
278+
@patch("snuba.lw_deletions.strategy._num_parts_currently_mutating", return_value=1)
279279
@patch("snuba.lw_deletions.strategy._execute_query")
280280
@pytest.mark.redis_db
281281
def test_split_by_partition_disabled(mock_execute: Mock, mock_num_mutations: Mock) -> None:
@@ -304,7 +304,7 @@ def test_split_by_partition_disabled(mock_execute: Mock, mock_num_mutations: Moc
304304
assert commit_step.submit.call_count == 1
305305

306306

307-
@patch("snuba.lw_deletions.strategy._num_ongoing_mutations", return_value=1)
307+
@patch("snuba.lw_deletions.strategy._num_parts_currently_mutating", return_value=1)
308308
@patch("snuba.lw_deletions.strategy._execute_query")
309309
@pytest.mark.redis_db
310310
def test_split_by_partition_redis_tracking(mock_execute: Mock, mock_num_mutations: Mock) -> None:
@@ -394,7 +394,7 @@ def test_split_by_partition_redis_tracking(mock_execute: Mock, mock_num_mutation
394394
assert commit_step.submit.call_count == 1
395395

396396

397-
@patch("snuba.lw_deletions.strategy._num_ongoing_mutations", return_value=1)
397+
@patch("snuba.lw_deletions.strategy._num_parts_currently_mutating", return_value=1)
398398
@patch("snuba.lw_deletions.strategy._execute_query")
399399
@pytest.mark.redis_db
400400
def test_split_by_partition_fallback(mock_execute: Mock, mock_num_mutations: Mock) -> None:
@@ -436,7 +436,7 @@ def test_split_by_partition_fallback(mock_execute: Mock, mock_num_mutations: Moc
436436
assert commit_step.submit.call_count == 1
437437

438438

439-
@patch("snuba.lw_deletions.strategy._num_ongoing_mutations", return_value=1)
439+
@patch("snuba.lw_deletions.strategy._num_parts_currently_mutating", return_value=1)
440440
@patch("snuba.lw_deletions.strategy._execute_query")
441441
@pytest.mark.redis_db
442442
def test_partition_date_filtering(mock_execute: Mock, mock_num_mutations: Mock) -> None:
@@ -500,7 +500,7 @@ def test_local_inflight_counter_reconciles_with_ch(mock_execute: Mock) -> None:
500500
metrics = Mock()
501501
storage = get_writable_storage(StorageKey("search_issues"))
502502

503-
state.set_config("max_ongoing_mutations_for_delete", 3)
503+
state.set_config("max_parts_mutating_for_delete", 3)
504504
state.set_config("lw_deletes_split_by_partition_search_issues", 1)
505505
state.set_config("lw_deletes_per_submit_budget", 100)
506506

@@ -510,7 +510,7 @@ def test_local_inflight_counter_reconciles_with_ch(mock_execute: Mock) -> None:
510510

511511
# CH always returns 0 (stale) — all 3 partitions should execute
512512
with (
513-
patch("snuba.lw_deletions.strategy._num_ongoing_mutations", return_value=0),
513+
patch("snuba.lw_deletions.strategy._num_parts_currently_mutating", return_value=0),
514514
patch.object(
515515
format_query,
516516
"_FormatQuery__partition_column",
@@ -548,11 +548,11 @@ def test_local_counter_increments_after_each_delete(mock_execute: Mock) -> None:
548548
metrics = Mock()
549549
storage = get_writable_storage(StorageKey("search_issues"))
550550

551-
state.set_config("max_ongoing_mutations_for_delete", 5)
551+
state.set_config("max_parts_mutating_for_delete", 5)
552552

553553
format_query = FormatQuery(Mock(), storage, SearchIssuesFormatter(), metrics)
554554

555-
with patch("snuba.lw_deletions.strategy._num_ongoing_mutations", return_value=0):
555+
with patch("snuba.lw_deletions.strategy._num_parts_currently_mutating", return_value=0):
556556
# Initial CH check: reconciles local counter to 0
557557
format_query._check_ongoing_mutations()
558558
assert format_query._FormatQuery__local_inflight_count == 0 # type: ignore[attr-defined]
@@ -570,7 +570,7 @@ def test_local_counter_increments_after_each_delete(mock_execute: Mock) -> None:
570570
assert format_query._FormatQuery__local_inflight_count == 0 # type: ignore[attr-defined]
571571

572572

573-
@patch("snuba.lw_deletions.strategy._num_ongoing_mutations", return_value=1)
573+
@patch("snuba.lw_deletions.strategy._num_parts_currently_mutating", return_value=1)
574574
@patch("snuba.lw_deletions.strategy._execute_query")
575575
@pytest.mark.redis_db
576576
def test_per_submit_budget_exhaustion(mock_execute: Mock, mock_num_mutations: Mock) -> None:
@@ -655,7 +655,7 @@ def test_per_submit_budget_exhaustion(mock_execute: Mock, mock_num_mutations: Mo
655655
assert len(members) == 5
656656

657657

658-
@patch("snuba.lw_deletions.strategy._num_ongoing_mutations", return_value=1)
658+
@patch("snuba.lw_deletions.strategy._num_parts_currently_mutating", return_value=1)
659659
@patch("snuba.lw_deletions.strategy._execute_query")
660660
@patch("snuba.lw_deletions.strategy.time.sleep")
661661
@pytest.mark.redis_db
@@ -684,7 +684,7 @@ def test_inter_delete_delay(mock_sleep: Mock, mock_execute: Mock, mock_num_mutat
684684
mock_sleep.assert_called_once_with(0.2)
685685

686686

687-
@patch("snuba.lw_deletions.strategy._num_ongoing_mutations", return_value=1)
687+
@patch("snuba.lw_deletions.strategy._num_parts_currently_mutating", return_value=1)
688688
@patch("snuba.lw_deletions.strategy._execute_query")
689689
@patch("snuba.lw_deletions.strategy.time.sleep")
690690
@pytest.mark.redis_db

0 commit comments

Comments
 (0)