Skip to content

Commit a70f7a8

Browse files
committed
feat: (re-)enable timestamp+offset based pagination optimization
#53 enabled #559 disabled And this patch enables it for MySQL and Postgres but not Spanner.
1 parent ed6e27f commit a70f7a8

6 files changed

Lines changed: 248 additions & 55 deletions

File tree

syncserver/src/web/extractors/bso_query_params.rs

Lines changed: 49 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,6 @@ impl From<Offset> for params::Offset {
3232
impl FromStr for Offset {
3333
type Err = ParseIntError;
3434
fn from_str(s: &str) -> Result<Self, Self::Err> {
35-
// issue559: Disable ':' support for now: simply parse as i64 as
36-
// previously (it was u64 previously but i64's close enough)
37-
let result = Offset {
38-
timestamp: None,
39-
offset: s.parse::<u64>()?,
40-
};
41-
/*
4235
let result = match s.chars().position(|c| c == ':') {
4336
None => Offset {
4437
timestamp: None,
@@ -55,7 +48,6 @@ impl FromStr for Offset {
5548
}
5649
}
5750
};
58-
*/
5951
Ok(result)
6052
}
6153
}
@@ -201,8 +193,7 @@ impl FromRequest for BsoQueryParams {
201193
None,
202194
)
203195
})?;
204-
// issue559: Dead code (timestamp always None)
205-
/*
196+
206197
if params.sort != Sorting::Index {
207198
if let Some(timestamp) = params.offset.as_ref().and_then(|offset| offset.timestamp)
208199
{
@@ -230,7 +221,7 @@ impl FromRequest for BsoQueryParams {
230221
}
231222
}
232223
}
233-
*/
224+
234225
Ok(params)
235226
})
236227
}
@@ -288,19 +279,60 @@ mod tests {
288279
assert!(result.full);
289280
}
290281

282+
#[test]
283+
fn test_offset_bound_below_newer() {
284+
let state = make_state();
285+
let req = TestRequest::with_uri("/?sort=newest&newer=2.22&offset=1111:1")
286+
.data(state)
287+
.to_http_request();
288+
let result = block_on(BsoQueryParams::extract(&req));
289+
assert!(result.is_err());
290+
let resp: HttpResponse = result.err().unwrap().into();
291+
assert_eq!(resp.status(), 400);
292+
}
293+
294+
#[test]
295+
fn test_offset_bound_above_older() {
296+
let state = make_state();
297+
let req = TestRequest::with_uri("/?sort=newest&older=2.22&offset=5858:1")
298+
.data(state)
299+
.to_http_request();
300+
let result = block_on(BsoQueryParams::extract(&req));
301+
assert!(result.is_err());
302+
let resp: HttpResponse = result.err().unwrap().into();
303+
assert_eq!(resp.status(), 400);
304+
}
305+
306+
#[test]
307+
fn test_offset_bound_within_range() {
308+
let state = make_state();
309+
let req = TestRequest::with_uri("/?sort=newest&newer=1.23&older=5.43&offset=3838:1")
310+
.data(state)
311+
.to_http_request();
312+
let result = block_on(BsoQueryParams::extract(&req));
313+
assert!(result.is_ok());
314+
}
315+
316+
#[test]
317+
fn test_bound_validation_skipped_for_index_sort() {
318+
let state = make_state();
319+
let req = TestRequest::with_uri("/?sort=index&newer=2.22&offset=1111:1")
320+
.data(state)
321+
.to_http_request();
322+
let result = block_on(BsoQueryParams::extract(&req));
323+
assert!(result.is_ok());
324+
}
325+
291326
#[actix_rt::test]
292327
async fn test_offset() {
293328
let sample_offset = params::Offset {
294329
timestamp: Some(SyncTimestamp::default()),
295330
offset: 1234,
296331
};
297332

298-
let test_offset = Offset {
299-
timestamp: None,
300-
offset: sample_offset.offset,
301-
};
302-
303333
let offset_str = sample_offset.to_string();
304-
assert!(test_offset == Offset::from_str(&offset_str).unwrap())
334+
let parsed = Offset::from_str(&offset_str).unwrap();
335+
assert_eq!(parsed.offset, sample_offset.offset);
336+
assert_eq!(parsed.timestamp, sample_offset.timestamp,);
305337
}
306338
}

