Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cli/golem-cli/wit/deps/golem-1.x/golem-oplog.wit
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ interface oplog {
initial-total-linear-memory-size: u64,
initial-active-plugins: list<plugin-installation-description>,
local-agent-config: list<local-agent-config-entry>,
original-phantom-id: option<uuid>
original-phantom-id: option<uuid>,
instance-id: uuid
}

record host-call-parameters {
Expand Down Expand Up @@ -453,7 +454,8 @@ interface oplog {
initial-total-linear-memory-size: u64,
initial-active-plugins: list<environment-plugin-grant-id>,
local-agent-config: list<raw-local-agent-config-entry>,
original-phantom-id: option<uuid>
original-phantom-id: option<uuid>,
instance-id: uuid
}

record raw-host-call-parameters {
Expand Down
1 change: 1 addition & 0 deletions golem-api-grpc/proto/golem/worker/public_oplog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ message CreateParameters {
repeated PluginInstallationDescription initial_active_plugins = 11;
golem.common.EnvironmentId environment_id = 12;
optional golem.common.UUID original_phantom_id = 13;
golem.common.UUID instance_id = 14;
}

message HostCallParameters {
Expand Down
1 change: 1 addition & 0 deletions golem-api-grpc/proto/golem/worker/raw_oplog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ message RawCreateParameters {
repeated golem.component.EnvironmentPluginGrantId initial_active_plugins = 9;
repeated bytes local_agent_config = 10;
optional golem.common.UUID original_phantom_id = 11;
optional golem.common.UUID instance_id = 12;
}

message RawHostCallParameters {
Expand Down
1 change: 1 addition & 0 deletions golem-api-grpc/proto/golem/worker/v1/worker_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ message LaunchNewWorkerResponse {
message LaunchNewWorkerSuccessResponse {
golem.worker.AgentId agentId = 1;
uint64 component_version = 2;
golem.common.UUID instance_id = 3;
}

message CompletePromiseRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,13 @@ message CreateWorkerRequest {
optional golem.worker.InvocationContext invocation_context = 9;
}

message CreateWorkerSuccessResponse {
golem.common.UUID instance_id = 1;
}

message CreateWorkerResponse {
oneof result {
golem.common.Empty success = 1;
CreateWorkerSuccessResponse success = 1;
golem.worker.v1.WorkerExecutionError failure = 2;
}
}
Expand Down
33 changes: 33 additions & 0 deletions golem-common/src/base_model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,39 @@ impl FromValue for Timestamp {
}
}

/// A stable, per-instance fingerprint for a worker, generated as a random UUID at creation time.
/// Globally unique across recreations of the same agent ID.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[cfg_attr(
feature = "full",
derive(
desert_rust::BinaryCodec,
golem_wasm_derive::IntoValue,
golem_wasm_derive::FromValue,
)
)]
#[cfg_attr(feature = "full", desert(transparent))]
#[serde(transparent)]
pub struct AgentFingerprint(pub Uuid);

impl Default for AgentFingerprint {
fn default() -> Self {
Self::new()
}
}

impl AgentFingerprint {
pub fn new() -> Self {
AgentFingerprint(Uuid::now_v7())
}
}

impl Display for AgentFingerprint {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}

#[cfg(feature = "full")]
impl From<Timestamp> for golem_wasm::wasi::clocks::wall_clock::Datetime {
fn from(value: Timestamp) -> Self {
Expand Down
7 changes: 5 additions & 2 deletions golem-common/src/base_model/oplog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ oplog_entry! {
initial_total_linear_memory_size: u64,
initial_active_plugins: HashSet<EnvironmentPluginGrantId>,
local_agent_config: Vec<UntypedAgentConfigEntry>,
original_phantom_id: Option<Uuid>
original_phantom_id: Option<Uuid>,
/// Per-instance fingerprint, unique across recreations of the same agent ID.
instance_id: Uuid
}
public {
agent_id: AgentId,
Expand All @@ -93,7 +95,8 @@ oplog_entry! {
initial_total_linear_memory_size: u64,
initial_active_plugins: BTreeSet<PluginInstallationDescription>,
local_agent_config: Vec<TypedAgentConfigEntry>,
original_phantom_id: Option<Uuid>
original_phantom_id: Option<Uuid>,
instance_id: Uuid
}
},
/// The agent invoked a host function
Expand Down
4 changes: 4 additions & 0 deletions golem-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// The oplog_entry! macro expands to large enums processed by serde/poem_openapi
// which require an increased recursion limit.
#![recursion_limit = "256"]

use http::Uri;
use shadow_rs::shadow;
use std::convert::Infallible;
Expand Down
29 changes: 9 additions & 20 deletions golem-common/src/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,10 +335,17 @@ pub enum ScheduledAction {
},
/// Invoke the given action on the worker. The invocation will only
/// be persisted in the oplog when it's actually getting scheduled.
///
/// `target_worker_fingerprint` guards against stale invocations: when `Some(fp)`, the
/// scheduler compares `fp` against the current worker's `fingerprint` before firing. A
/// mismatch means the original worker was deleted and a new one was created with the same
/// ID — the invocation is silently dropped. `None` means fire unconditionally (used for
/// legacy entries and non-wasm-rpc invocations).
Invoke {
account_id: AccountId,
owned_agent_id: OwnedAgentId,
invocation: Box<AgentInvocation>,
target_worker_fingerprint: AgentFingerprint,
},
/// Resume the agent
Resume {
Expand Down Expand Up @@ -506,8 +513,7 @@ impl Display for ShardAssignment {
}
}

#[derive(Clone, Debug, PartialEq, BinaryCodec)]
#[desert(evolution())]
#[derive(Clone, Debug, PartialEq)]
pub struct AgentMetadata {
pub agent_id: AgentId,
pub env: Vec<(String, String)>,
Expand All @@ -518,27 +524,10 @@ pub struct AgentMetadata {
pub parent: Option<AgentId>,
pub last_known_status: AgentStatusRecord,
pub original_phantom_id: Option<Uuid>,
pub fingerprint: AgentFingerprint,
}

impl AgentMetadata {
pub fn default(
agent_id: AgentId,
created_by: AccountId,
environment_id: EnvironmentId,
) -> AgentMetadata {
AgentMetadata {
agent_id,
env: vec![],
environment_id,
created_by,
config: Vec::new(),
created_at: Timestamp::now_utc(),
parent: None,
last_known_status: AgentStatusRecord::default(),
original_phantom_id: None,
}
}

pub fn owned_agent_id(&self) -> OwnedAgentId {
OwnedAgentId::new(self.environment_id, &self.agent_id)
}
Expand Down
8 changes: 6 additions & 2 deletions golem-common/src/model/oplog/payload/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use crate::model::oplog::{
};
use crate::model::worker::TypedAgentConfigEntry;
use crate::model::{
AccountId, AgentId, AgentInvocation, AgentMetadata, AgentStatus, IdempotencyKey, OwnedAgentId,
RdbmsPoolKey, ScheduleId, ScheduledAction,
AccountId, AgentFingerprint, AgentId, AgentInvocation, AgentMetadata, AgentStatus,
IdempotencyKey, OwnedAgentId, RdbmsPoolKey, ScheduleId, ScheduledAction,
};
use bigdecimal::BigDecimal;
use bit_vec::BitVec;
Expand Down Expand Up @@ -1355,6 +1355,7 @@ pub struct SerializableScheduledInvocation {
pub trace_id: TraceId,
pub trace_states: Vec<String>,
pub spans: Vec<Vec<PublicSpanData>>,
pub target_worker_fingerprint: AgentFingerprint,
}

impl SerializableScheduledInvocation {
Expand All @@ -1364,6 +1365,7 @@ impl SerializableScheduledInvocation {
account_id,
owned_agent_id,
invocation,
target_worker_fingerprint,
} => match *invocation {
AgentInvocation::AgentMethod {
idempotency_key,
Expand All @@ -1383,6 +1385,7 @@ impl SerializableScheduledInvocation {
spans: encode_span_data(&invocation_context.to_oplog_data()),
trace_id: invocation_context.trace_id,
trace_states: invocation_context.trace_states,
target_worker_fingerprint,
}),
other => Err(format!(
"ScheduleId contains a non-method invocation: {:?}",
Expand Down Expand Up @@ -1414,6 +1417,7 @@ impl SerializableScheduledInvocation {
invocation_context,
principal: self.principal,
}),
target_worker_fingerprint: self.target_worker_fingerprint,
},
}
}
Expand Down
11 changes: 11 additions & 0 deletions golem-common/src/model/oplog/protobuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,10 @@ impl TryFrom<golem_api_grpc::proto::golem::worker::OplogEntry> for PublicOplogEn
.collect::<Result<Vec<_>, _>>()?,
),
original_phantom_id: create.original_phantom_id.map(|id| id.into()),
instance_id: create
.instance_id
.map(|id| id.into())
.ok_or("Missing instance_id in Create entry")?,
})),
oplog_entry::Entry::HostCall(host_call) => {
Ok(PublicOplogEntry::HostCall(HostCallParams {
Expand Down Expand Up @@ -759,6 +763,7 @@ impl TryFrom<PublicOplogEntry> for golem_api_grpc::proto::golem::worker::OplogEn
.map(Into::into)
.collect(),
original_phantom_id: create.original_phantom_id.map(|id| id.into()),
instance_id: Some(create.instance_id.into()),
},
)),
},
Expand Down Expand Up @@ -2125,6 +2130,7 @@ impl TryFrom<PublicOplogEntry> for OplogEntry {
.map(|p| p.environment_plugin_grant_id)
.collect(),
original_phantom_id: create.original_phantom_id,
instance_id: create.instance_id
}),
PublicOplogEntry::HostCall(host_call) => {
let durable_function_type = match host_call.durable_function_type {
Expand Down Expand Up @@ -2795,6 +2801,7 @@ impl TryFrom<OplogEntry> for golem_api_grpc::proto::golem::worker::RawOplogEntry
initial_active_plugins,
local_agent_config,
original_phantom_id,
instance_id,
..
} => Entry::Create(RawCreateParameters {
agent_id: Some(agent_id.into()),
Expand All @@ -2817,6 +2824,7 @@ impl TryFrom<OplogEntry> for golem_api_grpc::proto::golem::worker::RawOplogEntry
.map(|e| crate::serialization::serialize(&e))
.collect::<Result<Vec<_>, _>>()?,
original_phantom_id: original_phantom_id.map(Into::into),
instance_id: Some(instance_id.into()),
}),
OplogEntry::HostCall {
function_name,
Expand Down Expand Up @@ -3147,6 +3155,8 @@ impl TryFrom<golem_api_grpc::proto::golem::worker::RawOplogEntry> for OplogEntry
let proto_uuid: golem_api_grpc::proto::golem::common::Uuid = u;
uuid::Uuid::from(proto_uuid)
});
let instance_id = p.instance_id.ok_or("Missing instance_id")?.into();

Ok(OplogEntry::Create {
timestamp,
agent_id,
Expand All @@ -3160,6 +3170,7 @@ impl TryFrom<golem_api_grpc::proto::golem::worker::RawOplogEntry> for OplogEntry
initial_active_plugins,
local_agent_config,
original_phantom_id,
instance_id,
})
}
Entry::HostCall(p) => {
Expand Down
1 change: 1 addition & 0 deletions golem-common/src/model/oplog/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ fn create_serialization_poem_serde_equivalence() {
parameters: BTreeMap::new(),
}]),
original_phantom_id: None,
instance_id: Uuid::new_v4(),
});
let serialized = entry.to_json_string();
let deserialized: PublicOplogEntry = serde_json::from_str(&serialized).unwrap();
Expand Down
5 changes: 3 additions & 2 deletions golem-common/src/model/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::model::environment::EnvironmentId;
use crate::model::oplog::{OplogIndex, TimestampedUpdateDescription, UpdateDescription};
use crate::model::worker::TypedAgentConfigEntry;
use crate::model::{
AccountId, AgentFilter, AgentId, AgentInvocation, AgentMetadata, AgentStatus,
AccountId, AgentFilter, AgentFingerprint, AgentId, AgentInvocation, AgentMetadata, AgentStatus,
AgentStatusRecord, ComponentId, FilterComparator, IdempotencyKey, StringFilterComparator,
Timestamp, TimestampedAgentInvocation,
};
Expand All @@ -28,7 +28,7 @@ use serde::{Deserialize, Serialize};
use std::str::FromStr;
use std::vec;
use test_r::test;
use uuid::uuid;
use uuid::{Uuid, uuid};

