Skip to content

Commit dc7935b

Browse files
committed
Merge branch 'cassandra-6.0' into trunk
2 parents 206485e + a0dc6f8 commit dc7935b

42 files changed

Lines changed: 2244 additions & 373 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.build/checkstyle.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,12 @@
6363
<property name="influenceFormat" value="0"/>
6464
</module>
6565

66+
<module name="SuppressWithNearbyCommentFilter">
67+
<property name="commentFormat" value="checkstyle: permit this invocation"/>
68+
<property name="idFormat" value="blockInstantNow"/>
69+
<property name="influenceFormat" value="0"/>
70+
</module>
71+
6672
<module name="RegexpSinglelineJava">
6773
<!-- block system time -->
6874
<property name="id" value="blockSystemClock"/>

modules/accord

Submodule accord updated 25 files

src/java/org/apache/cassandra/config/AccordConfig.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,15 @@ public enum QueueShardModel
6666
/**
6767
* Same number of threads as queue shards, but the shard lock is held only while managing the queue,
6868
* so that submitting threads may queue load/save work.
69+
*
70+
* This is incompatible with QueueSubmissionModel.SIGNAL
6971
*/
7072
THREAD_PER_SHARD,
7173

7274
/**
7375
* Same number of threads as shards, and the shard lock is held for the duration of serving requests.
76+
*
77+
* This is incompatible with QueueSubmissionModel.SIGNAL
7478
*/
7579
THREAD_PER_SHARD_SYNC_QUEUE,
7680

@@ -103,6 +107,14 @@ public enum QueueSubmissionModel
103107
*/
104108
ASYNC,
105109

110+
/**
111+
* Queue workers try to avoid competing for the lock, with the lock owner distributing work to any waiting threads
112+
* and signalling them without them taking the lock
113+
*
114+
* NOTE: EXPERIMENTAL
115+
*/
116+
SIGNAL,
117+
106118
/**
107119
* The queue is backed by submission to a single-threaded plain executor.
108120
* This implementation does not honor the sharding model option.
@@ -151,8 +163,30 @@ public enum QueuePriorityModel
151163
*/
152164
public volatile OptionaldPositiveInt queue_shard_count = OptionaldPositiveInt.UNDEFINED;
153165

166+
/**
167+
* The total number of threads to share between queue shards
168+
*/
169+
public volatile OptionaldPositiveInt queue_thread_count = OptionaldPositiveInt.UNDEFINED;
170+
154171
public QueuePriorityModel queue_priority_model = HLC_FIFO;
155172

173+
/**
174+
* If set, the signal loop does not match park/unpark pairs, but instead consumers perform timed-park spin waits
175+
*/
176+
public DurationSpec.LongMicrosecondsBound queue_spin_interval;
177+
178+
/**
179+
* If set, the signal loop reduces the number of threads it is using when the time spent parked exceeds real-time
180+
* by this interval.
181+
*/
182+
public DurationSpec.LongMicrosecondsBound queue_stop_check_interval;
183+
184+
/**
185+
* If set, the signal loop reduces the number of threads it is using when the time spent parked exceeds real-time
186+
* by this interval.
187+
*/
188+
public DurationSpec.LongMicrosecondsBound queue_signal_stop_check_interval_credit;
189+
156190
// yield to other executor threads after executing this many tasks in a row, if there are waiting threads and tasks
157191
public int queue_yield_interval = 100;
158192

@@ -456,4 +490,9 @@ public int userVersion()
456490
return version.version;
457491
}
458492
}
493+
494+
public int commandStoreShardCount()
495+
{
496+
return command_store_shard_count.or(DatabaseDescriptor::getAvailableProcessors);
497+
}
459498
}

src/java/org/apache/cassandra/config/Config.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,6 @@ public static Set<String> splitCommaDelimited(String src)
203203

