Skip to content

Commit 1dff2bf

Browse files
committed
fix: timeseries concurrency
Fixed issue ArcadeData#3861
1 parent 9ac49ff commit 1dff2bf

3 files changed

Lines changed: 179 additions & 55 deletions

File tree

engine/src/main/java/com/arcadedb/engine/timeseries/TimeSeriesShard.java

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,14 @@ public class TimeSeriesShard implements AutoCloseable {
6161
// Both writeTempCompactionFile() and truncateToBlockCount() use the same .ts.sealed.tmp path;
6262
// concurrent execution would corrupt each other's temp files.
6363
private final Lock compactionMutex = new ReentrantLock();
64+
// Serializes concurrent appendSamples() calls on this shard.
65+
// Multiple threads routing to the same shard via round-robin would otherwise produce
66+
// MVCC conflicts (ConcurrentModificationException) on page 0, because each call
67+
// starts its own nested transaction and two transactions cannot both modify the same
68+
// page version at commit time. With this lock, only one append runs the begin→commit
69+
// cycle at a time per shard, so page versions are always consistent at commit.
70+
// Writes to *different* shards are still fully concurrent.
71+
private final Lock appendLock = new ReentrantLock();
6472

6573
public TimeSeriesShard(final DatabaseInternal database, final String baseName, final int shardIndex,
6674
final List<ColumnDefinition> columns) throws IOException {
@@ -156,25 +164,30 @@ public TimeSeriesShard(final DatabaseInternal database, final String baseName, f
156164
* transaction, ArcadeDB creates a nested transaction (a new {@code TransactionContext} pushed
157165
* onto the per-thread stack). The nested transaction commits independently; the caller's outer
158166
* transaction remains unaffected because it holds none of the modified pages in its dirty set.
167+
* <p>
168+
* Concurrent calls on the <em>same shard</em> are serialized by {@link #appendLock} so that
169+
* MVCC page-version conflicts can never arise between two concurrent appends. Writes to
170+
* different shards still proceed in parallel.
159171
*/
160172
public void appendSamples(final long[] timestamps, final Object[]... columnValues) throws IOException {
161173
compactionLock.readLock().lock();
162174
try {
163-
database.begin();
175+
// Serialize concurrent appends on this shard to prevent MVCC conflicts.
176+
// Two concurrent nested transactions both modifying page 0 would otherwise produce a
177+
// ConcurrentModificationException at commit time (current v.X <> database v.Y).
178+
appendLock.lock();
164179
try {
165-
mutableBucket.appendSamples(timestamps, columnValues);
166-
database.commit();
167-
} catch (final ConcurrentModificationException cme) {
168-
// Roll back the nested TX only. Do NOT touch the caller's outer transaction:
169-
// the Javadoc promises it remains unaffected, and the caller must decide whether
170-
// to rollback/retry their own transaction.
171-
if (database.isTransactionActive())
172-
database.rollback();
173-
throw cme; // propagate as-is so callers can catch and retry
174-
} catch (final Exception e) {
175-
if (database.isTransactionActive())
176-
database.rollback();
177-
throw e instanceof IOException ? (IOException) e : new IOException("Failed to append timeseries samples", e);
180+
database.begin();
181+
try {
182+
mutableBucket.appendSamples(timestamps, columnValues);
183+
database.commit();
184+
} catch (final Exception e) {
185+
if (database.isTransactionActive())
186+
database.rollback();
187+
throw e instanceof IOException ? (IOException) e : new IOException("Failed to append timeseries samples", e);
188+
}
189+
} finally {
190+
appendLock.unlock();
178191
}
179192
} finally {
180193
compactionLock.readLock().unlock();
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright © 2021-present Arcade Data Ltd (info@arcadedata.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* SPDX-FileCopyrightText: 2021-present Arcade Data Ltd (info@arcadedata.com)
17+
* SPDX-License-Identifier: Apache-2.0
18+
*/
19+
package com.arcadedb.engine.timeseries;
20+
21+
import com.arcadedb.TestHelper;
22+
import com.arcadedb.database.DatabaseInternal;
23+
import com.arcadedb.schema.Type;
24+
import org.junit.jupiter.api.Test;
25+
26+
import java.util.List;
27+
import java.util.concurrent.CountDownLatch;
28+
import java.util.concurrent.atomic.AtomicReference;
29+
30+
import static org.assertj.core.api.Assertions.assertThat;
31+
32+
/**
33+
* Regression test for https://github.com/ArcadeData/arcadedb/discussions/3860 .
34+
* <p>
35+
* Verifies that concurrent writes to the same shard do NOT produce MVCC
36+
* {@code ConcurrentModificationException} errors and do NOT cause data duplication
37+
* (which would occur if the HTTP client retried after receiving a 503 for a partially
38+
* committed batch).
39+
* <p>
40+
* Prior to the fix, the per-shard {@code appendLock} was absent and two threads racing
41+
* to commit page 0 of the same shard would get a version mismatch at commit time:
42+
* "current v.X &lt;&gt; database v.Y. Please retry the operation".
43+
*
44+
* @author Luca Garulli (l.garulli@arcadedata.com)
45+
*/
46+
class TimeSeriesConcurrentWriteNoDuplicatesTest extends TestHelper {
47+
48+
/**
49+
* Launches multiple threads that all write to a single-shard engine simultaneously.
50+
* With one shard every append competes for the same mutable-bucket page.
51+
* The test asserts that:
52+
* <ul>
53+
* <li>no exception is thrown by any writer thread;</li>
54+
* <li>the total sample count equals exactly threadCount × samplesPerThread
55+
* (i.e. no duplicates and no missing writes).</li>
56+
* </ul>
57+
*/
58+
@Test
59+
void concurrentWritesToSameShardNoDuplicatesAndNoMvccErrors() throws Exception {
60+
final int THREAD_COUNT = 8;
61+
final int SAMPLES_PER_THREAD = 200;
62+
63+
final List<ColumnDefinition> columns = List.of(
64+
new ColumnDefinition("ts", Type.LONG, ColumnDefinition.ColumnRole.TIMESTAMP),
65+
new ColumnDefinition("value", Type.DOUBLE, ColumnDefinition.ColumnRole.FIELD)
66+
);
67+
68+
// Use shardCount=1 to force maximum contention: every append goes to the same shard.
69+
final TimeSeriesEngine engine = new TimeSeriesEngine((DatabaseInternal) database, "test_concurrent_no_dup", columns, 1);
70+
71+
final CountDownLatch startLatch = new CountDownLatch(1);
72+
final CountDownLatch doneLatch = new CountDownLatch(THREAD_COUNT);
73+
final AtomicReference<Throwable> failure = new AtomicReference<>();
74+
75+
for (int t = 0; t < THREAD_COUNT; t++) {
76+
final int threadIdx = t;
77+
final Thread thread = new Thread(() -> {
78+
try {
79+
startLatch.await();
80+
for (int i = 0; i < SAMPLES_PER_THREAD; i++) {
81+
// Assign unique timestamps per thread so there are no intentional duplicates.
82+
final long ts = (long) threadIdx * SAMPLES_PER_THREAD + i;
83+
engine.appendSamples(new long[] { ts }, new Object[][] { { (double) ts } });
84+
}
85+
} catch (final Throwable e) {
86+
failure.compareAndSet(null, e);
87+
} finally {
88+
doneLatch.countDown();
89+
}
90+
}, "writer-" + t);
91+
thread.setDaemon(true);
92+
thread.start();
93+
}
94+
95+
startLatch.countDown();
96+
assertThat(doneLatch.await(30, java.util.concurrent.TimeUnit.SECONDS))
97+
.as("All writer threads should finish within 30 seconds")
98+
.isTrue();
99+
100+
assertThat(failure.get())
101+
.as("No writer thread should throw an exception (MVCC or otherwise)")
102+
.isNull();
103+
104+
// Total samples must equal exactly threadCount × samplesPerThread.
105+
// Any MVCC exception that slipped through without the appendLock would have caused
106+
// the client (HTTP handler) to return 503; the client would then retry the whole
107+
// batch — duplicating every sample that was already committed before the failure.
108+
final long totalSamples = engine.countSamples();
109+
assertThat(totalSamples)
110+
.as("Sample count must be exactly %d (no duplicates, no lost writes)", THREAD_COUNT * SAMPLES_PER_THREAD)
111+
.isEqualTo((long) THREAD_COUNT * SAMPLES_PER_THREAD);
112+
113+
engine.close();
114+
}
115+
}

server/src/main/java/com/arcadedb/server/http/handler/PostTimeSeriesWriteHandler.java

Lines changed: 37 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -92,49 +92,45 @@ protected ExecutionResponse execute(final HttpServerExchange exchange, final Ser
9292
if (samples.isEmpty())
9393
return new ExecutionResponse(204, "");
9494

95-
// Group by measurement and insert
95+
// Group by measurement and insert.
96+
// Each engine.appendSamples() call manages its own internal transaction, so samples
97+
// from this batch are committed individually. There is no outer transaction wrapping
98+
// the loop: the nested TXs commit independently and cannot be rolled back as a unit.
9699
int inserted = 0;
97-
database.begin();
98-
try {
99-
for (final Sample sample : samples) {
100-
final String measurement = sample.getMeasurement();
101-
102-
if (!database.getSchema().existsType(measurement))
103-
continue; // skip unknown measurement types
104-
105-
final DocumentType docType = database.getSchema().getType(measurement);
106-
if (!(docType instanceof LocalTimeSeriesType tsType) || tsType.getEngine() == null)
107-
continue; // skip non-timeseries types
108-
109-
final TimeSeriesEngine engine = tsType.getEngine();
110-
final List<ColumnDefinition> columns = tsType.getTsColumns();
111-
112-
final long[] timestamps = new long[] { sample.getTimestampMs() };
113-
final Object[][] columnValues = new Object[columns.size() - 1][1]; // exclude timestamp
114-
115-
int colIdx = 0;
116-
for (int i = 0; i < columns.size(); i++) {
117-
final ColumnDefinition col = columns.get(i);
118-
if (col.getRole() == ColumnDefinition.ColumnRole.TIMESTAMP)
119-
continue;
120-
121-
Object value;
122-
if (col.getRole() == ColumnDefinition.ColumnRole.TAG)
123-
value = sample.getTags().get(col.getName());
124-
else
125-
value = sample.getFields().get(col.getName());
126-
127-
columnValues[colIdx][0] = value;
128-
colIdx++;
129-
}
130-
131-
engine.appendSamples(timestamps, columnValues);
132-
inserted++;
100+
for (final Sample sample : samples) {
101+
final String measurement = sample.getMeasurement();
102+
103+
if (!database.getSchema().existsType(measurement))
104+
continue; // skip unknown measurement types
105+
106+
final DocumentType docType = database.getSchema().getType(measurement);
107+
if (!(docType instanceof LocalTimeSeriesType tsType) || tsType.getEngine() == null)
108+
continue; // skip non-timeseries types
109+
110+
final TimeSeriesEngine engine = tsType.getEngine();
111+
final List<ColumnDefinition> columns = tsType.getTsColumns();
112+
113+
final long[] timestamps = new long[] { sample.getTimestampMs() };
114+
final Object[][] columnValues = new Object[columns.size() - 1][1]; // exclude timestamp
115+
116+
int colIdx = 0;
117+
for (int i = 0; i < columns.size(); i++) {
118+
final ColumnDefinition col = columns.get(i);
119+
if (col.getRole() == ColumnDefinition.ColumnRole.TIMESTAMP)
120+
continue;
121+
122+
Object value;
123+
if (col.getRole() == ColumnDefinition.ColumnRole.TAG)
124+
value = sample.getTags().get(col.getName());
125+
else
126+
value = sample.getFields().get(col.getName());
127+
128+
columnValues[colIdx][0] = value;
129+
colIdx++;
133130
}
134-
database.commit();
135-
} catch (final Exception e) {
136-
database.rollback();
137-
throw e;
131+
132+
engine.appendSamples(timestamps, columnValues);
133+
inserted++;
138134
}
139135

140136
// Return 204 No Content (InfluxDB convention) or 200 with count

0 commit comments

Comments
 (0)