1515
1616from ducktape .mark import matrix
1717from ducktape .utils .util import wait_until
18+ from ducktape .cluster .cluster import ClusterNode
1819import ducktape .errors
1920
2021from rptest .clients .offline_log_viewer import OfflineLogViewer
@@ -951,6 +952,8 @@ def do_test_tx_control_batch_removal(self, test_case_name, test_case):
951952 f"Running test case { test_case_name } with topic { self .topic_spec .name } "
952953 )
953954
955+ self .check_all_offsets (lambda offset : offset <= 0 )
956+
954957 self .start_partition_movement ()
955958 self .produce (test_case )
956959
@@ -976,6 +979,31 @@ def produce_func():
976979
977980 self .wait_for_all_tx_batches_removed (produce_func )
978981
982+ self .check_all_offsets (lambda offset : offset > 0 )
983+
984+ def get_partition_state (self , node : ClusterNode | None = None ):
985+ state = self .redpanda ._admin .get_partition_state (
986+ namespace = "kafka" ,
987+ topic = self .topic_spec .name ,
988+ partition = 0 ,
989+ node = node ,
990+ )["replicas" ]
991+ self .redpanda .logger .debug (f"partition state: { state } " )
992+ return state
993+
994+ def check_all_offsets (self , predicate ):
995+ states = self .get_partition_state ()
996+ for state in states :
997+ for offset_name in (
998+ "max_tombstone_removable_offset" ,
999+ "max_transaction_removable_offset" ,
1000+ "max_cleanly_compacted_offset" ,
1001+ "max_transaction_free_offset" ,
1002+ ):
1003+ if not predicate (state [offset_name ]):
1004+ return False
1005+ return True
1006+
9791007
9801008class LogCompactionTxRemovalTest (LogCompactionTxRemovalTestBase ):
9811009 def __init__ (self , test_context ):
@@ -1184,13 +1212,7 @@ def get_leader() -> Tuple[bool, int]:
11841212
11851213 # make sure it sets MTRO beyond the segment with the commit batch
11861214 def get_mtro ():
1187- state = self .redpanda ._admin .get_partition_state (
1188- namespace = "kafka" ,
1189- topic = self .topic_spec .name ,
1190- partition = 0 ,
1191- node = self .redpanda .get_node_by_id (leader_node ),
1192- )["replicas" ]
1193- self .redpanda .logger .debug (f"partition state: { state } " )
1215+ state = self .get_partition_state (self .redpanda .get_node_by_id (leader_node ))
11941216 assert len (state ) == 1
11951217 return state [0 ]["max_tombstone_removable_offset" ]
11961218
0 commit comments