Skip to content

Commit 22f16f9

Browse files
authored
Correlate parameters if they originate from the same logical result set (#633)
1 parent ad9ea06 commit 22f16f9

4 files changed

Lines changed: 828 additions & 36 deletions

File tree

packages/service-core-tests/src/tests/register-data-storage-parameter-tests.ts

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1067,4 +1067,87 @@ streams:
10671067
expect(foundLookupA).toBeTruthy();
10681068
expect(foundLookupB).toBeTruthy();
10691069
});
1070+
1071+
test('sync streams preserve duplicate downstream lookups with different provenance', async () => {
1072+
await using factory = await generateStorageFactory();
1073+
const syncRules = await factory.updateSyncRules(
1074+
updateSyncRulesFromYaml(
1075+
`
1076+
config:
1077+
edition: 3
1078+
streams:
1079+
stream:
1080+
auto_subscribe: true
1081+
query: |
1082+
SELECT a.*
1083+
FROM a, b, c
1084+
WHERE a.x = b.x
1085+
AND a.z = c.z
1086+
AND b.y = c.y
1087+
AND c.u = auth.user_id()
1088+
`,
1089+
{
1090+
storageVersion
1091+
}
1092+
)
1093+
);
1094+
const bucketStorage = factory.getInstance(syncRules);
1095+
const sync_rules = syncRules.parsed(test_utils.PARSE_OPTIONS).hydratedSyncRules();
1096+
1097+
await using writer = await bucketStorage.createWriter(test_utils.BATCH_OPTIONS);
1098+
const tableB = await test_utils.resolveTestTable(writer, 'b', ['id'], config);
1099+
const tableC = await test_utils.resolveTestTable(writer, 'c', ['id'], config);
1100+
1101+
await writer.markAllSnapshotDone('1/1');
1102+
await writer.save({
1103+
sourceTable: tableB,
1104+
tag: storage.SaveOperationTag.INSERT,
1105+
after: {
1106+
id: 'b1',
1107+
y: 'shared-y',
1108+
x: 'x-from-shared-y'
1109+
},
1110+
afterReplicaId: test_utils.rid('b1')
1111+
});
1112+
await writer.save({
1113+
sourceTable: tableC,
1114+
tag: storage.SaveOperationTag.INSERT,
1115+
after: {
1116+
id: 'c1',
1117+
u: 'user1',
1118+
y: 'shared-y',
1119+
z: 'z1'
1120+
},
1121+
afterReplicaId: test_utils.rid('c1')
1122+
});
1123+
await writer.save({
1124+
sourceTable: tableC,
1125+
tag: storage.SaveOperationTag.INSERT,
1126+
after: {
1127+
id: 'c2',
1128+
u: 'user1',
1129+
y: 'shared-y',
1130+
z: 'z2'
1131+
},
1132+
afterReplicaId: test_utils.rid('c2')
1133+
});
1134+
await writer.commit('1/1');
1135+
1136+
const checkpoint = await bucketStorage.getCheckpoint();
1137+
const parameters = new RequestParameters(new JwtPayload({ sub: 'user1' }), {});
1138+
const querier = sync_rules.getBucketParameterQuerier({
1139+
...test_utils.querierOptions(parameters)
1140+
}).querier;
1141+
1142+
const buckets = await querier.queryDynamicBucketDescriptions({
1143+
async getParameterSets(lookups) {
1144+
return checkpoint.getParameterSets(lookups, 1000);
1145+
}
1146+
});
1147+
1148+
expect(buckets.map((bucket) => bucket.bucket).sort()).toStrictEqual([
1149+
expect.stringMatching(/stream.*\["x-from-shared-y","z1"\]$/),
1150+
expect.stringMatching(/stream.*\["x-from-shared-y","z2"\]$/)
1151+
]);
1152+
});
10701153
}

packages/sync-rules/src/compiler/equality.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { SqliteParameterValue } from '../types.js';
2+
13
/**
24
* Stable value-based hashcodes for JavaScript. The sync streams compiler uses hashmaps derived from this to efficiently
35
* de-duplicate equivalent expressions and lookups.
@@ -71,6 +73,43 @@ export class StableHasher {
7173
};
7274

7375
static readonly defaultListEquality = listEquality(this.defaultEquality);
76+
77+
static readonly parameterValueEquality: Equality<SqliteParameterValue> = (() => {
78+
const buf = new DataView(new ArrayBuffer(8));
79+
80+
return {
81+
equals: function (a: SqliteParameterValue, b: SqliteParameterValue): boolean {
82+
// Allowed values are numbers, string, and bigint. All of them compare correctly with ===
83+
return a === b;
84+
},
85+
hash: function (hasher: StableHasher, value: SqliteParameterValue): void {
86+
switch (typeof value) {
87+
case 'string':
88+
hasher.addString(value);
89+
break;
90+
case 'number':
91+
const normalized = value || 0; // Ensure 0 and -0 have the same hash code.
92+
buf.setFloat64(0, value, true);
93+
hasher.addHash(buf.getUint32(0, true));
94+
hasher.addHash(buf.getUint32(4, true));
95+
break;
96+
case 'bigint':
97+
// Most bigints we're dealing with fit in 64 bits. Truncating is fine, we're building hashes anyway.
98+
buf.setBigInt64(0, value, true);
99+
hasher.addHash(buf.getUint32(0, true));
100+
hasher.addHash(buf.getUint32(4, true));
101+
case 'boolean':
102+
hasher.addHash(value ? 0 : 1);
103+
break;
104+
case 'symbol':
105+
case 'undefined':
106+
case 'object':
107+
case 'function':
108+
throw new Error(`Not a parameter value: ${value}`);
109+
}
110+
}
111+
};
112+
})();
74113
}
75114

76115
/**

0 commit comments

Comments
 (0)