Skip to content

Commit 17bc9a6

Browse files
authored
Add option to report diagnostic events (#159)
1 parent ffab710 commit 17bc9a6

File tree

8 files changed

+400
-5
lines changed

8 files changed

+400
-5
lines changed
Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
use alloc::vec;
2+
use alloc::{
3+
borrow::Cow,
4+
collections::btree_map::BTreeMap,
5+
string::{String, ToString},
6+
vec::Vec,
7+
};
8+
use serde::{
9+
Deserialize, Deserializer, Serialize,
10+
de::{IgnoredAny, Visitor},
11+
};
12+
13+
use crate::sync::{
14+
interface::{Instruction, StartSyncStream},
15+
line::{DataLine, OplogData, SyncLineStr},
16+
sync_status::{BucketProgress, DownloadSyncStatus},
17+
};
18+
19+
#[derive(Deserialize)]
20+
pub struct DiagnosticOptions {
21+
// currently empty, we enable diagnostics if an Option<Self> is Some()
22+
}
23+
24+
#[derive(Serialize)]
25+
pub enum DiagnosticsEvent {
26+
BucketStateChange {
27+
changes: Vec<BucketDownloadState>,
28+
incremental: bool,
29+
},
30+
SchemaChange(ObservedSchemaType),
31+
}
32+
33+
#[derive(Serialize)]
34+
pub struct BucketDownloadState {
35+
pub name: String,
36+
pub progress: BucketProgress,
37+
}
38+
39+
#[derive(Serialize)]
40+
pub struct ObservedSchemaType {
41+
pub table: String,
42+
pub column: String,
43+
pub value_type: ValueType,
44+
}
45+
46+
#[derive(Serialize, Clone, Copy, PartialEq)]
47+
pub enum ValueType {
48+
Null,
49+
String,
50+
Integer,
51+
Real,
52+
}
53+
54+
#[derive(Default)]
55+
pub struct DiagnosticsCollector {
56+
inferred_schema: BTreeMap<String, BTreeMap<String, ValueType>>,
57+
}
58+
59+
impl DiagnosticsCollector {
60+
pub fn for_options(options: &StartSyncStream) -> Option<Self> {
61+
options.diagnostics.as_ref().map(|_| Self::default())
62+
}
63+
64+
pub fn handle_tracking_checkpoint(
65+
&self,
66+
status: &DownloadSyncStatus,
67+
instructions: &mut Vec<Instruction>,
68+
) {
69+
let mut buckets = vec![];
70+
if let Some(downloading) = &status.downloading {
71+
for (name, progress) in &downloading.buckets {
72+
buckets.push(BucketDownloadState {
73+
name: name.clone(),
74+
progress: progress.clone(),
75+
});
76+
}
77+
}
78+
79+
instructions.push(Instruction::HandleDiagnostics(
80+
DiagnosticsEvent::BucketStateChange {
81+
changes: buckets,
82+
incremental: false,
83+
},
84+
));
85+
}
86+
87+
/// Updates the internal inferred schema with types from the handled data line.
88+
///
89+
/// Emits a diagnostic line for each changed column.
90+
pub fn handle_data_line<'a>(
91+
&mut self,
92+
line: &'a DataLine<'a>,
93+
status: &DownloadSyncStatus,
94+
instructions: &mut Vec<Instruction>,
95+
) {
96+
if let Some(download_status) = &status.downloading {
97+
if let Some(progress) = download_status.buckets.get(line.bucket.as_ref()) {
98+
let mut changes = vec![];
99+
changes.push(BucketDownloadState {
100+
name: line.bucket.to_string(),
101+
progress: progress.clone(),
102+
});
103+
104+
instructions.push(Instruction::HandleDiagnostics(
105+
DiagnosticsEvent::BucketStateChange {
106+
changes,
107+
incremental: true,
108+
},
109+
));
110+
}
111+
}
112+
113+
for op in &line.data {
114+
if let (Some(data), Some(object_type)) = (&op.data, &op.object_type) {
115+
let OplogData::Json { data } = data;
116+
let table = self
117+
.inferred_schema
118+
.entry(object_type.to_string())
119+
.or_default();
120+
121+
let mut de = serde_json::Deserializer::from_str(data);
122+
123+
struct TypeInferringVisitor<'a> {
124+
table_name: &'a str,
125+
table: &'a mut BTreeMap<String, ValueType>,
126+
instructions: &'a mut Vec<Instruction>,
127+
}
128+
129+
impl TypeInferringVisitor<'_> {
130+
fn observe_type<'a>(&mut self, name: Cow<'a, str>, column_type: ValueType) {
131+
if column_type == ValueType::Null {
132+
// We don't track nullability in the inferred schema.
133+
return;
134+
}
135+
136+
if let Some(existing) = self.table.get_mut(name.as_ref()) {
137+
if *existing != column_type && *existing != ValueType::String {
138+
*existing = column_type;
139+
140+
self.instructions.push(Instruction::HandleDiagnostics(
141+
DiagnosticsEvent::SchemaChange(ObservedSchemaType {
142+
table: self.table_name.to_string(),
143+
column: name.into_owned(),
144+
value_type: column_type,
145+
}),
146+
));
147+
}
148+
} else {
149+
let name = name.into_owned();
150+
self.table.insert(name.clone(), column_type);
151+
152+
self.instructions.push(Instruction::HandleDiagnostics(
153+
DiagnosticsEvent::SchemaChange(ObservedSchemaType {
154+
table: self.table_name.to_string(),
155+
column: name,
156+
value_type: column_type,
157+
}),
158+
));
159+
}
160+
}
161+
}
162+
163+
impl<'de> Visitor<'de> for TypeInferringVisitor<'de> {
164+
type Value = ();
165+
166+
fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result {
167+
write!(formatter, "a map")
168+
}
169+
170+
fn visit_map<A>(mut self, mut map: A) -> Result<Self::Value, A::Error>
171+
where
172+
A: serde::de::MapAccess<'de>,
173+
{
174+
while let Some(k) = map.next_key::<SyncLineStr<'de>>()? {
175+
if k == "id" {
176+
map.next_value::<IgnoredAny>()?;
177+
} else {
178+
let value_type = map.next_value::<ValueToValueType>()?.0;
179+
self.observe_type(k, value_type);
180+
}
181+
}
182+
183+
Ok(())
184+
}
185+
}
186+
187+
let _ = de.deserialize_map(TypeInferringVisitor {
188+
table_name: object_type,
189+
table,
190+
instructions,
191+
});
192+
}
193+
}
194+
}
195+
}
196+
197+
/// Utility to deserialize the [ValueType] from a [serde_json::Value] without reading it into a
198+
/// structure that requires allocation.
199+
struct ValueToValueType(ValueType);
200+
201+
impl<'de> Deserialize<'de> for ValueToValueType {
202+
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
203+
where
204+
D: serde::Deserializer<'de>,
205+
{
206+
struct ValueTypeVisitor;
207+
208+
impl<'de> Visitor<'de> for ValueTypeVisitor {
209+
type Value = ValueType;
210+
211+
fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result {
212+
write!(formatter, "a sync value")
213+
}
214+
215+
fn visit_f64<E>(self, _v: f64) -> Result<Self::Value, E>
216+
where
217+
E: serde::de::Error,
218+
{
219+
Ok(ValueType::Real)
220+
}
221+
222+
fn visit_u64<E>(self, _v: u64) -> Result<Self::Value, E>
223+
where
224+
E: serde::de::Error,
225+
{
226+
Ok(ValueType::Integer)
227+
}
228+
229+
fn visit_i64<E>(self, _v: i64) -> Result<Self::Value, E>
230+
where
231+
E: serde::de::Error,
232+
{
233+
Ok(ValueType::Integer)
234+
}
235+
236+
fn visit_str<E>(self, _v: &str) -> Result<Self::Value, E>
237+
where
238+
E: serde::de::Error,
239+
{
240+
Ok(ValueType::String)
241+
}
242+
243+
fn visit_unit<E>(self) -> Result<Self::Value, E>
244+
where
245+
E: serde::de::Error,
246+
{
247+
// Unit is used to represent nulls, see https://github.com/serde-rs/json/blob/4f6dbfac79647d032b0997b5ab73022340c6dab7/src/de.rs#L1404-L1409
248+
Ok(ValueType::Null)
249+
}
250+
}
251+
252+
Ok(ValueToValueType(
253+
deserializer.deserialize_any(ValueTypeVisitor)?,
254+
))
255+
}
256+
}

