Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
bedefae
Implement the find_keys_by_prefix_iter (first step)
MathieuDutSik May 1, 2026
dbc767e
Correct the rocks_db implementation.
MathieuDutSik May 1, 2026
56d7ec3
Some simplification.
MathieuDutSik May 1, 2026
5a073b9
Demonstrate the use of the find_keys_by_prefix to the functionality.
MathieuDutSik May 1, 2026
0b3fab6
Implement the reverse iterators.
MathieuDutSik May 1, 2026
d0f173e
Simplify the code.
MathieuDutSik May 1, 2026
99869fa
Simplify the DynamoDB code for the iterators.
MathieuDutSik May 1, 2026
69c23a7
Reformat.
MathieuDutSik May 1, 2026
5af87f1
Resolve the bug in value splitting.
MathieuDutSik May 1, 2026
a04cfa8
Correct the Cargo.lock
MathieuDutSik May 2, 2026
39ab1a4
Some corrections for CI.
MathieuDutSik May 2, 2026
0c19c82
A forgotten entry in the Cargo.lock
MathieuDutSik May 2, 2026
65c36dd
Add a test that exercise the ValueSplitting part of the code.
MathieuDutSik May 2, 2026
42adda2
Update the tests.
MathieuDutSik May 4, 2026
65b9b7a
Correction from CI.
MathieuDutSik May 4, 2026
46bed05
Add another test for an entry forgotten from the coverage.
MathieuDutSik May 4, 2026
e463c9a
Changes to dual and journaling.
MathieuDutSik May 4, 2026
d867a2e
Add the insertion of the values in the cache if the iterator ends.
MathieuDutSik May 4, 2026
3fce0d5
Simplify the LRU caching code.
MathieuDutSik May 4, 2026
41075b9
Some additional cleanups by using try_stream.
MathieuDutSik May 4, 2026
a46c0a5
More type annotations eliminated.
MathieuDutSik May 4, 2026
3b14200
More type annotation eliminated.
MathieuDutSik May 4, 2026
d46cfc4
Simplify the trait implementation and rename _reverse_iterator as _re…
MathieuDutSik May 4, 2026
6bb04f0
Some edit.
MathieuDutSik May 4, 2026
4483dc8
Reformat.
MathieuDutSik May 4, 2026
2dafd78
Restructure the code.
MathieuDutSik May 4, 2026
1dc8cad
Remove a possible deadlock in ScyllaDB and add a test for such deadlo…
MathieuDutSik May 4, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ async-graphql-axum = "=7.0.17"
async-graphql-derive = "=7.0.17"
async-graphql-value = { version = "=7.0.17", features = ["raw_value"] }
async-lock = "3.3.0"
async-stream = "0.3"
async-trait = "0.1.77"
async-tungstenite = { version = "0.22", features = ["tokio-runtime"] }
aws-smithy-types = "1.3.1"
Expand Down
1 change: 1 addition & 0 deletions examples/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions linera-sdk/tests/fixtures/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions linera-views/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ allocative.workspace = true
anyhow.workspace = true
async-graphql.workspace = true
async-lock.workspace = true
async-stream.workspace = true
aws-config = { workspace = true, optional = true }
aws-sdk-dynamodb = { workspace = true, optional = true }
aws-smithy-types = { workspace = true, optional = true }
Expand Down
77 changes: 75 additions & 2 deletions linera-views/src/backends/dual.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

//! Implements [`crate::store::KeyValueStore`] by combining two existing stores.

use futures::stream::StreamExt;
use serde::{Deserialize, Serialize};
use thiserror::Error;

Expand All @@ -11,8 +12,8 @@ use crate::store::TestKeyValueDatabase;
use crate::{
batch::Batch,
store::{
KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, WithError,
WritableKeyValueStore,
FindKeyValuesStream, FindKeysStream, KeyValueDatabase, KeyValueStoreError,
ReadableKeyValueStore, WithError, WritableKeyValueStore,
},
};

Expand Down Expand Up @@ -175,6 +176,24 @@ where
Ok(result)
}

fn find_keys_by_prefix_iter<'a>(
&'a self,
key_prefix: &'a [u8],
) -> FindKeysStream<'a, Self::Error> {
match self {
Self::First(store) => Box::pin(
store
.find_keys_by_prefix_iter(key_prefix)
.map(|item| item.map_err(DualStoreError::First)),
),
Self::Second(store) => Box::pin(
store
.find_keys_by_prefix_iter(key_prefix)
.map(|item| item.map_err(DualStoreError::Second)),
),
}
}

