Skip to content

Commit b4315ff

Browse files
authored
quota and config test flakyness fixes (#3194)
* Add agent secret changes invalidation events * wip * wip * wip * quota service wiring * fmt * fix unit test
1 parent 57d4368 commit b4315ff

23 files changed

Lines changed: 338 additions & 160 deletions

File tree

.scala-build/golem_d5c0a6989e/src_generated/main/sdks/scala/mill/build.scala

Lines changed: 0 additions & 70 deletions
This file was deleted.

golem-api-grpc/proto/golem/registry/v1/registry_service.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,7 @@ message RegistryInvalidationEvent {
369369
SecuritySchemeChangedEvent security_scheme_changed = 7;
370370
ResourceDefinitionChangedEvent resource_definition_changed = 8;
371371
RetryPolicyChangedEvent retry_policy_changed = 9;
372+
AgentSecretChangedEvent agent_secret_changed = 10;
372373
}
373374
}
374375

@@ -416,3 +417,8 @@ message ResourceDefinitionChangedEvent {
416417
message RetryPolicyChangedEvent {
417418
golem.common.EnvironmentId environment_id = 1;
418419
}
420+
421+
// Sent when an environment agent secret is created, updated, or deleted.
422+
message AgentSecretChangedEvent {
423+
golem.common.EnvironmentId environment_id = 1;
424+
}

golem-common/src/base_model/agent.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,11 @@ pub enum RegistryInvalidationEvent {
201201
resource_definition_id: crate::base_model::quota::ResourceDefinitionId,
202202
resource_name: crate::base_model::quota::ResourceName,
203203
},
204+
/// An agent secret was created, updated, or deleted.
205+
AgentSecretChanged {
206+
event_id: u64,
207+
environment_id: EnvironmentId,
208+
},
204209
}
205210

206211
impl RegistryInvalidationEvent {
@@ -214,6 +219,7 @@ impl RegistryInvalidationEvent {
214219
Self::SecuritySchemeChanged { event_id, .. } => *event_id,
215220
Self::RetryPolicyChanged { event_id, .. } => *event_id,
216221
Self::ResourceDefinitionChanged { event_id, .. } => *event_id,
222+
Self::AgentSecretChanged { event_id, .. } => *event_id,
217223
}
218224
}
219225
}