crates/core/src/sync/interface.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use crate::create_sqlite_text_fn;
88
use crate::error::PowerSyncError;
99
use crate::schema::Schema;
1010
use crate::state::DatabaseState;
11+
use crate::sync::diagnostics::{DiagnosticOptions, DiagnosticsEvent};
1112
use crate::sync::storage_adapter::StorageAdapter;
1213
use crate::sync::subscriptions::{StreamKey, apply_subscriptions};
1314
use alloc::borrow::Cow;
@@ -43,6 +44,10 @@ pub struct StartSyncStream {
4344
pub active_streams: Rc<Vec<StreamKey>>,
4445
#[serde(default)]
4546
pub app_metadata: Option<Box<RawValue>>,
47+
48+
/// Whether sync diagnostics with detailed download stats and inferred schema should be reported
49+
/// by the sync client.
50+
pub diagnostics: Option<DiagnosticOptions>,
4651
}
4752

4853
impl StartSyncStream {
@@ -59,6 +64,7 @@ impl Default for StartSyncStream {
5964
include_defaults: Self::include_defaults_by_default(),
6065
active_streams: Default::default(),
6166
app_metadata: Default::default(),
67+
diagnostics: Default::default(),
6268
}
6369
}
6470
}
@@ -138,6 +144,11 @@ pub enum Instruction {
138144
FlushFileSystem {},
139145
/// Notify that a sync has been completed, prompting client SDKs to clear earlier errors.
140146
DidCompleteSync {},
147+
148+
/// Handle a diagnostic event.
149+
///
150+
/// This instruction is only emitted if diagnostics have been enabled on [StartSyncStream].
151+
HandleDiagnostics(DiagnosticsEvent),
141152
}
142153