#[test]
fn timestamp_conversion() {
Expand Down Expand Up @@ -228,6 +228,7 @@ fn worker_filter_matches() {
..AgentStatusRecord::default()
},
original_phantom_id: None,
fingerprint: AgentFingerprint(Uuid::now_v7()),
};

assert!(
Expand Down
6 changes: 4 additions & 2 deletions golem-common/wit/deps/golem-1.x/golem-oplog.wit
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ interface oplog {
initial-total-linear-memory-size: u64,
initial-active-plugins: list<plugin-installation-description>,
local-agent-config: list<local-agent-config-entry>,
original-phantom-id: option<uuid>
original-phantom-id: option<uuid>,
instance-id: uuid
}

record host-call-parameters {
Expand Down Expand Up @@ -453,7 +454,8 @@ interface oplog {
initial-total-linear-memory-size: u64,
initial-active-plugins: list<environment-plugin-grant-id>,
local-agent-config: list<raw-local-agent-config-entry>,
original-phantom-id: option<uuid>
original-phantom-id: option<uuid>,
instance-id: uuid
}

record raw-host-call-parameters {
Expand Down
2 changes: 2 additions & 0 deletions golem-debugging-service/src/debug_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ fn get_oplog_entry_from_public_oplog_entry(
initial_total_linear_memory_size,
initial_active_plugins,
original_phantom_id,
instance_id,
}) => Ok(OplogEntry::Create {
timestamp,
agent_id,
Expand All @@ -260,6 +261,7 @@ fn get_oplog_entry_from_public_oplog_entry(
.map(|x| x.environment_plugin_grant_id)
.collect(),
original_phantom_id,
instance_id,
}),
PublicOplogEntry::HostCall(HostCallParams {
timestamp,
Expand Down
6 changes: 4 additions & 2 deletions golem-debugging-service/tests/services/worker_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ use golem_common::model::agent::{AgentInvocationMode, Principal, UntypedDataValu
use golem_common::model::component::ComponentRevision;
use golem_common::model::invocation_context::InvocationContextStack;
use golem_common::model::worker::{AgentConfigEntryDto, RevertWorkerTarget};
use golem_common::model::{AgentId, IdempotencyKey, InvocationStatus, OwnedAgentId, PromiseId};
use golem_common::model::{
AgentFingerprint, AgentId, IdempotencyKey, InvocationStatus, OwnedAgentId, PromiseId,
};
use golem_service_base::error::worker_executor::WorkerExecutorError;
use golem_service_base::model::auth::{AuthCtx, UserAuthCtx};
use golem_worker_executor::services::worker_proxy::{WorkerProxy, WorkerProxyError};
Expand Down Expand Up @@ -74,7 +76,7 @@ impl WorkerProxy for TestWorkerProxy {
_caller_account_id: AccountId,
_agent_config: Vec<AgentConfigEntryDto>,
_principal: Principal,
) -> Result<(), WorkerProxyError> {
) -> Result<AgentFingerprint, WorkerProxyError> {
Err(WorkerProxyError::InternalError(
WorkerExecutorError::unknown(
"Not implemented in tests as debug service is not expected to call start through proxy",
Expand Down
Loading
Loading