chore: reformat the code#1166
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (2)
✅ Files skipped from review due to trivial changes (2)
WalkthroughAdds shared window infrastructure ( Changes
Sequence Diagram(s)sequenceDiagram
rect rgba(200,200,255,0.5)
participant TimerTask
participant CommonWindowContext
participant BaseWindow
participant Reader
end
TimerTask->>CommonWindowContext: tick(check_interval)
CommonWindowContext->>CommonWindowContext: notify_waiters()
CommonWindowContext->>BaseWindow: poll/process windows
BaseWindow->>Reader: deliver windowed records / wake reader
Reader->>CommonWindowContext: await notified()
alt close requested
TimerTask->>CommonWindowContext: cancel()
CommonWindowContext->>Reader: notify_waiters() (wake to exit)
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/arkflow-plugin/src/output/mqtt.rs (1)
151-156: 🛠️ Refactor suggestion | 🟠 MajorUse the shared
parse_qoshelper to eliminate duplicate QoS logic.This inline QoS matching duplicates the logic in
mqtt_client::parse_qos. Sinceparse_qosis already imported, use it here for consistency.♻️ Proposed fix to use shared parse_qos
- // Determine the QoS level - let qos_level = match self.config.qos { - Some(0) => QoS::AtMostOnce, - Some(1) => QoS::AtLeastOnce, - Some(2) => QoS::ExactlyOnce, - _ => QoS::AtLeastOnce, // The default is QoS 1 - }; + // Determine the QoS level using shared utility + let qos_level = parse_qos(self.config.qos);🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/arkflow-plugin/src/output/mqtt.rs` around lines 151 - 156, Replace the inline QoS match on self.config.qos with the shared helper parse_qos to avoid duplicating logic: call mqtt_client::parse_qos (already imported as parse_qos) with self.config.qos and use its returned QoS instead of the local match block in the function that sets qos_level; remove the existing match arms (Some(0)/Some(1)/Some(2)/_) so the code consistently relies on parse_qos's defaulting and mapping behavior.
🧹 Nitpick comments (3)
crates/arkflow-plugin/src/mqtt_client.rs (1)
104-111: Consider logging a warning for invalid QoS values.Invalid QoS values (e.g.,
Some(99)) silently default toAtLeastOnce. While this is a reasonable fallback, consider logging a warning to help users identify configuration mistakes.💡 Optional: Add tracing warning for invalid QoS
+use tracing::warn; + pub fn parse_qos(qos: Option<u8>) -> QoS { match qos { Some(0) => QoS::AtMostOnce, Some(1) => QoS::AtLeastOnce, Some(2) => QoS::ExactlyOnce, - _ => QoS::AtLeastOnce, // Default is QoS 1 + None => QoS::AtLeastOnce, // Default is QoS 1 + Some(invalid) => { + warn!("Invalid QoS value {}, defaulting to AtLeastOnce (1)", invalid); + QoS::AtLeastOnce + } } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/arkflow-plugin/src/mqtt_client.rs` around lines 104 - 111, The parse_qos function currently maps invalid QoS values to QoS::AtLeastOnce silently; modify parse_qos to detect when qos is Some(v) with v not in {0,1,2} and emit a warning (e.g., tracing::warn! or log::warn!) that includes the invalid numeric value and the chosen fallback before returning QoS::AtLeastOnce; keep behavior for None and valid values unchanged and reference the function name parse_qos and the QoS variants QoS::AtMostOnce, QoS::AtLeastOnce, QoS::ExactlyOnce so reviewers can locate the change.crates/arkflow-plugin/src/buffer/tumbling_window.rs (1)
54-57: Consider removing the unusedstrategyfield.The
TumblingWindowStrategyis stored in the struct but only used during initialization to getcheck_interval(). After that, it's never accessed again. Consider removing this field to reduce memory footprint.💡 Optional: Remove unused strategy field
struct TumblingWindow { /// Base window implementation for queue management base_window: BaseWindow, /// Common window context for timer and notification management context: CommonWindowContext, - /// Window strategy for trigger logic - strategy: TumblingWindowStrategy, }And in
new():Ok(Self { base_window, context, - strategy, })🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/arkflow-plugin/src/buffer/tumbling_window.rs` around lines 54 - 57, The TumblingWindow struct currently stores a strategy field (TumblingWindowStrategy) that is only used during construction to obtain check_interval(); remove the unused strategy field from the struct, update the TumblingWindow::new() constructor to call strategy.check_interval() locally and store the resulting interval (or incorporate it into CommonWindowContext) instead of keeping the whole strategy, and remove any remaining references to self.strategy (ensure no methods reference it); also update the constructor signature and any callers/tests to stop passing or expecting the strategy field.crates/arkflow-plugin/src/buffer/common_window.rs (1)
64-87: Consider makingstart_timer()idempotent to prevent duplicate timer tasks.Repeated calls can spawn multiple loops and duplicate wakeups; a simple started-flag guard would harden this API.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/arkflow-plugin/src/buffer/common_window.rs` around lines 64 - 87, start_timer can spawn multiple concurrent timer tasks if called repeatedly; make it idempotent by adding a started guard (e.g., an AtomicBool or AtomicU8) on the struct and atomically set it when entering start_timer so only the first caller spawns the tokio::spawn task. In start_timer check the flag with compare_exchange (or fetch_update) and return immediately if already set; ensure the flag is set before cloning notify/close_token and spawning, and consider clearing it on cancellation only if you intend to allow restart. This targets the start_timer method and the struct that holds notify and close_token.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/arkflow-plugin/src/buffer/common_window.rs`:
- Around line 124-126: The close() method currently only cancels close_token
which can leave tasks awaiting notify blocked if start_timer() was never
started; update close() to both cancel close_token and explicitly wake the
notify waiters (e.g., call notify.notify_waiters() or notify.notify_one() on the
same Notify used by waiting tasks) so that any tasks awaiting the notification
are released immediately; locate the close() function and the Notify instance
used by start_timer()/waiters to add the notify wake after canceling
close_token.
In `@crates/arkflow-plugin/src/buffer/sliding_window.rs`:
- Around line 71-74: The code incorrectly converts config.window_size (a message
count) into a Duration for SlidingWindowStrategy::new; instead, make the
strategy message-count-based: change the SlidingWindowStrategy::new signature to
accept a message_count (e.g., u32 or usize) rather than a Duration, update the
call site to pass config.window_size (cast to the chosen integer type) instead
of Duration::from_secs(...), and remove the unused Duration field/logic inside
SlidingWindowStrategy; ensure checks that use queue_lock.len() (and
self.config.window_size) remain consistent with the new message_count type.
In `@crates/arkflow-plugin/src/buffer/window_strategy.rs`:
- Around line 78-81: check_interval() can return Duration::ZERO when window_size
or slide intervals truncate to 0ms, causing the timer in
CommonWindowContext::start_timer() to spin; change check_interval (and the
equivalent interval-calculating helpers at the other locations) to clamp the
computed milliseconds to a positive minimum (e.g., use .max(1) on the
millisecond value before casting to u64) so it always returns at least
Duration::from_millis(1), ensuring the sleep never receives Duration::ZERO.
In `@crates/arkflow-plugin/src/output/mqtt.rs`:
- Line 20: Replace the local inline QoS matching logic that converts the
incoming qos string to a rumqttc::QoS with a call to the shared helper
crate::mqtt_client::parse_qos (keep the create_mqtt_options import), i.e. remove
the local match/if block that produces QoS and instead set qos =
parse_qos(qos_str) (or unwrap/handle its Result/Option the same way other
callers do), so the parse_qos import is actually used and the duplicated mapping
logic is eliminated.
---
Outside diff comments:
In `@crates/arkflow-plugin/src/output/mqtt.rs`:
- Around line 151-156: Replace the inline QoS match on self.config.qos with the
shared helper parse_qos to avoid duplicating logic: call mqtt_client::parse_qos
(already imported as parse_qos) with self.config.qos and use its returned QoS
instead of the local match block in the function that sets qos_level; remove the
existing match arms (Some(0)/Some(1)/Some(2)/_) so the code consistently relies
on parse_qos's defaulting and mapping behavior.
---
Nitpick comments:
In `@crates/arkflow-plugin/src/buffer/common_window.rs`:
- Around line 64-87: start_timer can spawn multiple concurrent timer tasks if
called repeatedly; make it idempotent by adding a started guard (e.g., an
AtomicBool or AtomicU8) on the struct and atomically set it when entering
start_timer so only the first caller spawns the tokio::spawn task. In
start_timer check the flag with compare_exchange (or fetch_update) and return
immediately if already set; ensure the flag is set before cloning
notify/close_token and spawning, and consider clearing it on cancellation only
if you intend to allow restart. This targets the start_timer method and the
struct that holds notify and close_token.
In `@crates/arkflow-plugin/src/buffer/tumbling_window.rs`:
- Around line 54-57: The TumblingWindow struct currently stores a strategy field
(TumblingWindowStrategy) that is only used during construction to obtain
check_interval(); remove the unused strategy field from the struct, update the
TumblingWindow::new() constructor to call strategy.check_interval() locally and
store the resulting interval (or incorporate it into CommonWindowContext)
instead of keeping the whole strategy, and remove any remaining references to
self.strategy (ensure no methods reference it); also update the constructor
signature and any callers/tests to stop passing or expecting the strategy field.
In `@crates/arkflow-plugin/src/mqtt_client.rs`:
- Around line 104-111: The parse_qos function currently maps invalid QoS values
to QoS::AtLeastOnce silently; modify parse_qos to detect when qos is Some(v)
with v not in {0,1,2} and emit a warning (e.g., tracing::warn! or log::warn!)
that includes the invalid numeric value and the chosen fallback before returning
QoS::AtLeastOnce; keep behavior for None and valid values unchanged and
reference the function name parse_qos and the QoS variants QoS::AtMostOnce,
QoS::AtLeastOnce, QoS::ExactlyOnce so reviewers can locate the change.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: bac13be6-7777-4f4c-9ade-30d2b924ffdc
📒 Files selected for processing (10)
crates/arkflow-plugin/src/buffer/common_window.rscrates/arkflow-plugin/src/buffer/mod.rscrates/arkflow-plugin/src/buffer/session_window.rscrates/arkflow-plugin/src/buffer/sliding_window.rscrates/arkflow-plugin/src/buffer/tumbling_window.rscrates/arkflow-plugin/src/buffer/window_strategy.rscrates/arkflow-plugin/src/input/mqtt.rscrates/arkflow-plugin/src/lib.rscrates/arkflow-plugin/src/mqtt_client.rscrates/arkflow-plugin/src/output/mqtt.rs
| pub fn close(&self) { | ||
| self.close_token.cancel(); | ||
| } |
There was a problem hiding this comment.
Wake waiters directly in close() to avoid shutdown stalls when no timer is running.
Current shutdown notification is indirect (via timer task). If start_timer was never called, waiters on notify may not be released.
🔧 Suggested fix
pub fn close(&self) {
self.close_token.cancel();
+ self.notify.notify_waiters();
}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/arkflow-plugin/src/buffer/common_window.rs` around lines 124 - 126,
The close() method currently only cancels close_token which can leave tasks
awaiting notify blocked if start_timer() was never started; update close() to
both cancel close_token and explicitly wake the notify waiters (e.g., call
notify.notify_waiters() or notify.notify_one() on the same Notify used by
waiting tasks) so that any tasks awaiting the notification are released
immediately; locate the close() function and the Notify instance used by
start_timer()/waiters to add the notify wake after canceling close_token.
| fn check_interval(&self) -> std::time::Duration { | ||
| // Check more frequently than window size to ensure timely triggers | ||
| std::time::Duration::from_millis(self.window_size.as_millis().min(1000) as u64) | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify whether zero-duration windows/slides can be configured anywhere.
rg -nP --type=rust -C2 'Duration::ZERO|from_millis\(\s*0\s*\)|from_micros\(\s*0\s*\)|from_nanos\(\s*0\s*\)'Repository: arkflow-rs/arkflow
Length of output: 532
🏁 Script executed:
cat -n crates/arkflow-plugin/src/buffer/window_strategy.rs | head -200Repository: arkflow-rs/arkflow
Length of output: 7902
🏁 Script executed:
rg -n 'TumblingWindowStrategy::new|SlidingWindowStrategy::new|SessionWindowStrategy::new' --type=rust -B2 -A2Repository: arkflow-rs/arkflow
Length of output: 3094
🏁 Script executed:
rg -n 'struct TumblingWindowConfig|struct SlidingWindowConfig|struct SessionWindowConfig' --type=rust -A5Repository: arkflow-rs/arkflow
Length of output: 1946
🏁 Script executed:
rg -n 'fn deserialize_duration' --type=rust -A10Repository: arkflow-rs/arkflow
Length of output: 850
🏁 Script executed:
cat -n crates/arkflow-plugin/src/buffer/tumbling_window.rs | head -100Repository: arkflow-rs/arkflow
Length of output: 4565
🏁 Script executed:
rg -n 'struct CommonWindowContext|fn start_timer' --type=rust -A15 crates/arkflow-plugin/src/buffer/Repository: arkflow-rs/arkflow
Length of output: 2851
🏁 Script executed:
rg -n 'tokio::time::sleep' --type=rust -B2 -A2Repository: arkflow-rs/arkflow
Length of output: 10931
🏁 Script executed:
cat -n crates/arkflow-plugin/src/buffer/common_window.rs | sed -n '64,80p'Repository: arkflow-rs/arkflow
Length of output: 831
🏁 Script executed:
cat -n crates/arkflow-plugin/src/buffer/window_strategy.rs | sed -n '168,220p'Repository: arkflow-rs/arkflow
Length of output: 2075
🏁 Script executed:
rg -n 'Duration::ZERO|from_millis(0)|Duration::new(0' --type=rust crates/arkflow-plugin/src/buffer/Repository: arkflow-rs/arkflow
Length of output: 201
🏁 Script executed:
rg -n 'Duration::ZERO|from_millis\(0\)|Duration::new\(0' --type=rust crates/arkflow-plugin/src/buffer/Repository: arkflow-rs/arkflow
Length of output: 44
Clamp check_interval to a positive minimum to prevent hot loops.
Zero-duration windows and slide intervals can be configured (humantime allows parsing "0ms"), and as_millis() truncates sub-millisecond durations to 0. When check_interval() returns zero duration, the timer loop in CommonWindowContext::start_timer() spins in a busy loop calling sleep(Duration::ZERO) repeatedly, consuming CPU unnecessarily.
🔧 Suggested fix
+const MIN_CHECK_INTERVAL: std::time::Duration = std::time::Duration::from_millis(1);
+
impl WindowStrategy for TumblingWindowStrategy {
@@
fn check_interval(&self) -> std::time::Duration {
// Check more frequently than window size to ensure timely triggers
- std::time::Duration::from_millis(self.window_size.as_millis().min(1000) as u64)
+ self.window_size
+ .min(std::time::Duration::from_secs(1))
+ .max(MIN_CHECK_INTERVAL)
}
@@
impl WindowStrategy for SlidingWindowStrategy {
@@
fn check_interval(&self) -> std::time::Duration {
// Check at slide interval to ensure timely triggers
- self.slide_interval
+ self.slide_interval.max(MIN_CHECK_INTERVAL)
}
@@
impl WindowStrategy for SessionWindowStrategy {
@@
fn check_interval(&self) -> std::time::Duration {
// Check frequently to detect session gaps quickly
- std::time::Duration::from_millis(self.gap_duration.as_millis().min(500) as u64)
+ self.gap_duration
+ .min(std::time::Duration::from_millis(500))
+ .max(MIN_CHECK_INTERVAL)
}Also applies to: 121-124, 157-160
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/arkflow-plugin/src/buffer/window_strategy.rs` around lines 78 - 81,
check_interval() can return Duration::ZERO when window_size or slide intervals
truncate to 0ms, causing the timer in CommonWindowContext::start_timer() to
spin; change check_interval (and the equivalent interval-calculating helpers at
the other locations) to clamp the computed milliseconds to a positive minimum
(e.g., use .max(1) on the millisecond value before casting to u64) so it always
returns at least Duration::from_millis(1), ensuring the sleep never receives
Duration::ZERO.
| //! Send the processed data to the MQTT broker | ||
|
|
||
| use crate::expr::Expr; | ||
| use crate::mqtt_client::{create_mqtt_options, parse_qos}; |
There was a problem hiding this comment.
Unused import: parse_qos is imported but not used.
The parse_qos function is imported but the QoS determination at lines 151-156 still uses inline matching logic instead of the shared helper.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/arkflow-plugin/src/output/mqtt.rs` at line 20, Replace the local
inline QoS matching logic that converts the incoming qos string to a
rumqttc::QoS with a call to the shared helper crate::mqtt_client::parse_qos
(keep the create_mqtt_options import), i.e. remove the local match/if block that
produces QoS and instead set qos = parse_qos(qos_str) (or unwrap/handle its
Result/Option the same way other callers do), so the parse_qos import is
actually used and the duplicated mapping logic is eliminated.
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/arkflow-plugin/src/buffer/session_window.rs (1)
110-127:⚠️ Potential issue | 🔴 CriticalDrain pending messages before returning
Noneon close.Line 110 treats any closed window as empty, so
close()can drop buffered batches instead of emitting them. Also re-check cancellation inside the loop so a reader waiting on an empty queue exits after shutdown.Proposed fix
- if self.context.is_closed() { - return Ok(None); - } - loop { - { - if !self.base_window.queue_is_empty().await { - let last_time = *self.last_message_time.read().await; - // Check if the session gap has elapsed since the last message - let duration = last_time.elapsed(); - if duration >= self.config.gap { - break; - } - } + if self.base_window.queue_is_empty().await { + if self.context.is_closed() { + return Ok(None); + } + } else { + let last_time = *self.last_message_time.read().await; + // Check if the session gap has elapsed since the last message + let duration = last_time.elapsed(); + if self.context.is_closed() || duration >= self.config.gap { + break; + } } // Wait for notification from timer or write operation - self.context.notify.notified().await; + tokio::select! { + _ = self.context.notify.notified() => {} + _ = self.context.close_token.cancelled() => {} + } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/arkflow-plugin/src/buffer/session_window.rs` around lines 110 - 127, The current early-return when self.context.is_closed() causes buffered batches to be dropped; change the logic in the session window reader (the loop that checks self.base_window.queue_is_empty().await, self.last_message_time.read().await and self.config.gap) so that on shutdown you first drain any pending messages from base_window before returning Ok(None), and also re-check self.context.is_closed() inside the loop after awaiting self.context.notify.notified(). In practice modify the initial close check to only trigger the closed path after confirming base_window.queue_is_empty() (or draining it via the existing pop/emit code), and add a cancellation check immediately after the notified().await so readers waiting on an empty queue will exit promptly when close() is called.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@crates/arkflow-plugin/src/buffer/session_window.rs`:
- Around line 110-127: The current early-return when self.context.is_closed()
causes buffered batches to be dropped; change the logic in the session window
reader (the loop that checks self.base_window.queue_is_empty().await,
self.last_message_time.read().await and self.config.gap) so that on shutdown you
first drain any pending messages from base_window before returning Ok(None), and
also re-check self.context.is_closed() inside the loop after awaiting
self.context.notify.notified(). In practice modify the initial close check to
only trigger the closed path after confirming base_window.queue_is_empty() (or
draining it via the existing pop/emit code), and add a cancellation check
immediately after the notified().await so readers waiting on an empty queue will
exit promptly when close() is called.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: f8b83f5b-a2af-4a94-9a55-a0fd5a8ee503
📒 Files selected for processing (3)
crates/arkflow-plugin/src/buffer/session_window.rscrates/arkflow-plugin/src/buffer/sliding_window.rscrates/arkflow-plugin/src/buffer/tumbling_window.rs
✅ Files skipped from review due to trivial changes (1)
- crates/arkflow-plugin/src/buffer/sliding_window.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- crates/arkflow-plugin/src/buffer/tumbling_window.rs
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
crates/arkflow-plugin/src/buffer/sliding_window.rs (2)
80-98:⚠️ Potential issue | 🟠 MajorTimer can be indefinitely starved by write-triggered notifications.
The spawned loop recreates
sleep(interval)on every iteration, and the_ = notify_clone.notified()branch wakes on everyBuffer::write()call (sincewrite()invokesself.notify.notify_waiters()). Under a steady write rate, each write cancels the pendingsleepand restarts it from zero, so the timer branch may never fire and windows only emit whenqueue_lock.len() >= window_sizealready holds — effectively disabling the time-based flush and defeating the purpose ofinterval.Consider using
tokio::time::interval(which ticks on a schedule independent of other branches) and dropping thenotified()branch in the timer task, or only usingnotified()as a cancellation signal rather than re-entering the loop.♻️ Sketch
- tokio::spawn(async move { - loop { - let timer = sleep(interval); - tokio::select! { - _ = timer => { - notify_clone.notify_waiters(); - } - _ = close_clone.cancelled() => { - notify_clone.notify_waiters(); - break; - } - _ = notify_clone.notified() => { - if close_clone.is_cancelled(){ - break; - } - } - } - } - }); + tokio::spawn(async move { + let mut ticker = tokio::time::interval(interval); + ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + ticker.tick().await; // consume immediate first tick + loop { + tokio::select! { + _ = ticker.tick() => { + notify_clone.notify_waiters(); + } + _ = close_clone.cancelled() => { + notify_clone.notify_waiters(); + break; + } + } + } + });🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/arkflow-plugin/src/buffer/sliding_window.rs` around lines 80 - 98, The timer task can be starved because each notify_clone.notified() wake restarts sleep(interval); replace the recreate-sleep pattern with a stable ticker and remove the write-triggered notified() branch: create a tokio::time::interval (e.g. let mut ticker = tokio::time::interval(interval)) in the tokio::spawn closure and use ticker.tick() in the select, keeping only ticker.tick() to call notify_clone.notify_waiters() and close_clone.cancelled() to break; remove or do not re-enter the notify_clone.notified() branch so Buffer::write() notifications do not reset the timer.
187-209:⚠️ Potential issue | 🟠 MajorLost wakeup race between queue check and
notified()registration.The
read()function has a race condition: ifnotify_waiters()fires between dropping the queue read lock (line 203) and callingnotified().await(line 205), the notification is lost and the reader blocks indefinitely even with sufficient buffered messages.The fix is to create the
Notifiedfuture and callenable()before inspecting the queue:♻️ Proposed fix
loop { if self.close.is_cancelled() { return Ok(None); } - { - let queue_lock = self.queue.read().await; - // If there are enough messages to form a window, break the loop and process them - if queue_lock.len() >= self.config.window_size as usize { - break; - } - // If the buffer is closed, return None - } - // Wait for notification from timer, write operation, or close - self.notify.notified().await; + let notified = self.notify.notified(); + tokio::pin!(notified); + notified.as_mut().enable(); + { + let queue_lock = self.queue.read().await; + if queue_lock.len() >= self.config.window_size as usize { + break; + } + } + notified.await; }This ensures the waker is registered before any queue inspection, closing the gap where notifications could be lost.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/arkflow-plugin/src/buffer/sliding_window.rs` around lines 187 - 209, The read() method has a lost-wakeup race between dropping the queue read lock and awaiting self.notify.notified(); fix it by creating the Notified future before inspecting the queue (e.g., let notified = self.notify.notified()), call notified.enable() to register the waker, then acquire/read self.queue (queue.read().await) and check queue_lock.len() against self.config.window_size; if there are not enough messages, drop the lock and await notified.await, otherwise proceed to process_slide().await; update references in read() to use the created notified variable instead of calling self.notify.notified().await directly.
🧹 Nitpick comments (1)
crates/arkflow-plugin/src/buffer/sliding_window.rs (1)
215-223: Holding the read lock acrossnotify_waiters()is unnecessary.Minor: the read guard is kept alive through
self.notify.notify_waiters(). Not a correctness bug (readers take a read lock too, so they won't be blocked), but dropping the guard before notifying matches the pattern you just established inwrite()(line 175) and avoids surprises if the lock type changes.- let queue_lock = self.queue.read().await; - if !queue_lock.is_empty() { - // Notify any waiting readers to process remaining messages - self.notify.notify_waiters(); - } + let non_empty = !self.queue.read().await.is_empty(); + if non_empty { + self.notify.notify_waiters(); + }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/arkflow-plugin/src/buffer/sliding_window.rs` around lines 215 - 223, In flush(), don't hold the read guard across the notify call: after acquiring queue_lock via self.queue.read().await check is_empty(), then explicitly drop the guard (the queue_lock variable) before calling self.notify.notify_waiters(); mirror the pattern used in write() so the read lock is released prior to notifying readers.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@crates/arkflow-plugin/src/buffer/sliding_window.rs`:
- Around line 80-98: The timer task can be starved because each
notify_clone.notified() wake restarts sleep(interval); replace the
recreate-sleep pattern with a stable ticker and remove the write-triggered
notified() branch: create a tokio::time::interval (e.g. let mut ticker =
tokio::time::interval(interval)) in the tokio::spawn closure and use
ticker.tick() in the select, keeping only ticker.tick() to call
notify_clone.notify_waiters() and close_clone.cancelled() to break; remove or do
not re-enter the notify_clone.notified() branch so Buffer::write() notifications
do not reset the timer.
- Around line 187-209: The read() method has a lost-wakeup race between dropping
the queue read lock and awaiting self.notify.notified(); fix it by creating the
Notified future before inspecting the queue (e.g., let notified =
self.notify.notified()), call notified.enable() to register the waker, then
acquire/read self.queue (queue.read().await) and check queue_lock.len() against
self.config.window_size; if there are not enough messages, drop the lock and
await notified.await, otherwise proceed to process_slide().await; update
references in read() to use the created notified variable instead of calling
self.notify.notified().await directly.
---
Nitpick comments:
In `@crates/arkflow-plugin/src/buffer/sliding_window.rs`:
- Around line 215-223: In flush(), don't hold the read guard across the notify
call: after acquiring queue_lock via self.queue.read().await check is_empty(),
then explicitly drop the guard (the queue_lock variable) before calling
self.notify.notify_waiters(); mirror the pattern used in write() so the read
lock is released prior to notifying readers.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: f78eb684-9a7f-4639-a980-bfca6bef28c8
📒 Files selected for processing (1)
crates/arkflow-plugin/src/buffer/sliding_window.rs
Summary by CodeRabbit
New Features
Refactor
Chores
Tests
Documentation