Skip to content

Commit 8564706

Browse files
authored
ref(outcomes): Remove outcomes aggregator, implement forwarding of metrics as client reports (#6107)
1 parent 8e3e880 commit 8564706

11 files changed

Lines changed: 469 additions & 436 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
- Rename objectstore use-case from `profiles_raw` to `profile_attachments`. ([#6108](https://github.com/getsentry/relay/pull/6108))
1919
- Require timestamps and verification in auth signatures. ([#6069](https://github.com/getsentry/relay/pull/6069))
20+
- Internally handle outcomes as metrics. ([#6107](https://github.com/getsentry/relay/pull/6107))
2021
- Have relay generate metric billing outcomes. ([#6066](https://github.com/getsentry/relay/pull/6066))
2122
- Update sentry-conventions to 0.12.0.
2223
- Upgrade release image to Debian 13. ([#6110](https://github.com/getsentry/relay/pull/6110))

relay-config/src/config.rs

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1233,25 +1233,6 @@ pub enum NormalizationLevel {
12331233
Full,
12341234
}
12351235

1236-
/// Configuration values for the outcome aggregator
1237-
#[derive(Serialize, Deserialize, Debug)]
1238-
#[serde(default)]
1239-
pub struct OutcomeAggregatorConfig {
1240-
/// Defines the width of the buckets into which outcomes are aggregated, in seconds.
1241-
pub bucket_interval: u64,
1242-
/// Defines how often all buckets are flushed, in seconds.
1243-
pub flush_interval: u64,
1244-
}
1245-
1246-
impl Default for OutcomeAggregatorConfig {
1247-
fn default() -> Self {
1248-
Self {
1249-
bucket_interval: 60,
1250-
flush_interval: 120,
1251-
}
1252-
}
1253-
}
1254-
12551236
/// Configuration options for objectstore's auth scheme.
12561237
#[derive(Serialize, Deserialize)]
12571238
pub struct ObjectstoreAuthConfig {
@@ -1418,8 +1399,6 @@ pub struct Outcomes {
14181399
/// Defines the source string registered in the outcomes originating from
14191400
/// this Relay (typically something like the region or the layer).
14201401
pub source: Option<String>,
1421-
/// Configures the outcome aggregator.
1422-
pub aggregator: OutcomeAggregatorConfig,
14231402
}
14241403

14251404
impl Default for Outcomes {
@@ -1429,7 +1408,6 @@ impl Default for Outcomes {
14291408
batch_size: 1000,
14301409
batch_interval: 500,
14311410
source: None,
1432-
aggregator: OutcomeAggregatorConfig::default(),
14331411
}
14341412
}
14351413
}
@@ -2163,11 +2141,6 @@ impl Config {
21632141
self.values.outcomes.source.as_deref()
21642142
}
21652143

2166-
/// Returns the width of the buckets into which outcomes are aggregated, in seconds.
2167-
pub fn outcome_aggregator(&self) -> &OutcomeAggregatorConfig {
2168-
&self.values.outcomes.aggregator
2169-
}
2170-
21712144
/// Returns logging configuration.
21722145
pub fn logging(&self) -> &relay_log::LogConfig {
21732146
&self.values.logging

relay-server/src/service.rs

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ use crate::services::objectstore::Objectstore;
1818
#[cfg(feature = "processing")]
1919
use crate::services::objectstore::ObjectstoreService;
2020
use crate::services::outcome::{
21-
OutcomeAggregator, OutcomeProducer, OutcomeProducerService, TrackOutcome,
21+
ClientReportOutcomeProducerService, NullOutcomeProducerService, OutcomeProducerService,
22+
TrackOutcome,
2223
};
2324
use crate::services::processor::{
2425
self, EnvelopeProcessor, EnvelopeProcessorService, EnvelopeProcessorServicePool,
@@ -39,7 +40,7 @@ use anyhow::Result;
3940
use axum::extract::FromRequestParts;
4041
use axum::http::request::Parts;
4142
use relay_cogs::Cogs;
42-
use relay_config::Config;
43+
use relay_config::{Config, EmitOutcomes, RelayMode};
4344
#[cfg(feature = "processing")]
4445
use relay_config::{RedisConfigRef, RedisConfigsRef};
4546
#[cfg(feature = "processing")]
@@ -73,7 +74,6 @@ pub enum ServiceError {
7374
#[derive(Clone, Debug)]
7475
pub struct Registry {
7576
pub health_check: Addr<HealthCheck>,
76-
pub outcome_producer: Addr<OutcomeProducer>,
7777
pub outcome_aggregator: Addr<TrackOutcome>,
7878
pub processor: Addr<EnvelopeProcessor>,
7979
pub relay_cache: Addr<RelayCache>,
@@ -195,19 +195,25 @@ impl ServiceState {
195195
// Create an address for the `EnvelopeProcessor`, which can be injected into the
196196
// other services.
197197
let (processor, processor_rx) = match config.relay_mode() {
198-
relay_config::RelayMode::Proxy => channel(ProxyProcessorService::name()),
199-
relay_config::RelayMode::Managed => channel(EnvelopeProcessorService::name()),
198+
RelayMode::Proxy => channel(ProxyProcessorService::name()),
199+
RelayMode::Managed => channel(EnvelopeProcessorService::name()),
200200
};
201201

202202
let (aggregator, aggregator_rx) = channel(RouterService::name());
203203

204-
let outcome_producer = services.start(OutcomeProducerService::create(
205-
config.clone(),
206-
processor.clone(),
207-
aggregator.clone(),
208-
)?);
209-
let outcome_aggregator =
210-
services.start(OutcomeAggregator::new(&config, outcome_producer.clone()));
204+
let outcome_aggregator = match config.emit_outcomes() {
205+
EmitOutcomes::None => services.start(NullOutcomeProducerService::new()),
206+
_ => match config.relay_mode() {
207+
RelayMode::Proxy => services.start(ClientReportOutcomeProducerService::new(
208+
&config,
209+
processor.clone(),
210+
)),
211+
RelayMode::Managed => services.start(OutcomeProducerService::new(
212+
Arc::clone(&config),
213+
aggregator.clone(),
214+
)),
215+
},
216+
};
211217

212218
let (global_config, global_config_rx) =
213219
GlobalConfigService::new(config.clone(), upstream_relay.clone());
@@ -267,7 +273,7 @@ impl ServiceState {
267273
);
268274

269275
let (processor_pool, aggregator_handle, autoscaling) = match config.relay_mode() {
270-
relay_config::RelayMode::Proxy => {
276+
RelayMode::Proxy => {
271277
services.start_with(
272278
ProxyProcessorService::new(
273279
config.clone(),
@@ -281,7 +287,7 @@ impl ServiceState {
281287
);
282288
(None, None, None)
283289
}
284-
relay_config::RelayMode::Managed => {
290+
RelayMode::Managed => {
285291
let processor_pool = create_processor_pool(&config)?;
286292

287293
let router = RouterService::new(
@@ -365,7 +371,6 @@ impl ServiceState {
365371
let registry = Registry {
366372
processor,
367373
health_check,
368-
outcome_producer,
369374
outcome_aggregator,
370375
relay_cache,
371376
global_config,
@@ -426,17 +431,12 @@ impl ServiceState {
426431
&self.inner.registry.health_check
427432
}
428433

429-
/// Returns the address of the [`OutcomeProducer`] service.
430-
pub fn outcome_producer(&self) -> &Addr<OutcomeProducer> {
431-
&self.inner.registry.outcome_producer
432-
}
433-
434-
/// Returns the address of the [`OutcomeProducer`] service.
434+
/// Returns the address of the [`UpstreamRelay`] service.
435435
pub fn upstream_relay(&self) -> &Addr<UpstreamRelay> {
436436
&self.inner.registry.upstream_relay
437437
}
438438

439-
/// Returns the address of the [`OutcomeProducer`] service.
439+
/// Returns the address of the [`EnvelopeProcessor`] service.
440440
pub fn processor(&self) -> &Addr<EnvelopeProcessor> {
441441
&self.inner.registry.processor
442442
}
@@ -446,7 +446,7 @@ impl ServiceState {
446446
&self.inner.registry.global_config
447447
}
448448

449-
/// Returns the address of the [`OutcomeProducer`] service.
449+
/// Returns the address of the [`TrackOutcome`] service.
450450
pub fn outcome_aggregator(&self) -> &Addr<TrackOutcome> {
451451
&self.inner.registry.outcome_aggregator
452452
}

relay-server/src/services/outcome/aggregator.rs

Lines changed: 0 additions & 173 deletions
This file was deleted.

0 commit comments

Comments
 (0)