Skip to content

Commit c2c13ca

Browse files
committed
WIP
1 parent c5e29d7 commit c2c13ca

4 files changed

Lines changed: 30 additions & 23 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,6 @@ public DiscoveryMessageFactory(Marshaller cstDataMarshall, ClassLoader cstDataMa
262262
factory.register((short)518, MappingProposedMessage::new, new MappingProposedMessageSerializer());
263263
factory.register((short)519, MarshallerMappingItem::new, new MarshallerMappingItemSerializer());
264264

265-
266265
factory.register((short)520, SnapshotOperationResponse::new, new SnapshotOperationResponseSerializer());
267266
factory.register((short)521, SnapshotHandlerResult::new, new SnapshotHandlerResultSerializer());
268267
factory.register((short)522, DataStreamerUpdatesHandlerResult::new, new DataStreamerUpdatesHandlerResultSerializer());

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerify.java

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -234,29 +234,31 @@ else if (txRec.state() == TransactionState.ROLLED_BACK) {
234234
for (Map.Entry<GridCacheVersion, Set<Short>> tx: txPrimParticipatingNodes.entrySet())
235235
calcTxHash.accept(tx.getKey(), tx.getValue());
236236

237-
Map<Object, TransactionsHashRecord> txHashRes = new HashMap<>();
237+
Collection<TransactionsHashRecord> txHashRes = nodesTxHash.entrySet().stream().map(e -> {
238+
Short nodeId = e.getKey();
239+
HashHolder hashHolder = e.getValue();
238240

239-
nodesTxHash.forEach((nodeId, hashHolder) -> {
240-
TransactionsHashRecord rec = new TransactionsHashRecord(
241+
return new TransactionsHashRecord(
241242
sft.consistentId(),
242243
blt.compactIdMapping().get(nodeId),
243244
hashHolder.hash);
245+
}).collect(Collectors.toCollection(ArrayList::new));
244246

245-
txHashRes.put(rec.remoteConsistentId(), rec);
246-
});
247-
248-
Map<PartitionKey, PartitionHashRecord> partHashRes = new HashMap<>();
249-
250-
partMap.forEach((partKey, hashHolder) -> {
251-
partHashRes.put(partKey, new PartitionHashRecord(
252-
partKey,
253-
false,
254-
sft.consistentId(),
255-
null,
256-
0,
257-
null,
258-
new VerifyPartitionContext(hashHolder)));
259-
});
247+
Map<PartitionKey, PartitionHashRecord> partHashRes = partMap.entrySet().stream()
248+
.collect(Collectors.toMap(
249+
Map.Entry::getKey,
250+
e -> new PartitionHashRecord(
251+
e.getKey(),
252+
false,
253+
sft.consistentId(),
254+
null,
255+
0,
256+
null,
257+
new VerifyPartitionContext(e.getValue())
258+
),
259+
(a, b) -> a,
260+
HashMap::new
261+
));
260262

261263
if (log.isInfoEnabled()) {
262264
log.info("Verify incremental snapshot procedure finished " +

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerifyResult.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
/** */
3737
public class IncrementalSnapshotVerifyResult implements MarshallableMessage {
3838
/** Transaction hashes collection. */
39-
private Map<Object, TransactionsHashRecord> txHashRes;
39+
private Collection<TransactionsHashRecord> txHashRes;
4040

4141
/** */
4242
@Order(0)
@@ -67,7 +67,7 @@ public IncrementalSnapshotVerifyResult() {
6767

6868
/** */
6969
IncrementalSnapshotVerifyResult(
70-
Map<Object, TransactionsHashRecord> txHashRes,
70+
Collection<TransactionsHashRecord> txHashRes,
7171
Map<PartitionKey, PartitionHashRecord> partHashRes,
7272
Collection<GridCacheVersion> partiallyCommittedTxs,
7373
Collection<Exception> exceptions
@@ -84,7 +84,7 @@ public Map<PartitionKey, PartitionHashRecord> partHashRes() {
8484
}
8585

8686
/** */
87-
public Map<Object, TransactionsHashRecord> txHashRes() {
87+
public Collection<TransactionsHashRecord> txHashRes() {
8888
return txHashRes;
8989
}
9090

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,16 @@
2525
import java.util.Set;
2626
import java.util.concurrent.CompletableFuture;
2727
import java.util.concurrent.ExecutorService;
28+
import java.util.function.Function;
29+
import java.util.stream.Collectors;
2830
import org.apache.ignite.IgniteCheckedException;
2931
import org.apache.ignite.IgniteException;
3032
import org.apache.ignite.IgniteLogger;
3133
import org.apache.ignite.cluster.ClusterNode;
3234
import org.apache.ignite.internal.GridKernalContext;
3335
import org.apache.ignite.internal.management.cache.IdleVerifyResult;
3436
import org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
37+
import org.apache.ignite.internal.processors.cache.verify.TransactionsHashRecord;
3538
import org.apache.ignite.internal.util.typedef.F;
3639
import org.apache.ignite.plugin.extensions.communication.Message;
3740
import org.jetbrains.annotations.Nullable;
@@ -104,7 +107,10 @@ public IdleVerifyResult reduceIncrementalResults(
104107
if (log.isDebugEnabled())
105108
log.debug("Handle VerifyIncrementalSnapshotJob result [node=" + nodeRes.getKey() + ", taskRes=" + res + ']');
106109

107-
bldr.addIncrementalHashRecords(nodeRes.getKey(), res.txHashRes());
110+
Map<Object, TransactionsHashRecord> txHashRes = res.txHashRes().stream().collect(
111+
Collectors.toMap(TransactionsHashRecord::remoteConsistentId, Function.identity(), (a, b) -> a, HashMap::new));
112+
113+
bldr.addIncrementalHashRecords(nodeRes.getKey(), txHashRes);
108114
}
109115

110116
return bldr.build();

0 commit comments

Comments
 (0)