204204
public int concurrent_reads = 32;
205205
public int concurrent_writes = 32;
206-
public int concurrent_accord_operations = 32;
207206
public int concurrent_counter_writes = 32;
208207
public int concurrent_materialized_view_writes = 32;
209208
public OptionaldPositiveInt available_processors = new OptionaldPositiveInt(CASSANDRA_AVAILABLE_PROCESSORS.getInt(OptionaldPositiveInt.UNDEFINED_VALUE));

src/java/org/apache/cassandra/config/DatabaseDescriptor.java

Lines changed: 3 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -700,9 +700,6 @@ else if (DiskAccessMode.direct == conf.compaction_read_disk_access_mode)
700700
if (conf.concurrent_counter_writes < 2)
701701
throw new ConfigurationException("concurrent_counter_writes must be at least 2, but was " + conf.concurrent_counter_writes, false);
702702

703-
if (conf.concurrent_accord_operations < 1)
704-
throw new ConfigurationException("concurrent_accord_operations must be at least 1, but was " + conf.concurrent_accord_operations, false);
705-
706703
if (conf.networking_cache_size == null)
707704
conf.networking_cache_size = new DataStorageSpec.IntMebibytesBound(Math.min(128, (int) (Runtime.getRuntime().maxMemory() / (16 * 1048576))));
708705

@@ -2900,7 +2897,7 @@ public static void setConcurrentViewWriters(int concurrent_materialized_view_wri
29002897

29012898
public static int getAccordConcurrentOps()
29022899
{
2903-
return conf.concurrent_accord_operations;
2900+
return conf.accord.queue_thread_count.or(2 * FBUtilities.getAvailableProcessors());
29042901
}
29052902

29062903
public static void setConcurrentAccordOps(int concurrent_operations)
@@ -2909,7 +2906,7 @@ public static void setConcurrentAccordOps(int concurrent_operations)
29092906
{
29102907
throw new IllegalArgumentException("Concurrent accord operations must be non-negative");
29112908
}
2912-
conf.concurrent_accord_operations = concurrent_operations;
2909+
conf.accord.queue_thread_count = new OptionaldPositiveInt(concurrent_operations);
29132910
}
29142911

29152912
public static int getFlushWriters()
@@ -5635,6 +5632,7 @@ public static AccordConfig getAccord()
56355632
return conf.accord;
56365633
}
56375634

5635+
// TODO (expected): move all getAccordX into AccordConfig
56385636
public static AccordConfig.TransactionalRangeMigration getTransactionalRangeMigration()
56395637
{
56405638
return conf.accord.range_migration;
@@ -5665,44 +5663,6 @@ public static void setAccordTransactionsEnabled(boolean b)
56655663
conf.accord.enabled = b;
56665664
}
56675665