async fn find_key_values_by_prefix(
&self,
key_prefix: &[u8],
Expand All @@ -191,6 +210,60 @@ where
};
Ok(result)
}

fn find_key_values_by_prefix_iter<'a>(
&'a self,
key_prefix: &'a [u8],
) -> FindKeyValuesStream<'a, Self::Error> {
match self {
Self::First(store) => Box::pin(
store
.find_key_values_by_prefix_iter(key_prefix)
.map(|item| item.map_err(DualStoreError::First)),
),
Self::Second(store) => Box::pin(
store
.find_key_values_by_prefix_iter(key_prefix)
.map(|item| item.map_err(DualStoreError::Second)),
),
}
}

fn find_keys_by_prefix_rev_iter<'a>(
&'a self,
key_prefix: &'a [u8],
) -> FindKeysStream<'a, Self::Error> {
match self {
Self::First(store) => Box::pin(
store
.find_keys_by_prefix_rev_iter(key_prefix)
.map(|item| item.map_err(DualStoreError::First)),
),
Self::Second(store) => Box::pin(
store
.find_keys_by_prefix_rev_iter(key_prefix)
.map(|item| item.map_err(DualStoreError::Second)),
),
}
}

fn find_key_values_by_prefix_rev_iter<'a>(
&'a self,
key_prefix: &'a [u8],
) -> FindKeyValuesStream<'a, Self::Error> {
match self {
Self::First(store) => Box::pin(
store
.find_key_values_by_prefix_rev_iter(key_prefix)
.map(|item| item.map_err(DualStoreError::First)),
),
Self::Second(store) => Box::pin(
store
.find_key_values_by_prefix_rev_iter(key_prefix)
.map(|item| item.map_err(DualStoreError::Second)),
),
}
}
}

impl<S1, S2> WritableKeyValueStore for DualStore<S1, S2>
Expand Down
105 changes: 102 additions & 3 deletions linera-views/src/backends/dynamo_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ use crate::{
journaling::JournalingKeyValueDatabase,
lru_caching::{LruCachingConfig, LruCachingDatabase},
store::{
DirectWritableKeyValueStore, KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore,
WithError,
DirectWritableKeyValueStore, FindKeyValuesStream, FindKeysStream, KeyValueDatabase,
KeyValueStoreError, ReadableKeyValueStore, WithError,
},
value_splitting::{ValueSplittingDatabase, ValueSplittingError},
};
Expand Down Expand Up @@ -608,6 +608,7 @@ impl DynamoDbStoreInternal {
start_key: &[u8],
key_prefix: &[u8],
start_key_map: Option<HashMap<String, AttributeValue>>,
forward: bool,
) -> Result<QueryOutput, DynamoDbStoreInternalError> {
let _guard = self.acquire().await;
let start_key = start_key.to_vec();
Expand All @@ -627,6 +628,7 @@ impl DynamoDbStoreInternal {
AttributeValue::B(Blob::new(prefixed_key_prefix)),
)
.set_exclusive_start_key(start_key_map)
.scan_index_forward(forward)
.send()
.boxed_sync()
.await?;
Expand Down Expand Up @@ -685,7 +687,7 @@ impl DynamoDbStoreInternal {
let mut start_key_map = None;
loop {
let response = self
.get_query_output(attribute, start_key, key_prefix, start_key_map)
.get_query_output(attribute, start_key, key_prefix, start_key_map, true)
.await?;
let last_evaluated = response.last_evaluated_key.clone();
responses.push(response);
Expand Down Expand Up @@ -886,6 +888,13 @@ impl ReadableKeyValueStore for DynamoDbStoreInternal {
.collect()
}

fn find_keys_by_prefix_iter<'a>(
&'a self,
key_prefix: &'a [u8],
) -> FindKeysStream<'a, Self::Error> {
self.find_keys_stream(key_prefix, true)
}

async fn find_key_values_by_prefix(
&self,
key_prefix: &[u8],
Expand All @@ -898,6 +907,96 @@ impl ReadableKeyValueStore for DynamoDbStoreInternal {
.map(|entry| entry.map(|(key, value)| (key.to_vec(), value.to_vec())))
.collect()
}

fn find_key_values_by_prefix_iter<'a>(
&'a self,
key_prefix: &'a [u8],
) -> FindKeyValuesStream<'a, Self::Error> {
self.find_key_values_stream(key_prefix, true)
}

fn find_keys_by_prefix_rev_iter<'a>(
&'a self,
key_prefix: &'a [u8],
) -> FindKeysStream<'a, Self::Error> {
self.find_keys_stream(key_prefix, false)
}

