diff --git a/Cargo.lock b/Cargo.lock index 2a0cf7a6e7db..40ce414d85fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5608,6 +5608,7 @@ dependencies = [ "serde_json", "sha3", "social", + "sync_wrapper 1.0.2", "test-case", "test-log", "test-strategy", diff --git a/examples/Cargo.lock b/examples/Cargo.lock index 730b89f6b002..31c04db48514 100644 --- a/examples/Cargo.lock +++ b/examples/Cargo.lock @@ -4026,6 +4026,7 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", + "sync_wrapper 1.0.2", "test-log", "test-strategy", "thiserror 1.0.65", diff --git a/linera-client/src/client_context.rs b/linera-client/src/client_context.rs index f051e8867956..805be52aa0f8 100644 --- a/linera-client/src/client_context.rs +++ b/linera-client/src/client_context.rs @@ -320,8 +320,8 @@ where options.long_lived_services, chain_modes, name, - options.chain_worker_ttl, - options.sender_chain_worker_ttl, + crate::util::non_zero_duration(options.chain_worker_ttl), + crate::util::non_zero_duration(options.sender_chain_worker_ttl), options.prioritize_bundles_from.clone().unwrap_or_default(), options.ignore_bundles_from.clone().unwrap_or_default(), options.to_chain_client_options(), diff --git a/linera-client/src/unit_tests/chain_listener.rs b/linera-client/src/unit_tests/chain_listener.rs index 3c006fffed91..e1753ff1a024 100644 --- a/linera-client/src/unit_tests/chain_listener.rs +++ b/linera-client/src/unit_tests/chain_listener.rs @@ -120,8 +120,8 @@ async fn test_chain_listener() -> anyhow::Result<()> { false, [(chain_id0, ListeningMode::FullChain)], format!("Client node for {:.8}", chain_id0), - Duration::from_secs(30), - Duration::from_secs(1), + Some(Duration::from_secs(30)), + Some(Duration::from_secs(1)), HashSet::new(), HashSet::new(), chain_client::Options::test_default(), @@ -235,8 +235,8 @@ async fn test_chain_listener_follow_only() -> anyhow::Result<()> { (chain_b_id, ListeningMode::FullChain), ], "Client node with follow-only and owned chains".to_string(), - Duration::from_secs(30), - Duration::from_secs(1), + Some(Duration::from_secs(30)), + Some(Duration::from_secs(1)), HashSet::new(), HashSet::new(), chain_client::Options::test_default(), @@ -386,8 +386,8 @@ async fn test_chain_listener_admin_chain() -> anyhow::Result<()> { false, std::iter::empty::<(ChainId, ListeningMode)>(), "Client node with no chains".to_string(), - Duration::from_secs(30), - Duration::from_secs(1), + Some(Duration::from_secs(30)), + Some(Duration::from_secs(1)), HashSet::new(), HashSet::new(), chain_client::Options::test_default(), @@ -465,8 +465,8 @@ async fn test_chain_listener_listen_command_adds_chains_to_wallet() -> anyhow::R false, std::iter::empty::<(ChainId, ListeningMode)>(), "Client node with no chains".to_string(), - Duration::from_secs(30), - Duration::from_secs(1), + Some(Duration::from_secs(30)), + Some(Duration::from_secs(1)), HashSet::new(), HashSet::new(), chain_client::Options::test_default(), @@ -584,8 +584,8 @@ async fn test_listener_uses_autosigner_for_incoming_messages() -> anyhow::Result false, [(chain_id0, ListeningMode::FullChain)], format!("Client node for {:.8}", chain_id0), - Duration::from_secs(30), - Duration::from_secs(1), + Some(Duration::from_secs(30)), + Some(Duration::from_secs(1)), HashSet::new(), HashSet::new(), chain_client::Options::test_default(), @@ -787,8 +787,8 @@ async fn test_chain_listener_sparse_event_download() -> anyhow::Result<()> { false, [(receiver_id, ListeningMode::FullChain)], format!("Client node for {:.8}", receiver_id), - Duration::from_secs(30), - Duration::from_secs(1), + Some(Duration::from_secs(30)), + Some(Duration::from_secs(1)), HashSet::new(), HashSet::new(), chain_client::Options::test_default(), diff --git a/linera-client/src/util.rs b/linera-client/src/util.rs index 806c6a537877..dd7f581c9103 100644 --- a/linera-client/src/util.rs +++ b/linera-client/src/util.rs @@ -13,6 +13,11 @@ use linera_base::{ use linera_core::{data_types::RoundTimeout, node::NotificationStream, worker::Reason}; use tokio_stream::StreamExt as _; +/// Treats a zero `Duration` as `None` (disabled). +pub fn non_zero_duration(d: Duration) -> Option { + (d > Duration::ZERO).then_some(d) +} + pub fn parse_json(s: &str) -> anyhow::Result { Ok(serde_json::from_str(s.trim())?) } diff --git a/linera-core/Cargo.toml b/linera-core/Cargo.toml index 003968ea2aae..06f825bc0ae9 100644 --- a/linera-core/Cargo.toml +++ b/linera-core/Cargo.toml @@ -76,6 +76,7 @@ proptest = { workspace = true, optional = true } rand = { workspace = true, features = ["std_rng"] } serde.workspace = true serde_json.workspace = true +sync_wrapper.workspace = true test-log = { workspace = true, optional = true } test-strategy = { workspace = true, optional = true } thiserror.workspace = true diff --git a/linera-core/src/chain_worker/config.rs b/linera-core/src/chain_worker/config.rs index 97ee6c353693..babc13917107 100644 --- a/linera-core/src/chain_worker/config.rs +++ b/linera-core/src/chain_worker/config.rs @@ -9,9 +9,12 @@ use linera_base::{crypto::ValidatorSecretKey, identifiers::ChainId, time::Durati use crate::CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES; -/// Configuration parameters for the [`ChainWorkerState`][`super::state::ChainWorkerState`]. +/// Configuration parameters for the chain worker and its owning +/// [`WorkerState`][`crate::worker::WorkerState`]. #[derive(Clone)] pub struct ChainWorkerConfig { + /// A name used for logging. + pub nickname: String, /// The signature key pair of the validator. The key may be missing for replicas /// without voting rights (possibly with a partial view of chains). pub key_pair: Option>, @@ -24,13 +27,17 @@ pub struct ChainWorkerConfig { /// Blocks with a timestamp this far in the future will still be accepted, but the validator /// will wait until that timestamp before voting. pub block_time_grace_period: Duration, - /// Idle chain workers free their memory after that duration without requests. - pub ttl: Duration, - /// TTL for sender chains. - // We don't want them to keep in memory forever since usually they're short-lived. - pub sender_chain_ttl: Duration, + /// Idle chain workers free their memory after this duration without requests. + /// `None` means no expiry (handle lives forever). + pub ttl: Option, + /// TTL for sender chains. `None` means no expiry. + pub sender_chain_ttl: Option, /// The size to truncate receive log entries in chain info responses. pub chain_info_max_received_log_entries: usize, + /// Maximum number of entries in the block cache. + pub block_cache_size: usize, + /// Maximum number of entries in the execution state cache. + pub execution_state_cache_size: usize, /// Chain IDs whose incoming bundles should be processed first. pub priority_bundle_origins: HashSet, /// Chain IDs whose incoming bundles should be ignored. @@ -63,14 +70,17 @@ impl ChainWorkerConfig { impl Default for ChainWorkerConfig { fn default() -> Self { Self { + nickname: String::new(), key_pair: None, allow_inactive_chains: false, allow_messages_from_deprecated_epochs: false, long_lived_services: false, block_time_grace_period: Default::default(), - ttl: Default::default(), - sender_chain_ttl: Default::default(), + ttl: None, + sender_chain_ttl: None, chain_info_max_received_log_entries: CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES, + block_cache_size: crate::worker::DEFAULT_BLOCK_CACHE_SIZE, + execution_state_cache_size: crate::worker::DEFAULT_EXECUTION_STATE_CACHE_SIZE, priority_bundle_origins: HashSet::new(), ignored_bundle_origins: HashSet::new(), cross_chain_message_chunk_limit: usize::MAX, diff --git a/linera-core/src/chain_worker/handle.rs b/linera-core/src/chain_worker/handle.rs new file mode 100644 index 000000000000..9d86f4a08914 --- /dev/null +++ b/linera-core/src/chain_worker/handle.rs @@ -0,0 +1,239 @@ +// Copyright (c) Zefchain Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +//! Helpers for managing chain worker state behind an `Arc>`. +//! +//! Tokio's [`RwLock`] is write-preferring: once a writer is waiting, new readers +//! queue behind it. This prevents read-only queries from starving write operations +//! (block proposals, certificate handling, etc.). + +use std::{ + ops::{Deref, DerefMut}, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, +}; + +/// An atomically-updatable timestamp backed by microseconds since the Unix epoch. +/// +/// This wraps the raw `AtomicU64` microsecond encoding so that call sites work +/// exclusively with [`Duration`] and never see the underlying representation. +pub(crate) struct AtomicTimestamp(AtomicU64); + +impl AtomicTimestamp { + /// Creates a new `AtomicTimestamp` set to the current time. + pub(crate) fn now() -> Self { + Self(AtomicU64::new(Self::current_micros())) + } + + /// Updates the stored timestamp to the current time. + pub(crate) fn store_now(&self) { + self.0.store(Self::current_micros(), Ordering::Relaxed); + } + + /// Returns how long has passed since the stored timestamp. + pub(crate) fn elapsed(&self) -> Duration { + let last = self.0.load(Ordering::Relaxed); + let now = Self::current_micros(); + Duration::from_micros(now.saturating_sub(last)) + } + + fn current_micros() -> u64 { + linera_base::time::SystemTime::now() + .duration_since(linera_base::time::UNIX_EPOCH) + .map(|d| d.as_micros() as u64) + .unwrap_or(0) + } +} + +use linera_base::{ + data_types::{BlockHeight, Timestamp}, + identifiers::ChainId, + time::Duration, +}; +use linera_execution::{QueryContext, ServiceRuntimeEndpoint, ServiceSyncRuntime}; +use linera_storage::Storage; +use tokio::sync::{OwnedRwLockReadGuard, RwLock}; + +use super::{config::ChainWorkerConfig, state::ChainWorkerState}; + +/// A write guard that automatically rolls back uncommitted chain state changes on drop. +/// +/// This ensures cancellation safety: if a write operation's future is dropped before +/// completion, any uncommitted state changes are rolled back rather than leaked. +pub(crate) struct RollbackGuard( + tokio::sync::OwnedRwLockWriteGuard>, +); + +impl Deref for RollbackGuard { + type Target = ChainWorkerState; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for RollbackGuard { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl Drop for RollbackGuard { + fn drop(&mut self) { + if self.0.is_poisoned() { + // The view is in an inconsistent state due to a journal resolution failure. + // Don't rollback — the worker will be evicted and reloaded. + return; + } + self.0.rollback(); + } +} + +/// The endpoint and background task for a long-lived service runtime. +pub(crate) struct ServiceRuntimeActor { + pub(crate) task: web_thread_pool::Task<()>, + pub(crate) endpoint: ServiceRuntimeEndpoint, +} + +impl ServiceRuntimeActor { + /// Spawns a blocking task to execute the service runtime actor. + pub(crate) async fn spawn( + chain_id: ChainId, + thread_pool: &linera_execution::ThreadPool, + ) -> Self { + let (execution_state_sender, incoming_execution_requests) = + futures::channel::mpsc::unbounded(); + let (runtime_request_sender, runtime_request_receiver) = std::sync::mpsc::channel(); + + Self { + endpoint: ServiceRuntimeEndpoint { + incoming_execution_requests, + runtime_request_sender, + }, + task: thread_pool + .run((), move |()| async move { + // The dummy context is overwritten by `prepare_for_query` + // before the first actual query is executed. + ServiceSyncRuntime::new( + execution_state_sender, + QueryContext { + chain_id, + next_block_height: BlockHeight(0), + local_time: Timestamp::from(0), + }, + ) + .run(runtime_request_receiver) + }) + .await, + } + } +} + +/// Wraps a [`ChainWorkerState`] in an `Arc>` and spawns a keep-alive task +/// if a TTL is configured. +pub(crate) fn create_chain_worker( + state: ChainWorkerState, + is_tracked: bool, + config: &ChainWorkerConfig, +) -> Arc>> { + let last_access = state.last_access_arc(); + let chain_id = state.chain().chain_id(); + let arc = Arc::new(RwLock::new(state)); + let ttl = if is_tracked { + config.sender_chain_ttl + } else { + config.ttl + }; + if let Some(ttl) = ttl { + spawn_keep_alive(chain_id, Arc::clone(&arc), last_access, ttl); + } + arc +} + +/// Acquires a read lock, updating the last-access timestamp. +pub(crate) async fn read_lock( + state: &Arc>>, +) -> OwnedRwLockReadGuard> { + let guard = state.clone().read_owned().await; + guard.touch(); + guard +} + +/// Acquires a read lock, initializing the chain if needed. +/// +/// First acquires a read lock and checks if the chain is already known to be active. +/// If not, drops the read lock, acquires a write lock to initialize the chain, +/// then re-acquires the read lock. +pub(crate) async fn read_lock_initialized( + state: &Arc>>, +) -> Result>, crate::worker::WorkerError> { + { + let guard = read_lock(state).await; + if guard.knows_chain_is_active() { + return Ok(guard); + } + } + { + let mut guard = write_lock(state).await; + guard.initialize_and_save_if_needed().await?; + } + Ok(read_lock(state).await) +} + +/// Acquires a write lock, updating the last-access timestamp. +/// +/// Returns a [`RollbackGuard`] that automatically rolls back uncommitted changes +/// when dropped, ensuring cancellation safety. +pub(crate) async fn write_lock( + state: &Arc>>, +) -> RollbackGuard { + let guard = RollbackGuard(state.clone().write_owned().await); + guard.touch(); + guard +} + +/// Spawns a background task that keeps the chain state alive for at least `ttl` +/// after the last access. When the state has been idle for the full TTL, the task +/// drops the state if it holds the only strong reference. +fn spawn_keep_alive( + chain_id: ChainId, + mut state: Arc>>, + last_access: Arc, + ttl: Duration, +) { + linera_base::Task::spawn(async move { + loop { + while let Some(remaining) = ttl + .checked_sub(last_access.elapsed()) + .filter(|remaining| *remaining > Duration::ZERO) + { + // Touched recently — sleep for the remaining time. + linera_base::time::timer::sleep(remaining).await; + } + // Idle long enough. Drop our strong reference if it's the only one. + match Arc::try_unwrap(state) { + Ok(rw_lock) => { + tracing::debug!(%chain_id, "Dropping chain worker"); + // We have sole ownership — extract the state and + // shut down the service runtime gracefully. + let mut worker_state = RwLock::into_inner(rw_lock); + let task = worker_state.clear_service_runtime(); + drop(worker_state); + if let Some(task) = task { + if let Err(err) = task.await { + tracing::warn!(%err, "Failed to shut down service runtime"); + } + } + break; + } + Err(arc) => { + arc.read().await.touch(); + state = arc; + } + } + } + }) + .forget(); +} diff --git a/linera-core/src/chain_worker/mod.rs b/linera-core/src/chain_worker/mod.rs index 5678c2c8a41f..7e0b9bf9fc2b 100644 --- a/linera-core/src/chain_worker/mod.rs +++ b/linera-core/src/chain_worker/mod.rs @@ -3,19 +3,13 @@ //! A worker to handle a single chain. -mod actor; mod config; mod delivery_notifier; -mod state; +pub(crate) mod handle; +pub(crate) mod state; +pub use self::config::ChainWorkerConfig; pub(super) use self::delivery_notifier::DeliveryNotifier; #[cfg(test)] pub(crate) use self::state::CrossChainUpdateHelper; -pub(crate) use self::{ - actor::{ - ChainWorkerActor, ChainWorkerRequest, ChainWorkerRequestReceiver, ChainWorkerRequestSender, - EventSubscriptionsResult, - }, - config::ChainWorkerConfig, - state::{BlockOutcome, CrossChainUpdateResult}, -}; +pub(crate) use self::state::{BlockOutcome, CrossChainUpdateResult, EventSubscriptionsResult}; diff --git a/linera-core/src/chain_worker/state.rs b/linera-core/src/chain_worker/state.rs index b4c673cf4fb0..b091e117c52a 100644 --- a/linera-core/src/chain_worker/state.rs +++ b/linera-core/src/chain_worker/state.rs @@ -35,24 +35,27 @@ use linera_chain::{ ChainError, ChainExecutionContext, ChainStateView, ExecutionResultExt as _, }; use linera_execution::{ - Committee, ExecutionRuntimeContext as _, ExecutionStateView, Query, QueryContext, QueryOutcome, - ResourceTracker, ServiceRuntimeEndpoint, + system::EventSubscriptions, Committee, ExecutionRuntimeContext as _, ExecutionStateView, Query, + QueryContext, QueryOutcome, ResourceTracker, ServiceRuntimeEndpoint, }; use linera_storage::{Clock as _, Storage}; use linera_views::{ context::{Context, InactiveContext}, - views::{ClonableView, ReplaceContext as _, RootView as _, View as _}, + views::{ReplaceContext as _, RootView as _, View as _}, }; -use tokio::sync::{oneshot, OwnedRwLockReadGuard, RwLock, RwLockWriteGuard}; +use tokio::sync::oneshot; use tracing::{debug, instrument, trace, warn}; -use super::{ChainWorkerConfig, ChainWorkerRequest, DeliveryNotifier, EventSubscriptionsResult}; use crate::{ + chain_worker::{handle::AtomicTimestamp, ChainWorkerConfig, DeliveryNotifier}, client::ListeningMode, data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse, CrossChainRequest}, worker::{NetworkActions, Notification, Reason, WorkerError}, }; +/// Type alias for event subscriptions result. +pub(crate) type EventSubscriptionsResult = Vec<((ChainId, StreamId), EventSubscriptions)>; + #[cfg(with_metrics)] mod metrics { use std::sync::LazyLock; @@ -82,15 +85,24 @@ mod metrics { } /// The state of the chain worker. -pub struct ChainWorkerState +pub(crate) struct ChainWorkerState where - StorageClient: Storage + Clone + 'static, + StorageClient: Storage, { config: ChainWorkerConfig, storage: StorageClient, chain: ChainStateView, - shared_chain_view: Option>>>, service_runtime_endpoint: Option, + /// The background task running the service runtime. Must be kept alive for the + /// lifetime of the worker: the pool `Guard` wrapper returns the thread-pool slot + /// when dropped, so dropping this early lets the pool schedule unrelated work on a + /// thread that is still running the service runtime. + service_runtime_task: Option>, + /// Timestamp of the last access. + /// Used by the keep-alive task to determine when the worker has been idle. + /// Wrapped in `Arc` so the keep-alive task can read it without acquiring + /// the `RwLock`. + last_access: Arc, block_values: Arc>, execution_state_cache: Option>>>, @@ -123,9 +135,6 @@ pub enum BlockOutcome { Skipped, } -/// Returned when a chain worker needs to be unloaded. -pub struct PoisonedWorkerError; - impl ChainWorkerState where StorageClient: Storage + Clone + 'static, @@ -135,7 +144,7 @@ where chain_id = %chain_id ))] #[expect(clippy::too_many_arguments)] - pub async fn load( + pub(crate) async fn load( config: ChainWorkerConfig, storage: StorageClient, block_values: Arc>, @@ -146,6 +155,7 @@ where delivery_notifier: DeliveryNotifier, chain_id: ChainId, service_runtime_endpoint: Option, + service_runtime_task: Option>, ) -> Result { let chain = storage.load_chain(chain_id).await?; @@ -153,8 +163,9 @@ where config, storage, chain, - shared_chain_view: None, service_runtime_endpoint, + service_runtime_task, + last_access: Arc::new(AtomicTimestamp::now()), block_values, execution_state_cache, chain_modes, @@ -165,239 +176,50 @@ where } /// Returns the [`ChainId`] of the chain handled by this worker. - pub fn chain_id(&self) -> ChainId { + fn chain_id(&self) -> ChainId { self.chain.chain_id() } - /// Handles a request and applies it to the chain state. - /// Returns `Err(())` if the worker is poisoned and must be reloaded. - #[instrument(skip_all, fields(chain_id = %self.chain_id()))] - pub async fn handle_request( - &mut self, - request: ChainWorkerRequest, - ) -> Result<(), PoisonedWorkerError> { - tracing::trace!("Handling chain worker request: {request:?}"); - assert!( - !self.poisoned, - "handle_request should not be called on a poisoned chain worker" - ); - // TODO(#2237): Spawn concurrent tasks for read-only operations - let responded = match request { - #[cfg(with_testing)] - ChainWorkerRequest::ReadCertificate { height, callback } => { - callback.send(self.read_certificate(height).await).is_ok() - } - ChainWorkerRequest::GetChainStateView { callback } => { - callback.send(self.chain_state_view().await).is_ok() - } - ChainWorkerRequest::QueryApplication { - query, - block_hash, - callback, - } => callback - .send(self.query_application(query, block_hash).await) - .is_ok(), - ChainWorkerRequest::DescribeApplication { - application_id, - callback, - } => callback - .send(self.describe_application(application_id).await) - .is_ok(), - ChainWorkerRequest::StageBlockExecution { - block, - round, - published_blobs, - policy, - callback, - } => { - let result = self - .stage_block_execution(block, round, &published_blobs, policy) - .await; - callback.send(result).is_ok() - } - ChainWorkerRequest::ProcessTimeout { - certificate, - callback, - } => callback - .send(self.process_timeout(certificate).await) - .is_ok(), - ChainWorkerRequest::HandleBlockProposal { proposal, callback } => callback - .send(self.handle_block_proposal(proposal).await) - .is_ok(), - ChainWorkerRequest::ProcessValidatedBlock { - certificate, - callback, - } => callback - .send(self.process_validated_block(certificate).await) - .is_ok(), - ChainWorkerRequest::ProcessConfirmedBlock { - certificate, - notify_when_messages_are_delivered, - callback, - } => callback - .send( - self.process_confirmed_block(certificate, notify_when_messages_are_delivered) - .await, - ) - .is_ok(), - ChainWorkerRequest::ProcessCrossChainUpdate { - origin, - bundles, - previous_height, - callback, - } => callback - .send( - self.process_cross_chain_update(origin, bundles, previous_height) - .await, - ) - .is_ok(), - ChainWorkerRequest::ConfirmUpdatedRecipient { - recipient, - latest_height, - callback, - } => callback - .send( - self.confirm_updated_recipient(recipient, latest_height) - .await, - ) - .is_ok(), - ChainWorkerRequest::HandleRevertConfirm { - recipient, - retransmit_from, - callback, - } => callback - .send(self.handle_revert_confirm(recipient, retransmit_from).await) - .is_ok(), - ChainWorkerRequest::HandleChainInfoQuery { query, callback } => callback - .send(self.handle_chain_info_query(query).await) - .is_ok(), - ChainWorkerRequest::DownloadPendingBlob { blob_id, callback } => callback - .send(self.download_pending_blob(blob_id).await) - .is_ok(), - ChainWorkerRequest::HandlePendingBlob { blob, callback } => { - callback.send(self.handle_pending_blob(blob).await).is_ok() - } - ChainWorkerRequest::UpdateReceivedCertificateTrackers { - new_trackers, - callback, - } => callback - .send( - self.update_received_certificate_trackers(new_trackers) - .await, - ) - .is_ok(), - ChainWorkerRequest::GetPreprocessedBlockHashes { - start, - end, - callback, - } => callback - .send(self.get_preprocessed_block_hashes(start, end).await) - .is_ok(), - ChainWorkerRequest::GetInboxNextHeight { origin, callback } => callback - .send(self.get_inbox_next_height(origin).await) - .is_ok(), - ChainWorkerRequest::GetLockingBlobs { blob_ids, callback } => callback - .send(self.get_locking_blobs(blob_ids).await) - .is_ok(), - ChainWorkerRequest::GetBlockHashes { heights, callback } => { - callback.send(self.get_block_hashes(heights).await).is_ok() - } - ChainWorkerRequest::GetProposedBlobs { blob_ids, callback } => callback - .send(self.get_proposed_blobs(blob_ids).await) - .is_ok(), - ChainWorkerRequest::GetEventSubscriptions { callback } => { - callback.send(self.get_event_subscriptions().await).is_ok() - } - ChainWorkerRequest::GetStreamEventCount { - stream_id, - callback, - } => callback - .send(self.get_stream_event_count(stream_id).await) - .is_ok(), - ChainWorkerRequest::GetReceivedCertificateTrackers { callback } => callback - .send(self.get_received_certificate_trackers().await) - .is_ok(), - ChainWorkerRequest::GetTipStateAndOutboxInfo { - receiver_id, - callback, - } => callback - .send(self.get_tip_state_and_outbox_info(receiver_id).await) - .is_ok(), - ChainWorkerRequest::GetNextHeightToPreprocess { callback } => callback - .send(self.get_next_height_to_preprocess().await) - .is_ok(), - ChainWorkerRequest::GetManagerSeed { callback } => { - callback.send(self.get_manager_seed().await).is_ok() - } - ChainWorkerRequest::GetPreviousEventBlocks { - stream_ids, - callback, - } => callback - .send(self.get_previous_event_blocks(stream_ids).await) - .is_ok(), - ChainWorkerRequest::GetNextExpectedEvents { - stream_ids, - callback, - } => callback - .send(self.get_next_expected_events(stream_ids).await) - .is_ok(), - }; + /// Returns a reference to the chain state view. + pub(crate) fn chain(&self) -> &ChainStateView { + &self.chain + } - if !responded { - debug!("Callback for `ChainWorkerActor` was dropped before a response was sent"); - } + /// Returns whether this chain is known to be active (initialized). + pub(crate) fn knows_chain_is_active(&self) -> bool { + self.knows_chain_is_active + } - if self.poisoned { - // The view is in an inconsistent state due to a journal resolution failure. - // Don't rollback — the worker will be dropped and reloaded. - return Err(PoisonedWorkerError); - } - // Roll back any unsaved changes to the chain state: If there was an error while trying - // to handle the request, the chain state might contain unsaved and potentially invalid - // changes. The next request needs to be applied to the chain state as it is in storage. + /// Rolls back any uncommitted changes to the chain state. + pub(crate) fn rollback(&mut self) { self.chain.rollback(); - Ok(()) } - /// Returns a read-only view of the [`ChainStateView`]. - /// - /// The returned view holds a lock on the chain state, which prevents the worker from changing - /// it. - pub(super) async fn chain_state_view( - &mut self, - ) -> Result>, WorkerError> { - if self.shared_chain_view.is_none() { - self.shared_chain_view = Some(Arc::new(RwLock::new(self.chain.clone_unchecked()?))); - } + /// Returns whether this worker is poisoned (view is inconsistent). + pub(crate) fn is_poisoned(&self) -> bool { + self.poisoned + } - Ok(self - .shared_chain_view - .as_ref() - .expect("`shared_chain_view` should be initialized above") - .clone() - .read_owned() - .await) + /// Updates the last-access timestamp to the current time. + pub(crate) fn touch(&self) { + self.last_access.store_now(); } - /// Clears the shared chain view, and acquires and drops its write lock. - /// - /// This is the only place a write lock is acquired, and read locks are acquired in - /// the `chain_state_view` method, which has a `&mut self` receiver like this one. - /// That means that when this function returns, no readers will be waiting to acquire - /// the lock and it is safe to write the chain state to storage without any readers - /// having a stale view of it. - #[instrument(skip_all, fields( - chain_id = %self.chain_id() - ))] - pub(super) async fn clear_shared_chain_view(&mut self) { - if let Some(shared_chain_view) = self.shared_chain_view.take() { - let _: RwLockWriteGuard<_> = shared_chain_view.write().await; - } + /// Returns a clone of the last-access `Arc`, for use by the keep-alive task. + pub(crate) fn last_access_arc(&self) -> Arc { + Arc::clone(&self.last_access) + } + + /// Drops the service runtime endpoint, signaling the runtime task to stop. + /// Returns the runtime task so the caller can await it outside the lock. + pub(crate) fn clear_service_runtime(&mut self) -> Option> { + self.service_runtime_endpoint.take(); + self.service_runtime_task.take() } /// Handles a [`ChainInfoQuery`], potentially voting on the next block. #[tracing::instrument(level = "debug", skip(self))] - pub(super) async fn handle_chain_info_query( + pub(crate) async fn handle_chain_info_query( &mut self, query: ChainInfoQuery, ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> { @@ -423,7 +245,7 @@ where chain_id = %self.chain_id(), blob_id = %blob_id ))] - pub(super) async fn download_pending_blob(&self, blob_id: BlobId) -> Result { + pub(crate) async fn download_pending_blob(&self, blob_id: BlobId) -> Result { if let Some(blob) = self.chain.manager.pending_blob(&blob_id).await? { return Ok(blob); } @@ -688,7 +510,7 @@ where chain_id = %self.chain_id(), height = %height ))] - pub async fn all_messages_to_tracked_chains_delivered_up_to( + async fn all_messages_to_tracked_chains_delivered_up_to( &self, height: BlockHeight, ) -> Result { @@ -719,7 +541,7 @@ where chain_id = %self.chain_id(), height = %certificate.inner().height() ))] - pub(super) async fn process_timeout( + pub(crate) async fn process_timeout( &mut self, certificate: TimeoutCertificate, ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> { @@ -761,7 +583,7 @@ where chain_id = %self.chain_id(), block_height = %proposal.content.block.height ))] - pub(super) async fn load_proposal_blobs( + async fn load_proposal_blobs( &mut self, proposal: &BlockProposal, ) -> Result, WorkerError> { @@ -809,7 +631,7 @@ where chain_id = %self.chain_id(), block_height = %certificate.block().header.height ))] - pub(super) async fn process_validated_block( + pub(crate) async fn process_validated_block( &mut self, certificate: ValidatedBlockCertificate, ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> { @@ -914,7 +736,7 @@ where height = %certificate.block().header.height, block_hash = %certificate.hash(), ))] - pub(super) async fn process_confirmed_block( + pub(crate) async fn process_confirmed_block( &mut self, certificate: ConfirmedBlockCertificate, notify_when_messages_are_delivered: Option>, @@ -1012,17 +834,17 @@ where // Persist chain. self.save().await?; let mut actions = self.create_network_actions(None).await?; - trace!("Preprocessed confirmed block {height} on chain {chain_id:.8}"); if !event_streams.is_empty() { actions.notifications.push(Notification { chain_id, reason: Reason::NewEvents { height, - hash: certificate.hash(), + hash: block_hash, event_streams, }, }); } + trace!("Preprocessed confirmed block {height} on chain {chain_id:.8}"); self.register_delivery_notifier(height, &actions, notify_when_messages_are_delivered) .await; return Ok(( @@ -1215,7 +1037,7 @@ where /// Updates the chain's inboxes, receiving messages from a cross-chain update. #[instrument(level = "trace", skip(self, bundles))] - pub(super) async fn process_cross_chain_update( + pub(crate) async fn process_cross_chain_update( &mut self, origin: ChainId, bundles: Vec<(Epoch, MessageBundle)>, @@ -1326,7 +1148,7 @@ where recipient = %recipient, latest_height = %latest_height ))] - pub(super) async fn confirm_updated_recipient( + pub(crate) async fn confirm_updated_recipient( &mut self, recipient: ChainId, latest_height: BlockHeight, @@ -1362,7 +1184,7 @@ where %recipient, %retransmit_from, ))] - pub(super) async fn handle_revert_confirm( + pub(crate) async fn handle_revert_confirm( &mut self, recipient: ChainId, retransmit_from: BlockHeight, @@ -1528,7 +1350,7 @@ where chain_id = %self.chain_id(), num_trackers = %new_trackers.len() ))] - pub async fn update_received_certificate_trackers( + pub(crate) async fn update_received_certificate_trackers( &mut self, new_trackers: BTreeMap, ) -> Result<(), WorkerError> { @@ -1544,7 +1366,7 @@ where start = %start, end = %end ))] - async fn get_preprocessed_block_hashes( + pub(crate) async fn get_preprocessed_block_hashes( &self, start: BlockHeight, end: BlockHeight, @@ -1566,7 +1388,10 @@ where chain_id = %self.chain_id(), origin = %origin ))] - async fn get_inbox_next_height(&self, origin: ChainId) -> Result { + pub(crate) async fn get_inbox_next_height( + &self, + origin: ChainId, + ) -> Result { Ok(match self.chain.inboxes.try_load_entry(&origin).await? { Some(inbox) => inbox.next_block_height_to_receive()?, None => BlockHeight::ZERO, @@ -1579,7 +1404,7 @@ where chain_id = %self.chain_id(), num_blob_ids = %blob_ids.len() ))] - async fn get_locking_blobs( + pub(crate) async fn get_locking_blobs( &self, blob_ids: Vec, ) -> Result>, WorkerError> { @@ -1593,7 +1418,7 @@ where } /// Gets block hashes for specified heights. - async fn get_block_hashes( + pub(crate) async fn get_block_hashes( &self, heights: Vec, ) -> Result, WorkerError> { @@ -1601,7 +1426,10 @@ where } /// Gets proposed blobs from the manager for specified blob IDs. - async fn get_proposed_blobs(&self, blob_ids: Vec) -> Result, WorkerError> { + pub(crate) async fn get_proposed_blobs( + &self, + blob_ids: Vec, + ) -> Result, WorkerError> { let results = self .chain .manager @@ -1623,7 +1451,7 @@ where } /// Gets the previous event blocks for specific streams. - async fn get_previous_event_blocks( + pub(crate) async fn get_previous_event_blocks( &self, stream_ids: Vec, ) -> Result, WorkerError> { @@ -1652,7 +1480,7 @@ where } /// Gets the `next_expected_events` indices for the given streams. - async fn get_next_expected_events( + pub(crate) async fn get_next_expected_events( &self, stream_ids: Vec, ) -> Result, WorkerError> { @@ -1669,7 +1497,9 @@ where } /// Gets event subscriptions. - async fn get_event_subscriptions(&self) -> Result { + pub(crate) async fn get_event_subscriptions( + &self, + ) -> Result { Ok(self .chain .execution_state @@ -1680,7 +1510,7 @@ where } /// Gets the stream event count for a stream, including preprocessed blocks. - async fn get_stream_event_count( + pub(crate) async fn get_stream_event_count( &self, stream_id: StreamId, ) -> Result, WorkerError> { @@ -1701,14 +1531,14 @@ where } /// Gets received certificate trackers. - async fn get_received_certificate_trackers( + pub(crate) async fn get_received_certificate_trackers( &self, ) -> Result, WorkerError> { Ok(self.chain.received_certificate_trackers.get().clone()) } /// Gets tip state and outbox info for next_outbox_heights calculation. - async fn get_tip_state_and_outbox_info( + pub(crate) async fn get_tip_state_and_outbox_info( &self, receiver_id: ChainId, ) -> Result<(BlockHeight, Option), WorkerError> { @@ -1723,12 +1553,12 @@ where } /// Gets the next height to preprocess. - async fn get_next_height_to_preprocess(&self) -> Result { + pub(crate) async fn get_next_height_to_preprocess(&self) -> Result { Ok(self.chain.next_height_to_preprocess().await?) } /// Gets the chain manager's seed for leader election. - async fn get_manager_seed(&self) -> Result { + pub(crate) async fn get_manager_seed(&self) -> Result { Ok(*self.chain.manager.seed.get()) } @@ -1738,7 +1568,7 @@ where height = %height, round = %round ))] - pub(super) async fn vote_for_leader_timeout( + async fn vote_for_leader_timeout( &mut self, height: BlockHeight, round: Round, @@ -1769,7 +1599,7 @@ where #[instrument(skip_all, fields( chain_id = %self.chain_id() ))] - pub(super) async fn vote_for_fallback(&mut self) -> Result<(), WorkerError> { + async fn vote_for_fallback(&mut self) -> Result<(), WorkerError> { Err(WorkerError::NoFallbackMode) } @@ -1777,7 +1607,7 @@ where chain_id = %self.chain_id(), blob_id = %blob.id() ))] - pub(super) async fn handle_pending_blob( + pub(crate) async fn handle_pending_blob( &mut self, blob: Blob, ) -> Result { @@ -1817,11 +1647,10 @@ where chain_id = %self.chain_id(), height = %height ))] - pub(super) async fn read_certificate( - &mut self, + pub(crate) async fn read_certificate( + &self, height: BlockHeight, ) -> Result, WorkerError> { - self.initialize_and_save_if_needed().await?; let certificate_hash = match self.chain.confirmed_log.get(height.try_into()?).await? { Some(hash) => hash, None => return Ok(None), @@ -1839,7 +1668,7 @@ where chain_id = %self.chain_id(), query_application_id = %query.application_id() ))] - pub(super) async fn query_application( + pub(crate) async fn query_application( &mut self, query: Query, block_hash: Option, @@ -1895,30 +1724,37 @@ where } } - /// Returns an application's description. + /// Returns an application's description by reading the blob directly from storage. + /// + /// Does not track blob usage (which requires `&mut self`), making it safe for + /// concurrent reads. Blob tracking is only relevant during block execution and is + /// always rolled back for read-only queries. #[instrument(skip_all, fields( chain_id = %self.chain_id(), application_id = %application_id ))] - pub(super) async fn describe_application( - &mut self, + pub(crate) async fn describe_application_readonly( + &self, application_id: ApplicationId, ) -> Result { - self.initialize_and_save_if_needed().await?; - let response = self.chain.describe_application(application_id).await?; - Ok(response) + let blob_id = application_id.description_blob_id(); + let blob = self + .storage + .read_blob(blob_id) + .await? + .ok_or(WorkerError::BlobsNotFound(vec![blob_id]))?; + Ok(bcs::from_bytes(blob.bytes())?) } - /// Executes a block without persisting any changes to the state, with a specified - /// policy for handling bundle failures. + /// Executes a block without persisting any changes to the state. /// /// The block may be modified to reflect the actual executed transactions - /// (bundles may be rejected or discarded based on the policy). + /// (bundles may be rejected or removed based on the policy). #[instrument(skip_all, fields( chain_id = %self.chain_id(), block_height = %block.height ))] - async fn stage_block_execution( + pub(crate) async fn stage_block_execution( &mut self, block: ProposedBlock, round: Option, @@ -1958,7 +1794,7 @@ where chain_id = %self.chain_id(), block_height = %proposal.content.block.height ))] - pub(super) async fn handle_block_proposal( + pub(crate) async fn handle_block_proposal( &mut self, proposal: BlockProposal, ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> { @@ -2058,9 +1894,9 @@ where block_time_grace_period: self.config.block_time_grace_period, }); } - // Note: The actor delays processing proposals with future timestamps (within the grace - // period) so that other requests can be handled in the meantime. By the time we reach - // here, the block timestamp should be in the past or very close to the current time. + // Note: WorkerState::handle_block_proposal delays processing proposals with future + // timestamps (within the grace period) before acquiring the chain lock. By the time + // we reach here, the block timestamp should be in the past or very close to current time. self.chain .remove_bundles_from_inboxes(block.timestamp, true, block.incoming_bundles()) @@ -2120,7 +1956,7 @@ where #[instrument(skip_all, fields( chain_id = %self.chain_id() ))] - pub(super) async fn prepare_chain_info_response( + async fn prepare_chain_info_response( &mut self, query: ChainInfoQuery, ) -> Result { @@ -2270,7 +2106,7 @@ where #[instrument(skip_all, fields( chain_id = %self.chain_id() ))] - async fn initialize_and_save_if_needed(&mut self) -> Result<(), WorkerError> { + pub(crate) async fn initialize_and_save_if_needed(&mut self) -> Result<(), WorkerError> { if !self.knows_chain_is_active { let local_time = self.storage.clock().current_time(); self.chain.initialize_if_needed(local_time).await?; @@ -2280,19 +2116,17 @@ where Ok(()) } - fn chain_info_response(&self) -> ChainInfoResponse { + pub(crate) fn chain_info_response(&self) -> ChainInfoResponse { ChainInfoResponse::new(&self.chain, self.config.key_pair()) } /// Stores the chain state in persistent storage. /// - /// Waits until the [`ChainStateView`] is no longer shared before persisting the changes. /// If the save fails, the worker is marked as poisoned and must be reloaded. #[instrument(skip_all, fields( chain_id = %self.chain_id() ))] async fn save(&mut self) -> Result<(), WorkerError> { - self.clear_shared_chain_view().await; if let Err(e) = self.chain.save().await { if e.must_reload_view() { tracing::error!( @@ -2338,14 +2172,14 @@ fn check_block_epoch( /// Helper type for handling cross-chain updates. pub(crate) struct CrossChainUpdateHelper<'a> { - pub allow_messages_from_deprecated_epochs: bool, - pub current_epoch: Epoch, - pub committees: &'a BTreeMap, + pub(crate) allow_messages_from_deprecated_epochs: bool, + pub(crate) current_epoch: Epoch, + pub(crate) committees: &'a BTreeMap, } impl<'a> CrossChainUpdateHelper<'a> { /// Creates a new [`CrossChainUpdateHelper`]. - pub fn new(config: &ChainWorkerConfig, chain: &'a ChainStateView) -> Self + fn new(config: &ChainWorkerConfig, chain: &'a ChainStateView) -> Self where C: Context + Clone + 'static, { @@ -2365,7 +2199,7 @@ impl<'a> CrossChainUpdateHelper<'a> { /// * Basic invariants are checked for good measure. We still crucially trust /// the worker of the sending chain to have verified and executed the blocks /// correctly. - pub fn select_message_bundles( + pub(crate) fn select_message_bundles( &self, origin: &'a ChainId, recipient: ChainId, diff --git a/linera-core/src/client/chain_client/mod.rs b/linera-core/src/client/chain_client/mod.rs index 87606bea9483..2121af999a25 100644 --- a/linera-core/src/client/chain_client/mod.rs +++ b/linera-core/src/client/chain_client/mod.rs @@ -43,7 +43,7 @@ use linera_chain::{ Block, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate, Timeout, TimeoutCertificate, ValidatedBlock, }, - ChainError, ChainExecutionContext, ChainStateView, + ChainError, ChainExecutionContext, }; use linera_execution::{ committee::Committee, @@ -450,10 +450,7 @@ impl ChainClient { #[instrument(level = "trace")] pub async fn chain_state_view( &self, - ) -> Result< - tokio::sync::OwnedRwLockReadGuard>, - LocalNodeError, - > { + ) -> Result, LocalNodeError> { self.client.local_node.chain_state_view(self.chain_id).await } diff --git a/linera-core/src/client/mod.rs b/linera-core/src/client/mod.rs index 239e1e75470e..0d5497d07f61 100644 --- a/linera-core/src/client/mod.rs +++ b/linera-core/src/client/mod.rs @@ -51,7 +51,7 @@ use crate::{ remote_node::RemoteNode, updater::{communicate_with_quorum, CommunicateAction, ValidatorUpdater}, worker::{Notification, ProcessableCertificate, Reason, WorkerError, WorkerState}, - CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES, + ChainWorkerConfig, CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES, }; pub mod chain_client; @@ -264,8 +264,8 @@ impl Client { long_lived_services: bool, chain_modes: impl IntoIterator, name: impl Into, - chain_worker_ttl: Duration, - sender_chain_worker_ttl: Duration, + chain_worker_ttl: Option, + sender_chain_worker_ttl: Option, priority_bundle_origins: HashSet, ignored_bundle_origins: HashSet, options: chain_client::Options, @@ -274,20 +274,24 @@ impl Client { execution_state_cache_size: usize, ) -> Self { let chain_modes = Arc::new(RwLock::new(chain_modes.into_iter().collect())); - let state = WorkerState::new_for_client( - name.into(), - environment.storage().clone(), - chain_modes.clone(), + let config = ChainWorkerConfig { + nickname: name.into(), + long_lived_services, + allow_inactive_chains: true, + allow_messages_from_deprecated_epochs: true, + ttl: chain_worker_ttl, + sender_chain_ttl: sender_chain_worker_ttl, + priority_bundle_origins, block_cache_size, execution_state_cache_size, - ) - .with_long_lived_services(long_lived_services) - .with_allow_inactive_chains(true) - .with_allow_messages_from_deprecated_epochs(true) - .with_chain_worker_ttl(chain_worker_ttl) - .with_sender_chain_worker_ttl(sender_chain_worker_ttl) - .with_priority_bundle_origins(priority_bundle_origins) - .with_ignored_bundle_origins(ignored_bundle_origins); + ignored_bundle_origins, + ..ChainWorkerConfig::default() + }; + let state = WorkerState::new( + environment.storage().clone(), + config, + Some(chain_modes.clone()), + ); let local_node = LocalNodeClient::new(state); let requests_scheduler = RequestsScheduler::new(vec![], requests_scheduler_config); diff --git a/linera-core/src/lib.rs b/linera-core/src/lib.rs index 0992570fc6f7..0886a706ba6d 100644 --- a/linera-core/src/lib.rs +++ b/linera-core/src/lib.rs @@ -9,6 +9,7 @@ #![allow(async_fn_in_trait)] mod chain_worker; +pub use chain_worker::ChainWorkerConfig; pub mod client; pub use client::Client; pub mod data_types; diff --git a/linera-core/src/local_node.rs b/linera-core/src/local_node.rs index dc0f1ba184e6..bfc9a1e45696 100644 --- a/linera-core/src/local_node.rs +++ b/linera-core/src/local_node.rs @@ -16,13 +16,11 @@ use linera_base::{ use linera_chain::{ data_types::{BlockProposal, BundleExecutionPolicy, ProposedBlock}, types::{Block, GenericCertificate}, - ChainStateView, }; use linera_execution::{committee::Committee, BlobState, Query, QueryOutcome, ResourceTracker}; use linera_storage::Storage; use linera_views::ViewError; use thiserror::Error; -use tokio::sync::OwnedRwLockReadGuard; use tracing::{instrument, warn}; use crate::{ @@ -232,7 +230,7 @@ where pub async fn chain_state_view( &self, chain_id: ChainId, - ) -> Result>, LocalNodeError> { + ) -> Result, LocalNodeError> { Ok(self.node.state.chain_state_view(chain_id).await?) } diff --git a/linera-core/src/unit_tests/test_utils.rs b/linera-core/src/unit_tests/test_utils.rs index 030f79aa22ee..c3d9009dd40e 100644 --- a/linera-core/src/unit_tests/test_utils.rs +++ b/linera-core/src/unit_tests/test_utils.rs @@ -56,10 +56,8 @@ use crate::{ ValidatorNodeProvider, }, notifier::ChannelNotifier, - worker::{ - Notification, ProcessableCertificate, WorkerState, DEFAULT_BLOCK_CACHE_SIZE, - DEFAULT_EXECUTION_STATE_CACHE_SIZE, - }, + worker::{Notification, ProcessableCertificate, WorkerState}, + ChainWorkerConfig, }; #[derive(Debug, PartialEq, Clone, Copy)] @@ -891,15 +889,12 @@ where for (i, (validator_keypair, _account_public_key)) in validators.into_iter().enumerate() { let validator_public_key = validator_keypair.public_key; let storage = storage_builder.build().await?; - let state = WorkerState::new( - format!("Node {}", i), - Some(validator_keypair.secret_key), - storage.clone(), - DEFAULT_BLOCK_CACHE_SIZE, - DEFAULT_EXECUTION_STATE_CACHE_SIZE, - ) - .with_allow_inactive_chains(false) - .with_allow_messages_from_deprecated_epochs(false); + let config = ChainWorkerConfig { + nickname: format!("Node {}", i), + ..ChainWorkerConfig::default() + } + .with_key_pair(Some(validator_keypair.secret_key)); + let state = WorkerState::new(storage.clone(), config, None); let mut validator = LocalValidatorClient::new(validator_public_key, state); if i < with_faulty_validators { faulty_validators.insert(validator_public_key); @@ -1104,14 +1099,14 @@ where false, [(chain_id, ListeningMode::FullChain)], format!("Client node for {:.8}", chain_id), - Duration::from_secs(30), - Duration::from_secs(1), + Some(Duration::from_secs(30)), + Some(Duration::from_secs(1)), HashSet::new(), HashSet::new(), options, crate::client::RequestsSchedulerConfig::default(), - DEFAULT_BLOCK_CACHE_SIZE, - DEFAULT_EXECUTION_STATE_CACHE_SIZE, + crate::worker::DEFAULT_BLOCK_CACHE_SIZE, + crate::worker::DEFAULT_EXECUTION_STATE_CACHE_SIZE, )); Ok(client.create_chain_client(chain_id, block_hash, block_height, None, owner, None)) } diff --git a/linera-core/src/unit_tests/worker_tests.rs b/linera-core/src/unit_tests/worker_tests.rs index 061805703bda..25f14ac802db 100644 --- a/linera-core/src/unit_tests/worker_tests.rs +++ b/linera-core/src/unit_tests/worker_tests.rs @@ -73,6 +73,7 @@ use crate::{ Reason::{self, NewBlock, NewIncomingBundle}, WorkerError, WorkerState, }, + ChainWorkerConfig, }; /// The test worker accepts blocks with a timestamp this far in the future. @@ -143,17 +144,16 @@ where .await .expect("writing a network description should not fail"); - let worker = WorkerState::new( - "Single validator node".to_string(), - Some(validator_keypair.secret_key), - storage, - super::DEFAULT_BLOCK_CACHE_SIZE, - super::DEFAULT_EXECUTION_STATE_CACHE_SIZE, - ) - .with_allow_inactive_chains(is_client) - .with_allow_messages_from_deprecated_epochs(is_client) - .with_long_lived_services(has_long_lived_services) - .with_block_time_grace_period(Duration::from_micros(TEST_GRACE_PERIOD_MICROS)); + let config = ChainWorkerConfig { + nickname: "Single validator node".to_string(), + allow_inactive_chains: is_client, + allow_messages_from_deprecated_epochs: is_client, + long_lived_services: has_long_lived_services, + block_time_grace_period: Duration::from_micros(TEST_GRACE_PERIOD_MICROS), + ..ChainWorkerConfig::default() + } + .with_key_pair(Some(validator_keypair.secret_key)); + let worker = WorkerState::new(storage, config, None); Self { committee, worker, @@ -181,7 +181,7 @@ where } fn with_cross_chain_message_chunk_limit(mut self, limit: usize) -> Self { - self.worker = self.worker.with_cross_chain_message_chunk_limit(limit); + self.worker.set_cross_chain_message_chunk_limit(limit); self } @@ -4476,7 +4476,7 @@ where { let sender_key_pair = AccountSecretKey::generate(); let mut env = TestEnvironment::new(storage_builder.build().await?, true, false).await; - env.worker = env.worker.with_allow_revert_confirm(true); + env.worker.chain_worker_config.allow_revert_confirm = true; let chain_1_desc = env .add_root_chain(1, sender_key_pair.public().into(), Amount::from_tokens(100)) .await; @@ -4701,15 +4701,19 @@ where // Step 4: Verify that WITHOUT recovery, processing fails with IncorrectOutcome. { let worker_no_recovery = WorkerState::new( - "No-recovery worker".to_string(), - Some(env.worker().chain_worker_config.key_pair().unwrap().copy()), storage.clone(), - super::DEFAULT_BLOCK_CACHE_SIZE, - super::DEFAULT_EXECUTION_STATE_CACHE_SIZE, - ) - .with_allow_inactive_chains(true) - .with_allow_messages_from_deprecated_epochs(true) - .with_block_time_grace_period(Duration::from_micros(TEST_GRACE_PERIOD_MICROS)); + ChainWorkerConfig { + nickname: "No-recovery worker".to_string(), + allow_inactive_chains: true, + allow_messages_from_deprecated_epochs: true, + block_time_grace_period: Duration::from_micros(TEST_GRACE_PERIOD_MICROS), + ..ChainWorkerConfig::default() + } + .with_key_pair(Some( + env.worker().chain_worker_config.key_pair().unwrap().copy(), + )), + None, + ); assert_matches!( worker_no_recovery @@ -4721,16 +4725,20 @@ where // Step 5: Now create a worker WITH recovery enabled and process the same block. let worker_with_recovery = WorkerState::new( - "Recovery worker".to_string(), - Some(env.worker().chain_worker_config.key_pair().unwrap().copy()), storage.clone(), - super::DEFAULT_BLOCK_CACHE_SIZE, - super::DEFAULT_EXECUTION_STATE_CACHE_SIZE, - ) - .with_allow_inactive_chains(true) - .with_allow_messages_from_deprecated_epochs(true) - .with_reset_on_incorrect_outcome(Some(0)) - .with_block_time_grace_period(Duration::from_micros(TEST_GRACE_PERIOD_MICROS)); + ChainWorkerConfig { + nickname: "Recovery worker".to_string(), + allow_inactive_chains: true, + allow_messages_from_deprecated_epochs: true, + reset_on_incorrect_outcome: Some(Duration::from_secs(0)), + block_time_grace_period: Duration::from_micros(TEST_GRACE_PERIOD_MICROS), + ..ChainWorkerConfig::default() + } + .with_key_pair(Some( + env.worker().chain_worker_config.key_pair().unwrap().copy(), + )), + None, + ); worker_with_recovery .fully_handle_certificate_with_notifications(cert_1.clone(), &()) @@ -4897,7 +4905,7 @@ where let mut env = TestEnvironment::new(storage_builder.build().await?, true, false) .await .with_cross_chain_message_chunk_limit(1); - env.worker = env.worker.with_allow_revert_confirm(true); + env.worker.chain_worker_config.allow_revert_confirm = true; let chain_1_desc = env .add_root_chain(1, sender_key_pair.public().into(), Amount::from_tokens(100)) diff --git a/linera-core/src/worker.rs b/linera-core/src/worker.rs index 12a6535ce117..16bd3f3399ae 100644 --- a/linera-core/src/worker.rs +++ b/linera-core/src/worker.rs @@ -3,23 +3,24 @@ // SPDX-License-Identifier: Apache-2.0 use std::{ - collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque}, + collections::{BTreeMap, BTreeSet, HashMap, VecDeque}, sync::{Arc, Mutex, RwLock}, time::Duration, }; -use futures::future::Either; +use futures::{ + future::{Either, Shared}, + FutureExt as _, +}; use linera_base::{ - crypto::{CryptoError, CryptoHash, ValidatorPublicKey, ValidatorSecretKey}, + crypto::{CryptoError, CryptoHash, ValidatorPublicKey}, data_types::{ - ApplicationDescription, ArithmeticError, Blob, BlockHeight, Epoch, Round, Timestamp, + ApplicationDescription, ArithmeticError, Blob, BlockHeight, Epoch, Round, TimeDelta, + Timestamp, }, doc_scalar, identifiers::{AccountOwner, ApplicationId, BlobId, ChainId, EventId, StreamId}, - time::Instant, - util::traits::DynError, }; -use linera_cache::{UniqueValueCache, ValueCache}; #[cfg(with_testing)] use linera_chain::ChainExecutionContext; use linera_chain::{ @@ -33,46 +34,76 @@ use linera_chain::{ ChainError, ChainStateView, }; use linera_execution::{ExecutionError, ExecutionStateView, Query, QueryOutcome, ResourceTracker}; -use linera_storage::Storage; +use linera_storage::{Clock as _, Storage}; use linera_views::{context::InactiveContext, ViewError}; use serde::{Deserialize, Serialize}; use thiserror::Error; -use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard}; -use tracing::{error, instrument, trace, warn}; +use tokio::sync::{oneshot, OwnedRwLockReadGuard}; +use tracing::{instrument, trace, warn}; + +/// A read guard providing access to a chain's [`ChainStateView`]. +/// +/// Holds a read lock on the chain worker state, preventing writes for its +/// lifetime. The `OwnedRwLockReadGuard` internally holds a strong `Arc` +/// reference to the `RwLock`, keeping the state alive. +/// Dereferences to `ChainStateView`. +pub struct ChainStateViewReadGuard( + OwnedRwLockReadGuard, ChainStateView>, +); + +impl std::ops::Deref for ChainStateViewReadGuard { + type Target = ChainStateView; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +use linera_cache::{UniqueValueCache, ValueCache}; /// Re-export of [`EventSubscriptionsResult`] for use by other crate modules. -pub(crate) use crate::chain_worker::{ - ChainWorkerRequestReceiver, ChainWorkerRequestSender, EventSubscriptionsResult, -}; +pub(crate) use crate::chain_worker::EventSubscriptionsResult; use crate::{ chain_worker::{ - BlockOutcome, ChainWorkerActor, ChainWorkerConfig, ChainWorkerRequest, - CrossChainUpdateResult, DeliveryNotifier, + handle, state::ChainWorkerState, BlockOutcome, ChainWorkerConfig, CrossChainUpdateResult, + DeliveryNotifier, }, client::ListeningMode, data_types::{ChainInfoQuery, ChainInfoResponse, CrossChainRequest}, - join_set_ext::{JoinSet, JoinSetExt}, notifier::Notifier, - CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES, }; -pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 5_000; -pub const DEFAULT_EXECUTION_STATE_CACHE_SIZE: usize = 10_000; - #[cfg(test)] #[path = "unit_tests/worker_tests.rs"] mod worker_tests; +/// Wraps a future in `SyncFuture` on non-web targets so that it satisfies `Sync` bounds. +/// On web targets the future is returned as-is. +#[cfg(not(web))] +pub(crate) fn wrap_future(f: F) -> sync_wrapper::SyncFuture { + sync_wrapper::SyncFuture::new(f) +} + +/// Wraps a future in `SyncFuture` on non-web targets so that it satisfies `Sync` bounds. +/// On web targets the future is returned as-is. +#[cfg(web)] +pub(crate) fn wrap_future(f: F) -> F { + f +} + +pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 5_000; +pub const DEFAULT_EXECUTION_STATE_CACHE_SIZE: usize = 10_000; + #[cfg(with_metrics)] mod metrics { use std::sync::LazyLock; use linera_base::prometheus_util::{ exponential_bucket_interval, register_histogram, register_histogram_vec, - register_int_counter, register_int_counter_vec, register_int_gauge, + register_int_counter, register_int_counter_vec, }; use linera_chain::types::ConfirmedBlockCertificate; - use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge}; + use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec}; pub static NUM_ROUNDS_IN_CERTIFICATE: LazyLock = LazyLock::new(|| { register_histogram_vec( @@ -147,14 +178,6 @@ mod metrics { ) }); - /// Number of cached chain worker channel endpoints in the map. - pub static CHAIN_WORKER_ENDPOINTS_CACHED: LazyLock = LazyLock::new(|| { - register_int_gauge( - "chain_worker_endpoints_cached", - "Number of cached chain worker channel endpoints", - ) - }); - /// Holds metrics data extracted from a confirmed block certificate. pub struct MetricsData { certificate_log_str: &'static str, @@ -308,7 +331,7 @@ pub enum WorkerError { #[error("Block was not signed by an authorized owner")] InvalidOwner, - #[error("Operations in the block are not authenticated by the proper signer: {0}")] + #[error("Operations in the block are not authenticated by the proper owner: {0}")] InvalidSigner(AccountOwner), // Chaining @@ -384,22 +407,13 @@ pub enum WorkerError { TooManyPublishedBlobs(u64), #[error("Missing network description")] MissingNetworkDescription, - #[error("ChainWorkerActor for chain {chain_id} stopped executing unexpectedly: {error}")] - ChainActorSendError { - chain_id: ChainId, - error: Box, - }, - #[error("ChainWorkerActor for chain {chain_id} stopped executing without responding: {error}")] - ChainActorRecvError { - chain_id: ChainId, - error: Box, - }, - #[error("thread error: {0}")] Thread(#[from] web_thread_pool::Error), #[error("Fallback mode is not available on this network")] NoFallbackMode, + #[error("Chain worker for {chain_id} is poisoned and must be reloaded")] + WorkerPoisoned { chain_id: ChainId }, } impl WorkerError { @@ -432,11 +446,10 @@ impl WorkerError { | WorkerError::ConfirmedBlockHashNotFound { .. } | WorkerError::LocalBlockNotFound { .. } | WorkerError::MissingNetworkDescription - | WorkerError::ChainActorSendError { .. } - | WorkerError::ChainActorRecvError { .. } | WorkerError::Thread(_) | WorkerError::ReadCertificatesError(_) - | WorkerError::IncorrectOutcome { .. } => true, + | WorkerError::IncorrectOutcome { .. } + | WorkerError::WorkerPoisoned { .. } => true, WorkerError::ChainError(chain_error) => chain_error.is_local(), } } @@ -487,17 +500,56 @@ impl WorkerError { } } +type ChainWorkerArc = Arc>>; +type ChainWorkerWeak = std::sync::Weak>>; +type ChainWorkerFuture = Shared>>; + +/// Each map entry is a `Shared>>`: +/// +/// - `peek()` returns `None` while a task is loading the worker from storage. +/// - `peek()` returns `Some(Ok(weak))` once the worker is loaded. +/// - `peek()` returns `Some(Err(_))` if loading failed (sender dropped). +/// +/// Callers that find a pending entry clone the `Shared` future and await it. +type ChainWorkerMap = Arc>>; + +/// Starts a background task that periodically removes dead weak references +/// from the chain handle map. The actual lifetime management is handled by +/// each handle's keep-alive task. +fn start_sweep( + chain_workers: &ChainWorkerMap, + config: &ChainWorkerConfig, +) { + // Sweep at the smaller of the two TTLs. If both are None, workers + // live forever so there's nothing to sweep. + let interval = match (config.ttl, config.sender_chain_ttl) { + (None, None) => return, + (Some(d), None) | (None, Some(d)) => d, + (Some(a), Some(b)) => a.min(b), + }; + let weak_map = Arc::downgrade(chain_workers); + linera_base::Task::spawn(async move { + loop { + linera_base::time::timer::sleep(interval).await; + let Some(map) = weak_map.upgrade() else { + break; + }; + map.pin_owned().retain(|_, shared| match shared.peek() { + Some(Ok(weak)) => weak.strong_count() > 0, + Some(Err(_)) => false, // Loading failed; clean up. + None => true, // Still loading; keep. + }); + } + }) + .forget(); +} + /// State of a worker in a validator or a local node. -pub struct WorkerState -where - StorageClient: Storage, -{ - /// A name used for logging - nickname: String, +pub struct WorkerState { /// Access to local persistent storage. storage: StorageClient, - /// Configuration options for the [`ChainWorker`]s. - chain_worker_config: ChainWorkerConfig, + /// Configuration options for chain workers. + pub(crate) chain_worker_config: ChainWorkerConfig, block_cache: Arc>, execution_state_cache: Option>>>, @@ -506,10 +558,10 @@ where /// One-shot channels to notify callers when messages of a particular chain have been /// delivered. delivery_notifiers: Arc>, - /// The set of spawned [`ChainWorkerActor`] tasks. - chain_worker_tasks: Arc>, - /// The cache of running [`ChainWorkerActor`]s. - chain_workers: Arc>>>, + /// The cache of loaded chain workers. Stores weak references; each worker + /// manages its own lifetime via a keep-alive task. A background sweep + /// periodically removes dead entries. + chain_workers: ChainWorkerMap, } impl Clone for WorkerState @@ -518,189 +570,42 @@ where { fn clone(&self) -> Self { WorkerState { - nickname: self.nickname.clone(), storage: self.storage.clone(), chain_worker_config: self.chain_worker_config.clone(), block_cache: self.block_cache.clone(), execution_state_cache: self.execution_state_cache.clone(), chain_modes: self.chain_modes.clone(), delivery_notifiers: self.delivery_notifiers.clone(), - chain_worker_tasks: self.chain_worker_tasks.clone(), chain_workers: self.chain_workers.clone(), } } } -/// The sender endpoint for [`ChainWorkerRequest`]s. -type ChainActorEndpoint = mpsc::UnboundedSender<( - ChainWorkerRequest<::Context>, - tracing::Span, - Instant, -)>; - pub(crate) type DeliveryNotifiers = HashMap; impl WorkerState where StorageClient: Storage, { - #[instrument(level = "trace", skip(nickname, key_pair, storage))] - pub fn new( - nickname: String, - key_pair: Option, - storage: StorageClient, - block_cache_size: usize, - execution_state_cache_size: usize, - ) -> Self { - WorkerState { - nickname, - storage, - chain_worker_config: ChainWorkerConfig::default().with_key_pair(key_pair), - block_cache: Arc::new(ValueCache::new(block_cache_size)), - execution_state_cache: (execution_state_cache_size > 0) - .then(|| Arc::new(UniqueValueCache::new(execution_state_cache_size))), - chain_modes: None, - delivery_notifiers: Arc::default(), - chain_worker_tasks: Arc::default(), - chain_workers: Arc::new(Mutex::new(BTreeMap::new())), - } - } - - #[instrument(level = "trace", skip(nickname, storage))] - pub fn new_for_client( - nickname: String, - storage: StorageClient, - chain_modes: Arc>>, - block_cache_size: usize, - execution_state_cache_size: usize, - ) -> Self { - WorkerState { - nickname, - storage, - chain_worker_config: ChainWorkerConfig::default(), - block_cache: Arc::new(ValueCache::new(block_cache_size)), - execution_state_cache: (execution_state_cache_size > 0) - .then(|| Arc::new(UniqueValueCache::new(execution_state_cache_size))), - chain_modes: Some(chain_modes), - delivery_notifiers: Arc::default(), - chain_worker_tasks: Arc::default(), - chain_workers: Arc::new(Mutex::new(BTreeMap::new())), - } - } - - #[instrument(level = "trace", skip(self, value))] - pub fn with_allow_inactive_chains(mut self, value: bool) -> Self { - self.chain_worker_config.allow_inactive_chains = value; - self - } - - #[instrument(level = "trace", skip(self, value))] - pub fn with_allow_messages_from_deprecated_epochs(mut self, value: bool) -> Self { - self.chain_worker_config - .allow_messages_from_deprecated_epochs = value; - self - } - - #[instrument(level = "trace", skip(self, value))] - pub fn with_long_lived_services(mut self, value: bool) -> Self { - self.chain_worker_config.long_lived_services = value; - self - } - - #[instrument(level = "trace", skip(self, value))] - pub fn with_allow_revert_confirm(mut self, value: bool) -> Self { - self.chain_worker_config.allow_revert_confirm = value; - self - } - - #[instrument(level = "trace", skip(self))] - pub fn with_reset_on_incorrect_outcome(mut self, minutes: Option) -> Self { - self.chain_worker_config.reset_on_incorrect_outcome = - minutes.map(|m| Duration::from_secs(m * 60)); - self - } - - /// Returns an instance with the specified block time grace period. - /// - /// Blocks with a timestamp this far in the future will still be accepted, but the validator - /// will wait until that timestamp before voting. - #[instrument(level = "trace", skip(self))] - pub fn with_block_time_grace_period(mut self, block_time_grace_period: Duration) -> Self { - self.chain_worker_config.block_time_grace_period = block_time_grace_period; - self - } - - /// Returns an instance with the specified chain worker TTL. - /// - /// Idle chain workers free their memory after that duration without requests. - #[instrument(level = "trace", skip(self))] - pub fn with_chain_worker_ttl(mut self, chain_worker_ttl: Duration) -> Self { - self.chain_worker_config.ttl = chain_worker_ttl; - self - } - - /// Returns an instance with the specified sender chain worker TTL. - /// - /// Idle sender chain workers free their memory after that duration without requests. - #[instrument(level = "trace", skip(self))] - pub fn with_sender_chain_worker_ttl(mut self, sender_chain_worker_ttl: Duration) -> Self { - self.chain_worker_config.sender_chain_ttl = sender_chain_worker_ttl; - self - } - - /// Returns an instance with the specified set of chain IDs whose incoming bundles - /// should be processed first. - #[instrument(level = "trace", skip(self, origins))] - pub fn with_priority_bundle_origins(mut self, origins: HashSet) -> Self { - self.chain_worker_config.priority_bundle_origins = origins; - self - } - - /// Returns an instance with the specified set of chain IDs whose incoming bundles - /// should be ignored. - #[instrument(level = "trace", skip(self, origins))] - pub fn with_ignored_bundle_origins(mut self, origins: HashSet) -> Self { - self.chain_worker_config.ignored_bundle_origins = origins; - self - } - - /// Returns an instance with the specified cross-chain message chunk limit. - #[instrument(level = "trace", skip(self))] - pub fn with_cross_chain_message_chunk_limit(mut self, limit: usize) -> Self { - self.chain_worker_config.cross_chain_message_chunk_limit = limit; - self - } - /// Sets the cross-chain message chunk limit. pub fn set_cross_chain_message_chunk_limit(&mut self, limit: usize) { self.chain_worker_config.cross_chain_message_chunk_limit = limit; } - /// Returns an instance with the specified maximum size for received_log entries. - /// - /// Sizes below `CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES` should be avoided. #[instrument(level = "trace", skip(self))] - pub fn with_chain_info_max_received_log_entries( + pub fn nickname(&self) -> &str { + &self.chain_worker_config.nickname + } + + /// Sets the priority bundle origins. + pub fn with_priority_bundle_origins( mut self, - chain_info_max_received_log_entries: usize, + origins: std::collections::HashSet, ) -> Self { - if chain_info_max_received_log_entries < CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES { - warn!( - "The value set for the maximum size of received_log entries \ - may not be compatible with the latest clients: {} instead of {}", - chain_info_max_received_log_entries, CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES - ); - } - self.chain_worker_config.chain_info_max_received_log_entries = - chain_info_max_received_log_entries; + self.chain_worker_config.priority_bundle_origins = origins; self } - #[instrument(level = "trace", skip(self))] - pub fn nickname(&self) -> &str { - &self.nickname - } - /// Returns the storage client so that it can be manipulated or queried. #[instrument(level = "trace", skip(self))] #[cfg(not(feature = "test"))] @@ -788,6 +693,32 @@ impl WorkerState where StorageClient: Storage + Clone + 'static, { + /// Creates a new `WorkerState`. + /// + /// The `chain_worker_config` must be fully configured before calling this, because the + /// TTL sweep task is started immediately based on the config's TTL values. + #[instrument(level = "trace", skip(storage, chain_worker_config))] + pub fn new( + storage: StorageClient, + chain_worker_config: ChainWorkerConfig, + chain_modes: Option>>>, + ) -> Self { + let chain_workers = Arc::new(papaya::HashMap::new()); + start_sweep(&chain_workers, &chain_worker_config); + let block_cache_size = chain_worker_config.block_cache_size; + let execution_state_cache_size = chain_worker_config.execution_state_cache_size; + WorkerState { + storage, + chain_worker_config, + block_cache: Arc::new(ValueCache::new(block_cache_size)), + execution_state_cache: (execution_state_cache_size > 0) + .then(|| Arc::new(UniqueValueCache::new(execution_state_cache_size))), + chain_modes, + delivery_notifiers: Arc::default(), + chain_workers, + } + } + #[instrument(level = "trace", skip(self, certificate, notifier))] #[inline] pub async fn fully_handle_certificate_with_notifications( @@ -815,10 +746,187 @@ where .await } - /// Tries to execute a block proposal with a policy for handling bundle failures. + /// Acquires a read lock on the chain worker and executes the given closure. + /// + /// The future is boxed to keep deeply nested types off the stack. On non-web + /// targets it is also wrapped in `SyncFuture` to satisfy `Sync` bounds. + async fn chain_read(&self, chain_id: ChainId, f: F) -> Result + where + F: FnOnce(OwnedRwLockReadGuard>) -> Fut, + Fut: std::future::Future>, + { + let state = self.get_or_create_chain_worker(chain_id).await?; + Box::pin(wrap_future(async move { + let guard = handle::read_lock(&state).await; + if guard.is_poisoned() { + self.chain_workers.pin().remove(&chain_id); + drop(guard); + return Err(WorkerError::WorkerPoisoned { chain_id }); + } + f(guard).await + })) + .await + } + + /// Acquires a write lock on the chain worker and executes the given closure. + /// + /// The [`RollbackGuard`] automatically rolls back uncommitted chain state changes + /// when dropped, ensuring cancellation safety. The future is boxed to keep deeply + /// nested types off the stack. + async fn chain_write(&self, chain_id: ChainId, f: F) -> Result + where + F: FnOnce(handle::RollbackGuard) -> Fut, + Fut: std::future::Future>, + { + let state = self.get_or_create_chain_worker(chain_id).await?; + Box::pin(wrap_future(async move { + let guard = handle::write_lock(&state).await; + if guard.is_poisoned() { + self.chain_workers.pin().remove(&chain_id); + drop(guard); + return Err(WorkerError::WorkerPoisoned { chain_id }); + } + let result = f(guard).await; + if result.is_err() { + // Check if the operation poisoned the worker. If so, evict it + // so the next request reloads from storage. + let guard = state.read().await; + if guard.is_poisoned() { + self.chain_workers.pin().remove(&chain_id); + drop(guard); + } + } + result + })) + .await + } + + /// Gets or creates a chain worker for the given chain. + /// + /// The oneshot channel is created outside the `compute` closure to keep + /// the closure pure (papaya may call it more than once on CAS retry and + /// may memoize the output). If the fast path hits, the unused channel is + /// dropped harmlessly. /// - /// Returns the modified block (bundles may be rejected/removed), the executed block, - /// chain info response, and resource tracker. + /// Returns a type-erased future to keep `!Sync` intermediate types (e.g. + /// `std::sync::mpsc::Receiver` from `handle::ServiceRuntimeActor::spawn`) out of + /// the caller's future type. + fn get_or_create_chain_worker( + &self, + chain_id: ChainId, + ) -> std::pin::Pin< + Box< + impl std::future::Future, WorkerError>> + '_, + >, + > { + Box::pin(wrap_future(async move { + loop { + // Create the channel outside the closure so that the tx/rx + // always match regardless of CAS retries. + let (tx, rx) = oneshot::channel(); + let shared_rx = rx.shared(); + + // The papaya guard is !Send, so it must be dropped before + // any .await point. + let wait_or_tx = { + let pin = self.chain_workers.pin(); + match pin.compute(chain_id, |existing| match existing { + Some((_, entry)) => match entry.peek() { + Some(Ok(weak)) => match weak.upgrade() { + Some(arc) => papaya::Operation::Abort(Ok(arc)), + None => papaya::Operation::Insert(shared_rx.clone()), + }, + Some(Err(_)) => papaya::Operation::Insert(shared_rx.clone()), + None => papaya::Operation::Abort(Err(entry.clone())), + }, + None => papaya::Operation::Insert(shared_rx.clone()), + }) { + papaya::Compute::Aborted(Ok(arc), ..) => return Ok(arc), + papaya::Compute::Aborted(Err(wait), ..) => Either::Left(wait), + papaya::Compute::Inserted { .. } | papaya::Compute::Updated { .. } => { + Either::Right(tx) + } + papaya::Compute::Removed { .. } => unreachable!(), + } + }; + + match wait_or_tx { + Either::Left(wait) => { + // Another task is loading. Await the shared future. + if let Ok(weak) = wait.await { + if let Some(arc) = weak.upgrade() { + return Ok(arc); + } + } + // Loading failed or worker already dead; retry. + } + Either::Right(tx) => { + // We claimed the loading slot. Load from storage. + // On success, send the Weak through the channel. + // On error, dropping tx wakes waiters so they can retry. + let worker = self.load_chain_worker(chain_id).await?; + if tx.send(Arc::downgrade(&worker)).is_err() { + tracing::error!(%chain_id, "Receiver dropped while loading worker state."); + continue; + } + return Ok(worker); + } + } + } + })) + } + + /// Loads a chain worker state from storage and wraps it in an Arc. + async fn load_chain_worker( + &self, + chain_id: ChainId, + ) -> Result, WorkerError> { + let delivery_notifier = self + .delivery_notifiers + .lock() + .unwrap() + .entry(chain_id) + .or_default() + .clone(); + + let is_tracked = self.chain_modes.as_ref().is_some_and(|chain_modes| { + chain_modes + .read() + .unwrap() + .get(&chain_id) + .is_some_and(ListeningMode::is_full) + }); + + let (service_runtime_endpoint, service_runtime_task) = + if self.chain_worker_config.long_lived_services { + let actor = + handle::ServiceRuntimeActor::spawn(chain_id, self.storage.thread_pool()).await; + (Some(actor.endpoint), Some(actor.task)) + } else { + (None, None) + }; + + let state = crate::chain_worker::state::ChainWorkerState::load( + self.chain_worker_config.clone(), + self.storage.clone(), + self.block_cache.clone(), + self.execution_state_cache.clone(), + self.chain_modes.clone(), + delivery_notifier, + chain_id, + service_runtime_endpoint, + service_runtime_task, + ) + .await?; + + Ok(handle::create_chain_worker( + state, + is_tracked, + &self.chain_worker_config, + )) + } + + /// Tries to execute a block proposal without any verification other than block execution. #[instrument(level = "trace", skip(self, block))] pub async fn stage_block_execution( &self, @@ -827,14 +935,11 @@ where published_blobs: Vec, policy: BundleExecutionPolicy, ) -> Result<(ProposedBlock, Block, ChainInfoResponse, ResourceTracker), WorkerError> { - self.query_chain_worker(block.chain_id, move |callback| { - ChainWorkerRequest::StageBlockExecution { - block, - round, - published_blobs, - policy, - callback, - } + let chain_id = block.chain_id; + self.chain_write(chain_id, |mut guard| async move { + guard + .stage_block_execution(block, round, &published_blobs, policy) + .await }) .await } @@ -850,18 +955,14 @@ where query: Query, block_hash: Option, ) -> Result<(QueryOutcome, BlockHeight), WorkerError> { - self.query_chain_worker(chain_id, move |callback| { - ChainWorkerRequest::QueryApplication { - query, - block_hash, - callback, - } + self.chain_write(chain_id, |mut guard| async move { + guard.query_application(query, block_hash).await }) .await } #[instrument(level = "trace", skip(self, chain_id, application_id), fields( - nickname = %self.nickname, + nickname = %self.nickname(), chain_id = %chain_id, application_id = %application_id ))] @@ -870,13 +971,9 @@ where chain_id: ChainId, application_id: ApplicationId, ) -> Result { - self.query_chain_worker(chain_id, move |callback| { - ChainWorkerRequest::DescribeApplication { - application_id, - callback, - } - }) - .await + let state = self.get_or_create_chain_worker(chain_id).await?; + let guard = handle::read_lock_initialized(&state).await?; + guard.describe_application_readonly(application_id).await } /// Processes a confirmed block (aka a commit). @@ -884,7 +981,7 @@ where level = "trace", skip(self, certificate, notify_when_messages_are_delivered), fields( - nickname = %self.nickname, + nickname = %self.nickname(), chain_id = %certificate.block().header.chain_id, block_height = %certificate.block().header.height ) @@ -895,19 +992,17 @@ where notify_when_messages_are_delivered: Option>, ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> { let chain_id = certificate.block().header.chain_id; - self.query_chain_worker(chain_id, move |callback| { - ChainWorkerRequest::ProcessConfirmedBlock { - certificate, - notify_when_messages_are_delivered, - callback, - } + self.chain_write(chain_id, |mut guard| async move { + guard + .process_confirmed_block(certificate, notify_when_messages_are_delivered) + .await }) .await } /// Processes a validated block issued from a multi-owner chain. #[instrument(level = "trace", skip(self, certificate), fields( - nickname = %self.nickname, + nickname = %self.nickname(), chain_id = %certificate.block().header.chain_id, block_height = %certificate.block().header.height ))] @@ -916,18 +1011,15 @@ where certificate: ValidatedBlockCertificate, ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> { let chain_id = certificate.block().header.chain_id; - self.query_chain_worker(chain_id, move |callback| { - ChainWorkerRequest::ProcessValidatedBlock { - certificate, - callback, - } + self.chain_write(chain_id, |mut guard| async move { + guard.process_validated_block(certificate).await }) .await } /// Processes a leader timeout issued from a multi-owner chain. #[instrument(level = "trace", skip(self, certificate), fields( - nickname = %self.nickname, + nickname = %self.nickname(), chain_id = %certificate.value().chain_id(), height = %certificate.value().height() ))] @@ -936,17 +1028,14 @@ where certificate: TimeoutCertificate, ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> { let chain_id = certificate.value().chain_id(); - self.query_chain_worker(chain_id, move |callback| { - ChainWorkerRequest::ProcessTimeout { - certificate, - callback, - } + self.chain_write(chain_id, |mut guard| async move { + guard.process_timeout(certificate).await }) .await } #[instrument(level = "trace", skip(self, origin, recipient, bundles), fields( - nickname = %self.nickname, + nickname = %self.nickname(), origin = %origin, recipient = %recipient, num_bundles = %bundles.len() @@ -958,20 +1047,17 @@ where bundles: Vec<(Epoch, MessageBundle)>, previous_height: Option, ) -> Result { - self.query_chain_worker(recipient, move |callback| { - ChainWorkerRequest::ProcessCrossChainUpdate { - origin, - bundles, - previous_height, - callback, - } + self.chain_write(recipient, |mut guard| async move { + guard + .process_cross_chain_update(origin, bundles, previous_height) + .await }) .await } /// Returns a stored [`ConfirmedBlockCertificate`] for a chain's block. #[instrument(level = "trace", skip(self, chain_id, height), fields( - nickname = %self.nickname, + nickname = %self.nickname(), chain_id = %chain_id, height = %height ))] @@ -981,147 +1067,34 @@ where chain_id: ChainId, height: BlockHeight, ) -> Result, WorkerError> { - self.query_chain_worker(chain_id, move |callback| { - ChainWorkerRequest::ReadCertificate { height, callback } - }) - .await + let state = self.get_or_create_chain_worker(chain_id).await?; + let guard = handle::read_lock_initialized(&state).await?; + guard.read_certificate(height).await } /// Returns a read-only view of the [`ChainStateView`] of a chain referenced by its /// [`ChainId`]. /// - /// The returned view holds a lock on the chain state, which prevents the worker from changing - /// the state of that chain. + /// The returned guard holds a read lock on the chain state, preventing writes for + /// its lifetime. Multiple concurrent readers are allowed. #[instrument(level = "trace", skip(self), fields( - nickname = %self.nickname, + nickname = %self.nickname(), chain_id = %chain_id ))] pub async fn chain_state_view( &self, chain_id: ChainId, - ) -> Result>, WorkerError> { - self.query_chain_worker(chain_id, |callback| ChainWorkerRequest::GetChainStateView { - callback, - }) - .await - } - - /// Sends a request to the [`ChainWorker`] for a [`ChainId`] and waits for the `Response`. - #[instrument(level = "trace", skip(self, request_builder), fields( - nickname = %self.nickname, - chain_id = %chain_id - ))] - async fn query_chain_worker( - &self, - chain_id: ChainId, - request_builder: impl FnOnce( - oneshot::Sender>, - ) -> ChainWorkerRequest, - ) -> Result { - // Build the request. - let (callback, response) = oneshot::channel(); - let request = request_builder(callback); - - // Call the endpoint, possibly a new one. - let new_channel = self.call_and_maybe_create_chain_worker_endpoint(chain_id, request)?; - - // We just created a channel: spawn the actor. - if let Some((sender, receiver)) = new_channel { - let delivery_notifier = self - .delivery_notifiers - .lock() - .unwrap() - .entry(chain_id) - .or_default() - .clone(); - - let is_tracked = self.chain_modes.as_ref().is_some_and(|chain_modes| { - chain_modes - .read() - .unwrap() - .get(&chain_id) - .is_some_and(ListeningMode::is_full) - }); - - let actor_task = ChainWorkerActor::run( - self.chain_worker_config.clone(), - self.storage.clone(), - self.block_cache.clone(), - self.execution_state_cache.clone(), - self.chain_modes.clone(), - delivery_notifier, - chain_id, - sender, - receiver, - is_tracked, - ); - - self.chain_worker_tasks - .lock() - .unwrap() - .spawn_task(actor_task); - } - - // Finally, wait a response. - match response.await { - Err(e) => { - // The actor endpoint was dropped. Better luck next time! - Err(WorkerError::ChainActorRecvError { - chain_id, - error: Box::new(e), - }) - } - Ok(response) => response, - } - } - - /// Find an endpoint and call it. Create the endpoint if necessary. - /// - /// Returns `Some((sender, receiver))` if a new channel was created and the actor needs to be - /// spawned, or `None` if an existing endpoint was used. - #[instrument(level = "trace", skip(self), fields( - nickname = %self.nickname, - chain_id = %chain_id - ))] - #[expect(clippy::type_complexity)] - fn call_and_maybe_create_chain_worker_endpoint( - &self, - chain_id: ChainId, - request: ChainWorkerRequest, - ) -> Result< - Option<( - ChainWorkerRequestSender, - ChainWorkerRequestReceiver, - )>, - WorkerError, - > { - let mut chain_workers = self.chain_workers.lock().unwrap(); - - let (sender, new_channel) = if let Some(endpoint) = chain_workers.remove(&chain_id) { - (endpoint, None) - } else { - let (sender, receiver) = mpsc::unbounded_channel(); - (sender.clone(), Some((sender, receiver))) - }; - - if let Err(e) = sender.send((request, tracing::Span::current(), Instant::now())) { - // The actor was dropped. Give up without (re-)inserting the endpoint in the cache. - return Err(WorkerError::ChainActorSendError { - chain_id, - error: Box::new(e), - }); - } - - // Put back the sender in the cache for next time. - chain_workers.insert(chain_id, sender); - #[cfg(with_metrics)] - metrics::CHAIN_WORKER_ENDPOINTS_CACHED.set(chain_workers.len() as i64); - - Ok(new_channel) + ) -> Result, WorkerError> { + let state = self.get_or_create_chain_worker(chain_id).await?; + let guard = handle::read_lock(&state).await; + Ok(ChainStateViewReadGuard(OwnedRwLockReadGuard::map( + guard, + |s| s.chain(), + ))) } #[instrument(skip_all, fields( - nick = self.nickname, + nick = self.nickname(), chain_id = format!("{:.8}", proposal.content.block.chain_id), height = %proposal.content.block.height, ))] @@ -1129,12 +1102,26 @@ where &self, proposal: BlockProposal, ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> { - trace!("{} <-- {:?}", self.nickname, proposal); + trace!("{} <-- {:?}", self.nickname(), proposal); #[cfg(with_metrics)] let round = proposal.content.round; + + let chain_id = proposal.content.block.chain_id; + // Delay if block timestamp is in the future but within grace period. + let now = self.storage.clock().current_time(); + let block_timestamp = proposal.content.block.timestamp; + let delta = block_timestamp.delta_since(now); + let grace_period = TimeDelta::from_micros( + u64::try_from(self.chain_worker_config.block_time_grace_period.as_micros()) + .unwrap_or(u64::MAX), + ); + if delta > TimeDelta::ZERO && delta <= grace_period { + self.storage.clock().sleep_until(block_timestamp).await; + } + let response = self - .query_chain_worker(proposal.content.block.chain_id, move |callback| { - ChainWorkerRequest::HandleBlockProposal { proposal, callback } + .chain_write(chain_id, |mut guard| async move { + guard.handle_block_proposal(proposal).await }) .await?; #[cfg(with_metrics)] @@ -1176,7 +1163,7 @@ where /// Processes a confirmed block certificate. #[instrument(skip_all, fields( - nick = self.nickname, + nick = self.nickname(), chain_id = format!("{:.8}", certificate.block().header.chain_id), height = %certificate.block().header.height, ))] @@ -1185,16 +1172,17 @@ where certificate: ConfirmedBlockCertificate, notify_when_messages_are_delivered: Option>, ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> { - trace!("{} <-- {:?}", self.nickname, certificate); + trace!("{} <-- {:?}", self.nickname(), certificate); #[cfg(with_metrics)] let metrics_data = metrics::MetricsData::new(&certificate); - let (info, actions, _outcome) = + #[allow(unused_variables)] + let (info, actions, outcome) = Box::pin(self.process_confirmed_block(certificate, notify_when_messages_are_delivered)) .await?; #[cfg(with_metrics)] - if matches!(_outcome, BlockOutcome::Processed) { + if matches!(outcome, BlockOutcome::Processed) { metrics_data.record(); } Ok((info, actions)) @@ -1202,7 +1190,7 @@ where /// Processes a validated block certificate. #[instrument(skip_all, fields( - nick = self.nickname, + nick = self.nickname(), chain_id = format!("{:.8}", certificate.block().header.chain_id), height = %certificate.block().header.height, ))] @@ -1210,17 +1198,18 @@ where &self, certificate: ValidatedBlockCertificate, ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> { - trace!("{} <-- {:?}", self.nickname, certificate); + trace!("{} <-- {:?}", self.nickname(), certificate); #[cfg(with_metrics)] let round = certificate.round; #[cfg(with_metrics)] let cert_str = certificate.inner().to_log_str(); - let (info, actions, _outcome) = Box::pin(self.process_validated_block(certificate)).await?; + #[allow(unused_variables)] + let (info, actions, outcome) = Box::pin(self.process_validated_block(certificate)).await?; #[cfg(with_metrics)] { - if matches!(_outcome, BlockOutcome::Processed) { + if matches!(outcome, BlockOutcome::Processed) { metrics::NUM_ROUNDS_IN_CERTIFICATE .with_label_values(&[cert_str, round.type_name()]) .observe(round.number() as f64); @@ -1231,7 +1220,7 @@ where /// Processes a timeout certificate #[instrument(skip_all, fields( - nick = self.nickname, + nick = self.nickname(), chain_id = format!("{:.8}", certificate.inner().chain_id()), height = %certificate.inner().height(), ))] @@ -1239,32 +1228,33 @@ where &self, certificate: TimeoutCertificate, ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> { - trace!("{} <-- {:?}", self.nickname, certificate); + trace!("{} <-- {:?}", self.nickname(), certificate); self.process_timeout(certificate).await } #[instrument(skip_all, fields( - nick = self.nickname, + nick = self.nickname(), chain_id = format!("{:.8}", query.chain_id) ))] pub async fn handle_chain_info_query( &self, query: ChainInfoQuery, ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> { - trace!("{} <-- {:?}", self.nickname, query); + trace!("{} <-- {:?}", self.nickname(), query); #[cfg(with_metrics)] metrics::CHAIN_INFO_QUERIES.inc(); + let chain_id = query.chain_id; let result = self - .query_chain_worker(query.chain_id, move |callback| { - ChainWorkerRequest::HandleChainInfoQuery { query, callback } + .chain_write(chain_id, |mut guard| async move { + guard.handle_chain_info_query(query).await }) .await; - trace!("{} --> {:?}", self.nickname, result); + trace!("{} --> {:?}", self.nickname(), result); result } #[instrument(skip_all, fields( - nick = self.nickname, + nick = self.nickname(), chain_id = format!("{:.8}", chain_id) ))] pub async fn download_pending_blob( @@ -1274,23 +1264,23 @@ where ) -> Result { trace!( "{} <-- download_pending_blob({chain_id:8}, {blob_id:8})", - self.nickname + self.nickname() ); let result = self - .query_chain_worker(chain_id, move |callback| { - ChainWorkerRequest::DownloadPendingBlob { blob_id, callback } + .chain_read(chain_id, |guard| async move { + guard.download_pending_blob(blob_id).await }) .await; trace!( "{} --> {:?}", - self.nickname, + self.nickname(), result.as_ref().map(|_| blob_id) ); result } #[instrument(skip_all, fields( - nick = self.nickname, + nick = self.nickname(), chain_id = format!("{:.8}", chain_id) ))] pub async fn handle_pending_blob( @@ -1301,30 +1291,30 @@ where let blob_id = blob.id(); trace!( "{} <-- handle_pending_blob({chain_id:8}, {blob_id:8})", - self.nickname + self.nickname() ); let result = self - .query_chain_worker(chain_id, move |callback| { - ChainWorkerRequest::HandlePendingBlob { blob, callback } + .chain_write(chain_id, |mut guard| async move { + guard.handle_pending_blob(blob).await }) .await; trace!( "{} --> {:?}", - self.nickname, + self.nickname(), result.as_ref().map(|_| blob_id) ); result } #[instrument(skip_all, fields( - nick = self.nickname, + nick = self.nickname(), chain_id = format!("{:.8}", request.target_chain_id()) ))] pub async fn handle_cross_chain_request( &self, request: CrossChainRequest, ) -> Result { - trace!("{} <-- {:?}", self.nickname, request); + trace!("{} <-- {:?}", self.nickname(), request); match request { CrossChainRequest::UpdateRecipient { sender, @@ -1372,28 +1362,22 @@ where recipient, latest_height, } => { - let actions = self - .query_chain_worker(sender, move |callback| { - ChainWorkerRequest::ConfirmUpdatedRecipient { - recipient, - latest_height, - callback, - } - }) - .await?; - Ok(actions) + self.chain_write(sender, |mut guard| async move { + guard + .confirm_updated_recipient(recipient, latest_height) + .await + }) + .await } CrossChainRequest::RevertConfirm { sender, recipient, retransmit_from, } => { - self.query_chain_worker(sender, move |callback| { - ChainWorkerRequest::HandleRevertConfirm { - recipient, - retransmit_from, - callback, - } + self.chain_write(sender, |mut guard| async move { + guard + .handle_revert_confirm(recipient, retransmit_from) + .await }) .await } @@ -1402,7 +1386,7 @@ where /// Updates the received certificate trackers to at least the given values. #[instrument(skip_all, fields( - nickname = %self.nickname, + nickname = %self.nickname(), chain_id = %chain_id, num_trackers = %new_trackers.len() ))] @@ -1411,18 +1395,17 @@ where chain_id: ChainId, new_trackers: BTreeMap, ) -> Result<(), WorkerError> { - self.query_chain_worker(chain_id, move |callback| { - ChainWorkerRequest::UpdateReceivedCertificateTrackers { - new_trackers, - callback, - } + self.chain_write(chain_id, |mut guard| async move { + guard + .update_received_certificate_trackers(new_trackers) + .await }) .await } /// Gets preprocessed block hashes in a given height range. #[instrument(skip_all, fields( - nickname = %self.nickname, + nickname = %self.nickname(), chain_id = %chain_id, start = %start, end = %end @@ -1433,19 +1416,15 @@ where start: BlockHeight, end: BlockHeight, ) -> Result, WorkerError> { - self.query_chain_worker(chain_id, move |callback| { - ChainWorkerRequest::GetPreprocessedBlockHashes { - start, - end, - callback, - } + self.chain_read(chain_id, |guard| async move { + guard.get_preprocessed_block_hashes(start, end).await }) .await } /// Gets the next block height to receive from an inbox. #[instrument(skip_all, fields( - nickname = %self.nickname, + nickname = %self.nickname(), chain_id = %chain_id, origin = %origin ))] @@ -1454,8 +1433,8 @@ where chain_id: ChainId, origin: ChainId, ) -> Result { - self.query_chain_worker(chain_id, move |callback| { - ChainWorkerRequest::GetInboxNextHeight { origin, callback } + self.chain_read(chain_id, |guard| async move { + guard.get_inbox_next_height(origin).await }) .await } @@ -1463,7 +1442,7 @@ where /// Gets locking blobs for specific blob IDs. /// Returns `Ok(None)` if any of the blobs is not found. #[instrument(skip_all, fields( - nickname = %self.nickname, + nickname = %self.nickname(), chain_id = %chain_id, num_blob_ids = %blob_ids.len() ))] @@ -1472,8 +1451,8 @@ where chain_id: ChainId, blob_ids: Vec, ) -> Result>, WorkerError> { - self.query_chain_worker(chain_id, move |callback| { - ChainWorkerRequest::GetLockingBlobs { blob_ids, callback } + self.chain_read(chain_id, |guard| async move { + guard.get_locking_blobs(blob_ids).await }) .await } @@ -1484,8 +1463,8 @@ where chain_id: ChainId, heights: Vec, ) -> Result, WorkerError> { - self.query_chain_worker(chain_id, move |callback| { - ChainWorkerRequest::GetBlockHashes { heights, callback } + self.chain_read(chain_id, |guard| async move { + guard.get_block_hashes(heights).await }) .await } @@ -1496,8 +1475,8 @@ where chain_id: ChainId, blob_ids: Vec, ) -> Result, WorkerError> { - self.query_chain_worker(chain_id, move |callback| { - ChainWorkerRequest::GetProposedBlobs { blob_ids, callback } + self.chain_read(chain_id, |guard| async move { + guard.get_proposed_blobs(blob_ids).await }) .await } @@ -1507,23 +1486,8 @@ where &self, chain_id: ChainId, ) -> Result { - self.query_chain_worker(chain_id, |callback| { - ChainWorkerRequest::GetEventSubscriptions { callback } - }) - .await - } - - /// Gets the stream event count for a stream. - pub async fn get_stream_event_count( - &self, - chain_id: ChainId, - stream_id: StreamId, - ) -> Result, WorkerError> { - self.query_chain_worker(chain_id, move |callback| { - ChainWorkerRequest::GetStreamEventCount { - stream_id, - callback, - } + self.chain_read(chain_id, |guard| async move { + guard.get_event_subscriptions().await }) .await } @@ -1533,8 +1497,8 @@ where &self, chain_id: ChainId, ) -> Result, WorkerError> { - self.query_chain_worker(chain_id, |callback| { - ChainWorkerRequest::GetReceivedCertificateTrackers { callback } + self.chain_read(chain_id, |guard| async move { + guard.get_received_certificate_trackers().await }) .await } @@ -1545,11 +1509,8 @@ where chain_id: ChainId, receiver_id: ChainId, ) -> Result<(BlockHeight, Option), WorkerError> { - self.query_chain_worker(chain_id, move |callback| { - ChainWorkerRequest::GetTipStateAndOutboxInfo { - receiver_id, - callback, - } + self.chain_read(chain_id, |guard| async move { + guard.get_tip_state_and_outbox_info(receiver_id).await }) .await } @@ -1559,16 +1520,29 @@ where &self, chain_id: ChainId, ) -> Result { - self.query_chain_worker(chain_id, |callback| { - ChainWorkerRequest::GetNextHeightToPreprocess { callback } + self.chain_read(chain_id, |guard| async move { + guard.get_next_height_to_preprocess().await }) .await } /// Gets the chain manager's seed for leader election. pub async fn get_manager_seed(&self, chain_id: ChainId) -> Result { - self.query_chain_worker(chain_id, |callback| ChainWorkerRequest::GetManagerSeed { - callback, + self.chain_read( + chain_id, + |guard| async move { guard.get_manager_seed().await }, + ) + .await + } + + /// Gets the stream event count for a stream. + pub async fn get_stream_event_count( + &self, + chain_id: ChainId, + stream_id: StreamId, + ) -> Result, WorkerError> { + self.chain_read(chain_id, |guard| async move { + guard.get_stream_event_count(stream_id).await }) .await } @@ -1579,11 +1553,8 @@ where chain_id: ChainId, stream_ids: Vec, ) -> Result, WorkerError> { - self.query_chain_worker(chain_id, move |callback| { - ChainWorkerRequest::GetNextExpectedEvents { - stream_ids, - callback, - } + self.chain_read(chain_id, |guard| async move { + guard.get_next_expected_events(stream_ids).await }) .await } @@ -1594,11 +1565,8 @@ where chain_id: ChainId, stream_ids: Vec, ) -> Result, WorkerError> { - self.query_chain_worker(chain_id, move |callback| { - ChainWorkerRequest::GetPreviousEventBlocks { - stream_ids, - callback, - } + self.chain_read(chain_id, |guard| async move { + guard.get_previous_event_blocks(stream_ids).await }) .await } @@ -1607,7 +1575,7 @@ where #[cfg(with_testing)] impl WorkerState where - StorageClient: Storage, + StorageClient: Storage + Clone + 'static, { /// Gets a reference to the validator's [`ValidatorPublicKey`]. /// diff --git a/linera-faucet/server/src/lib.rs b/linera-faucet/server/src/lib.rs index c0d21bfc80e2..bf5b027a5c3b 100644 --- a/linera-faucet/server/src/lib.rs +++ b/linera-faucet/server/src/lib.rs @@ -1,7 +1,7 @@ // Copyright (c) Zefchain Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -#![recursion_limit = "256"] +#![recursion_limit = "512"] //! The server component of the Linera faucet. diff --git a/linera-rpc/src/lib.rs b/linera-rpc/src/lib.rs index 347d28e222d5..5dbd334d0056 100644 --- a/linera-rpc/src/lib.rs +++ b/linera-rpc/src/lib.rs @@ -1,6 +1,8 @@ // Copyright (c) Zefchain Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +#![recursion_limit = "256"] + //! This module provides network abstractions and the data schemas for remote procedure //! calls (RPCs) in the Linera protocol. diff --git a/linera-sdk/src/test/validator.rs b/linera-sdk/src/test/validator.rs index 1d0652a720be..c654e4287d20 100644 --- a/linera-sdk/src/test/validator.rs +++ b/linera-sdk/src/test/validator.rs @@ -21,9 +21,7 @@ use linera_base::{ identifiers::{AccountOwner, ApplicationId, ChainId, ModuleId}, ownership::ChainOwnership, }; -use linera_core::worker::{ - WorkerState, DEFAULT_BLOCK_CACHE_SIZE, DEFAULT_EXECUTION_STATE_CACHE_SIZE, -}; +use linera_core::{worker::WorkerState, ChainWorkerConfig}; use linera_execution::{ committee::Committee, system::{AdminOperation, OpenChainConfig, SystemOperation}, @@ -90,13 +88,12 @@ impl TestValidator { .now_or_never() .expect("execution of DbStorage::new should not await anything"); let clock = storage.clock().clone(); - let worker = WorkerState::new( - "Single validator node".to_string(), - Some(validator_keypair.secret_key.copy()), - storage.clone(), - DEFAULT_BLOCK_CACHE_SIZE, - DEFAULT_EXECUTION_STATE_CACHE_SIZE, - ); + let config = ChainWorkerConfig { + nickname: "Single validator node".to_string(), + key_pair: Some(Arc::new(validator_keypair.secret_key.copy())), + ..ChainWorkerConfig::default() + }; + let worker = WorkerState::new(storage.clone(), config, None); // Create an admin chain. let key_pair = AccountSecretKey::generate(); diff --git a/linera-sdk/tests/fixtures/Cargo.lock b/linera-sdk/tests/fixtures/Cargo.lock index beceeee474a7..7c0a9eaf966f 100644 --- a/linera-sdk/tests/fixtures/Cargo.lock +++ b/linera-sdk/tests/fixtures/Cargo.lock @@ -2315,6 +2315,7 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", + "sync_wrapper 1.0.2", "test-log", "test-strategy", "thiserror 1.0.69", diff --git a/linera-service/benches/transfers.rs b/linera-service/benches/transfers.rs index 0bd823336d50..6051026244e6 100644 --- a/linera-service/benches/transfers.rs +++ b/linera-service/benches/transfers.rs @@ -1,6 +1,8 @@ // Copyright (c) Zefchain Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +#![recursion_limit = "256"] + use criterion::{criterion_group, criterion_main, Criterion}; use futures::{ stream::{self, FuturesUnordered}, diff --git a/linera-service/src/lib.rs b/linera-service/src/lib.rs index 1c736168f413..699e600fb79a 100644 --- a/linera-service/src/lib.rs +++ b/linera-service/src/lib.rs @@ -2,6 +2,8 @@ // Copyright (c) Zefchain Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +#![recursion_limit = "256"] + //! This module provides the executables needed to operate a Linera service, including a placeholder wallet acting as a GraphQL service for user interfaces. pub mod cli; diff --git a/linera-service/src/node_service.rs b/linera-service/src/node_service.rs index 9150057e3d7c..3bff6efc298a 100644 --- a/linera-service/src/node_service.rs +++ b/linera-service/src/node_service.rs @@ -44,7 +44,7 @@ use linera_core::{ client::{chain_client, ChainClient}, data_types::ClientOutcome, wallet::Wallet as _, - worker::{Notification, Reason}, + worker::{ChainStateViewReadGuard, Notification, Reason}, }; use linera_execution::{ committee::Committee, system::AdminOperation, Operation, Query, QueryOutcome, QueryResponse, @@ -53,10 +53,11 @@ use linera_execution::{ #[cfg(with_metrics)] use linera_metrics::monitoring_server; use linera_sdk::linera_base_types::BlobContent; +use linera_storage::Storage; use lru::LruCache; use serde::{Deserialize, Serialize}; use serde_json::json; -use tokio::sync::{mpsc::UnboundedReceiver, OwnedRwLockReadGuard}; +use tokio::sync::mpsc::UnboundedReceiver; use tokio_util::sync::CancellationToken; use tower_http::cors::CorsLayer; use tracing::{debug, error, info, instrument, trace}; @@ -719,10 +720,8 @@ where async fn chain( &self, chain_id: ChainId, - ) -> Result< - ChainStateExtendedView<::StorageContext>, - Error, - > { + ) -> Result::Storage>, Error> + { let client = self .context .lock() @@ -860,20 +859,19 @@ impl ChainStateViewExtension { } #[derive(MergedObject)] -struct ChainStateExtendedView(ChainStateViewExtension, ReadOnlyChainStateView) +struct ChainStateExtendedView(ChainStateViewExtension, ReadOnlyChainStateView) where - C: linera_views::context::Context + Clone + Send + Sync + 'static, - C::Extra: linera_execution::ExecutionRuntimeContext; + ChainStateView: ContainerType + OutputType; -/// A wrapper type that allows proxying GraphQL queries to a [`ChainStateView`] that's behind an -/// [`OwnedRwLockReadGuard`]. -pub struct ReadOnlyChainStateView(OwnedRwLockReadGuard>) +/// A wrapper type that allows proxying GraphQL queries to a [`ChainStateView`] that's behind a +/// [`ChainStateViewReadGuard`]. +pub struct ReadOnlyChainStateView(ChainStateViewReadGuard) where - C: linera_views::context::Context + Clone + Send + Sync + 'static; + ChainStateView: ContainerType + OutputType; -impl ContainerType for ReadOnlyChainStateView +impl ContainerType for ReadOnlyChainStateView where - C: linera_views::context::Context + Clone + Send + Sync + 'static, + ChainStateView: ContainerType + OutputType, { async fn resolve_field( &self, @@ -883,16 +881,16 @@ where } } -impl OutputType for ReadOnlyChainStateView +impl OutputType for ReadOnlyChainStateView where - C: linera_views::context::Context + Clone + Send + Sync + 'static, + ChainStateView: ContainerType + OutputType, { fn type_name() -> Cow<'static, str> { - ChainStateView::::type_name() + ChainStateView::::type_name() } fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String { - ChainStateView::::create_type_info(registry) + ChainStateView::::create_type_info(registry) } async fn resolve( @@ -904,12 +902,11 @@ where } } -impl ChainStateExtendedView +impl ChainStateExtendedView where - C: linera_views::context::Context + Clone + Send + Sync + 'static, - C::Extra: linera_execution::ExecutionRuntimeContext, + ChainStateView: ContainerType + OutputType, { - fn new(view: OwnedRwLockReadGuard>) -> Self { + fn new(view: ChainStateViewReadGuard) -> Self { Self( ChainStateViewExtension(view.chain_id()), ReadOnlyChainStateView(view), diff --git a/linera-service/src/server.rs b/linera-service/src/server.rs index 71eb3c9e76e1..5f0a9256d47b 100644 --- a/linera-service/src/server.rs +++ b/linera-service/src/server.rs @@ -18,6 +18,7 @@ use std::{ borrow::Cow, num::NonZeroU16, path::{Path, PathBuf}, + sync::Arc, time::Duration, }; @@ -29,7 +30,9 @@ use linera_base::{ listen_for_shutdown_signals, }; use linera_client::config::{CommitteeConfig, ValidatorConfig, ValidatorServerConfig}; -use linera_core::{worker::WorkerState, JoinSetExt as _, CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES}; +use linera_core::{ + worker::WorkerState, ChainWorkerConfig, JoinSetExt as _, CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES, +}; use linera_execution::{WasmRuntime, WithWasmDefault}; #[cfg(with_metrics)] use linera_metrics::monitoring_server; @@ -86,21 +89,24 @@ impl ServerContext { "Public key: {}", self.server_config.validator_secret.public() ); - let state = WorkerState::new( - format!("Shard {} @ {}:{}", shard_id, local_ip_addr, shard.port), - Some(self.server_config.validator_secret.copy()), - storage, - self.block_cache_size, - self.execution_state_cache_size, - ) - .with_allow_inactive_chains(false) - .with_allow_messages_from_deprecated_epochs(false) - .with_allow_revert_confirm(self.allow_revert_confirm) - .with_reset_on_incorrect_outcome(self.reset_on_incorrect_outcome_mins) - .with_block_time_grace_period(self.block_time_grace_period) - .with_chain_worker_ttl(self.chain_worker_ttl) - .with_chain_info_max_received_log_entries(self.chain_info_max_received_log_entries) - .with_cross_chain_message_chunk_limit(self.cross_chain_message_chunk_limit); + let config = ChainWorkerConfig { + nickname: format!("Shard {} @ {}:{}", shard_id, local_ip_addr, shard.port), + key_pair: Some(Arc::new(self.server_config.validator_secret.copy())), + allow_inactive_chains: false, + allow_messages_from_deprecated_epochs: false, + block_time_grace_period: self.block_time_grace_period, + ttl: util::non_zero_duration(self.chain_worker_ttl), + chain_info_max_received_log_entries: self.chain_info_max_received_log_entries, + block_cache_size: self.block_cache_size, + execution_state_cache_size: self.execution_state_cache_size, + allow_revert_confirm: self.allow_revert_confirm, + reset_on_incorrect_outcome: self + .reset_on_incorrect_outcome_mins + .map(|m| linera_base::time::Duration::from_secs(m * 60)), + cross_chain_message_chunk_limit: self.cross_chain_message_chunk_limit, + ..ChainWorkerConfig::default() + }; + let state = WorkerState::new(storage, config, None); (state, shard_id, shard.clone()) } diff --git a/linera-service/src/util.rs b/linera-service/src/util.rs index ea56c2d1c1cd..578546fd09a7 100644 --- a/linera-service/src/util.rs +++ b/linera-service/src/util.rs @@ -8,6 +8,11 @@ use std::{ time::Duration, }; +/// Treats a zero `Duration` as `None` (disabled). +pub fn non_zero_duration(d: Duration) -> Option { + (d > Duration::ZERO).then_some(d) +} + use anyhow::{bail, Context as _, Result}; use async_graphql::http::GraphiQLSource; use axum::response::{self, IntoResponse}; diff --git a/linera-service/tests/wallet.rs b/linera-service/tests/wallet.rs index d5c5eb753995..7608c58cc88c 100644 --- a/linera-service/tests/wallet.rs +++ b/linera-service/tests/wallet.rs @@ -31,8 +31,8 @@ pub async fn new_test_client_context( let send_recv_timeout = Duration::from_millis(4000); let retry_delay = Duration::from_millis(1000); let max_retries = 10; - let chain_worker_ttl = Duration::from_secs(30); - let sender_chain_worker_ttl = Duration::from_secs(1); + let chain_worker_ttl = Some(Duration::from_secs(30)); + let sender_chain_worker_ttl = Some(Duration::from_secs(1)); let node_options = NodeOptions { send_timeout: send_recv_timeout,