2424import com .arcadedb .engine .timeseries .codec .DeltaOfDeltaCodec ;
2525import com .arcadedb .engine .timeseries .codec .DictionaryCodec ;
2626import com .arcadedb .engine .timeseries .codec .TimeSeriesCodec ;
27+ import com .arcadedb .exception .ConcurrentModificationException ;
2728import com .arcadedb .log .LogManager ;
2829import com .arcadedb .schema .LocalSchema ;
2930
@@ -56,6 +57,9 @@ public class TimeSeriesShard implements AutoCloseable {
5657 private final long compactionBucketIntervalMs ;
5758 private final TimeSeriesBucket mutableBucket ;
5859 private final TimeSeriesSealedStore sealedStore ;
60+ // LOCK ORDERING: any code path that takes both {@link #appendLock} and {@link #compactionLock}
61+ // MUST acquire appendLock FIRST (see appendSamples()). compact() takes only compactionLock
62+ // (never appendLock), so no inversion exists today; preserve this order to keep it that way.
5963 // Read lock: held by scan/iterate (concurrent reads allowed).
6064 // Write lock: held by compact() to prevent queries from seeing data twice
6165 // during the window where sealed blocks are written but mutable not yet cleared.
@@ -76,6 +80,15 @@ public class TimeSeriesShard implements AutoCloseable {
7680 // does not spam the log every maintenance cycle.
7781 private volatile long lastOversizedWarnMs = 0 ;
7882
83+ /**
84+ * Test-only hook. When non-null, fires in {@link #compactInternal()} immediately before Phase 4c
85+ * acquires the write lock. Used to race concurrent appends against Phase 4c deterministically.
86+ * <p>
87+ * Tests that set this MUST reset it to {@code null} in an {@code @AfterEach} method, otherwise
88+ * it leaks into subsequent tests in the same JVM and silently alters their compaction timing.
89+ */
90+ public static volatile Runnable TEST_PRE_PHASE4C_HOOK = null ;
91+
7992 public TimeSeriesShard (final DatabaseInternal database , final String baseName , final int shardIndex ,
8093 final List <ColumnDefinition > columns ) throws IOException {
8194 this (database , baseName , shardIndex , columns , 0 );
@@ -161,54 +174,74 @@ public TimeSeriesShard(final DatabaseInternal database, final String baseName, f
161174 /**
162175 * Appends samples to the mutable bucket.
163176 * <p>
164- * The read lock is held for the <em>entire</em> internal transaction lifecycle
165- * (begin → write → commit), not just during the page writes. This is the key invariant
166- * that prevents MVCC conflicts with Phase 4:
167- * <ul>
168- * <li>Phase 4 acquires the <em>write</em> lock to clear the mutable bucket.</li>
169- * <li>The write lock can only be granted after all read-lock holders have released.</li>
170- * <li>Because this method releases the read lock only <em>after</em> the commit, Phase 4
171- * is guaranteed that every in-flight append has already persisted its page-0 modifications
172- * before Phase 4 starts its own transaction. Phase 4 always sees the latest page-0
173- * version and commits without conflict; insert transactions are never affected.</li>
174- * </ul>
175- * <p>
176- * This method always manages its own transaction. If the caller already has an active
177- * transaction, ArcadeDB creates a nested transaction (a new {@code TransactionContext} pushed
178- * onto the per-thread stack). The nested transaction commits independently; the caller's outer
179- * transaction remains unaffected because it holds none of the modified pages in its dirty set.
180- * <p>
181177 * Concurrent calls on the <em>same shard</em> are serialized by {@link #appendLock} so that
182178 * MVCC page-version conflicts can never arise between two concurrent appends. Writes to
183179 * different shards still proceed in parallel.
180+ * <p>
181+ * On Raft HA leaders, the compaction read lock is released before {@code commit()} to prevent a
182+ * deadlock. {@code commit()} calls {@code waitForActiveRecordingSession()}, which waits for the
183+ * compaction recording session to end; but the session ends only after Phase 4c (which needs the
184+ * write lock); and Phase 4c cannot acquire the write lock while this thread holds the read lock.
185+ * Releasing the lock early lets Phase 4c proceed. If Phase 4c clears the mutable bucket before
186+ * our commit, we get a {@code ConcurrentModificationException} and retry transparently on the
187+ * freshly-cleared page (see issue #4458). In standalone mode the lock is held through commit as
188+ * before, since there is no recording-session deadlock.
184189 */
185190 public void appendSamples (final long [] timestamps , final Object []... columnValues ) throws IOException {
186- compactionLock .readLock ().lock ();
191+ // Route the append transaction through the HA wrapper so the mutable-bucket page writes are
192+ // shipped to followers via the Raft WAL (TX_ENTRY). On a standalone database,
193+ // getWrappedDatabaseInstance() returns the same instance.
194+ final DatabaseInternal db = database .getWrappedDatabaseInstance ();
195+
196+ appendLock .lock ();
187197 try {
188- // Serialize concurrent appends on this shard to prevent MVCC conflicts.
189- // Two concurrent nested transactions both modifying page 0 would otherwise produce a
190- // ConcurrentModificationException at commit time (current v.X <> database v.Y).
191- appendLock .lock ();
192- try {
193- // Route the append transaction through the HA wrapper so the mutable-bucket page writes are
194- // shipped to followers via the Raft WAL (TX_ENTRY). Committing on the inner LocalDatabase here
195- // would persist the pages locally only, leaving followers with an empty bucket (issue #4382).
196- // On a standalone database getWrappedDatabaseInstance() returns the same instance.
197- final DatabaseInternal db = database .getWrappedDatabaseInstance ();
198- db .begin ();
198+ for (int attempt = 3 ; attempt > 0 ; attempt --) {
199+ compactionLock .readLock ().lock ();
200+ boolean readLockHeld = true ;
199201 try {
200- mutableBucket .appendSamples (timestamps , columnValues );
201- db .commit ();
202- } catch (final Exception e ) {
203- if (db .isTransactionActive ())
204- db .rollback ();
205- throw e instanceof IOException ? (IOException ) e : new IOException ("Failed to append timeseries samples" , e );
202+ db .begin ();
203+ try {
204+ mutableBucket .appendSamples (timestamps , columnValues );
205+ } catch (final Exception e ) {
206+ if (db .isTransactionActive ())
207+ db .rollback ();
208+ throw e instanceof IOException ? (IOException ) e : new IOException ("Failed to append timeseries samples" , e );
209+ }
210+ // On Raft HA leaders, release the read lock BEFORE commit. See appendSamples() javadoc for
211+ // why. In standalone (non-replicated) mode there is no waitForActiveRecordingSession()
212+ // call in commit(), so no deadlock is possible and the read lock must be held through
213+ // commit to protect Phase 4c from MVCC conflicts.
214+ if (db .isReplicated ()) {
215+ compactionLock .readLock ().unlock ();
216+ readLockHeld = false ;
217+ }
218+ try {
219+ db .commit ();
220+ return ; // success
221+ } catch (final ConcurrentModificationException e ) {
222+ // Phase 4c committed a page-0 clear between the read-lock release and our commit (HA only).
223+ // Roll back and retry on the freshly-cleared page.
224+ if (db .isTransactionActive ())
225+ db .rollback ();
226+ if (attempt == 1 )
227+ throw new IOException ("Failed to append timeseries samples after compaction-race retries" , e );
228+ final int attemptNumber = 4 - attempt ; // ascending 1..3 for human-readable logging
229+ LogManager .instance ().log (this , Level .FINE ,
230+ "CME on TimeSeries append for shard %d (attempt %d/3) - retrying after compaction Phase 4c race" ,
231+ shardIndex , attemptNumber );
232+ // else loop again
233+ } catch (final Exception e ) {
234+ if (db .isTransactionActive ())
235+ db .rollback ();
236+ throw e instanceof IOException ? (IOException ) e : new IOException ("Failed to append timeseries samples" , e );
237+ }
238+ } finally {
239+ if (readLockHeld )
240+ compactionLock .readLock ().unlock ();
206241 }
207- } finally {
208- appendLock .unlock ();
209242 }
210243 } finally {
211- compactionLock . readLock () .unlock ();
244+ appendLock .unlock ();
212245 }
213246 }
214247
@@ -378,43 +411,65 @@ private void compactInternal() throws IOException {
378411 final DatabaseInternal db = database .getWrappedDatabaseInstance ();
379412
380413 // ── Phase 0 (brief writeLock + brief TX): snapshot page count, set crash flag ─────────
381- // The write lock blocks concurrent appendSamples() so the TX modifying page-0 cannot
382- // get an MVCC conflict from a concurrent insert.
414+ // Holds the write lock to block new appendSamples() calls. However, an in-flight append
415+ // (one that released compactionLock.readLock() before its own commit, per the fix for
416+ // issue #4458) may still be executing its DB-level commit concurrently. If that commit
417+ // applies page-0 between Phase 0's begin and commit, Phase 0 gets a
418+ // ConcurrentModificationException. A brief retry inside the same write lock ensures
419+ // Phase 0 always sees the latest page-0 version and commits without conflict. Since
420+ // appendLock serializes appends one at a time, at most one in-flight commit can be
421+ // outstanding, so one retry is sufficient in practice; the loop uses 3 for safety.
383422 final int snapshotDataPageCount ;
384423 compactionLock .writeLock ().lock ();
385424 try {
386- db .begin ();
387- try {
388- final int pageCount = mutableBucket .getDataPageCount ();
389- if (pageCount == 0 ) {
390- db .rollback ();
391- return ;
392- }
393-
394- // HA safety valve (issue #4382): if the rewritten sealed store would be too large to ship
395- // inline in a single Raft entry, skip compaction entirely this cycle. The data stays in the
396- // mutable bucket, which IS fully replicated, so there is no divergence or data loss - just no
397- // sealing. The projected size over-estimates (raw mutable bytes are a ceiling on the
398- // compressed sealed delta), so we always stay safely under the transport cap.
399- if (db .isReplicated ()) {
400- final long cap = database .getConfiguration ().getValueAsLong (GlobalConfiguration .HA_TS_MAX_SEALED_INLINE_SIZE );
401- final long projected = sealedStore .getFileSizeBytes () + (long ) pageCount * mutableBucket .getPageSize ();
402- if (projected > cap ) {
425+ int capturedPageCount = -1 ;
426+ // Count down to mirror the retry convention used in appendSamples().
427+ for (int attempt = 3 ; attempt > 0 ; attempt --) {
428+ db .begin ();
429+ try {
430+ final int pageCount = mutableBucket .getDataPageCount ();
431+ if (pageCount == 0 ) {
403432 db .rollback ();
404- warnOversizedSealedSkip (projected , cap );
405433 return ;
406434 }
407- }
408435
409- snapshotDataPageCount = pageCount ;
410- mutableBucket .setCompactionInProgress (true );
411- mutableBucket .setCompactionWatermark (initialBlockCount );
412- db .commit ();
413- } catch (final Exception e ) {
414- if (db .isTransactionActive ())
415- db .rollback ();
416- throw e instanceof IOException ? (IOException ) e : new IOException ("Compaction failed in phase 0" , e );
436+ // HA safety valve (issue #4382): if the rewritten sealed store would be too large to ship
437+ // inline in a single Raft entry, skip compaction entirely this cycle.
438+ if (db .isReplicated ()) {
439+ final long cap = database .getConfiguration ().getValueAsLong (GlobalConfiguration .HA_TS_MAX_SEALED_INLINE_SIZE );
440+ final long projected = sealedStore .getFileSizeBytes () + (long ) pageCount * mutableBucket .getPageSize ();
441+ if (projected > cap ) {
442+ db .rollback ();
443+ warnOversizedSealedSkip (projected , cap );
444+ return ;
445+ }
446+ }
447+
448+ mutableBucket .setCompactionInProgress (true );
449+ mutableBucket .setCompactionWatermark (initialBlockCount );
450+ db .commit ();
451+ capturedPageCount = pageCount ;
452+ break ;
453+ } catch (final ConcurrentModificationException e ) {
454+ if (db .isTransactionActive ())
455+ db .rollback ();
456+ if (attempt == 1 )
457+ throw new IOException ("Compaction failed in phase 0 after retries" , e );
458+ // An in-flight append committed page-0 between begin and commit; retry with the
459+ // latest version.
460+ } catch (final Exception e ) {
461+ if (db .isTransactionActive ())
462+ db .rollback ();
463+ throw e instanceof IOException ? (IOException ) e : new IOException ("Compaction failed in phase 0" , e );
464+ }
417465 }
466+ // Every loop iteration that does not break either returns or throws, so a successful exit
467+ // always sets capturedPageCount. Enforce the invariant with an explicit guard (not an assert,
468+ // which is disabled by default in production) so a future regression cannot silently propagate
469+ // a -1 page count into Phase 4.
470+ if (capturedPageCount < 0 )
471+ throw new IllegalStateException ("Phase 0 exited the retry loop without a valid page count" );
472+ snapshotDataPageCount = capturedPageCount ;
418473 } finally {
419474 compactionLock .writeLock ().unlock ();
420475 }
@@ -529,6 +584,9 @@ else if (phase2Spill != null)
529584 // ── Phase 4c (brief writeLock + brief TX): read tail pages, swap + clear ──────────────
530585 // Only pages created DURING Phase 4b need to be processed under the lock. This is
531586 // typically just 0-2 pages worth of data, keeping the lock hold time minimal.
587+ final Runnable prePhase4cHook = TEST_PRE_PHASE4C_HOOK ;
588+ if (prePhase4cHook != null )
589+ prePhase4cHook .run ();
532590 compactionLock .writeLock ().lock ();
533591 try {
534592 db .begin ();
0 commit comments