Skip to content

Commit 867b0f9

Browse files
committed
Move test InMemoryStore into shared module
Move the existing in-memory test store into a shared module without changing its behavior. This lets integration tests reuse it while keeping the later async TestSyncStore change separate. Co-Authored-By: HAL 9000
1 parent 47f80ba commit 867b0f9

2 files changed

Lines changed: 228 additions & 215 deletions

File tree

src/io/in_memory_store.rs

Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
// This file is Copyright its original authors, visible in version control history.
2+
//
3+
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4+
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5+
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
6+
// accordance with one or both of these licenses.
7+
8+
use std::collections::{hash_map, HashMap};
9+
use std::future::Future;
10+
use std::sync::atomic::{AtomicU64, Ordering};
11+
use std::sync::Mutex;
12+
13+
use lightning::io;
14+
use lightning::util::persist::{
15+
KVStore, KVStoreSync, PageToken, PaginatedKVStore, PaginatedKVStoreSync, PaginatedListResponse,
16+
};
17+
18+
const IN_MEMORY_PAGE_SIZE: usize = 50;
19+
20+
pub struct InMemoryStore {
21+
persisted_bytes: Mutex<HashMap<String, HashMap<String, Vec<u8>>>>,
22+
creation_counter: AtomicU64,
23+
creation_times: Mutex<HashMap<String, HashMap<String, u64>>>,
24+
}
25+
26+
impl InMemoryStore {
27+
pub fn new() -> Self {
28+
let persisted_bytes = Mutex::new(HashMap::new());
29+
let creation_counter = AtomicU64::new(1);
30+
let creation_times = Mutex::new(HashMap::new());
31+
Self { persisted_bytes, creation_counter, creation_times }
32+
}
33+
34+
fn read_internal(
35+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
36+
) -> io::Result<Vec<u8>> {
37+
let persisted_lock = self.persisted_bytes.lock().unwrap();
38+
let prefixed = format!("{primary_namespace}/{secondary_namespace}");
39+
40+
if let Some(outer_ref) = persisted_lock.get(&prefixed) {
41+
if let Some(inner_ref) = outer_ref.get(key) {
42+
let bytes = inner_ref.clone();
43+
Ok(bytes)
44+
} else {
45+
Err(io::Error::new(io::ErrorKind::NotFound, "Key not found"))
46+
}
47+
} else {
48+
Err(io::Error::new(io::ErrorKind::NotFound, "Namespace not found"))
49+
}
50+
}
51+
52+
fn write_internal(
53+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
54+
) -> io::Result<()> {
55+
let mut persisted_lock = self.persisted_bytes.lock().unwrap();
56+
57+
let prefixed = format!("{primary_namespace}/{secondary_namespace}");
58+
let outer_e = persisted_lock.entry(prefixed.clone()).or_insert(HashMap::new());
59+
outer_e.insert(key.to_string(), buf);
60+
61+
let mut ct_lock = self.creation_times.lock().unwrap();
62+
let ct_ns = ct_lock.entry(prefixed).or_insert(HashMap::new());
63+
ct_ns
64+
.entry(key.to_string())
65+
.or_insert_with(|| self.creation_counter.fetch_add(1, Ordering::Relaxed));
66+
67+
Ok(())
68+
}
69+
70+
fn remove_internal(
71+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool,
72+
) -> io::Result<()> {
73+
let mut persisted_lock = self.persisted_bytes.lock().unwrap();
74+
75+
let prefixed = format!("{primary_namespace}/{secondary_namespace}");
76+
if let Some(outer_ref) = persisted_lock.get_mut(&prefixed) {
77+
outer_ref.remove(&key.to_string());
78+
}
79+
80+
let mut ct_lock = self.creation_times.lock().unwrap();
81+
if let Some(ct_ns) = ct_lock.get_mut(&prefixed) {
82+
ct_ns.remove(key);
83+
}
84+
85+
Ok(())
86+
}
87+
88+
fn list_internal(
89+
&self, primary_namespace: &str, secondary_namespace: &str,
90+
) -> io::Result<Vec<String>> {
91+
let mut persisted_lock = self.persisted_bytes.lock().unwrap();
92+
93+
let prefixed = format!("{primary_namespace}/{secondary_namespace}");
94+
match persisted_lock.entry(prefixed) {
95+
hash_map::Entry::Occupied(e) => Ok(e.get().keys().cloned().collect()),
96+
hash_map::Entry::Vacant(_) => Ok(Vec::new()),
97+
}
98+
}
99+
}
100+
101+
impl KVStore for InMemoryStore {
102+
fn read(
103+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
104+
) -> impl Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send {
105+
let res = self.read_internal(&primary_namespace, &secondary_namespace, &key);
106+
async move { res }
107+
}
108+
109+
fn write(
110+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
111+
) -> impl Future<Output = Result<(), io::Error>> + 'static + Send {
112+
let res = self.write_internal(&primary_namespace, &secondary_namespace, &key, buf);
113+
async move { res }
114+
}
115+
116+
fn remove(
117+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
118+
) -> impl Future<Output = Result<(), io::Error>> + 'static + Send {
119+
let res = self.remove_internal(&primary_namespace, &secondary_namespace, &key, lazy);
120+
async move { res }
121+
}
122+
123+
fn list(
124+
&self, primary_namespace: &str, secondary_namespace: &str,
125+
) -> impl Future<Output = Result<Vec<String>, io::Error>> + 'static + Send {
126+
let res = self.list_internal(primary_namespace, secondary_namespace);
127+
async move { res }
128+
}
129+
}
130+
131+
impl KVStoreSync for InMemoryStore {
132+
fn read(
133+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
134+
) -> io::Result<Vec<u8>> {
135+
self.read_internal(primary_namespace, secondary_namespace, key)
136+
}
137+
138+
fn write(
139+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
140+
) -> io::Result<()> {
141+
self.write_internal(primary_namespace, secondary_namespace, key, buf)
142+
}
143+
144+
fn remove(
145+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
146+
) -> io::Result<()> {
147+
self.remove_internal(primary_namespace, secondary_namespace, key, lazy)
148+
}
149+
150+
fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> {
151+
self.list_internal(primary_namespace, secondary_namespace)
152+
}
153+
}
154+
155+
impl InMemoryStore {
156+
fn list_paginated_internal(
157+
&self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>,
158+
) -> io::Result<PaginatedListResponse> {
159+
let ct_lock = self.creation_times.lock().unwrap();
160+
let prefixed = format!("{primary_namespace}/{secondary_namespace}");
161+
162+
let ct_ns = match ct_lock.get(&prefixed) {
163+
Some(m) => m,
164+
None => {
165+
return Ok(PaginatedListResponse { keys: Vec::new(), next_page_token: None });
166+
},
167+
};
168+
169+
let mut entries: Vec<(&String, &u64)> = ct_ns.iter().collect();
170+
entries.sort_by(|a, b| b.1.cmp(a.1));
171+
172+
let start_idx = if let Some(ref token) = page_token {
173+
let token_sort_order: u64 = token
174+
.as_str()
175+
.parse()
176+
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "Invalid page token"))?;
177+
178+
entries
179+
.iter()
180+
.position(|(_, sort_order)| **sort_order < token_sort_order)
181+
.unwrap_or(entries.len())
182+
} else {
183+
0
184+
};
185+
186+
let mut page: Vec<(&String, &u64)> =
187+
entries[start_idx..].iter().take(IN_MEMORY_PAGE_SIZE + 1).cloned().collect();
188+
189+
let has_more = page.len() > IN_MEMORY_PAGE_SIZE;
190+
page.truncate(IN_MEMORY_PAGE_SIZE);
191+
192+
let next_page_token = if has_more {
193+
let (_, last_sort_order) = page.last().unwrap();
194+
Some(PageToken::new(last_sort_order.to_string()))
195+
} else {
196+
None
197+
};
198+
199+
let page: Vec<String> = page.into_iter().map(|(k, _)| k.clone()).collect();
200+
201+
Ok(PaginatedListResponse { keys: page, next_page_token })
202+
}
203+
}
204+
205+
impl PaginatedKVStoreSync for InMemoryStore {
206+
fn list_paginated(
207+
&self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>,
208+
) -> io::Result<PaginatedListResponse> {
209+
self.list_paginated_internal(primary_namespace, secondary_namespace, page_token)
210+
}
211+
}
212+
213+
impl PaginatedKVStore for InMemoryStore {
214+
fn list_paginated(
215+
&self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>,
216+
) -> impl Future<Output = Result<PaginatedListResponse, io::Error>> + 'static + Send {
217+
let res = self.list_paginated_internal(primary_namespace, secondary_namespace, page_token);
218+
async move { res }
219+
}
220+
}
221+
222+
unsafe impl Sync for InMemoryStore {}
223+
unsafe impl Send for InMemoryStore {}

0 commit comments

Comments
 (0)