Skip to content

Commit 63fd9ec

Browse files
committed
Add vector clock, oplog hashing & chain sync
Introduce vector clock support and oplog hashing to improve sync integrity and gap recovery. Added HLCTimestamp-based VectorClock class and computeOplogHash utility; extended core exports. Updated storage interface and SqlitePeerStore to persist hash/previous_hash, compute entry hashes on write, expose vector clock, per-node oplog queries, last-entry hash and chain-range retrieval, and map DB rows to OplogEntry. Extended protocol and protobuf (sync.proto + generated) with VectorClock messages, hash/previous_hash on ProtoOplogEntry, GetChainRange/GetVectorClock messages and message types. Updated ProtocolMapper and domain OplogEntry to carry hash fields. Enhanced TcpSyncClient with getVectorClock, pushChanges and getChainRange helpers. Revamped SyncOrchestrator to exchange vector clocks, compute nodes to pull/push, process inbound batches with chain verification and gap recovery, and apply changes atomically. These changes enable per-node incremental sync, integrity checks and missing-range recovery.
1 parent 919f05d commit 63fd9ec

11 files changed

Lines changed: 945 additions & 62 deletions

File tree

packages/core/src/crypto/hash.ts

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
import { createHash } from 'crypto';
2+
import { OplogEntry, HLCTimestamp } from '@entgldb/protocol';
3+
import { HLClock } from '../hlc/clock';
4+
5+
export function computeOplogHash(entry: OplogEntry): string {
6+
const sha256 = createHash('sha256');
7+
const sb: string[] = [];
8+
9+
// Order: Collection|Key|Operation|Payload|Timestamp|PreviousHash
10+
// Operation is enum (0=Put, 1=Delete). Node uses exact same proto enum.
11+
12+
sb.push(entry.collection);
13+
sb.push('|');
14+
sb.push(entry.key);
15+
sb.push('|');
16+
// Operation is number in protobuf-ts interface usually? Or string?
17+
// In generated code, enum is usually a number.
18+
// Check .NET: it appends 'Operation' which is an Enum.
19+
// In C# ToString() on Enum returns the name "Put" unless [Flags] or cast.
20+
// WAIT. .NET code: 'sb.Append(Operation);'
21+
// Step 14: public OperationType Operation { get; }
22+
// public enum OperationType { Put, Delete }
23+
// In C#, sb.Append(enum) appends the string name! "Put", "Delete".
24+
// Protobuf usually sends integers 0, 1.
25+
// I need to be careful here.
26+
// If .NET computes hash using "Put", I must use "Put".
27+
// If I use the number 0, hashes will differ.
28+
// I should check OplogEntry.cs again.
29+
// Yes, 'sb.Append(Operation)' where Operation is OperationType enum.
30+
// So it uses names.
31+
32+
// In Node.js, the generated proto interface likely uses numbers or string literals depending on option.
33+
// But OplogEntry is the database object, not necessarily the proto message yet?
34+
// In Node, we often use the Proto interface as the domain object or similar.
35+
// Let's assume input 'entry' is the ProtoOplogEntry-like structure.
36+
// If 'operation' is "Put" or "Delete" string, good.
37+
// If it's number 0, 1, I need to map it.
38+
39+
// Let's check sync.proto again.
40+
// message ProtoOplogEntry { string operation = 3; } -> Wait!
41+
// In sync.proto (Step 104/106), 'string operation = 3; // "Put" or "Delete"'
42+
// Ah, in proto it is defined as STRING.
43+
// In .NET OplogEntry.cs, it's an Enum, but mapped to string in proto?
44+
// Step 13 (Net proto): string operation = 3;
45+
// Step 14 (Net logic): public OperationType Operation { get; } -> SB.Append(Operation).
46+
// So the hash source is the Enum name "Put"/"Delete".
47+
// The proto carries a string "Put"/"Delete".
48+
// So if I use the string from the proto, it should match.
49+
50+
// BUT, wait.
51+
// In Node.js, is OplogEntry type using string or enum?
52+
// Step 71: export type { OplogEntry } from '@entgldb/protocol';
53+
// If I just updated sync.proto to have `string operation = 3`, then it's string.
54+
// Previously in Node proto? Step 104: `string operation = 3`.
55+
// So it's string.
56+
57+
// OplogEntry domain object has 'data' as Uint8Array, 'timestamp' as object.
58+
59+
// Operation
60+
sb.push(entry.operation);
61+
sb.push('|');
62+
63+
// Payload
64+
if (entry.data && entry.data.length > 0) {
65+
// Decode bytes to string for hashing to match .NET GetRawText()
66+
// Assuming data is UTF-8 JSON
67+
sb.push(new TextDecoder().decode(entry.data));
68+
}
69+
sb.push('|');
70+
71+
// Timestamp
72+
if (entry.timestamp) {
73+
sb.push(HLClock.toString(entry.timestamp));
74+
} else {
75+
// Should not happen for valid entry
76+
sb.push('0-0-');
77+
}
78+
sb.push('|');
79+
80+
sb.push(entry.previousHash || '');
81+
82+
sha256.update(sb.join(''));
83+
return sha256.digest('hex').toLowerCase();
84+
}
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
import { HLCTimestamp } from '@entgldb/protocol';
2+
import { HLClock } from './clock';
3+
4+
export enum CausalityRelation {
5+
Equal,
6+
StrictlyAhead,
7+
StrictlyBehind,
8+
Concurrent
9+
}
10+
11+
export class VectorClock {
12+
private clock: Map<string, HLCTimestamp>;
13+
14+
constructor(initial?: Map<string, HLCTimestamp>) {
15+
this.clock = initial ? new Map(initial) : new Map();
16+
}
17+
18+
get nodeIds(): string[] {
19+
return Array.from(this.clock.keys());
20+
}
21+
22+
getTimestamp(nodeId: string): HLCTimestamp | undefined {
23+
return this.clock.get(nodeId);
24+
}
25+
26+
setTimestamp(nodeId: string, timestamp: HLCTimestamp): void {
27+
this.clock.set(nodeId, timestamp);
28+
}
29+
30+
merge(other: VectorClock): void {
31+
for (const nodeId of other.nodeIds) {
32+
const otherTs = other.getTimestamp(nodeId)!;
33+
const currentTs = this.getTimestamp(nodeId);
34+
35+
if (!currentTs || HLClock.compare(otherTs, currentTs) > 0) {
36+
this.setTimestamp(nodeId, otherTs);
37+
}
38+
}
39+
}
40+
41+
compareTo(other: VectorClock): CausalityRelation {
42+
let thisAhead = false;
43+
let otherAhead = false;
44+
45+
const allNodes = new Set([...this.nodeIds, ...other.nodeIds]);
46+
47+
for (const nodeId of allNodes) {
48+
const thisTs = this.getTimestamp(nodeId);
49+
const otherTs = other.getTimestamp(nodeId);
50+
51+
// If a node is missing, treat as "zero" timestamp (which is always older than any real timestamp)
52+
// But strict implementation might require explicit handling.
53+
// Assuming missing means "start of time".
54+
55+
let cmp = 0;
56+
if (thisTs && otherTs) {
57+
cmp = HLClock.compare(thisTs, otherTs);
58+
} else if (thisTs && !otherTs) {
59+
cmp = 1;
60+
} else if (!thisTs && otherTs) {
61+
cmp = -1;
62+
}
63+
64+
if (cmp > 0) thisAhead = true;
65+
if (cmp < 0) otherAhead = true;
66+
67+
if (thisAhead && otherAhead) return CausalityRelation.Concurrent;
68+
}
69+
70+
if (thisAhead && !otherAhead) return CausalityRelation.StrictlyAhead;
71+
if (otherAhead && !thisAhead) return CausalityRelation.StrictlyBehind;
72+
return CausalityRelation.Equal;
73+
}
74+
75+
getNodesWithUpdates(other: VectorClock): string[] {
76+
const result: string[] = [];
77+
const allNodes = new Set(this.clock.keys());
78+
79+
// Add nodes that other has but we don't (implicitly other is ahead)
80+
for (const nodeId of other.nodeIds) {
81+
if (!this.clock.has(nodeId)) {
82+
// Actually if other has a node we don't, other is ahead for that node
83+
// But usually we track "known nodes".
84+
// Wait, logic: "Nodes where OTHER is ahead of THIS"
85+
// So if I don't have it, and other has it, other is ahead.
86+
allNodes.add(nodeId);
87+
}
88+
}
89+
90+
for (const nodeId of allNodes) {
91+
const thisTs = this.getTimestamp(nodeId);
92+
const otherTs = other.getTimestamp(nodeId);
93+
94+
// If other has it and (we don't OR other > us)
95+
if (otherTs) {
96+
if (!thisTs || HLClock.compare(otherTs, thisTs) > 0) {
97+
result.push(nodeId);
98+
}
99+
}
100+
}
101+
return result;
102+
}
103+
104+
getNodesToPush(other: VectorClock): string[] {
105+
const result: string[] = [];
106+
const allNodes = new Set([...this.nodeIds, ...other.nodeIds]);
107+
108+
for (const nodeId of allNodes) {
109+
const thisTs = this.getTimestamp(nodeId);
110+
const otherTs = other.getTimestamp(nodeId);
111+
112+
// If we have it and (other doesn't OR we > other)
113+
if (thisTs) {
114+
if (!otherTs || HLClock.compare(thisTs, otherTs) > 0) {
115+
result.push(nodeId);
116+
}
117+
}
118+
}
119+
return result;
120+
}
121+
122+
clone(): VectorClock {
123+
// deepish clone of map (timestamps are immutable-ish usually)
124+
return new VectorClock(this.clock);
125+
}
126+
127+
toString(): string {
128+
if (this.clock.size === 0) return '{}';
129+
const entries = Array.from(this.clock.entries())
130+
.map(([k, v]) => `${k}:${HLClock.toString(v)}`);
131+
return `{${entries.join(', ')}}`;
132+
}
133+
}