syncstorage-db-common/src/params.rs

Lines changed: 56 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -68,27 +68,16 @@ pub struct Offset {
6868

6969
impl Display for Offset {
7070
fn fmt(&self, fmt: &mut Formatter) -> Result<(), fmt::Error> {
71-
// issue559: Disable ':' support for now.
72-
write!(fmt, "{}", self.offset)
73-
/*
7471
match self.timestamp {
75-
None => self.offset.to_string(),
76-
Some(ts) => format!("{}:{}", ts.as_i64(), self.offset),
72+
None => write!(fmt, "{}", self.offset),
73+
Some(ts) => write!(fmt, "{}:{}", ts.as_i64(), self.offset),
7774
}
78-
*/
7975
}
8076
}
8177

8278
impl FromStr for Offset {
8379
type Err = ParseIntError;
8480
fn from_str(s: &str) -> Result<Self, Self::Err> {
85-
// issue559: Disable ':' support for now: simply parse as i64 as
86-
// previously (it was u64 previously but i64's close enough)
87-
let result = Offset {
88-
timestamp: None,
89-
offset: s.parse::<u64>()?,
90-
};
91-
/*
9281
let result = match s.chars().position(|c| c == ':') {
9382
None => Offset {
9483
timestamp: None,
@@ -105,11 +94,64 @@ impl FromStr for Offset {
10594
}
10695
}
10796
};
108-
*/
10997
Ok(result)
11098
}
11199
}
112100

101+
#[cfg(test)]
102+
mod tests {
103+
use std::str::FromStr;
104+
105+
use super::Offset;
106+
use crate::util::SyncTimestamp;
107+
108+
#[test]
109+
fn offset_display_without_timestamp() {
110+
let offset = Offset {
111+
timestamp: None,
112+
offset: 50,
113+
};
114+
assert_eq!(offset.to_string(), "50");
115+
}
116+
117+
#[test]
118+
fn offset_display_with_timestamp() {
119+
let offset = Offset {
120+
timestamp: Some(SyncTimestamp::from_milliseconds(676767)),
121+
offset: 2,
122+
};
123+
assert_eq!(offset.to_string(), "676767:2");
124+
}
125+
126+
#[test]
127+
fn offset_without_timestamp_parsed() {
128+
let original = Offset {
129+
timestamp: None,
130+
offset: 99,
131+
};
132+
let parsed = Offset::from_str(&original.to_string()).unwrap();
133+
assert_eq!(parsed.offset, original.offset);
134+
assert!(parsed.timestamp.is_none());
135+
}
136+
137+
#[test]
138+
fn offset_with_timestamp_parsed() {
139+
let original = Offset {
140+
timestamp: Some(SyncTimestamp::from_milliseconds(71138383838)),
141+
offset: 3,
142+
};
143+
let parsed = Offset::from_str(&original.to_string()).unwrap();
144+
assert_eq!(parsed.offset, original.offset);
145+
assert_eq!(parsed.timestamp, original.timestamp);
146+
}
147+
148+
#[test]
149+
fn offset_fromstr_malformed_returns_error() {
150+
assert!(Offset::from_str("quux").is_err());
151+
assert!(Offset::from_str("wibble:buzz").is_err());
152+
}
153+
}
154+
113155
collection_data! {
114156
LockCollection {},
115157
DeleteCollection {},

syncstorage-db-common/src/util.rs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use diesel::{
1515
use serde::{Deserialize, Deserializer, Serialize, Serializer, ser};
1616

1717
use super::error::SyncstorageDbError;
18+
use crate::{Sorting, params};
1819

1920
/// Get the time since the UNIX epoch in milliseconds
2021
fn ms_since_epoch() -> i64 {
@@ -215,6 +216,39 @@ pub fn to_rfc3339(val: i64) -> Result<String, SyncstorageDbError> {
215216
)))
216217
}
217218

219+
/// Encode a timestamp and the number of rows to skip for BSO query pagination.
220+
pub fn encode_next_offset(
221+
sort: Sorting,
222+
prev_offset: u64,
223+
prev_timestamp: Option<i64>,
224+
modified_timestamps: &[i64],
225+
) -> String {
226+
if let Sorting::Index = sort {
227+
return (prev_offset + modified_timestamps.len() as u64).to_string();
228+
}
229+
if modified_timestamps.is_empty() {
230+
return prev_offset.to_string();
231+
}
232+
233+
let bound = *modified_timestamps.last().unwrap();
234+
let mut skip = 1usize;
235+
236+
skip += modified_timestamps[..modified_timestamps.len() - 1]
237+
.iter()
238+
.rev()
239+
.take_while(|&&m| m == bound)
240+
.count();
241+
if skip == modified_timestamps.len() && prev_timestamp == Some(bound) {
242+
skip += prev_offset as usize;
243+
}
244+
245+
params::Offset {
246+
timestamp: Some(SyncTimestamp::from_milliseconds(bound as u64)),
247+
offset: skip as u64,
248+
}
249+
.to_string()
250+
}
251+
218252
#[cfg(test)]
219253
mod tests {
220254
use std::error::Error;
@@ -262,4 +296,51 @@ mod tests {
262296
assert_eq!(zero, SyncTimestamp::from_i64(0).unwrap());
263297
assert_eq!(zero, SyncTimestamp::from_seconds(0.00));
264298
}
299+
300+
mod encode_next_offset_tests {
301+
use crate::Sorting;
302+
use crate::util::encode_next_offset;
303+
304+
#[test]
305+
fn index_sort_returns_numeric_offset() {
306+
let result = encode_next_offset(Sorting::Index, 50, None, &[19, 83, 747]);
307+
assert_eq!(result, "53");
308+
}
309+
310+
#[test]
311+
fn empty_modified_timestamps_returns_prev_offset() {
312+
let result = encode_next_offset(Sorting::Newest, 42, None, &[]);
313+
assert_eq!(result, "42");
314+
}
315+
316+
#[test]
317+
fn unique_last_timestamp_skip_is_one() {
318+
let result = encode_next_offset(Sorting::Newest, 0, None, &[5555, 4242, 3838]);
319+
assert_eq!(result, "3838:1");
320+
}
321+
322+
#[test]
323+
fn skip_counts_identical_tail_timestamps() {
324+
let result = encode_next_offset(Sorting::Newest, 0, None, &[5000, 3838, 3838]);
325+
assert_eq!(result, "3838:2");
326+
}
327+
328+
#[test]
329+
fn identical_timestamps_no_prev_bound_match() {
330+
let result = encode_next_offset(Sorting::Newest, 0, Some(2048), &[3838, 3838, 3838]);
331+
assert_eq!(result, "3838:3");
332+
}
333+
334+
#[test]
335+
fn identical_timestamps_with_matching_prev_bound_sums() {
336+
let result = encode_next_offset(Sorting::Newest, 2, Some(9001), &[9001, 9001, 9001]);
337+
assert_eq!(result, "9001:5");
338+
}
339+
340+
#[test]
341+
fn oldest_sort_works() {
342+
let result = encode_next_offset(Sorting::Oldest, 0, None, &[8999, 9000, 9001]);
343+
assert_eq!(result, "9001:1");
344+
}
345+
}
265346
}

syncstorage-mysql/src/db/db_impl.rs

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@ use diesel::{
1010
};
1111
use diesel_async::{AsyncConnection, RunQueryDsl, TransactionManager};
1212
use syncstorage_db_common::{
13-
DEFAULT_BSO_TTL, Db, Sorting, UserIdentifier, error::DbErrorIntrospect, params, results,
14-
util::SyncTimestamp,
13+
DEFAULT_BSO_TTL, Db, Sorting, UserIdentifier,
14+
error::DbErrorIntrospect,
15+
params, results,
16+
util::{SyncTimestamp, encode_next_offset},
1517
};
1618
use syncstorage_settings::DEFAULT_MAX_TOTAL_RECORDS;
1719

@@ -324,6 +326,13 @@ impl Db for MysqlDb {
324326
.filter(bso::expiry.gt(now))
325327
.into_boxed();
326328

329+
if let Some(ts) = params.offset.as_ref().and_then(|o| o.timestamp) {
330+
match params.sort {
331+
Sorting::Newest => query = query.filter(bso::modified.le(ts.as_i64())),
332+
Sorting::Oldest => query = query.filter(bso::modified.ge(ts.as_i64())),
333+
_ => {}
334+
}
335+
}
327336
if let Some(older) = params.older {
328337
query = query.filter(bso::modified.lt(older.as_i64()));
329338
}
@@ -340,14 +349,6 @@ impl Db for MysqlDb {
340349
// an error. We "fudge" a bit here by taking the id order as a secondary, since
341350
// that is guaranteed to be unique by the client.
342351
query = match params.sort {
343-
// issue559: Revert to previous sorting
344-
/*
345-
Sorting::Index => query.order(bso::id.desc()).order(bso::sortindex.desc()),
346-
Sorting::Newest | Sorting::None => {
347-
query.order(bso::id.desc()).order(bso::modified.desc())
348-
}
349-
Sorting::Oldest => query.order(bso::id.asc()).order(bso::modified.asc()),
350-
*/
351352
Sorting::Index => query.order(bso::sortindex.desc()),
352353
Sorting::Newest => query.order((bso::modified.desc(), bso::id.desc())),
353354
Sorting::Oldest => query.order((bso::modified.asc(), bso::id.asc())),
@@ -363,6 +364,7 @@ impl Db for MysqlDb {
363364
// match the query conditions
364365
query = query.limit(if limit > 0 { limit + 1 } else { limit });
365366

367+
let prev_ts = params.offset.as_ref().and_then(|o| o.timestamp).map(|t| t.as_i64());
366368
let numeric_offset = params.offset.map_or(0, |offset| offset.offset as i64);
367369

368370
if numeric_offset > 0 {
@@ -379,7 +381,13 @@ impl Db for MysqlDb {
379381

380382
let next_offset = if limit >= 0 && bsos.len() > limit as usize {
381383
bsos.pop();
382-
Some((limit + numeric_offset).to_string())
384+
let modified_timestamps: Vec<i64> = bsos.iter().map(|b| b.modified.as_i64()).collect();
385+
Some(encode_next_offset(
386+
params.sort,
387+
numeric_offset as u64,
388+
prev_ts,
389+
&modified_timestamps,
390+
))
383391
} else {
384392
// if an explicit "limit=0" is sent, return the offset of "0"
385393
// Otherwise, this would break at least the db::tests::db::get_bsos_limit_offset
@@ -420,8 +428,8 @@ impl Db for MysqlDb {
420428

421429
query = match params.sort {
422430
Sorting::Index => query.order(bso::sortindex.desc()),
423-
Sorting::Newest => query.order(bso::modified.desc()),
424-
Sorting::Oldest => query.order(bso::modified.asc()),
431+
Sorting::Newest => query.order((bso::modified.desc(), bso::id.desc())),
432+
Sorting::Oldest => query.order((bso::modified.asc(), bso::id.asc())),
425433
_ => query,
426434
};
427435

0 commit comments

Comments
 (0)