fn find_key_values_by_prefix_rev_iter<'a>(
&'a self,
key_prefix: &'a [u8],
) -> FindKeyValuesStream<'a, Self::Error> {
self.find_key_values_stream(key_prefix, false)
}
}

impl DynamoDbStoreInternal {
fn find_keys_stream<'a>(
&'a self,
key_prefix: &'a [u8],
forward: bool,
) -> FindKeysStream<'a, DynamoDbStoreInternalError> {
Box::pin(async_stream::stream! {
check_key_size(key_prefix)?;
let prefix_len = key_prefix.len();
let mut start_key_map = None;
loop {
let response = self
.get_query_output(
KEY_ATTRIBUTE,
&self.start_key,
key_prefix,
start_key_map,
forward,
)
.await?;
let last_evaluated = response.last_evaluated_key.clone();
for item in response.items.iter().flatten() {
yield extract_key(prefix_len, item).map(|k| k.to_vec());
}
match last_evaluated {
None => break,
Some(value) => {
start_key_map = Some(value);
}
}
}
})
}

fn find_key_values_stream<'a>(
&'a self,
key_prefix: &'a [u8],
forward: bool,
) -> FindKeyValuesStream<'a, DynamoDbStoreInternalError> {
Box::pin(async_stream::stream! {
check_key_size(key_prefix)?;
let prefix_len = key_prefix.len();
let mut start_key_map = None;
loop {
let response = self
.get_query_output(
KEY_VALUE_ATTRIBUTE,
&self.start_key,
key_prefix,
start_key_map,
forward,
)
.await?;
let last_evaluated = response.last_evaluated_key.clone();
for item in response.items.iter().flatten() {
yield extract_key_value(prefix_len, item)
.map(|(k, v)| (k.to_vec(), v.to_vec()));
}
match last_evaluated {
None => break,
Some(value) => {
start_key_map = Some(value);
}
}
}
})
}
}

impl DirectWritableKeyValueStore for DynamoDbStoreInternal {
Expand Down
49 changes: 47 additions & 2 deletions linera-views/src/backends/journaling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@
//! time the data in a block are written, the journal header is updated in the same
//! transaction to mark the block as processed.

use futures::stream::StreamExt;
use serde::{Deserialize, Serialize};
use static_assertions as sa;
use thiserror::Error;

use crate::{
batch::{Batch, BatchValueWriter, DeletePrefixExpander, SimplifiedBatch},
store::{
DirectKeyValueStore, KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore,
WithError, WritableKeyValueStore,
DirectKeyValueStore, FindKeyValuesStream, FindKeysStream, KeyValueDatabase,
KeyValueStoreError, ReadableKeyValueStore, WithError, WritableKeyValueStore,
},
views::MIN_VIEW_TAG,
};
Expand Down Expand Up @@ -196,12 +197,56 @@ where
Ok(self.store.find_keys_by_prefix(key_prefix).await?)
}

fn find_keys_by_prefix_iter<'a>(
&'a self,
key_prefix: &'a [u8],
) -> FindKeysStream<'a, Self::Error> {
Box::pin(
self.store
.find_keys_by_prefix_iter(key_prefix)
.map(|item| item.map_err(JournalingError::Inner)),
)
}

async fn find_key_values_by_prefix(
&self,
key_prefix: &[u8],
) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
Ok(self.store.find_key_values_by_prefix(key_prefix).await?)
}

fn find_key_values_by_prefix_iter<'a>(
&'a self,
key_prefix: &'a [u8],
) -> FindKeyValuesStream<'a, Self::Error> {
Box::pin(
self.store
.find_key_values_by_prefix_iter(key_prefix)
.map(|item| item.map_err(JournalingError::Inner)),
)
}

fn find_keys_by_prefix_rev_iter<'a>(
&'a self,
key_prefix: &'a [u8],
) -> FindKeysStream<'a, Self::Error> {
Box::pin(
self.store
.find_keys_by_prefix_rev_iter(key_prefix)
.map(|item| item.map_err(JournalingError::Inner)),
)
}

fn find_key_values_by_prefix_rev_iter<'a>(
&'a self,
key_prefix: &'a [u8],
) -> FindKeyValuesStream<'a, Self::Error> {
Box::pin(
self.store
.find_key_values_by_prefix_rev_iter(key_prefix)
.map(|item| item.map_err(JournalingError::Inner)),
)
}
}

impl<D> KeyValueDatabase for JournalingKeyValueDatabase<D>
Expand Down
Loading
Loading