5668-
public static AccordConfig.QueueShardModel getAccordQueueShardModel()
5669-
{
5670-
return conf.accord.queue_shard_model;
5671-
}
5672-
5673-
public static AccordConfig.QueueSubmissionModel getAccordQueueSubmissionModel()
5674-
{
5675-
return conf.accord.queue_submission_model;
5676-
}
5677-
5678-
public static int getAccordQueueShardCount()
5679-
{
5680-
switch (getAccordQueueShardModel())
5681-
{
5682-
default: throw new AssertionError("Unhandled queue_shard_model: " + conf.accord.queue_shard_model);
5683-
case THREAD_PER_SHARD:
5684-
case THREAD_PER_SHARD_SYNC_QUEUE:
5685-
return conf.accord.queue_shard_count.or(DatabaseDescriptor::getAvailableProcessors);
5686-
case THREAD_POOL_PER_SHARD:
5687-
return conf.accord.queue_shard_count.or(DatabaseDescriptor.getAvailableProcessors()/4);
5688-
}
5689-
}
5690-
5691-
public static int getAccordCommandStoreShardCount()
5692-
{
5693-
return conf.accord.command_store_shard_count.or(DatabaseDescriptor::getAvailableProcessors);
5694-
}
5695-
5696-
public static int getAccordMaxQueuedLoadCount()
5697-
{
5698-
return conf.accord.max_queued_loads.or(getAccordConcurrentOps());
5699-
}
5700-
5701-
public static int getAccordMaxQueuedRangeLoadCount()
5702-
{
5703-
return conf.accord.max_queued_range_loads.or(Math.max(4, getAccordConcurrentOps() / 4));
5704-
}
5705-
57065666
public static DefaultProgressLog.Config getAccordProgressLogConfig()
57075667
{
57085668
return accordProgressLogConfig;

src/java/org/apache/cassandra/service/accord/AccordCommandStore.java

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -245,26 +245,6 @@ public AccordCommandStore(int id,
245245
if (this.progressLog instanceof DefaultProgressLog)
246246
((DefaultProgressLog)this.progressLog).unsafeSetConfig(DatabaseDescriptor.getAccordProgressLogConfig());
247247

248-
final AccordCache.Type<TxnId, Command, AccordSafeCommand>.Instance commands;
249-
final AccordCache.Type<RoutingKey, CommandsForKey, AccordSafeCommandsForKey>.Instance commandsForKey;
250-
try (AccordExecutor.ExclusiveGlobalCaches exclusive = sharedExecutor.lockCaches())
251-
{
252-
commands = exclusive.commands.newInstance(this);
253-
commandsForKey = exclusive.commandsForKey.newInstance(this);
254-
this.caches = new ExclusiveCaches(sharedExecutor.unsafeLock(), exclusive.global, commands, commandsForKey);
255-
}
256-
257-
this.exclusiveExecutor = sharedExecutor.executor(id);
258-
{
259-
AccordConfig.RangeIndexMode mode = getAccord().range_index_mode;
260-
switch (mode)
261-
{
262-
default: throw new UnhandledEnum(mode);
263-
case journal_sai: rangeIndex = new JournalRangeIndex(this); break;
264-
case in_memory: rangeIndex = new InMemoryRangeIndex(this); break;
265-
}
266-
}
267-
268248
maybeLoadRedundantBefore(journal.loadRedundantBefore(id()));
269249
maybeLoadBootstrapBeganAt(journal.loadBootstrapBeganAt(id()));
270250
maybeLoadSafeToRead(journal.loadSafeToRead(id()));
@@ -284,6 +264,26 @@ public AccordCommandStore(int id,
284264
return a;
285265
}).orElseThrow(() -> Invariants.illegalState("CommandStore %d created with no ranges", id));
286266

267+
final AccordCache.Type<TxnId, Command, AccordSafeCommand>.Instance commands;
268+
final AccordCache.Type<RoutingKey, CommandsForKey, AccordSafeCommandsForKey>.Instance commandsForKey;
269+
try (AccordExecutor.ExclusiveGlobalCaches exclusive = sharedExecutor.lockCaches())
270+
{
271+
commands = exclusive.commands.newInstance(this);
272+
commandsForKey = exclusive.commandsForKey.newInstance(this);
273+
this.caches = new ExclusiveCaches(sharedExecutor.unsafeLock(), exclusive.global, commands, commandsForKey);
274+
}
275+
this.exclusiveExecutor = sharedExecutor.executor(id);
276+
277+
{
278+
AccordConfig.RangeIndexMode mode = getAccord().range_index_mode;
279+
switch (mode)
280+
{
281+
default: throw new UnhandledEnum(mode);
282+
case journal_sai: rangeIndex = new JournalRangeIndex(this); break;
283+
case in_memory: rangeIndex = new InMemoryRangeIndex(this); break;
284+
}
285+
}
286+
287287
if (AccordService.isStarted())
288288
progressLog.unsafeStart();
289289
}

src/java/org/apache/cassandra/service/accord/AccordCommandStores.java