143154
#[derive(Serialize, Default)]

crates/core/src/sync/line.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use super::bucket_priority::BucketPriority;
1616
/// With the JSON decoder, borrowing from input data is only possible when the string contains no
1717
/// escape sequences (otherwise, the string is not a direct view of input data and we need an
1818
/// internal copy).
19-
type SyncLineStr<'a> = Cow<'a, str>;
19+
pub type SyncLineStr<'a> = Cow<'a, str>;
2020

2121
#[derive(Clone, Copy)]
2222
pub enum SyncLineSource<'a> {

crates/core/src/sync/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use powersync_sqlite_nostd::{self as sqlite, ResultCode};
44
mod bucket_priority;
55
pub mod checkpoint;
66
mod checksum;
7+
mod diagnostics;
78
mod interface;
89
pub mod line;
910
pub mod operations;

crates/core/src/sync/streaming_sync.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use crate::{
2424
sync::{
2525
BucketPriority,
2626
checkpoint::OwnedBucketChecksum,
27+
diagnostics::DiagnosticsCollector,
2728
interface::{CloseSyncStream, StartSyncStream, StreamSubscriptionRequest},
2829
line::{
2930
BucketSubscriptionReason, DataLine, StreamDescription, StreamSubscriptionError,
@@ -148,11 +149,12 @@ impl SyncIterationHandle {
148149
) -> Result<Self, PowerSyncError> {
149150
let runner = StreamingSyncIteration {
150151
db,
152+
validated_but_not_applied: None,
153+
diagnostics: DiagnosticsCollector::for_options(&options),
151154
options,
152155
state,
153156
adapter: StorageAdapter::new(db)?,
154157
status: SyncStatusContainer::new(),
155-
validated_but_not_applied: None,
156158
};
157159
let future = runner.run().boxed_local();
158160

@@ -231,6 +233,7 @@ struct StreamingSyncIteration {
231233
// pending local data. We will retry applying this checkpoint when the client SDK informs us
232234
// that it has finished uploading changes.
233235
validated_but_not_applied: Option<OwnedCheckpoint>,
236+
diagnostics: Option<DiagnosticsCollector>,
234237
}
235238

236239
impl StreamingSyncIteration {
@@ -456,10 +459,20 @@ impl StreamingSyncIteration {
456459
// something worth doing.
457460
self.validated_but_not_applied = None;
458461
*target = updated_target;
462+
463+
if let Some(diagnostics) = &self.diagnostics {
464+
let status = self.status.inner().borrow();
465+
diagnostics.handle_tracking_checkpoint(&*status, &mut event.instructions);
466+
}
459467
}
460468
SyncStateMachineTransition::DataLineSaved { line } => {
461469
self.status
462470
.update(|s| s.track_line(&line), &mut event.instructions);
471+
472+
if let Some(diagnostics) = &mut self.diagnostics {
473+
let status = self.status.inner().borrow();
474+
diagnostics.handle_data_line(line, &*status, &mut event.instructions);
475+
}
463476
}
464477
SyncStateMachineTransition::CloseIteration(close) => return Some(close),
465478
SyncStateMachineTransition::SyncLocalFailedDueToPendingCrud {

crates/core/src/sync/sync_status.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ pub struct SyncPriorityStatus {
267267
}
268268

269269
/// Per-bucket download progress information.
270-
#[derive(Serialize, Hash)]
270+
#[derive(Serialize, Hash, Clone)]
271271
pub struct BucketProgress {
272272
pub priority: BucketPriority,
273273
pub at_last: i64,
@@ -277,7 +277,7 @@ pub struct BucketProgress {
277277

278278
#[derive(Hash)]
279279
pub struct SyncDownloadProgress {
280-
buckets: BTreeMap<String, BucketProgress>,
280+
pub buckets: BTreeMap<String, BucketProgress>,
281281
}
282282

283283
impl Serialize for SyncDownloadProgress {

0 commit comments

Comments
 (0)