Skip to content

Commit 0423400

Browse files
lym953claude
andcommitted
refactor(logs): replace pending_durable_logs vec with held_logs map keyed by request_id
- pending_durable_logs: Vec<IntakeLog> → held_logs: HashMap<String, Vec<IntakeLog>> Logs are now grouped by request_id, enabling O(1) lookup and targeted draining. - try_update_durable_map now returns bool (true = new entry added). The caller is responsible for draining held_logs when a new entry arrives, so the method stays focused on map maintenance only. - New drain_held_for_request_id: when a request_id's durable context first becomes known, immediately moves all stashed logs for that request_id into ready_logs with the appropriate durable tags, without touching the rest of held_logs. - New resolve_held_logs_on_durable_function_set: called at the exact moment is_durable_function transitions from None to Some(...) — inside PlatformInitStart and the PlatformStart fallback — rather than on every process() call. For Some(false) all held logs are flushed; for Some(true) the held logs are scanned for durable context and eligible ones are moved to ready_logs, ineligible ones remain. - queue_log_after_rules updated: in the None state logs with a request_id go into held_logs[request_id]; in the Some(true) state logs with no durable context yet are also stashed in held_logs[request_id] instead of being dropped. - process() no longer needs to drain pending logs on each call; that is now driven entirely by the state-transition helpers above. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 9cf1e2e commit 0423400

1 file changed

Lines changed: 134 additions & 38 deletions

File tree

bottlecap/src/logs/lambda/processor.rs