golem-debugging-service/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ impl Bootstrap<DebugContext> for ServerBootstrap {
9898
fn create_quota_service(
9999
&self,
100100
_shard_manager_client: Arc<dyn golem_service_base::clients::shard_manager::ShardManager>,
101-
_golem_config: &GolemConfig,
101+
_config: &golem_worker_executor::services::golem_config::QuotaServiceConfig,
102102
_shutdown_token: tokio_util::sync::CancellationToken,
103103
) -> Arc<dyn golem_worker_executor::services::quota::QuotaService> {
104104
Arc::new(golem_worker_executor::services::quota::UnlimitedQuotaService)

golem-debugging-service/tests/debug_mode/debug_bootstrap.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ impl Bootstrap<DebugContext> for TestDebuggingServerBootStrap {
8181
fn create_quota_service(
8282
&self,
8383
_shard_manager_client: Arc<dyn golem_service_base::clients::shard_manager::ShardManager>,
84-
_golem_config: &GolemConfig,
84+
_config: &golem_worker_executor::services::golem_config::QuotaServiceConfig,
8585
_shutdown_token: tokio_util::sync::CancellationToken,
8686
) -> Arc<dyn golem_worker_executor::services::quota::QuotaService> {
8787
Arc::new(golem_worker_executor::services::quota::UnlimitedQuotaService)

golem-registry-service/src/bootstrap/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,7 @@ impl Services {
336336
let agent_secret_service = Arc::new(AgentSecretService::new(
337337
repos.agent_secret_repo.clone(),
338338
environment_service.clone(),
339+
registry_change_notifier.clone(),
339340
));
340341

341342
let retry_policy_service = Arc::new(RetryPolicyService::new(

golem-registry-service/src/repo/agent_secret.rs

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@ use super::model::agent_secrets::{
1616
AgentSecretCreationRecord, AgentSecretExtRevisionRecord, AgentSecretRepoError,
1717
AgentSecretRevisionRecord,
1818
};
19+
use super::registry_change::{RequiresNotificationSignal, RequiresSignalExt};
1920
use crate::repo::model::BindFields;
2021
pub use crate::repo::model::account::AccountRecord;
2122
use crate::repo::model::agent_secrets::AgentSecretRecord;
23+
use crate::repo::registry_change::{DbRegistryChangeRepo, NewRegistryChangeEvent};
2224
use async_trait::async_trait;
2325
use conditional_trait_gen::trait_gen;
2426
use futures::FutureExt;
@@ -35,17 +37,17 @@ pub trait AgentSecretRepo: Send + Sync {
3537
async fn create(
3638
&self,
3739
record: AgentSecretCreationRecord,
38-
) -> Result<AgentSecretExtRevisionRecord, AgentSecretRepoError>;
40+
) -> Result<RequiresNotificationSignal<AgentSecretExtRevisionRecord>, AgentSecretRepoError>;
3941

4042
async fn update(
4143
&self,
4244
revision: AgentSecretRevisionRecord,
43-
) -> Result<AgentSecretExtRevisionRecord, AgentSecretRepoError>;
45+
) -> Result<RequiresNotificationSignal<AgentSecretExtRevisionRecord>, AgentSecretRepoError>;
4446

4547
async fn delete(
4648
&self,
4749
revision: AgentSecretRevisionRecord,
48-
) -> Result<AgentSecretExtRevisionRecord, AgentSecretRepoError>;
50+
) -> Result<RequiresNotificationSignal<AgentSecretExtRevisionRecord>, AgentSecretRepoError>;
4951

5052
async fn get_by_id(
5153
&self,
@@ -83,23 +85,26 @@ impl<Repo: AgentSecretRepo> AgentSecretRepo for LoggedAgentSecretRepo<Repo> {
8385
async fn create(
8486
&self,
8587
record: AgentSecretCreationRecord,
86-
) -> Result<AgentSecretExtRevisionRecord, AgentSecretRepoError> {
88+
) -> Result<RequiresNotificationSignal<AgentSecretExtRevisionRecord>, AgentSecretRepoError>
89+
{
8790
let span = Self::span_environment_id(record.environment_id);
8891
self.repo.create(record).instrument(span).await
8992
}
9093

9194
async fn update(
9295
&self,
9396
revision: AgentSecretRevisionRecord,
94-
) -> Result<AgentSecretExtRevisionRecord, AgentSecretRepoError> {
97+
) -> Result<RequiresNotificationSignal<AgentSecretExtRevisionRecord>, AgentSecretRepoError>
98+
{
9599
let span = Self::span_agent_secret_id(revision.agent_secret_id);
96100
self.repo.update(revision).instrument(span).await
97101
}
98102

99103
async fn delete(
100104
&self,
101105
revision: AgentSecretRevisionRecord,
102-
) -> Result<AgentSecretExtRevisionRecord, AgentSecretRepoError> {
106+
) -> Result<RequiresNotificationSignal<AgentSecretExtRevisionRecord>, AgentSecretRepoError>
107+
{
103108
let span = Self::span_agent_secret_id(revision.agent_secret_id);
104109
self.repo.delete(revision).instrument(span).await
105110
}
@@ -197,6 +202,9 @@ impl DbAgentSecretRepo<PostgresPool> {
197202

198203
let revision = Self::insert_revision(tx, record.revision).await?;
199204

205+
let change_event = NewRegistryChangeEvent::agent_secret_changed(record.environment_id);
206+
DbRegistryChangeRepo::<PostgresPool>::create_change_event_in_tx(tx, &change_event).await?;
207+
200208
Ok(AgentSecretExtRevisionRecord {
201209
environment_id: agent_secret_record.environment_id,
202210
path: agent_secret_record.path,
@@ -227,6 +235,10 @@ impl DbAgentSecretRepo<PostgresPool> {
227235
).await?
228236
.ok_or(AgentSecretRepoError::ConcurrentModification)?;
229237

238+
let change_event =
239+
NewRegistryChangeEvent::agent_secret_changed(agent_secret_record.environment_id);
240+
DbRegistryChangeRepo::<PostgresPool>::create_change_event_in_tx(tx, &change_event).await?;
241+
230242
Ok(AgentSecretExtRevisionRecord {
231243
environment_id: agent_secret_record.environment_id,
232244
path: agent_secret_record.path,
@@ -243,29 +255,34 @@ impl AgentSecretRepo for DbAgentSecretRepo<PostgresPool> {
243255
async fn create(
244256
&self,
245257
record: AgentSecretCreationRecord,
246-
) -> Result<AgentSecretExtRevisionRecord, AgentSecretRepoError> {
258+
) -> Result<RequiresNotificationSignal<AgentSecretExtRevisionRecord>, AgentSecretRepoError>
259+
{
247260
self.db_pool
248261
.with_tx_err(METRICS_SVC_NAME, "create", |tx| {
249262
Self::create_within_transaction(tx, record).boxed()
250263
})
251264
.await
265+
.map(RequiresSignalExt::requires_notification_signal)
252266
}
253267

254268
async fn update(
255269
&self,
256270
revision: AgentSecretRevisionRecord,
257-
) -> Result<AgentSecretExtRevisionRecord, AgentSecretRepoError> {
271+
) -> Result<RequiresNotificationSignal<AgentSecretExtRevisionRecord>, AgentSecretRepoError>
272+
{
258273
self.db_pool
259274
.with_tx_err(METRICS_SVC_NAME, "update", |tx| {
260275
Self::update_within_transaction(tx, revision).boxed()
261276
})
262277
.await
278+
.map(RequiresSignalExt::requires_notification_signal)
263279
}
264280

265281
async fn delete(
266282
&self,
267283
revision: AgentSecretRevisionRecord,
268-
) -> Result<AgentSecretExtRevisionRecord, AgentSecretRepoError> {
284+
) -> Result<RequiresNotificationSignal<AgentSecretExtRevisionRecord>, AgentSecretRepoError>
285+
{
269286
self.db_pool.with_tx_err(METRICS_SVC_NAME, "update", |tx| {
270287
async move {
271288
let revision = Self::insert_revision(tx, revision.clone()).await?;
@@ -285,6 +302,12 @@ impl AgentSecretRepo for DbAgentSecretRepo<PostgresPool> {
285302
).await?
286303
.ok_or(AgentSecretRepoError::ConcurrentModification)?;
287304

305+
let change_event = NewRegistryChangeEvent::agent_secret_changed(
306+
agent_secret_record.environment_id,
307+
);
308+
DbRegistryChangeRepo::<PostgresPool>::create_change_event_in_tx(tx, &change_event)
309+
.await?;
310+
288311
Ok(AgentSecretExtRevisionRecord {
289312
environment_id: agent_secret_record.environment_id,
290313
path: agent_secret_record.path,
@@ -293,7 +316,9 @@ impl AgentSecretRepo for DbAgentSecretRepo<PostgresPool> {
293316
revision
294317
})
295318
}.boxed()
296-
}).await
319+
})
320+
.await
321+
.map(RequiresSignalExt::requires_notification_signal)
297322
}
298323

299324
async fn get_by_id(

golem-registry-service/src/repo/registry_change.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ pub enum RegistryEventType {
6262
SecuritySchemeChanged = 4,
6363
ResourceDefinitionChanged = 5,
6464
RetryPolicyChanged = 6,
65+
AgentSecretChanged = 7,
6566
}
6667

6768
impl TryFrom<i16> for RegistryEventType {
@@ -76,6 +77,7 @@ impl TryFrom<i16> for RegistryEventType {
7677
4 => Ok(RegistryEventType::SecuritySchemeChanged),
7778
5 => Ok(RegistryEventType::ResourceDefinitionChanged),
7879
6 => Ok(RegistryEventType::RetryPolicyChanged),
80+
7 => Ok(RegistryEventType::AgentSecretChanged),
7981
other => Err(RepoError::InternalError(anyhow::anyhow!(
8082
"Unknown registry event type: {other}"
8183
))),
@@ -126,6 +128,10 @@ pub enum RegistryChangeEvent {
126128
resource_definition_id: Uuid,
127129
resource_name: String,
128130
},
131+
AgentSecretChanged {
132+
event_id: ChangeEventId,
133+
environment_id: Uuid,
134+
},
129135
}
130136

131137
impl RegistryChangeEvent {
@@ -138,6 +144,7 @@ impl RegistryChangeEvent {
138144
Self::SecuritySchemeChanged { event_id, .. } => *event_id,
139145
Self::RetryPolicyChanged { event_id, .. } => *event_id,
140146
Self::ResourceDefinitionChanged { event_id, .. } => *event_id,
147+
Self::AgentSecretChanged { event_id, .. } => *event_id,
141148
}
142149
}
143150
}
@@ -270,6 +277,17 @@ impl TryFrom<RegistryChangeEventRow> for RegistryChangeEvent {
270277
resource_name,
271278
})
272279
}
280+
RegistryEventType::AgentSecretChanged => {
281+
let environment_id = row.environment_id.ok_or_else(|| {
282+
RepoError::InternalError(anyhow::anyhow!(
283+
"AgentSecretChanged event missing environment_id"
284+
))
285+
})?;
286+
Ok(RegistryChangeEvent::AgentSecretChanged {
287+
event_id: row.event_id,
288+
environment_id,
289+
})
290+
}
273291
}
274292
}
275293
}
@@ -417,6 +435,20 @@ impl NewRegistryChangeEvent {
417435
resource_name: Some(resource_name),
418436
}
419437
}
438+
439+
pub fn agent_secret_changed(environment_id: Uuid) -> Self {
440+
Self {
441+
event_type: RegistryEventType::AgentSecretChanged,
442+
environment_id: Some(environment_id),
443+
deployment_revision_id: None,
444+
current_deployment_revision_id: None,
445+
account_id: None,
446+
grantee_account_id: None,
447+
domains: Vec::new(),
448+
resource_definition_id: None,
449+
resource_name: None,
450+
}
451+
}
420452
}
421453

422454
/// Database operations for the registry_change_events outbox table.
@@ -835,6 +867,18 @@ mod tests {
835867
resource_definition_id: Some(resource_definition_id),
836868
resource_name: Some("res-name".to_string()),
837869
},
870+
RegistryChangeEventRow {
871+
event_id,
872+
event_type: RegistryEventType::AgentSecretChanged,
873+
environment_id: Some(environment_id),
874+
deployment_revision_id: None,
875+
current_deployment_revision_id: None,
876+
account_id: None,
877+
grantee_account_id: None,
878+
domains: Vec::new(),
879+
resource_definition_id: None,
880+
resource_name: None,
881+
},
838882
];
839883

840884
for row in cases {

0 commit comments

Comments
 (0)