Skip to content

Commit cdb8993

Browse files
authored
Configurable storage versions (#601)
* WIP: Implement models for incremental reprocessing. * Handle current_data v1/v3 differences. * Split out PersistedBatch implementations. * Split out MongoBucketBatch implementations. * Resolve circular imports. * Back to old bucket names for now. * Fix type check. * nullable CurrentDataDocumentV3.data. * Use string ids. * Split collections for bucket_data. * Use clustered collections. * Drop bucket_data collections when clearing. * Fix type issues. * Fixes. * Fix tests. * Split out checksum implementations. * Split bucket_parameter collections. * Initialize collections upfront. * Workaround for MongoDB SERVER-121822. * Update snapshots. * Fix for parameter lookups. * Optimize parameter query lookups. * Minor restructuring. * Split current_data into separate source_record_ collections. * Refactor _id for source_records. * Split out CurrentDataStore. * Further split out implementations. * Rename CurrentData -> SourceRecord. * Split out source_tables collections. * Don't do initializeCurrentDataCollection for v1. * Initialize source records collection on resolveTable instead of flush. * Refactor more collection initialization. * Restructure v3 parameter index lookup values. * Update tests. * Further test fix. * Document collection structure. * Add some comments. * Rename. * Use $unionWith to find parameter index changes. * Fix current_data / source_records structure. * Further fixes for v1 current_data. * Cleanup. * Fix type issue post merge. * Rename postCommitCleanup. * Track pending deletes & fix other source_table issues. * Only cleanup pending deletes for affected source tables. * Add timeout and retry for deletes. * Increase clear timeout. * Retry inner clear operations instead of the entire loop. * Split bucket_state collections. * Split compactor implementations. * Avoid listing collections unless we drop them. * Split implementation versions, phase 1. * Split MongoChecksums. * Split MongoParameterCompactor. * Split methods in PersistedBatch. * Further split MongoBucketBatch. * Move out helpers for MongoSyncBucketStorage * Refactor MongoSyncBucketStorage. * Split db implementations. * Cleanup. * Tweak types. * More type tweaks. * Type tweak. * Simplify MongoChecksums. * Remove MongoCompactorBase. * Remove compatibility re-rexports. * Remove MongoParameterCompactorBase. * Rename MongoSyncBucketStorage. * Simplify db. * Split v1/v3 models. * Move files back to their original location. * Update docs. * Add back clear for current_data. * Add back missing metric tracking. * Check if collection exists before getting storage stats. * Report metrics even if there are no active replication streams yet. * Update comment. * Some collection listing cleanup. * Optimize bucket collection lookup for v3 compacting. * Remove unused functions. * Organize imports. * Cleanup types. * Reduce type casting in tests. * Further improve test types. * More type cast improvements and comments. * Reduce instance state on MongoCompactor. * Introduce SingleBucketStore. * Minor cleanup. * Explicitly list fields when persisting. * Merge implementations for saveBucketData. * Fix regression. * Avoid double-parsing sync rules in many cases when updating. * Add storage version parsing. * Use configured storage version. * Changeset. * Re-use persisted storage version in asUpdateOptions(). * Further tweaks and comments. * Fix tests.
1 parent d564c23 commit cdb8993

11 files changed

Lines changed: 147 additions & 2 deletions

File tree

.changeset/nice-eels-dance.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
---
2+
'@powersync/service-module-postgres-storage': patch
3+
'@powersync/service-module-mongodb-storage': patch
4+
'@powersync/service-core-tests': patch
5+
'@powersync/service-module-postgres': patch
6+
'@powersync/service-core': patch
7+
'@powersync/service-sync-rules': patch
8+
'@powersync/lib-service-mongodb': patch
9+
---
10+
11+
Add `config.storage_version` configuration option.

modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,8 @@ export class MongoBucketStorage extends storage.BucketStorageFactory {
156156
}
157157

158158
async updateSyncRules(options: storage.UpdateSyncRulesOptions): Promise<MongoPersistedSyncRulesContent> {
159-
const storageVersion = options.storageVersion ?? storage.CURRENT_STORAGE_VERSION;
159+
const storageVersion =
160+
options.storageVersion ?? options.config.parsed.config.storageVersion ?? storage.CURRENT_STORAGE_VERSION;
160161
const storageConfig = getMongoStorageConfig(storageVersion);
161162

162163
let rules: MongoPersistedSyncRulesContent | undefined = undefined;

modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,8 @@ export class PostgresBucketStorageFactory extends storage.BucketStorageFactory {
154154
}
155155

156156
async updateSyncRules(options: storage.UpdateSyncRulesOptions): Promise<PostgresPersistedSyncRulesContent> {
157-
const storageVersion = options.storageVersion ?? storage.CURRENT_STORAGE_VERSION;
157+
const storageVersion =
158+
options.storageVersion ?? options.config.parsed.config.storageVersion ?? storage.CURRENT_STORAGE_VERSION;
158159
const storageConfig = storage.STORAGE_VERSION_CONFIG[storageVersion];
159160
if (storageConfig == null) {
160161
throw new framework.ServiceError(

packages/service-core/src/routes/endpoints/admin.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,10 @@ export const reprocess = routeDefinition({
131131
});
132132
}
133133

134+
// There are some differences between this and using asUpdateOptions():
135+
// 1. This always re-parses the source YAML. If there are changes to the sync stream compiler, that can affect the sync plan.
136+
// 2. If the source does not set the storage version, this will update it do the current version.
137+
// We can consider tweaking this behavior in the future.
134138
const new_rules = await activeBucketStorage.updateSyncRules(
135139
storage.updateSyncRulesFromYaml(active.sync_rules.config.content, {
136140
// These sync rules already passed validation. But if the rules are not valid anymore due

packages/service-core/src/storage/PersistedSyncRulesContent.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,10 @@ export abstract class PersistedSyncRulesContent implements PersistedSyncRulesCon
103103
sourceText: this.sync_rules_content
104104
});
105105

106+
// Note: If the original content did not define a storage version, this will still set the storage version.
107+
// This means asUpdateOptions will not change the storage version, even if the default changes.
108+
precompiled.storageVersion = this.storageVersion;
109+
106110
const errors: YamlError[] = [];
107111
if (this.compiled_plan.errors) {
108112
for (const error of this.compiled_plan.errors) {
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/**
2+
* This is only for the purpose of validating the version in the sync config, for reporting configurable versions
3+
* upfront.
4+
*
5+
* The service itself may:
6+
* 1. Support additional storage versions, such as legacy storage versions.
7+
* 2. Attach specific behavior to storage versions.
8+
*
9+
* See: service-core/src/storage/StorageVersionConfig.js
10+
*/
11+
export interface ValidatedStorageVersion {
12+
version: number;
13+
14+
/**
15+
* If false, this version may be dropped or fundamentally changed in any future service version.
16+
*/
17+
stable: boolean;
18+
}
19+
20+
export const STORAGE_VERSIONS = new Map<number, ValidatedStorageVersion>([
21+
// version 1 is supported by the storage modules, but cannot be used in sync config
22+
[2, { version: 2, stable: true }],
23+
[3, { version: 3, stable: false }]
24+
]);
25+
26+
export const DEFAULT_STORAGE_VERSION = STORAGE_VERSIONS.get(2)!;
27+
28+
/**
29+
* Parse a storage version.
30+
*
31+
* If the version number is unknown or not supported, returns undefined.
32+
*
33+
* Generally, even storage versions are stable, and odd storage versions unstable.
34+
*/
35+
export function validateStorageVersion(version: number): ValidatedStorageVersion | undefined {
36+
return STORAGE_VERSIONS.get(version);
37+
}

packages/sync-rules/src/SyncConfig.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@ export abstract class SyncConfig {
1717
bucketParameterLookupSources: ParameterIndexLookupCreator[] = [];
1818
bucketSources: BucketSource[] = [];
1919
compatibility: CompatibilityContext = CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY;
20+
/**
21+
* If not defined, the storage module picks the latest stable version.
22+
*
23+
* Only supported storage versions can be set here when parsing from yaml.
24+
*/
25+
storageVersion: number | undefined;
2026
eventDescriptors: SqlEventDescriptor[] = [];
2127

2228
/**

packages/sync-rules/src/from_yaml.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { SqlEventDescriptor } from './events/SqlEventDescriptor.js';
1313
import { validateSyncRulesSchema } from './json_schema.js';
1414
import { QueryParseResult, SqlBucketDescriptor } from './SqlBucketDescriptor.js';
1515
import { SqlSyncRules } from './SqlSyncRules.js';
16+
import { validateStorageVersion } from './StorageVersion.js';
1617
import { syncStreamFromSql } from './streams/from_sql.js';
1718
import { javaScriptExpressionEngine } from './sync_plan/engine/javascript.js';
1819
import { PrecompiledSyncConfig } from './sync_plan/evaluator/index.js';
@@ -69,9 +70,11 @@ export class SyncConfigFromYaml {
6970
}
7071

7172
let compatibility: CompatibilityContext;
73+
let storageVersion: number | undefined;
7274
if (parsed.has('config')) {
7375
const declaredOptions = parsed.get('config') as YAMLMap;
7476
compatibility = this.#parseCompatibilityOptions(declaredOptions);
77+
storageVersion = this.#validateStorageVersion(declaredOptions);
7578
} else {
7679
compatibility = CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY;
7780
}
@@ -98,6 +101,8 @@ export class SyncConfigFromYaml {
98101
result = this.#legacyParseBucketDefinitionsAndStreams(bucketMap, streamMap, compatibility);
99102
}
100103

104+
result.storageVersion = storageVersion;
105+
101106
const eventDefinitions = this.#parseEventDefinitions(parsed, compatibility);
102107
result.eventDescriptors.push(...eventDefinitions);
103108

@@ -411,6 +416,31 @@ export class SyncConfigFromYaml {
411416
return rules;
412417
}
413418

419+
#validateStorageVersion(config: YAMLMap): number | undefined {
420+
const storageScalar = config.get('storage_version', true);
421+
if (storageScalar != null) {
422+
if (typeof storageScalar.value == 'number') {
423+
const rawVersion = storageScalar.value;
424+
const version = validateStorageVersion(storageScalar.value);
425+
if (version == null) {
426+
this.#errors.push(this.#yamlError(storageScalar, `Storage version ${storageScalar.value} is not supported`));
427+
} else if (!version.stable) {
428+
const error = this.#yamlError(
429+
storageScalar,
430+
`Storage version ${version.version} is unstable, and may cause unexpected behavior or stop functioning in any release`
431+
);
432+
error.type = 'warning';
433+
this.#errors.push(error);
434+
}
435+
return version?.version;
436+
} else {
437+
this.#errors.push(this.#yamlError(storageScalar, 'Storage version must be numeric'));
438+
return undefined;
439+
}
440+
}
441+
return undefined;
442+
}
443+
414444
#parseEventDefinitions(parsed: Document, compatibility: CompatibilityContext) {
415445
const eventMap = parsed.get('event_definitions') as YAMLMap;
416446
const eventDescriptors: SqlEventDescriptor[] = [];

packages/sync-rules/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ export * from './SqlDataQuery.js';
2222
export * from './SqlParameterQuery.js';
2323
export * from './SqlSyncRules.js';
2424
export * from './StaticSchema.js';
25+
export * from './StorageVersion.js';
2526
export { syncStreamFromSql } from './streams/from_sql.js';
2627
export { STREAM_FUNCTIONS } from './streams/functions.js';
2728
export { SyncStream } from './streams/stream.js';

packages/sync-rules/src/json_schema.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import ajvModule from 'ajv';
22
import { CompatibilityEdition, CompatibilityOption, TimeValuePrecision } from './compatibility.js';
3+
import { DEFAULT_STORAGE_VERSION, STORAGE_VERSIONS } from './StorageVersion.js';
34
// Hack to make this work both in NodeJS and a browser
45
const Ajv = ajvModule.default ?? ajvModule;
56
const ajv = new Ajv({ allErrors: true, verbose: true });
@@ -141,6 +142,12 @@ export const syncRulesSchema: ajvModule.Schema = {
141142
minimum: CompatibilityEdition.LEGACY,
142143
exclusiveMaximum: CompatibilityEdition.COMPILED_STREAMS + 1
143144
},
145+
storage_version: {
146+
type: 'integer',
147+
description: 'Storage version used by the storage database. By default, the latest stable version is used.',
148+
default: DEFAULT_STORAGE_VERSION.version,
149+
enum: [...STORAGE_VERSIONS.keys()]
150+
},
144151
timestamp_max_precision: {
145152
type: 'string',
146153
enum: Object.values(TimeValuePrecision.byName).map((e) => e.name)

0 commit comments

Comments
 (0)