From 2f3d401e8f63025038fe987d6c93d1ec81ffeda3 Mon Sep 17 00:00:00 2001 From: Andreas Fackler Date: Wed, 25 Mar 2026 10:55:51 +0000 Subject: [PATCH 1/7] Remove chain actors; handle read-only calls concurrently (#5502, #5687) Backport of #5502 (Remove chain actors; handle read-only calls concurrently) and #5687 (Fix race conditions with getting and dropping chain workers) to testnet_conway. Replaces the channel-based ChainWorkerActor with a direct Arc> approach, enabling concurrent read-only operations. Uses a lock-free papaya::HashMap with Shared>> for race-free worker creation, and Arc::try_unwrap in the keep-alive task for safe worker cleanup. Co-Authored-By: Claude Opus 4.6 (1M context) --- Cargo.lock | 1 + examples/Cargo.lock | 1 + linera-client/src/client_context.rs | 4 +- .../src/unit_tests/chain_listener.rs | 24 +- linera-client/src/util.rs | 5 + linera-core/Cargo.toml | 1 + linera-core/src/chain_worker/config.rs | 26 +- linera-core/src/chain_worker/handle.rs | 234 +++++ linera-core/src/chain_worker/mod.rs | 14 +- linera-core/src/chain_worker/state.rs | 492 ++++------ linera-core/src/client/mod.rs | 38 +- linera-core/src/lib.rs | 1 + linera-core/src/local_node.rs | 6 +- linera-core/src/unit_tests/test_utils.rs | 29 +- .../src/unit_tests/wasm_worker_tests.rs | 12 +- linera-core/src/unit_tests/worker_tests.rs | 44 +- linera-core/src/worker.rs | 890 +++++++++--------- linera-faucet/server/src/lib.rs | 2 +- linera-rpc/src/lib.rs | 2 + linera-sdk/src/test/block.rs | 2 +- linera-sdk/src/test/validator.rs | 17 +- linera-sdk/tests/fixtures/Cargo.lock | 1 + linera-service/benches/transfers.rs | 2 + linera-service/src/lib.rs | 2 + linera-service/src/node_service.rs | 43 +- linera-service/src/server.rs | 30 +- linera-service/src/util.rs | 5 + linera-service/tests/wallet.rs | 4 +- 28 files changed, 1042 insertions(+), 890 deletions(-) create mode 100644 linera-core/src/chain_worker/handle.rs diff --git a/Cargo.lock b/Cargo.lock index 77d38156abae..cd989ff9e9c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5604,6 +5604,7 @@ dependencies = [ "serde_json", "sha3", "social", + "sync_wrapper 1.0.2", "test-case", "test-log", "test-strategy", diff --git a/examples/Cargo.lock b/examples/Cargo.lock index db72f7e5cf01..da712bcd0040 100644 --- a/examples/Cargo.lock +++ b/examples/Cargo.lock @@ -4025,6 +4025,7 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", + "sync_wrapper 1.0.2", "test-log", "test-strategy", "thiserror 1.0.65", diff --git a/linera-client/src/client_context.rs b/linera-client/src/client_context.rs index c990d0e4b580..d7ad435cab36 100644 --- a/linera-client/src/client_context.rs +++ b/linera-client/src/client_context.rs @@ -317,8 +317,8 @@ where options.long_lived_services, chain_modes, name, - options.chain_worker_ttl, - options.sender_chain_worker_ttl, + crate::util::non_zero_duration(options.chain_worker_ttl), + crate::util::non_zero_duration(options.sender_chain_worker_ttl), options.prioritize_bundles_from.clone().unwrap_or_default(), options.to_chain_client_options(), options.to_requests_scheduler_config(), diff --git a/linera-client/src/unit_tests/chain_listener.rs b/linera-client/src/unit_tests/chain_listener.rs index 582214cb606f..905941695bb7 100644 --- a/linera-client/src/unit_tests/chain_listener.rs +++ b/linera-client/src/unit_tests/chain_listener.rs @@ -120,8 +120,8 @@ async fn test_chain_listener() -> anyhow::Result<()> { false, [(chain_id0, ListeningMode::FullChain)], format!("Client node for {:.8}", chain_id0), - Duration::from_secs(30), - Duration::from_secs(1), + Some(Duration::from_secs(30)), + Some(Duration::from_secs(1)), HashSet::new(), ChainClientOptions::test_default(), linera_core::client::RequestsSchedulerConfig::default(), @@ -234,8 +234,8 @@ async fn test_chain_listener_follow_only() -> anyhow::Result<()> { (chain_b_id, ListeningMode::FullChain), ], "Client node with follow-only and owned chains".to_string(), - Duration::from_secs(30), - Duration::from_secs(1), + Some(Duration::from_secs(30)), + Some(Duration::from_secs(1)), HashSet::new(), ChainClientOptions::test_default(), linera_core::client::RequestsSchedulerConfig::default(), @@ -384,8 +384,8 @@ async fn test_chain_listener_admin_chain() -> anyhow::Result<()> { false, std::iter::empty::<(ChainId, ListeningMode)>(), "Client node with no chains".to_string(), - Duration::from_secs(30), - Duration::from_secs(1), + Some(Duration::from_secs(30)), + Some(Duration::from_secs(1)), HashSet::new(), ChainClientOptions::test_default(), linera_core::client::RequestsSchedulerConfig::default(), @@ -462,8 +462,8 @@ async fn test_chain_listener_listen_command_adds_chains_to_wallet() -> anyhow::R false, std::iter::empty::<(ChainId, ListeningMode)>(), "Client node with no chains".to_string(), - Duration::from_secs(30), - Duration::from_secs(1), + Some(Duration::from_secs(30)), + Some(Duration::from_secs(1)), HashSet::new(), ChainClientOptions::test_default(), linera_core::client::RequestsSchedulerConfig::default(), @@ -580,8 +580,8 @@ async fn test_listener_uses_autosigner_for_incoming_messages() -> anyhow::Result false, [(chain_id0, ListeningMode::FullChain)], format!("Client node for {:.8}", chain_id0), - Duration::from_secs(30), - Duration::from_secs(1), + Some(Duration::from_secs(30)), + Some(Duration::from_secs(1)), HashSet::new(), ChainClientOptions::test_default(), linera_core::client::RequestsSchedulerConfig::default(), @@ -782,8 +782,8 @@ async fn test_chain_listener_sparse_event_download() -> anyhow::Result<()> { false, [(receiver_id, ListeningMode::FullChain)], format!("Client node for {:.8}", receiver_id), - Duration::from_secs(30), - Duration::from_secs(1), + Some(Duration::from_secs(30)), + Some(Duration::from_secs(1)), HashSet::new(), ChainClientOptions::test_default(), linera_core::client::RequestsSchedulerConfig::default(), diff --git a/linera-client/src/util.rs b/linera-client/src/util.rs index 806c6a537877..dd7f581c9103 100644 --- a/linera-client/src/util.rs +++ b/linera-client/src/util.rs @@ -13,6 +13,11 @@ use linera_base::{ use linera_core::{data_types::RoundTimeout, node::NotificationStream, worker::Reason}; use tokio_stream::StreamExt as _; +/// Treats a zero `Duration` as `None` (disabled). +pub fn non_zero_duration(d: Duration) -> Option { + (d > Duration::ZERO).then_some(d) +} + pub fn parse_json(s: &str) -> anyhow::Result { Ok(serde_json::from_str(s.trim())?) } diff --git a/linera-core/Cargo.toml b/linera-core/Cargo.toml index 003968ea2aae..06f825bc0ae9 100644 --- a/linera-core/Cargo.toml +++ b/linera-core/Cargo.toml @@ -76,6 +76,7 @@ proptest = { workspace = true, optional = true } rand = { workspace = true, features = ["std_rng"] } serde.workspace = true serde_json.workspace = true +sync_wrapper.workspace = true test-log = { workspace = true, optional = true } test-strategy = { workspace = true, optional = true } thiserror.workspace = true diff --git a/linera-core/src/chain_worker/config.rs b/linera-core/src/chain_worker/config.rs index 8493e5e68ff3..9adcc6ab8cf6 100644 --- a/linera-core/src/chain_worker/config.rs +++ b/linera-core/src/chain_worker/config.rs @@ -9,9 +9,12 @@ use linera_base::{crypto::ValidatorSecretKey, identifiers::ChainId, time::Durati use crate::CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES; -/// Configuration parameters for the [`ChainWorkerState`][`super::state::ChainWorkerState`]. +/// Configuration parameters for the chain worker and its owning +/// [`WorkerState`][`crate::worker::WorkerState`]. #[derive(Clone)] pub struct ChainWorkerConfig { + /// A name used for logging. + pub nickname: String, /// The signature key pair of the validator. The key may be missing for replicas /// without voting rights (possibly with a partial view of chains). pub key_pair: Option>, @@ -24,13 +27,17 @@ pub struct ChainWorkerConfig { /// Blocks with a timestamp this far in the future will still be accepted, but the validator /// will wait until that timestamp before voting. pub block_time_grace_period: Duration, - /// Idle chain workers free their memory after that duration without requests. - pub ttl: Duration, - /// TTL for sender chains. - // We don't want them to keep in memory forever since usually they're short-lived. - pub sender_chain_ttl: Duration, + /// Idle chain workers free their memory after this duration without requests. + /// `None` means no expiry (handle lives forever). + pub ttl: Option, + /// TTL for sender chains. `None` means no expiry. + pub sender_chain_ttl: Option, /// The size to truncate receive log entries in chain info responses. pub chain_info_max_received_log_entries: usize, + /// Maximum number of entries in the block cache. + pub block_cache_size: usize, + /// Maximum number of entries in the execution state cache. + pub execution_state_cache_size: usize, /// Chain IDs whose incoming bundles should be processed first. pub priority_bundle_origins: HashSet, } @@ -51,14 +58,17 @@ impl ChainWorkerConfig { impl Default for ChainWorkerConfig { fn default() -> Self { Self { + nickname: String::new(), key_pair: None, allow_inactive_chains: false, allow_messages_from_deprecated_epochs: false, long_lived_services: false, block_time_grace_period: Default::default(), - ttl: Default::default(), - sender_chain_ttl: Default::default(), + ttl: None, + sender_chain_ttl: None, chain_info_max_received_log_entries: CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES, + block_cache_size: crate::worker::DEFAULT_BLOCK_CACHE_SIZE, + execution_state_cache_size: crate::worker::DEFAULT_EXECUTION_STATE_CACHE_SIZE, priority_bundle_origins: HashSet::new(), } } diff --git a/linera-core/src/chain_worker/handle.rs b/linera-core/src/chain_worker/handle.rs new file mode 100644 index 000000000000..41e33f2d674d --- /dev/null +++ b/linera-core/src/chain_worker/handle.rs @@ -0,0 +1,234 @@ +// Copyright (c) Zefchain Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +//! Helpers for managing chain worker state behind an `Arc>`. +//! +//! Tokio's [`RwLock`] is write-preferring: once a writer is waiting, new readers +//! queue behind it. This prevents read-only queries from starving write operations +//! (block proposals, certificate handling, etc.). + +use std::{ + ops::{Deref, DerefMut}, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, +}; + +/// An atomically-updatable timestamp backed by microseconds since the Unix epoch. +/// +/// This wraps the raw `AtomicU64` microsecond encoding so that call sites work +/// exclusively with [`Duration`] and never see the underlying representation. +pub(crate) struct AtomicTimestamp(AtomicU64); + +impl AtomicTimestamp { + /// Creates a new `AtomicTimestamp` set to the current time. + pub(crate) fn now() -> Self { + Self(AtomicU64::new(Self::current_micros())) + } + + /// Updates the stored timestamp to the current time. + pub(crate) fn store_now(&self) { + self.0.store(Self::current_micros(), Ordering::Relaxed); + } + + /// Returns how long has passed since the stored timestamp. + pub(crate) fn elapsed(&self) -> Duration { + let last = self.0.load(Ordering::Relaxed); + let now = Self::current_micros(); + Duration::from_micros(now.saturating_sub(last)) + } + + fn current_micros() -> u64 { + linera_base::time::SystemTime::now() + .duration_since(linera_base::time::UNIX_EPOCH) + .map(|d| d.as_micros() as u64) + .unwrap_or(0) + } +} + +use linera_base::{ + data_types::{BlockHeight, Timestamp}, + identifiers::ChainId, + time::Duration, +}; +use linera_execution::{QueryContext, ServiceRuntimeEndpoint, ServiceSyncRuntime}; +use linera_storage::Storage; +use tokio::sync::{OwnedRwLockReadGuard, RwLock}; + +use super::{config::ChainWorkerConfig, state::ChainWorkerState}; + +/// A write guard that automatically rolls back uncommitted chain state changes on drop. +/// +/// This ensures cancellation safety: if a write operation's future is dropped before +/// completion, any uncommitted state changes are rolled back rather than leaked. +pub(crate) struct RollbackGuard( + tokio::sync::OwnedRwLockWriteGuard>, +); + +impl Deref for RollbackGuard { + type Target = ChainWorkerState; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for RollbackGuard { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl Drop for RollbackGuard { + fn drop(&mut self) { + self.0.rollback(); + } +} + +/// The endpoint and background task for a long-lived service runtime. +pub(crate) struct ServiceRuntimeActor { + pub(crate) task: web_thread_pool::Task<()>, + pub(crate) endpoint: ServiceRuntimeEndpoint, +} + +impl ServiceRuntimeActor { + /// Spawns a blocking task to execute the service runtime actor. + pub(crate) async fn spawn( + chain_id: ChainId, + thread_pool: &linera_execution::ThreadPool, + ) -> Self { + let (execution_state_sender, incoming_execution_requests) = + futures::channel::mpsc::unbounded(); + let (runtime_request_sender, runtime_request_receiver) = std::sync::mpsc::channel(); + + Self { + endpoint: ServiceRuntimeEndpoint { + incoming_execution_requests, + runtime_request_sender, + }, + task: thread_pool + .run((), move |()| async move { + // The dummy context is overwritten by `prepare_for_query` + // before the first actual query is executed. + ServiceSyncRuntime::new( + execution_state_sender, + QueryContext { + chain_id, + next_block_height: BlockHeight(0), + local_time: Timestamp::from(0), + }, + ) + .run(runtime_request_receiver) + }) + .await, + } + } +} + +/// Wraps a [`ChainWorkerState`] in an `Arc>` and spawns a keep-alive task +/// if a TTL is configured. +pub(crate) fn create_chain_worker( + state: ChainWorkerState, + is_tracked: bool, + config: &ChainWorkerConfig, +) -> Arc>> { + let last_access = state.last_access_arc(); + let chain_id = state.chain().chain_id(); + let arc = Arc::new(RwLock::new(state)); + let ttl = if is_tracked { + config.sender_chain_ttl + } else { + config.ttl + }; + if let Some(ttl) = ttl { + spawn_keep_alive(chain_id, Arc::clone(&arc), last_access, ttl); + } + arc +} + +/// Acquires a read lock, updating the last-access timestamp. +pub(crate) async fn read_lock( + state: &Arc>>, +) -> OwnedRwLockReadGuard> { + let guard = state.clone().read_owned().await; + guard.touch(); + guard +} + +/// Acquires a read lock, initializing the chain if needed. +/// +/// First acquires a read lock and checks if the chain is already known to be active. +/// If not, drops the read lock, acquires a write lock to initialize the chain, +/// then re-acquires the read lock. +pub(crate) async fn read_lock_initialized( + state: &Arc>>, +) -> Result>, crate::worker::WorkerError> { + { + let guard = read_lock(state).await; + if guard.knows_chain_is_active() { + return Ok(guard); + } + } + { + let mut guard = write_lock(state).await; + guard.initialize_and_save_if_needed().await?; + } + Ok(read_lock(state).await) +} + +/// Acquires a write lock, updating the last-access timestamp. +/// +/// Returns a [`RollbackGuard`] that automatically rolls back uncommitted changes +/// when dropped, ensuring cancellation safety. +pub(crate) async fn write_lock( + state: &Arc>>, +) -> RollbackGuard { + let guard = RollbackGuard(state.clone().write_owned().await); + guard.touch(); + guard +} + +/// Spawns a background task that keeps the chain state alive for at least `ttl` +/// after the last access. When the state has been idle for the full TTL, the task +/// drops the state if it holds the only strong reference. +fn spawn_keep_alive( + chain_id: ChainId, + mut state: Arc>>, + last_access: Arc, + ttl: Duration, +) { + linera_base::Task::spawn(async move { + loop { + while let Some(remaining) = ttl + .checked_sub(last_access.elapsed()) + .filter(|remaining| *remaining > Duration::ZERO) + { + // Touched recently — sleep for the remaining time. + linera_base::time::timer::sleep(remaining).await; + } + // Idle long enough. Drop our strong reference if it's the only one. + match Arc::try_unwrap(state) { + Ok(rw_lock) => { + tracing::debug!(%chain_id, "Dropping chain worker"); + // We have sole ownership — extract the state and + // shut down the service runtime gracefully. + let mut worker_state = RwLock::into_inner(rw_lock); + let task = worker_state.clear_service_runtime(); + drop(worker_state); + if let Some(task) = task { + if let Err(err) = task.await { + tracing::warn!(%err, "Failed to shut down service runtime"); + } + } + break; + } + Err(arc) => { + arc.read().await.touch(); + state = arc; + } + } + } + }) + .forget(); +} diff --git a/linera-core/src/chain_worker/mod.rs b/linera-core/src/chain_worker/mod.rs index 866155a1bd04..8aabb070bda3 100644 --- a/linera-core/src/chain_worker/mod.rs +++ b/linera-core/src/chain_worker/mod.rs @@ -3,19 +3,13 @@ //! A worker to handle a single chain. -mod actor; mod config; mod delivery_notifier; -mod state; +pub(crate) mod handle; +pub(crate) mod state; +pub use self::config::ChainWorkerConfig; pub(super) use self::delivery_notifier::DeliveryNotifier; #[cfg(test)] pub(crate) use self::state::CrossChainUpdateHelper; -pub(crate) use self::{ - actor::{ - ChainWorkerActor, ChainWorkerRequest, ChainWorkerRequestReceiver, ChainWorkerRequestSender, - EventSubscriptionsResult, - }, - config::ChainWorkerConfig, - state::BlockOutcome, -}; +pub(crate) use self::state::{BlockOutcome, EventSubscriptionsResult}; diff --git a/linera-core/src/chain_worker/state.rs b/linera-core/src/chain_worker/state.rs index c0a38e89c32b..3c11ff12fbb1 100644 --- a/linera-core/src/chain_worker/state.rs +++ b/linera-core/src/chain_worker/state.rs @@ -33,24 +33,27 @@ use linera_chain::{ ChainError, ChainExecutionContext, ChainStateView, ExecutionResultExt as _, }; use linera_execution::{ - Committee, ExecutionRuntimeContext as _, ExecutionStateView, Query, QueryContext, QueryOutcome, - ResourceTracker, ServiceRuntimeEndpoint, + system::EventSubscriptions, Committee, ExecutionRuntimeContext as _, ExecutionStateView, Query, + QueryContext, QueryOutcome, ResourceTracker, ServiceRuntimeEndpoint, }; use linera_storage::{Clock as _, ResultReadConfirmedBlocks, Storage}; use linera_views::{ context::{Context, InactiveContext}, - views::{ClonableView, ReplaceContext as _, RootView as _, View as _}, + views::{ReplaceContext as _, RootView as _, View as _}, }; -use tokio::sync::{oneshot, OwnedRwLockReadGuard, RwLock, RwLockWriteGuard}; +use tokio::sync::oneshot; use tracing::{debug, instrument, trace, warn}; -use super::{ChainWorkerConfig, ChainWorkerRequest, DeliveryNotifier, EventSubscriptionsResult}; use crate::{ + chain_worker::{handle::AtomicTimestamp, ChainWorkerConfig, DeliveryNotifier}, client::ListeningMode, data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse, CrossChainRequest}, worker::{NetworkActions, Notification, Reason, WorkerError}, }; +/// Type alias for event subscriptions result. +pub(crate) type EventSubscriptionsResult = Vec<((ChainId, StreamId), EventSubscriptions)>; + #[cfg(with_metrics)] mod metrics { use std::sync::LazyLock; @@ -80,15 +83,24 @@ mod metrics { } /// The state of the chain worker. -pub struct ChainWorkerState +pub(crate) struct ChainWorkerState where - StorageClient: Storage + Clone + 'static, + StorageClient: Storage, { config: ChainWorkerConfig, storage: StorageClient, chain: ChainStateView, - shared_chain_view: Option>>>, service_runtime_endpoint: Option, + /// The background task running the service runtime. Must be kept alive for the + /// lifetime of the worker: the pool `Guard` wrapper returns the thread-pool slot + /// when dropped, so dropping this early lets the pool schedule unrelated work on a + /// thread that is still running the service runtime. + service_runtime_task: Option>, + /// Timestamp of the last access. + /// Used by the keep-alive task to determine when the worker has been idle. + /// Wrapped in `Arc` so the keep-alive task can read it without acquiring + /// the `RwLock`. + last_access: Arc, block_values: Arc>>, execution_state_cache: Arc>>, chain_modes: Option>>>, @@ -112,7 +124,7 @@ where chain_id = %chain_id ))] #[expect(clippy::too_many_arguments)] - pub async fn load( + pub(crate) async fn load( config: ChainWorkerConfig, storage: StorageClient, block_values: Arc>>, @@ -123,6 +135,7 @@ where delivery_notifier: DeliveryNotifier, chain_id: ChainId, service_runtime_endpoint: Option, + service_runtime_task: Option>, ) -> Result { let chain = storage.load_chain(chain_id).await?; @@ -130,8 +143,9 @@ where config, storage, chain, - shared_chain_view: None, service_runtime_endpoint, + service_runtime_task, + last_access: Arc::new(AtomicTimestamp::now()), block_values, execution_state_cache, chain_modes, @@ -141,208 +155,45 @@ where } /// Returns the [`ChainId`] of the chain handled by this worker. - pub fn chain_id(&self) -> ChainId { + fn chain_id(&self) -> ChainId { self.chain.chain_id() } - /// Handles a request and applies it to the chain state. - #[instrument(skip_all, fields(chain_id = %self.chain_id()))] - pub async fn handle_request(&mut self, request: ChainWorkerRequest) { - tracing::trace!("Handling chain worker request: {request:?}"); - // TODO(#2237): Spawn concurrent tasks for read-only operations - let responded = match request { - #[cfg(with_testing)] - ChainWorkerRequest::ReadCertificate { height, callback } => { - callback.send(self.read_certificate(height).await).is_ok() - } - ChainWorkerRequest::GetChainStateView { callback } => { - callback.send(self.chain_state_view().await).is_ok() - } - ChainWorkerRequest::QueryApplication { - query, - block_hash, - callback, - } => callback - .send(self.query_application(query, block_hash).await) - .is_ok(), - ChainWorkerRequest::DescribeApplication { - application_id, - callback, - } => callback - .send(self.describe_application(application_id).await) - .is_ok(), - ChainWorkerRequest::StageBlockExecution { - block, - round, - published_blobs, - policy, - callback, - } => { - let result = self - .stage_block_execution(block, round, &published_blobs, policy) - .await; - callback.send(result).is_ok() - } - ChainWorkerRequest::ProcessTimeout { - certificate, - callback, - } => callback - .send(self.process_timeout(certificate).await) - .is_ok(), - ChainWorkerRequest::HandleBlockProposal { proposal, callback } => callback - .send(self.handle_block_proposal(proposal).await) - .is_ok(), - ChainWorkerRequest::ProcessValidatedBlock { - certificate, - callback, - } => callback - .send(self.process_validated_block(certificate).await) - .is_ok(), - ChainWorkerRequest::ProcessConfirmedBlock { - certificate, - notify_when_messages_are_delivered, - callback, - } => callback - .send( - self.process_confirmed_block(certificate, notify_when_messages_are_delivered) - .await, - ) - .is_ok(), - ChainWorkerRequest::ProcessCrossChainUpdate { - origin, - bundles, - callback, - } => callback - .send(self.process_cross_chain_update(origin, bundles).await) - .is_ok(), - ChainWorkerRequest::ConfirmUpdatedRecipient { - recipient, - latest_height, - callback, - } => callback - .send( - self.confirm_updated_recipient(recipient, latest_height) - .await, - ) - .is_ok(), - ChainWorkerRequest::HandleChainInfoQuery { query, callback } => callback - .send(self.handle_chain_info_query(query).await) - .is_ok(), - ChainWorkerRequest::DownloadPendingBlob { blob_id, callback } => callback - .send(self.download_pending_blob(blob_id).await) - .is_ok(), - ChainWorkerRequest::HandlePendingBlob { blob, callback } => { - callback.send(self.handle_pending_blob(blob).await).is_ok() - } - ChainWorkerRequest::UpdateReceivedCertificateTrackers { - new_trackers, - callback, - } => callback - .send( - self.update_received_certificate_trackers(new_trackers) - .await, - ) - .is_ok(), - ChainWorkerRequest::GetPreprocessedBlockHashes { - start, - end, - callback, - } => callback - .send(self.get_preprocessed_block_hashes(start, end).await) - .is_ok(), - ChainWorkerRequest::GetInboxNextHeight { origin, callback } => callback - .send(self.get_inbox_next_height(origin).await) - .is_ok(), - ChainWorkerRequest::GetLockingBlobs { blob_ids, callback } => callback - .send(self.get_locking_blobs(blob_ids).await) - .is_ok(), - ChainWorkerRequest::GetBlockHashes { heights, callback } => { - callback.send(self.get_block_hashes(heights).await).is_ok() - } - ChainWorkerRequest::GetProposedBlobs { blob_ids, callback } => callback - .send(self.get_proposed_blobs(blob_ids).await) - .is_ok(), - ChainWorkerRequest::GetEventSubscriptions { callback } => { - callback.send(self.get_event_subscriptions().await).is_ok() - } - ChainWorkerRequest::GetStreamEventCount { - stream_id, - callback, - } => callback - .send(self.get_stream_event_count(stream_id).await) - .is_ok(), - ChainWorkerRequest::GetReceivedCertificateTrackers { callback } => callback - .send(self.get_received_certificate_trackers().await) - .is_ok(), - ChainWorkerRequest::GetTipStateAndOutboxInfo { - receiver_id, - callback, - } => callback - .send(self.get_tip_state_and_outbox_info(receiver_id).await) - .is_ok(), - ChainWorkerRequest::GetNextHeightToPreprocess { callback } => callback - .send(self.get_next_height_to_preprocess().await) - .is_ok(), - ChainWorkerRequest::GetManagerSeed { callback } => { - callback.send(self.get_manager_seed().await).is_ok() - } - ChainWorkerRequest::GetPreviousEventBlocks { - stream_ids, - callback, - } => callback - .send(self.get_previous_event_blocks(stream_ids).await) - .is_ok(), - }; + /// Returns a reference to the chain state view. + pub(crate) fn chain(&self) -> &ChainStateView { + &self.chain + } - if !responded { - debug!("Callback for `ChainWorkerActor` was dropped before a response was sent"); - } + /// Returns whether this chain is known to be active (initialized). + pub(crate) fn knows_chain_is_active(&self) -> bool { + self.knows_chain_is_active + } - // Roll back any unsaved changes to the chain state: If there was an error while trying - // to handle the request, the chain state might contain unsaved and potentially invalid - // changes. The next request needs to be applied to the chain state as it is in storage. + /// Rolls back any uncommitted changes to the chain state. + pub(crate) fn rollback(&mut self) { self.chain.rollback(); } - /// Returns a read-only view of the [`ChainStateView`]. - /// - /// The returned view holds a lock on the chain state, which prevents the worker from changing - /// it. - pub(super) async fn chain_state_view( - &mut self, - ) -> Result>, WorkerError> { - if self.shared_chain_view.is_none() { - self.shared_chain_view = Some(Arc::new(RwLock::new(self.chain.clone_unchecked()?))); - } + /// Updates the last-access timestamp to the current time. + pub(crate) fn touch(&self) { + self.last_access.store_now(); + } - Ok(self - .shared_chain_view - .as_ref() - .expect("`shared_chain_view` should be initialized above") - .clone() - .read_owned() - .await) + /// Returns a clone of the last-access `Arc`, for use by the keep-alive task. + pub(crate) fn last_access_arc(&self) -> Arc { + Arc::clone(&self.last_access) } - /// Clears the shared chain view, and acquires and drops its write lock. - /// - /// This is the only place a write lock is acquired, and read locks are acquired in - /// the `chain_state_view` method, which has a `&mut self` receiver like this one. - /// That means that when this function returns, no readers will be waiting to acquire - /// the lock and it is safe to write the chain state to storage without any readers - /// having a stale view of it. - #[instrument(skip_all, fields( - chain_id = %self.chain_id() - ))] - pub(super) async fn clear_shared_chain_view(&mut self) { - if let Some(shared_chain_view) = self.shared_chain_view.take() { - let _: RwLockWriteGuard<_> = shared_chain_view.write().await; - } + /// Drops the service runtime endpoint, signaling the runtime task to stop. + /// Returns the runtime task so the caller can await it outside the lock. + pub(crate) fn clear_service_runtime(&mut self) -> Option> { + self.service_runtime_endpoint.take(); + self.service_runtime_task.take() } /// Handles a [`ChainInfoQuery`], potentially voting on the next block. #[tracing::instrument(level = "debug", skip(self))] - pub(super) async fn handle_chain_info_query( + pub(crate) async fn handle_chain_info_query( &mut self, query: ChainInfoQuery, ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> { @@ -368,7 +219,7 @@ where chain_id = %self.chain_id(), blob_id = %blob_id ))] - pub(super) async fn download_pending_blob(&self, blob_id: BlobId) -> Result { + pub(crate) async fn download_pending_blob(&self, blob_id: BlobId) -> Result { if let Some(blob) = self.chain.manager.pending_blob(&blob_id).await? { return Ok(blob); } @@ -588,7 +439,7 @@ where chain_id = %self.chain_id(), height = %height ))] - pub async fn all_messages_to_tracked_chains_delivered_up_to( + async fn all_messages_to_tracked_chains_delivered_up_to( &self, height: BlockHeight, ) -> Result { @@ -619,7 +470,7 @@ where chain_id = %self.chain_id(), height = %certificate.inner().height() ))] - pub(super) async fn process_timeout( + pub(crate) async fn process_timeout( &mut self, certificate: TimeoutCertificate, ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> { @@ -661,7 +512,7 @@ where chain_id = %self.chain_id(), block_height = %proposal.content.block.height ))] - pub(super) async fn load_proposal_blobs( + async fn load_proposal_blobs( &mut self, proposal: &BlockProposal, ) -> Result, WorkerError> { @@ -709,7 +560,7 @@ where chain_id = %self.chain_id(), block_height = %certificate.block().header.height ))] - pub(super) async fn process_validated_block( + pub(crate) async fn process_validated_block( &mut self, certificate: ValidatedBlockCertificate, ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> { @@ -777,12 +628,8 @@ where Ok((self.chain_info_response(), actions, BlockOutcome::Processed)) } - /// Initializes `next_expected_events` from `stream_event_counts` (which reflects - /// all executed blocks), then replays any preprocessed-but-not-yet-executed blocks to - /// advance the indices further. - /// - /// This handles the migration case where the `next_expected_events` field was added to - /// `ChainStateView` after blocks had already been processed. + /// Initializes `next_expected_events` for any streams that don't have entries yet, + /// for backwards compatibility with chains from older DB versions. async fn initialize_next_expected_events(&mut self) -> Result<(), WorkerError> { if self.chain.next_expected_events.count().await? > 0 { return Ok(()); // Already initialized. @@ -814,7 +661,7 @@ where height = %certificate.block().header.height, block_hash = %certificate.hash(), ))] - pub(super) async fn process_confirmed_block( + pub(crate) async fn process_confirmed_block( &mut self, certificate: ConfirmedBlockCertificate, notify_when_messages_are_delivered: Option>, @@ -908,21 +755,21 @@ where self.initialize_next_expected_events().await?; } // Update the outboxes and track emitted events. - let event_streams = self.chain.preprocess_block(certificate.value()).await?; + let updated_event_streams = self.chain.preprocess_block(certificate.value()).await?; // Persist chain. self.save().await?; let mut actions = self.create_network_actions(None).await?; - trace!("Preprocessed confirmed block {height} on chain {chain_id:.8}"); - if !event_streams.is_empty() { + if !updated_event_streams.is_empty() { actions.notifications.push(Notification { chain_id, reason: Reason::NewEvents { height, - hash: certificate.hash(), - event_streams, + hash: block_hash, + event_streams: updated_event_streams, }, }); } + trace!("Preprocessed confirmed block {height} on chain {chain_id:.8}"); self.register_delivery_notifier(height, &actions, notify_when_messages_are_delivered) .await; return Ok(( @@ -1007,7 +854,6 @@ where computed: Box::new(verified_outcome), } ); - // Update the rest of the chain state. let event_streams = chain .apply_confirmed_block(certificate.value(), local_time) @@ -1073,7 +919,7 @@ where /// Updates the chain's inboxes, receiving messages from a cross-chain update. #[instrument(level = "trace", skip(self, bundles))] - pub(super) async fn process_cross_chain_update( + pub(crate) async fn process_cross_chain_update( &mut self, origin: ChainId, bundles: Vec<(Epoch, MessageBundle)>, @@ -1136,7 +982,7 @@ where recipient = %recipient, latest_height = %latest_height ))] - pub(super) async fn confirm_updated_recipient( + pub(crate) async fn confirm_updated_recipient( &mut self, recipient: ChainId, latest_height: BlockHeight, @@ -1162,7 +1008,7 @@ where chain_id = %self.chain_id(), num_trackers = %new_trackers.len() ))] - pub async fn update_received_certificate_trackers( + pub(crate) async fn update_received_certificate_trackers( &mut self, new_trackers: BTreeMap, ) -> Result<(), WorkerError> { @@ -1178,7 +1024,7 @@ where start = %start, end = %end ))] - async fn get_preprocessed_block_hashes( + pub(crate) async fn get_preprocessed_block_hashes( &self, start: BlockHeight, end: BlockHeight, @@ -1200,7 +1046,10 @@ where chain_id = %self.chain_id(), origin = %origin ))] - async fn get_inbox_next_height(&self, origin: ChainId) -> Result { + pub(crate) async fn get_inbox_next_height( + &self, + origin: ChainId, + ) -> Result { Ok(match self.chain.inboxes.try_load_entry(&origin).await? { Some(inbox) => inbox.next_block_height_to_receive()?, None => BlockHeight::ZERO, @@ -1213,7 +1062,7 @@ where chain_id = %self.chain_id(), num_blob_ids = %blob_ids.len() ))] - async fn get_locking_blobs( + pub(crate) async fn get_locking_blobs( &self, blob_ids: Vec, ) -> Result>, WorkerError> { @@ -1227,7 +1076,7 @@ where } /// Gets block hashes for specified heights. - async fn get_block_hashes( + pub(crate) async fn get_block_hashes( &self, heights: Vec, ) -> Result, WorkerError> { @@ -1235,7 +1084,10 @@ where } /// Gets proposed blobs from the manager for specified blob IDs. - async fn get_proposed_blobs(&self, blob_ids: Vec) -> Result, WorkerError> { + pub(crate) async fn get_proposed_blobs( + &self, + blob_ids: Vec, + ) -> Result, WorkerError> { let results = self .chain .manager @@ -1256,8 +1108,61 @@ where Ok(blobs) } + /// Gets event subscriptions. + pub(crate) async fn get_event_subscriptions( + &self, + ) -> Result { + Ok(self + .chain + .execution_state + .system + .event_subscriptions + .index_values() + .await?) + } + + /// Gets the next expected event index for a stream. + pub(crate) async fn get_next_expected_event( + &self, + stream_id: StreamId, + ) -> Result, WorkerError> { + Ok(self.chain.next_expected_events.get(&stream_id).await?) + } + + /// Gets received certificate trackers. + pub(crate) async fn get_received_certificate_trackers( + &self, + ) -> Result, WorkerError> { + Ok(self.chain.received_certificate_trackers.get().clone()) + } + + /// Gets tip state and outbox info for next_outbox_heights calculation. + pub(crate) async fn get_tip_state_and_outbox_info( + &self, + receiver_id: ChainId, + ) -> Result<(BlockHeight, Option), WorkerError> { + let next_block_height = self.chain.tip_state.get().next_block_height; + let next_height_to_schedule = self + .chain + .outboxes + .try_load_entry(&receiver_id) + .await? + .map(|outbox| *outbox.next_height_to_schedule.get()); + Ok((next_block_height, next_height_to_schedule)) + } + + /// Gets the next height to preprocess. + pub(crate) async fn get_next_height_to_preprocess(&self) -> Result { + Ok(self.chain.next_height_to_preprocess().await?) + } + + /// Gets the chain manager's seed for leader election. + pub(crate) async fn get_manager_seed(&self) -> Result { + Ok(*self.chain.manager.seed.get()) + } + /// Gets the previous event blocks for specific streams. - async fn get_previous_event_blocks( + pub(crate) async fn get_previous_event_blocks( &self, stream_ids: Vec, ) -> Result, WorkerError> { @@ -1285,19 +1190,8 @@ where Ok(result) } - /// Gets event subscriptions. - async fn get_event_subscriptions(&self) -> Result { - Ok(self - .chain - .execution_state - .system - .event_subscriptions - .index_values() - .await?) - } - /// Gets the stream event count for a stream, including preprocessed blocks. - async fn get_stream_event_count( + pub(crate) async fn get_stream_event_count( &self, stream_id: StreamId, ) -> Result, WorkerError> { @@ -1317,45 +1211,13 @@ where .await?) } - /// Gets received certificate trackers. - async fn get_received_certificate_trackers( - &self, - ) -> Result, WorkerError> { - Ok(self.chain.received_certificate_trackers.get().clone()) - } - - /// Gets tip state and outbox info for next_outbox_heights calculation. - async fn get_tip_state_and_outbox_info( - &self, - receiver_id: ChainId, - ) -> Result<(BlockHeight, Option), WorkerError> { - let next_block_height = self.chain.tip_state.get().next_block_height; - let next_height_to_schedule = self - .chain - .outboxes - .try_load_entry(&receiver_id) - .await? - .map(|outbox| *outbox.next_height_to_schedule.get()); - Ok((next_block_height, next_height_to_schedule)) - } - - /// Gets the next height to preprocess. - async fn get_next_height_to_preprocess(&self) -> Result { - Ok(self.chain.next_height_to_preprocess().await?) - } - - /// Gets the chain manager's seed for leader election. - async fn get_manager_seed(&self) -> Result { - Ok(*self.chain.manager.seed.get()) - } - /// Attempts to vote for a leader timeout, if possible. #[instrument(skip_all, fields( chain_id = %self.chain_id(), height = %height, round = %round ))] - pub(super) async fn vote_for_leader_timeout( + async fn vote_for_leader_timeout( &mut self, height: BlockHeight, round: Round, @@ -1382,11 +1244,7 @@ where } /// Votes for falling back to a public chain. - /// This is disabled on the testnet. - #[instrument(skip_all, fields( - chain_id = %self.chain_id() - ))] - pub(super) async fn vote_for_fallback(&mut self) -> Result<(), WorkerError> { + async fn vote_for_fallback(&mut self) -> Result<(), WorkerError> { Err(WorkerError::NoFallbackMode) } @@ -1394,7 +1252,7 @@ where chain_id = %self.chain_id(), blob_id = %blob.id() ))] - pub(super) async fn handle_pending_blob( + pub(crate) async fn handle_pending_blob( &mut self, blob: Blob, ) -> Result { @@ -1429,16 +1287,18 @@ where } /// Returns a stored [`Certificate`] for the chain's block at the requested [`BlockHeight`]. + /// + /// Does not need `&mut self` because the chain is eagerly initialized when the + /// chain handle is created. #[cfg(with_testing)] #[instrument(skip_all, fields( chain_id = %self.chain_id(), height = %height ))] - pub(super) async fn read_certificate( - &mut self, + pub(crate) async fn read_certificate( + &self, height: BlockHeight, ) -> Result, WorkerError> { - self.initialize_and_save_if_needed().await?; let certificate_hash = match self.chain.confirmed_log.get(height.try_into()?).await? { Some(hash) => hash, None => return Ok(None), @@ -1456,7 +1316,7 @@ where chain_id = %self.chain_id(), query_application_id = %query.application_id() ))] - pub(super) async fn query_application( + pub(crate) async fn query_application( &mut self, query: Query, block_hash: Option, @@ -1506,30 +1366,60 @@ where } } - /// Returns an application's description. + /// Returns an application's description by reading the blob directly from storage. + /// + /// Does not track blob usage (which requires `&mut self`), making it safe for + /// concurrent reads. Blob tracking is only relevant during block execution and is + /// always rolled back for read-only queries. #[instrument(skip_all, fields( chain_id = %self.chain_id(), application_id = %application_id ))] - pub(super) async fn describe_application( - &mut self, + pub(crate) async fn describe_application_readonly( + &self, application_id: ApplicationId, ) -> Result { - self.initialize_and_save_if_needed().await?; - let response = self.chain.describe_application(application_id).await?; - Ok(response) + let blob_id = application_id.description_blob_id(); + let blob = self + .storage + .read_blob(blob_id) + .await? + .ok_or(WorkerError::BlobsNotFound(vec![blob_id]))?; + Ok(bcs::from_bytes(blob.bytes())?) + } + + /// Executes a block without persisting any changes to the state. + #[instrument(skip_all, fields( + chain_id = %self.chain_id(), + block_height = %block.height + ))] + pub(crate) async fn stage_block_execution( + &mut self, + block: ProposedBlock, + round: Option, + published_blobs: &[Blob], + ) -> Result<(Block, ChainInfoResponse, ResourceTracker), WorkerError> { + let (_, executed_block, response, resource_tracker) = self + .stage_block_execution_with_policy( + block, + round, + published_blobs, + BundleExecutionPolicy::committed(), + ) + .await?; + Ok((executed_block, response, resource_tracker)) } /// Executes a block without persisting any changes to the state, with a specified /// policy for handling bundle failures. /// /// The block may be modified to reflect the actual executed transactions - /// (bundles may be rejected or discarded based on the policy). + /// (bundles may be rejected or removed based on the policy). #[instrument(skip_all, fields( chain_id = %self.chain_id(), block_height = %block.height ))] - async fn stage_block_execution( + pub(crate) async fn stage_block_execution_with_policy( &mut self, block: ProposedBlock, round: Option, @@ -1569,7 +1459,7 @@ where chain_id = %self.chain_id(), block_height = %proposal.content.block.height ))] - pub(super) async fn handle_block_proposal( + pub(crate) async fn handle_block_proposal( &mut self, proposal: BlockProposal, ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> { @@ -1669,9 +1559,9 @@ where block_time_grace_period: self.config.block_time_grace_period, }); } - // Note: The actor delays processing proposals with future timestamps (within the grace - // period) so that other requests can be handled in the meantime. By the time we reach - // here, the block timestamp should be in the past or very close to the current time. + // Note: WorkerState::handle_block_proposal delays processing proposals with future + // timestamps (within the grace period) before acquiring the chain lock. By the time + // we reach here, the block timestamp should be in the past or very close to current time. self.chain .remove_bundles_from_inboxes(block.timestamp, true, block.incoming_bundles()) @@ -1731,7 +1621,7 @@ where #[instrument(skip_all, fields( chain_id = %self.chain_id() ))] - pub(super) async fn prepare_chain_info_response( + async fn prepare_chain_info_response( &mut self, query: ChainInfoQuery, ) -> Result { @@ -1753,7 +1643,6 @@ where .await?; } if let Some(next_block_height) = query.test_next_block_height { - // If not, send the same error as if a block with next_block_height was proposed. ensure!( self.chain.tip_state.get().next_block_height == next_block_height, WorkerError::UnexpectedBlockHeight { @@ -1875,7 +1764,7 @@ where #[instrument(skip_all, fields( chain_id = %self.chain_id() ))] - async fn initialize_and_save_if_needed(&mut self) -> Result<(), WorkerError> { + pub(crate) async fn initialize_and_save_if_needed(&mut self) -> Result<(), WorkerError> { if !self.knows_chain_is_active { let local_time = self.storage.clock().current_time(); self.chain.initialize_if_needed(local_time).await?; @@ -1885,18 +1774,15 @@ where Ok(()) } - fn chain_info_response(&self) -> ChainInfoResponse { + pub(crate) fn chain_info_response(&self) -> ChainInfoResponse { ChainInfoResponse::new(&self.chain, self.config.key_pair()) } /// Stores the chain state in persistent storage. - /// - /// Waits until the [`ChainStateView`] is no longer shared before persisting the changes. #[instrument(skip_all, fields( chain_id = %self.chain_id() ))] async fn save(&mut self) -> Result<(), WorkerError> { - self.clear_shared_chain_view().await; self.chain.save().await?; Ok(()) } @@ -1932,14 +1818,14 @@ fn check_block_epoch( /// Helper type for handling cross-chain updates. pub(crate) struct CrossChainUpdateHelper<'a> { - pub allow_messages_from_deprecated_epochs: bool, - pub current_epoch: Epoch, - pub committees: &'a BTreeMap, + pub(crate) allow_messages_from_deprecated_epochs: bool, + pub(crate) current_epoch: Epoch, + pub(crate) committees: &'a BTreeMap, } impl<'a> CrossChainUpdateHelper<'a> { /// Creates a new [`CrossChainUpdateHelper`]. - pub fn new(config: &ChainWorkerConfig, chain: &'a ChainStateView) -> Self + fn new(config: &ChainWorkerConfig, chain: &'a ChainStateView) -> Self where C: Context + Clone + 'static, { @@ -1959,7 +1845,7 @@ impl<'a> CrossChainUpdateHelper<'a> { /// * Basic invariants are checked for good measure. We still crucially trust /// the worker of the sending chain to have verified and executed the blocks /// correctly. - pub fn select_message_bundles( + pub(crate) fn select_message_bundles( &self, origin: &'a ChainId, recipient: ChainId, diff --git a/linera-core/src/client/mod.rs b/linera-core/src/client/mod.rs index f88e721a98f1..57a8e353777b 100644 --- a/linera-core/src/client/mod.rs +++ b/linera-core/src/client/mod.rs @@ -45,7 +45,7 @@ use linera_chain::{ Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate, LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate, }, - ChainError, ChainExecutionContext, ChainStateView, + ChainError, ChainExecutionContext, }; use linera_execution::{ committee::Committee, @@ -61,7 +61,7 @@ use rand::prelude::SliceRandom as _; use received_log::ReceivedLogs; use serde::{Deserialize, Serialize}; use thiserror::Error; -use tokio::sync::{mpsc, OwnedRwLockReadGuard}; +use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::{debug, error, info, instrument, trace, warn, Instrument as _}; use validator_trackers::ValidatorTrackers; @@ -78,7 +78,7 @@ use crate::{ remote_node::RemoteNode, updater::{communicate_with_quorum, CommunicateAction, CommunicationError, ValidatorUpdater}, worker::{Notification, ProcessableCertificate, Reason, WorkerError, WorkerState}, - CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES, + ChainWorkerConfig, CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES, }; mod chain_client_state; @@ -275,8 +275,8 @@ impl Client { long_lived_services: bool, chain_modes: impl IntoIterator, name: impl Into, - chain_worker_ttl: Duration, - sender_chain_worker_ttl: Duration, + chain_worker_ttl: Option, + sender_chain_worker_ttl: Option, priority_bundle_origins: HashSet, options: ChainClientOptions, requests_scheduler_config: requests_scheduler::RequestsSchedulerConfig, @@ -284,19 +284,23 @@ impl Client { execution_state_cache_size: usize, ) -> Self { let chain_modes = Arc::new(RwLock::new(chain_modes.into_iter().collect())); - let state = WorkerState::new_for_client( - name.into(), - environment.storage().clone(), - chain_modes.clone(), + let config = ChainWorkerConfig { + nickname: name.into(), + long_lived_services, + allow_inactive_chains: true, + allow_messages_from_deprecated_epochs: true, + ttl: chain_worker_ttl, + sender_chain_ttl: sender_chain_worker_ttl, + priority_bundle_origins, block_cache_size, execution_state_cache_size, - ) - .with_long_lived_services(long_lived_services) - .with_allow_inactive_chains(true) - .with_allow_messages_from_deprecated_epochs(true) - .with_chain_worker_ttl(chain_worker_ttl) - .with_sender_chain_worker_ttl(sender_chain_worker_ttl) - .with_priority_bundle_origins(priority_bundle_origins); + ..ChainWorkerConfig::default() + }; + let state = WorkerState::new( + environment.storage().clone(), + config, + Some(chain_modes.clone()), + ); let local_node = LocalNodeClient::new(state); let requests_scheduler = RequestsScheduler::new(vec![], requests_scheduler_config); @@ -2244,7 +2248,7 @@ impl ChainClient { #[instrument(level = "trace")] pub async fn chain_state_view( &self, - ) -> Result>, LocalNodeError> { + ) -> Result, LocalNodeError> { self.client.local_node.chain_state_view(self.chain_id).await } diff --git a/linera-core/src/lib.rs b/linera-core/src/lib.rs index 0992570fc6f7..0886a706ba6d 100644 --- a/linera-core/src/lib.rs +++ b/linera-core/src/lib.rs @@ -9,6 +9,7 @@ #![allow(async_fn_in_trait)] mod chain_worker; +pub use chain_worker::ChainWorkerConfig; pub mod client; pub use client::Client; pub mod data_types; diff --git a/linera-core/src/local_node.rs b/linera-core/src/local_node.rs index 22101dca484f..25c3b15519d8 100644 --- a/linera-core/src/local_node.rs +++ b/linera-core/src/local_node.rs @@ -16,13 +16,11 @@ use linera_base::{ use linera_chain::{ data_types::{BlockProposal, BundleExecutionPolicy, ProposedBlock}, types::{Block, GenericCertificate}, - ChainStateView, }; use linera_execution::{committee::Committee, BlobState, Query, QueryOutcome, ResourceTracker}; use linera_storage::Storage; use linera_views::ViewError; use thiserror::Error; -use tokio::sync::OwnedRwLockReadGuard; use tracing::{instrument, warn}; use crate::{ @@ -152,7 +150,7 @@ where Ok(self .node .state - .stage_block_execution(block, round, published_blobs, policy) + .stage_block_execution_with_policy(block, round, published_blobs, policy) .await?) } @@ -232,7 +230,7 @@ where pub async fn chain_state_view( &self, chain_id: ChainId, - ) -> Result>, LocalNodeError> { + ) -> Result, LocalNodeError> { Ok(self.node.state.chain_state_view(chain_id).await?) } diff --git a/linera-core/src/unit_tests/test_utils.rs b/linera-core/src/unit_tests/test_utils.rs index 0ce5f9f6f210..06c1f6ff0f4b 100644 --- a/linera-core/src/unit_tests/test_utils.rs +++ b/linera-core/src/unit_tests/test_utils.rs @@ -56,10 +56,8 @@ use crate::{ ValidatorNodeProvider, }, notifier::ChannelNotifier, - worker::{ - Notification, ProcessableCertificate, WorkerState, DEFAULT_BLOCK_CACHE_SIZE, - DEFAULT_EXECUTION_STATE_CACHE_SIZE, - }, + worker::{Notification, ProcessableCertificate, WorkerState}, + ChainWorkerConfig, }; #[derive(Debug, PartialEq, Clone, Copy)] @@ -891,15 +889,12 @@ where for (i, (validator_keypair, _account_public_key)) in validators.into_iter().enumerate() { let validator_public_key = validator_keypair.public_key; let storage = storage_builder.build().await?; - let state = WorkerState::new( - format!("Node {}", i), - Some(validator_keypair.secret_key), - storage.clone(), - DEFAULT_BLOCK_CACHE_SIZE, - DEFAULT_EXECUTION_STATE_CACHE_SIZE, - ) - .with_allow_inactive_chains(false) - .with_allow_messages_from_deprecated_epochs(false); + let config = ChainWorkerConfig { + nickname: format!("Node {}", i), + ..ChainWorkerConfig::default() + } + .with_key_pair(Some(validator_keypair.secret_key)); + let state = WorkerState::new(storage.clone(), config, None); let mut validator = LocalValidatorClient::new(validator_public_key, state); if i < with_faulty_validators { faulty_validators.insert(validator_public_key); @@ -1094,13 +1089,13 @@ where false, [(chain_id, ListeningMode::FullChain)], format!("Client node for {:.8}", chain_id), - Duration::from_secs(30), - Duration::from_secs(1), + Some(Duration::from_secs(30)), + Some(Duration::from_secs(1)), HashSet::new(), options, crate::client::RequestsSchedulerConfig::default(), - DEFAULT_BLOCK_CACHE_SIZE, - DEFAULT_EXECUTION_STATE_CACHE_SIZE, + crate::worker::DEFAULT_BLOCK_CACHE_SIZE, + crate::worker::DEFAULT_EXECUTION_STATE_CACHE_SIZE, )); Ok(client.create_chain_client(chain_id, block_hash, block_height, None, owner, None)) } diff --git a/linera-core/src/unit_tests/wasm_worker_tests.rs b/linera-core/src/unit_tests/wasm_worker_tests.rs index 64f51f38bd6d..3f1117e217b0 100644 --- a/linera-core/src/unit_tests/wasm_worker_tests.rs +++ b/linera-core/src/unit_tests/wasm_worker_tests.rs @@ -398,7 +398,7 @@ where .with_operation(publish_meta_op); let (_, publish_executed, _, _) = env .worker() - .stage_block_execution( + .stage_block_execution_with_policy( publish_block, None, all_blobs.to_vec(), @@ -433,7 +433,7 @@ where .with_operation(create_counter_op); let (_, create_counter_executed, _, _) = env .worker() - .stage_block_execution( + .stage_block_execution_with_policy( create_counter_block, None, vec![], @@ -468,7 +468,7 @@ where .with_operation(create_meta_op); let (_, create_meta_executed, _, _) = env .worker() - .stage_block_execution( + .stage_block_execution_with_policy( create_meta_block, None, vec![], @@ -491,7 +491,7 @@ where }); let (_, send_fail_executed, _, _) = env .worker() - .stage_block_execution( + .stage_block_execution_with_policy( send_fail_block, None, vec![], @@ -536,7 +536,7 @@ where // This should handle the failing message by rejecting the bundle. let (modified_block, auto_retry_executed, _, _) = env .worker() - .stage_block_execution( + .stage_block_execution_with_policy( proposed_block.clone(), None, vec![], @@ -565,7 +565,7 @@ where // and produce the same outcome. let (_, abort_executed, _, _) = env .worker() - .stage_block_execution( + .stage_block_execution_with_policy( modified_block.clone(), None, vec![], diff --git a/linera-core/src/unit_tests/worker_tests.rs b/linera-core/src/unit_tests/worker_tests.rs index d68cc2719566..8eef22ecc8c4 100644 --- a/linera-core/src/unit_tests/worker_tests.rs +++ b/linera-core/src/unit_tests/worker_tests.rs @@ -73,6 +73,7 @@ use crate::{ Reason::{self, NewBlock, NewIncomingBundle}, WorkerError, WorkerState, }, + ChainWorkerConfig, }; /// The test worker accepts blocks with a timestamp this far in the future. @@ -143,17 +144,16 @@ where .await .expect("writing a network description should not fail"); - let worker = WorkerState::new( - "Single validator node".to_string(), - Some(validator_keypair.secret_key), - storage, - super::DEFAULT_BLOCK_CACHE_SIZE, - super::DEFAULT_EXECUTION_STATE_CACHE_SIZE, - ) - .with_allow_inactive_chains(is_client) - .with_allow_messages_from_deprecated_epochs(is_client) - .with_long_lived_services(has_long_lived_services) - .with_block_time_grace_period(Duration::from_micros(TEST_GRACE_PERIOD_MICROS)); + let config = ChainWorkerConfig { + nickname: "Single validator node".to_string(), + allow_inactive_chains: is_client, + allow_messages_from_deprecated_epochs: is_client, + long_lived_services: has_long_lived_services, + block_time_grace_period: Duration::from_micros(TEST_GRACE_PERIOD_MICROS), + ..ChainWorkerConfig::default() + } + .with_key_pair(Some(validator_keypair.secret_key)); + let worker = WorkerState::new(storage, config, None); Self { committee, worker, @@ -788,7 +788,7 @@ where // Stage execution to get the block for certificate creation. let (_, block, _, _) = env .worker() - .stage_block_execution( + .stage_block_execution_with_policy( proposed_block, None, vec![], @@ -817,7 +817,7 @@ where .unwrap(); let (_, block, _, _) = env .worker() - .stage_block_execution( + .stage_block_execution_with_policy( proposed_block, None, vec![], @@ -845,7 +845,7 @@ where .unwrap(); let (_, block, _, _) = env .worker() - .stage_block_execution( + .stage_block_execution_with_policy( proposed_block, None, vec![], @@ -3496,7 +3496,7 @@ where .with_authenticated_signer(Some(owner0)); let (_, block0, _, _) = env .worker() - .stage_block_execution( + .stage_block_execution_with_policy( proposed_block0, None, vec![], @@ -3563,7 +3563,7 @@ where let proposed_block1 = make_child_block(&value0).with_simple_transfer(chain_1, small_transfer); let (_, block1, _, _) = env .worker() - .stage_block_execution( + .stage_block_execution_with_policy( proposed_block1.clone(), None, vec![], @@ -3618,7 +3618,7 @@ where let proposed_block2 = make_child_block(&value0.clone()).with_simple_transfer(chain_1, amount); let (_, block2, _, _) = env .worker() - .stage_block_execution( + .stage_block_execution_with_policy( proposed_block2.clone(), None, vec![], @@ -3762,7 +3762,7 @@ where }); let (_, block0, _, _) = env .worker() - .stage_block_execution( + .stage_block_execution_with_policy( proposed_block0, None, vec![], @@ -3879,7 +3879,7 @@ where }); let (_, block0, _, _) = env .worker() - .stage_block_execution( + .stage_block_execution_with_policy( proposed_block0, None, vec![], @@ -3912,7 +3912,7 @@ where .unwrap(); let (_, block1, _, _) = env .worker() - .stage_block_execution( + .stage_block_execution_with_policy( proposed_block1.clone(), None, vec![], @@ -3974,7 +3974,7 @@ where // A validated block certificate from a later round can override the locked fast block. let (_, block2, _, _) = env .worker() - .stage_block_execution( + .stage_block_execution_with_policy( proposed_block2.clone(), None, vec![], @@ -4345,7 +4345,7 @@ where // Test stage_block_execution directly - this should fail with IncorrectMessageOrder. assert_matches!( env.worker() - .stage_block_execution(bad_proposed_block.clone(), None, vec![], BundleExecutionPolicy::committed()) + .stage_block_execution_with_policy(bad_proposed_block.clone(), None, vec![], BundleExecutionPolicy::committed()) .await, Err(WorkerError::ChainError(chain_error)) if matches!(*chain_error, ChainError::IncorrectMessageOrder { .. }) diff --git a/linera-core/src/worker.rs b/linera-core/src/worker.rs index ba122723ae6d..3ce465f74729 100644 --- a/linera-core/src/worker.rs +++ b/linera-core/src/worker.rs @@ -3,24 +3,25 @@ // SPDX-License-Identifier: Apache-2.0 use std::{ - collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque}, + collections::{BTreeMap, BTreeSet, HashMap, VecDeque}, sync::{Arc, Mutex, RwLock}, time::Duration, }; -use futures::future::Either; +use futures::{ + future::{Either, Shared}, + FutureExt as _, +}; use linera_base::{ - crypto::{CryptoError, CryptoHash, ValidatorPublicKey, ValidatorSecretKey}, + crypto::{CryptoError, CryptoHash, ValidatorPublicKey}, data_types::{ - ApplicationDescription, ArithmeticError, Blob, BlockHeight, Epoch, Round, Timestamp, + ApplicationDescription, ArithmeticError, Blob, BlockHeight, Epoch, Round, TimeDelta, + Timestamp, }, doc_scalar, hashed::Hashed, identifiers::{AccountOwner, ApplicationId, BlobId, ChainId, EventId, StreamId}, - time::Instant, - util::traits::DynError, }; -use linera_cache::{UniqueValueCache, ValueCache}; #[cfg(with_testing)] use linera_chain::ChainExecutionContext; use linera_chain::{ @@ -34,45 +35,75 @@ use linera_chain::{ ChainError, ChainStateView, }; use linera_execution::{ExecutionError, ExecutionStateView, Query, QueryOutcome, ResourceTracker}; -use linera_storage::Storage; +use linera_storage::{Clock as _, Storage}; use linera_views::{context::InactiveContext, ViewError}; use serde::{Deserialize, Serialize}; use thiserror::Error; -use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard}; -use tracing::{error, instrument, trace, warn}; +use tokio::sync::{oneshot, OwnedRwLockReadGuard}; +use tracing::{instrument, trace, warn}; + +/// A read guard providing access to a chain's [`ChainStateView`]. +/// +/// Holds a read lock on the chain worker state, preventing writes for its +/// lifetime. The `OwnedRwLockReadGuard` internally holds a strong `Arc` +/// reference to the `RwLock`, keeping the state alive. +/// Dereferences to `ChainStateView`. +pub struct ChainStateViewReadGuard( + OwnedRwLockReadGuard, ChainStateView>, +); + +impl std::ops::Deref for ChainStateViewReadGuard { + type Target = ChainStateView; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +use linera_cache::{UniqueValueCache, ValueCache}; /// Re-export of [`EventSubscriptionsResult`] for use by other crate modules. -pub(crate) use crate::chain_worker::{ - ChainWorkerRequestReceiver, ChainWorkerRequestSender, EventSubscriptionsResult, -}; +pub(crate) use crate::chain_worker::EventSubscriptionsResult; use crate::{ chain_worker::{ - BlockOutcome, ChainWorkerActor, ChainWorkerConfig, ChainWorkerRequest, DeliveryNotifier, + handle, state::ChainWorkerState, BlockOutcome, ChainWorkerConfig, DeliveryNotifier, }, client::ListeningMode, data_types::{ChainInfoQuery, ChainInfoResponse, CrossChainRequest}, - join_set_ext::{JoinSet, JoinSetExt}, notifier::Notifier, - CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES, }; -pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 5_000; -pub const DEFAULT_EXECUTION_STATE_CACHE_SIZE: usize = 10_000; - #[cfg(test)] #[path = "unit_tests/worker_tests.rs"] mod worker_tests; +/// Wraps a future in `SyncFuture` on non-web targets so that it satisfies `Sync` bounds. +/// On web targets the future is returned as-is. +#[cfg(not(web))] +pub(crate) fn wrap_future(f: F) -> sync_wrapper::SyncFuture { + sync_wrapper::SyncFuture::new(f) +} + +/// Wraps a future in `SyncFuture` on non-web targets so that it satisfies `Sync` bounds. +/// On web targets the future is returned as-is. +#[cfg(web)] +pub(crate) fn wrap_future(f: F) -> F { + f +} + +pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 5_000; +pub const DEFAULT_EXECUTION_STATE_CACHE_SIZE: usize = 10_000; + #[cfg(with_metrics)] mod metrics { use std::sync::LazyLock; use linera_base::prometheus_util::{ exponential_bucket_interval, register_histogram, register_histogram_vec, - register_int_counter, register_int_counter_vec, register_int_gauge, + register_int_counter, register_int_counter_vec, }; use linera_chain::types::ConfirmedBlockCertificate; - use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge}; + use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec}; pub static NUM_ROUNDS_IN_CERTIFICATE: LazyLock = LazyLock::new(|| { register_histogram_vec( @@ -147,14 +178,6 @@ mod metrics { ) }); - /// Number of cached chain worker channel endpoints in the map. - pub static CHAIN_WORKER_ENDPOINTS_CACHED: LazyLock = LazyLock::new(|| { - register_int_gauge( - "chain_worker_endpoints_cached", - "Number of cached chain worker channel endpoints", - ) - }); - /// Holds metrics data extracted from a confirmed block certificate. pub struct MetricsData { certificate_log_str: &'static str, @@ -308,7 +331,7 @@ pub enum WorkerError { #[error("Block was not signed by an authorized owner")] InvalidOwner, - #[error("Operations in the block are not authenticated by the proper signer: {0}")] + #[error("Operations in the block are not authenticated by the proper owner: {0}")] InvalidSigner(AccountOwner), // Chaining @@ -382,17 +405,6 @@ pub enum WorkerError { TooManyPublishedBlobs(u64), #[error("Missing network description")] MissingNetworkDescription, - #[error("ChainWorkerActor for chain {chain_id} stopped executing unexpectedly: {error}")] - ChainActorSendError { - chain_id: ChainId, - error: Box, - }, - #[error("ChainWorkerActor for chain {chain_id} stopped executing without responding: {error}")] - ChainActorRecvError { - chain_id: ChainId, - error: Box, - }, - #[error("thread error: {0}")] Thread(#[from] web_thread_pool::Error), @@ -431,8 +443,6 @@ impl WorkerError { | WorkerError::ConfirmedLogEntryNotFound { .. } | WorkerError::PreprocessedBlocksEntryNotFound { .. } | WorkerError::MissingNetworkDescription - | WorkerError::ChainActorSendError { .. } - | WorkerError::ChainActorRecvError { .. } | WorkerError::Thread(_) | WorkerError::ReadCertificatesError(_) => true, WorkerError::ChainError(chain_error) => chain_error.is_local(), @@ -479,16 +489,55 @@ impl WorkerError { } } +type ChainWorkerArc = Arc>>; +type ChainWorkerWeak = std::sync::Weak>>; +type ChainWorkerFuture = Shared>>; + +/// Each map entry is a `Shared>>`: +/// +/// - `peek()` returns `None` while a task is loading the worker from storage. +/// - `peek()` returns `Some(Ok(weak))` once the worker is loaded. +/// - `peek()` returns `Some(Err(_))` if loading failed (sender dropped). +/// +/// Callers that find a pending entry clone the `Shared` future and await it. +type ChainWorkerMap = Arc>>; + +/// Starts a background task that periodically removes dead weak references +/// from the chain handle map. The actual lifetime management is handled by +/// each handle's keep-alive task. +fn start_sweep( + chain_workers: &ChainWorkerMap, + config: &ChainWorkerConfig, +) { + // Sweep at the smaller of the two TTLs. If both are None, workers + // live forever so there's nothing to sweep. + let interval = match (config.ttl, config.sender_chain_ttl) { + (None, None) => return, + (Some(d), None) | (None, Some(d)) => d, + (Some(a), Some(b)) => a.min(b), + }; + let weak_map = Arc::downgrade(chain_workers); + linera_base::Task::spawn(async move { + loop { + linera_base::time::timer::sleep(interval).await; + let Some(map) = weak_map.upgrade() else { + break; + }; + map.pin_owned().retain(|_, shared| match shared.peek() { + Some(Ok(weak)) => weak.strong_count() > 0, + Some(Err(_)) => false, // Loading failed; clean up. + None => true, // Still loading; keep. + }); + } + }) + .forget(); +} + /// State of a worker in a validator or a local node. -pub struct WorkerState -where - StorageClient: Storage, -{ - /// A name used for logging - nickname: String, +pub struct WorkerState { /// Access to local persistent storage. storage: StorageClient, - /// Configuration options for the [`ChainWorker`]s. + /// Configuration options for chain workers. chain_worker_config: ChainWorkerConfig, block_cache: Arc>>, execution_state_cache: Arc>>, @@ -497,10 +546,10 @@ where /// One-shot channels to notify callers when messages of a particular chain have been /// delivered. delivery_notifiers: Arc>, - /// The set of spawned [`ChainWorkerActor`] tasks. - chain_worker_tasks: Arc>, - /// The cache of running [`ChainWorkerActor`]s. - chain_workers: Arc>>>, + /// The cache of loaded chain workers. Stores weak references; each worker + /// manages its own lifetime via a keep-alive task. A background sweep + /// periodically removes dead entries. + chain_workers: ChainWorkerMap, } impl Clone for WorkerState @@ -509,154 +558,37 @@ where { fn clone(&self) -> Self { WorkerState { - nickname: self.nickname.clone(), storage: self.storage.clone(), chain_worker_config: self.chain_worker_config.clone(), block_cache: self.block_cache.clone(), execution_state_cache: self.execution_state_cache.clone(), chain_modes: self.chain_modes.clone(), delivery_notifiers: self.delivery_notifiers.clone(), - chain_worker_tasks: self.chain_worker_tasks.clone(), chain_workers: self.chain_workers.clone(), } } } -/// The sender endpoint for [`ChainWorkerRequest`]s. -type ChainActorEndpoint = mpsc::UnboundedSender<( - ChainWorkerRequest<::Context>, - tracing::Span, - Instant, -)>; - pub(crate) type DeliveryNotifiers = HashMap; impl WorkerState where StorageClient: Storage, { - #[instrument(level = "trace", skip(nickname, key_pair, storage))] - pub fn new( - nickname: String, - key_pair: Option, - storage: StorageClient, - block_cache_size: usize, - execution_state_cache_size: usize, - ) -> Self { - WorkerState { - nickname, - storage, - chain_worker_config: ChainWorkerConfig::default().with_key_pair(key_pair), - block_cache: Arc::new(ValueCache::new(block_cache_size)), - execution_state_cache: Arc::new(UniqueValueCache::new(execution_state_cache_size)), - chain_modes: None, - delivery_notifiers: Arc::default(), - chain_worker_tasks: Arc::default(), - chain_workers: Arc::new(Mutex::new(BTreeMap::new())), - } - } - - #[instrument(level = "trace", skip(nickname, storage))] - pub fn new_for_client( - nickname: String, - storage: StorageClient, - chain_modes: Arc>>, - block_cache_size: usize, - execution_state_cache_size: usize, - ) -> Self { - WorkerState { - nickname, - storage, - chain_worker_config: ChainWorkerConfig::default(), - block_cache: Arc::new(ValueCache::new(block_cache_size)), - execution_state_cache: Arc::new(UniqueValueCache::new(execution_state_cache_size)), - chain_modes: Some(chain_modes), - delivery_notifiers: Arc::default(), - chain_worker_tasks: Arc::default(), - chain_workers: Arc::new(Mutex::new(BTreeMap::new())), - } - } - - #[instrument(level = "trace", skip(self, value))] - pub fn with_allow_inactive_chains(mut self, value: bool) -> Self { - self.chain_worker_config.allow_inactive_chains = value; - self - } - - #[instrument(level = "trace", skip(self, value))] - pub fn with_allow_messages_from_deprecated_epochs(mut self, value: bool) -> Self { - self.chain_worker_config - .allow_messages_from_deprecated_epochs = value; - self - } - - #[instrument(level = "trace", skip(self, value))] - pub fn with_long_lived_services(mut self, value: bool) -> Self { - self.chain_worker_config.long_lived_services = value; - self - } - - /// Returns an instance with the specified block time grace period. - /// - /// Blocks with a timestamp this far in the future will still be accepted, but the validator - /// will wait until that timestamp before voting. - #[instrument(level = "trace", skip(self))] - pub fn with_block_time_grace_period(mut self, block_time_grace_period: Duration) -> Self { - self.chain_worker_config.block_time_grace_period = block_time_grace_period; - self - } - - /// Returns an instance with the specified chain worker TTL. - /// - /// Idle chain workers free their memory after that duration without requests. - #[instrument(level = "trace", skip(self))] - pub fn with_chain_worker_ttl(mut self, chain_worker_ttl: Duration) -> Self { - self.chain_worker_config.ttl = chain_worker_ttl; - self - } - - /// Returns an instance with the specified sender chain worker TTL. - /// - /// Idle sender chain workers free their memory after that duration without requests. #[instrument(level = "trace", skip(self))] - pub fn with_sender_chain_worker_ttl(mut self, sender_chain_worker_ttl: Duration) -> Self { - self.chain_worker_config.sender_chain_ttl = sender_chain_worker_ttl; - self - } - - /// Returns an instance with the specified set of chain IDs whose incoming bundles - /// should be processed first. - #[instrument(level = "trace", skip(self, origins))] - pub fn with_priority_bundle_origins(mut self, origins: HashSet) -> Self { - self.chain_worker_config.priority_bundle_origins = origins; - self + pub fn nickname(&self) -> &str { + &self.chain_worker_config.nickname } - /// Returns an instance with the specified maximum size for received_log entries. - /// - /// Sizes below `CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES` should be avoided. - #[instrument(level = "trace", skip(self))] - pub fn with_chain_info_max_received_log_entries( + /// Sets the priority bundle origins. + pub fn with_priority_bundle_origins( mut self, - chain_info_max_received_log_entries: usize, + origins: std::collections::HashSet, ) -> Self { - if chain_info_max_received_log_entries < CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES { - warn!( - "The value set for the maximum size of received_log entries \ - may not be compatible with the latest clients: {} instead of {}", - chain_info_max_received_log_entries, CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES - ); - } - self.chain_worker_config.chain_info_max_received_log_entries = - chain_info_max_received_log_entries; + self.chain_worker_config.priority_bundle_origins = origins; self } - #[instrument(level = "trace", skip(self))] - pub fn nickname(&self) -> &str { - &self.nickname - } - /// Returns the storage client so that it can be manipulated or queried. #[instrument(level = "trace", skip(self))] #[cfg(not(feature = "test"))] @@ -744,6 +676,31 @@ impl WorkerState where StorageClient: Storage + Clone + 'static, { + /// Creates a new `WorkerState`. + /// + /// The `chain_worker_config` must be fully configured before calling this, because the + /// TTL sweep task is started immediately based on the config's TTL values. + #[instrument(level = "trace", skip(storage, chain_worker_config))] + pub fn new( + storage: StorageClient, + chain_worker_config: ChainWorkerConfig, + chain_modes: Option>>>, + ) -> Self { + let chain_workers = Arc::new(papaya::HashMap::new()); + start_sweep(&chain_workers, &chain_worker_config); + let block_cache_size = chain_worker_config.block_cache_size; + let execution_state_cache_size = chain_worker_config.execution_state_cache_size; + WorkerState { + storage, + chain_worker_config, + block_cache: Arc::new(ValueCache::new(block_cache_size)), + execution_state_cache: Arc::new(UniqueValueCache::new(execution_state_cache_size)), + chain_modes, + delivery_notifiers: Arc::default(), + chain_workers, + } + } + #[instrument(level = "trace", skip(self, certificate, notifier))] #[inline] pub async fn fully_handle_certificate_with_notifications( @@ -771,26 +728,200 @@ where .await } + /// Acquires a read lock on the chain worker and executes the given closure. + /// + /// The future is boxed to keep deeply nested types off the stack. On non-web + /// targets it is also wrapped in `SyncFuture` to satisfy `Sync` bounds. + async fn chain_read(&self, chain_id: ChainId, f: F) -> Result + where + F: FnOnce(OwnedRwLockReadGuard>) -> Fut, + Fut: std::future::Future>, + { + let state = self.get_or_create_chain_worker(chain_id).await?; + Box::pin(wrap_future(async move { + let guard = handle::read_lock(&state).await; + f(guard).await + })) + .await + } + + /// Acquires a write lock on the chain worker and executes the given closure. + /// + /// The [`RollbackGuard`] automatically rolls back uncommitted chain state changes + /// when dropped, ensuring cancellation safety. The future is boxed to keep deeply + /// nested types off the stack. + async fn chain_write(&self, chain_id: ChainId, f: F) -> Result + where + F: FnOnce(handle::RollbackGuard) -> Fut, + Fut: std::future::Future>, + { + let state = self.get_or_create_chain_worker(chain_id).await?; + Box::pin(wrap_future(async move { + let guard = handle::write_lock(&state).await; + f(guard).await + })) + .await + } + + /// Gets or creates a chain worker for the given chain. + /// + /// The oneshot channel is created outside the `compute` closure to keep + /// the closure pure (papaya may call it more than once on CAS retry and + /// may memoize the output). If the fast path hits, the unused channel is + /// dropped harmlessly. + /// + /// Returns a type-erased future to keep `!Sync` intermediate types (e.g. + /// `std::sync::mpsc::Receiver` from `handle::ServiceRuntimeActor::spawn`) out of + /// the caller's future type. + fn get_or_create_chain_worker( + &self, + chain_id: ChainId, + ) -> std::pin::Pin< + Box< + impl std::future::Future, WorkerError>> + '_, + >, + > { + Box::pin(wrap_future(async move { + loop { + // Create the channel outside the closure so that the tx/rx + // always match regardless of CAS retries. + let (tx, rx) = oneshot::channel(); + let shared_rx = rx.shared(); + + // The papaya guard is !Send, so it must be dropped before + // any .await point. + let wait_or_tx = { + let pin = self.chain_workers.pin(); + match pin.compute(chain_id, |existing| match existing { + Some((_, entry)) => match entry.peek() { + Some(Ok(weak)) => match weak.upgrade() { + Some(arc) => papaya::Operation::Abort(Ok(arc)), + None => papaya::Operation::Insert(shared_rx.clone()), + }, + Some(Err(_)) => papaya::Operation::Insert(shared_rx.clone()), + None => papaya::Operation::Abort(Err(entry.clone())), + }, + None => papaya::Operation::Insert(shared_rx.clone()), + }) { + papaya::Compute::Aborted(Ok(arc), ..) => return Ok(arc), + papaya::Compute::Aborted(Err(wait), ..) => Either::Left(wait), + papaya::Compute::Inserted { .. } | papaya::Compute::Updated { .. } => { + Either::Right(tx) + } + papaya::Compute::Removed { .. } => unreachable!(), + } + }; + + match wait_or_tx { + Either::Left(wait) => { + // Another task is loading. Await the shared future. + if let Ok(weak) = wait.await { + if let Some(arc) = weak.upgrade() { + return Ok(arc); + } + } + // Loading failed or worker already dead; retry. + } + Either::Right(tx) => { + // We claimed the loading slot. Load from storage. + // On success, send the Weak through the channel. + // On error, dropping tx wakes waiters so they can retry. + let worker = self.load_chain_worker(chain_id).await?; + if tx.send(Arc::downgrade(&worker)).is_err() { + tracing::error!(%chain_id, "Receiver dropped while loading worker state."); + continue; + } + return Ok(worker); + } + } + } + })) + } + + /// Loads a chain worker state from storage and wraps it in an Arc. + async fn load_chain_worker( + &self, + chain_id: ChainId, + ) -> Result, WorkerError> { + let delivery_notifier = self + .delivery_notifiers + .lock() + .unwrap() + .entry(chain_id) + .or_default() + .clone(); + + let is_tracked = self.chain_modes.as_ref().is_some_and(|chain_modes| { + chain_modes + .read() + .unwrap() + .get(&chain_id) + .is_some_and(ListeningMode::is_full) + }); + + let (service_runtime_endpoint, service_runtime_task) = + if self.chain_worker_config.long_lived_services { + let actor = + handle::ServiceRuntimeActor::spawn(chain_id, self.storage.thread_pool()).await; + (Some(actor.endpoint), Some(actor.task)) + } else { + (None, None) + }; + + let state = crate::chain_worker::state::ChainWorkerState::load( + self.chain_worker_config.clone(), + self.storage.clone(), + self.block_cache.clone(), + self.execution_state_cache.clone(), + self.chain_modes.clone(), + delivery_notifier, + chain_id, + service_runtime_endpoint, + service_runtime_task, + ) + .await?; + + Ok(handle::create_chain_worker( + state, + is_tracked, + &self.chain_worker_config, + )) + } + + /// Tries to execute a block proposal without any verification other than block execution. + #[instrument(level = "trace", skip(self, block))] + pub async fn stage_block_execution( + &self, + block: ProposedBlock, + round: Option, + published_blobs: Vec, + ) -> Result<(Block, ChainInfoResponse, ResourceTracker), WorkerError> { + let chain_id = block.chain_id; + self.chain_write(chain_id, |mut guard| async move { + guard + .stage_block_execution(block, round, &published_blobs) + .await + }) + .await + } + /// Tries to execute a block proposal with a policy for handling bundle failures. /// /// Returns the modified block (bundles may be rejected/removed), the executed block, /// chain info response, and resource tracker. #[instrument(level = "trace", skip(self, block))] - pub async fn stage_block_execution( + pub async fn stage_block_execution_with_policy( &self, block: ProposedBlock, round: Option, published_blobs: Vec, policy: BundleExecutionPolicy, ) -> Result<(ProposedBlock, Block, ChainInfoResponse, ResourceTracker), WorkerError> { - self.query_chain_worker(block.chain_id, move |callback| { - ChainWorkerRequest::StageBlockExecution { - block, - round, - published_blobs, - policy, - callback, - } + let chain_id = block.chain_id; + self.chain_write(chain_id, |mut guard| async move { + guard + .stage_block_execution_with_policy(block, round, &published_blobs, policy) + .await }) .await } @@ -806,18 +937,14 @@ where query: Query, block_hash: Option, ) -> Result<(QueryOutcome, BlockHeight), WorkerError> { - self.query_chain_worker(chain_id, move |callback| { - ChainWorkerRequest::QueryApplication { - query, - block_hash, - callback, - } + self.chain_write(chain_id, |mut guard| async move { + guard.query_application(query, block_hash).await }) .await } #[instrument(level = "trace", skip(self, chain_id, application_id), fields( - nickname = %self.nickname, + nickname = %self.nickname(), chain_id = %chain_id, application_id = %application_id ))] @@ -826,13 +953,9 @@ where chain_id: ChainId, application_id: ApplicationId, ) -> Result { - self.query_chain_worker(chain_id, move |callback| { - ChainWorkerRequest::DescribeApplication { - application_id, - callback, - } - }) - .await + let state = self.get_or_create_chain_worker(chain_id).await?; + let guard = handle::read_lock_initialized(&state).await?; + guard.describe_application_readonly(application_id).await } /// Processes a confirmed block (aka a commit). @@ -840,7 +963,7 @@ where level = "trace", skip(self, certificate, notify_when_messages_are_delivered), fields( - nickname = %self.nickname, + nickname = %self.nickname(), chain_id = %certificate.block().header.chain_id, block_height = %certificate.block().header.height ) @@ -851,19 +974,17 @@ where notify_when_messages_are_delivered: Option>, ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> { let chain_id = certificate.block().header.chain_id; - self.query_chain_worker(chain_id, move |callback| { - ChainWorkerRequest::ProcessConfirmedBlock { - certificate, - notify_when_messages_are_delivered, - callback, - } + self.chain_write(chain_id, |mut guard| async move { + guard + .process_confirmed_block(certificate, notify_when_messages_are_delivered) + .await }) .await } /// Processes a validated block issued from a multi-owner chain. #[instrument(level = "trace", skip(self, certificate), fields( - nickname = %self.nickname, + nickname = %self.nickname(), chain_id = %certificate.block().header.chain_id, block_height = %certificate.block().header.height ))] @@ -872,18 +993,15 @@ where certificate: ValidatedBlockCertificate, ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> { let chain_id = certificate.block().header.chain_id; - self.query_chain_worker(chain_id, move |callback| { - ChainWorkerRequest::ProcessValidatedBlock { - certificate, - callback, - } + self.chain_write(chain_id, |mut guard| async move { + guard.process_validated_block(certificate).await }) .await } /// Processes a leader timeout issued from a multi-owner chain. #[instrument(level = "trace", skip(self, certificate), fields( - nickname = %self.nickname, + nickname = %self.nickname(), chain_id = %certificate.value().chain_id(), height = %certificate.value().height() ))] @@ -892,17 +1010,14 @@ where certificate: TimeoutCertificate, ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> { let chain_id = certificate.value().chain_id(); - self.query_chain_worker(chain_id, move |callback| { - ChainWorkerRequest::ProcessTimeout { - certificate, - callback, - } + self.chain_write(chain_id, |mut guard| async move { + guard.process_timeout(certificate).await }) .await } #[instrument(level = "trace", skip(self, origin, recipient, bundles), fields( - nickname = %self.nickname, + nickname = %self.nickname(), origin = %origin, recipient = %recipient, num_bundles = %bundles.len() @@ -913,19 +1028,15 @@ where recipient: ChainId, bundles: Vec<(Epoch, MessageBundle)>, ) -> Result, WorkerError> { - self.query_chain_worker(recipient, move |callback| { - ChainWorkerRequest::ProcessCrossChainUpdate { - origin, - bundles, - callback, - } + self.chain_write(recipient, |mut guard| async move { + guard.process_cross_chain_update(origin, bundles).await }) .await } /// Returns a stored [`ConfirmedBlockCertificate`] for a chain's block. #[instrument(level = "trace", skip(self, chain_id, height), fields( - nickname = %self.nickname, + nickname = %self.nickname(), chain_id = %chain_id, height = %height ))] @@ -935,147 +1046,34 @@ where chain_id: ChainId, height: BlockHeight, ) -> Result, WorkerError> { - self.query_chain_worker(chain_id, move |callback| { - ChainWorkerRequest::ReadCertificate { height, callback } - }) - .await + let state = self.get_or_create_chain_worker(chain_id).await?; + let guard = handle::read_lock_initialized(&state).await?; + guard.read_certificate(height).await } /// Returns a read-only view of the [`ChainStateView`] of a chain referenced by its /// [`ChainId`]. /// - /// The returned view holds a lock on the chain state, which prevents the worker from changing - /// the state of that chain. + /// The returned guard holds a read lock on the chain state, preventing writes for + /// its lifetime. Multiple concurrent readers are allowed. #[instrument(level = "trace", skip(self), fields( - nickname = %self.nickname, + nickname = %self.nickname(), chain_id = %chain_id ))] pub async fn chain_state_view( &self, chain_id: ChainId, - ) -> Result>, WorkerError> { - self.query_chain_worker(chain_id, |callback| ChainWorkerRequest::GetChainStateView { - callback, - }) - .await - } - - /// Sends a request to the [`ChainWorker`] for a [`ChainId`] and waits for the `Response`. - #[instrument(level = "trace", skip(self, request_builder), fields( - nickname = %self.nickname, - chain_id = %chain_id - ))] - async fn query_chain_worker( - &self, - chain_id: ChainId, - request_builder: impl FnOnce( - oneshot::Sender>, - ) -> ChainWorkerRequest, - ) -> Result { - // Build the request. - let (callback, response) = oneshot::channel(); - let request = request_builder(callback); - - // Call the endpoint, possibly a new one. - let new_channel = self.call_and_maybe_create_chain_worker_endpoint(chain_id, request)?; - - // We just created a channel: spawn the actor. - if let Some((sender, receiver)) = new_channel { - let delivery_notifier = self - .delivery_notifiers - .lock() - .unwrap() - .entry(chain_id) - .or_default() - .clone(); - - let is_tracked = self.chain_modes.as_ref().is_some_and(|chain_modes| { - chain_modes - .read() - .unwrap() - .get(&chain_id) - .is_some_and(ListeningMode::is_full) - }); - - let actor_task = ChainWorkerActor::run( - self.chain_worker_config.clone(), - self.storage.clone(), - self.block_cache.clone(), - self.execution_state_cache.clone(), - self.chain_modes.clone(), - delivery_notifier, - chain_id, - sender, - receiver, - is_tracked, - ); - - self.chain_worker_tasks - .lock() - .unwrap() - .spawn_task(actor_task); - } - - // Finally, wait a response. - match response.await { - Err(e) => { - // The actor endpoint was dropped. Better luck next time! - Err(WorkerError::ChainActorRecvError { - chain_id, - error: Box::new(e), - }) - } - Ok(response) => response, - } - } - - /// Find an endpoint and call it. Create the endpoint if necessary. - /// - /// Returns `Some((sender, receiver))` if a new channel was created and the actor needs to be - /// spawned, or `None` if an existing endpoint was used. - #[instrument(level = "trace", skip(self), fields( - nickname = %self.nickname, - chain_id = %chain_id - ))] - #[expect(clippy::type_complexity)] - fn call_and_maybe_create_chain_worker_endpoint( - &self, - chain_id: ChainId, - request: ChainWorkerRequest, - ) -> Result< - Option<( - ChainWorkerRequestSender, - ChainWorkerRequestReceiver, - )>, - WorkerError, - > { - let mut chain_workers = self.chain_workers.lock().unwrap(); - - let (sender, new_channel) = if let Some(endpoint) = chain_workers.remove(&chain_id) { - (endpoint, None) - } else { - let (sender, receiver) = mpsc::unbounded_channel(); - (sender.clone(), Some((sender, receiver))) - }; - - if let Err(e) = sender.send((request, tracing::Span::current(), Instant::now())) { - // The actor was dropped. Give up without (re-)inserting the endpoint in the cache. - return Err(WorkerError::ChainActorSendError { - chain_id, - error: Box::new(e), - }); - } - - // Put back the sender in the cache for next time. - chain_workers.insert(chain_id, sender); - #[cfg(with_metrics)] - metrics::CHAIN_WORKER_ENDPOINTS_CACHED.set(chain_workers.len() as i64); - - Ok(new_channel) + ) -> Result, WorkerError> { + let state = self.get_or_create_chain_worker(chain_id).await?; + let guard = handle::read_lock(&state).await; + Ok(ChainStateViewReadGuard(OwnedRwLockReadGuard::map( + guard, + |s| s.chain(), + ))) } #[instrument(skip_all, fields( - nick = self.nickname, + nick = self.nickname(), chain_id = format!("{:.8}", proposal.content.block.chain_id), height = %proposal.content.block.height, ))] @@ -1083,12 +1081,26 @@ where &self, proposal: BlockProposal, ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> { - trace!("{} <-- {:?}", self.nickname, proposal); + trace!("{} <-- {:?}", self.nickname(), proposal); #[cfg(with_metrics)] let round = proposal.content.round; + + let chain_id = proposal.content.block.chain_id; + // Delay if block timestamp is in the future but within grace period. + let now = self.storage.clock().current_time(); + let block_timestamp = proposal.content.block.timestamp; + let delta = block_timestamp.delta_since(now); + let grace_period = TimeDelta::from_micros( + u64::try_from(self.chain_worker_config.block_time_grace_period.as_micros()) + .unwrap_or(u64::MAX), + ); + if delta > TimeDelta::ZERO && delta <= grace_period { + self.storage.clock().sleep_until(block_timestamp).await; + } + let response = self - .query_chain_worker(proposal.content.block.chain_id, move |callback| { - ChainWorkerRequest::HandleBlockProposal { proposal, callback } + .chain_write(chain_id, |mut guard| async move { + guard.handle_block_proposal(proposal).await }) .await?; #[cfg(with_metrics)] @@ -1130,7 +1142,7 @@ where /// Processes a confirmed block certificate. #[instrument(skip_all, fields( - nick = self.nickname, + nick = self.nickname(), chain_id = format!("{:.8}", certificate.block().header.chain_id), height = %certificate.block().header.height, ))] @@ -1139,16 +1151,17 @@ where certificate: ConfirmedBlockCertificate, notify_when_messages_are_delivered: Option>, ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> { - trace!("{} <-- {:?}", self.nickname, certificate); + trace!("{} <-- {:?}", self.nickname(), certificate); #[cfg(with_metrics)] let metrics_data = metrics::MetricsData::new(&certificate); - let (info, actions, _outcome) = + #[allow(unused_variables)] + let (info, actions, outcome) = Box::pin(self.process_confirmed_block(certificate, notify_when_messages_are_delivered)) .await?; #[cfg(with_metrics)] - if matches!(_outcome, BlockOutcome::Processed) { + if matches!(outcome, BlockOutcome::Processed) { metrics_data.record(); } Ok((info, actions)) @@ -1156,7 +1169,7 @@ where /// Processes a validated block certificate. #[instrument(skip_all, fields( - nick = self.nickname, + nick = self.nickname(), chain_id = format!("{:.8}", certificate.block().header.chain_id), height = %certificate.block().header.height, ))] @@ -1164,17 +1177,18 @@ where &self, certificate: ValidatedBlockCertificate, ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> { - trace!("{} <-- {:?}", self.nickname, certificate); + trace!("{} <-- {:?}", self.nickname(), certificate); #[cfg(with_metrics)] let round = certificate.round; #[cfg(with_metrics)] let cert_str = certificate.inner().to_log_str(); - let (info, actions, _outcome) = Box::pin(self.process_validated_block(certificate)).await?; + #[allow(unused_variables)] + let (info, actions, outcome) = Box::pin(self.process_validated_block(certificate)).await?; #[cfg(with_metrics)] { - if matches!(_outcome, BlockOutcome::Processed) { + if matches!(outcome, BlockOutcome::Processed) { metrics::NUM_ROUNDS_IN_CERTIFICATE .with_label_values(&[cert_str, round.type_name()]) .observe(round.number() as f64); @@ -1185,7 +1199,7 @@ where /// Processes a timeout certificate #[instrument(skip_all, fields( - nick = self.nickname, + nick = self.nickname(), chain_id = format!("{:.8}", certificate.inner().chain_id()), height = %certificate.inner().height(), ))] @@ -1193,32 +1207,33 @@ where &self, certificate: TimeoutCertificate, ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> { - trace!("{} <-- {:?}", self.nickname, certificate); + trace!("{} <-- {:?}", self.nickname(), certificate); self.process_timeout(certificate).await } #[instrument(skip_all, fields( - nick = self.nickname, + nick = self.nickname(), chain_id = format!("{:.8}", query.chain_id) ))] pub async fn handle_chain_info_query( &self, query: ChainInfoQuery, ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> { - trace!("{} <-- {:?}", self.nickname, query); + trace!("{} <-- {:?}", self.nickname(), query); #[cfg(with_metrics)] metrics::CHAIN_INFO_QUERIES.inc(); + let chain_id = query.chain_id; let result = self - .query_chain_worker(query.chain_id, move |callback| { - ChainWorkerRequest::HandleChainInfoQuery { query, callback } + .chain_write(chain_id, |mut guard| async move { + guard.handle_chain_info_query(query).await }) .await; - trace!("{} --> {:?}", self.nickname, result); + trace!("{} --> {:?}", self.nickname(), result); result } #[instrument(skip_all, fields( - nick = self.nickname, + nick = self.nickname(), chain_id = format!("{:.8}", chain_id) ))] pub async fn download_pending_blob( @@ -1228,23 +1243,23 @@ where ) -> Result { trace!( "{} <-- download_pending_blob({chain_id:8}, {blob_id:8})", - self.nickname + self.nickname() ); let result = self - .query_chain_worker(chain_id, move |callback| { - ChainWorkerRequest::DownloadPendingBlob { blob_id, callback } + .chain_read(chain_id, |guard| async move { + guard.download_pending_blob(blob_id).await }) .await; trace!( "{} --> {:?}", - self.nickname, + self.nickname(), result.as_ref().map(|_| blob_id) ); result } #[instrument(skip_all, fields( - nick = self.nickname, + nick = self.nickname(), chain_id = format!("{:.8}", chain_id) ))] pub async fn handle_pending_blob( @@ -1255,30 +1270,30 @@ where let blob_id = blob.id(); trace!( "{} <-- handle_pending_blob({chain_id:8}, {blob_id:8})", - self.nickname + self.nickname() ); let result = self - .query_chain_worker(chain_id, move |callback| { - ChainWorkerRequest::HandlePendingBlob { blob, callback } + .chain_write(chain_id, |mut guard| async move { + guard.handle_pending_blob(blob).await }) .await; trace!( "{} --> {:?}", - self.nickname, + self.nickname(), result.as_ref().map(|_| blob_id) ); result } #[instrument(skip_all, fields( - nick = self.nickname, + nick = self.nickname(), chain_id = format!("{:.8}", request.target_chain_id()) ))] pub async fn handle_cross_chain_request( &self, request: CrossChainRequest, ) -> Result { - trace!("{} <-- {:?}", self.nickname, request); + trace!("{} <-- {:?}", self.nickname(), request); match request { CrossChainRequest::UpdateRecipient { sender, @@ -1311,12 +1326,10 @@ where recipient, latest_height, } => { - self.query_chain_worker(sender, move |callback| { - ChainWorkerRequest::ConfirmUpdatedRecipient { - recipient, - latest_height, - callback, - } + self.chain_write(sender, |mut guard| async move { + guard + .confirm_updated_recipient(recipient, latest_height) + .await }) .await?; Ok(NetworkActions::default()) @@ -1326,7 +1339,7 @@ where /// Updates the received certificate trackers to at least the given values. #[instrument(skip_all, fields( - nickname = %self.nickname, + nickname = %self.nickname(), chain_id = %chain_id, num_trackers = %new_trackers.len() ))] @@ -1335,18 +1348,17 @@ where chain_id: ChainId, new_trackers: BTreeMap, ) -> Result<(), WorkerError> { - self.query_chain_worker(chain_id, move |callback| { - ChainWorkerRequest::UpdateReceivedCertificateTrackers { - new_trackers, - callback, - } + self.chain_write(chain_id, |mut guard| async move { + guard + .update_received_certificate_trackers(new_trackers) + .await }) .await } /// Gets preprocessed block hashes in a given height range. #[instrument(skip_all, fields( - nickname = %self.nickname, + nickname = %self.nickname(), chain_id = %chain_id, start = %start, end = %end @@ -1357,19 +1369,15 @@ where start: BlockHeight, end: BlockHeight, ) -> Result, WorkerError> { - self.query_chain_worker(chain_id, move |callback| { - ChainWorkerRequest::GetPreprocessedBlockHashes { - start, - end, - callback, - } + self.chain_read(chain_id, |guard| async move { + guard.get_preprocessed_block_hashes(start, end).await }) .await } /// Gets the next block height to receive from an inbox. #[instrument(skip_all, fields( - nickname = %self.nickname, + nickname = %self.nickname(), chain_id = %chain_id, origin = %origin ))] @@ -1378,8 +1386,8 @@ where chain_id: ChainId, origin: ChainId, ) -> Result { - self.query_chain_worker(chain_id, move |callback| { - ChainWorkerRequest::GetInboxNextHeight { origin, callback } + self.chain_read(chain_id, |guard| async move { + guard.get_inbox_next_height(origin).await }) .await } @@ -1387,7 +1395,7 @@ where /// Gets locking blobs for specific blob IDs. /// Returns `Ok(None)` if any of the blobs is not found. #[instrument(skip_all, fields( - nickname = %self.nickname, + nickname = %self.nickname(), chain_id = %chain_id, num_blob_ids = %blob_ids.len() ))] @@ -1396,8 +1404,8 @@ where chain_id: ChainId, blob_ids: Vec, ) -> Result>, WorkerError> { - self.query_chain_worker(chain_id, move |callback| { - ChainWorkerRequest::GetLockingBlobs { blob_ids, callback } + self.chain_read(chain_id, |guard| async move { + guard.get_locking_blobs(blob_ids).await }) .await } @@ -1408,8 +1416,8 @@ where chain_id: ChainId, heights: Vec, ) -> Result, WorkerError> { - self.query_chain_worker(chain_id, move |callback| { - ChainWorkerRequest::GetBlockHashes { heights, callback } + self.chain_read(chain_id, |guard| async move { + guard.get_block_hashes(heights).await }) .await } @@ -1420,8 +1428,8 @@ where chain_id: ChainId, blob_ids: Vec, ) -> Result, WorkerError> { - self.query_chain_worker(chain_id, move |callback| { - ChainWorkerRequest::GetProposedBlobs { blob_ids, callback } + self.chain_read(chain_id, |guard| async move { + guard.get_proposed_blobs(blob_ids).await }) .await } @@ -1431,23 +1439,20 @@ where &self, chain_id: ChainId, ) -> Result { - self.query_chain_worker(chain_id, |callback| { - ChainWorkerRequest::GetEventSubscriptions { callback } + self.chain_read(chain_id, |guard| async move { + guard.get_event_subscriptions().await }) .await } - /// Gets the stream event count for a stream. - pub async fn get_stream_event_count( + /// Gets the next expected event index for a stream. + pub async fn get_next_expected_event( &self, chain_id: ChainId, stream_id: StreamId, ) -> Result, WorkerError> { - self.query_chain_worker(chain_id, move |callback| { - ChainWorkerRequest::GetStreamEventCount { - stream_id, - callback, - } + self.chain_read(chain_id, |guard| async move { + guard.get_next_expected_event(stream_id).await }) .await } @@ -1457,8 +1462,8 @@ where &self, chain_id: ChainId, ) -> Result, WorkerError> { - self.query_chain_worker(chain_id, |callback| { - ChainWorkerRequest::GetReceivedCertificateTrackers { callback } + self.chain_read(chain_id, |guard| async move { + guard.get_received_certificate_trackers().await }) .await } @@ -1469,11 +1474,8 @@ where chain_id: ChainId, receiver_id: ChainId, ) -> Result<(BlockHeight, Option), WorkerError> { - self.query_chain_worker(chain_id, move |callback| { - ChainWorkerRequest::GetTipStateAndOutboxInfo { - receiver_id, - callback, - } + self.chain_read(chain_id, |guard| async move { + guard.get_tip_state_and_outbox_info(receiver_id).await }) .await } @@ -1483,16 +1485,29 @@ where &self, chain_id: ChainId, ) -> Result { - self.query_chain_worker(chain_id, |callback| { - ChainWorkerRequest::GetNextHeightToPreprocess { callback } + self.chain_read(chain_id, |guard| async move { + guard.get_next_height_to_preprocess().await }) .await } /// Gets the chain manager's seed for leader election. pub async fn get_manager_seed(&self, chain_id: ChainId) -> Result { - self.query_chain_worker(chain_id, |callback| ChainWorkerRequest::GetManagerSeed { - callback, + self.chain_read( + chain_id, + |guard| async move { guard.get_manager_seed().await }, + ) + .await + } + + /// Gets the stream event count for a stream. + pub async fn get_stream_event_count( + &self, + chain_id: ChainId, + stream_id: StreamId, + ) -> Result, WorkerError> { + self.chain_read(chain_id, |guard| async move { + guard.get_stream_event_count(stream_id).await }) .await } @@ -1503,11 +1518,8 @@ where chain_id: ChainId, stream_ids: Vec, ) -> Result, WorkerError> { - self.query_chain_worker(chain_id, move |callback| { - ChainWorkerRequest::GetPreviousEventBlocks { - stream_ids, - callback, - } + self.chain_read(chain_id, |guard| async move { + guard.get_previous_event_blocks(stream_ids).await }) .await } @@ -1516,7 +1528,7 @@ where #[cfg(with_testing)] impl WorkerState where - StorageClient: Storage, + StorageClient: Storage + Clone + 'static, { /// Gets a reference to the validator's [`ValidatorPublicKey`]. /// diff --git a/linera-faucet/server/src/lib.rs b/linera-faucet/server/src/lib.rs index 4bcb4217625b..395e6c795586 100644 --- a/linera-faucet/server/src/lib.rs +++ b/linera-faucet/server/src/lib.rs @@ -1,7 +1,7 @@ // Copyright (c) Zefchain Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -#![recursion_limit = "256"] +#![recursion_limit = "512"] //! The server component of the Linera faucet. diff --git a/linera-rpc/src/lib.rs b/linera-rpc/src/lib.rs index 347d28e222d5..5dbd334d0056 100644 --- a/linera-rpc/src/lib.rs +++ b/linera-rpc/src/lib.rs @@ -1,6 +1,8 @@ // Copyright (c) Zefchain Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +#![recursion_limit = "256"] + //! This module provides network abstractions and the data schemas for remote procedure //! calls (RPCs) in the Linera protocol. diff --git a/linera-sdk/src/test/block.rs b/linera-sdk/src/test/block.rs index 2d481813a2c3..9de42edfd76b 100644 --- a/linera-sdk/src/test/block.rs +++ b/linera-sdk/src/test/block.rs @@ -247,7 +247,7 @@ impl BlockBuilder { let (_, block, _, resource_tracker) = self .validator .worker() - .stage_block_execution( + .stage_block_execution_with_policy( self.block, None, published_blobs, diff --git a/linera-sdk/src/test/validator.rs b/linera-sdk/src/test/validator.rs index 1d0652a720be..c654e4287d20 100644 --- a/linera-sdk/src/test/validator.rs +++ b/linera-sdk/src/test/validator.rs @@ -21,9 +21,7 @@ use linera_base::{ identifiers::{AccountOwner, ApplicationId, ChainId, ModuleId}, ownership::ChainOwnership, }; -use linera_core::worker::{ - WorkerState, DEFAULT_BLOCK_CACHE_SIZE, DEFAULT_EXECUTION_STATE_CACHE_SIZE, -}; +use linera_core::{worker::WorkerState, ChainWorkerConfig}; use linera_execution::{ committee::Committee, system::{AdminOperation, OpenChainConfig, SystemOperation}, @@ -90,13 +88,12 @@ impl TestValidator { .now_or_never() .expect("execution of DbStorage::new should not await anything"); let clock = storage.clock().clone(); - let worker = WorkerState::new( - "Single validator node".to_string(), - Some(validator_keypair.secret_key.copy()), - storage.clone(), - DEFAULT_BLOCK_CACHE_SIZE, - DEFAULT_EXECUTION_STATE_CACHE_SIZE, - ); + let config = ChainWorkerConfig { + nickname: "Single validator node".to_string(), + key_pair: Some(Arc::new(validator_keypair.secret_key.copy())), + ..ChainWorkerConfig::default() + }; + let worker = WorkerState::new(storage.clone(), config, None); // Create an admin chain. let key_pair = AccountSecretKey::generate(); diff --git a/linera-sdk/tests/fixtures/Cargo.lock b/linera-sdk/tests/fixtures/Cargo.lock index beceeee474a7..7c0a9eaf966f 100644 --- a/linera-sdk/tests/fixtures/Cargo.lock +++ b/linera-sdk/tests/fixtures/Cargo.lock @@ -2315,6 +2315,7 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", + "sync_wrapper 1.0.2", "test-log", "test-strategy", "thiserror 1.0.69", diff --git a/linera-service/benches/transfers.rs b/linera-service/benches/transfers.rs index 0bd823336d50..6051026244e6 100644 --- a/linera-service/benches/transfers.rs +++ b/linera-service/benches/transfers.rs @@ -1,6 +1,8 @@ // Copyright (c) Zefchain Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +#![recursion_limit = "256"] + use criterion::{criterion_group, criterion_main, Criterion}; use futures::{ stream::{self, FuturesUnordered}, diff --git a/linera-service/src/lib.rs b/linera-service/src/lib.rs index 1c736168f413..699e600fb79a 100644 --- a/linera-service/src/lib.rs +++ b/linera-service/src/lib.rs @@ -2,6 +2,8 @@ // Copyright (c) Zefchain Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +#![recursion_limit = "256"] + //! This module provides the executables needed to operate a Linera service, including a placeholder wallet acting as a GraphQL service for user interfaces. pub mod cli; diff --git a/linera-service/src/node_service.rs b/linera-service/src/node_service.rs index ac8056fffc11..3562af7506ab 100644 --- a/linera-service/src/node_service.rs +++ b/linera-service/src/node_service.rs @@ -44,7 +44,7 @@ use linera_core::{ client::{ChainClient, ChainClientError}, data_types::ClientOutcome, wallet::Wallet as _, - worker::{Notification, Reason}, + worker::{ChainStateViewReadGuard, Notification, Reason}, }; use linera_execution::{ committee::Committee, system::AdminOperation, Operation, Query, QueryOutcome, QueryResponse, @@ -53,10 +53,11 @@ use linera_execution::{ #[cfg(with_metrics)] use linera_metrics::monitoring_server; use linera_sdk::linera_base_types::BlobContent; +use linera_storage::Storage; use lru::LruCache; use serde::{Deserialize, Serialize}; use serde_json::json; -use tokio::sync::{mpsc::UnboundedReceiver, OwnedRwLockReadGuard}; +use tokio::sync::mpsc::UnboundedReceiver; use tokio_util::sync::CancellationToken; use tower_http::cors::CorsLayer; use tracing::{debug, error, info, instrument, trace}; @@ -719,10 +720,8 @@ where async fn chain( &self, chain_id: ChainId, - ) -> Result< - ChainStateExtendedView<::StorageContext>, - Error, - > { + ) -> Result::Storage>, Error> + { let client = self .context .lock() @@ -860,20 +859,19 @@ impl ChainStateViewExtension { } #[derive(MergedObject)] -struct ChainStateExtendedView(ChainStateViewExtension, ReadOnlyChainStateView) +struct ChainStateExtendedView(ChainStateViewExtension, ReadOnlyChainStateView) where - C: linera_views::context::Context + Clone + Send + Sync + 'static, - C::Extra: linera_execution::ExecutionRuntimeContext; + ChainStateView: ContainerType + OutputType; -/// A wrapper type that allows proxying GraphQL queries to a [`ChainStateView`] that's behind an -/// [`OwnedRwLockReadGuard`]. -pub struct ReadOnlyChainStateView(OwnedRwLockReadGuard>) +/// A wrapper type that allows proxying GraphQL queries to a [`ChainStateView`] that's behind a +/// [`ChainStateViewReadGuard`]. +pub struct ReadOnlyChainStateView(ChainStateViewReadGuard) where - C: linera_views::context::Context + Clone + Send + Sync + 'static; + ChainStateView: ContainerType + OutputType; -impl ContainerType for ReadOnlyChainStateView +impl ContainerType for ReadOnlyChainStateView where - C: linera_views::context::Context + Clone + Send + Sync + 'static, + ChainStateView: ContainerType + OutputType, { async fn resolve_field( &self, @@ -883,16 +881,16 @@ where } } -impl OutputType for ReadOnlyChainStateView +impl OutputType for ReadOnlyChainStateView where - C: linera_views::context::Context + Clone + Send + Sync + 'static, + ChainStateView: ContainerType + OutputType, { fn type_name() -> Cow<'static, str> { - ChainStateView::::type_name() + ChainStateView::::type_name() } fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String { - ChainStateView::::create_type_info(registry) + ChainStateView::::create_type_info(registry) } async fn resolve( @@ -904,12 +902,11 @@ where } } -impl ChainStateExtendedView +impl ChainStateExtendedView where - C: linera_views::context::Context + Clone + Send + Sync + 'static, - C::Extra: linera_execution::ExecutionRuntimeContext, + ChainStateView: ContainerType + OutputType, { - fn new(view: OwnedRwLockReadGuard>) -> Self { + fn new(view: ChainStateViewReadGuard) -> Self { Self( ChainStateViewExtension(view.chain_id()), ReadOnlyChainStateView(view), diff --git a/linera-service/src/server.rs b/linera-service/src/server.rs index d1d7680a6342..4c239ab36528 100644 --- a/linera-service/src/server.rs +++ b/linera-service/src/server.rs @@ -18,6 +18,7 @@ use std::{ borrow::Cow, num::NonZeroU16, path::{Path, PathBuf}, + sync::Arc, time::Duration, }; @@ -29,7 +30,9 @@ use linera_base::{ listen_for_shutdown_signals, }; use linera_client::config::{CommitteeConfig, ValidatorConfig, ValidatorServerConfig}; -use linera_core::{worker::WorkerState, JoinSetExt as _, CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES}; +use linera_core::{ + worker::WorkerState, ChainWorkerConfig, JoinSetExt as _, CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES, +}; use linera_execution::{WasmRuntime, WithWasmDefault}; #[cfg(with_metrics)] use linera_metrics::monitoring_server; @@ -83,18 +86,19 @@ impl ServerContext { "Public key: {}", self.server_config.validator_secret.public() ); - let state = WorkerState::new( - format!("Shard {} @ {}:{}", shard_id, local_ip_addr, shard.port), - Some(self.server_config.validator_secret.copy()), - storage, - self.block_cache_size, - self.execution_state_cache_size, - ) - .with_allow_inactive_chains(false) - .with_allow_messages_from_deprecated_epochs(false) - .with_block_time_grace_period(self.block_time_grace_period) - .with_chain_worker_ttl(self.chain_worker_ttl) - .with_chain_info_max_received_log_entries(self.chain_info_max_received_log_entries); + let config = ChainWorkerConfig { + nickname: format!("Shard {} @ {}:{}", shard_id, local_ip_addr, shard.port), + key_pair: Some(Arc::new(self.server_config.validator_secret.copy())), + allow_inactive_chains: false, + allow_messages_from_deprecated_epochs: false, + block_time_grace_period: self.block_time_grace_period, + ttl: util::non_zero_duration(self.chain_worker_ttl), + chain_info_max_received_log_entries: self.chain_info_max_received_log_entries, + block_cache_size: self.block_cache_size, + execution_state_cache_size: self.execution_state_cache_size, + ..ChainWorkerConfig::default() + }; + let state = WorkerState::new(storage, config, None); (state, shard_id, shard.clone()) } diff --git a/linera-service/src/util.rs b/linera-service/src/util.rs index ea56c2d1c1cd..578546fd09a7 100644 --- a/linera-service/src/util.rs +++ b/linera-service/src/util.rs @@ -8,6 +8,11 @@ use std::{ time::Duration, }; +/// Treats a zero `Duration` as `None` (disabled). +pub fn non_zero_duration(d: Duration) -> Option { + (d > Duration::ZERO).then_some(d) +} + use anyhow::{bail, Context as _, Result}; use async_graphql::http::GraphiQLSource; use axum::response::{self, IntoResponse}; diff --git a/linera-service/tests/wallet.rs b/linera-service/tests/wallet.rs index 7c6cf6fc1ed4..d75d886511fe 100644 --- a/linera-service/tests/wallet.rs +++ b/linera-service/tests/wallet.rs @@ -31,8 +31,8 @@ pub async fn new_test_client_context( let send_recv_timeout = Duration::from_millis(4000); let retry_delay = Duration::from_millis(1000); let max_retries = 10; - let chain_worker_ttl = Duration::from_secs(30); - let sender_chain_worker_ttl = Duration::from_secs(1); + let chain_worker_ttl = Some(Duration::from_secs(30)); + let sender_chain_worker_ttl = Some(Duration::from_secs(1)); let node_options = NodeOptions { send_timeout: send_recv_timeout, From 9f4d8895d7336174f7922ae613491f932f1b4206 Mon Sep 17 00:00:00 2001 From: Andreas Fackler Date: Wed, 25 Mar 2026 12:52:31 +0000 Subject: [PATCH 2/7] Cleanups --- linera-core/src/chain_worker/state.rs | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/linera-core/src/chain_worker/state.rs b/linera-core/src/chain_worker/state.rs index 3c11ff12fbb1..4a35971c50c9 100644 --- a/linera-core/src/chain_worker/state.rs +++ b/linera-core/src/chain_worker/state.rs @@ -628,8 +628,12 @@ where Ok((self.chain_info_response(), actions, BlockOutcome::Processed)) } - /// Initializes `next_expected_events` for any streams that don't have entries yet, - /// for backwards compatibility with chains from older DB versions. + /// Initializes `next_expected_events` from `stream_event_counts` (which reflects + /// all executed blocks), then replays any preprocessed-but-not-yet-executed blocks to + /// advance the indices further. + /// + /// This handles the migration case where the `next_expected_events` field was added to + /// `ChainStateView` after blocks had already been processed. async fn initialize_next_expected_events(&mut self) -> Result<(), WorkerError> { if self.chain.next_expected_events.count().await? > 0 { return Ok(()); // Already initialized. @@ -755,17 +759,17 @@ where self.initialize_next_expected_events().await?; } // Update the outboxes and track emitted events. - let updated_event_streams = self.chain.preprocess_block(certificate.value()).await?; + let event_streams = self.chain.preprocess_block(certificate.value()).await?; // Persist chain. self.save().await?; let mut actions = self.create_network_actions(None).await?; - if !updated_event_streams.is_empty() { + if !event_streams.is_empty() { actions.notifications.push(Notification { chain_id, reason: Reason::NewEvents { height, hash: block_hash, - event_streams: updated_event_streams, + event_streams, }, }); } @@ -854,6 +858,7 @@ where computed: Box::new(verified_outcome), } ); + // Update the rest of the chain state. let event_streams = chain .apply_confirmed_block(certificate.value(), local_time) @@ -1244,6 +1249,10 @@ where } /// Votes for falling back to a public chain. + /// This is disabled on the testnet. + #[instrument(skip_all, fields( + chain_id = %self.chain_id() + ))] async fn vote_for_fallback(&mut self) -> Result<(), WorkerError> { Err(WorkerError::NoFallbackMode) } @@ -1287,9 +1296,6 @@ where } /// Returns a stored [`Certificate`] for the chain's block at the requested [`BlockHeight`]. - /// - /// Does not need `&mut self` because the chain is eagerly initialized when the - /// chain handle is created. #[cfg(with_testing)] #[instrument(skip_all, fields( chain_id = %self.chain_id(), @@ -1643,6 +1649,7 @@ where .await?; } if let Some(next_block_height) = query.test_next_block_height { + // If not, send the same error as if a block with next_block_height was proposed. ensure!( self.chain.tip_state.get().next_block_height == next_block_height, WorkerError::UnexpectedBlockHeight { From fde34505b2d1a48fbb898dfc29cb4e3046c877c3 Mon Sep 17 00:00:00 2001 From: Andreas Fackler Date: Wed, 25 Mar 2026 13:07:44 +0000 Subject: [PATCH 3/7] Merge stage_block_execution and stage_block_execution_with_policy There should only be one stage_block_execution, taking a policy argument. Co-Authored-By: Claude Opus 4.6 (1M context) --- linera-core/src/chain_worker/state.rs | 25 +------------------ linera-core/src/local_node.rs | 2 +- .../src/unit_tests/wasm_worker_tests.rs | 12 ++++----- linera-core/src/unit_tests/worker_tests.rs | 22 ++++++++-------- linera-core/src/worker.rs | 22 +--------------- linera-sdk/src/test/block.rs | 2 +- 6 files changed, 21 insertions(+), 64 deletions(-) diff --git a/linera-core/src/chain_worker/state.rs b/linera-core/src/chain_worker/state.rs index 4a35971c50c9..32ad22d8f650 100644 --- a/linera-core/src/chain_worker/state.rs +++ b/linera-core/src/chain_worker/state.rs @@ -1395,29 +1395,6 @@ where } /// Executes a block without persisting any changes to the state. - #[instrument(skip_all, fields( - chain_id = %self.chain_id(), - block_height = %block.height - ))] - pub(crate) async fn stage_block_execution( - &mut self, - block: ProposedBlock, - round: Option, - published_blobs: &[Blob], - ) -> Result<(Block, ChainInfoResponse, ResourceTracker), WorkerError> { - let (_, executed_block, response, resource_tracker) = self - .stage_block_execution_with_policy( - block, - round, - published_blobs, - BundleExecutionPolicy::committed(), - ) - .await?; - Ok((executed_block, response, resource_tracker)) - } - - /// Executes a block without persisting any changes to the state, with a specified - /// policy for handling bundle failures. /// /// The block may be modified to reflect the actual executed transactions /// (bundles may be rejected or removed based on the policy). @@ -1425,7 +1402,7 @@ where chain_id = %self.chain_id(), block_height = %block.height ))] - pub(crate) async fn stage_block_execution_with_policy( + pub(crate) async fn stage_block_execution( &mut self, block: ProposedBlock, round: Option, diff --git a/linera-core/src/local_node.rs b/linera-core/src/local_node.rs index 25c3b15519d8..da69a757235d 100644 --- a/linera-core/src/local_node.rs +++ b/linera-core/src/local_node.rs @@ -150,7 +150,7 @@ where Ok(self .node .state - .stage_block_execution_with_policy(block, round, published_blobs, policy) + .stage_block_execution(block, round, published_blobs, policy) .await?) } diff --git a/linera-core/src/unit_tests/wasm_worker_tests.rs b/linera-core/src/unit_tests/wasm_worker_tests.rs index 3f1117e217b0..64f51f38bd6d 100644 --- a/linera-core/src/unit_tests/wasm_worker_tests.rs +++ b/linera-core/src/unit_tests/wasm_worker_tests.rs @@ -398,7 +398,7 @@ where .with_operation(publish_meta_op); let (_, publish_executed, _, _) = env .worker() - .stage_block_execution_with_policy( + .stage_block_execution( publish_block, None, all_blobs.to_vec(), @@ -433,7 +433,7 @@ where .with_operation(create_counter_op); let (_, create_counter_executed, _, _) = env .worker() - .stage_block_execution_with_policy( + .stage_block_execution( create_counter_block, None, vec![], @@ -468,7 +468,7 @@ where .with_operation(create_meta_op); let (_, create_meta_executed, _, _) = env .worker() - .stage_block_execution_with_policy( + .stage_block_execution( create_meta_block, None, vec![], @@ -491,7 +491,7 @@ where }); let (_, send_fail_executed, _, _) = env .worker() - .stage_block_execution_with_policy( + .stage_block_execution( send_fail_block, None, vec![], @@ -536,7 +536,7 @@ where // This should handle the failing message by rejecting the bundle. let (modified_block, auto_retry_executed, _, _) = env .worker() - .stage_block_execution_with_policy( + .stage_block_execution( proposed_block.clone(), None, vec![], @@ -565,7 +565,7 @@ where // and produce the same outcome. let (_, abort_executed, _, _) = env .worker() - .stage_block_execution_with_policy( + .stage_block_execution( modified_block.clone(), None, vec![], diff --git a/linera-core/src/unit_tests/worker_tests.rs b/linera-core/src/unit_tests/worker_tests.rs index 8eef22ecc8c4..333568901ccc 100644 --- a/linera-core/src/unit_tests/worker_tests.rs +++ b/linera-core/src/unit_tests/worker_tests.rs @@ -788,7 +788,7 @@ where // Stage execution to get the block for certificate creation. let (_, block, _, _) = env .worker() - .stage_block_execution_with_policy( + .stage_block_execution( proposed_block, None, vec![], @@ -817,7 +817,7 @@ where .unwrap(); let (_, block, _, _) = env .worker() - .stage_block_execution_with_policy( + .stage_block_execution( proposed_block, None, vec![], @@ -845,7 +845,7 @@ where .unwrap(); let (_, block, _, _) = env .worker() - .stage_block_execution_with_policy( + .stage_block_execution( proposed_block, None, vec![], @@ -3496,7 +3496,7 @@ where .with_authenticated_signer(Some(owner0)); let (_, block0, _, _) = env .worker() - .stage_block_execution_with_policy( + .stage_block_execution( proposed_block0, None, vec![], @@ -3563,7 +3563,7 @@ where let proposed_block1 = make_child_block(&value0).with_simple_transfer(chain_1, small_transfer); let (_, block1, _, _) = env .worker() - .stage_block_execution_with_policy( + .stage_block_execution( proposed_block1.clone(), None, vec![], @@ -3618,7 +3618,7 @@ where let proposed_block2 = make_child_block(&value0.clone()).with_simple_transfer(chain_1, amount); let (_, block2, _, _) = env .worker() - .stage_block_execution_with_policy( + .stage_block_execution( proposed_block2.clone(), None, vec![], @@ -3762,7 +3762,7 @@ where }); let (_, block0, _, _) = env .worker() - .stage_block_execution_with_policy( + .stage_block_execution( proposed_block0, None, vec![], @@ -3879,7 +3879,7 @@ where }); let (_, block0, _, _) = env .worker() - .stage_block_execution_with_policy( + .stage_block_execution( proposed_block0, None, vec![], @@ -3912,7 +3912,7 @@ where .unwrap(); let (_, block1, _, _) = env .worker() - .stage_block_execution_with_policy( + .stage_block_execution( proposed_block1.clone(), None, vec![], @@ -3974,7 +3974,7 @@ where // A validated block certificate from a later round can override the locked fast block. let (_, block2, _, _) = env .worker() - .stage_block_execution_with_policy( + .stage_block_execution( proposed_block2.clone(), None, vec![], @@ -4345,7 +4345,7 @@ where // Test stage_block_execution directly - this should fail with IncorrectMessageOrder. assert_matches!( env.worker() - .stage_block_execution_with_policy(bad_proposed_block.clone(), None, vec![], BundleExecutionPolicy::committed()) + .stage_block_execution(bad_proposed_block.clone(), None, vec![], BundleExecutionPolicy::committed()) .await, Err(WorkerError::ChainError(chain_error)) if matches!(*chain_error, ChainError::IncorrectMessageOrder { .. }) diff --git a/linera-core/src/worker.rs b/linera-core/src/worker.rs index 3ce465f74729..f5caccb7f33d 100644 --- a/linera-core/src/worker.rs +++ b/linera-core/src/worker.rs @@ -895,32 +895,12 @@ where block: ProposedBlock, round: Option, published_blobs: Vec, - ) -> Result<(Block, ChainInfoResponse, ResourceTracker), WorkerError> { - let chain_id = block.chain_id; - self.chain_write(chain_id, |mut guard| async move { - guard - .stage_block_execution(block, round, &published_blobs) - .await - }) - .await - } - - /// Tries to execute a block proposal with a policy for handling bundle failures. - /// - /// Returns the modified block (bundles may be rejected/removed), the executed block, - /// chain info response, and resource tracker. - #[instrument(level = "trace", skip(self, block))] - pub async fn stage_block_execution_with_policy( - &self, - block: ProposedBlock, - round: Option, - published_blobs: Vec, policy: BundleExecutionPolicy, ) -> Result<(ProposedBlock, Block, ChainInfoResponse, ResourceTracker), WorkerError> { let chain_id = block.chain_id; self.chain_write(chain_id, |mut guard| async move { guard - .stage_block_execution_with_policy(block, round, &published_blobs, policy) + .stage_block_execution(block, round, &published_blobs, policy) .await }) .await diff --git a/linera-sdk/src/test/block.rs b/linera-sdk/src/test/block.rs index 9de42edfd76b..2d481813a2c3 100644 --- a/linera-sdk/src/test/block.rs +++ b/linera-sdk/src/test/block.rs @@ -247,7 +247,7 @@ impl BlockBuilder { let (_, block, _, resource_tracker) = self .validator .worker() - .stage_block_execution_with_policy( + .stage_block_execution( self.block, None, published_blobs, From 2bc1840f7cbe1ed6bf59ed196908de87e5008400 Mon Sep 17 00:00:00 2001 From: Andreas Fackler Date: Mon, 30 Mar 2026 12:24:19 +0000 Subject: [PATCH 4/7] Remove unused `get_next_expected_event` accidentally ported from main Co-Authored-By: Claude Opus 4.6 (1M context) --- linera-core/src/chain_worker/state.rs | 8 -------- linera-core/src/worker.rs | 12 ------------ 2 files changed, 20 deletions(-) diff --git a/linera-core/src/chain_worker/state.rs b/linera-core/src/chain_worker/state.rs index 296b34198e75..15e41223d22a 100644 --- a/linera-core/src/chain_worker/state.rs +++ b/linera-core/src/chain_worker/state.rs @@ -1127,14 +1127,6 @@ where .await?) } - /// Gets the next expected event index for a stream. - pub(crate) async fn get_next_expected_event( - &self, - stream_id: StreamId, - ) -> Result, WorkerError> { - Ok(self.chain.next_expected_events.get(&stream_id).await?) - } - /// Gets received certificate trackers. pub(crate) async fn get_received_certificate_trackers( &self, diff --git a/linera-core/src/worker.rs b/linera-core/src/worker.rs index fb689a85f40d..a363f610f2e9 100644 --- a/linera-core/src/worker.rs +++ b/linera-core/src/worker.rs @@ -1424,18 +1424,6 @@ where .await } - /// Gets the next expected event index for a stream. - pub async fn get_next_expected_event( - &self, - chain_id: ChainId, - stream_id: StreamId, - ) -> Result, WorkerError> { - self.chain_read(chain_id, |guard| async move { - guard.get_next_expected_event(stream_id).await - }) - .await - } - /// Gets received certificate trackers. pub async fn get_received_certificate_trackers( &self, From c747010cb68ed39fa215307ec84b8c132221107c Mon Sep 17 00:00:00 2001 From: Andreas Fackler Date: Thu, 2 Apr 2026 15:50:49 +0000 Subject: [PATCH 5/7] cargo fmt --- linera-core/src/chain_worker/state.rs | 1 - linera-core/src/worker.rs | 5 +---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/linera-core/src/chain_worker/state.rs b/linera-core/src/chain_worker/state.rs index 4d871c511bb1..94bac98b8b9a 100644 --- a/linera-core/src/chain_worker/state.rs +++ b/linera-core/src/chain_worker/state.rs @@ -1527,7 +1527,6 @@ where .collect()) } - /// Gets the stream event count for a stream, including preprocessed blocks. pub(crate) async fn get_stream_event_count( &self, diff --git a/linera-core/src/worker.rs b/linera-core/src/worker.rs index b1b1af1a451c..d96a94435f64 100644 --- a/linera-core/src/worker.rs +++ b/linera-core/src/worker.rs @@ -592,7 +592,6 @@ where self.chain_worker_config.cross_chain_message_chunk_limit = limit; } - #[instrument(level = "trace", skip(self))] pub fn nickname(&self) -> &str { &self.chain_worker_config.nickname @@ -1372,9 +1371,7 @@ where missing_height, } => { self.chain_write(sender, |mut guard| async move { - guard - .handle_revert_confirm(recipient, missing_height) - .await + guard.handle_revert_confirm(recipient, missing_height).await }) .await } From 953c127c3daa9fac5463dbfd959341f6b1da69ce Mon Sep 17 00:00:00 2001 From: Andreas Fackler Date: Thu, 2 Apr 2026 16:01:38 +0000 Subject: [PATCH 6/7] Reorder methods in ChainWorkerState to match testnet_conway Minimizes the diff by keeping methods in the same order as in the testnet_conway branch. Co-Authored-By: Claude Opus 4.6 (1M context) --- linera-core/src/chain_worker/state.rs | 90 +++++++++++++-------------- 1 file changed, 45 insertions(+), 45 deletions(-) diff --git a/linera-core/src/chain_worker/state.rs b/linera-core/src/chain_worker/state.rs index 94bac98b8b9a..f26de7159a6d 100644 --- a/linera-core/src/chain_worker/state.rs +++ b/linera-core/src/chain_worker/state.rs @@ -1436,51 +1436,6 @@ where Ok(blobs) } - /// Gets event subscriptions. - pub(crate) async fn get_event_subscriptions( - &self, - ) -> Result { - Ok(self - .chain - .execution_state - .system - .event_subscriptions - .index_values() - .await?) - } - - /// Gets received certificate trackers. - pub(crate) async fn get_received_certificate_trackers( - &self, - ) -> Result, WorkerError> { - Ok(self.chain.received_certificate_trackers.get().clone()) - } - - /// Gets tip state and outbox info for next_outbox_heights calculation. - pub(crate) async fn get_tip_state_and_outbox_info( - &self, - receiver_id: ChainId, - ) -> Result<(BlockHeight, Option), WorkerError> { - let next_block_height = self.chain.tip_state.get().next_block_height; - let next_height_to_schedule = self - .chain - .outboxes - .try_load_entry(&receiver_id) - .await? - .map(|outbox| *outbox.next_height_to_schedule.get()); - Ok((next_block_height, next_height_to_schedule)) - } - - /// Gets the next height to preprocess. - pub(crate) async fn get_next_height_to_preprocess(&self) -> Result { - Ok(self.chain.next_height_to_preprocess().await?) - } - - /// Gets the chain manager's seed for leader election. - pub(crate) async fn get_manager_seed(&self) -> Result { - Ok(*self.chain.manager.seed.get()) - } - /// Gets the previous event blocks for specific streams. pub(crate) async fn get_previous_event_blocks( &self, @@ -1527,6 +1482,19 @@ where .collect()) } + /// Gets event subscriptions. + pub(crate) async fn get_event_subscriptions( + &self, + ) -> Result { + Ok(self + .chain + .execution_state + .system + .event_subscriptions + .index_values() + .await?) + } + /// Gets the stream event count for a stream, including preprocessed blocks. pub(crate) async fn get_stream_event_count( &self, @@ -1548,6 +1516,38 @@ where .await?) } + /// Gets received certificate trackers. + pub(crate) async fn get_received_certificate_trackers( + &self, + ) -> Result, WorkerError> { + Ok(self.chain.received_certificate_trackers.get().clone()) + } + + /// Gets tip state and outbox info for next_outbox_heights calculation. + pub(crate) async fn get_tip_state_and_outbox_info( + &self, + receiver_id: ChainId, + ) -> Result<(BlockHeight, Option), WorkerError> { + let next_block_height = self.chain.tip_state.get().next_block_height; + let next_height_to_schedule = self + .chain + .outboxes + .try_load_entry(&receiver_id) + .await? + .map(|outbox| *outbox.next_height_to_schedule.get()); + Ok((next_block_height, next_height_to_schedule)) + } + + /// Gets the next height to preprocess. + pub(crate) async fn get_next_height_to_preprocess(&self) -> Result { + Ok(self.chain.next_height_to_preprocess().await?) + } + + /// Gets the chain manager's seed for leader election. + pub(crate) async fn get_manager_seed(&self) -> Result { + Ok(*self.chain.manager.seed.get()) + } + /// Attempts to vote for a leader timeout, if possible. #[instrument(skip_all, fields( chain_id = %self.chain_id(), From afdfe8c49c9e99b324fc8b4e0ab494d0982fb077 Mon Sep 17 00:00:00 2001 From: Andreas Fackler Date: Thu, 2 Apr 2026 16:14:51 +0000 Subject: [PATCH 7/7] Skip rollback and evict poisoned chain workers When a journal resolution failure poisons the chain worker, the view's in-memory state is inconsistent. Rolling back would give a false sense of consistency, so the RollbackGuard now skips rollback for poisoned workers. Both chain_read and chain_write evict poisoned workers from the cache so the next request reloads from storage. Co-Authored-By: Claude Opus 4.6 (1M context) --- linera-core/src/chain_worker/handle.rs | 5 +++++ linera-core/src/worker.rs | 10 +++++++--- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/linera-core/src/chain_worker/handle.rs b/linera-core/src/chain_worker/handle.rs index 41e33f2d674d..9d86f4a08914 100644 --- a/linera-core/src/chain_worker/handle.rs +++ b/linera-core/src/chain_worker/handle.rs @@ -82,6 +82,11 @@ impl DerefMut for RollbackGuard { impl Drop for RollbackGuard { fn drop(&mut self) { + if self.0.is_poisoned() { + // The view is in an inconsistent state due to a journal resolution failure. + // Don't rollback — the worker will be evicted and reloaded. + return; + } self.0.rollback(); } } diff --git a/linera-core/src/worker.rs b/linera-core/src/worker.rs index d96a94435f64..6b5407befaa3 100644 --- a/linera-core/src/worker.rs +++ b/linera-core/src/worker.rs @@ -758,6 +758,11 @@ where let state = self.get_or_create_chain_worker(chain_id).await?; Box::pin(wrap_future(async move { let guard = handle::read_lock(&state).await; + if guard.is_poisoned() { + self.chain_workers.pin().remove(&chain_id); + drop(guard); + return Err(WorkerError::WorkerPoisoned { chain_id }); + } f(guard).await })) .await @@ -777,9 +782,8 @@ where Box::pin(wrap_future(async move { let guard = handle::write_lock(&state).await; if guard.is_poisoned() { - // Drop the guard so the lock is released, then evict the worker. - drop(guard); self.chain_workers.pin().remove(&chain_id); + drop(guard); return Err(WorkerError::WorkerPoisoned { chain_id }); } let result = f(guard).await; @@ -788,8 +792,8 @@ where // so the next request reloads from storage. let guard = state.read().await; if guard.is_poisoned() { - drop(guard); self.chain_workers.pin().remove(&chain_id); + drop(guard); } } result