Skip to content

Commit d2d0e8e

Browse files
committed
IGNITE-28392 Add LogStorageManager for SegstoreLogStorage
1 parent d3044b5 commit d2d0e8e

File tree

9 files changed

+247
-100
lines changed

9 files changed

+247
-100
lines changed

modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package org.apache.ignite.internal.raft.storage.segstore;
1919

2020
import static java.lang.Math.toIntExact;
21+
import static java.util.concurrent.CompletableFuture.completedFuture;
22+
import static java.util.concurrent.CompletableFuture.failedFuture;
2123
import static org.apache.ignite.internal.raft.configuration.LogStorageConfigurationSchema.UNSPECIFIED_MAX_LOG_ENTRY_SIZE;
2224
import static org.apache.ignite.internal.raft.configuration.LogStorageConfigurationSchema.computeDefaultMaxLogEntrySizeBytes;
2325
import static org.apache.ignite.internal.raft.storage.segstore.SegmentInfo.MISSING_SEGMENT_FILE_OFFSET;
@@ -33,16 +35,18 @@
3335
import java.nio.file.NoSuchFileException;
3436
import java.nio.file.Path;
3537
import java.util.Iterator;
38+
import java.util.concurrent.CompletableFuture;
39+
import java.util.concurrent.CompletionException;
3640
import java.util.concurrent.atomic.AtomicReference;
3741
import java.util.stream.Stream;
38-
import org.apache.ignite.internal.close.ManuallyCloseable;
3942
import org.apache.ignite.internal.failure.FailureProcessor;
4043
import org.apache.ignite.internal.lang.IgniteInternalException;
4144
import org.apache.ignite.internal.logger.IgniteLogger;
4245
import org.apache.ignite.internal.logger.Loggers;
46+
import org.apache.ignite.internal.manager.ComponentContext;
47+
import org.apache.ignite.internal.manager.IgniteComponent;
4348
import org.apache.ignite.internal.raft.configuration.LogStorageConfiguration;
4449
import org.apache.ignite.internal.raft.configuration.LogStorageView;
45-
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
4650
import org.apache.ignite.internal.raft.storage.segstore.EntrySearchResult.SearchOutcome;
4751
import org.apache.ignite.internal.raft.storage.segstore.SegmentFile.WriteBuffer;
4852
import org.apache.ignite.raft.jraft.entity.LogEntry;
@@ -91,7 +95,7 @@
9195
* <p>When a rollover happens and the segment file being replaced has at least 8 bytes left, a special {@link #SWITCH_SEGMENT_RECORD} is
9296
* written at the end of the file. If there are less than 8 bytes left, no switch records are written.
9397
*/
94-
class SegmentFileManager implements ManuallyCloseable {
98+
class SegmentFileManager implements IgniteComponent {
9599
private static final IgniteLogger LOG = Loggers.forClass(SegmentFileManager.class);
96100

97101
private static final int ROLLOVER_WAIT_TIMEOUT_MS = 30_000;
@@ -159,12 +163,12 @@ class SegmentFileManager implements ManuallyCloseable {
159163
Path baseDir,
160164
int stripes,
161165
FailureProcessor failureProcessor,
162-
RaftConfiguration raftConfiguration,
166+
boolean isSync,
163167
LogStorageConfiguration storageConfiguration
164168
) throws IOException {
165169
this.segmentFilesDir = baseDir.resolve("segments");
166170
this.stripes = stripes;
167-
this.isSync = raftConfiguration.fsync().value();
171+
this.isSync = isSync;
168172

169173
Files.createDirectories(segmentFilesDir);
170174

@@ -194,6 +198,17 @@ class SegmentFileManager implements ManuallyCloseable {
194198
);
195199
}
196200

201+
@Override
202+
public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
203+
return CompletableFuture.runAsync(() -> {
204+
try {
205+
start();
206+
} catch (IOException e) {
207+
throw new CompletionException(e);
208+
}
209+
}, componentContext.executor());
210+
}
211+
197212
void start() throws IOException {
198213
LOG.info("Starting segment file manager [segmentFilesDir={}, fileSize={}].", segmentFilesDir, segmentFileSize);
199214

@@ -245,6 +260,17 @@ void start() throws IOException {
245260
checkpointer.start();
246261
}
247262

263+
@Override
264+
public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
265+
try {
266+
stop();
267+
} catch (Exception e) {
268+
return failedFuture(e);
269+
}
270+
271+
return completedFuture(null);
272+
}
273+
248274
Path segmentFilesDir() {
249275
return segmentFilesDir;
250276
}
@@ -477,6 +503,8 @@ long lastLogIndexExclusiveOnRecovery(long groupId) {
477503
private SegmentFileWithMemtable currentSegmentFile() {
478504
SegmentFileWithMemtable segmentFile = currentSegmentFile.get();
479505

506+
assert segmentFile != null : "Segment file manager is not started";
507+
480508
if (!segmentFile.readOnly()) {
481509
return segmentFile;
482510
}
@@ -530,8 +558,7 @@ private void initiateRollover(SegmentFileWithMemtable observedSegmentFile) throw
530558
}
531559
}
532560

533-
@Override
534-
public void close() throws Exception {
561+
public void stop() throws Exception {
535562
synchronized (rolloverLock) {
536563
if (isStopped) {
537564
return;
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.raft.storage.segstore;
19+
20+
import java.io.IOException;
21+
import java.nio.file.Path;
22+
import java.util.Set;
23+
import java.util.concurrent.CompletableFuture;
24+
import java.util.regex.Pattern;
25+
import org.apache.ignite.internal.failure.FailureProcessor;
26+
import org.apache.ignite.internal.manager.ComponentContext;
27+
import org.apache.ignite.internal.raft.configuration.LogStorageConfiguration;
28+
import org.apache.ignite.internal.raft.storage.LogStorageManager;
29+
import org.apache.ignite.internal.raft.storage.impl.LogStorageException;
30+
import org.apache.ignite.raft.jraft.option.RaftOptions;
31+
import org.apache.ignite.raft.jraft.storage.LogStorage;
32+
33+
/**
34+
* Log storage manager for {@link SegstoreLogStorage} instances.
35+
*/
36+
public class SegmentLogStorageManager implements LogStorageManager {
37+
private static final Pattern GROUP_ID_PATTERN = Pattern.compile("_part_");
38+
39+
private final SegmentFileManager fileManager;
40+
41+
public SegmentLogStorageManager(
42+
String nodeName,
43+
Path logStoragePath,
44+
int stripes,
45+
FailureProcessor failureProcessor,
46+
boolean fsync,
47+
LogStorageConfiguration storageConfiguration
48+
) throws IOException {
49+
this.fileManager = new SegmentFileManager(nodeName, logStoragePath, stripes, failureProcessor, fsync, storageConfiguration);
50+
}
51+
52+
@Override
53+
public LogStorage createLogStorage(String groupId, RaftOptions raftOptions) {
54+
return new SegstoreLogStorage(convertGroupId(groupId), fileManager);
55+
}
56+
57+
@Override
58+
public void destroyLogStorage(String groupId) {
59+
try {
60+
fileManager.reset(convertGroupId(groupId), 1);
61+
} catch (IOException e) {
62+
throw new LogStorageException("Failed to destroy log storage for group " + groupId, e);
63+
}
64+
}
65+
66+
@Override
67+
public Set<String> raftNodeStorageIdsOnDisk() {
68+
throw new UnsupportedOperationException("");
69+
}
70+
71+
@Override
72+
public long totalBytesOnDisk() {
73+
throw new UnsupportedOperationException("");
74+
}
75+
76+
@Override
77+
public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
78+
return fileManager.startAsync(componentContext);
79+
}
80+
81+
@Override
82+
public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
83+
return fileManager.stopAsync(componentContext);
84+
}
85+
86+
private static long convertGroupId(String groupId) {
87+
if ("metastorage_group".equals(groupId)) {
88+
return 1;
89+
}
90+
91+
if ("cmg_group".equals(groupId)) {
92+
return 2;
93+
}
94+
95+
String[] partS = GROUP_ID_PATTERN.split(groupId);
96+
97+
if (partS.length == 2) {
98+
return Long.parseLong(partS[0]) << 32 | Long.parseLong(partS[1]);
99+
} else {
100+
throw new IllegalArgumentException("Invalid groupId: " + groupId);
101+
}
102+
}
103+
}

modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollectorTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ void setUp() throws IOException {
9393
workDir,
9494
STRIPES,
9595
new NoOpFailureManager(),
96-
raftConfiguration,
96+
raftConfiguration.fsync().value(),
9797
storageConfiguration
9898
);
9999

@@ -107,7 +107,7 @@ void setUp() throws IOException {
107107
@AfterEach
108108
void tearDown() throws Exception {
109109
if (fileManager != null) {
110-
fileManager.close();
110+
fileManager.stop();
111111
}
112112
}
113113

@@ -408,14 +408,14 @@ void testCleanupLeftoverFilesOnRecovery() throws Exception {
408408

409409
Files.createFile(orphanedIndexFile);
410410

411-
fileManager.close();
411+
fileManager.stop();
412412

413413
fileManager = new SegmentFileManager(
414414
NODE_NAME,
415415
workDir,
416416
STRIPES,
417417
new NoOpFailureManager(),
418-
raftConfiguration,
418+
raftConfiguration.fsync().value(),
419419
storageConfiguration
420420
);
421421

@@ -733,14 +733,14 @@ private void triggerAndAwaitCheckpoint(long lastGroupIndex) throws IOException {
733733
}
734734

735735
private void restartSegmentFileManager() throws Exception {
736-
fileManager.close();
736+
fileManager.stop();
737737

738738
fileManager = new SegmentFileManager(
739739
NODE_NAME,
740740
workDir,
741741
STRIPES,
742742
new NoOpFailureManager(),
743-
raftConfiguration,
743+
raftConfiguration.fsync().value(),
744744
storageConfiguration
745745
);
746746

modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGcSoftLimitTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ void setUp() throws IOException {
9090
workDir,
9191
STRIPES,
9292
new NoOpFailureManager(),
93-
raftConfiguration,
93+
raftConfiguration.fsync().value(),
9494
storageConfiguration
9595
);
9696

@@ -102,7 +102,7 @@ void setUp() throws IOException {
102102
@AfterEach
103103
void tearDown() throws Exception {
104104
if (fileManager != null) {
105-
fileManager.close();
105+
fileManager.stop();
106106
}
107107
}
108108

modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerGetEntryTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import static java.util.stream.Collectors.toList;
2121
import static org.apache.ignite.internal.testframework.IgniteTestUtils.randomBytes;
2222
import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
23-
import static org.apache.ignite.internal.util.IgniteUtils.closeAllManually;
2423
import static org.apache.ignite.internal.util.IgniteUtils.newHashMap;
2524
import static org.hamcrest.MatcherAssert.assertThat;
2625
import static org.hamcrest.Matchers.either;
@@ -87,7 +86,7 @@ void setUp(
8786
workDir,
8887
STRIPES,
8988
new NoOpFailureManager(),
90-
raftConfiguration,
89+
raftConfiguration.fsync().value(),
9190
storageConfiguration
9291
);
9392

@@ -96,7 +95,7 @@ void setUp(
9695

9796
@AfterEach
9897
void tearDown() throws Exception {
99-
closeAllManually(fileManager);
98+
fileManager.stop();
10099
}
101100

102101
@Test

modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
3232
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
3333
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
34-
import static org.apache.ignite.internal.util.IgniteUtils.closeAllManually;
3534
import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
3635
import static org.awaitility.Awaitility.await;
3736
import static org.hamcrest.MatcherAssert.assertThat;
@@ -119,14 +118,14 @@ private SegmentFileManager createFileManager() throws IOException {
119118
workDir,
120119
STRIPES,
121120
failureManager,
122-
raftConfiguration,
121+
raftConfiguration.fsync().value(),
123122
storageConfiguration
124123
);
125124
}
126125

127126
@AfterEach
128127
void tearDown() throws Exception {
129-
closeAllManually(fileManager);
128+
fileManager.stop();
130129
}
131130

132131
@Test
@@ -281,7 +280,7 @@ void testConcurrentWrites() throws IOException {
281280
}
282281

283282
@RepeatedTest(10)
284-
void testConcurrentWritesWithClose(@InjectExecutorService(threadCount = 10) ExecutorService executor) throws Exception {
283+
void testConcurrentWritesWithStop(@InjectExecutorService(threadCount = 10) ExecutorService executor) throws Exception {
285284
int batchSize = FILE_SIZE / 10;
286285

287286
List<byte[]> batches = randomData(batchSize, 100);
@@ -296,7 +295,7 @@ void testConcurrentWritesWithClose(@InjectExecutorService(threadCount = 10) Exec
296295
if (i == batches.size() / 2) {
297296
stopTask = runAsync(() -> {
298297
try {
299-
fileManager.close();
298+
fileManager.stop();
300299
} catch (Exception e) {
301300
throw new CompletionException(e);
302301
}
@@ -454,7 +453,7 @@ void testRecovery() throws Exception {
454453

455454
List<Path> indexFiles = await().until(this::indexFiles, hasSize(segmentFiles.size() - 1));
456455

457-
fileManager.close();
456+
fileManager.stop();
458457

459458
// Delete an index file. We expect it to be re-created after recovery.
460459
Files.delete(indexFiles.get(0));
@@ -503,7 +502,7 @@ void testRecoveryWithTmpIndexFiles() throws Exception {
503502

504503
assertThat(tmpIndexFiles(), hasSize(1));
505504

506-
fileManager.close();
505+
fileManager.stop();
507506

508507
for (Path indexFile : indexFiles()) {
509508
Files.delete(indexFile);
@@ -542,7 +541,7 @@ void testTruncateSuffix(boolean restart) throws Exception {
542541
fileManager.truncateSuffix(GROUP_ID, lastLogIndexKept);
543542

544543
if (restart) {
545-
fileManager.close();
544+
fileManager.stop();
546545

547546
for (Path indexFile : indexFiles()) {
548547
Files.deleteIfExists(indexFile);
@@ -591,7 +590,7 @@ void testTruncatePrefix(boolean restart) throws Exception {
591590
}
592591

593592
if (restart) {
594-
fileManager.close();
593+
fileManager.stop();
595594

596595
for (Path indexFile : indexFiles()) {
597596
Files.deleteIfExists(indexFile);
@@ -640,7 +639,7 @@ void testReset(boolean restart) throws Exception {
640639
}
641640

642641
if (restart) {
643-
fileManager.close();
642+
fileManager.stop();
644643

645644
for (Path indexFile : indexFiles()) {
646645
Files.deleteIfExists(indexFile);

0 commit comments

Comments
 (0)