-
Notifications
You must be signed in to change notification settings - Fork 658
Expand file tree
/
Copy pathMigrateScanFunctions.cs
More file actions
115 lines (90 loc) · 4.43 KB
/
MigrateScanFunctions.cs
File metadata and controls
115 lines (90 loc) · 4.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
using System;
using Garnet.common;
using Garnet.server;
using Tsavorite.core;
namespace Garnet.cluster
{
internal sealed unsafe partial class MigrateSession
{
#region mainStoreScan
/// <summary>
/// Scan callbacks for the main store (<see cref="SpanByte"/> keys and values).
/// Each visited record is checked against the owning <see cref="MigrateOperation"/>
/// and, when it qualifies, handed to that operation's sketch.
/// </summary>
internal sealed unsafe class MainStoreScan : IScanIteratorFunctions<SpanByte, SpanByte>
{
    // Migrate operation on whose behalf this scan runs
    readonly MigrateOperation operation;

    /// <summary>
    /// Creates the scan callbacks for the given migrate operation.
    /// </summary>
    /// <param name="mss">Operation consulted for slot/namespace membership and used to store keys.</param>
    internal MainStoreScan(MigrateOperation mss)
    {
        operation = mss;
    }

    /// <summary>Always allows the scan to start.</summary>
    public bool OnStart(long beginAddress, long endAddress)
    {
        return true;
    }

    /// <summary>Intentionally empty; nothing is done when the scan stops.</summary>
    public void OnStop(bool completed, long numberOfRecords)
    {
    }

    /// <summary>Intentionally empty; the exception is not handled here.</summary>
    public void OnException(Exception exception, long numberOfRecords)
    {
    }

    /// <summary>
    /// Delegates to <see cref="SingleReader"/>; both reader callbacks apply the same logic.
    /// </summary>
    public bool ConcurrentReader(ref SpanByte key, ref SpanByte value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
    {
        return SingleReader(ref key, ref value, recordMetadata, numberOfRecords, out cursorRecordResult);
    }

    /// <summary>
    /// Inspects one main-store record and stores its key in the sketch if it is part of the migration.
    /// </summary>
    /// <param name="key">Key of the visited record.</param>
    /// <param name="value">Value of the visited record; used for the expiry check.</param>
    /// <param name="recordMetadata">Record metadata; its RecordInfo flags a vector set record.</param>
    /// <param name="numberOfRecords">Unused by this implementation.</param>
    /// <param name="cursorRecordResult">Always set to <see cref="CursorRecordResult.Accept"/>; not otherwise used here.</param>
    /// <returns>
    /// <c>false</c> when the sketch could not store the key (out of space), which ends the scan for now;
    /// <c>true</c> in every other case.
    /// </returns>
    public unsafe bool SingleReader(ref SpanByte key, ref SpanByte value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
    {
        cursorRecordResult = CursorRecordResult.Accept; // default; not used here

        operation.ThrowIfCancelled();

        // Expired records are skipped rather than sent
        if (ClusterSession.Expired(ref value))
            return true;

        // TODO: Some other way to detect namespaces
        if (key.MetadataSize == 1)
        {
            var keyNamespace = key.GetNamespaceInPayload();

            // Namespaces that are not part of this operation are skipped
            if (!operation.ContainsNamespace(keyNamespace))
                return true;

            // A failed store ends the scan; a successful one lets it continue
            return operation.sketch.TryHashAndStore(keyNamespace, key.AsSpan());
        }

        var slot = HashSlotUtils.HashSlot(ref key);

        // Keys whose slot is not being migrated are skipped
        if (!operation.Contains(slot))
            return true;

        if (recordMetadata.RecordInfo.VectorSet)
        {
            // A vector set can be neither deleted _yet_ nor migrated at this point;
            // it is only remembered so it can be migrated once the associated namespaces have all moved over
            operation.EncounteredVectorSet(key.ToByteArray(), value.ToByteArray());
            return true;
        }

        // Out of space when this fails, so the scan ends for now
        return operation.sketch.TryHashAndStore(key.AsSpan());
    }
}
#endregion
#region objectStoreScan
/// <summary>
/// Scan callbacks for the object store (byte[] keys, <see cref="IGarnetObject"/> values).
/// Each visited record is checked against the owning <see cref="MigrateOperation"/>
/// and, when its slot is being migrated, its key is handed to that operation's sketch.
/// </summary>
internal sealed unsafe class ObjectStoreScan : IScanIteratorFunctions<byte[], IGarnetObject>
{
    // Migrate operation on whose behalf this scan runs
    readonly MigrateOperation operation;

    /// <summary>
    /// Creates the scan callbacks for the given migrate operation.
    /// </summary>
    /// <param name="mss">Operation consulted for slot membership and used to store keys.</param>
    internal ObjectStoreScan(MigrateOperation mss)
    {
        operation = mss;
    }

    /// <summary>Always allows the scan to start.</summary>
    public bool OnStart(long beginAddress, long endAddress)
    {
        return true;
    }

    /// <summary>Intentionally empty; nothing is done when the scan stops.</summary>
    public void OnStop(bool completed, long numberOfRecords)
    {
    }

    /// <summary>Intentionally empty; the exception is not handled here.</summary>
    public void OnException(Exception exception, long numberOfRecords)
    {
    }

    /// <summary>
    /// Delegates to <see cref="SingleReader"/>; both reader callbacks apply the same logic.
    /// </summary>
    public bool ConcurrentReader(ref byte[] key, ref IGarnetObject value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
    {
        return SingleReader(ref key, ref value, recordMetadata, numberOfRecords, out cursorRecordResult);
    }

    /// <summary>
    /// Inspects one object-store record and stores its key in the sketch if its slot is being migrated.
    /// </summary>
    /// <param name="key">Key of the visited record.</param>
    /// <param name="value">Object of the visited record; used for the expiry check.</param>
    /// <param name="recordMetadata">Unused by this implementation.</param>
    /// <param name="numberOfRecords">Unused by this implementation.</param>
    /// <param name="cursorRecordResult">Always set to <see cref="CursorRecordResult.Accept"/>; not otherwise used here.</param>
    /// <returns>
    /// <c>false</c> when the key belongs to a migrating slot but could not be added to the buffer;
    /// <c>true</c> in every other case.
    /// </returns>
    public unsafe bool SingleReader(ref byte[] key, ref IGarnetObject value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
    {
        cursorRecordResult = CursorRecordResult.Accept; // default; not used here

        operation.ThrowIfCancelled();

        // Expired records are skipped rather than sent
        if (ClusterSession.Expired(ref value))
            return true;

        var slot = HashSlotUtils.HashSlot(key);

        // Continue unless the key is in a migrating slot AND the store attempt fails.
        // Short-circuiting keeps the store attempt from running for slots that are not migrating.
        return !operation.Contains(slot) || operation.sketch.TryHashAndStore(key.AsSpan());
    }
}
#endregion
}
}