-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathinterface.rs
More file actions
343 lines (311 loc) · 12.1 KB
/
interface.rs
File metadata and controls
343 lines (311 loc) · 12.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
use core::cell::RefCell;
use core::ffi::{c_int, c_void};
use super::streaming_sync::SyncClient;
use super::sync_status::DownloadSyncStatus;
use crate::constants::SUBTYPE_JSON;
use crate::create_sqlite_text_fn;
use crate::error::PowerSyncError;
use crate::schema::Schema;
use crate::state::DatabaseState;
use crate::sync::storage_adapter::StorageAdapter;
use crate::sync::subscriptions::{StreamKey, apply_subscriptions};
use alloc::borrow::Cow;
use alloc::boxed::Box;
use alloc::rc::Rc;
use alloc::{string::String, vec::Vec};
use powersync_sqlite_nostd::bindings::SQLITE_RESULT_SUBTYPE;
use powersync_sqlite_nostd::{self as sqlite, ColumnType};
use powersync_sqlite_nostd::{Connection, Context};
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
use sqlite::{ResultCode, Value};
use crate::sync::BucketPriority;
use crate::util::JsonString;
/// Payload provided by SDKs when requesting a sync iteration.
///
/// Every field is optional in the JSON payload: missing fields fall back to the values that
/// [Default] produces for this struct.
#[derive(Deserialize)]
pub struct StartSyncStream {
/// Bucket parameters to include in the request when opening a sync stream.
#[serde(default)]
pub parameters: Option<serde_json::Map<String, serde_json::Value>>,
/// The schema sent along with the request, or the default [Schema] if the SDK did not send one.
#[serde(default)]
pub schema: Schema,
/// Whether to request default streams in the generated sync request.
#[serde(default = "StartSyncStream::include_defaults_by_default")]
pub include_defaults: bool,
/// Streams that are currently active in the app.
///
/// We will increase the expiry date for those streams at the time we connect and disconnect.
#[serde(default)]
pub active_streams: Rc<Vec<StreamKey>>,
/// Optional application metadata, kept as raw (unparsed) JSON.
///
/// NOTE(review): presumably forwarded as `app_metadata` of the [StreamingSyncRequest] - confirm
/// in the sync client.
#[serde(default)]
pub app_metadata: Option<Box<RawValue>>,
}
impl StartSyncStream {
/// The value used for [StartSyncStream::include_defaults] when the SDK does not specify one.
///
/// This is referenced by name from the `#[serde(default = "...")]` attribute on that field and
/// from the [Default] implementation, so both paths agree on the same value.
pub const fn include_defaults_by_default() -> bool {
true
}
}
impl Default for StartSyncStream {
fn default() -> Self {
Self {
parameters: Default::default(),
schema: Default::default(),
include_defaults: Self::include_defaults_by_default(),
active_streams: Default::default(),
app_metadata: Default::default(),
}
}
}
/// A request sent from a client SDK to the [SyncClient] with a `powersync_control` invocation.
///
/// The lifetime `'a` is that of line data borrowed by a contained [SyncEvent].
pub enum SyncControlRequest<'a> {
/// The client requests to start a sync iteration.
///
/// Earlier iterations are implicitly dropped when receiving this request.
StartSyncStream(StartSyncStream),
/// The client requests to stop the current sync iteration.
StopSyncStream,
/// The client is forwarding a sync event to the core extension.
SyncEvent(SyncEvent<'a>),
}
/// An event forwarded to the sync client, wrapped in [SyncControlRequest::SyncEvent].
///
/// The lifetime `'a` is that of the text or binary line borrowed by [SyncEvent::TextLine] and
/// [SyncEvent::BinaryLine].
pub enum SyncEvent<'a> {
/// A synthetic event forwarded to the [SyncClient] after being started.
Initialize,
/// An event requesting the sync client to shut down.
TearDown,
/// Notifies the sync client that a token has been refreshed.
///
/// In response, we'll stop the current iteration to begin another one with the new token.
DidRefreshToken,
/// Notifies the sync client that the current CRUD upload (for which the client SDK is
/// responsible) has finished.
///
/// If pending CRUD entries have previously prevented a sync from completing, this event can be
/// used to try again.
UploadFinished,
/// Sent by the SDK through the `connection` operation with the payload `established`.
///
/// NOTE(review): presumably signals that the connection to the sync service is open - confirm.
ConnectionEstablished,
/// Sent by the SDK through the `connection` operation with the payload `end`.
///
/// NOTE(review): presumably signals that the stream to the sync service has closed - confirm.
StreamEnded,
/// Forward a text line (JSON) received from the sync service.
TextLine {
data: &'a str,
},
/// Forward a binary line (BSON) received from the sync service.
BinaryLine {
data: &'a [u8],
},
/// The active stream subscriptions (as in, `SyncStreamSubscription` instances active right now)
/// have changed.
///
/// The client will compare the new active subscriptions with the current one and will issue a
/// request to restart the sync iteration if necessary.
DidUpdateSubscriptions {
active_streams: Rc<Vec<StreamKey>>,
},
}
/// An instruction sent by the core extension to the SDK.
///
/// `powersync_control` serializes the instructions returned by the sync client to JSON, so the
/// variant and field names here are part of the format consumed by SDKs.
#[derive(Serialize)]
pub enum Instruction {
/// A log line with an attached severity.
LogLine {
severity: LogSeverity,
line: Cow<'static, str>,
},
/// Update the download status for the ongoing sync iteration.
UpdateSyncStatus {
status: Rc<RefCell<DownloadSyncStatus>>,
},
/// Connect to the sync service using the [StreamingSyncRequest] created by the core extension,
/// and then forward received lines via [SyncEvent::TextLine] and [SyncEvent::BinaryLine].
EstablishSyncStream { request: StreamingSyncRequest },
/// Ask the SDK to fetch credentials.
FetchCredentials {
/// Whether the credentials currently used have expired.
///
/// If false, this is a pre-fetch.
did_expire: bool,
},
// These are defined like this because deserializers in Kotlin can't support either an
// object or a literal value
/// Close the websocket / HTTP stream to the sync service.
CloseSyncStream(CloseSyncStream),
/// Flush the file-system if it's non-durable (only applicable to the Dart SDK).
FlushFileSystem {},
/// Notify that a sync has been completed, prompting client SDKs to clear earlier errors.
DidCompleteSync {},
}
/// Payload of [Instruction::CloseSyncStream].
///
/// The derived [Default] sets `hide_disconnect` to `false`.
#[derive(Serialize, Default)]
pub struct CloseSyncStream {
/// Whether clients should hide the brief disconnected status from the public sync status and
/// reconnect immediately.
pub hide_disconnect: bool,
}
/// Severity attached to an [Instruction::LogLine].
///
/// The upper-case variant names are what the derived `Serialize` implementation emits, so they
/// must not be renamed without also adding a matching serde rename.
#[derive(Serialize)]
pub enum LogSeverity {
DEBUG,
INFO,
WARNING,
}
/// The request carried by [Instruction::EstablishSyncStream], which SDKs use to connect to the
/// sync service.
#[derive(Serialize)]
pub struct StreamingSyncRequest {
/// The buckets to request, see [BucketRequest].
pub buckets: Vec<BucketRequest>,
pub include_checksum: bool,
pub raw_data: bool,
pub binary_data: bool,
pub client_id: String,
/// NOTE(review): presumably the bucket parameters from [StartSyncStream::parameters] - confirm
/// where this request is built.
pub parameters: Option<serde_json::Map<String, serde_json::Value>>,
/// The stream subscriptions to request, see [StreamSubscriptionRequest].
pub streams: Rc<StreamSubscriptionRequest>,
/// Raw JSON metadata; the key is omitted from the serialized request when this is `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub app_metadata: Option<Box<RawValue>>,
}
/// The `streams` part of a [StreamingSyncRequest].
#[derive(Debug, Serialize, PartialEq)]
pub struct StreamSubscriptionRequest {
/// NOTE(review): presumably mirrors [StartSyncStream::include_defaults], i.e. whether default
/// streams are requested - confirm where this request is built.
pub include_defaults: bool,
/// The explicitly requested stream subscriptions.
pub subscriptions: Vec<RequestedStreamSubscription>,
}
/// A single entry of [StreamSubscriptionRequest::subscriptions].
#[derive(Debug, Serialize, PartialEq)]
pub struct RequestedStreamSubscription {
/// The name of the sync stream to subscribe to.
pub stream: String,
/// Parameters to make available in the stream's definition.
pub parameters: Option<Box<JsonString>>,
/// An optional [BucketPriority] for this subscription.
///
/// NOTE(review): going by the name, this overrides the priority the stream would otherwise
/// have - confirm against the sync service protocol.
pub override_priority: Option<BucketPriority>,
}
/// A single entry of [StreamingSyncRequest::buckets].
#[derive(Serialize)]
pub struct BucketRequest {
/// The name of the bucket.
pub name: String,
/// NOTE(review): presumably the position in the bucket after which data is requested - confirm
/// against the sync service protocol.
pub after: String,
}
/// Registers the SQL functions `powersync_control` and `powersync_offline_sync_status` on `db`.
///
/// `powersync_control(op, payload)` takes two arguments. `op` must be a text value naming one of
/// the operations `start`, `stop`, `line_text`, `line_binary`, `refreshed_token`,
/// `completed_upload`, `update_subscriptions`, `connection` or `subscriptions`. Except for
/// `subscriptions` (which is handled by [apply_subscriptions] directly), the operation is turned
/// into a [SyncControlRequest] and pushed to the connection's [SyncClient]. The result of that is
/// serialized to JSON and returned as text with the JSON subtype.
///
/// `powersync_offline_sync_status()` takes no arguments and is backed by the function generated
/// for `powersync_offline_sync_status_impl`.
///
/// Both functions are registered with the `DIRECTONLY` flag.
///
/// # Errors
///
/// Returns the [ResultCode] reported by `create_function_v2` if either registration fails.
pub fn register(db: *mut sqlite::sqlite3, state: Rc<DatabaseState>) -> Result<(), ResultCode> {
// The callback SQLite invokes for `powersync_control`. Errors from the inner closure are
// reported on the SQLite context instead of being returned.
extern "C" fn control(
ctx: *mut sqlite::context,
argc: c_int,
argv: *mut *mut sqlite::value,
) -> () {
let result = (|| -> Result<(), PowerSyncError> {
let db = ctx.db_handle();
// In debug builds, check that autocommit is off, i.e. that the caller is inside a
// transaction.
debug_assert!(!db.get_autocommit());
// NOTE(review): relies on the user data of this function being the `DatabaseState`
// pointer passed at registration below - confirm the contract of `from_context`.
let state = unsafe { DatabaseState::from_context(&ctx) };
let args = sqlite::args!(argc, argv);
let [op, payload] = args else {
// This should be unreachable, we register the function with two arguments.
return Err(PowerSyncError::unknown_internal());
};
if op.value_type() != ColumnType::Text {
return Err(PowerSyncError::argument_error(
"First argument must be a string",
));
}
let op = op.text();
// Translate the operation name and payload into a request for the sync client.
let event = match op {
"start" => {
// Ensure the operations vtab exists. It's not actually used by the sync client,
// but we rely on that vtab being destroyed as a pre-close hook for the database
// connection to free statements preserved across multiple powersync_control
// invocations.
db.exec_safe(
"insert into powersync_operations (op, data) VALUES ('noop', null);",
)?;
// A text payload is parsed as JSON options, any other payload type starts the
// iteration with default options.
SyncControlRequest::StartSyncStream({
if payload.value_type() == ColumnType::Text {
serde_json::from_str(payload.text())
.map_err(PowerSyncError::as_argument_error)?
} else {
StartSyncStream::default()
}
})
}
"stop" => SyncControlRequest::StopSyncStream,
"line_text" => SyncControlRequest::SyncEvent(SyncEvent::TextLine {
data: if payload.value_type() == ColumnType::Text {
payload.text()
} else {
return Err(PowerSyncError::argument_error(
"Second argument must be a string",
));
},
}),
"line_binary" => SyncControlRequest::SyncEvent(SyncEvent::BinaryLine {
data: if payload.value_type() == ColumnType::Blob {
payload.blob()
} else {
return Err(PowerSyncError::argument_error(
"Second argument must be a byte array",
));
},
}),
"refreshed_token" => SyncControlRequest::SyncEvent(SyncEvent::DidRefreshToken),
"completed_upload" => SyncControlRequest::SyncEvent(SyncEvent::UploadFinished),
// NOTE(review): unlike `line_text`, the three operations below read `payload.text()`
// without checking the value type first. Confirm what `text()` yields for NULL or
// blob payloads and whether an explicit argument error would be preferable.
"update_subscriptions" => {
SyncControlRequest::SyncEvent(SyncEvent::DidUpdateSubscriptions {
active_streams: serde_json::from_str(payload.text())
.map_err(PowerSyncError::as_argument_error)?,
})
}
"connection" => SyncControlRequest::SyncEvent(match payload.text() {
"established" => SyncEvent::ConnectionEstablished,
"end" => SyncEvent::StreamEnded,
_ => {
return Err(PowerSyncError::argument_error("unknown connection event"));
}
}),
"subscriptions" => {
// This operation returns early and never reaches the sync client below.
let request = serde_json::from_str(payload.text())
.map_err(PowerSyncError::as_argument_error)?;
return apply_subscriptions(db, request);
}
_ => {
return Err(PowerSyncError::argument_error("Unknown operation"));
}
};
let instructions = {
// The mutable borrow of the client is confined to this block, so it ends before the
// instructions are serialized.
let mut client = state.sync_client.borrow_mut();
client
// The sync client is created lazily when the first request arrives.
.get_or_insert_with(|| {
let state = unsafe { DatabaseState::clone_from(ctx.user_data()) };
SyncClient::new(db, &state)
})
.push_event(event)
}?;
let formatted =
serde_json::to_string(&instructions).map_err(PowerSyncError::internal)?;
ctx.result_text_transient(&formatted);
// Mark the text result as JSON.
ctx.result_subtype(SUBTYPE_JSON);
Ok(())
})();
if let Err(e) = result {
e.apply_to_ctx("powersync_control", ctx);
}
}
// The function's user data is a raw pointer obtained from a clone of the state `Rc`;
// `DatabaseState::destroy_rc` is passed as the destructor callback for that pointer.
db.create_function_v2(
"powersync_control",
2,
sqlite::UTF8 | sqlite::DIRECTONLY | SQLITE_RESULT_SUBTYPE,
Some(Rc::into_raw(state.clone()) as *mut c_void),
Some(control),
None,
None,
Some(DatabaseState::destroy_rc),
)?;
// This function is registered without user data and without a destructor.
db.create_function_v2(
"powersync_offline_sync_status",
0,
sqlite::UTF8 | sqlite::DIRECTONLY | SQLITE_RESULT_SUBTYPE,
None,
Some(powersync_offline_sync_status),
None,
None,
None,
)?;
Ok(())
}
/// Implementation of the `powersync_offline_sync_status` SQL function.
///
/// Reads the offline sync state through a [StorageAdapter] opened on the database owning `ctx`
/// and returns it serialized as a JSON string. The function takes no SQL arguments, so `_args`
/// is ignored.
///
/// # Errors
///
/// Fails if the adapter can't be created, if reading the state fails, or if the state can't be
/// serialized (reported as an internal error).
fn powersync_offline_sync_status_impl(
    ctx: *mut sqlite::context,
    _args: &[*mut sqlite::value],
) -> Result<String, PowerSyncError> {
    let storage = StorageAdapter::new(ctx.db_handle())?;
    let offline_state = storage.offline_sync_state()?;

    serde_json::to_string(&offline_state).map_err(PowerSyncError::internal)
}
// Defines the `powersync_offline_sync_status` item that `register` hands to SQLite as the
// function callback, based on `powersync_offline_sync_status_impl`.
// NOTE(review): presumably the macro wraps the implementation in an `extern "C"` function that
// reports the returned string as a text result and uses the last argument as the function name
// in error messages - confirm against the definition of `create_sqlite_text_fn!`.
create_sqlite_text_fn!(
powersync_offline_sync_status,
powersync_offline_sync_status_impl,
"powersync_offline_sync_status"
);