Lines changed: 50 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,17 @@
4848
import org.apache.cassandra.cache.CacheSize;
4949
import org.apache.cassandra.concurrent.ScheduledExecutors;
5050
import org.apache.cassandra.concurrent.Shutdownable;
51+
import org.apache.cassandra.config.AccordConfig;
5152
import org.apache.cassandra.config.AccordConfig.QueueShardModel;
5253
import org.apache.cassandra.config.DatabaseDescriptor;
5354
import org.apache.cassandra.journal.Descriptor;
5455
import org.apache.cassandra.schema.TableId;
5556
import org.apache.cassandra.service.accord.AccordCommandStore.DurablyAppliedTo;
5657
import org.apache.cassandra.service.accord.AccordExecutor.AccordExecutorFactory;
58+
import org.apache.cassandra.utils.FBUtilities;
5759

5860
import static org.apache.cassandra.config.AccordConfig.QueueShardModel.THREAD_PER_SHARD;
59-
import static org.apache.cassandra.config.DatabaseDescriptor.getAccordQueueShardCount;
60-
import static org.apache.cassandra.config.DatabaseDescriptor.getAccordQueueSubmissionModel;
61+
import static org.apache.cassandra.config.DatabaseDescriptor.getAccord;
6162
import static org.apache.cassandra.service.accord.AccordExecutor.Mode.RUN_WITHOUT_LOCK;
6263
import static org.apache.cassandra.service.accord.AccordExecutor.Mode.RUN_WITH_LOCK;
6364
import static org.apache.cassandra.service.accord.AccordExecutor.constant;
@@ -83,10 +84,12 @@ public class AccordCommandStores extends CommandStores implements CacheSize, Shu
8384
AccordCommandStore.factory(id -> executors[id % executors.length]));
8485
this.executors = executors;
8586
this.mask = Integer.highestOneBit(executors.length) - 1;
87+
8688
cacheSize = DatabaseDescriptor.getAccordCacheSizeInMiB() << 20;
8789
workingSetSize = DatabaseDescriptor.getAccordWorkingSetSizeInMiB() << 20;
88-
maxQueuedLoads = DatabaseDescriptor.getAccordMaxQueuedLoadCount();
89-
maxQueuedRangeLoads = DatabaseDescriptor.getAccordMaxQueuedRangeLoadCount();
90+
AccordConfig config = DatabaseDescriptor.getAccord();
91+
maxQueuedLoads = maxQueuedLoads(config);
92+
maxQueuedRangeLoads = maxQueuedRangeLoads(config);
9093
shrinkingOn = DatabaseDescriptor.getAccordCacheShrinkingOn();
9194
refreshCapacities();
9295
ScheduledExecutors.scheduledFastTasks.scheduleWithFixedDelay(() -> {
@@ -102,15 +105,21 @@ public class AccordCommandStores extends CommandStores implements CacheSize, Shu
102105
static Factory factory()
103106
{
104107
return (NodeCommandStoreService time, Agent agent, DataStore store, RandomSource random, Journal journal, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory) -> {
105-
AccordExecutor[] executors = new AccordExecutor[getAccordQueueShardCount()];
108+
AccordConfig config = getAccord();
109+
AccordExecutor[] executors = new AccordExecutor[executorShards(config)];
106110
AccordExecutorFactory factory;
107111
int maxThreads = Integer.MAX_VALUE;
108-
switch (getAccordQueueSubmissionModel())
112+
switch (config.queue_submission_model)
109113
{
110-
default: throw new AssertionError("Unhandled QueueSubmissionModel: " + getAccordQueueSubmissionModel());
114+
default: throw new AssertionError("Unhandled QueueSubmissionModel: " + config.queue_submission_model);
111115
case SYNC: factory = AccordExecutorSyncSubmit::new; break;
112116
case SEMI_SYNC: factory = AccordExecutorSemiSyncSubmit::new; break;
113117
case ASYNC: factory = AccordExecutorAsyncSubmit::new; break;
118+
case SIGNAL:
119+
long spinIntervalNanos = config.queue_spin_interval == null ? 0 : config.queue_spin_interval.to(TimeUnit.NANOSECONDS);
120+
long stopCheckIntervalNanos = config.queue_stop_check_interval == null ? 0 : config.queue_stop_check_interval.to(TimeUnit.NANOSECONDS);
121+
factory = (executorId, mode, threads, name, agent0) -> new AccordExecutorSignalLoop(executorId, mode, threads, spinIntervalNanos, stopCheckIntervalNanos, TimeUnit.NANOSECONDS, name, agent0);
122+
break;
114123
case EXEC_ST:
115124
factory = AccordExecutorSimple::new;
116125
maxThreads = 1;
@@ -119,16 +128,16 @@ static Factory factory()
119128

120129
for (int id = 0; id < executors.length; id++)
121130
{
122-
QueueShardModel shardModel = DatabaseDescriptor.getAccordQueueShardModel();
131+
QueueShardModel shardModel = config.queue_shard_model;
123132
String baseName = AccordExecutor.class.getSimpleName() + '[' + id;
124-
int threads = Math.min(maxThreads, Math.max(DatabaseDescriptor.getAccordConcurrentOps() / getAccordQueueShardCount(), 1));
125133
switch (shardModel)
126134
{
127135
case THREAD_PER_SHARD:
128136
case THREAD_PER_SHARD_SYNC_QUEUE:
129137
executors[id] = factory.get(id, shardModel == THREAD_PER_SHARD ? RUN_WITHOUT_LOCK : RUN_WITH_LOCK, 1, constant(baseName + ']'), agent);
130138
break;
131139
case THREAD_POOL_PER_SHARD:
140+
int threads = Math.min(maxThreads, Math.max(DatabaseDescriptor.getAccordConcurrentOps() / executors.length, 1));
132141
executors[id] = factory.get(id, RUN_WITHOUT_LOCK, threads, num -> baseName + ',' + num + ']', agent);
133142
break;
134143
}
@@ -204,8 +213,8 @@ synchronized void refreshCapacities()
204213
{
205214
long capacityPerExecutor = cacheSize / executors.length;
206215
long workingSetPerExecutor = workingSetSize < 0 ? Long.MAX_VALUE : workingSetSize / executors.length;
207-
int maxLoadsPerExecutor = (maxQueuedLoads + executors.length - 1) / executors.length;
208-
int maxRangeLoadsPerExecutor = (maxQueuedRangeLoads + executors.length - 1) / executors.length;
216+
int maxLoadsPerExecutor = Math.max(1, (maxQueuedLoads + executors.length - 1) / executors.length);
217+
int maxRangeLoadsPerExecutor = Math.max(1, (maxQueuedRangeLoads + executors.length - 1) / executors.length);
209218
for (AccordExecutor executor : executors)
210219
{
211220
executor.executeDirectlyWithLock(() -> {
@@ -319,4 +328,34 @@ public AsyncChain<List<Map.Entry<Integer, Long>>> restoreState()
319328
}
320329
return AsyncChains.allOf(chains);
321330
}
331+
332+
private static int executorShards(AccordConfig config)
333+
{
334+
switch (config.queue_shard_model)
335+
{
336+
default: throw new AssertionError("Unhandled queue_shard_model: " + config.queue_shard_model);
337+
case THREAD_PER_SHARD:
338+
case THREAD_PER_SHARD_SYNC_QUEUE:
339+
return config.queue_shard_count.or(DatabaseDescriptor::getAvailableProcessors);
340+
case THREAD_POOL_PER_SHARD:
341+
return Math.max(1, config.queue_shard_count.or(DatabaseDescriptor.getAvailableProcessors() / 8));
342+
}
343+
}
344+
345+
private static int threads(AccordConfig config)
346+
{
347+
return config.queue_thread_count.or(2 * FBUtilities.getAvailableProcessors());
348+
}
349+
350+
public static int maxQueuedLoads(AccordConfig config)
351+
{
352+
return config.max_queued_loads.or(FBUtilities.getAvailableProcessors());
353+
}
354+
355+
public static int maxQueuedRangeLoads(AccordConfig config)
356+
{
357+
return config.max_queued_range_loads.or(maxQueuedLoads(config) / 4);
358+
}
359+
360+
322361
}

0 commit comments

Comments
 (0)