diff --git a/rust_snuba/benches/processors.rs b/rust_snuba/benches/processors.rs index 8f6793224b..18ddc4cf91 100644 --- a/rust_snuba/benches/processors.rs +++ b/rust_snuba/benches/processors.rs @@ -73,6 +73,8 @@ fn create_factory( ConcurrencyConfig::with_runtime(concurrency, RUNTIME.handle().to_owned()); let replacements_concurrency = ConcurrencyConfig::with_runtime(concurrency, RUNTIME.handle().to_owned()); + let late_arrivals_concurrency = + ConcurrencyConfig::with_runtime(concurrency, RUNTIME.handle().to_owned()); let factory = ConsumerStrategyFactoryV2 { storage_config: storage, env_config: EnvConfig::default(), @@ -84,6 +86,7 @@ fn create_factory( clickhouse_concurrency, commitlog_concurrency, replacements_concurrency, + late_arrivals_concurrency, async_inserts: false, python_max_queue_depth: None, use_rust_processor: true, @@ -106,6 +109,7 @@ fn create_factory( use_row_binary: false, blq_producer_config: None, blq_topic: None, + late_arrivals_config: None, }; Box::new(factory) } diff --git a/rust_snuba/src/config.rs b/rust_snuba/src/config.rs index 8754d41272..cfdebf37d9 100644 --- a/rust_snuba/src/config.rs +++ b/rust_snuba/src/config.rs @@ -26,6 +26,12 @@ pub struct ConsumerConfig { pub replacements_topic: Option, pub accepted_outcomes_topic: Option, pub dlq_topic: Option, + /// Optional Kafka topic for messages diverted by an eap-items-style + /// partition-boundary killswitch. Distinct from `dlq_topic`: late + /// arrivals are known-good payloads safe for automated replay, while + /// the DLQ is reserved for genuinely invalid messages. + #[serde(default)] + pub late_arrivals_topic: Option, pub accountant_topic: TopicConfig, pub max_batch_size: usize, pub max_batch_time_ms: u64, diff --git a/rust_snuba/src/consumer.rs b/rust_snuba/src/consumer.rs index ff3e33b52d..1ac4cf5a6a 100644 --- a/rust_snuba/src/consumer.rs +++ b/rust_snuba/src/consumer.rs @@ -250,6 +250,17 @@ pub fn consumer_impl( None }; + let late_arrivals_config = if let Some(topic_config) = consumer_config.late_arrivals_topic { + let producer_config = + KafkaConfig::new_producer_config(vec![], Some(topic_config.broker_config)); + Some(( + producer_config, + Topic::new(&topic_config.physical_topic_name), + )) + } else { + None + }; + let topic = Topic::new(&consumer_config.raw_topic.physical_topic_name); let mut rebalance_delay_secs = consumer_config @@ -274,6 +285,7 @@ pub fn consumer_impl( clickhouse_concurrency: ConcurrencyConfig::new(clickhouse_concurrency), commitlog_concurrency: ConcurrencyConfig::new(2), replacements_concurrency: ConcurrencyConfig::new(4), + late_arrivals_concurrency: ConcurrencyConfig::new(10), async_inserts, python_max_queue_depth, use_rust_processor, @@ -291,6 +303,7 @@ pub fn consumer_impl( use_row_binary, blq_producer_config: blq_producer_config.clone(), blq_topic: dlq_topic, + late_arrivals_config, }; let processor = StreamProcessor::with_kafka(config, factory, topic, dlq_policy); @@ -381,5 +394,23 @@ pub fn process_message( } } } + processors::ProcessingFunctionType::ProcessingFunctionWithLateArrivals(f) => { + let res = f(payload, meta, &config::ProcessorConfig::default()) + .map_err(|e| SnubaRustError::new_err(format!("invalid message: {e:?}")))?; + + match res { + crate::types::InsertOrLateArrival::Insert(r) => { + let payload = PyBytes::new(py, &r.rows.into_encoded_rows()).into(); + Ok((Some(payload), None)) + } + crate::types::InsertOrLateArrival::LateArrival(_) => { + // This Python helper is for single-message decoding (e.g., + // by the admin/replay tooling). A late-arrival outcome + // isn't insertable; surface it as an empty insert so the + // caller doesn't try to consume rows. + Ok((None, None)) + } + } + } } } diff --git a/rust_snuba/src/factory_v2.rs b/rust_snuba/src/factory_v2.rs index 457924e37f..3ef9b40b43 100644 --- a/rust_snuba/src/factory_v2.rs +++ b/rust_snuba/src/factory_v2.rs @@ -31,13 +31,15 @@ use crate::strategies::clickhouse::writer_v2::ClickhouseWriterStep; use crate::strategies::commit_log::ProduceCommitLog; use crate::strategies::healthcheck::HealthCheck as SnubaHealthCheck; use crate::strategies::join_timeout::SetJoinTimeout; +use crate::strategies::late_arrivals::ProduceLateArrivals; use crate::strategies::processor::{ get_schema, make_rust_processor, make_rust_processor_row_binary, - make_rust_processor_with_replacements, validate_schema, + make_rust_processor_with_late_arrivals, make_rust_processor_with_replacements, validate_schema, + RowBinaryProcessorFn, }; use crate::strategies::python::PythonTransformStep; use crate::strategies::replacements::ProduceReplacements; -use crate::types::{BytesInsertBatch, CogsData, RowData, TypedInsertBatch}; +use crate::types::{BytesInsertBatch, CogsData, InsertOrLateArrival, RowData}; pub struct ConsumerStrategyFactoryV2 { pub storage_config: config::StorageConfig, @@ -50,6 +52,7 @@ pub struct ConsumerStrategyFactoryV2 { pub clickhouse_concurrency: ConcurrencyConfig, pub commitlog_concurrency: ConcurrencyConfig, pub replacements_concurrency: ConcurrencyConfig, + pub late_arrivals_concurrency: ConcurrencyConfig, pub async_inserts: bool, pub python_max_queue_depth: Option, pub use_rust_processor: bool, @@ -67,6 +70,11 @@ pub struct ConsumerStrategyFactoryV2 { pub use_row_binary: bool, pub blq_producer_config: Option, pub blq_topic: Option, + /// Optional dedicated topic for messages diverted by the eap-items + /// partition-boundary killswitch. When set, late-arriving messages + /// are produced here instead of going through the regular DLQ. When + /// unset, the killswitch silently drops them. + pub late_arrivals_config: Option<(KafkaConfig, Topic)>, } impl ProcessingStrategyFactory for ConsumerStrategyFactoryV2 { @@ -236,12 +244,60 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactoryV2 { self.stop_at_timestamp, ) } + ( + true, + Some(processors::ProcessingFunctionType::ProcessingFunctionWithLateArrivals(func)), + ) => { + let late_arrivals_step: Box< + dyn ProcessingStrategy>>, + > = match self.late_arrivals_config.clone() { + Some((producer_config, destination)) => { + tracing::info!( + "Routing late arrivals to dedicated topic {:?}", + destination, + ); + let producer = KafkaProducer::new(producer_config); + Box::new(ProduceLateArrivals::new( + next_step, + producer, + destination, + &self.late_arrivals_concurrency, + false, + )) + } + None => { + tracing::info!( + "No late_arrivals_topic configured; late arrivals will be dropped", + ); + Box::new(ProduceLateArrivals::disabled(next_step)) + } + }; + + make_rust_processor_with_late_arrivals( + late_arrivals_step, + func, + &self.logical_topic_name, + self.enforce_schema, + &self.processing_concurrency, + config::ProcessorConfig { + env_config: self.env_config.clone(), + storage_name: self.storage_config.name.clone(), + }, + self.stop_at_timestamp, + ) + } ( false, Some(processors::ProcessingFunctionType::ProcessingFunctionWithReplacements(_)), ) => { panic!("Consumer with replacements cannot be run in hybrid-mode"); } + ( + false, + Some(processors::ProcessingFunctionType::ProcessingFunctionWithLateArrivals(_)), + ) => { + panic!("Consumer with late arrivals cannot be run in hybrid-mode"); + } _ => { let schema = get_schema(&self.logical_topic_name, self.enforce_schema); @@ -319,11 +375,7 @@ impl ConsumerStrategyFactoryV2 { + 'static, >( &self, - func: fn( - KafkaPayload, - crate::types::KafkaMessageMetadata, - &config::ProcessorConfig, - ) -> anyhow::Result>, + func: RowBinaryProcessorFn, ) -> Box> { // Commit offsets let next_step = CommitOffsets::new(Duration::from_secs(1)); @@ -400,6 +452,30 @@ impl ConsumerStrategyFactoryV2 { ) .flush_empty_batches(true); + let next_step: Box>>>> = + match self.late_arrivals_config.clone() { + Some((producer_config, destination)) => { + tracing::info!( + "Routing late arrivals (row-binary) to dedicated topic {:?}", + destination, + ); + let producer = KafkaProducer::new(producer_config); + Box::new(ProduceLateArrivals::new( + next_step, + producer, + destination, + &self.late_arrivals_concurrency, + false, + )) + } + None => { + tracing::info!( + "No late_arrivals_topic configured (row-binary); late arrivals will be dropped", + ); + Box::new(ProduceLateArrivals::disabled(next_step)) + } + }; + let next_step = make_rust_processor_row_binary( next_step, func, diff --git a/rust_snuba/src/processors/eap_items.rs b/rust_snuba/src/processors/eap_items.rs index 8a696420ff..cb74891505 100644 --- a/rust_snuba/src/processors/eap_items.rs +++ b/rust_snuba/src/processors/eap_items.rs @@ -21,23 +21,26 @@ use crate::processors::utils::{ use crate::runtime_config::get_str_config; use crate::types::CogsData; use crate::types::{ - item_type_name, InsertBatch, ItemTypeMetrics, KafkaMessageMetadata, TypedInsertBatch, + item_type_name, InsertBatch, InsertOrLateArrival, ItemTypeMetrics, KafkaMessageMetadata, + TypedInsertBatch, }; /// Runtime config key prefix. Per-storage key -/// `eap_items_dlq_grace_period_min:`: a non-negative integer -/// in minutes. When set, the eap-items processor DLQs any message whose -/// event `timestamp` belongs to a prior weekly partition, but only once -/// we are at least `grace_min` minutes past the most recent Monday 00:00 -/// UTC partition boundary. Unset disables the killswitch for that -/// storage (this is how `eap_items_dlq_replay` opts out). +/// `eap_items_late_arrival_grace_period_min:`: a non-negative +/// integer in minutes. When set, the eap-items processor routes any message +/// whose event `timestamp` belongs to a prior weekly partition to the +/// dedicated late-arrivals topic (not the DLQ), but only once we are at +/// least `grace_min` minutes past the most recent Monday 00:00 UTC +/// partition boundary. Unset disables the killswitch for that storage +/// (this is how the replay consumer opts out). /// /// The eap_items table is partitioned by /// `(retention_days, toMonday(timestamp))`. Writes that straddle the -/// weekly partition boundary tax ClickHouse; dropping prior-partition -/// messages after a grace window prevents that pressure without -/// over-eagerly dropping current-week stragglers. -const DLQ_GRACE_PERIOD_MIN_KEY: &str = "eap_items_dlq_grace_period_min"; +/// weekly partition boundary tax ClickHouse; diverting prior-partition +/// messages to a side topic after a grace window prevents that pressure +/// without over-eagerly dropping current-week stragglers, and keeps the +/// real DLQ reserved for messages that are genuinely invalid. +const LATE_ARRIVAL_GRACE_PERIOD_MIN_KEY: &str = "eap_items_late_arrival_grace_period_min"; /// Precision factor for sampling_factor calculations to compensate for floating point errors const SAMPLING_FACTOR_PRECISION: f64 = 1e9; @@ -52,8 +55,13 @@ struct ProcessedItem { should_skip: bool, } -fn process_eap_item(msg: KafkaPayload, config: &ProcessorConfig) -> anyhow::Result { - let payload: &[u8] = msg.payload().context("Expected payload")?; +/// Returns `Ok(None)` when the message should be diverted to the +/// late-arrivals topic. Returns `Ok(Some(_))` for every other path, +/// including the silent-drop `should_skip` path. +fn process_eap_item( + payload: &[u8], + config: &ProcessorConfig, +) -> anyhow::Result> { let trace_item = TraceItem::decode(payload)?; let origin_timestamp = trace_item .received @@ -77,15 +85,13 @@ fn process_eap_item(msg: KafkaPayload, config: &ProcessorConfig) -> anyhow::Resu record_invalid_timestamp_metric("eap_items.messages", is_future, item_type); should_skip = true; } - // only DLQ messages that we don't want to drop (when should_skip=false) + // only route to the late-arrivals topic if we don't already plan to drop if !should_skip { - if let Some(grace_min) = get_dlq_grace_period_min(&config.storage_name) { - if should_dlq_for_prior_partition(event_ts, now, grace_min) { + if let Some(grace_min) = get_late_arrival_grace_period_min(&config.storage_name) { + if should_route_late_arrival(event_ts, now, grace_min) { let item_type_str = item_type_name(item_type); - counter!("eap_items.messages.dlqed_prior_partition", 1, "item_type" => item_type_str); - anyhow::bail!( - "eap-items message DLQed: event timestamp {event_ts} is before the prior weekly partition boundary; routed to DLQ" - ); + counter!("eap_items.messages.routed_late_arrival", 1, "item_type" => item_type_str); + return Ok(None); } } } @@ -142,24 +148,26 @@ fn process_eap_item(msg: KafkaPayload, config: &ProcessorConfig) -> anyhow::Resu data: BTreeMap::from([(app_feature, payload.len() as u64)]), }; - Ok(ProcessedItem { + Ok(Some(ProcessedItem { eap_item, origin_timestamp, item_type_metrics, cogs_data, should_skip, - }) + })) } -fn get_dlq_grace_period_min(storage_name: &str) -> Option { +fn get_late_arrival_grace_period_min(storage_name: &str) -> Option { if storage_name.is_empty() { return None; } - get_str_config(&format!("{DLQ_GRACE_PERIOD_MIN_KEY}:{storage_name}")) - .ok() - .flatten() - .and_then(|s| s.parse::().ok()) - .filter(|&n| n >= 0) + get_str_config(&format!( + "{LATE_ARRIVAL_GRACE_PERIOD_MIN_KEY}:{storage_name}" + )) + .ok() + .flatten() + .and_then(|s| s.parse::().ok()) + .filter(|&n| n >= 0) } /// Most recent Monday 00:00 UTC at or before `now` — the active weekly @@ -170,14 +178,11 @@ fn prior_partition_boundary(now: DateTime) -> DateTime { monday.and_time(chrono::NaiveTime::MIN).and_utc() } -/// True when we should DLQ this message because it belongs to the prior -/// week's partition and we are far enough past the current partition -/// boundary that prior-week stragglers should no longer be admitted. -fn should_dlq_for_prior_partition( - event_ts: DateTime, - now: DateTime, - grace_min: i64, -) -> bool { +/// True when we should route this message to the late-arrivals topic +/// because it belongs to the prior week's partition and we are far +/// enough past the current partition boundary that prior-week +/// stragglers should no longer be admitted to ClickHouse directly. +fn should_route_late_arrival(event_ts: DateTime, now: DateTime, grace_min: i64) -> bool { let boundary = prior_partition_boundary(now); let past_min = now.signed_duration_since(boundary).num_minutes(); past_min >= grace_min && event_ts < boundary @@ -187,25 +192,36 @@ pub fn process_message( msg: KafkaPayload, _metadata: KafkaMessageMetadata, config: &ProcessorConfig, -) -> anyhow::Result { - let processed = process_eap_item(msg, config)?; +) -> anyhow::Result> { + let payload: &[u8] = msg.payload().context("Expected payload")?; + let processed = match process_eap_item(payload, config)? { + Some(p) => p, + None => return Ok(InsertOrLateArrival::LateArrival(msg)), + }; if processed.should_skip { - return Ok(InsertBatch::skip()); + return Ok(InsertOrLateArrival::Insert(InsertBatch::skip())); } let mut batch = InsertBatch::from_rows([processed.eap_item], processed.origin_timestamp)?; batch.item_type_metrics = Some(processed.item_type_metrics); batch.cogs_data = Some(processed.cogs_data); - Ok(batch) + Ok(InsertOrLateArrival::Insert(batch)) } pub fn process_message_row_binary( msg: KafkaPayload, _metadata: KafkaMessageMetadata, config: &ProcessorConfig, -) -> anyhow::Result> { - let processed = process_eap_item(msg, config)?; +) -> anyhow::Result>> { + let payload: &[u8] = msg.payload().context("Expected payload")?; + let processed = match process_eap_item(payload, config)? { + Some(p) => p, + None => return Ok(InsertOrLateArrival::LateArrival(msg)), + }; if processed.should_skip { - return Ok(TypedInsertBatch::from_rows(vec![], None)); + return Ok(InsertOrLateArrival::Insert(TypedInsertBatch::from_rows( + vec![], + None, + ))); } let mut batch = TypedInsertBatch::from_rows( vec![EAPItemRow::try_from(processed.eap_item)?], @@ -213,7 +229,7 @@ pub fn process_message_row_binary( ); batch.item_type_metrics = Some(processed.item_type_metrics); batch.cogs_data = Some(processed.cogs_data); - Ok(batch) + Ok(InsertOrLateArrival::Insert(batch)) } #[derive(Debug, Default, Serialize)] @@ -570,7 +586,8 @@ mod tests { timestamp: DateTime::from(SystemTime::now()), }; let batch = process_message(payload, meta, &ProcessorConfig::default()) - .expect("The message should be processed"); + .expect("The message should be processed") + .unwrap_insert(); #[derive(Deserialize)] pub struct Item { @@ -601,7 +618,8 @@ mod tests { timestamp: DateTime::from(SystemTime::now()), }; let batch = process_message(payload, meta, &ProcessorConfig::default()) - .expect("The message should be processed"); + .expect("The message should be processed") + .unwrap_insert(); #[derive(Deserialize)] pub struct Item { @@ -633,7 +651,8 @@ mod tests { }; let batch = process_message(payload, meta, &ProcessorConfig::default()) - .expect("The message should be processed"); + .expect("The message should be processed") + .unwrap_insert(); // Verify that item_type_metrics is populated assert!(batch.item_type_metrics.is_some()); @@ -670,7 +689,8 @@ mod tests { }; let batch_1 = process_message(payload_1, meta_1, &ProcessorConfig::default()) - .expect("The message should be processed"); + .expect("The message should be processed") + .unwrap_insert(); // Process second message with item type Log let item_id_2 = Uuid::new_v4(); @@ -688,7 +708,8 @@ mod tests { }; let batch_2 = process_message(payload_2, meta_2, &ProcessorConfig::default()) - .expect("The message should be processed"); + .expect("The message should be processed") + .unwrap_insert(); // Merge the metrics as would happen in the pipeline let mut merged_metrics = batch_1.item_type_metrics.unwrap(); @@ -746,7 +767,8 @@ mod tests { timestamp: DateTime::from(SystemTime::now()), }; let batch = process_message(payload, meta, &ProcessorConfig::default()) - .expect("The message should be processed when received is None"); + .expect("The message should be processed when received is None") + .unwrap_insert(); assert!(batch.origin_timestamp.is_none()); } @@ -773,7 +795,8 @@ mod tests { meta.clone(), &ProcessorConfig::default(), ) - .expect("The message should be processed"); + .expect("The message should be processed") + .unwrap_insert(); #[derive(Deserialize)] struct Item { @@ -792,7 +815,8 @@ mod tests { meta, &ProcessorConfig::default(), ) - .expect("The message should be processed"); + .expect("The message should be processed") + .unwrap_insert(); let row = &rb_batch.rows[0]; let received_at = row @@ -824,7 +848,8 @@ mod tests { meta.clone(), &ProcessorConfig::default(), ) - .expect("The message should be processed"); + .expect("The message should be processed") + .unwrap_insert(); #[derive(Deserialize)] struct Item { @@ -842,7 +867,8 @@ mod tests { meta, &ProcessorConfig::default(), ) - .expect("The message should be processed"); + .expect("The message should be processed") + .unwrap_insert(); let row = &rb_batch.rows[0]; let has_received_at = row @@ -886,58 +912,137 @@ mod tests { } #[test] - fn test_should_dlq_prior_week_past_grace() { - // 45 min past Monday 00:00 UTC; message timestamp is prior week → DLQ. + fn test_should_route_late_arrival_prior_week_past_grace() { + // 45 min past Monday 00:00 UTC; message timestamp is prior week → route to late-arrivals. let now = ymd_hms(2026, 5, 18, 0, 45, 0); let event_ts = ymd_hms(2026, 5, 17, 23, 30, 0); - assert!(should_dlq_for_prior_partition(event_ts, now, 45)); + assert!(should_route_late_arrival(event_ts, now, 45)); } #[test] - fn test_should_not_dlq_within_grace() { - // 30 min past Monday 00:00 UTC, grace=45 → not DLQ even though prior week. + fn test_should_not_route_within_grace() { + // 30 min past Monday 00:00 UTC, grace=45 → not routed even though prior week. let now = ymd_hms(2026, 5, 18, 0, 30, 0); let event_ts = ymd_hms(2026, 5, 17, 23, 30, 0); - assert!(!should_dlq_for_prior_partition(event_ts, now, 45)); + assert!(!should_route_late_arrival(event_ts, now, 45)); } #[test] - fn test_should_not_dlq_current_week_past_grace() { - // Past grace, but message belongs to current week → never DLQ. + fn test_should_not_route_current_week_past_grace() { + // Past grace, but message belongs to current week → never routed. let now = ymd_hms(2026, 5, 18, 1, 30, 0); let event_ts = ymd_hms(2026, 5, 18, 1, 0, 0); - assert!(!should_dlq_for_prior_partition(event_ts, now, 45)); + assert!(!should_route_late_arrival(event_ts, now, 45)); } #[test] - fn test_should_not_dlq_event_at_boundary() { - // event_ts == boundary belongs to the new week — must not DLQ. + fn test_should_not_route_event_at_boundary() { + // event_ts == boundary belongs to the new week — must not be routed. let now = ymd_hms(2026, 5, 18, 6, 0, 0); let event_ts = ymd_hms(2026, 5, 18, 0, 0, 0); - assert!(!should_dlq_for_prior_partition(event_ts, now, 45)); + assert!(!should_route_late_arrival(event_ts, now, 45)); } #[test] - fn test_should_not_dlq_future_event_ts() { + fn test_should_not_route_future_event_ts() { // Producer clock skew: event_ts > now. Belongs to current week (or future); - // event_ts < boundary is false, so we never DLQ. + // event_ts < boundary is false, so we never route. let now = ymd_hms(2026, 5, 21, 12, 0, 0); let event_ts = ymd_hms(2026, 5, 22, 0, 0, 0); - assert!(!should_dlq_for_prior_partition(event_ts, now, 45)); + assert!(!should_route_late_arrival(event_ts, now, 45)); } #[test] - fn test_should_dlq_grace_zero_dlqs_immediately_at_boundary() { - // grace=0 → DLQ prior-week messages the instant the new week starts. + fn test_should_route_grace_zero_immediately_at_boundary() { + // grace=0 → route prior-week messages the instant the new week starts. let now = ymd_hms(2026, 5, 18, 0, 0, 0); let event_ts = ymd_hms(2026, 5, 17, 23, 59, 59); - assert!(should_dlq_for_prior_partition(event_ts, now, 0)); + assert!(should_route_late_arrival(event_ts, now, 0)); } #[test] - fn test_get_dlq_grace_period_min_unset_storage_returns_none() { + fn test_get_late_arrival_grace_period_min_unset_storage_returns_none() { // Empty storage_name short-circuits to None without hitting Python. - assert_eq!(get_dlq_grace_period_min(""), None); + assert_eq!(get_late_arrival_grace_period_min(""), None); + } + + /// Cover the integration of the killswitch with `process_message`: + /// when the runtime config is armed and the event timestamp is firmly + /// in a prior week, the processor must return the `LateArrival` + /// variant carrying the original Kafka payload rather than an Insert. + #[test] + fn test_process_message_routes_late_arrival_when_armed() { + // grace=0 + event_ts in 2020 → guaranteed prior-week regardless of + // when this test runs. + crate::runtime_config::patch_str_config_for_test( + "eap_items_late_arrival_grace_period_min:test_la_storage_json", + Some("0"), + ); + + let item_id = Uuid::new_v4(); + let mut trace_item = generate_trace_item(item_id); + trace_item.timestamp = Some(Timestamp { + seconds: 1_577_836_800, // 2020-01-01 + nanos: 0, + }); + + let mut payload_bytes = Vec::new(); + trace_item.encode(&mut payload_bytes).unwrap(); + let payload = KafkaPayload::new(None, None, Some(payload_bytes.clone())); + let meta = KafkaMessageMetadata { + partition: 0, + offset: 1, + timestamp: DateTime::from(SystemTime::now()), + }; + let config = ProcessorConfig { + env_config: Default::default(), + storage_name: "test_la_storage_json".to_string(), + }; + + let result = process_message(payload, meta, &config).expect("processing should not error"); + match result { + crate::types::InsertOrLateArrival::LateArrival(p) => { + assert_eq!(p.payload(), Some(&payload_bytes)); + } + crate::types::InsertOrLateArrival::Insert(_) => panic!("expected LateArrival"), + } + } + + #[test] + fn test_process_message_row_binary_routes_late_arrival_when_armed() { + crate::runtime_config::patch_str_config_for_test( + "eap_items_late_arrival_grace_period_min:test_la_storage_rb", + Some("0"), + ); + + let item_id = Uuid::new_v4(); + let mut trace_item = generate_trace_item(item_id); + trace_item.timestamp = Some(Timestamp { + seconds: 1_577_836_800, // 2020-01-01 + nanos: 0, + }); + + let mut payload_bytes = Vec::new(); + trace_item.encode(&mut payload_bytes).unwrap(); + let payload = KafkaPayload::new(None, None, Some(payload_bytes.clone())); + let meta = KafkaMessageMetadata { + partition: 0, + offset: 1, + timestamp: DateTime::from(SystemTime::now()), + }; + let config = ProcessorConfig { + env_config: Default::default(), + storage_name: "test_la_storage_rb".to_string(), + }; + + let result = process_message_row_binary(payload, meta, &config) + .expect("processing should not error"); + match result { + crate::types::InsertOrLateArrival::LateArrival(p) => { + assert_eq!(p.payload(), Some(&payload_bytes)); + } + crate::types::InsertOrLateArrival::Insert(_) => panic!("expected LateArrival"), + } } #[test] @@ -957,7 +1062,8 @@ mod tests { timestamp: DateTime::from(SystemTime::now()), }; let batch = process_message_row_binary(payload, meta, &ProcessorConfig::default()) - .expect("The message should be processed"); + .expect("The message should be processed") + .unwrap_insert(); assert_eq!(batch.rows.len(), 1); let row = &batch.rows[0]; @@ -985,7 +1091,8 @@ mod tests { timestamp: DateTime::from(SystemTime::now()), }; let batch = process_message_row_binary(payload, meta, &ProcessorConfig::default()) - .expect("The message should be processed"); + .expect("The message should be processed") + .unwrap_insert(); let row = &batch.rows[0]; assert_eq!(row.retention_days, row.downsampled_retention_days); @@ -1007,7 +1114,8 @@ mod tests { timestamp: DateTime::from(SystemTime::now()), }; let batch = process_message_row_binary(payload, meta, &ProcessorConfig::default()) - .expect("The message should be processed"); + .expect("The message should be processed") + .unwrap_insert(); let row = &batch.rows[0]; assert_eq!(row.downsampled_retention_days, 365); @@ -1031,7 +1139,8 @@ mod tests { }; let batch = process_message_row_binary(payload, meta, &ProcessorConfig::default()) - .expect("The message should be processed"); + .expect("The message should be processed") + .unwrap_insert(); assert!(batch.item_type_metrics.is_some()); let metrics = batch.item_type_metrics.unwrap(); @@ -1060,7 +1169,8 @@ mod tests { }; let batch = process_message_row_binary(payload, meta, &ProcessorConfig::default()) - .expect("The message should be processed"); + .expect("The message should be processed") + .unwrap_insert(); assert!(batch.cogs_data.is_some()); let cogs = batch.cogs_data.unwrap(); @@ -1106,7 +1216,8 @@ mod tests { }; let batch = process_message_row_binary(payload, meta, &ProcessorConfig::default()) - .expect("The message should be processed"); + .expect("The message should be processed") + .unwrap_insert(); let row = &batch.rows[0]; @@ -1145,7 +1256,8 @@ mod tests { }; let batch = process_message_row_binary(payload, meta, &ProcessorConfig::default()) - .expect("The message should be processed"); + .expect("The message should be processed") + .unwrap_insert(); let row = &batch.rows[0]; assert_eq!(row.sampling_weight, 8); // 1 / (0.5 * 0.25) = 8 @@ -1218,12 +1330,14 @@ mod tests { // Process through JSON path let json_payload = KafkaPayload::new(None, None, Some(payload_bytes.clone())); let json_batch = process_message(json_payload, meta.clone(), &ProcessorConfig::default()) - .expect("JSON path should succeed"); + .expect("JSON path should succeed") + .unwrap_insert(); // Process through RowBinary path let rb_payload = KafkaPayload::new(None, None, Some(payload_bytes)); let rb_batch = process_message_row_binary(rb_payload, meta, &ProcessorConfig::default()) - .expect("RowBinary path should succeed"); + .expect("RowBinary path should succeed") + .unwrap_insert(); // Parse the JSON output let json_str = @@ -1489,7 +1603,8 @@ mod tests { }; let batch = process_message_row_binary(payload, meta, &ProcessorConfig::default()) - .expect("The message should be processed"); + .expect("The message should be processed") + .unwrap_insert(); assert_eq!(batch.rows.len(), 1); let row = batch.rows[0].clone(); diff --git a/rust_snuba/src/processors/mod.rs b/rust_snuba/src/processors/mod.rs index 5c2c5e19a9..64e43c760f 100644 --- a/rust_snuba/src/processors/mod.rs +++ b/rust_snuba/src/processors/mod.rs @@ -13,12 +13,13 @@ mod replays; pub mod utils; use crate::config::ProcessorConfig; -use crate::types::{InsertBatch, InsertOrReplacement, KafkaMessageMetadata}; +use crate::types::{InsertBatch, InsertOrLateArrival, InsertOrReplacement, KafkaMessageMetadata}; use sentry_arroyo::backends::kafka::types::KafkaPayload; pub enum ProcessingFunctionType { ProcessingFunction(ProcessingFunction), ProcessingFunctionWithReplacements(ProcessingFunctionWithReplacements), + ProcessingFunctionWithLateArrivals(ProcessingFunctionWithLateArrivals), } pub type ProcessingFunction = @@ -31,6 +32,13 @@ pub type ProcessingFunctionWithReplacements = config: &ProcessorConfig, ) -> anyhow::Result>; +pub type ProcessingFunctionWithLateArrivals = + fn( + KafkaPayload, + KafkaMessageMetadata, + config: &ProcessorConfig, + ) -> anyhow::Result>; + macro_rules! define_processing_functions { ($(($name:literal, $logical_topic:literal, $function:expr)),* $(,)*) => { // define function via macro so we can assert statically that processor names are unique. @@ -63,7 +71,7 @@ define_processing_functions! { ("PolymorphicMetricsProcessor", "snuba-metrics", ProcessingFunctionType::ProcessingFunction(release_health_metrics::process_metrics_message)), ("ErrorsProcessor", "events", ProcessingFunctionType::ProcessingFunctionWithReplacements(errors::process_message_with_replacement)), ("ProfileChunksProcessor", "snuba-profile-chunks", ProcessingFunctionType::ProcessingFunction(profile_chunks::process_message)), - ("EAPItemsProcessor", "snuba-items", ProcessingFunctionType::ProcessingFunction(eap_items::process_message)), + ("EAPItemsProcessor", "snuba-items", ProcessingFunctionType::ProcessingFunctionWithLateArrivals(eap_items::process_message)), } // COGS is recorded for these processors @@ -208,6 +216,30 @@ mod tests { InsertOrReplacement::Replacement(_replacement) => {} } } + ProcessingFunctionType::ProcessingFunctionWithLateArrivals(processor_fn) => { + let processed = + processor_fn(payload, metadata.clone(), &processor_config).unwrap(); + + match processed { + InsertOrLateArrival::Insert(insert) => { + let encoded_rows = + String::from_utf8(insert.rows.into_encoded_rows()).unwrap(); + let mut snapshot_payload = Vec::new(); + for row in encoded_rows.lines() { + let row_value: serde_json::Value = + serde_json::from_str(row).unwrap(); + snapshot_payload.push(row_value); + } + insta::assert_json_snapshot!(snapshot_payload); + } + InsertOrLateArrival::LateArrival(_) => { + // Schema examples are never expected to trigger + // the killswitch — runtime config is unset in + // tests, so this branch is unreachable. + panic!("unexpected LateArrival for schema example"); + } + } + } } } } diff --git a/rust_snuba/src/strategies/late_arrivals.rs b/rust_snuba/src/strategies/late_arrivals.rs new file mode 100644 index 0000000000..8fa672442c --- /dev/null +++ b/rust_snuba/src/strategies/late_arrivals.rs @@ -0,0 +1,200 @@ +//! # ProduceLateArrivals +//! +//! Fans out messages emitted by a processor whose return type is +//! `InsertOrLateArrival`. `Insert(batch)` is forwarded to the next step +//! (typically the Reduce/ClickHouse path). `LateArrival(payload)` carries the +//! original Kafka payload and is produced to a dedicated side topic — separate +//! from the regular DLQ — so it can be safely auto-replayed later without +//! mixing it with messages that are genuinely invalid. +//! +//! Modeled on `strategies::replacements::ProduceReplacements`. + +use crate::strategies::noop::Noop; +use crate::types::InsertOrLateArrival; +use sentry_arroyo::backends::kafka::types::KafkaPayload; +use sentry_arroyo::backends::Producer; +use sentry_arroyo::processing::strategies::merge_commit_request; +use sentry_arroyo::processing::strategies::produce::Produce; +use sentry_arroyo::processing::strategies::run_task_in_threads::ConcurrencyConfig; +use sentry_arroyo::processing::strategies::{ + CommitRequest, MessageRejected, ProcessingStrategy, StrategyError, SubmitError, +}; +use sentry_arroyo::types::{Message, Topic, TopicOrPartition}; +use std::time::Duration; + +pub struct ProduceLateArrivals { + next_step: Box>, + inner: Box>, + skip_produce: bool, +} + +impl ProduceLateArrivals { + pub fn new( + next_step: N, + producer: impl Producer + 'static, + destination: Topic, + concurrency: &ConcurrencyConfig, + skip_produce: bool, + ) -> Self + where + N: ProcessingStrategy + 'static, + { + let inner: Box> = if skip_produce { + Box::new(Noop {}) + } else { + Box::new(Produce::new( + Noop {}, + producer, + concurrency, + TopicOrPartition::Topic(destination), + )) + }; + + ProduceLateArrivals { + next_step: Box::new(next_step), + inner, + skip_produce, + } + } + + /// Constructor for the case where the storage has not configured a + /// `late_arrivals_topic`. Late arrivals are silently dropped (matching + /// "killswitch disabled when no topic" semantics); inserts forward + /// normally to `next_step`. + pub fn disabled(next_step: N) -> Self + where + N: ProcessingStrategy + 'static, + { + ProduceLateArrivals { + next_step: Box::new(next_step), + inner: Box::new(Noop {}), + skip_produce: true, + } + } +} + +impl ProcessingStrategy> + for ProduceLateArrivals +{ + fn poll(&mut self) -> Result, StrategyError> { + // Late-arrival offsets are not separately committed here — the inner + // Produce strategy is wrapped in a Noop, mirroring ProduceReplacements. + let _ = self.inner.poll(); + self.next_step.poll() + } + + fn submit( + &mut self, + message: Message>, + ) -> Result<(), SubmitError>> { + let payload = message.clone().into_payload(); + + match payload { + InsertOrLateArrival::Insert(insert) => self + .next_step + .submit(message.clone().replace(insert)) + .map_err(|err| match err { + SubmitError::MessageRejected(message_rejected) => { + let payload = message_rejected.message.into_payload(); + SubmitError::MessageRejected(MessageRejected { + message: message.replace(InsertOrLateArrival::Insert(payload)), + }) + } + SubmitError::InvalidMessage(invalid) => SubmitError::InvalidMessage(invalid), + }), + InsertOrLateArrival::LateArrival(kafka_payload) => { + if self.skip_produce { + tracing::info!("Skipping late-arrival message"); + return Ok(()); + } + + self.inner + .submit(message.clone().replace(kafka_payload)) + .map_err(|err| match err { + SubmitError::MessageRejected(message_rejected) => { + let payload = message_rejected.message.into_payload(); + SubmitError::MessageRejected(MessageRejected { + message: message.replace(InsertOrLateArrival::LateArrival(payload)), + }) + } + SubmitError::InvalidMessage(invalid) => { + SubmitError::InvalidMessage(invalid) + } + }) + } + } + } + + fn terminate(&mut self) { + self.inner.terminate(); + self.next_step.terminate(); + } + + fn join(&mut self, timeout: Option) -> Result, StrategyError> { + // Wait for all in-flight produces to complete before joining the next + // step, to avoid committing offsets for a late arrival that hasn't + // actually been written to the side topic yet. + let committable = self.inner.join(None)?; + Ok(merge_commit_request( + committable, + self.next_step.join(timeout)?, + )) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::testutils::{MockProducer, TestStrategy}; + use crate::types::{BytesInsertBatch, InsertOrLateArrival, RowData}; + use chrono::Utc; + use std::collections::BTreeMap; + use std::sync::{Arc, Mutex}; + + #[test] + fn forwards_insert_diverts_late_arrival() { + let next_step = TestStrategy::new(); + let produced_payloads = Arc::new(Mutex::new(vec![])); + let producer = MockProducer { + payloads: produced_payloads.clone(), + }; + let destination = Topic::new("late-arrivals"); + let concurrency = ConcurrencyConfig::new(10); + + let mut strategy: ProduceLateArrivals> = + ProduceLateArrivals::new(next_step, producer, destination, &concurrency, false); + + // Insert path: nothing should be produced to the late-arrivals topic. + strategy + .submit(Message::new_any_message( + InsertOrLateArrival::Insert( + BytesInsertBatch::from_rows(RowData::from_rows(Vec::::new()).unwrap()) + .with_message_timestamp(Utc::now()), + ), + BTreeMap::new(), + )) + .unwrap(); + assert_eq!(produced_payloads.lock().unwrap().len(), 0); + + // LateArrival path: the original payload should land on the late-arrivals topic. + let payload_bytes = b"original-protobuf".to_vec(); + let key_bytes = b"k".to_vec(); + strategy + .submit(Message::new_any_message( + InsertOrLateArrival::LateArrival(KafkaPayload::new( + Some(key_bytes.clone()), + None, + Some(payload_bytes.clone()), + )), + BTreeMap::new(), + )) + .unwrap(); + + strategy.poll().unwrap(); + strategy.join(None).unwrap(); + + let produced = produced_payloads.lock().unwrap(); + assert_eq!(produced.len(), 1); + assert_eq!(produced[0].1.payload(), Some(&payload_bytes)); + } +} diff --git a/rust_snuba/src/strategies/mod.rs b/rust_snuba/src/strategies/mod.rs index 3ac88c5b28..e27384a92e 100644 --- a/rust_snuba/src/strategies/mod.rs +++ b/rust_snuba/src/strategies/mod.rs @@ -5,6 +5,7 @@ pub mod clickhouse; pub mod commit_log; pub mod healthcheck; pub mod join_timeout; +pub mod late_arrivals; pub mod noop; pub mod processor; pub mod python; diff --git a/rust_snuba/src/strategies/processor.rs b/rust_snuba/src/strategies/processor.rs index d3f055912c..46e22cd8ce 100644 --- a/rust_snuba/src/strategies/processor.rs +++ b/rust_snuba/src/strategies/processor.rs @@ -14,13 +14,23 @@ use sentry_arroyo::types::{BrokerMessage, InnerMessage, Message, Partition}; use sentry_kafka_schemas::{Schema, SchemaError, SchemaType}; use crate::config::ProcessorConfig; -use crate::processors::{ProcessingFunction, ProcessingFunctionWithReplacements}; +use crate::processors::{ + ProcessingFunction, ProcessingFunctionWithLateArrivals, ProcessingFunctionWithReplacements, +}; use crate::types::{ BytesInsertBatch, CommitLogEntry, CommitLogOffsets, EstimatedSize, InsertBatch, - InsertOrReplacement, KafkaMessageMetadata, RowData, TypedInsertBatch, + InsertOrLateArrival, InsertOrReplacement, KafkaMessageMetadata, RowData, TypedInsertBatch, }; use tokio::time::Instant; +/// Row-binary processor function pointer that may flag a message as a late +/// arrival rather than an insert. Used by the eap-items row-binary pipeline. +pub type RowBinaryProcessorFn = fn( + KafkaPayload, + KafkaMessageMetadata, + &ProcessorConfig, +) -> anyhow::Result>>; + pub fn make_rust_processor( next_step: impl ProcessingStrategy> + 'static, func: ProcessingFunction, @@ -179,13 +189,92 @@ pub fn make_rust_processor_with_replacements( )) } +pub fn make_rust_processor_with_late_arrivals( + next_step: impl ProcessingStrategy>> + 'static, + func: ProcessingFunctionWithLateArrivals, + schema_name: &str, + enforce_schema: bool, + concurrency: &ConcurrencyConfig, + processor_config: ProcessorConfig, + stop_at_timestamp: Option, +) -> Box> { + let schema = get_schema(schema_name, enforce_schema); + + fn result_to_next_msg( + transformed: InsertOrLateArrival, + partition: Partition, + offset: u64, + timestamp: DateTime, + stop_at_timestamp: Option, + ) -> anyhow::Result>>> { + if let Some(stop) = stop_at_timestamp { + if stop < timestamp.timestamp() { + let payload = BytesInsertBatch::default(); + return Ok(Message::new_broker_message( + InsertOrLateArrival::Insert(payload), + partition, + offset, + timestamp, + )); + } + } + + let payload = match transformed { + InsertOrLateArrival::Insert(transformed) => { + let num_bytes = transformed.rows.encoded_rows.len(); + let mut batch = BytesInsertBatch::from_rows(transformed.rows) + .with_num_bytes(num_bytes) + .with_message_timestamp(timestamp) + .with_commit_log_offsets(CommitLogOffsets(BTreeMap::from([( + partition.index, + CommitLogEntry { + offset: offset + 1, + orig_message_ts: timestamp, + received_p99: transformed.origin_timestamp.into_iter().collect(), + }, + )]))) + .with_cogs_data(transformed.cogs_data.unwrap_or_default()); + + if let Some(ts) = transformed.origin_timestamp { + batch = batch.with_origin_timestamp(ts); + } + if let Some(ts) = transformed.sentry_received_timestamp { + batch = batch.with_sentry_received_timestamp(ts); + } + if let Some(metrics) = transformed.item_type_metrics { + batch = batch.with_item_type_metrics(metrics); + } + + InsertOrLateArrival::Insert(batch) + } + InsertOrLateArrival::LateArrival(p) => InsertOrLateArrival::LateArrival(p), + }; + + Ok(Message::new_broker_message( + payload, partition, offset, timestamp, + )) + } + + let task_runner = MessageProcessor { + schema, + enforce_schema, + func, + result_to_next_msg, + processor_config, + stop_at_timestamp, + }; + + Box::new(RunTaskInThreads::new( + next_step, + task_runner, + concurrency, + Some("process_message"), + )) +} + pub fn make_rust_processor_row_binary( - next_step: impl ProcessingStrategy>> + 'static, - func: fn( - KafkaPayload, - KafkaMessageMetadata, - &ProcessorConfig, - ) -> anyhow::Result>, + next_step: impl ProcessingStrategy>>> + 'static, + func: RowBinaryProcessorFn, schema_name: &str, enforce_schema: bool, concurrency: &ConcurrencyConfig, @@ -195,12 +284,12 @@ pub fn make_rust_processor_row_binary( - transformed: TypedInsertBatch, + transformed: InsertOrLateArrival>, partition: Partition, offset: u64, timestamp: DateTime, stop_at_timestamp: Option, - ) -> anyhow::Result>>> { + ) -> anyhow::Result>>>> { // If a stop timestamp is set (used for backfills / replays), skip // processing messages whose timestamp exceeds the cutoff by returning // an empty batch that still commits the offset. @@ -208,34 +297,44 @@ pub fn make_rust_processor_row_binary { + let num_bytes: usize = transformed.rows.iter().map(|r| r.estimated_size()).sum(); + let mut batch = BytesInsertBatch::from_rows(transformed.rows) + .with_num_bytes(num_bytes) + .with_message_timestamp(timestamp) + .with_commit_log_offsets(CommitLogOffsets(BTreeMap::from([( + partition.index, + CommitLogEntry { + offset: offset + 1, + orig_message_ts: timestamp, + received_p99: transformed.origin_timestamp.into_iter().collect(), + }, + )]))) + .with_cogs_data(transformed.cogs_data.unwrap_or_default()); - if let Some(ts) = transformed.origin_timestamp { - payload = payload.with_origin_timestamp(ts); - } - if let Some(ts) = transformed.sentry_received_timestamp { - payload = payload.with_sentry_received_timestamp(ts); - } - if let Some(metrics) = transformed.item_type_metrics { - payload = payload.with_item_type_metrics(metrics); - } + if let Some(ts) = transformed.origin_timestamp { + batch = batch.with_origin_timestamp(ts); + } + if let Some(ts) = transformed.sentry_received_timestamp { + batch = batch.with_sentry_received_timestamp(ts); + } + if let Some(metrics) = transformed.item_type_metrics { + batch = batch.with_item_type_metrics(metrics); + } + + InsertOrLateArrival::Insert(batch) + } + InsertOrLateArrival::LateArrival(p) => InsertOrLateArrival::LateArrival(p), + }; Ok(Message::new_broker_message( payload, partition, offset, timestamp, diff --git a/rust_snuba/src/types.rs b/rust_snuba/src/types.rs index 38c4893f44..09eed9720f 100644 --- a/rust_snuba/src/types.rs +++ b/rust_snuba/src/types.rs @@ -198,6 +198,29 @@ pub enum InsertOrReplacement { Replacement(ReplacementData), } +/// Output of a processor that may flag a message as a late arrival rather than +/// an insert. `LateArrival` carries the original Kafka payload so a downstream +/// strategy can re-produce it verbatim to a side topic (separate from the DLQ). +#[derive(Clone, Debug)] +pub enum InsertOrLateArrival { + Insert(T), + LateArrival(KafkaPayload), +} + +impl InsertOrLateArrival { + /// Test helper: extract the `Insert` variant or panic. Use in tests that + /// exercise the normal path and treat a `LateArrival` outcome as a bug. + #[cfg(test)] + pub fn unwrap_insert(self) -> T { + match self { + InsertOrLateArrival::Insert(t) => t, + InsertOrLateArrival::LateArrival(_) => { + panic!("expected InsertOrLateArrival::Insert, got LateArrival") + } + } + } +} + /// The return value of message processors. #[derive(Clone, Debug, Default, PartialEq)] pub struct InsertBatch { diff --git a/snuba/consumers/consumer_config.py b/snuba/consumers/consumer_config.py index 115cad1898..5df504940f 100644 --- a/snuba/consumers/consumer_config.py +++ b/snuba/consumers/consumer_config.py @@ -68,6 +68,7 @@ class ConsumerConfig: replacements_topic: Optional[TopicConfig] accepted_outcomes_topic: Optional[TopicConfig] dlq_topic: Optional[TopicConfig] + late_arrivals_topic: Optional[TopicConfig] max_batch_size: int max_batch_time_ms: int max_batch_size_calculation: str @@ -258,6 +259,15 @@ def resolve_consumer_config( slice_id, ) + # Late-arrivals topic does not support override via CLI + late_arrivals_topic_spec = stream_loader.get_late_arrivals_topic_spec() + resolved_late_arrivals_topic = _resolve_topic_config( + "late arrivals topic", + late_arrivals_topic_spec, + None, + slice_id, + ) + accountant_topic = _resolve_topic_config( "accountant topic", KafkaTopicSpec(Topic.COGS_SHARED_RESOURCES_USAGE), @@ -276,6 +286,7 @@ def resolve_consumer_config( replacements_topic=resolved_replacements_topic, accepted_outcomes_topic=resolved_accepted_outcomes_topic, dlq_topic=resolved_dlq_topic, + late_arrivals_topic=resolved_late_arrivals_topic, max_batch_size=max_batch_size, max_batch_time_ms=max_batch_time_ms, max_batch_size_calculation=max_batch_size_calculation, diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items.yaml index f521925b85..af733c3cb7 100644 --- a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items.yaml @@ -168,3 +168,4 @@ stream_loader: subscription_result_topic: subscription-results-eap-items subscription_delay_seconds: 60 dlq_topic: snuba-dead-letter-items + late_arrivals_topic: snuba-eap-items-late-arrivals diff --git a/snuba/datasets/configuration/json_schema.py b/snuba/datasets/configuration/json_schema.py index 88c5063a5b..a470feb6b6 100644 --- a/snuba/datasets/configuration/json_schema.py +++ b/snuba/datasets/configuration/json_schema.py @@ -67,6 +67,10 @@ def string_with_description(description: str) -> dict[str, str]: "type": ["string", "null"], "description": "Name of the DLQ Kafka topic", }, + "late_arrivals_topic": { + "type": ["string", "null"], + "description": "Name of the dedicated Kafka topic for late-arriving messages diverted by a partition-boundary killswitch (distinct from the DLQ; safe for automated replay).", + }, "pre_filter": { "type": "object", "properties": { diff --git a/snuba/datasets/configuration/storage_builder.py b/snuba/datasets/configuration/storage_builder.py index 77cf00071c..342b6c4de8 100644 --- a/snuba/datasets/configuration/storage_builder.py +++ b/snuba/datasets/configuration/storage_builder.py @@ -206,6 +206,7 @@ def build_stream_loader(loader_config: dict[str, Any]) -> KafkaStreamLoader: subscription_delay_seconds = loader_config.get("subscription_delay_seconds") dlq_topic = __get_topic(loader_config, "dlq_topic") + late_arrivals_topic = __get_topic(loader_config, "late_arrivals_topic") return build_kafka_stream_loader_from_settings( processor, @@ -219,6 +220,7 @@ def build_stream_loader(loader_config: dict[str, Any]) -> KafkaStreamLoader: subscription_synchronization_timestamp, subscription_delay_seconds, dlq_topic, + late_arrivals_topic, ) diff --git a/snuba/datasets/table_storage.py b/snuba/datasets/table_storage.py index 28575a930f..1e71334a49 100644 --- a/snuba/datasets/table_storage.py +++ b/snuba/datasets/table_storage.py @@ -2,7 +2,7 @@ from typing import Any, Mapping, Optional, Sequence from arroyo.backends.kafka import KafkaPayload -from confluent_kafka.admin import ( +from confluent_kafka.admin import ( # type: ignore[attr-defined] AdminClient, ConfigResource, ResourceType, @@ -113,6 +113,7 @@ def __init__( subscription_result_topic_spec: Optional[KafkaTopicSpec] = None, subscription_delay_seconds: Optional[int] = None, dlq_topic_spec: Optional[KafkaTopicSpec] = None, + late_arrivals_topic_spec: Optional[KafkaTopicSpec] = None, ) -> None: subscription_values = [ bool(subscription_scheduled_topic_spec), @@ -136,6 +137,7 @@ def __init__( self.__subscription_delay_seconds = subscription_delay_seconds self.__pre_filter = pre_filter self.__dlq_topic_spec = dlq_topic_spec + self.__late_arrivals_topic_spec = late_arrivals_topic_spec def get_processor(self) -> MessageProcessor: return self.__processor @@ -174,6 +176,9 @@ def get_subscription_delay_seconds(self) -> Optional[int]: def get_dlq_topic_spec(self) -> Optional[KafkaTopicSpec]: return self.__dlq_topic_spec + def get_late_arrivals_topic_spec(self) -> Optional[KafkaTopicSpec]: + return self.__late_arrivals_topic_spec + def build_kafka_stream_loader_from_settings( processor: MessageProcessor, @@ -187,6 +192,7 @@ def build_kafka_stream_loader_from_settings( subscription_synchronization_timestamp: Optional[str] = None, subscription_delay_seconds: Optional[int] = None, dlq_topic: Optional[Topic] = None, + late_arrivals_topic: Optional[Topic] = None, ) -> KafkaStreamLoader: default_topic_spec = KafkaTopicSpec(default_topic) @@ -220,6 +226,11 @@ def build_kafka_stream_loader_from_settings( else: dlq_topic_spec = None + if late_arrivals_topic is not None: + late_arrivals_topic_spec: Optional[KafkaTopicSpec] = KafkaTopicSpec(late_arrivals_topic) + else: + late_arrivals_topic_spec = None + return KafkaStreamLoader( processor, default_topic_spec, @@ -232,6 +243,7 @@ def build_kafka_stream_loader_from_settings( subscription_result_topic_spec=subscription_result_topic_spec, subscription_delay_seconds=subscription_delay_seconds, dlq_topic_spec=dlq_topic_spec, + late_arrivals_topic_spec=late_arrivals_topic_spec, ) diff --git a/snuba/utils/streams/topics.py b/snuba/utils/streams/topics.py index d872e44838..2cd1cbb04d 100644 --- a/snuba/utils/streams/topics.py +++ b/snuba/utils/streams/topics.py @@ -68,6 +68,7 @@ class Topic(Enum): SUBSCRIPTION_SCHEDULED_EAP_ITEMS = "scheduled-subscriptions-eap-items" SUBSCRIPTION_RESULTS_EAP_ITEMS = "subscription-results-eap-items" DEAD_LETTER_ITEMS = "snuba-dead-letter-items" + EAP_ITEMS_LATE_ARRIVALS = "snuba-eap-items-late-arrivals" LW_DELETIONS_GENERIC_EVENTS = "snuba-lw-deletions-generic-events" LW_DELETIONS_EAP_ITEMS = "snuba-lw-deletions-eap-items" diff --git a/tests/utils/streams/test_topics.py b/tests/utils/streams/test_topics.py index e348b79be2..ca474d90cd 100644 --- a/tests/utils/streams/test_topics.py +++ b/tests/utils/streams/test_topics.py @@ -18,6 +18,9 @@ def test_valid_topics() -> None: Topic.EAP_MUTATIONS, Topic.OURLOGS, ) + # New topics pending registration in sentry-kafka-schemas. + # Remove the entry from this tuple once a schema has been added. + pending_schema_registration = (Topic.EAP_ITEMS_LATE_ARRIVALS,) - if topic not in deprecated_topics: + if topic not in deprecated_topics and topic not in pending_schema_registration: raise