Skip to content

Commit 01d7a4f

Browse files
authored
Merge pull request redpanda-data#29127 from bashtanov/coco-expose
Expose coordinated compaction details in debug endpoint
2 parents 1873a0f + c8da2dd commit 01d7a4f

5 files changed

Lines changed: 64 additions & 11 deletions

File tree

src/v/cluster/cluster_utils.cc

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "cluster/snapshot.h"
1717
#include "cluster/types.h"
1818
#include "model/fips_config.h"
19+
#include "raft/consensus.h"
1920
#include "raft/consensus_utils.h"
2021
#include "raft/errc.h"
2122
#include "storage/disk_log_impl.h"
@@ -270,8 +271,15 @@ partition_state get_partition_state(ss::lw_shared_ptr<partition> partition) {
270271
state.revision_id = partition->get_revision_id();
271272
state.log_size_bytes = partition->size_bytes();
272273
state.non_log_disk_size_bytes = partition->non_log_disk_size_bytes();
274+
auto& coco = partition->raft()->get_compaction_coordinator();
273275
state.max_tombstone_removable_offset
274-
= partition->log()->stm_manager()->max_tombstone_remove_offset();
276+
= coco.get_max_tombstone_remove_offset();
277+
state.max_transaction_removable_offset
278+
= coco.get_max_transaction_remove_offset();
279+
state.max_cleanly_compacted_offset
280+
= coco.get_local_max_cleanly_compacted_offset();
281+
state.max_transaction_free_offset
282+
= coco.get_local_max_transaction_free_offset();
275283
state.is_read_replica_mode_enabled
276284
= partition->is_read_replica_mode_enabled();
277285
state.is_remote_fetch_enabled = partition->is_remote_fetch_enabled();

src/v/cluster/types.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2905,7 +2905,7 @@ struct partition_raft_state
29052905

29062906
struct partition_state
29072907
: serde::
2908-
envelope<partition_state, serde::version<2>, serde::compat_version<0>> {
2908+
envelope<partition_state, serde::version<3>, serde::compat_version<0>> {
29092909
model::offset start_offset;
29102910
model::offset committed_offset;
29112911
model::offset last_stable_offset;
@@ -2924,6 +2924,9 @@ struct partition_state
29242924
ss::sstring iceberg_mode;
29252925
partition_raft_state raft_state;
29262926
model::offset max_tombstone_removable_offset;
2927+
model::offset max_transaction_removable_offset;
2928+
model::offset max_cleanly_compacted_offset;
2929+
model::offset max_transaction_free_offset;
29272930

29282931
auto serde_fields() {
29292932
return std::tie(
@@ -2944,7 +2947,10 @@ struct partition_state
29442947
read_replica_bucket,
29452948
raft_state,
29462949
iceberg_mode,
2947-
max_tombstone_removable_offset);
2950+
max_tombstone_removable_offset,
2951+
max_transaction_removable_offset,
2952+
max_cleanly_compacted_offset,
2953+
max_transaction_free_offset);
29482954
}
29492955

29502956
friend bool operator==(const partition_state&, const partition_state&)

src/v/redpanda/admin/api-doc/debug.json

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1396,7 +1396,19 @@
13961396
},
13971397
"max_tombstone_removable_offset": {
13981398
"type": "long",
1399-
"description": "Maximum offset up to which tombstones/transaction end markers can be removed."
1399+
"description": "Maximum offset up to which tombstones can be removed (MTRO)."
1400+
},
1401+
"max_transaction_removable_offset": {
1402+
"type": "long",
1403+
"description": "Maximum offset up to which transaction end markers can be removed (MXRO)."
1404+
},
1405+
"max_cleanly_compacted_offset": {
1406+
"type": "long",
1407+
"description": "Maximum offset up to which this replica is cleanly compacted (MCCO)."
1408+
},
1409+
"max_transaction_free_offset": {
1410+
"type": "long",
1411+
"description": "Maximum offset up to which this replica is free of transaction fence batches and data (MXFO)."
14001412
},
14011413
"raft_state": {
14021414
"type": "raft_replica_state",

src/v/redpanda/admin/debug.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -925,6 +925,11 @@ admin_server::get_partition_state_handler(
925925
replica.non_log_disk_size_bytes = state.non_log_disk_size_bytes;
926926
replica.max_tombstone_removable_offset
927927
= state.max_tombstone_removable_offset;
928+
replica.max_transaction_removable_offset
929+
= state.max_transaction_removable_offset;
930+
replica.max_cleanly_compacted_offset
931+
= state.max_cleanly_compacted_offset;
932+
replica.max_transaction_free_offset = state.max_transaction_free_offset;
928933
replica.is_read_replica_mode_enabled
929934
= state.is_read_replica_mode_enabled;
930935
replica.read_replica_bucket = state.read_replica_bucket;

tests/rptest/tests/log_compaction_test.py

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
from ducktape.mark import matrix
1717
from ducktape.utils.util import wait_until
18+
from ducktape.cluster.cluster import ClusterNode
1819
import ducktape.errors
1920

2021
from rptest.clients.offline_log_viewer import OfflineLogViewer
@@ -951,6 +952,8 @@ def do_test_tx_control_batch_removal(self, test_case_name, test_case):
951952
f"Running test case {test_case_name} with topic {self.topic_spec.name}"
952953
)
953954

955+
self.check_all_offsets(lambda offset: offset <= 0)
956+
954957
self.start_partition_movement()
955958
self.produce(test_case)
956959

@@ -976,6 +979,31 @@ def produce_func():
976979

977980
self.wait_for_all_tx_batches_removed(produce_func)
978981

982+
self.check_all_offsets(lambda offset: offset > 0)
983+
984+
def get_partition_state(self, node: ClusterNode | None = None):
985+
state = self.redpanda._admin.get_partition_state(
986+
namespace="kafka",
987+
topic=self.topic_spec.name,
988+
partition=0,
989+
node=node,
990+
)["replicas"]
991+
self.redpanda.logger.debug(f"partition state: {state}")
992+
return state
993+
994+
def check_all_offsets(self, predicate):
995+
states = self.get_partition_state()
996+
for state in states:
997+
for offset_name in (
998+
"max_tombstone_removable_offset",
999+
"max_transaction_removable_offset",
1000+
"max_cleanly_compacted_offset",
1001+
"max_transaction_free_offset",
1002+
):
1003+
if not predicate(state[offset_name]):
1004+
return False
1005+
return True
1006+
9791007

9801008
class LogCompactionTxRemovalTest(LogCompactionTxRemovalTestBase):
9811009
def __init__(self, test_context):
@@ -1184,13 +1212,7 @@ def get_leader() -> Tuple[bool, int]:
11841212

11851213
# make sure it sets MTRO beyond the segment with the commit batch
11861214
def get_mtro():
1187-
state = self.redpanda._admin.get_partition_state(
1188-
namespace="kafka",
1189-
topic=self.topic_spec.name,
1190-
partition=0,
1191-
node=self.redpanda.get_node_by_id(leader_node),
1192-
)["replicas"]
1193-
self.redpanda.logger.debug(f"partition state: {state}")
1215+
state = self.get_partition_state(self.redpanda.get_node_by_id(leader_node))
11941216
assert len(state) == 1
11951217
return state[0]["max_tombstone_removable_offset"]
11961218

0 commit comments

Comments
 (0)