Skip to content

Commit e453166

Browse files
committed
add the possibility to define a decoder to decode a blob column (used for patch resend)
Now the hook is waiting for this object: { write : 'jsonb(?)', read : 'jsonb(binary)' }; If it returns undefined, default column write/ead is used
1 parent bf58b3e commit e453166

2 files changed

Lines changed: 64 additions & 42 deletions

File tree

lib/index.js

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,12 +117,14 @@ const SQLiteOnSteroid = (db, myPeerId = null, options) => {
117117
const _columns = [];
118118
const _columnsPatch = [];
119119
const _placeholders = [];
120+
const _readColumns = [];
120121
const _pkColumns = [];
121122
const _updateClauses = [];
122123
for (const col of _tableInfo) {
123124
const _colName = col.name;
124-
let _valueExpr = PREPARE_STATEMENT_HOOK(tableName, _colName);
125-
_placeholders.push(_valueExpr);
125+
const _hook = PREPARE_STATEMENT_HOOK(tableName, _colName);
126+
_placeholders.push(_hook?.write ?? '?');
127+
_readColumns.push({ name : _colName, read : (_hook?.read ?? _colName) });
126128
_columns.push(_colName);
127129
if (col.pk > 0) {
128130
_pkColumns.push(_colName);
@@ -157,7 +159,7 @@ const SQLiteOnSteroid = (db, myPeerId = null, options) => {
157159
const _getPatchFromColumnSQL = `
158160
SELECT
159161
_sequenceId,
160-
json_object('type', ${MESSAGE_TYPES.PATCH}, 'at', _patchedAt, 'peer', _peerId, 'seq', _sequenceId, 'ver', ${dbVersion}, 'tab', '${tableName}', 'delta', json_object(${_columns.map(col => `'${col}', ${col}`).join(', ')})) as patch
162+
json_object('type', ${MESSAGE_TYPES.PATCH}, 'at', _patchedAt, 'peer', _peerId, 'seq', _sequenceId, 'ver', ${dbVersion}, 'tab', '${tableName}', 'delta', json_object(${_readColumns.map(col => `'${col.name}', ${col.read}`).join(', ')})) as patch
161163
FROM ${_tableNamePatches}
162164
WHERE _peerId = ?
163165
AND _sequenceId >= ?

test/test.main.js

Lines changed: 59 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ describe('main', function () {
2121
name TEXT,
2222
deletedAt INTEGER,
2323
createdAt INTEGER,
24+
binary BLOB,
2425
PRIMARY KEY (id, tenantId)
2526
) STRICT;
2627
@@ -33,7 +34,8 @@ describe('main', function () {
3334
tenantId INTEGER NOT NULL,
3435
name TEXT,
3536
deletedAt INTEGER,
36-
createdAt INTEGER
37+
createdAt INTEGER,
38+
binary BLOB
3739
) STRICT;
3840
3941
CREATE INDEX testA_patches_at_idx ON testA_patches (_patchedAt);
@@ -43,7 +45,6 @@ describe('main', function () {
4345
let _prepareStatementHookCalls = [];
4446
function prepareStatementHook (tableName, column) {
4547
_prepareStatementHookCalls.push({ tableName, column });
46-
return '?';
4748
}
4849
beforeEach (function () {
4950
db = connect(); // memory db
@@ -57,34 +58,36 @@ describe('main', function () {
5758
_prepareStatementHookCalls = [];
5859
const _sql = app._generateMergePatchesQueryPlan('testA').applyPatchesSQL;
5960
const expectedSql = `
60-
INSERT INTO testA (id, tenantId, name, deletedAt, createdAt)
61+
INSERT INTO testA (id, tenantId, name, deletedAt, createdAt, binary)
6162
SELECT
6263
id,
6364
tenantId,
6465
keep_last(name, _patchedAt, _peerId, _sequenceId),
6566
keep_last(deletedAt, _patchedAt, _peerId, _sequenceId),
66-
keep_last(createdAt, _patchedAt, _peerId, _sequenceId)
67+
keep_last(createdAt, _patchedAt, _peerId, _sequenceId),
68+
keep_last(binary, _patchedAt, _peerId, _sequenceId)
6769
FROM testA_patches
6870
WHERE _patchedAt >= ?
6971
GROUP BY id, tenantId
7072
ON CONFLICT (id, tenantId) DO UPDATE SET
7173
name = coalesce(excluded.name, name),
7274
deletedAt = coalesce(excluded.deletedAt, deletedAt),
73-
createdAt = coalesce(excluded.createdAt, createdAt);
75+
createdAt = coalesce(excluded.createdAt, createdAt),
76+
binary = coalesce(excluded.binary, binary);
7477
`;
7578
// Remove all whitespace and compare
7679
const normalizedSql = _sql.replace(/\s+/g, '\n');
7780
const normalizedExpectedSql = expectedSql.replace(/\s+/g, '\n');
7881
assert.strictEqual(normalizedSql, normalizedExpectedSql);
79-
assert.strictEqual(_prepareStatementHookCalls.length, 5);
82+
assert.strictEqual(_prepareStatementHookCalls.length, 6);
8083
assert.strictEqual(_prepareStatementHookCalls[0].tableName, 'testA');
8184
assert.strictEqual(_prepareStatementHookCalls[0].column, 'id');
8285
});
8386
it('should generate correct insert patch query plan', function () {
8487
const _sql = app._generateMergePatchesQueryPlan('testA').savePatchSQL;
8588
const expectedSql = `
86-
INSERT INTO testA_patches (_patchedAt, _sequenceId, _peerId, id, tenantId, name, deletedAt, createdAt)
87-
VALUES (?, ?, ?, ?, ?, ?, ?, ?);
89+
INSERT INTO testA_patches (_patchedAt, _sequenceId, _peerId, id, tenantId, name, deletedAt, createdAt, binary)
90+
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?);
8891
`;
8992
// Remove all whitespace and compare
9093
const normalizedSql = _sql.replace(/\s+/g, '\n');
@@ -94,12 +97,13 @@ describe('main', function () {
9497
it('should generate correct direct upsert query plan', function () {
9598
const _sql = app._generateMergePatchesQueryPlan('testA').directUpsertSQL;
9699
const expectedSql = `
97-
INSERT INTO testA (id, tenantId, name, deletedAt, createdAt)
98-
VALUES (?, ?, ?, ?, ?)
100+
INSERT INTO testA (id, tenantId, name, deletedAt, createdAt, binary)
101+
VALUES (?, ?, ?, ?, ?, ?)
99102
ON CONFLICT (id, tenantId) DO UPDATE SET
100103
name = coalesce(excluded.name, name),
101104
deletedAt = coalesce(excluded.deletedAt, deletedAt),
102-
createdAt = coalesce(excluded.createdAt, createdAt);
105+
createdAt = coalesce(excluded.createdAt, createdAt),
106+
binary = coalesce(excluded.binary, binary);
103107
`;
104108
// Remove all whitespace and compare
105109
const normalizedSql = _sql.replace(/\s+/g, '\n');
@@ -912,21 +916,29 @@ describe('main', function () {
912916

913917

914918
it('should return the missing patch to the right peer', function (done) {
915-
app = SQLiteOnSteroid(db, 1);
919+
app = SQLiteOnSteroid(db, 1, {
920+
prepareStatementHook : (tableName, column) => {
921+
if (tableName === 'testA') {
922+
if (column === 'binary') {
923+
return { write : 'unhex(?)', read : 'lower(hex(binary))' };
924+
}
925+
}
926+
}
927+
});
916928
app.addRemotePeer(10, fakePeerSockets[10]);
917929
app.addRemotePeer(2, fakePeerSockets[2]);
918930
app.migrate([{ up : _testSchema, down : ''}]);
919931
const _constantTimestamp = Date.now();
920932
const _patches = [
921-
{ at : hlc.from(_constantTimestamp), peer : 3 , seq : 1, ver : 1, tab : 'testA', delta : { id : 1, tenantId : 1, name : '1a' } },
922-
{ at : hlc.from(_constantTimestamp), peer : 3 , seq : 3, ver : 2, tab : 'testB', delta : { id : 3, tenantId : 1, name : '3a' } }, // missing B
923-
{ at : hlc.from(_constantTimestamp), peer : 3 , seq : 5, ver : 1, tab : 'testA', delta : { id : 5, tenantId : 1, name : '5a' } },
924-
{ at : hlc.from(_constantTimestamp), peer : 10, seq : 1, ver : 1, tab : 'testA', delta : { id : 1, tenantId : 1, name : '1a' } },
925-
{ at : hlc.from(_constantTimestamp), peer : 10, seq : 3, ver : 2, tab : 'testA', delta : { id : 3, tenantId : 1, name : '3a' } },
926-
{ at : hlc.from(_constantTimestamp), peer : 10, seq : 5, ver : 1, tab : 'testA', delta : { id : 5, tenantId : 1, name : '5a' } },
927-
{ at : hlc.from(_constantTimestamp), peer : 2 , seq : 1, ver : 1, tab : 'testA', delta : { id : 6, tenantId : 2, name : '6a' } },
928-
{ at : hlc.from(_constantTimestamp), peer : 2 , seq : 2, ver : 1, tab : 'testA', delta : { id : 7, tenantId : 2, name : '7a' } }, // missing A
929-
{ at : hlc.from(_constantTimestamp), peer : 2 , seq : 4, ver : 1, tab : 'testA', delta : { id : 9, tenantId : 2, name : '9a' } },
933+
{ at : hlc.from(_constantTimestamp), peer : 3 , seq : 1, ver : 1, tab : 'testA', delta : { id : 1, tenantId : 1, name : '1a', binary : '1a'.repeat(32) } },
934+
{ at : hlc.from(_constantTimestamp), peer : 3 , seq : 3, ver : 2, tab : 'testB', delta : { id : 3, tenantId : 1, name : '3a', binary : '1a'.repeat(32) } }, // missing B
935+
{ at : hlc.from(_constantTimestamp), peer : 3 , seq : 5, ver : 1, tab : 'testA', delta : { id : 5, tenantId : 1, name : '5a', binary : '1a'.repeat(32) } },
936+
{ at : hlc.from(_constantTimestamp), peer : 10, seq : 1, ver : 1, tab : 'testA', delta : { id : 1, tenantId : 1, name : '1a', binary : '1a'.repeat(32) } },
937+
{ at : hlc.from(_constantTimestamp), peer : 10, seq : 3, ver : 2, tab : 'testA', delta : { id : 3, tenantId : 1, name : '3a', binary : '1a'.repeat(32) } },
938+
{ at : hlc.from(_constantTimestamp), peer : 10, seq : 5, ver : 1, tab : 'testA', delta : { id : 5, tenantId : 1, name : '5a', binary : '1a'.repeat(32) } },
939+
{ at : hlc.from(_constantTimestamp), peer : 2 , seq : 1, ver : 1, tab : 'testA', delta : { id : 6, tenantId : 2, name : '6a', binary : '1a'.repeat(32) } },
940+
{ at : hlc.from(_constantTimestamp), peer : 2 , seq : 2, ver : 1, tab : 'testA', delta : { id : 7, tenantId : 2, name : '7a', binary : '1a'.repeat(32) } }, // missing A
941+
{ at : hlc.from(_constantTimestamp), peer : 2 , seq : 4, ver : 1, tab : 'testA', delta : { id : 9, tenantId : 2, name : '9a', binary : '1a'.repeat(32) } },
930942
];
931943
// Apply all patches
932944
for (const patch of _patches) {
@@ -937,12 +949,12 @@ describe('main', function () {
937949
// Should search and find the patch in the database (pending_patches)
938950
app._onRequestForMissingPatchFromPeers({ type : 30 /* MISSING_PATCH */, peer : 3, minSeq : 3, maxSeq : 3, forPeer : 10 }); // missing B
939951
assert.strictEqual(messagesPerPeer[10].length, 1, 'Should have sent messages to the peer 10');
940-
assert.deepStrictEqual(messagesPerPeer[10][0], { type : 10 /* PATCH */, at : hlc.from(_constantTimestamp), peer : 3, seq : 3, ver : 2, tab : 'testB', delta : { id : 3, tenantId : 1, name : '3a' } });
952+
assert.deepStrictEqual(messagesPerPeer[10][0], { type : 10 /* PATCH */, at : hlc.from(_constantTimestamp), peer : 3, seq : 3, ver : 2, tab : 'testB', delta : { id : 3, tenantId : 1, name : '3a', binary : '1a'.repeat(32) /* binary : JSON.stringify(['3a', '3aa'])*/ } });
941953

942954
// Should search and find the patch in the database (testA)
943955
app._onRequestForMissingPatchFromPeers({ type : 30 /* MISSING_PATCH */, peer : 2, minSeq : 2, maxSeq : 2, forPeer : 2 }); // missing A
944956
assert.strictEqual(messagesPerPeer[2].length, 1, 'Should have sent messages to the peer 2');
945-
assert.deepStrictEqual(messagesPerPeer[2][0], { type : 10, at : hlc.from(_constantTimestamp), peer : 2, seq : 2, ver : 1, tab : 'testA', delta : { id : 7, tenantId : 2, name : '7a', createdAt : null, deletedAt : null } });
957+
assert.deepStrictEqual(messagesPerPeer[2][0], { type : 10, at : hlc.from(_constantTimestamp), peer : 2, seq : 2, ver : 1, tab : 'testA', delta : { id : 7, tenantId : 2, name : '7a', createdAt : null, deletedAt : null, binary : '1a'.repeat(32) } });
946958

947959
// Should not crash if the patch is not found
948960
app._onRequestForMissingPatchFromPeers({ type : 30 /* MISSING_PATCH */, peer : 3, minSeq : 300, maxSeq : 300, forPeer : 10 });
@@ -956,22 +968,30 @@ describe('main', function () {
956968
});
957969

958970
it('should return the all missing patches (range request) to the right peer from a given sequenceId', function (done) {
959-
app = SQLiteOnSteroid(db, 1);
971+
app = SQLiteOnSteroid(db, 1, {
972+
prepareStatementHook : (tableName, column) => {
973+
if (tableName === 'testA') {
974+
if (column === 'binary') {
975+
return { write : 'jsonb(?)', read : 'jsonb(binary)' };
976+
}
977+
}
978+
}
979+
});
960980
app.addRemotePeer(10, fakePeerSockets[10]);
961981
app.addRemotePeer(2, fakePeerSockets[2]);
962982
app.migrate([{ up : _testSchema, down : ''}]);
963983
const _constantTimestamp = Date.now();
964984
const _patches = [
965-
{ at : hlc.from(_constantTimestamp), peer : 3 , seq : 1, ver : 1, tab : 'testA', delta : { id : 1, tenantId : 1, name : '1a' } },
966-
{ at : hlc.from(_constantTimestamp), peer : 3 , seq : 3, ver : 1, tab : 'testA', delta : { id : 3, tenantId : 1, name : '3a' } },
967-
{ at : hlc.from(_constantTimestamp), peer : 3 , seq : 5, ver : 1, tab : 'testA', delta : { id : 5, tenantId : 1, name : '5a' } },
968-
{ at : hlc.from(_constantTimestamp), peer : 2 , seq : 1, ver : 1, tab : 'testA', delta : { id : 1, tenantId : 1, name : '1a' } },
969-
{ at : hlc.from(_constantTimestamp), peer : 2 , seq : 3, ver : 2, tab : 'testB', delta : { id : 3, tenantId : 1, name : '3a' } }, // missing B
970-
{ at : hlc.from(_constantTimestamp), peer : 2 , seq : 5, ver : 2, tab : 'testB', delta : { id : 5, tenantId : 1, name : '5a' } }, // missing B
971-
{ at : hlc.from(_constantTimestamp), peer : 2 , seq : 6, ver : 2, tab : 'testB', delta : { id : 6, tenantId : 1, name : '6a' } }, // missing B
972-
{ at : hlc.from(_constantTimestamp), peer : 3 , seq : 6, ver : 1, tab : 'testA', delta : { id : 6, tenantId : 2, name : '6a' } },
973-
{ at : hlc.from(_constantTimestamp), peer : 10, seq : 2, ver : 1, tab : 'testA', delta : { id : 7, tenantId : 2, name : '7a' } },
974-
{ at : hlc.from(_constantTimestamp), peer : 10, seq : 4, ver : 1, tab : 'testA', delta : { id : 9, tenantId : 2, name : '9a' } },
985+
{ at : hlc.from(_constantTimestamp), peer : 3 , seq : 1, ver : 1, tab : 'testA', delta : { id : 1, tenantId : 1, name : '1a', binary : JSON.stringify(['1a', '1aa']) } },
986+
{ at : hlc.from(_constantTimestamp), peer : 3 , seq : 3, ver : 1, tab : 'testA', delta : { id : 3, tenantId : 1, name : '3a', binary : JSON.stringify(['3a', '3aa']) } },
987+
{ at : hlc.from(_constantTimestamp), peer : 3 , seq : 5, ver : 1, tab : 'testA', delta : { id : 5, tenantId : 1, name : '5a', binary : JSON.stringify(['5a', '5aa']) } },
988+
{ at : hlc.from(_constantTimestamp), peer : 2 , seq : 1, ver : 1, tab : 'testA', delta : { id : 1, tenantId : 1, name : '1a', binary : JSON.stringify(['1a', '1aa']) } },
989+
{ at : hlc.from(_constantTimestamp), peer : 2 , seq : 3, ver : 2, tab : 'testB', delta : { id : 3, tenantId : 1, name : '3a', binary : JSON.stringify(['3a', '3aa']) } }, // missing B
990+
{ at : hlc.from(_constantTimestamp), peer : 2 , seq : 5, ver : 2, tab : 'testB', delta : { id : 5, tenantId : 1, name : '5a', binary : JSON.stringify(['5a', '5aa']) } }, // missing B
991+
{ at : hlc.from(_constantTimestamp), peer : 2 , seq : 6, ver : 2, tab : 'testB', delta : { id : 6, tenantId : 1, name : '6a', binary : JSON.stringify(['6a', '6aa']) } }, // missing B
992+
{ at : hlc.from(_constantTimestamp), peer : 3 , seq : 6, ver : 1, tab : 'testA', delta : { id : 6, tenantId : 2, name : '6a', binary : JSON.stringify(['6a', '6aa']) } },
993+
{ at : hlc.from(_constantTimestamp), peer : 10, seq : 2, ver : 1, tab : 'testA', delta : { id : 7, tenantId : 2, name : '7a', binary : JSON.stringify(['7a', '7aa']) } },
994+
{ at : hlc.from(_constantTimestamp), peer : 10, seq : 4, ver : 1, tab : 'testA', delta : { id : 9, tenantId : 2, name : '9a', binary : JSON.stringify(['9a', '9aa']) } },
975995
];
976996
// Apply all patches
977997
for (const patch of _patches) {
@@ -982,15 +1002,15 @@ describe('main', function () {
9821002
// Should search and find the patch in the database (pending_patches)
9831003
app._onRequestForMissingPatchFromPeers({ type : 30 /* MISSING_PATCH */, peer : 2, minSeq : 2, maxSeq : 5, forPeer : 10 }); // missing B
9841004
assert.strictEqual(messagesPerPeer[10].length, 2, 'Should have sent messages to the peer 10');
985-
assert.deepStrictEqual(messagesPerPeer[10][0], { type : 10, at : hlc.from(_constantTimestamp), peer : 2, seq : 3, ver : 2, tab : 'testB', delta : { id : 3, tenantId : 1, name : '3a' } });
986-
assert.deepStrictEqual(messagesPerPeer[10][1], { type : 10, at : hlc.from(_constantTimestamp), peer : 2, seq : 5, ver : 2, tab : 'testB', delta : { id : 5, tenantId : 1, name : '5a' } });
1005+
assert.deepStrictEqual(messagesPerPeer[10][0], { type : 10, at : hlc.from(_constantTimestamp), peer : 2, seq : 3, ver : 2, tab : 'testB', delta : { id : 3, tenantId : 1, name : '3a' , binary : JSON.stringify(['3a', '3aa']) } });
1006+
assert.deepStrictEqual(messagesPerPeer[10][1], { type : 10, at : hlc.from(_constantTimestamp), peer : 2, seq : 5, ver : 2, tab : 'testB', delta : { id : 5, tenantId : 1, name : '5a' , binary : JSON.stringify(['5a', '5aa']) } });
9871007

9881008
// Should search and find the patch in the database (testA)
9891009
app._onRequestForMissingPatchFromPeers({ type : 30 /* MISSING_PATCH */, peer : 3, minSeq : 2, maxSeq : 100, forPeer : 2 }); // missing A
9901010
assert.strictEqual(messagesPerPeer[2].length, 3, 'Should have sent messages to the peer 2');
991-
assert.deepStrictEqual(messagesPerPeer[2][0], { type : 10, at : hlc.from(_constantTimestamp), peer : 3, seq : 3, ver : 1, tab : 'testA', delta : { id : 3, tenantId : 1, name : '3a', createdAt : null, deletedAt : null } });
992-
assert.deepStrictEqual(messagesPerPeer[2][1], { type : 10, at : hlc.from(_constantTimestamp), peer : 3, seq : 5, ver : 1, tab : 'testA', delta : { id : 5, tenantId : 1, name : '5a', createdAt : null, deletedAt : null } });
993-
assert.deepStrictEqual(messagesPerPeer[2][2], { type : 10, at : hlc.from(_constantTimestamp), peer : 3, seq : 6, ver : 1, tab : 'testA', delta : { id : 6, tenantId : 2, name : '6a', createdAt : null, deletedAt : null } });
1011+
assert.deepStrictEqual(messagesPerPeer[2][0], { type : 10, at : hlc.from(_constantTimestamp), peer : 3, seq : 3, ver : 1, tab : 'testA', delta : { id : 3, tenantId : 1, name : '3a', createdAt : null, deletedAt : null, binary : ['3a', '3aa'] } });
1012+
assert.deepStrictEqual(messagesPerPeer[2][1], { type : 10, at : hlc.from(_constantTimestamp), peer : 3, seq : 5, ver : 1, tab : 'testA', delta : { id : 5, tenantId : 1, name : '5a', createdAt : null, deletedAt : null, binary : ['5a', '5aa'] } });
1013+
assert.deepStrictEqual(messagesPerPeer[2][2], { type : 10, at : hlc.from(_constantTimestamp), peer : 3, seq : 6, ver : 1, tab : 'testA', delta : { id : 6, tenantId : 2, name : '6a', createdAt : null, deletedAt : null, binary : ['6a', '6aa'] } });
9941014

9951015
// Should not crash if the patch is not found
9961016
app._onRequestForMissingPatchFromPeers({ type : 30 /* MISSING_PATCH */, peer : 2, minSeq : 200, maxSeq : 200, forPeer : 10 });

0 commit comments

Comments
 (0)