packages/core/src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ export { IPeerStore } from './storage/interface';
1010

1111
// Sync and conflict resolution
1212
export * from './sync';
13+
export * from './hlc/vector-clock';
14+
export * from './crypto/hash';
1315

1416
// Re-export protocol types for convenience
1517
// Re-export protocol types for convenience

packages/core/src/storage/interface.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,31 @@ export interface IPeerStore {
3131
*/
3232
deleteDocument(collection: string, key: string, timestamp: HLCTimestamp): Promise<void>;
3333

34+
/**
35+
* Get the current Vector Clock from the store
36+
*/
37+
getVectorClock(): Promise<import('../hlc/vector-clock').VectorClock>;
38+
3439
/**
3540
* Get oplog entries after a given timestamp
3641
*/
3742
getOplogAfter(timestamp: HLCTimestamp, limit?: number): Promise<OplogEntry[]>;
3843

44+
/**
45+
* Get oplog entries for a specific node after a given timestamp
46+
*/
47+
getOplogForNodeAfter(nodeId: string, timestamp: HLCTimestamp): Promise<OplogEntry[]>;
48+
49+
/**
50+
* Get the hash of the last entry for a specific node
51+
*/
52+
getLastEntryHash(nodeId: string): Promise<string | null>;
53+
54+
/**
55+
* Get a range of oplog entries between two hashes (inclusive end)
56+
*/
57+
getChainRange(startHash: string, endHash: string): Promise<OplogEntry[]>;
58+
3959
/**
4060
* Apply a batch of documents and oplog entries (for sync)
4161
*/

0 commit comments

Comments
 (0)