Skip to content

Commit be273eb

Browse files
committed
WIP
1 parent c2c13ca commit be273eb

3 files changed

Lines changed: 20 additions & 23 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerify.java

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -234,31 +234,25 @@ else if (txRec.state() == TransactionState.ROLLED_BACK) {
234234
for (Map.Entry<GridCacheVersion, Set<Short>> tx: txPrimParticipatingNodes.entrySet())
235235
calcTxHash.accept(tx.getKey(), tx.getValue());
236236

237-
Collection<TransactionsHashRecord> txHashRes = nodesTxHash.entrySet().stream().map(e -> {
238-
Short nodeId = e.getKey();
239-
HashHolder hashHolder = e.getValue();
240-
241-
return new TransactionsHashRecord(
237+
Collection<TransactionsHashRecord> txHashRes = nodesTxHash.entrySet().stream()
238+
.map(e -> new TransactionsHashRecord(
242239
sft.consistentId(),
243-
blt.compactIdMapping().get(nodeId),
244-
hashHolder.hash);
245-
}).collect(Collectors.toCollection(ArrayList::new));
240+
blt.compactIdMapping().get(e.getKey()),
241+
e.getValue().hash
242+
))
243+
.collect(Collectors.toCollection(ArrayList::new));
246244

247-
Map<PartitionKey, PartitionHashRecord> partHashRes = partMap.entrySet().stream()
248-
.collect(Collectors.toMap(
249-
Map.Entry::getKey,
250-
e -> new PartitionHashRecord(
245+
Collection<PartitionHashRecord> partHashRes = partMap.entrySet().stream()
246+
.map(e -> new PartitionHashRecord(
251247
e.getKey(),
252248
false,
253249
sft.consistentId(),
254250
null,
255251
0,
256252
null,
257253
new VerifyPartitionContext(e.getValue())
258-
),
259-
(a, b) -> a,
260-
HashMap::new
261-
));
254+
)
255+
).collect(Collectors.toCollection(ArrayList::new));
262256

263257
if (log.isInfoEnabled()) {
264258
log.info("Verify incremental snapshot procedure finished " +

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerifyResult.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,8 @@
1818
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
1919

2020
import java.util.Collection;
21-
import java.util.Map;
2221
import org.apache.ignite.IgniteCheckedException;
2322
import org.apache.ignite.internal.Order;
24-
import org.apache.ignite.internal.management.cache.PartitionKey;
2523
import org.apache.ignite.internal.managers.communication.ErrorMessage;
2624
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
2725
import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecord;
@@ -46,7 +44,7 @@ public class IncrementalSnapshotVerifyResult implements MarshallableMessage {
4644
* Partition hashes collection. Value is a hash of data entries {@link DataEntry} from WAL segments included
4745
* into the incremental snapshot.
4846
*/
49-
private Map<PartitionKey, PartitionHashRecord> partHashRes;
47+
private Collection<PartitionHashRecord> partHashRes;
5048

5149
/** */
5250
@Order(1)
@@ -68,7 +66,7 @@ public IncrementalSnapshotVerifyResult() {
6866
/** */
6967
IncrementalSnapshotVerifyResult(
7068
Collection<TransactionsHashRecord> txHashRes,
71-
Map<PartitionKey, PartitionHashRecord> partHashRes,
69+
Collection<PartitionHashRecord> partHashRes,
7270
Collection<GridCacheVersion> partiallyCommittedTxs,
7371
Collection<Exception> exceptions
7472
) {
@@ -79,7 +77,7 @@ public IncrementalSnapshotVerifyResult() {
7977
}
8078

8179
/** */
82-
public Map<PartitionKey, PartitionHashRecord> partHashRes() {
80+
public Collection<PartitionHashRecord> partHashRes() {
8381
return partHashRes;
8482
}
8583

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@
3333
import org.apache.ignite.cluster.ClusterNode;
3434
import org.apache.ignite.internal.GridKernalContext;
3535
import org.apache.ignite.internal.management.cache.IdleVerifyResult;
36+
import org.apache.ignite.internal.management.cache.PartitionKey;
3637
import org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
38+
import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecord;
3739
import org.apache.ignite.internal.processors.cache.verify.TransactionsHashRecord;
3840
import org.apache.ignite.internal.util.typedef.F;
3941
import org.apache.ignite.plugin.extensions.communication.Message;
@@ -102,13 +104,16 @@ public IdleVerifyResult reduceIncrementalResults(
102104
if (!F.isEmpty(res.partiallyCommittedTxs()))
103105
bldr.addPartiallyCommited(nodeRes.getKey(), res.partiallyCommittedTxs());
104106

105-
bldr.addPartitionHashes(res.partHashRes());
107+
Map<PartitionKey, PartitionHashRecord> partHashRes = res.partHashRes().stream()
108+
.collect(Collectors.toMap(PartitionHashRecord::partitionKey, Function.identity()));
109+
110+
bldr.addPartitionHashes(partHashRes);
106111

107112
if (log.isDebugEnabled())
108113
log.debug("Handle VerifyIncrementalSnapshotJob result [node=" + nodeRes.getKey() + ", taskRes=" + res + ']');
109114

110115
Map<Object, TransactionsHashRecord> txHashRes = res.txHashRes().stream().collect(
111-
Collectors.toMap(TransactionsHashRecord::remoteConsistentId, Function.identity(), (a, b) -> a, HashMap::new));
116+
Collectors.toMap(TransactionsHashRecord::remoteConsistentId, Function.identity()));
112117

113118
bldr.addIncrementalHashRecords(nodeRes.getKey(), txHashRes);
114119
}

0 commit comments

Comments
 (0)