2020
2121import org .apache .hadoop .conf .Configuration ;
2222import org .apache .hadoop .fs .Path ;
23+ import org .apache .hadoop .hive .metastore .api .AbortTxnsRequest ;
2324import org .apache .hadoop .hive .metastore .api .CommitTxnRequest ;
2425import org .apache .hadoop .hive .metastore .api .CompactionRequest ;
2526import org .apache .hadoop .hive .metastore .api .CompactionType ;
3233import org .junit .jupiter .api .BeforeEach ;
3334import org .junit .jupiter .api .Test ;
3435
36+ import java .util .Collections ;
3537import java .util .List ;
38+ import java .util .Map ;
39+ import java .util .concurrent .TimeUnit ;
3640
37- import static org .apache .hadoop .hive .metastore .txn .TxnStore .FAILED_RESPONSE ;
41+ import static org .apache .hadoop .hive .metastore .txn .TxnStore .CLEANING_RESPONSE ;
3842import static org .apache .hadoop .hive .metastore .txn .TxnStore .SUCCEEDED_RESPONSE ;
3943import static org .apache .hadoop .hive .metastore .txn .TxnStore .WORKING_RESPONSE ;
4044import static org .apache .hadoop .hive .metastore .txn .TxnStore .INITIATED_STATE ;
@@ -59,7 +63,7 @@ protected boolean useMinHistoryWriteId() {
5963
6064 @ Test
6165 public void cleanupAfterAbortedAndRetriedMajorCompaction () throws Exception {
62- Table t = prepareTestTable ();
66+ Table t = prepareTestTable ("camtc" );
6367 CompactionRequest rqst = new CompactionRequest ("default" , "camtc" , CompactionType .MAJOR );
6468 long compactTxn = compactInTxn (rqst , CommitAction .ABORT );
6569 addBaseFile (t , null , 25L , 25 , compactTxn );
@@ -83,10 +87,10 @@ public void cleanupAfterAbortedAndRetriedMajorCompaction() throws Exception {
8387
8488 @ Test
8589 public void cleanupAfterKilledAndRetriedMajorCompaction () throws Exception {
86- Table t = prepareTestTable ();
90+ Table t = prepareTestTable ("camtc" );
8791 CompactionRequest rqst = new CompactionRequest ("default" , "camtc" , CompactionType .MAJOR );
88- long compactTxn = compactInTxn (rqst , CommitAction .NONE );
89- addBaseFile (t , null , 25L , 25 , compactTxn );
92+ long compactTxn1 = compactInTxn (rqst , CommitAction .NONE );
93+ addBaseFile (t , null , 25L , 25 , compactTxn1 );
9094
9195 txnHandler .revokeTimedoutWorkers (1L );
9296 // an open txn should prevent the retry
@@ -96,20 +100,32 @@ public void cleanupAfterKilledAndRetriedMajorCompaction() throws Exception {
96100
97101 // force retry
98102 revokeTimedoutWorkers (conf );
99- compactTxn = compactInTxn (rqst );
100- addBaseFile (t , null , 25L , 25 , compactTxn );
103+ long compactTxn2 = compactInTxn (rqst );
104+ addBaseFile (t , null , 25L , 25 , compactTxn2 );
101105
102106 startCleaner ();
103107
104- // Validate that the cleanup attempt has failed .
108+ // Validate that the cleanup attempt was skipped .
105109 rsp = txnHandler .showCompact (new ShowCompactRequest ());
106110 assertEquals (1 , rsp .getCompactsSize ());
107- assertEquals (FAILED_RESPONSE , rsp .getCompacts ().getFirst ().getState ());
108- assertEquals ("txnid:26 is open and <= hwm: 27" , rsp .getCompacts ().getFirst ().getErrorMessage ());
111+ assertEquals (CLEANING_RESPONSE , rsp .getCompacts ().getFirst ().getState ());
109112
110113 // Check that the files are not removed
111114 List <Path > paths = getDirectories (conf , t , null );
112115 assertEquals (6 , paths .size ());
116+
117+ // Abort the open compaction txn, so that the Cleaner can proceed.
118+ txnHandler .abortTxns (
119+ new AbortTxnsRequest (Collections .singletonList (compactTxn1 )));
120+ startCleaner ();
121+
122+ rsp = txnHandler .showCompact (new ShowCompactRequest ());
123+ assertEquals (1 , rsp .getCompactsSize ());
124+ assertEquals (SUCCEEDED_RESPONSE , rsp .getCompacts ().getFirst ().getState ());
125+
126+ // Check that the files are removed
127+ paths = getDirectories (conf , t , null );
128+ assertEquals (1 , paths .size ());
113129 }
114130
115131 private static void revokeTimedoutWorkers (Configuration conf ) throws Exception {
@@ -121,39 +137,88 @@ private static void revokeTimedoutWorkers(Configuration conf) throws Exception {
121137 }
122138
123139 @ Test
124- public void cleanupAfterMajorCompactionWithQueryWaitingToLockTheSnapshot () throws Exception {
125- Table t = prepareTestTable ();
140+ public void cleanupAndDanglingOpenTxnOnSameTable () throws Exception {
141+ Table t = prepareTestTable ("camtc" );
126142 CompactionRequest rqst = new CompactionRequest ("default" , "camtc" , CompactionType .MAJOR );
127143 long compactTxn = compactInTxn (rqst , CommitAction .MARK_COMPACTED );
128144 addBaseFile (t , null , 25L , 25 , compactTxn );
129145
130- // Open a query during compaction,
146+ // Open a readerTxn during compaction,
131147 // Do not register minOpenWriteId (i.e. simulate delay locking the snapshot)
132- openTxn ();
148+ long readerTxn = openTxn ();
133149
134150 txnHandler .commitTxn (new CommitTxnRequest (compactTxn ));
135- startCleaner ();
151+ Thread .sleep (MetastoreConf .getTimeVar (
152+ conf , ConfVars .TXN_OPENTXN_TIMEOUT , TimeUnit .MILLISECONDS ));
136153
137- // Validate that the cleanup attempt has failed.
154+ startCleaner (10 , TimeUnit .SECONDS );
155+
156+ // Validate that the cleanup attempt was delayed by retention time.
138157 ShowCompactResponse rsp = txnHandler .showCompact (new ShowCompactRequest ());
139158 assertEquals (1 , rsp .getCompactsSize ());
140- assertEquals (FAILED_RESPONSE , rsp .getCompacts ().getFirst ().getState ());
141- assertEquals ("txnid:27 is open and <= hwm: 27" , rsp .getCompacts ().getFirst ().getErrorMessage ());
159+ assertEquals (CLEANING_RESPONSE , rsp .getCompacts ().getFirst ().getState ());
142160
143161 // Check that the files are not removed
144162 List <Path > paths = getDirectories (conf , t , null );
145163 assertEquals (5 , paths .size ());
164+
165+ // Register minOpenWriteId for the readerTxn.
166+ txnHandler .addWriteIdsToMinHistory (readerTxn , Map .of ("default.camtc" , 1L ));
167+
168+ startCleaner (0 , TimeUnit .SECONDS );
169+
170+ // Validate that the cleanup attempt was blocked by readerTxn.
171+ rsp = txnHandler .showCompact (new ShowCompactRequest ());
172+ assertEquals (1 , rsp .getCompactsSize ());
173+ assertEquals (CLEANING_RESPONSE , rsp .getCompacts ().getFirst ().getState ());
174+
175+ // Check that the files are not removed
176+ paths = getDirectories (conf , t , null );
177+ assertEquals (5 , paths .size ());
178+ }
179+
180+ @ Test
181+ public void cleanupNotBlockedByOpenTxnOnAnotherTable () throws Exception {
182+ Table t1 = prepareTestTable ("camtc1" );
183+ Table t2 = prepareTestTable ("camtc2" );
184+
185+ // Open a readerTxn on t1 and register minOpenWriteId.
186+ long readerTxn = openTxn ();
187+ txnHandler .addWriteIdsToMinHistory (readerTxn , Map .of ("default.camtc1" , 1L ));
188+
189+ CompactionRequest rqstTbl1 = new CompactionRequest ("default" , "camtc1" , CompactionType .MAJOR );
190+ long compactTxn = compactInTxn (rqstTbl1 );
191+ addBaseFile (t1 , null , 25L , 25 , compactTxn );
192+
193+ CompactionRequest rqstTbl2 = new CompactionRequest ("default" , "camtc2" , CompactionType .MAJOR );
194+ compactTxn = compactInTxn (rqstTbl2 );
195+ addBaseFile (t2 , null , 25L , 25 , compactTxn );
196+
197+ startCleaner ();
198+
199+ ShowCompactResponse rsp = txnHandler .showCompact (new ShowCompactRequest ());
200+ assertEquals (2 , rsp .getCompactsSize ());
201+
202+ assertEquals (SUCCEEDED_RESPONSE , rsp .getCompacts ().get (0 ).getState ());
203+ assertEquals ("camtc2" , rsp .getCompacts ().get (0 ).getTablename ());
204+ // camtc2 was cleaned: only the new base remains.
205+ assertEquals (1 , getDirectories (conf , t2 , null ).size ());
206+
207+ assertEquals (CLEANING_RESPONSE , rsp .getCompacts ().get (1 ).getState ());
208+ assertEquals ("camtc1" , rsp .getCompacts ().get (1 ).getTablename ());
209+ // camtc1 wasn't actually cleaned (admission filter held it back).
210+ assertEquals (5 , getDirectories (conf , t1 , null ).size ());
146211 }
147212
148- private Table prepareTestTable () throws Exception {
149- Table t = newTable ("default" , "camtc" , false );
213+ private Table prepareTestTable (String tblName ) throws Exception {
214+ Table t = newTable ("default" , tblName , false );
150215
151216 addBaseFile (t , null , 20L , 20 );
152217 addDeltaFile (t , null , 21L , 22L , 2 );
153218 addDeltaFile (t , null , 23L , 24L , 2 );
154219 addDeltaFile (t , null , 25L , 25 , 2 );
155220
156- burnThroughTransactions ("default" , "camtc" , 25 );
221+ burnThroughTransactions ("default" , tblName , 25 );
157222 return t ;
158223 }
159224
0 commit comments