diff --git a/cli/golem-cli/wit/deps/golem-1.x/golem-oplog.wit b/cli/golem-cli/wit/deps/golem-1.x/golem-oplog.wit index 18e98d5672..4cc7f5a569 100644 --- a/cli/golem-cli/wit/deps/golem-1.x/golem-oplog.wit +++ b/cli/golem-cli/wit/deps/golem-1.x/golem-oplog.wit @@ -106,7 +106,8 @@ interface oplog { initial-total-linear-memory-size: u64, initial-active-plugins: list, local-agent-config: list, - original-phantom-id: option + original-phantom-id: option, + instance-id: uuid } record host-call-parameters { @@ -453,7 +454,8 @@ interface oplog { initial-total-linear-memory-size: u64, initial-active-plugins: list, local-agent-config: list, - original-phantom-id: option + original-phantom-id: option, + instance-id: uuid } record raw-host-call-parameters { diff --git a/golem-api-grpc/proto/golem/worker/public_oplog.proto b/golem-api-grpc/proto/golem/worker/public_oplog.proto index 27265b6ee3..5df6f80eb0 100644 --- a/golem-api-grpc/proto/golem/worker/public_oplog.proto +++ b/golem-api-grpc/proto/golem/worker/public_oplog.proto @@ -97,6 +97,7 @@ message CreateParameters { repeated PluginInstallationDescription initial_active_plugins = 11; golem.common.EnvironmentId environment_id = 12; optional golem.common.UUID original_phantom_id = 13; + golem.common.UUID instance_id = 14; } message HostCallParameters { diff --git a/golem-api-grpc/proto/golem/worker/raw_oplog.proto b/golem-api-grpc/proto/golem/worker/raw_oplog.proto index a119dea201..5c00bad7ad 100644 --- a/golem-api-grpc/proto/golem/worker/raw_oplog.proto +++ b/golem-api-grpc/proto/golem/worker/raw_oplog.proto @@ -139,6 +139,7 @@ message RawCreateParameters { repeated golem.component.EnvironmentPluginGrantId initial_active_plugins = 9; repeated bytes local_agent_config = 10; optional golem.common.UUID original_phantom_id = 11; + optional golem.common.UUID instance_id = 12; } message RawHostCallParameters { diff --git a/golem-api-grpc/proto/golem/worker/v1/worker_service.proto b/golem-api-grpc/proto/golem/worker/v1/worker_service.proto index fe408cbcc0..f9bafe5023 100644 --- a/golem-api-grpc/proto/golem/worker/v1/worker_service.proto +++ b/golem-api-grpc/proto/golem/worker/v1/worker_service.proto @@ -61,6 +61,7 @@ message LaunchNewWorkerResponse { message LaunchNewWorkerSuccessResponse { golem.worker.AgentId agentId = 1; uint64 component_version = 2; + golem.common.UUID instance_id = 3; } message CompletePromiseRequest { diff --git a/golem-api-grpc/proto/golem/workerexecutor/v1/worker_executor.proto b/golem-api-grpc/proto/golem/workerexecutor/v1/worker_executor.proto index 94a62a02d3..914abd5287 100644 --- a/golem-api-grpc/proto/golem/workerexecutor/v1/worker_executor.proto +++ b/golem-api-grpc/proto/golem/workerexecutor/v1/worker_executor.proto @@ -131,9 +131,13 @@ message CreateWorkerRequest { optional golem.worker.InvocationContext invocation_context = 9; } +message CreateWorkerSuccessResponse { + golem.common.UUID instance_id = 1; +} + message CreateWorkerResponse { oneof result { - golem.common.Empty success = 1; + CreateWorkerSuccessResponse success = 1; golem.worker.v1.WorkerExecutionError failure = 2; } } diff --git a/golem-common/src/base_model/mod.rs b/golem-common/src/base_model/mod.rs index 5b0825578d..d2204c19ac 100644 --- a/golem-common/src/base_model/mod.rs +++ b/golem-common/src/base_model/mod.rs @@ -165,6 +165,39 @@ impl FromValue for Timestamp { } } +/// A stable, per-instance fingerprint for a worker, generated as a random UUID at creation time. +/// Globally unique across recreations of the same agent ID. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[cfg_attr( + feature = "full", + derive( + desert_rust::BinaryCodec, + golem_wasm_derive::IntoValue, + golem_wasm_derive::FromValue, + ) +)] +#[cfg_attr(feature = "full", desert(transparent))] +#[serde(transparent)] +pub struct AgentFingerprint(pub Uuid); + +impl Default for AgentFingerprint { + fn default() -> Self { + Self::new() + } +} + +impl AgentFingerprint { + pub fn new() -> Self { + AgentFingerprint(Uuid::now_v7()) + } +} + +impl Display for AgentFingerprint { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + #[cfg(feature = "full")] impl From for golem_wasm::wasi::clocks::wall_clock::Datetime { fn from(value: Timestamp) -> Self { diff --git a/golem-common/src/base_model/oplog/mod.rs b/golem-common/src/base_model/oplog/mod.rs index c67d67a636..a7ef60b5b3 100644 --- a/golem-common/src/base_model/oplog/mod.rs +++ b/golem-common/src/base_model/oplog/mod.rs @@ -80,7 +80,9 @@ oplog_entry! { initial_total_linear_memory_size: u64, initial_active_plugins: HashSet, local_agent_config: Vec, - original_phantom_id: Option + original_phantom_id: Option, + /// Per-instance fingerprint, unique across recreations of the same agent ID. + instance_id: Uuid } public { agent_id: AgentId, @@ -93,7 +95,8 @@ oplog_entry! { initial_total_linear_memory_size: u64, initial_active_plugins: BTreeSet, local_agent_config: Vec, - original_phantom_id: Option + original_phantom_id: Option, + instance_id: Uuid } }, /// The agent invoked a host function diff --git a/golem-common/src/lib.rs b/golem-common/src/lib.rs index 19298c2a17..aa814e340e 100644 --- a/golem-common/src/lib.rs +++ b/golem-common/src/lib.rs @@ -12,6 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +// The oplog_entry! macro expands to large enums processed by serde/poem_openapi +// which require an increased recursion limit. +#![recursion_limit = "256"] + use http::Uri; use shadow_rs::shadow; use std::convert::Infallible; diff --git a/golem-common/src/model/mod.rs b/golem-common/src/model/mod.rs index 94389511ac..3cf2440502 100644 --- a/golem-common/src/model/mod.rs +++ b/golem-common/src/model/mod.rs @@ -335,10 +335,17 @@ pub enum ScheduledAction { }, /// Invoke the given action on the worker. The invocation will only /// be persisted in the oplog when it's actually getting scheduled. + /// + /// `target_worker_fingerprint` guards against stale invocations: when `Some(fp)`, the + /// scheduler compares `fp` against the current worker's `fingerprint` before firing. A + /// mismatch means the original worker was deleted and a new one was created with the same + /// ID — the invocation is silently dropped. `None` means fire unconditionally (used for + /// legacy entries and non-wasm-rpc invocations). Invoke { account_id: AccountId, owned_agent_id: OwnedAgentId, invocation: Box, + target_worker_fingerprint: AgentFingerprint, }, /// Resume the agent Resume { @@ -506,8 +513,7 @@ impl Display for ShardAssignment { } } -#[derive(Clone, Debug, PartialEq, BinaryCodec)] -#[desert(evolution())] +#[derive(Clone, Debug, PartialEq)] pub struct AgentMetadata { pub agent_id: AgentId, pub env: Vec<(String, String)>, @@ -518,27 +524,10 @@ pub struct AgentMetadata { pub parent: Option, pub last_known_status: AgentStatusRecord, pub original_phantom_id: Option, + pub fingerprint: AgentFingerprint, } impl AgentMetadata { - pub fn default( - agent_id: AgentId, - created_by: AccountId, - environment_id: EnvironmentId, - ) -> AgentMetadata { - AgentMetadata { - agent_id, - env: vec![], - environment_id, - created_by, - config: Vec::new(), - created_at: Timestamp::now_utc(), - parent: None, - last_known_status: AgentStatusRecord::default(), - original_phantom_id: None, - } - } - pub fn owned_agent_id(&self) -> OwnedAgentId { OwnedAgentId::new(self.environment_id, &self.agent_id) } diff --git a/golem-common/src/model/oplog/payload/types.rs b/golem-common/src/model/oplog/payload/types.rs index 4f068526de..41906f42a5 100644 --- a/golem-common/src/model/oplog/payload/types.rs +++ b/golem-common/src/model/oplog/payload/types.rs @@ -22,8 +22,8 @@ use crate::model::oplog::{ }; use crate::model::worker::TypedAgentConfigEntry; use crate::model::{ - AccountId, AgentId, AgentInvocation, AgentMetadata, AgentStatus, IdempotencyKey, OwnedAgentId, - RdbmsPoolKey, ScheduleId, ScheduledAction, + AccountId, AgentFingerprint, AgentId, AgentInvocation, AgentMetadata, AgentStatus, + IdempotencyKey, OwnedAgentId, RdbmsPoolKey, ScheduleId, ScheduledAction, }; use bigdecimal::BigDecimal; use bit_vec::BitVec; @@ -1355,6 +1355,7 @@ pub struct SerializableScheduledInvocation { pub trace_id: TraceId, pub trace_states: Vec, pub spans: Vec>, + pub target_worker_fingerprint: AgentFingerprint, } impl SerializableScheduledInvocation { @@ -1364,6 +1365,7 @@ impl SerializableScheduledInvocation { account_id, owned_agent_id, invocation, + target_worker_fingerprint, } => match *invocation { AgentInvocation::AgentMethod { idempotency_key, @@ -1383,6 +1385,7 @@ impl SerializableScheduledInvocation { spans: encode_span_data(&invocation_context.to_oplog_data()), trace_id: invocation_context.trace_id, trace_states: invocation_context.trace_states, + target_worker_fingerprint, }), other => Err(format!( "ScheduleId contains a non-method invocation: {:?}", @@ -1414,6 +1417,7 @@ impl SerializableScheduledInvocation { invocation_context, principal: self.principal, }), + target_worker_fingerprint: self.target_worker_fingerprint, }, } } diff --git a/golem-common/src/model/oplog/protobuf.rs b/golem-common/src/model/oplog/protobuf.rs index e7f217b885..835ae95ac1 100644 --- a/golem-common/src/model/oplog/protobuf.rs +++ b/golem-common/src/model/oplog/protobuf.rs @@ -315,6 +315,10 @@ impl TryFrom for PublicOplogEn .collect::, _>>()?, ), original_phantom_id: create.original_phantom_id.map(|id| id.into()), + instance_id: create + .instance_id + .map(|id| id.into()) + .ok_or("Missing instance_id in Create entry")?, })), oplog_entry::Entry::HostCall(host_call) => { Ok(PublicOplogEntry::HostCall(HostCallParams { @@ -759,6 +763,7 @@ impl TryFrom for golem_api_grpc::proto::golem::worker::OplogEn .map(Into::into) .collect(), original_phantom_id: create.original_phantom_id.map(|id| id.into()), + instance_id: Some(create.instance_id.into()), }, )), }, @@ -2125,6 +2130,7 @@ impl TryFrom for OplogEntry { .map(|p| p.environment_plugin_grant_id) .collect(), original_phantom_id: create.original_phantom_id, + instance_id: create.instance_id }), PublicOplogEntry::HostCall(host_call) => { let durable_function_type = match host_call.durable_function_type { @@ -2795,6 +2801,7 @@ impl TryFrom for golem_api_grpc::proto::golem::worker::RawOplogEntry initial_active_plugins, local_agent_config, original_phantom_id, + instance_id, .. } => Entry::Create(RawCreateParameters { agent_id: Some(agent_id.into()), @@ -2817,6 +2824,7 @@ impl TryFrom for golem_api_grpc::proto::golem::worker::RawOplogEntry .map(|e| crate::serialization::serialize(&e)) .collect::, _>>()?, original_phantom_id: original_phantom_id.map(Into::into), + instance_id: Some(instance_id.into()), }), OplogEntry::HostCall { function_name, @@ -3147,6 +3155,8 @@ impl TryFrom for OplogEntry let proto_uuid: golem_api_grpc::proto::golem::common::Uuid = u; uuid::Uuid::from(proto_uuid) }); + let instance_id = p.instance_id.ok_or("Missing instance_id")?.into(); + Ok(OplogEntry::Create { timestamp, agent_id, @@ -3160,6 +3170,7 @@ impl TryFrom for OplogEntry initial_active_plugins, local_agent_config, original_phantom_id, + instance_id, }) } Entry::HostCall(p) => { diff --git a/golem-common/src/model/oplog/tests.rs b/golem-common/src/model/oplog/tests.rs index 140c99305d..4b6c7fd125 100644 --- a/golem-common/src/model/oplog/tests.rs +++ b/golem-common/src/model/oplog/tests.rs @@ -93,6 +93,7 @@ fn create_serialization_poem_serde_equivalence() { parameters: BTreeMap::new(), }]), original_phantom_id: None, + instance_id: Uuid::new_v4(), }); let serialized = entry.to_json_string(); let deserialized: PublicOplogEntry = serde_json::from_str(&serialized).unwrap(); diff --git a/golem-common/src/model/tests.rs b/golem-common/src/model/tests.rs index 017e31535e..4ee971d4c8 100644 --- a/golem-common/src/model/tests.rs +++ b/golem-common/src/model/tests.rs @@ -17,7 +17,7 @@ use crate::model::environment::EnvironmentId; use crate::model::oplog::{OplogIndex, TimestampedUpdateDescription, UpdateDescription}; use crate::model::worker::TypedAgentConfigEntry; use crate::model::{ - AccountId, AgentFilter, AgentId, AgentInvocation, AgentMetadata, AgentStatus, + AccountId, AgentFilter, AgentFingerprint, AgentId, AgentInvocation, AgentMetadata, AgentStatus, AgentStatusRecord, ComponentId, FilterComparator, IdempotencyKey, StringFilterComparator, Timestamp, TimestampedAgentInvocation, }; @@ -28,7 +28,7 @@ use serde::{Deserialize, Serialize}; use std::str::FromStr; use std::vec; use test_r::test; -use uuid::uuid; +use uuid::{Uuid, uuid}; #[test] fn timestamp_conversion() { @@ -228,6 +228,7 @@ fn worker_filter_matches() { ..AgentStatusRecord::default() }, original_phantom_id: None, + fingerprint: AgentFingerprint(Uuid::now_v7()), }; assert!( diff --git a/golem-common/wit/deps/golem-1.x/golem-oplog.wit b/golem-common/wit/deps/golem-1.x/golem-oplog.wit index 18e98d5672..4cc7f5a569 100644 --- a/golem-common/wit/deps/golem-1.x/golem-oplog.wit +++ b/golem-common/wit/deps/golem-1.x/golem-oplog.wit @@ -106,7 +106,8 @@ interface oplog { initial-total-linear-memory-size: u64, initial-active-plugins: list, local-agent-config: list, - original-phantom-id: option + original-phantom-id: option, + instance-id: uuid } record host-call-parameters { @@ -453,7 +454,8 @@ interface oplog { initial-total-linear-memory-size: u64, initial-active-plugins: list, local-agent-config: list, - original-phantom-id: option + original-phantom-id: option, + instance-id: uuid } record raw-host-call-parameters { diff --git a/golem-debugging-service/src/debug_session.rs b/golem-debugging-service/src/debug_session.rs index 26f96c4121..ff3bb28f84 100644 --- a/golem-debugging-service/src/debug_session.rs +++ b/golem-debugging-service/src/debug_session.rs @@ -244,6 +244,7 @@ fn get_oplog_entry_from_public_oplog_entry( initial_total_linear_memory_size, initial_active_plugins, original_phantom_id, + instance_id, }) => Ok(OplogEntry::Create { timestamp, agent_id, @@ -260,6 +261,7 @@ fn get_oplog_entry_from_public_oplog_entry( .map(|x| x.environment_plugin_grant_id) .collect(), original_phantom_id, + instance_id, }), PublicOplogEntry::HostCall(HostCallParams { timestamp, diff --git a/golem-debugging-service/tests/services/worker_proxy.rs b/golem-debugging-service/tests/services/worker_proxy.rs index 442fc002fc..80d41517a6 100644 --- a/golem-debugging-service/tests/services/worker_proxy.rs +++ b/golem-debugging-service/tests/services/worker_proxy.rs @@ -13,7 +13,9 @@ use golem_common::model::agent::{AgentInvocationMode, Principal, UntypedDataValu use golem_common::model::component::ComponentRevision; use golem_common::model::invocation_context::InvocationContextStack; use golem_common::model::worker::{AgentConfigEntryDto, RevertWorkerTarget}; -use golem_common::model::{AgentId, IdempotencyKey, InvocationStatus, OwnedAgentId, PromiseId}; +use golem_common::model::{ + AgentFingerprint, AgentId, IdempotencyKey, InvocationStatus, OwnedAgentId, PromiseId, +}; use golem_service_base::error::worker_executor::WorkerExecutorError; use golem_service_base::model::auth::{AuthCtx, UserAuthCtx}; use golem_worker_executor::services::worker_proxy::{WorkerProxy, WorkerProxyError}; @@ -74,7 +76,7 @@ impl WorkerProxy for TestWorkerProxy { _caller_account_id: AccountId, _agent_config: Vec, _principal: Principal, - ) -> Result<(), WorkerProxyError> { + ) -> Result { Err(WorkerProxyError::InternalError( WorkerExecutorError::unknown( "Not implemented in tests as debug service is not expected to call start through proxy", diff --git a/golem-wasm/wit/deps/golem-1.x/golem-oplog.wit b/golem-wasm/wit/deps/golem-1.x/golem-oplog.wit index 18e98d5672..4cc7f5a569 100644 --- a/golem-wasm/wit/deps/golem-1.x/golem-oplog.wit +++ b/golem-wasm/wit/deps/golem-1.x/golem-oplog.wit @@ -106,7 +106,8 @@ interface oplog { initial-total-linear-memory-size: u64, initial-active-plugins: list, local-agent-config: list, - original-phantom-id: option + original-phantom-id: option, + instance-id: uuid } record host-call-parameters { @@ -453,7 +454,8 @@ interface oplog { initial-total-linear-memory-size: u64, initial-active-plugins: list, local-agent-config: list, - original-phantom-id: option + original-phantom-id: option, + instance-id: uuid } record raw-host-call-parameters { diff --git a/golem-worker-executor/src/durable_host/wasm_rpc/mod.rs b/golem-worker-executor/src/durable_host/wasm_rpc/mod.rs index afbc7782d7..7943cb6b4a 100644 --- a/golem-worker-executor/src/durable_host/wasm_rpc/mod.rs +++ b/golem-worker-executor/src/durable_host/wasm_rpc/mod.rs @@ -50,8 +50,8 @@ use golem_common::model::oplog::{ HostResponseGolemRpcUnitOrFailure, OplogEntry, PersistenceLevel, }; use golem_common::model::{ - AgentId, AgentInvocation, IdempotencyKey, NamedRetryPolicy, OplogIndex, OwnedAgentId, - PredicateValue, RetryContext, RetryProperties, ScheduledAction, + AgentFingerprint, AgentId, AgentInvocation, IdempotencyKey, NamedRetryPolicy, OplogIndex, + OwnedAgentId, PredicateValue, RetryContext, RetryProperties, ScheduledAction, }; use golem_common::serialization::{deserialize, serialize}; @@ -573,6 +573,7 @@ impl HostWasmRpc for DurableWorkerCtx { let entry = self.table().get(&this)?; let payload = entry.payload.downcast_ref::().unwrap(); let remote_agent_id = payload.remote_agent_id.clone(); + let target_worker_fingerprint = payload.target_fingerprint; let input_untyped: UntypedDataValue = input.into(); @@ -611,6 +612,7 @@ impl HostWasmRpc for DurableWorkerCtx { invocation_context: stack, principal: Principal::anonymous(), }), + target_worker_fingerprint, }; let result = self @@ -1177,11 +1179,13 @@ pub async fn construct_wasm_rpc_resource( config, ) .await?; + let target_fingerprint = demand.fingerprint(); let entry = ctx.table().push(WasmRpcEntry { payload: Box::new(WasmRpcEntryPayload { demand, remote_agent_id, span_id: span.span_id().clone(), + target_fingerprint, }), })?; Ok(entry) @@ -1443,6 +1447,7 @@ pub struct WasmRpcEntryPayload { pub demand: Box, pub remote_agent_id: OwnedAgentId, pub span_id: SpanId, + pub target_fingerprint: AgentFingerprint, } impl Debug for WasmRpcEntryPayload { diff --git a/golem-worker-executor/src/grpc/mod.rs b/golem-worker-executor/src/grpc/mod.rs index 388591da17..17f0355810 100644 --- a/golem-worker-executor/src/grpc/mod.rs +++ b/golem-worker-executor/src/grpc/mod.rs @@ -61,7 +61,7 @@ use golem_common::model::oplog::{OplogIndex, UpdateDescription}; use golem_common::model::protobuf::to_protobuf_resource_description; use golem_common::model::worker::{AgentConfigEntryDto, AgentMetadataDto, TypedAgentConfigEntry}; use golem_common::model::{ - AgentEvent, AgentFilter, AgentId, AgentInvocation, AgentInvocationOutput, + AgentEvent, AgentFilter, AgentFingerprint, AgentId, AgentInvocation, AgentInvocationOutput, AgentInvocationResult, AgentMetadata, AgentStatus, IdempotencyKey, InvocationStatus, OwnedAgentId, ScanCursor, ScheduledAction, ShardId, Timestamp, TimestampedAgentInvocation, }; @@ -295,7 +295,7 @@ impl + UsesAllDeps + Send + Sync + async fn create_worker_internal( &self, request: golem::workerexecutor::v1::CreateWorkerRequest, - ) -> Result<(), WorkerExecutorError> { + ) -> Result { let owned_agent_id = extract_owned_agent_id(&request, |r| &r.agent_id, |r| &r.environment_id)?; let owned_agent_id = self.canonicalize_owned_agent_id(&owned_agent_id).await?; @@ -352,6 +352,8 @@ impl + UsesAllDeps + Send + Sync + ) .await?; + let fingerprint = worker.get_initial_worker_metadata().fingerprint; + let mut subscription = self.events().subscribe(); Worker::start_if_needed(worker.clone()).await?; if worker.is_loading() { @@ -366,7 +368,7 @@ impl + UsesAllDeps + Send + Sync + }) .await { - Ok(Ok(())) => Ok(()), + Ok(Ok(())) => Ok(fingerprint), Ok(Err(e)) => Err(e), Err(RecvError::Closed) => { Err(WorkerExecutorError::unknown("Events subscription closed")) @@ -376,7 +378,7 @@ impl + UsesAllDeps + Send + Sync + )), } } else { - Ok(()) + Ok(fingerprint) } } @@ -1772,6 +1774,9 @@ impl + UsesAllDeps + Send + Sync + golem_api_grpc::proto::golem::worker::AgentInvocationMode::Schedule => { match schedule_at { Some(scheduled_time) => { + let worker = self.get_or_create_pending(&request).await?; + let target_worker_fingerprint = + worker.get_initial_worker_metadata().fingerprint; self.scheduler_service() .schedule( scheduled_time, @@ -1779,6 +1784,7 @@ impl + UsesAllDeps + Send + Sync + account_id, owned_agent_id, invocation: Box::new(invocation), + target_worker_fingerprint, }, ) .await; @@ -2015,11 +2021,13 @@ impl + UsesAllDeps + Send + Sync + .instrument(record.span.clone()) .await { - Ok(_) => record.succeed(Ok(Response::new( + Ok(fingerprint) => record.succeed(Ok(Response::new( golem::workerexecutor::v1::CreateWorkerResponse { result: Some( golem::workerexecutor::v1::create_worker_response::Result::Success( - golem::common::Empty {}, + golem::workerexecutor::v1::CreateWorkerSuccessResponse { + instance_id: Some(fingerprint.0.into()), + }, ), ), }, diff --git a/golem-worker-executor/src/model/public_oplog/mod.rs b/golem-worker-executor/src/model/public_oplog/mod.rs index 56018a3bd0..747d115b6d 100644 --- a/golem-worker-executor/src/model/public_oplog/mod.rs +++ b/golem-worker-executor/src/model/public_oplog/mod.rs @@ -242,6 +242,7 @@ impl PublicOplogEntryOps for PublicOplogEntry { initial_active_plugins, local_agent_config, original_phantom_id, + instance_id, } => { let metadata = components .get_metadata( @@ -278,6 +279,7 @@ impl PublicOplogEntryOps for PublicOplogEntry { initial_active_plugins: initial_plugins, local_agent_config, original_phantom_id, + instance_id, })) } OplogEntry::HostCall { diff --git a/golem-worker-executor/src/model/public_oplog/wit.rs b/golem-worker-executor/src/model/public_oplog/wit.rs index 0060bfde4f..5d6bf65783 100644 --- a/golem-worker-executor/src/model/public_oplog/wit.rs +++ b/golem-worker-executor/src/model/public_oplog/wit.rs @@ -56,6 +56,7 @@ impl From for oplog::PublicOplogEntry { initial_active_plugins, local_agent_config, original_phantom_id, + instance_id, }) => Self::Create(oplog::CreateParameters { timestamp: timestamp.into(), agent_id: agent_id.into(), @@ -78,6 +79,7 @@ impl From for oplog::PublicOplogEntry { }) .collect(), original_phantom_id: original_phantom_id.map(|id| id.into()), + instance_id: instance_id.into(), }), PublicOplogEntry::HostCall(HostCallParams { timestamp, @@ -825,6 +827,10 @@ impl TryFrom for golem_common::model::oplog::OplogEntry { original_phantom_id: params .original_phantom_id .map(|uuid| uuid::Uuid::from_u64_pair(uuid.high_bits, uuid.low_bits)), + instance_id: uuid::Uuid::from_u64_pair( + params.instance_id.high_bits, + params.instance_id.low_bits, + ), }), oplog::OplogEntry::HostCall(params) => Ok(Self::HostCall { timestamp: timestamp_from_datetime(params.timestamp), diff --git a/golem-worker-executor/src/services/oplog/plugin.rs b/golem-worker-executor/src/services/oplog/plugin.rs index 228304ae45..521edd0dad 100644 --- a/golem-worker-executor/src/services/oplog/plugin.rs +++ b/golem-worker-executor/src/services/oplog/plugin.rs @@ -1652,7 +1652,6 @@ fn oplog_processor_idempotency_key( mod tests { use super::*; use golem_common::base_model::component_metadata::KnownExports; - use golem_common::model::Timestamp; use golem_common::model::account::AccountId; use golem_common::model::application::ApplicationId; use golem_common::model::component::{ @@ -1664,6 +1663,7 @@ mod tests { use golem_common::model::environment_plugin_grant::EnvironmentPluginGrantId; use golem_common::model::oplog::PersistenceLevel; use golem_common::model::plugin_registration::PluginRegistrationId; + use golem_common::model::{AgentFingerprint, Timestamp}; use golem_common::read_only_lock; use golem_service_base::model::component::Component; use test_r::test; @@ -2078,6 +2078,7 @@ mod tests { parent: None, last_known_status: status.clone(), original_phantom_id: None, + fingerprint: AgentFingerprint::new(), }; let status_lock = diff --git a/golem-worker-executor/src/services/oplog/rate_limited.rs b/golem-worker-executor/src/services/oplog/rate_limited.rs index 0dd07bbd02..ddfc34f52c 100644 --- a/golem-worker-executor/src/services/oplog/rate_limited.rs +++ b/golem-worker-executor/src/services/oplog/rate_limited.rs @@ -385,7 +385,9 @@ mod tests { use golem_common::model::component::ComponentId; use golem_common::model::environment::EnvironmentId; use golem_common::model::regions::OplogRegion; - use golem_common::model::{AgentId, AgentMetadata, AgentStatusRecord, OwnedAgentId, Timestamp}; + use golem_common::model::{ + AgentFingerprint, AgentId, AgentMetadata, AgentStatusRecord, OwnedAgentId, Timestamp, + }; use golem_common::read_only_lock; use golem_service_base::error::worker_executor::WorkerExecutorError; use golem_service_base::storage::blob::memory::InMemoryBlobStorage; @@ -396,6 +398,25 @@ mod tests { test_r::enable!(); + fn make_agent_metadata( + agent_id: AgentId, + created_by: AccountId, + environment_id: EnvironmentId, + ) -> AgentMetadata { + AgentMetadata { + agent_id, + env: vec![], + environment_id, + created_by, + config: Vec::new(), + created_at: Timestamp::now_utc(), + parent: None, + last_known_status: AgentStatusRecord::default(), + original_phantom_id: None, + fingerprint: AgentFingerprint::new(), + } + } + /// [`ResourceLimits`] stub that always returns the same pre-seeded entry /// regardless of account. Allows tests to inject a specific rate directly. struct FixedResourceLimits { @@ -474,7 +495,7 @@ mod tests { .open( &owned, None, - AgentMetadata::default(agent_id, account_id, env_id), + make_agent_metadata(agent_id, account_id, env_id), last_known_status, execution_status, ) diff --git a/golem-worker-executor/src/services/oplog/tests.rs b/golem-worker-executor/src/services/oplog/tests.rs index e50abb7b73..05318678c2 100644 --- a/golem-worker-executor/src/services/oplog/tests.rs +++ b/golem-worker-executor/src/services/oplog/tests.rs @@ -27,8 +27,10 @@ use golem_common::model::component::ComponentId; use golem_common::model::invocation_context::InvocationContextStack; use golem_common::model::oplog::{AgentError, LogLevel}; use golem_common::model::regions::OplogRegion; +use golem_common::model::{ + AgentFingerprint, AgentMetadata, AgentStatusRecord, IdempotencyKey, OwnedAgentId, +}; use golem_common::model::{AgentInvocationPayload, RetryConfig}; -use golem_common::model::{AgentMetadata, AgentStatusRecord, IdempotencyKey, OwnedAgentId}; use golem_common::redis::RedisPool; use golem_common::tracing::{TracingConfig, init_tracing}; use golem_service_base::db::sqlite::SqlitePool; @@ -59,6 +61,25 @@ fn tracing() -> Tracing { Tracing::init() } +fn make_agent_metadata( + agent_id: AgentId, + created_by: AccountId, + environment_id: EnvironmentId, +) -> AgentMetadata { + AgentMetadata { + agent_id, + env: vec![], + environment_id, + created_by, + config: Vec::new(), + created_at: Timestamp::now_utc(), + parent: None, + last_known_status: AgentStatusRecord::default(), + original_phantom_id: None, + fingerprint: AgentFingerprint::new(), + } +} + fn default_last_known_status() -> read_only_lock::tokio::ReadOnlyLock { read_only_lock::tokio::ReadOnlyLock::new(Arc::new(tokio::sync::RwLock::new( AgentStatusRecord::default(), @@ -98,7 +119,7 @@ async fn open_add_and_read_back(_tracing: &Tracing) { .open( &owned_agent_id, None, - AgentMetadata::default(agent_id.clone(), account_id, environment_id), + make_agent_metadata(agent_id.clone(), account_id, environment_id), default_last_known_status(), default_execution_status(AgentMode::Durable), ) @@ -159,7 +180,7 @@ async fn open_add_and_read_back_many(_tracing: &Tracing) { .open( &owned_agent_id, None, - AgentMetadata::default(agent_id.clone(), account_id, environment_id), + make_agent_metadata(agent_id.clone(), account_id, environment_id), default_last_known_status(), default_execution_status(AgentMode::Durable), ) @@ -227,7 +248,7 @@ async fn open_add_and_read_back_ephemeral(_tracing: &Tracing) { .open( &owned_agent_id, None, - AgentMetadata::default(agent_id.clone(), account_id, environment_id), + make_agent_metadata(agent_id.clone(), account_id, environment_id), default_last_known_status(), default_execution_status(AgentMode::Ephemeral), ) @@ -302,7 +323,7 @@ async fn open_add_and_read_back_many_ephemeral(_tracing: &Tracing) { .open( &owned_agent_id, None, - AgentMetadata::default(agent_id.clone(), account_id, environment_id), + make_agent_metadata(agent_id.clone(), account_id, environment_id), default_last_known_status(), default_execution_status(AgentMode::Ephemeral), ) @@ -370,7 +391,7 @@ async fn ephemeral_read_many_committed_only(_tracing: &Tracing) { .open( &owned_agent_id, None, - AgentMetadata::default(agent_id.clone(), account_id, environment_id), + make_agent_metadata(agent_id.clone(), account_id, environment_id), default_last_known_status(), default_execution_status(AgentMode::Ephemeral), ) @@ -433,7 +454,7 @@ async fn ephemeral_read_many_uncommitted_only(_tracing: &Tracing) { .open( &owned_agent_id, None, - AgentMetadata::default(agent_id.clone(), account_id, environment_id), + make_agent_metadata(agent_id.clone(), account_id, environment_id), default_last_known_status(), default_execution_status(AgentMode::Ephemeral), ) @@ -493,7 +514,7 @@ async fn ephemeral_read_many_partial_range(_tracing: &Tracing) { .open( &owned_agent_id, None, - AgentMetadata::default(agent_id.clone(), account_id, environment_id), + make_agent_metadata(agent_id.clone(), account_id, environment_id), default_last_known_status(), default_execution_status(AgentMode::Ephemeral), ) @@ -594,7 +615,7 @@ async fn ephemeral_read_many_across_archive_layers(_tracing: &Tracing) { .open( &owned_agent_id, None, - AgentMetadata::default(agent_id.clone(), account_id, environment_id), + make_agent_metadata(agent_id.clone(), account_id, environment_id), default_last_known_status(), default_execution_status(AgentMode::Ephemeral), ) @@ -708,7 +729,7 @@ async fn ephemeral_read_many_zero_returns_empty(_tracing: &Tracing) { .open( &owned_agent_id, None, - AgentMetadata::default(agent_id.clone(), account_id, environment_id), + make_agent_metadata(agent_id.clone(), account_id, environment_id), default_last_known_status(), default_execution_status(AgentMode::Ephemeral), ) @@ -745,7 +766,7 @@ async fn entries_with_small_payload(_tracing: &Tracing) { .open( &owned_agent_id, None, - AgentMetadata::default(agent_id.clone(), account_id, environment_id), + make_agent_metadata(agent_id.clone(), account_id, environment_id), default_last_known_status(), default_execution_status(AgentMode::Durable), ) @@ -930,7 +951,7 @@ async fn entries_with_large_payload(_tracing: &Tracing) { .open( &owned_agent_id, None, - AgentMetadata::default(agent_id.clone(), account_id, environment_id), + make_agent_metadata(agent_id.clone(), account_id, environment_id), default_last_known_status(), default_execution_status(AgentMode::Durable), ) @@ -1192,7 +1213,7 @@ async fn multilayer_transfers_entries_after_limit_reached( .open( &owned_agent_id, None, - AgentMetadata::default(agent_id.clone(), account_id, environment_id), + make_agent_metadata(agent_id.clone(), account_id, environment_id), default_last_known_status(), default_execution_status(AgentMode::Durable), ) @@ -1220,7 +1241,7 @@ async fn multilayer_transfers_entries_after_limit_reached( .open( &owned_agent_id, None, - AgentMetadata::default(agent_id.clone(), account_id, environment_id), + make_agent_metadata(agent_id.clone(), account_id, environment_id), default_last_known_status(), default_execution_status(AgentMode::Durable), ) @@ -1246,7 +1267,7 @@ async fn multilayer_transfers_entries_after_limit_reached( .open( &owned_agent_id, None, - AgentMetadata::default(agent_id.clone(), account_id, environment_id), + make_agent_metadata(agent_id.clone(), account_id, environment_id), default_last_known_status(), default_execution_status(AgentMode::Durable), ) @@ -1331,7 +1352,7 @@ async fn read_from_archive_impl(use_blob: bool) { .open( &owned_agent_id, None, - AgentMetadata::default(agent_id.clone(), account_id, environment_id), + make_agent_metadata(agent_id.clone(), account_id, environment_id), default_last_known_status(), default_execution_status(AgentMode::Durable), ) @@ -1371,7 +1392,7 @@ async fn read_from_archive_impl(use_blob: bool) { .open( &owned_agent_id, None, - AgentMetadata::default(agent_id.clone(), account_id, environment_id), + make_agent_metadata(agent_id.clone(), account_id, environment_id), default_last_known_status(), default_execution_status(AgentMode::Durable), ) @@ -1475,6 +1496,7 @@ async fn read_initial_from_archive_impl(use_blob: bool) { initial_total_linear_memory_size: 0, initial_active_plugins: HashSet::new(), original_phantom_id: None, + instance_id: Uuid::new_v4(), } .rounded(); @@ -1482,7 +1504,7 @@ async fn read_initial_from_archive_impl(use_blob: bool) { .create( &owned_agent_id, create_entry.clone(), - AgentMetadata::default(agent_id.clone(), account_id, environment_id), + make_agent_metadata(agent_id.clone(), account_id, environment_id), default_last_known_status(), default_execution_status(AgentMode::Durable), ) @@ -1616,7 +1638,7 @@ async fn write_after_archive_impl(use_blob: bool, reopen: Reopen) { .open( &owned_agent_id, None, - AgentMetadata::default(agent_id.clone(), account_id, environment_id), + make_agent_metadata(agent_id.clone(), account_id, environment_id), default_last_known_status(), default_execution_status(AgentMode::Durable), ) @@ -1649,7 +1671,7 @@ async fn write_after_archive_impl(use_blob: bool, reopen: Reopen) { .open( &owned_agent_id, None, - AgentMetadata::default(agent_id.clone(), account_id, environment_id), + make_agent_metadata(agent_id.clone(), account_id, environment_id), default_last_known_status(), default_execution_status(AgentMode::Durable), ) @@ -1670,7 +1692,7 @@ async fn write_after_archive_impl(use_blob: bool, reopen: Reopen) { .open( &owned_agent_id, None, - AgentMetadata::default(agent_id.clone(), account_id, environment_id), + make_agent_metadata(agent_id.clone(), account_id, environment_id), default_last_known_status(), default_execution_status(AgentMode::Durable), ) @@ -1698,7 +1720,7 @@ async fn write_after_archive_impl(use_blob: bool, reopen: Reopen) { .open( &owned_agent_id, None, - AgentMetadata::default(agent_id.clone(), account_id, environment_id), + make_agent_metadata(agent_id.clone(), account_id, environment_id), default_last_known_status(), default_execution_status(AgentMode::Durable), ) @@ -1733,7 +1755,7 @@ async fn write_after_archive_impl(use_blob: bool, reopen: Reopen) { .open( &owned_agent_id, None, - AgentMetadata::default(agent_id.clone(), account_id, environment_id), + make_agent_metadata(agent_id.clone(), account_id, environment_id), default_last_known_status(), default_execution_status(AgentMode::Durable), ) @@ -1754,7 +1776,7 @@ async fn write_after_archive_impl(use_blob: bool, reopen: Reopen) { .open( &owned_agent_id, None, - AgentMetadata::default(agent_id.clone(), account_id, environment_id), + make_agent_metadata(agent_id.clone(), account_id, environment_id), default_last_known_status(), default_execution_status(AgentMode::Durable), ) @@ -1782,7 +1804,7 @@ async fn write_after_archive_impl(use_blob: bool, reopen: Reopen) { .open( &owned_agent_id, None, - AgentMetadata::default(agent_id.clone(), account_id, environment_id), + make_agent_metadata(agent_id.clone(), account_id, environment_id), default_last_known_status(), default_execution_status(AgentMode::Durable), ) @@ -1930,7 +1952,7 @@ async fn empty_layer_gets_deleted_impl(use_blob: bool) { .open( &owned_agent_id, None, - AgentMetadata::default(agent_id.clone(), account_id, environment_id), + make_agent_metadata(agent_id.clone(), account_id, environment_id), default_last_known_status(), default_execution_status(AgentMode::Durable), ) @@ -1972,7 +1994,7 @@ async fn empty_layer_gets_deleted_impl(use_blob: bool) { .open( &owned_agent_id, None, - AgentMetadata::default(agent_id.clone(), account_id, environment_id), + make_agent_metadata(agent_id.clone(), account_id, environment_id), default_last_known_status(), default_execution_status(AgentMode::Durable), ) @@ -2071,7 +2093,7 @@ async fn scheduled_archive_impl(use_blob: bool) { .open( &owned_agent_id, None, - AgentMetadata::default(agent_id.clone(), account_id, environment_id), + make_agent_metadata(agent_id.clone(), account_id, environment_id), default_last_known_status(), default_execution_status(AgentMode::Durable), ) @@ -2094,7 +2116,7 @@ async fn scheduled_archive_impl(use_blob: bool) { .open( &owned_agent_id, None, - AgentMetadata::default(agent_id.clone(), account_id, environment_id), + make_agent_metadata(agent_id.clone(), account_id, environment_id), default_last_known_status(), default_execution_status(AgentMode::Durable), ) @@ -2123,7 +2145,7 @@ async fn scheduled_archive_impl(use_blob: bool) { .open( &owned_agent_id, None, - AgentMetadata::default(agent_id.clone(), account_id, environment_id), + make_agent_metadata(agent_id.clone(), account_id, environment_id), default_last_known_status(), default_execution_status(AgentMode::Durable), ) @@ -2139,7 +2161,7 @@ async fn scheduled_archive_impl(use_blob: bool) { .open( &owned_agent_id, None, - AgentMetadata::default(agent_id.clone(), account_id, environment_id), + make_agent_metadata(agent_id.clone(), account_id, environment_id), default_last_known_status(), default_execution_status(AgentMode::Durable), ) @@ -2215,6 +2237,7 @@ async fn multilayer_scan_for_component(_tracing: &Tracing) { HashSet::new(), Vec::new(), None, + Uuid::new_v4(), ); let owned_agent_id = OwnedAgentId::new(environment_id, &agent_id); @@ -2222,7 +2245,7 @@ async fn multilayer_scan_for_component(_tracing: &Tracing) { .create( &owned_agent_id, create_entry, - AgentMetadata::default(agent_id.clone(), account_id, environment_id), + make_agent_metadata(agent_id.clone(), account_id, environment_id), default_last_known_status(), default_execution_status(AgentMode::Durable), ) @@ -2359,7 +2382,7 @@ async fn concurrent_get_or_open_does_not_cause_unique_key_violation(_tracing: &T start: OplogIndex::from_u64(0), end: OplogIndex::from_u64(0), }), - AgentMetadata::default(worker_id.clone(), account_id, environment_id), + make_agent_metadata(worker_id.clone(), account_id, environment_id), default_last_known_status(), default_execution_status(AgentMode::Durable), ) @@ -2397,7 +2420,7 @@ async fn concurrent_get_or_open_does_not_cause_unique_key_violation(_tracing: &T .open( &owned_agent_id, None, - AgentMetadata::default(worker_id.clone(), account_id, environment_id), + make_agent_metadata(worker_id.clone(), account_id, environment_id), default_last_known_status(), default_execution_status(AgentMode::Durable), ) diff --git a/golem-worker-executor/src/services/rpc.rs b/golem-worker-executor/src/services/rpc.rs index 85e6c863dd..40e5f3c028 100644 --- a/golem-worker-executor/src/services/rpc.rs +++ b/golem-worker-executor/src/services/rpc.rs @@ -45,7 +45,7 @@ use golem_common::model::invocation_context::InvocationContextStack; use golem_common::model::oplog::types::SerializableRpcError; use golem_common::model::worker::AgentConfigEntryDto; use golem_common::model::{ - AgentId, AgentInvocation, AgentInvocationResult, IdempotencyKey, OwnedAgentId, + AgentFingerprint, AgentId, AgentInvocation, AgentInvocationResult, IdempotencyKey, OwnedAgentId, }; use golem_service_base::error::worker_executor::WorkerExecutorError; @@ -219,7 +219,10 @@ impl From for crate::preview2::golem::agent::host::RpcError { } } -pub trait RpcDemand: Send + Sync {} +pub trait RpcDemand: Send + Sync { + /// The fingerprint of the target worker this demand was established for. + fn fingerprint(&self) -> AgentFingerprint; +} pub struct RemoteInvocationRpc { worker_proxy: Arc, @@ -237,16 +240,24 @@ impl RemoteInvocationRpc { struct LoggingDemand { agent_id: AgentId, + fingerprint: AgentFingerprint, } impl LoggingDemand { - pub fn new(agent_id: AgentId) -> Self { + pub fn new(agent_id: AgentId, fingerprint: AgentFingerprint) -> Self { log::debug!("Initializing RPC connection for worker {agent_id}"); - Self { agent_id } + Self { + agent_id, + fingerprint, + } } } -impl RpcDemand for LoggingDemand {} +impl RpcDemand for LoggingDemand { + fn fingerprint(&self) -> AgentFingerprint { + self.fingerprint + } +} impl Drop for LoggingDemand { fn drop(&mut self) { @@ -269,9 +280,9 @@ impl Rpc for RemoteInvocationRpc { debug!("Ensuring remote target worker exists"); let principal = caller_agent_principal(self_agent_id); - let demand = LoggingDemand::new(owned_agent_id.agent_id()); - self.worker_proxy + let fingerprint = self + .worker_proxy .start( owned_agent_id, self_agent_id, @@ -283,7 +294,10 @@ impl Rpc for RemoteInvocationRpc { ) .await?; - Ok(Box::new(demand)) + Ok(Box::new(LoggingDemand::new( + owned_agent_id.agent_id(), + fingerprint, + ))) } async fn invoke_and_await( @@ -776,7 +790,7 @@ impl Rpc for DirectWorkerInvocationRpc { ) .await?; - let _worker = Worker::get_or_create_running( + let worker = Worker::get_or_create_running( self, owned_agent_id, Some(self_env.to_vec()), @@ -790,8 +804,11 @@ impl Rpc for DirectWorkerInvocationRpc { ) .await?; - let demand = LoggingDemand::new(owned_agent_id.agent_id()); - Ok(Box::new(demand)) + let fingerprint = worker.get_initial_worker_metadata().fingerprint; + Ok(Box::new(LoggingDemand::new( + owned_agent_id.agent_id(), + fingerprint, + ))) } else { self.remote_rpc .create_demand( @@ -950,5 +967,3 @@ impl Rpc for DirectWorkerInvocationRpc { } } } - -impl RpcDemand for () {} diff --git a/golem-worker-executor/src/services/scheduler.rs b/golem-worker-executor/src/services/scheduler.rs index 42f9b5023e..1250e277e5 100644 --- a/golem-worker-executor/src/services/scheduler.rs +++ b/golem-worker-executor/src/services/scheduler.rs @@ -335,20 +335,37 @@ impl SchedulerServiceDefault { account_id: _, owned_agent_id, invocation, + target_worker_fingerprint, } => { - // TODO: We probably need more error handling here and retry the action when we fail to enqueue the invocation. - // We don't really care that it completes here, but it needs to be persisted in the invocation queue. - let result = self - .worker_access - .enqueue_invocation(&owned_agent_id, *invocation) - .await; + // A mismatch means the original worker was deleted and recreated — drop the stale + // invocation silently. + let stale = match self.worker_service.get(&owned_agent_id).await { + Some(meta) => { + meta.initial_worker_metadata.fingerprint != target_worker_fingerprint + } + None => true, + }; - if let Err(e) = result { - error!( + if stale { + info!( agent_id = owned_agent_id.to_string(), - "Failed to invoke worker with scheduled invocation: {e}" + "Dropping stale scheduled invocation: target worker was deleted and recreated" ); - }; + } else { + // TODO: We probably need more error handling here and retry the action when we fail to enqueue the invocation. + // We don't really care that it completes here, but it needs to be persisted in the invocation queue. + let result = self + .worker_access + .enqueue_invocation(&owned_agent_id, *invocation) + .await; + + if let Err(e) = result { + error!( + agent_id = owned_agent_id.to_string(), + "Failed to invoke worker with scheduled invocation: {e}" + ); + } + } } ScheduledAction::Resume { agent_created_by: _, diff --git a/golem-worker-executor/src/services/worker.rs b/golem-worker-executor/src/services/worker.rs index 333f27c467..b4725f61cf 100644 --- a/golem-worker-executor/src/services/worker.rs +++ b/golem-worker-executor/src/services/worker.rs @@ -26,7 +26,7 @@ use async_trait::async_trait; use golem_common::model::agent::{AgentMode, ParsedAgentId}; use golem_common::model::oplog::{OplogEntry, OplogIndex}; use golem_common::model::{ - AgentId, AgentMetadata, AgentStatus, AgentStatusRecord, OwnedAgentId, ShardId, + AgentFingerprint, AgentId, AgentMetadata, AgentStatus, AgentStatusRecord, OwnedAgentId, ShardId, }; use std::sync::Arc; use tracing::debug; @@ -158,6 +158,7 @@ impl WorkerService for DefaultWorkerService { initial_active_plugins, local_agent_config, original_phantom_id, + instance_id, }, )) => { let agent_type_name = ParsedAgentId::parse_agent_type_name(&agent_id.agent_id).ok(); @@ -196,6 +197,7 @@ impl WorkerService for DefaultWorkerService { ..AgentStatusRecord::default() }, original_phantom_id, + fingerprint: AgentFingerprint(instance_id), }; let status_value: Option> = self diff --git a/golem-worker-executor/src/services/worker_fork.rs b/golem-worker-executor/src/services/worker_fork.rs index 11a65137da..16e14db090 100644 --- a/golem-worker-executor/src/services/worker_fork.rs +++ b/golem-worker-executor/src/services/worker_fork.rs @@ -51,8 +51,8 @@ use golem_common::model::oplog::{ DurableFunctionType, HostPayloadPair, HostRequest, HostRequestNoInput, HostResponse, HostResponseGolemApiFork, OplogEntry, OplogIndex, OplogIndexRange, }; +use golem_common::model::{AgentFingerprint, AgentMetadata, Timestamp}; use golem_common::model::{AgentId, IdempotencyKey, OwnedAgentId}; -use golem_common::model::{AgentMetadata, Timestamp}; use golem_common::read_only_lock; use golem_service_base::error::worker_executor::WorkerExecutorError; use std::sync::Arc; @@ -517,6 +517,8 @@ impl DefaultWorkerFork { let initial_source_worker_metadata = source_worker_instance.get_initial_worker_metadata(); + let instance_id = Uuid::new_v4(); + // Use the source worker's `created_by` (the component owner) rather // than `fork_account_id` (the fork caller). This ensures the forked // worker's metadata is consistent with its initial oplog entry (which @@ -533,6 +535,7 @@ impl DefaultWorkerFork { parent: None, last_known_status: initial_source_worker_metadata.last_known_status.clone(), original_phantom_id: initial_source_worker_metadata.original_phantom_id, + fingerprint: AgentFingerprint(instance_id), }; let source_oplog = source_worker_instance.oplog(); @@ -541,7 +544,7 @@ impl DefaultWorkerFork { // Update the oplog initial entry with the new worker let target_initial_oplog_entry = - Self::update_agent_id(initial_oplog_entry, &target_agent_id).ok_or( + Self::update_agent_id(initial_oplog_entry, &target_agent_id, instance_id).ok_or( WorkerExecutorError::unknown("Failed to update worker id in oplog entry"), )?; @@ -653,7 +656,11 @@ impl DefaultWorkerFork { Ok(new_oplog) } - pub fn update_agent_id(entry: OplogEntry, agent_id: &AgentId) -> Option { + pub fn update_agent_id( + entry: OplogEntry, + agent_id: &AgentId, + instance_id: Uuid, + ) -> Option { match entry { OplogEntry::Create { timestamp, @@ -668,6 +675,7 @@ impl DefaultWorkerFork { local_agent_config, agent_id: _, original_phantom_id, + .. } => Some(OplogEntry::Create { timestamp, agent_id: agent_id.clone(), @@ -681,6 +689,7 @@ impl DefaultWorkerFork { initial_active_plugins, local_agent_config, original_phantom_id, + instance_id, }), _ => None, } diff --git a/golem-worker-executor/src/services/worker_proxy.rs b/golem-worker-executor/src/services/worker_proxy.rs index 0ee12fd38f..4822d138de 100644 --- a/golem-worker-executor/src/services/worker_proxy.rs +++ b/golem-worker-executor/src/services/worker_proxy.rs @@ -36,8 +36,8 @@ use golem_common::model::invocation_context::InvocationContextStack; use golem_common::model::oplog::OplogIndex; use golem_common::model::worker::{AgentConfigEntryDto, RevertWorkerTarget}; use golem_common::model::{ - AgentId, AgentInvocationOutput, AgentInvocationResult, IdempotencyKey, InvocationStatus, - OwnedAgentId, PromiseId, + AgentFingerprint, AgentId, AgentInvocationOutput, AgentInvocationResult, IdempotencyKey, + InvocationStatus, OwnedAgentId, PromiseId, }; use golem_service_base::error::worker_executor::WorkerExecutorError; use golem_service_base::grpc::client::GrpcClient; @@ -61,7 +61,7 @@ pub trait WorkerProxy: Send + Sync { caller_account_id: AccountId, config: Vec, principal: Principal, - ) -> Result<(), WorkerProxyError>; + ) -> Result; async fn invoke_agent( &self, @@ -273,7 +273,7 @@ impl WorkerProxy for RemoteWorkerProxy { caller_account_id: AccountId, config: Vec, principal: Principal, - ) -> Result<(), WorkerProxyError> { + ) -> Result { debug!(owned_agent_id=%owned_agent_id, "Starting remote worker"); let auth_ctx = self.get_auth_ctx(caller_account_id); @@ -301,9 +301,16 @@ impl WorkerProxy for RemoteWorkerProxy { .into_inner(); match response.result { - Some(launch_new_worker_response::Result::Success(_)) => Ok(()), + Some(launch_new_worker_response::Result::Success(success)) => { + let instance_id = success.instance_id.ok_or_else(|| { + WorkerProxyError::InternalError(WorkerExecutorError::unknown( + "Missing instance_id in LaunchNewWorker response", + )) + })?; + Ok(AgentFingerprint(instance_id.into())) + } Some(launch_new_worker_response::Result::Error(error)) => match error.error { - Some(agent_error::Error::AlreadyExists(_)) => Ok(()), + Some(agent_error::Error::AlreadyExists(_)) => Ok(AgentFingerprint::new()), _ => Err(error.into()), }, None => Err(WorkerProxyError::InternalError( diff --git a/golem-worker-executor/src/worker/mod.rs b/golem-worker-executor/src/worker/mod.rs index 0f1c9c0ff8..5a4e60e774 100644 --- a/golem-worker-executor/src/worker/mod.rs +++ b/golem-worker-executor/src/worker/mod.rs @@ -62,8 +62,8 @@ use golem_common::model::oplog::{OplogEntry, OplogIndex, UpdateDescription}; use golem_common::model::regions::{DeletedRegionsBuilder, OplogRegion}; use golem_common::model::worker::{AgentConfigEntryDto, RevertWorkerTarget}; use golem_common::model::{ - AgentId, AgentInvocation, AgentInvocationOutput, AgentInvocationResult, AgentMetadata, - AgentStatusRecord, IdempotencyKey, OwnedAgentId, ScheduledAction, Timestamp, + AgentFingerprint, AgentId, AgentInvocation, AgentInvocationOutput, AgentInvocationResult, + AgentMetadata, AgentStatusRecord, IdempotencyKey, OwnedAgentId, ScheduledAction, Timestamp, TimestampedAgentInvocation, }; use golem_common::one_shot::OneShotEvent; @@ -1944,6 +1944,9 @@ impl Worker { // cross-environment RPC the caller may pass its own account/environment, // but the worker must belong to the component's owning account and // environment for correct metric attribution and quota enforcement. + + let instance_id = Uuid::now_v7(); + let initial_worker_metadata = AgentMetadata { agent_id: owned_agent_id.agent_id(), env: worker_env, @@ -1954,6 +1957,7 @@ impl Worker { parent, last_known_status: initial_status.clone(), original_phantom_id: agent_id.as_ref().and_then(|id| id.phantom_id), + fingerprint: AgentFingerprint(instance_id), }; // Alternatively, we could just write the oplog entry and recompute the initial_worker_metadata from it. @@ -1981,6 +1985,7 @@ impl Worker { .map(Into::into) .collect(), initial_worker_metadata.original_phantom_id, + instance_id, ); let initial_status = Arc::new(tokio::sync::RwLock::new(initial_status)); diff --git a/golem-worker-executor/src/worker/status.rs b/golem-worker-executor/src/worker/status.rs index 57cab457c7..fd5895ae89 100644 --- a/golem-worker-executor/src/worker/status.rs +++ b/golem-worker-executor/src/worker/status.rs @@ -1075,6 +1075,7 @@ mod test { use std::collections::{BTreeMap, HashMap, HashSet}; use std::sync::Arc; use test_r::test; + use uuid::Uuid; #[test] async fn empty() { @@ -1707,6 +1708,7 @@ mod test { HashSet::new(), Vec::new(), None, + Uuid::new_v4(), ), expected_status: status.clone(), }], @@ -2685,6 +2687,7 @@ mod test { Default::default(), vec![], None, + Uuid::now_v7(), ), ) } diff --git a/golem-worker-service/src/api/worker.rs b/golem-worker-service/src/api/worker.rs index 72ca837ae0..cec7f5072b 100644 --- a/golem-worker-service/src/api/worker.rs +++ b/golem-worker-service/src/api/worker.rs @@ -117,7 +117,7 @@ impl WorkerApi { .normalize_agent_id_by_latest_version(component_id, &name) .await?; - let component_revision = self + let (component_revision, _created_at) = self .worker_service .create_with_component( &agent_id, diff --git a/golem-worker-service/src/grpcapi/worker.rs b/golem-worker-service/src/grpcapi/worker.rs index be7b3b676c..54601d2793 100644 --- a/golem-worker-service/src/grpcapi/worker.rs +++ b/golem-worker-service/src/grpcapi/worker.rs @@ -32,7 +32,7 @@ use golem_common::model::component::{ComponentId, ComponentRevision}; use golem_common::model::oplog::OplogIndex; use golem_common::model::worker::AgentConfigEntryDto; use golem_common::model::worker::AgentUpdateMode; -use golem_common::model::{AgentId, IdempotencyKey}; +use golem_common::model::{AgentFingerprint, AgentId, IdempotencyKey}; use golem_common::recorded_grpc_api_request; use golem_service_base::grpc::proto_agent_id_string; use golem_service_base::model::auth::AuthCtx; @@ -62,10 +62,11 @@ impl GrpcWorkerService for WorkerGrpcApi { .instrument(record.span.clone()) .await { - Ok((agent_id, component_version)) => record.succeed( + Ok((agent_id, component_version, fingerprint)) => record.succeed( launch_new_worker_response::Result::Success(LaunchNewWorkerSuccessResponse { agent_id: Some(agent_id.into()), component_version: component_version.into(), + instance_id: Some(fingerprint.0.into()), }), ), Err(error) => record.fail( @@ -306,7 +307,7 @@ impl WorkerGrpcApi { async fn launch_new_worker( &self, request: LaunchNewWorkerRequest, - ) -> Result<(AgentId, ComponentRevision), GrpcAgentError> { + ) -> Result<(AgentId, ComponentRevision, AgentFingerprint), GrpcAgentError> { let auth: AuthCtx = request .auth_ctx .ok_or(bad_request_error("auth_ctx not found"))? @@ -330,7 +331,7 @@ impl WorkerGrpcApi { .collect::, _>>() .map_err(|e| bad_request_error(format!("failed converting config: {e}")))?; - let latest_component_revision = self + let (latest_component_revision, fingerprint) = self .worker_service .create( &agent_id, @@ -343,7 +344,7 @@ impl WorkerGrpcApi { ) .await?; - Ok((agent_id, latest_component_revision)) + Ok((agent_id, latest_component_revision, fingerprint)) } async fn complete_promise( diff --git a/golem-worker-service/src/mcp/invoke/test_support.rs b/golem-worker-service/src/mcp/invoke/test_support.rs index 4c64985444..fdacd9872c 100644 --- a/golem-worker-service/src/mcp/invoke/test_support.rs +++ b/golem-worker-service/src/mcp/invoke/test_support.rs @@ -37,7 +37,7 @@ use golem_common::model::diff::Hash; use golem_common::model::environment::EnvironmentId; use golem_common::model::oplog::{OplogCursor, OplogIndex}; use golem_common::model::worker::{AgentConfigEntryDto, AgentMetadataDto, RevertWorkerTarget}; -use golem_common::model::{AgentFilter, AgentId, IdempotencyKey, ScanCursor}; +use golem_common::model::{AgentFilter, AgentFingerprint, AgentId, IdempotencyKey, ScanCursor}; use golem_service_base::clients::registry::{RegistryService, RegistryServiceError}; use golem_service_base::model::auth::{AuthCtx, AuthDetailsForEnvironment, EnvironmentAction}; use golem_service_base::model::component::Component; @@ -332,7 +332,7 @@ impl WorkerClient for RecordingWorkerClient { _: AuthCtx, _: Option, _: Option, - ) -> WorkerResult { + ) -> WorkerResult<(AgentId, AgentFingerprint)> { unimplemented!() } diff --git a/golem-worker-service/src/service/worker/client.rs b/golem-worker-service/src/service/worker/client.rs index 61dde9df20..eef89e77d9 100644 --- a/golem-worker-service/src/service/worker/client.rs +++ b/golem-worker-service/src/service/worker/client.rs @@ -43,7 +43,8 @@ use golem_common::model::worker::AgentConfigEntryDto; use golem_common::model::worker::AgentUpdateMode; use golem_common::model::worker::{AgentMetadataDto, RevertWorkerTarget}; use golem_common::model::{ - AgentFilter, AgentId, AgentStatus, FilterComparator, IdempotencyKey, PromiseId, ScanCursor, + AgentFilter, AgentFingerprint, AgentId, AgentStatus, FilterComparator, IdempotencyKey, + PromiseId, ScanCursor, }; use golem_common::model::{AgentInvocationOutput, AgentInvocationResult, InvocationStatus}; use golem_service_base::error::worker_executor::WorkerExecutorError; @@ -70,7 +71,7 @@ pub trait WorkerClient: Send + Sync { auth_ctx: AuthCtx, invocation_context: Option, principal: Option, - ) -> WorkerResult; + ) -> WorkerResult<(AgentId, AgentFingerprint)>; async fn connect( &self, @@ -423,46 +424,55 @@ impl WorkerClient for WorkerExecutorWorkerClient { auth_ctx: AuthCtx, invocation_context: Option, principal: Option, - ) -> WorkerResult { + ) -> WorkerResult<(AgentId, AgentFingerprint)> { let agent_id_clone = agent_id.clone(); let account_id_clone = account_id; - self.call_worker_executor( - agent_id.clone(), - "create_worker", - move |worker_executor_client| { - let agent_id = agent_id_clone.clone(); - Box::pin( - worker_executor_client.create_worker(CreateWorkerRequest { - agent_id: Some(agent_id.into()), - env: environment_variables.clone(), - config: config - .clone() - .into_iter() - .map(golem_api_grpc::proto::golem::worker::AgentConfigEntryDto::from) - .collect(), - component_owner_account_id: Some(account_id_clone.into()), - environment_id: Some(environment_id.into()), - ignore_already_existing, - auth_ctx: Some(auth_ctx.clone().into()), - principal: principal.clone(), - invocation_context: invocation_context.clone(), - }), - ) - }, - |response| match response.into_inner() { - workerexecutor::v1::CreateWorkerResponse { - result: Some(workerexecutor::v1::create_worker_response::Result::Success(_)), - } => Ok(()), - workerexecutor::v1::CreateWorkerResponse { - result: Some(workerexecutor::v1::create_worker_response::Result::Failure(err)), - } => Err(err.into()), - workerexecutor::v1::CreateWorkerResponse { .. } => Err("Empty response".into()), - }, - WorkerServiceError::InternalCallError, - ) - .await?; + let fingerprint = self + .call_worker_executor( + agent_id.clone(), + "create_worker", + move |worker_executor_client| { + let agent_id = agent_id_clone.clone(); + Box::pin( + worker_executor_client.create_worker(CreateWorkerRequest { + agent_id: Some(agent_id.into()), + env: environment_variables.clone(), + config: config + .clone() + .into_iter() + .map( + golem_api_grpc::proto::golem::worker::AgentConfigEntryDto::from, + ) + .collect(), + component_owner_account_id: Some(account_id_clone.into()), + environment_id: Some(environment_id.into()), + ignore_already_existing, + auth_ctx: Some(auth_ctx.clone().into()), + principal: principal.clone(), + invocation_context: invocation_context.clone(), + }), + ) + }, + |response| match response.into_inner() { + workerexecutor::v1::CreateWorkerResponse { + result: + Some(workerexecutor::v1::create_worker_response::Result::Success( + workerexecutor::v1::CreateWorkerSuccessResponse { + instance_id: Some(u), + }, + )), + } => Ok(AgentFingerprint(u.into())), + workerexecutor::v1::CreateWorkerResponse { + result: + Some(workerexecutor::v1::create_worker_response::Result::Failure(err)), + } => Err(err.into()), + workerexecutor::v1::CreateWorkerResponse { .. } => Err("Empty response".into()), + }, + WorkerServiceError::InternalCallError, + ) + .await?; - Ok(agent_id.clone()) + Ok((agent_id.clone(), fingerprint)) } async fn connect( diff --git a/golem-worker-service/src/service/worker/service.rs b/golem-worker-service/src/service/worker/service.rs index 988277d3b5..6369bcb0af 100644 --- a/golem-worker-service/src/service/worker/service.rs +++ b/golem-worker-service/src/service/worker/service.rs @@ -41,7 +41,7 @@ use golem_common::model::oplog::OplogIndex; use golem_common::model::worker::AgentConfigEntryDto; use golem_common::model::worker::AgentUpdateMode; use golem_common::model::worker::{AgentMetadataDto, RevertWorkerTarget}; -use golem_common::model::{AgentFilter, AgentId, IdempotencyKey, ScanCursor}; +use golem_common::model::{AgentFilter, AgentFingerprint, AgentId, IdempotencyKey, ScanCursor}; use golem_service_base::model::auth::{AuthCtx, EnvironmentAction}; use golem_service_base::model::component::Component; use golem_service_base::model::{ComponentFileSystemNode, GetOplogResponse}; @@ -111,7 +111,7 @@ impl WorkerService { auth_ctx: AuthCtx, invocation_context: Option, principal: Option, - ) -> WorkerResult { + ) -> WorkerResult<(ComponentRevision, AgentFingerprint)> { let component = self .component_service .get_latest_by_id_uncached(agent_id.component_id) @@ -141,7 +141,7 @@ impl WorkerService { auth_ctx: AuthCtx, invocation_context: Option, principal: Option, - ) -> WorkerResult { + ) -> WorkerResult<(ComponentRevision, AgentFingerprint)> { assert!(component.id == agent_id.component_id); let environment_auth_details = self @@ -153,7 +153,8 @@ impl WorkerService { ) .await?; - self.worker_client + let (_, fingerprint) = self + .worker_client .create( agent_id, environment_variables, @@ -167,7 +168,7 @@ impl WorkerService { ) .await?; - Ok(component.revision) + Ok((component.revision, fingerprint)) } pub async fn connect( @@ -850,7 +851,7 @@ impl WorkerService { ) .await?; - let component_revision = self + let (component_revision, _created_at) = self .create_with_component( &agent_id, component, @@ -1082,7 +1083,7 @@ mod tests { use golem_common::model::environment::{EnvironmentId, EnvironmentName}; use golem_common::model::oplog::{OplogCursor, OplogIndex}; use golem_common::model::worker::{AgentConfigEntryDto, AgentMetadataDto, RevertWorkerTarget}; - use golem_common::model::{AgentFilter, AgentId, IdempotencyKey, ScanCursor}; + use golem_common::model::{AgentFilter, AgentFingerprint, AgentId, IdempotencyKey, ScanCursor}; use golem_service_base::clients::registry::{RegistryService, RegistryServiceError}; use golem_service_base::model::auth::{AuthCtx, AuthDetailsForEnvironment, EnvironmentAction}; use golem_service_base::model::component::Component; @@ -1400,12 +1401,12 @@ mod tests { _: AuthCtx, _: Option, _: Option, - ) -> WorkerResult { + ) -> WorkerResult<(AgentId, AgentFingerprint)> { self.created_agent_ids .lock() .unwrap() .push(agent_id.clone()); - Ok(agent_id.clone()) + Ok((agent_id.clone(), AgentFingerprint::new())) } async fn connect( diff --git a/integration-tests/tests/worker.rs b/integration-tests/tests/worker.rs index 07a9cedd99..eb439a35d8 100644 --- a/integration-tests/tests/worker.rs +++ b/integration-tests/tests/worker.rs @@ -1035,6 +1035,76 @@ async fn worker_recreation( Ok(()) } +/// When a worker is deleted and a new worker is created with the same agent ID, any +/// scheduled invocations that targeted the original worker must NOT fire on the new instance. +#[test] +#[tracing::instrument] +#[timeout(120000)] +async fn stale_scheduled_invocation_dropped_after_worker_recreation( + deps: &EnvBasedTestDependencies, + _tracing: &Tracing, +) -> anyhow::Result<()> { + let user = deps.user().await?; + let (_, env) = user.app_and_env().await?; + let component = user + .component(&env.id, "golem_it_agent_rpc_rust_release") + .name("golem-it:agent-rpc-rust") + .store() + .await?; + + // Create the server worker that will be the target of the scheduled invocation. + let server_parsed = agent_id!("ScheduledInvocationServer", "stale-sched-server"); + let server_agent_id = user + .start_agent(&component.id, server_parsed.clone()) + .await?; + + // Create the client worker that will *schedule* an invocation on the server via wasm-rpc. + // test1() schedules `inc_global_by(1)` on the given server 200ms in the future. + let client_parsed = agent_id!("ScheduledInvocationClient", "stale-sched-client"); + user.start_agent(&component.id, client_parsed.clone()) + .await?; + + // Schedule the invocation. At this point `target_worker_fingerprint` is set to the + // server's UUID fingerprint inside the KV store. + user.invoke_and_await_agent( + &component, + &client_parsed, + "test1", + data_value!("stale-sched-server"), + ) + .await?; + + // Delete the original server worker immediately — before the 200ms fires. + user.delete_worker(&server_agent_id).await?; + + // Recreate a brand-new worker with the same agent ID. It gets a fresh UUID fingerprint. + user.start_agent(&component.id, server_parsed.clone()) + .await?; + + // Wait well past the 200ms scheduled time so the scheduler has time to process the entry. + sleep(Duration::from_secs(5)).await; + + // The new worker's counter must still be 0 — the stale invocation must have been dropped. + let result = user + .invoke_and_await_agent( + &component, + &server_parsed, + "get_global_value", + data_value!(), + ) + .await? + .into_return_value() + .expect("Expected a return value"); + + assert_eq!( + result, + Value::U64(0), + "Stale scheduled invocation was unexpectedly delivered to the recreated worker" + ); + + Ok(()) +} + #[test] #[tracing::instrument] #[timeout(600000)] diff --git a/openapi/golem-service.yaml b/openapi/golem-service.yaml index b7a90b378f..2281eae749 100644 --- a/openapi/golem-service.yaml +++ b/openapi/golem-service.yaml @@ -13902,6 +13902,9 @@ components: originalPhantomId: type: string format: uuid + instanceId: + type: string + format: uuid required: - timestamp - agentId @@ -13913,6 +13916,7 @@ components: - initialTotalLinearMemorySize - initialActivePlugins - localAgentConfig + - instanceId r#CreateResourceParams: title: r#CreateResourceParams type: object diff --git a/openapi/golem-worker-service.yaml b/openapi/golem-worker-service.yaml index fbf564aff6..7817ef0900 100644 --- a/openapi/golem-worker-service.yaml +++ b/openapi/golem-worker-service.yaml @@ -5957,6 +5957,7 @@ components: - initialTotalLinearMemorySize - initialActivePlugins - localAgentConfig + - instanceId properties: timestamp: type: string @@ -5995,6 +5996,9 @@ components: originalPhantomId: type: string format: uuid + instanceId: + type: string + format: uuid r#CreateResourceParams: type: object title: r#CreateResourceParams diff --git a/sdks/moonbit/golem_sdk/wit/deps/golem-1.x/golem-oplog.wit b/sdks/moonbit/golem_sdk/wit/deps/golem-1.x/golem-oplog.wit index 18e98d5672..4cc7f5a569 100644 --- a/sdks/moonbit/golem_sdk/wit/deps/golem-1.x/golem-oplog.wit +++ b/sdks/moonbit/golem_sdk/wit/deps/golem-1.x/golem-oplog.wit @@ -106,7 +106,8 @@ interface oplog { initial-total-linear-memory-size: u64, initial-active-plugins: list, local-agent-config: list, - original-phantom-id: option + original-phantom-id: option, + instance-id: uuid } record host-call-parameters { @@ -453,7 +454,8 @@ interface oplog { initial-total-linear-memory-size: u64, initial-active-plugins: list, local-agent-config: list, - original-phantom-id: option + original-phantom-id: option, + instance-id: uuid } record raw-host-call-parameters { diff --git a/sdks/rust/golem-rust/wit/deps/golem-1.x/golem-oplog.wit b/sdks/rust/golem-rust/wit/deps/golem-1.x/golem-oplog.wit index 18e98d5672..4cc7f5a569 100644 --- a/sdks/rust/golem-rust/wit/deps/golem-1.x/golem-oplog.wit +++ b/sdks/rust/golem-rust/wit/deps/golem-1.x/golem-oplog.wit @@ -106,7 +106,8 @@ interface oplog { initial-total-linear-memory-size: u64, initial-active-plugins: list, local-agent-config: list, - original-phantom-id: option + original-phantom-id: option, + instance-id: uuid } record host-call-parameters { @@ -453,7 +454,8 @@ interface oplog { initial-total-linear-memory-size: u64, initial-active-plugins: list, local-agent-config: list, - original-phantom-id: option + original-phantom-id: option, + instance-id: uuid } record raw-host-call-parameters { diff --git a/sdks/scala/wit/deps/golem-1.x/golem-oplog.wit b/sdks/scala/wit/deps/golem-1.x/golem-oplog.wit index 18e98d5672..4cc7f5a569 100644 --- a/sdks/scala/wit/deps/golem-1.x/golem-oplog.wit +++ b/sdks/scala/wit/deps/golem-1.x/golem-oplog.wit @@ -106,7 +106,8 @@ interface oplog { initial-total-linear-memory-size: u64, initial-active-plugins: list, local-agent-config: list, - original-phantom-id: option + original-phantom-id: option, + instance-id: uuid } record host-call-parameters { @@ -453,7 +454,8 @@ interface oplog { initial-total-linear-memory-size: u64, initial-active-plugins: list, local-agent-config: list, - original-phantom-id: option + original-phantom-id: option, + instance-id: uuid } record raw-host-call-parameters { diff --git a/sdks/scala/wit/dts/golem_api_1_5_0_oplog.d.ts b/sdks/scala/wit/dts/golem_api_1_5_0_oplog.d.ts index 8240e4d5f0..cab708861a 100644 --- a/sdks/scala/wit/dts/golem_api_1_5_0_oplog.d.ts +++ b/sdks/scala/wit/dts/golem_api_1_5_0_oplog.d.ts @@ -164,6 +164,7 @@ declare module 'golem:api/oplog@1.5.0' { initialActivePlugins: PluginInstallationDescription[]; localAgentConfig: LocalAgentConfigEntry[]; originalPhantomId?: Uuid; + instanceId: Uuid; }; export type HostCallParameters = { timestamp: Datetime; @@ -537,6 +538,7 @@ declare module 'golem:api/oplog@1.5.0' { initialActivePlugins: EnvironmentPluginGrantId[]; localAgentConfig: RawLocalAgentConfigEntry[]; originalPhantomId?: Uuid; + instanceId: Uuid; }; export type RawHostCallParameters = { timestamp: Datetime; diff --git a/sdks/ts/packages/golem-ts-sdk/types/golem_api_1_5_0_oplog.d.ts b/sdks/ts/packages/golem-ts-sdk/types/golem_api_1_5_0_oplog.d.ts index 8240e4d5f0..cab708861a 100644 --- a/sdks/ts/packages/golem-ts-sdk/types/golem_api_1_5_0_oplog.d.ts +++ b/sdks/ts/packages/golem-ts-sdk/types/golem_api_1_5_0_oplog.d.ts @@ -164,6 +164,7 @@ declare module 'golem:api/oplog@1.5.0' { initialActivePlugins: PluginInstallationDescription[]; localAgentConfig: LocalAgentConfigEntry[]; originalPhantomId?: Uuid; + instanceId: Uuid; }; export type HostCallParameters = { timestamp: Datetime; @@ -537,6 +538,7 @@ declare module 'golem:api/oplog@1.5.0' { initialActivePlugins: EnvironmentPluginGrantId[]; localAgentConfig: RawLocalAgentConfigEntry[]; originalPhantomId?: Uuid; + instanceId: Uuid; }; export type RawHostCallParameters = { timestamp: Datetime; diff --git a/sdks/ts/wit/deps/golem-1.x/golem-oplog.wit b/sdks/ts/wit/deps/golem-1.x/golem-oplog.wit index 18e98d5672..4cc7f5a569 100644 --- a/sdks/ts/wit/deps/golem-1.x/golem-oplog.wit +++ b/sdks/ts/wit/deps/golem-1.x/golem-oplog.wit @@ -106,7 +106,8 @@ interface oplog { initial-total-linear-memory-size: u64, initial-active-plugins: list, local-agent-config: list, - original-phantom-id: option + original-phantom-id: option, + instance-id: uuid } record host-call-parameters { @@ -453,7 +454,8 @@ interface oplog { initial-total-linear-memory-size: u64, initial-active-plugins: list, local-agent-config: list, - original-phantom-id: option + original-phantom-id: option, + instance-id: uuid } record raw-host-call-parameters { diff --git a/wit/deps/golem-1.x/golem-oplog.wit b/wit/deps/golem-1.x/golem-oplog.wit index 18e98d5672..4cc7f5a569 100644 --- a/wit/deps/golem-1.x/golem-oplog.wit +++ b/wit/deps/golem-1.x/golem-oplog.wit @@ -106,7 +106,8 @@ interface oplog { initial-total-linear-memory-size: u64, initial-active-plugins: list, local-agent-config: list, - original-phantom-id: option + original-phantom-id: option, + instance-id: uuid } record host-call-parameters { @@ -453,7 +454,8 @@ interface oplog { initial-total-linear-memory-size: u64, initial-active-plugins: list, local-agent-config: list, - original-phantom-id: option + original-phantom-id: option, + instance-id: uuid } record raw-host-call-parameters {