Skip to content

Commit ec1d965

Browse files
committed
fix(sqlite-native): restore native startup kv preload
1 parent 7eb7cba commit ec1d965

9 files changed

Lines changed: 248 additions & 17 deletions

File tree

rivetkit-typescript/packages/rivetkit-native/index.d.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ export interface QueryResult {
1818
rows: Array<Array<any>>
1919
}
2020
/** Open a native SQLite database backed by the envoy's KV channel. */
21-
export declare function openDatabaseFromEnvoy(jsHandle: JsEnvoyHandle, actorId: string): Promise<JsNativeDatabase>
21+
export declare function openDatabaseFromEnvoy(jsHandle: JsEnvoyHandle, actorId: string, preloadedEntries?: Array<JsKvEntry> | undefined | null): Promise<JsNativeDatabase>
2222
/** Configuration for starting the native envoy client. */
2323
export interface JsEnvoyConfig {
2424
endpoint: string

rivetkit-typescript/packages/rivetkit-native/src/bridge_actor.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ impl EnvoyCallbacks for BridgeCallbacks {
7070
actor_id: String,
7171
generation: u32,
7272
config: protocol::ActorConfig,
73-
_preloaded_kv: Option<protocol::PreloadedKv>,
73+
preloaded_kv: Option<protocol::PreloadedKv>,
7474
) -> BoxFuture<anyhow::Result<()>> {
7575
let response_map = self.response_map.clone();
7676
let event_cb = self.event_cb.clone();
@@ -85,6 +85,7 @@ impl EnvoyCallbacks for BridgeCallbacks {
8585
"key": config.key,
8686
"createTs": config.create_ts,
8787
"input": config.input.map(|v| base64_encode(&v)),
88+
"preloadedKv": preloaded_kv.as_ref().map(encode_preloaded_kv),
8889
"responseId": response_id,
8990
});
9091

@@ -359,3 +360,20 @@ fn base64_decode(data: &str) -> Option<Vec<u8>> {
359360
use base64::Engine;
360361
base64::engine::general_purpose::STANDARD.decode(data).ok()
361362
}
363+
364+
fn encode_preloaded_kv(preloaded_kv: &protocol::PreloadedKv) -> serde_json::Value {
365+
serde_json::json!({
366+
"entries": preloaded_kv.entries.iter().map(|entry| {
367+
serde_json::json!({
368+
"key": base64_encode(&entry.key),
369+
"value": base64_encode(&entry.value),
370+
"metadata": {
371+
"version": base64_encode(&entry.metadata.version),
372+
"updateTs": entry.metadata.update_ts,
373+
},
374+
})
375+
}).collect::<Vec<_>>(),
376+
"requestedGetKeys": preloaded_kv.requested_get_keys.iter().map(|key| base64_encode(key)).collect::<Vec<_>>(),
377+
"requestedPrefixes": preloaded_kv.requested_prefixes.iter().map(|key| base64_encode(key)).collect::<Vec<_>>(),
378+
})
379+
}

rivetkit-typescript/packages/rivetkit-native/src/database.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use rivetkit_sqlite_native::vfs::{KvVfs, NativeDatabase};
1919
use tokio::runtime::Handle;
2020

2121
use crate::envoy_handle::JsEnvoyHandle;
22+
use crate::types::JsKvEntry;
2223

2324
/// SqliteKv adapter that routes operations through the envoy handle's KV methods.
2425
pub struct EnvoyKv {
@@ -501,12 +502,24 @@ fn exec_statements(db: *mut sqlite3, sql: &str) -> napi::Result<QueryResult> {
501502
pub async fn open_database_from_envoy(
502503
js_handle: &JsEnvoyHandle,
503504
actor_id: String,
505+
preloaded_entries: Option<Vec<JsKvEntry>>,
504506
) -> napi::Result<JsNativeDatabase> {
505507
let envoy_kv = Arc::new(EnvoyKv::new(js_handle.handle.clone(), actor_id.clone()));
508+
let preloaded_entries = preloaded_entries
509+
.unwrap_or_default()
510+
.into_iter()
511+
.map(|entry| (entry.key.to_vec(), entry.value.to_vec()))
512+
.collect();
506513
let rt_handle = Handle::current();
507514
let db = tokio::task::spawn_blocking(move || {
508515
let vfs_name = format!("envoy-kv-{}", actor_id);
509-
let vfs = KvVfs::register(&vfs_name, envoy_kv, actor_id.clone(), rt_handle)
516+
let vfs = KvVfs::register(
517+
&vfs_name,
518+
envoy_kv,
519+
actor_id.clone(),
520+
rt_handle,
521+
preloaded_entries,
522+
)
510523
.map_err(|e| napi::Error::from_reason(format!("failed to register VFS: {}", e)))?;
511524

512525
rivetkit_sqlite_native::vfs::open_database(vfs, &actor_id)

rivetkit-typescript/packages/rivetkit-native/wrapper.d.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ export declare function startEnvoy(config: EnvoyConfig): Promise<EnvoyHandle>;
126126
export declare function openDatabaseFromEnvoy(
127127
handle: EnvoyHandle,
128128
actorId: string,
129+
preloadedEntries?: readonly [Uint8Array, Uint8Array][] | null,
129130
): Promise<JsNativeDatabase>;
130131

131132
export interface NativeRawDatabase {
@@ -139,6 +140,7 @@ export interface NativeRawDatabase {
139140
export declare function openRawDatabaseFromEnvoy(
140141
handle: EnvoyHandle,
141142
actorId: string,
143+
preloadedEntries?: readonly [Uint8Array, Uint8Array][] | null,
142144
): Promise<NativeRawDatabase>;
143145

144146
export declare const utils: {};

rivetkit-typescript/packages/rivetkit-native/wrapper.js

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -159,9 +159,40 @@ async function startEnvoy(config) {
159159
/**
160160
* Open a native database backed by envoy KV.
161161
*/
162-
async function openDatabaseFromEnvoy(handle, actorId) {
162+
async function openDatabaseFromEnvoy(handle, actorId, preloadedEntries) {
163163
const rawHandle = handle._raw || handle;
164-
return native.openDatabaseFromEnvoy(rawHandle, actorId);
164+
const nativePreloadedEntries = preloadedEntries
165+
? preloadedEntries.map(([key, value]) => ({
166+
key: Buffer.from(key),
167+
value: Buffer.from(value),
168+
}))
169+
: null;
170+
return native.openDatabaseFromEnvoy(
171+
rawHandle,
172+
actorId,
173+
nativePreloadedEntries,
174+
);
175+
}
176+
177+
function decodePreloadedKv(preloadedKv) {
178+
if (!preloadedKv) {
179+
return null;
180+
}
181+
182+
const decodeBytes = (value) => Uint8Array.from(Buffer.from(value, "base64"));
183+
184+
return {
185+
entries: (preloadedKv.entries || []).map((entry) => ({
186+
key: decodeBytes(entry.key),
187+
value: decodeBytes(entry.value),
188+
metadata: {
189+
version: decodeBytes(entry.metadata.version),
190+
updateTs: entry.metadata.updateTs,
191+
},
192+
})),
193+
requestedGetKeys: (preloadedKv.requestedGetKeys || []).map(decodeBytes),
194+
requestedPrefixes: (preloadedKv.requestedPrefixes || []).map(decodeBytes),
195+
};
165196
}
166197

167198
function isPlainObject(value) {
@@ -279,8 +310,12 @@ function wrapNativeStorageError(nativeDb, error) {
279310
);
280311
}
281312

282-
async function openRawDatabaseFromEnvoy(handle, actorId) {
283-
const nativeDb = await openDatabaseFromEnvoy(handle, actorId);
313+
async function openRawDatabaseFromEnvoy(handle, actorId, preloadedEntries) {
314+
const nativeDb = await openDatabaseFromEnvoy(
315+
handle,
316+
actorId,
317+
preloadedEntries,
318+
);
284319
let closed = false;
285320

286321
const ensureOpen = () => {
@@ -357,7 +392,7 @@ function handleEvent(event, config, wrappedHandle) {
357392
event.actorId,
358393
event.generation,
359394
actorConfig,
360-
null, // preloadedKv
395+
decodePreloadedKv(event.preloadedKv),
361396
),
362397
).then(
363398
async () => {

rivetkit-typescript/packages/rivetkit/src/db/config.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@ export interface NativeSqliteConfig {
1717
* Replaces the transport-config-based NativeSqliteConfig seam.
1818
*/
1919
export interface NativeDatabaseProvider {
20-
open(actorId: string): Promise<RawAccess>;
20+
open(
21+
actorId: string,
22+
preloadedEntries?: [Uint8Array, Uint8Array][],
23+
): Promise<RawAccess>;
2124
}
2225

2326
/**

rivetkit-typescript/packages/rivetkit/src/db/mod.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,10 @@ export function db({
5252
// path where databases are opened from a live runtime handle
5353
// (e.g., the native envoy client).
5454
if (ctx.nativeDatabaseProvider) {
55-
return await ctx.nativeDatabaseProvider.open(ctx.actorId);
55+
return await ctx.nativeDatabaseProvider.open(
56+
ctx.actorId,
57+
ctx.preloadedEntries,
58+
);
5659
}
5760

5861
const { database: db, kvStore } = await openActorDatabase(ctx);

rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -584,10 +584,14 @@ export class EngineActorDriver implements ActorDriver {
584584

585585
const envoy = this.#envoy;
586586
return {
587-
open: async (actorId: string) => {
587+
open: async (
588+
actorId: string,
589+
preloadedEntries?: [Uint8Array, Uint8Array][],
590+
) => {
588591
return await nativeMod.openRawDatabaseFromEnvoy(
589592
envoy,
590593
actorId,
594+
preloadedEntries,
591595
);
592596
},
593597
};

0 commit comments

Comments
 (0)