Lines changed: 134 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,13 @@ pub struct LambdaProcessor {
4343
// Some(true) = durable function; apply durable ID filtering.
4444
// Some(false) = not a durable function; flush logs normally.
4545
is_durable_function: Option<bool>,
46-
// Logs held while is_durable_function is None (race-condition guard)
47-
pending_durable_logs: Vec<IntakeLog>,
46+
// Logs held pending resolution, keyed by request_id.
47+
// While is_durable_function is None every incoming log is stashed here so
48+
// we can decide whether to filter/tag it once the flag is known.
49+
// While is_durable_function is Some(true), logs whose request_id has no
50+
// durable execution context yet are also stashed here; they are drained
51+
// the moment that context arrives.
52+
held_logs: HashMap<String, Vec<IntakeLog>>,
4853
// Maps request_id -> (durable_execution_id, durable_execution_name)
4954
durable_id_map: HashMap<String, (String, String)>,
5055
// Insertion order for FIFO eviction when map reaches capacity
@@ -116,7 +121,7 @@ impl LambdaProcessor {
116121
event_bus,
117122
is_managed_instance_mode,
118123
is_durable_function: None,
119-
pending_durable_logs: Vec::new(),
124+
held_logs: HashMap::new(),
120125
durable_id_map: HashMap::with_capacity(5),
121126
durable_id_order: VecDeque::with_capacity(5),
122127
}
@@ -202,6 +207,7 @@ impl LambdaProcessor {
202207
let rv_arn = runtime_version_arn.unwrap_or("?".to_string()); // TODO: check what do containers display
203208

204209
self.is_durable_function = Some(rv.contains("DurableFunction"));
210+
self.resolve_held_logs_on_durable_function_set();
205211

206212
Ok(Message::new(
207213
format!("INIT_START Runtime Version: {rv} Runtime Version ARN: {rv_arn}"),
@@ -234,6 +240,7 @@ impl LambdaProcessor {
234240
// processed invocation), treat as non-durable to avoid holding logs forever.
235241
if self.is_durable_function.is_none() {
236242
self.is_durable_function = Some(false);
243+
self.resolve_held_logs_on_durable_function_set();
237244
}
238245

239246
let version = version.unwrap_or("$LATEST".to_string());
@@ -490,12 +497,15 @@ impl LambdaProcessor {
490497
}
491498
}
492499

493-
/// If the message is a JSON object with `durable_execution_id` and `durable_execution_name`
494-
/// fields, inserts a mapping of `request_id` -> `(execution_id, execution_name)` into the
495-
/// durable ID map. Evicts the oldest entry when the map is at capacity (5).
496-
fn try_update_durable_map(&mut self, request_id: &str, message: &str) {
500+
/// Parses `message` as JSON and, if it contains both `durable_execution_id` and
501+
/// `durable_execution_name` fields, inserts or updates the entry for `request_id` in the
502+
/// durable ID map (evicting the oldest entry when the map is at capacity 5).
503+
///
504+
/// Returns `true` if a brand-new entry was added (the caller may then drain `held_logs`
505+
/// for that `request_id`).
506+
fn try_update_durable_map(&mut self, request_id: &str, message: &str) -> bool {
497507
let Ok(serde_json::Value::Object(obj)) = serde_json::from_str(message) else {
498-
return;
508+
return false;
499509
};
500510
let execution_id = obj
501511
.get("durable_execution_id")
@@ -504,7 +514,8 @@ impl LambdaProcessor {
504514
.get("durable_execution_name")
505515
.and_then(serde_json::Value::as_str);
506516
if let (Some(id), Some(name)) = (execution_id, execution_name) {
507-
if !self.durable_id_map.contains_key(request_id) {
517+
let is_new = !self.durable_id_map.contains_key(request_id);
518+
if is_new {
508519
if self.durable_id_order.len() >= 5 {
509520
if let Some(oldest) = self.durable_id_order.pop_front() {
510521
self.durable_id_map.remove(&oldest);
@@ -514,6 +525,78 @@ impl LambdaProcessor {
514525
}
515526
self.durable_id_map
516527
.insert(request_id.to_string(), (id.to_string(), name.to_string()));
528+
return is_new;
529+
}
530+
false
531+
}
532+
533+
/// Moves all logs held for `request_id` into `ready_logs`, tagging each with the
534+
/// durable execution context that is now known for that request_id.
535+
fn drain_held_for_request_id(&mut self, request_id: &str) {
536+
let Some(held) = self.held_logs.remove(request_id) else {
537+
return;
538+
};
539+
let tags_suffix = self
540+
.durable_id_map
541+
.get(request_id)
542+
.map(|(id, name)| format!(",durable_execution_id:{id},durable_execution_name:{name}"));
543+
// Borrow of durable_id_map is released here (tags_suffix is an owned String).
544+
if let Some(suffix) = tags_suffix {
545+
for mut log in held {
546+
log.tags.push_str(&suffix);
547+
if let Ok(s) = serde_json::to_string(&log) {
548+
drop(log);
549+
self.ready_logs.push(s);
550+
}
551+
}
552+
}
553+
}
554+
555+
/// Called once when `is_durable_function` transitions from `None` to `Some(...)`.
556+
/// Drains every entry in `held_logs`, routing each batch according to the newly-known flag:
557+
/// - `Some(false)` → flush all held logs immediately.
558+
/// - `Some(true)` → try to extract durable context from the held logs; those whose
559+
/// request_id is now in the durable ID map are flushed with tags; the
560+
/// rest stay in `held_logs` until their context arrives.
561+
fn resolve_held_logs_on_durable_function_set(&mut self) {
562+
let held = std::mem::take(&mut self.held_logs);
563+
match self.is_durable_function {
564+
Some(false) => {
565+
for (_, logs) in held {
566+
for log in logs {
567+
if let Ok(s) = serde_json::to_string(&log) {
568+
self.ready_logs.push(s);
569+
}
570+
}
571+
}
572+
}
573+
Some(true) => {
574+
for (request_id, logs) in held {
575+
// Try to discover durable context from the held logs themselves.
576+
for log in &logs {
577+
self.try_update_durable_map(&request_id, &log.message.message);
578+
}
579+
let tags_suffix = self.durable_id_map.get(&request_id).map(|(id, name)| {
580+
format!(",durable_execution_id:{id},durable_execution_name:{name}")
581+
});
582+
// Borrow of durable_id_map released here.
583+
match tags_suffix {
584+
Some(suffix) => {
585+
for mut log in logs {
586+
log.tags.push_str(&suffix);
587+
if let Ok(s) = serde_json::to_string(&log) {
588+
self.ready_logs.push(s);
589+
}
590+
}
591+
}
592+
None => {
593+
// No context yet — put back and wait for it to arrive.
594+
self.held_logs.insert(request_id, logs);
595+
}
596+
}
597+
}
598+
}
599+
None => {} // Should not happen; guard against misuse.
517600
}
518601
}
519602

@@ -529,29 +612,45 @@ impl LambdaProcessor {
529612
/// Queues a log that has already had processing rules applied.
530613
///
531614
/// Routing depends on `is_durable_function`:
532-
/// - `None` → hold in `pending_durable_logs` until the flag is resolved.
533-
/// - `Some(false)` → serialize and push straight to `ready_logs`.
534-
/// - `Some(true)` → apply durable-ID filtering: only send if the request_id is
535-
/// already in the durable ID map, and append the execution tags.
615+
/// - `None` → stash in `held_logs[request_id]`; logs without a request_id are
616+
/// flushed immediately since they cannot carry durable context.
617+
/// - `Some(false)` → serialize and push straight to `ready_logs`.
618+
/// - `Some(true)` → try to update `durable_id_map` from the log; if a new entry was
619+
/// added, drain `held_logs` for that request_id; then flush this log if
620+
/// its request_id is in the map, otherwise stash it in `held_logs`.
536621
fn queue_log_after_rules(&mut self, mut log: IntakeLog) {
537622
match self.is_durable_function {
538623
None => {
539-
// Not yet known whether this is a durable function — hold the log.
540-
self.pending_durable_logs.push(log);
624+
match log.message.lambda.request_id.clone() {
625+
Some(rid) => {
626+
self.held_logs.entry(rid).or_default().push(log);
627+
}
628+
None => {
629+
// No request_id — cannot associate with durable context; flush now.
630+
if let Ok(s) = serde_json::to_string(&log) {
631+
drop(log);
632+
self.ready_logs.push(s);
633+
}
634+
}
635+
}
541636
}
542637
Some(false) => {
543638
if let Ok(serialized_log) = serde_json::to_string(&log) {
639+
// explicitly drop log so we don't accidentally re-use it and push
640+
// duplicate logs to the aggregator
544641
drop(log);
545642
self.ready_logs.push(serialized_log);
546643
}
547644
}
548645
Some(true) => {
549-
// Populate the durable ID map from this log if it carries execution context.
550-
if let Some(request_id) = log.message.lambda.request_id.clone() {
551-
self.try_update_durable_map(&request_id, &log.message.message);
646+
if let Some(rid) = log.message.lambda.request_id.clone() {
647+
if self.try_update_durable_map(&rid, &log.message.message) {
648+
// New durable context just discovered — drain previously held logs.
649+
self.drain_held_for_request_id(&rid);
650+
}
552651
}
553652

554-
// Only flush logs whose request_id is already in the durable ID map.
653+
// Flush this log if its request_id now has durable context; otherwise hold.
555654
let durable_tags = log
556655
.message
557656
.lambda
@@ -562,14 +661,22 @@ impl LambdaProcessor {
562661
format!(",durable_execution_id:{id},durable_execution_name:{name}")
563662
});
564663

565-
let Some(extra_tags) = durable_tags else {
566-
return;
567-
};
568-
569-
log.tags.push_str(&extra_tags);
570-
if let Ok(serialized_log) = serde_json::to_string(&log) {
571-
drop(log);
572-
self.ready_logs.push(serialized_log);
664+
match durable_tags {
665+
Some(extra_tags) => {
666+
log.tags.push_str(&extra_tags);
667+
if let Ok(serialized_log) = serde_json::to_string(&log) {
668+
// explicitly drop log so we don't accidentally re-use it and push
669+
// duplicate logs to the aggregator
670+
drop(log);
671+
self.ready_logs.push(serialized_log);
672+
}
673+
}
674+
None => {
675+
if let Some(rid) = log.message.lambda.request_id.clone() {
676+
self.held_logs.entry(rid).or_default().push(log);
677+
}
678+
// Logs without a request_id cannot match the durable map; drop them.
679+
}
573680
}
574681
}
575682
}
@@ -579,17 +686,6 @@ impl LambdaProcessor {
579686
if let Ok(log) = self.make_log(event).await {
580687
self.process_and_queue_log(log);
581688

582-
// If is_durable_function was just resolved, drain any logs that were held
583-
// while the flag was still None (race-condition guard).
584-
if self.is_durable_function.is_some() && !self.pending_durable_logs.is_empty() {
585-
let pending = std::mem::take(&mut self.pending_durable_logs);
586-
for pending_log in pending {
587-
// Rules were already applied before the log entered pending_durable_logs,
588-
// so go straight to queue_log_after_rules to avoid double-application.
589-
self.queue_log_after_rules(pending_log);
590-
}
591-
}
592-
593689
// Process orphan logs, since we have a `request_id` now
594690
let orphan_logs = std::mem::take(&mut self.orphan_logs);
595691
for mut orphan_log in orphan_logs {

0 commit comments

Comments
 (0)