chore: reformat the code#1166

Open
chenquan wants to merge 5 commits into
mainfrom
chore/reformat

Conversation

@chenquan
Member

@chenquan chenquan commented Apr 12, 2026

Summary by CodeRabbit

  • New Features

    • Pluggable windowing strategies for tumbling, sliding, and session behavior.
    • Shared MQTT utilities for building connection options and QoS handling.
  • Refactor

    • Window implementations now use a unified shared context for coordinated timers, notifications, and shutdown.
  • Chores

    • Module layout updated to expose new windowing and MQTT utilities; CI Node runtime bumped.
  • Tests

    • Added unit tests for strategies, shared window context, and MQTT helpers.
  • Documentation

    • Blog posts updated with truncation directive.

@coderabbitai
Contributor

coderabbitai Bot commented Apr 12, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 107fa8b0-35df-4407-a4d1-5063e6e2c80a

📥 Commits

Reviewing files that changed from the base of the PR and between 2f5949e and 7c14787.

📒 Files selected for processing (2)
  • docs/blog/2025/04/01-v0.2.0-rc1.md
  • docs/blog/2025/04/15-v0.2.0-release.md
✅ Files skipped from review due to trivial changes (2)
  • docs/blog/2025/04/15-v0.2.0-release.md
  • docs/blog/2025/04/01-v0.2.0-rc1.md

Walkthrough

Adds shared window infrastructure (CommonWindowContext, WindowStrategy) and refactors tumbling/sliding/session windows to use the shared context; extracts MQTT helpers (create_mqtt_options, parse_qos) and updates MQTT input/output to use them; minor CI node version bumps and docs truncation comments.

Changes

| Cohort | File(s) | Summary |
|---|---|---|
| Window Core | crates/arkflow-plugin/src/buffer/common_window.rs, crates/arkflow-plugin/src/buffer/mod.rs | New CommonWindowContext (shared Notify, CancellationToken, last-trigger tracking, timer loop, lifecycle APIs) and export of new buffer submodules. Adds create_base_window to wire context into BaseWindow. |
| Window Strategies | crates/arkflow-plugin/src/buffer/window_strategy.rs | New WindowStrategy trait and three implementations (TumblingWindowStrategy, SlidingWindowStrategy, SessionWindowStrategy) plus unit tests for trigger/check-interval behavior. |
| Window Implementations | crates/arkflow-plugin/src/buffer/session_window.rs, crates/arkflow-plugin/src/buffer/sliding_window.rs, crates/arkflow-plugin/src/buffer/tumbling_window.rs | Refactored to use CommonWindowContext for notification, cancellation checks, and last-trigger handling; removed per-instance Notify/CancellationToken; adjusted wait/notify/control-flow and minor field initialization reorderings. |
| MQTT Core | crates/arkflow-plugin/src/mqtt_client.rs, crates/arkflow-plugin/src/lib.rs | New mqtt_client module exposing create_mqtt_options and parse_qos helpers with unit tests; module exported in lib.rs. |
| MQTT Callers | crates/arkflow-plugin/src/input/mqtt.rs, crates/arkflow-plugin/src/output/mqtt.rs | Replaced inline rumqttc::MqttOptions construction and QoS mapping with calls to shared create_mqtt_options and parse_qos. |
| CI Workflows | .github/workflows/doc-deploy.yml, .github/workflows/test-doc-deploy.yml | Updated Node.js runtime from 20 to 22 for documentation build jobs. |
| Docs | docs/blog/* | Inserted <!-- truncate --> comment in two blog markdown files (presentation change only). |

Sequence Diagram(s)

sequenceDiagram
  rect rgba(200,200,255,0.5)
    participant TimerTask
    participant CommonWindowContext
    participant BaseWindow
    participant Reader
  end
  TimerTask->>CommonWindowContext: tick(check_interval)
  CommonWindowContext->>CommonWindowContext: notify_waiters()
  CommonWindowContext->>BaseWindow: poll/process windows
  BaseWindow->>Reader: deliver windowed records / wake reader
  Reader->>CommonWindowContext: await notified()
  alt close requested
    TimerTask->>CommonWindowContext: cancel()
    CommonWindowContext->>Reader: notify_waiters() (wake to exit)
  end

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

Poem

🐰 I tick the timer soft and light,
Shared context hums through day and night.
Windows wake and strategies sing,
MQTT hops in on a string.
A joyful hop — the buffers spring 🌷

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Title check | ⚠️ Warning | The PR title 'chore: reformat the code' is misleading. The changeset includes substantial new features (CommonWindowContext, WindowStrategy trait, mqtt_client helpers) and refactoring of window implementations, not just code reformatting. | Revise the title to accurately reflect the main changes, such as 'feat: introduce CommonWindowContext and WindowStrategy abstractions' or 'refactor: consolidate window context and MQTT utilities'. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
crates/arkflow-plugin/src/output/mqtt.rs (1)

151-156: 🛠️ Refactor suggestion | 🟠 Major

Use the shared parse_qos helper to eliminate duplicate QoS logic.

This inline QoS matching duplicates the logic in mqtt_client::parse_qos. Since parse_qos is already imported, use it here for consistency.

♻️ Proposed fix to use shared parse_qos
-        // Determine the QoS level
-        let qos_level = match self.config.qos {
-            Some(0) => QoS::AtMostOnce,
-            Some(1) => QoS::AtLeastOnce,
-            Some(2) => QoS::ExactlyOnce,
-            _ => QoS::AtLeastOnce, // The default is QoS 1
-        };
+        // Determine the QoS level using shared utility
+        let qos_level = parse_qos(self.config.qos);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/arkflow-plugin/src/output/mqtt.rs` around lines 151 - 156, Replace the
inline QoS match on self.config.qos with the shared helper parse_qos to avoid
duplicating logic: call mqtt_client::parse_qos (already imported as parse_qos)
with self.config.qos and use its returned QoS instead of the local match block
in the function that sets qos_level; remove the existing match arms
(Some(0)/Some(1)/Some(2)/_) so the code consistently relies on parse_qos's
defaulting and mapping behavior.
🧹 Nitpick comments (3)
crates/arkflow-plugin/src/mqtt_client.rs (1)

104-111: Consider logging a warning for invalid QoS values.

Invalid QoS values (e.g., Some(99)) silently default to AtLeastOnce. While this is a reasonable fallback, consider logging a warning to help users identify configuration mistakes.

💡 Optional: Add tracing warning for invalid QoS
+use tracing::warn;
+
 pub fn parse_qos(qos: Option<u8>) -> QoS {
     match qos {
         Some(0) => QoS::AtMostOnce,
         Some(1) => QoS::AtLeastOnce,
         Some(2) => QoS::ExactlyOnce,
-        _ => QoS::AtLeastOnce, // Default is QoS 1
+        None => QoS::AtLeastOnce, // Default is QoS 1
+        Some(invalid) => {
+            warn!("Invalid QoS value {}, defaulting to AtLeastOnce (1)", invalid);
+            QoS::AtLeastOnce
+        }
     }
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/arkflow-plugin/src/mqtt_client.rs` around lines 104 - 111, The
parse_qos function currently maps invalid QoS values to QoS::AtLeastOnce
silently; modify parse_qos to detect when qos is Some(v) with v not in {0,1,2}
and emit a warning (e.g., tracing::warn! or log::warn!) that includes the
invalid numeric value and the chosen fallback before returning QoS::AtLeastOnce;
keep behavior for None and valid values unchanged and reference the function
name parse_qos and the QoS variants QoS::AtMostOnce, QoS::AtLeastOnce,
QoS::ExactlyOnce so reviewers can locate the change.
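
A minimal, self-contained sketch of the warning variant described above. To keep it compilable without external crates, the `QoS` enum is a local stand-in for `rumqttc::QoS` and `eprintln!` stands in for `tracing::warn!`; both substitutions are assumptions made for illustration and are not part of the PR.

```rust
/// Local stand-in for `rumqttc::QoS` (assumption: defined here only so the
/// sketch compiles on its own).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QoS {
    AtMostOnce,
    AtLeastOnce,
    ExactlyOnce,
}

/// Maps an optional numeric QoS level to a `QoS` variant.
/// `None` and out-of-range values both fall back to `AtLeastOnce`,
/// but only out-of-range values produce a warning.
pub fn parse_qos(qos: Option<u8>) -> QoS {
    match qos {
        Some(0) => QoS::AtMostOnce,
        Some(1) | None => QoS::AtLeastOnce, // Unset means the default, QoS 1
        Some(2) => QoS::ExactlyOnce,
        Some(invalid) => {
            // Stand-in for `tracing::warn!` in the real crate.
            eprintln!("Invalid QoS value {}, defaulting to AtLeastOnce (1)", invalid);
            QoS::AtLeastOnce
        }
    }
}
```

Separating `None` from `Some(invalid)` keeps the silent default for an unset field while surfacing a likely configuration mistake such as `qos: 99`.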
crates/arkflow-plugin/src/buffer/tumbling_window.rs (1)

54-57: Consider removing the unused strategy field.

The TumblingWindowStrategy is stored in the struct but only used during initialization to get check_interval(). After that, it's never accessed again. Consider removing this field to reduce memory footprint.

💡 Optional: Remove unused strategy field
 struct TumblingWindow {
     /// Base window implementation for queue management
     base_window: BaseWindow,
     /// Common window context for timer and notification management
     context: CommonWindowContext,
-    /// Window strategy for trigger logic
-    strategy: TumblingWindowStrategy,
 }

And in new():

         Ok(Self {
             base_window,
             context,
-            strategy,
         })
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/arkflow-plugin/src/buffer/tumbling_window.rs` around lines 54 - 57,
The TumblingWindow struct currently stores a strategy field
(TumblingWindowStrategy) that is only used during construction to obtain
check_interval(); remove the unused strategy field from the struct, update the
TumblingWindow::new() constructor to call strategy.check_interval() locally and
store the resulting interval (or incorporate it into CommonWindowContext)
instead of keeping the whole strategy, and remove any remaining references to
self.strategy (ensure no methods reference it); also update the constructor
signature and any callers/tests to stop passing or expecting the strategy field.
crates/arkflow-plugin/src/buffer/common_window.rs (1)

64-87: Consider making start_timer() idempotent to prevent duplicate timer tasks.

Repeated calls can spawn multiple loops and duplicate wakeups; a simple started-flag guard would harden this API.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/arkflow-plugin/src/buffer/common_window.rs` around lines 64 - 87,
start_timer can spawn multiple concurrent timer tasks if called repeatedly; make
it idempotent by adding a started guard (e.g., an AtomicBool or AtomicU8) on the
struct and atomically set it when entering start_timer so only the first caller
spawns the tokio::spawn task. In start_timer check the flag with
compare_exchange (or fetch_update) and return immediately if already set; ensure
the flag is set before cloning notify/close_token and spawning, and consider
clearing it on cancellation only if you intend to allow restart. This targets
the start_timer method and the struct that holds notify and close_token.
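
The started-flag guard could look roughly like the sketch below. `TimerGuardSketch`, its `spawned` counter, and the `bool` return value are hypothetical names introduced for illustration; the real `CommonWindowContext` holds a `Notify` and a `CancellationToken` and would call `tokio::spawn` where the counter is incremented.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

/// Hypothetical, trimmed-down context: only the fields needed to show the guard.
pub struct TimerGuardSketch {
    /// Set to true by the first successful `start_timer` call.
    started: AtomicBool,
    /// Counts how many timer tasks would have been spawned (illustration only).
    spawned: AtomicUsize,
}

impl TimerGuardSketch {
    pub fn new() -> Self {
        Self {
            started: AtomicBool::new(false),
            spawned: AtomicUsize::new(0),
        }
    }

    /// Returns true if this call started the timer, false if it was already started.
    pub fn start_timer(&self) -> bool {
        // Atomically flip false -> true; only one caller can win this exchange.
        if self
            .started
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            return false;
        }
        // In the real code, this is where the timer loop would be spawned.
        self.spawned.fetch_add(1, Ordering::Relaxed);
        true
    }

    /// Number of timer tasks that would have been spawned so far.
    pub fn spawned_tasks(&self) -> usize {
        self.spawned.load(Ordering::Relaxed)
    }
}
```

Because the flag is never cleared, this version does not allow a restart after cancellation; whether that is desirable depends on how the window lifecycle is meant to work.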
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/arkflow-plugin/src/buffer/common_window.rs`:
- Around line 124-126: The close() method currently only cancels close_token
which can leave tasks awaiting notify blocked if start_timer() was never
started; update close() to both cancel close_token and explicitly wake the
notify waiters (e.g., call notify.notify_waiters() or notify.notify_one() on the
same Notify used by waiting tasks) so that any tasks awaiting the notification
are released immediately; locate the close() function and the Notify instance
used by start_timer()/waiters to add the notify wake after canceling
close_token.

In `@crates/arkflow-plugin/src/buffer/sliding_window.rs`:
- Around line 71-74: The code incorrectly converts config.window_size (a message
count) into a Duration for SlidingWindowStrategy::new; instead, make the
strategy message-count-based: change the SlidingWindowStrategy::new signature to
accept a message_count (e.g., u32 or usize) rather than a Duration, update the
call site to pass config.window_size (cast to the chosen integer type) instead
of Duration::from_secs(...), and remove the unused Duration field/logic inside
SlidingWindowStrategy; ensure checks that use queue_lock.len() (and
self.config.window_size) remain consistent with the new message_count type.

In `@crates/arkflow-plugin/src/buffer/window_strategy.rs`:
- Around line 78-81: check_interval() can return Duration::ZERO when window_size
or slide intervals truncate to 0ms, causing the timer in
CommonWindowContext::start_timer() to spin; change check_interval (and the
equivalent interval-calculating helpers at the other locations) to clamp the
computed milliseconds to a positive minimum (e.g., use .max(1) on the
millisecond value before casting to u64) so it always returns at least
Duration::from_millis(1), ensuring the sleep never receives Duration::ZERO.

In `@crates/arkflow-plugin/src/output/mqtt.rs`:
- Line 20: Replace the local inline QoS match that converts self.config.qos (an
Option<u8>) to a rumqttc::QoS with a call to the shared helper
crate::mqtt_client::parse_qos (keep the create_mqtt_options import), i.e. remove
the local match block at lines 151-156 and instead set qos_level =
parse_qos(self.config.qos), which returns a QoS directly, so the parse_qos
import is actually used and the duplicated mapping logic is eliminated.

---

Outside diff comments:
In `@crates/arkflow-plugin/src/output/mqtt.rs`:
- Around line 151-156: Replace the inline QoS match on self.config.qos with the
shared helper parse_qos to avoid duplicating logic: call mqtt_client::parse_qos
(already imported as parse_qos) with self.config.qos and use its returned QoS
instead of the local match block in the function that sets qos_level; remove the
existing match arms (Some(0)/Some(1)/Some(2)/_) so the code consistently relies
on parse_qos's defaulting and mapping behavior.

---

Nitpick comments:
In `@crates/arkflow-plugin/src/buffer/common_window.rs`:
- Around line 64-87: start_timer can spawn multiple concurrent timer tasks if
called repeatedly; make it idempotent by adding a started guard (e.g., an
AtomicBool or AtomicU8) on the struct and atomically set it when entering
start_timer so only the first caller spawns the tokio::spawn task. In
start_timer check the flag with compare_exchange (or fetch_update) and return
immediately if already set; ensure the flag is set before cloning
notify/close_token and spawning, and consider clearing it on cancellation only
if you intend to allow restart. This targets the start_timer method and the
struct that holds notify and close_token.

In `@crates/arkflow-plugin/src/buffer/tumbling_window.rs`:
- Around line 54-57: The TumblingWindow struct currently stores a strategy field
(TumblingWindowStrategy) that is only used during construction to obtain
check_interval(); remove the unused strategy field from the struct, update the
TumblingWindow::new() constructor to call strategy.check_interval() locally and
store the resulting interval (or incorporate it into CommonWindowContext)
instead of keeping the whole strategy, and remove any remaining references to
self.strategy (ensure no methods reference it); also update the constructor
signature and any callers/tests to stop passing or expecting the strategy field.

In `@crates/arkflow-plugin/src/mqtt_client.rs`:
- Around line 104-111: The parse_qos function currently maps invalid QoS values
to QoS::AtLeastOnce silently; modify parse_qos to detect when qos is Some(v)
with v not in {0,1,2} and emit a warning (e.g., tracing::warn! or log::warn!)
that includes the invalid numeric value and the chosen fallback before returning
QoS::AtLeastOnce; keep behavior for None and valid values unchanged and
reference the function name parse_qos and the QoS variants QoS::AtMostOnce,
QoS::AtLeastOnce, QoS::ExactlyOnce so reviewers can locate the change.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: bac13be6-7777-4f4c-9ade-30d2b924ffdc

📥 Commits

Reviewing files that changed from the base of the PR and between 63455e8 and a60aa7f.

📒 Files selected for processing (10)
  • crates/arkflow-plugin/src/buffer/common_window.rs
  • crates/arkflow-plugin/src/buffer/mod.rs
  • crates/arkflow-plugin/src/buffer/session_window.rs
  • crates/arkflow-plugin/src/buffer/sliding_window.rs
  • crates/arkflow-plugin/src/buffer/tumbling_window.rs
  • crates/arkflow-plugin/src/buffer/window_strategy.rs
  • crates/arkflow-plugin/src/input/mqtt.rs
  • crates/arkflow-plugin/src/lib.rs
  • crates/arkflow-plugin/src/mqtt_client.rs
  • crates/arkflow-plugin/src/output/mqtt.rs

Comment on lines +124 to +126
pub fn close(&self) {
self.close_token.cancel();
}
Contributor

⚠️ Potential issue | 🟡 Minor

Wake waiters directly in close() to avoid shutdown stalls when no timer is running.

Current shutdown notification is indirect (via timer task). If start_timer was never called, waiters on notify may not be released.

🔧 Suggested fix
 pub fn close(&self) {
     self.close_token.cancel();
+    self.notify.notify_waiters();
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/arkflow-plugin/src/buffer/common_window.rs` around lines 124 - 126,
The close() method currently only cancels close_token which can leave tasks
awaiting notify blocked if start_timer() was never started; update close() to
both cancel close_token and explicitly wake the notify waiters (e.g., call
notify.notify_waiters() or notify.notify_one() on the same Notify used by
waiting tasks) so that any tasks awaiting the notification are released
immediately; locate the close() function and the Notify instance used by
start_timer()/waiters to add the notify wake after canceling close_token.

Comment thread crates/arkflow-plugin/src/buffer/sliding_window.rs Outdated
Comment on lines +78 to +81
fn check_interval(&self) -> std::time::Duration {
// Check more frequently than window size to ensure timely triggers
std::time::Duration::from_millis(self.window_size.as_millis().min(1000) as u64)
}
Contributor

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify whether zero-duration windows/slides can be configured anywhere.
rg -nP --type=rust -C2 'Duration::ZERO|from_millis\(\s*0\s*\)|from_micros\(\s*0\s*\)|from_nanos\(\s*0\s*\)'

Repository: arkflow-rs/arkflow

Length of output: 532


🏁 Script executed:

cat -n crates/arkflow-plugin/src/buffer/window_strategy.rs | head -200

Repository: arkflow-rs/arkflow

Length of output: 7902


🏁 Script executed:

rg -n 'TumblingWindowStrategy::new|SlidingWindowStrategy::new|SessionWindowStrategy::new' --type=rust -B2 -A2

Repository: arkflow-rs/arkflow

Length of output: 3094


🏁 Script executed:

rg -n 'struct TumblingWindowConfig|struct SlidingWindowConfig|struct SessionWindowConfig' --type=rust -A5

Repository: arkflow-rs/arkflow

Length of output: 1946


🏁 Script executed:

rg -n 'fn deserialize_duration' --type=rust -A10

Repository: arkflow-rs/arkflow

Length of output: 850


🏁 Script executed:

cat -n crates/arkflow-plugin/src/buffer/tumbling_window.rs | head -100

Repository: arkflow-rs/arkflow

Length of output: 4565


🏁 Script executed:

rg -n 'struct CommonWindowContext|fn start_timer' --type=rust -A15 crates/arkflow-plugin/src/buffer/

Repository: arkflow-rs/arkflow

Length of output: 2851


🏁 Script executed:

rg -n 'tokio::time::sleep' --type=rust -B2 -A2

Repository: arkflow-rs/arkflow

Length of output: 10931


🏁 Script executed:

cat -n crates/arkflow-plugin/src/buffer/common_window.rs | sed -n '64,80p'

Repository: arkflow-rs/arkflow

Length of output: 831


🏁 Script executed:

cat -n crates/arkflow-plugin/src/buffer/window_strategy.rs | sed -n '168,220p'

Repository: arkflow-rs/arkflow

Length of output: 2075


🏁 Script executed:

rg -n 'Duration::ZERO|from_millis(0)|Duration::new(0' --type=rust crates/arkflow-plugin/src/buffer/

Repository: arkflow-rs/arkflow

Length of output: 201


🏁 Script executed:

rg -n 'Duration::ZERO|from_millis\(0\)|Duration::new\(0' --type=rust crates/arkflow-plugin/src/buffer/

Repository: arkflow-rs/arkflow

Length of output: 44


Clamp check_interval to a positive minimum to prevent hot loops.

Zero-duration windows and slide intervals can be configured (humantime allows parsing "0ms"), and as_millis() truncates sub-millisecond durations to 0. When check_interval() returns zero duration, the timer loop in CommonWindowContext::start_timer() spins in a busy loop calling sleep(Duration::ZERO) repeatedly, consuming CPU unnecessarily.

🔧 Suggested fix
+const MIN_CHECK_INTERVAL: std::time::Duration = std::time::Duration::from_millis(1);
+
 impl WindowStrategy for TumblingWindowStrategy {
@@
     fn check_interval(&self) -> std::time::Duration {
         // Check more frequently than window size to ensure timely triggers
-        std::time::Duration::from_millis(self.window_size.as_millis().min(1000) as u64)
+        self.window_size
+            .min(std::time::Duration::from_secs(1))
+            .max(MIN_CHECK_INTERVAL)
     }
@@
 impl WindowStrategy for SlidingWindowStrategy {
@@
     fn check_interval(&self) -> std::time::Duration {
         // Check at slide interval to ensure timely triggers
-        self.slide_interval
+        self.slide_interval.max(MIN_CHECK_INTERVAL)
     }
@@
 impl WindowStrategy for SessionWindowStrategy {
@@
     fn check_interval(&self) -> std::time::Duration {
         // Check frequently to detect session gaps quickly
-        std::time::Duration::from_millis(self.gap_duration.as_millis().min(500) as u64)
+        self.gap_duration
+            .min(std::time::Duration::from_millis(500))
+            .max(MIN_CHECK_INTERVAL)
     }

Also applies to: 121-124, 157-160

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/arkflow-plugin/src/buffer/window_strategy.rs` around lines 78 - 81,
check_interval() can return Duration::ZERO when window_size or slide intervals
truncate to 0ms, causing the timer in CommonWindowContext::start_timer() to
spin; change check_interval (and the equivalent interval-calculating helpers at
the other locations) to clamp the computed milliseconds to a positive minimum
(e.g., use .max(1) on the millisecond value before casting to u64) so it always
returns at least Duration::from_millis(1), ensuring the sleep never receives
Duration::ZERO.
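
The truncation problem and the clamp can be reproduced with `std::time::Duration` alone. In this sketch the two free functions are hypothetical stand-ins for the tumbling strategy's `check_interval()` before and after the suggested fix; only `MIN_CHECK_INTERVAL` and the 1-second cap come from the review comment.

```rust
use std::time::Duration;

/// Lower bound for any timer check interval (value taken from the suggested fix).
const MIN_CHECK_INTERVAL: Duration = Duration::from_millis(1);

/// The original formula, kept for comparison: truncating to whole
/// milliseconds turns any sub-millisecond window into `Duration::ZERO`.
pub fn truncating_check_interval(window_size: Duration) -> Duration {
    Duration::from_millis(window_size.as_millis().min(1000) as u64)
}

/// Clamped variant: check at most once per second, but never with a
/// zero interval. `Duration` implements `Ord`, so `min`/`max` apply directly.
pub fn clamped_check_interval(window_size: Duration) -> Duration {
    window_size
        .min(Duration::from_secs(1))
        .max(MIN_CHECK_INTERVAL)
}
```

For a 500 µs window, the truncating version yields `Duration::ZERO` while the clamped version yields 1 ms; windows between 1 ms and 1 s pass through unchanged.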

//! Send the processed data to the MQTT broker

use crate::expr::Expr;
use crate::mqtt_client::{create_mqtt_options, parse_qos};
Contributor

⚠️ Potential issue | 🟡 Minor

Unused import: parse_qos is imported but not used.

The parse_qos function is imported but the QoS determination at lines 151-156 still uses inline matching logic instead of the shared helper.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/arkflow-plugin/src/output/mqtt.rs` at line 20, Replace the local
inline QoS match that converts self.config.qos (an Option<u8>) to a
rumqttc::QoS with a call to the shared helper crate::mqtt_client::parse_qos
(keep the create_mqtt_options import), i.e. remove the local match block at
lines 151-156 and instead set qos_level = parse_qos(self.config.qos), which
returns a QoS directly, so the parse_qos import is actually used and the
duplicated mapping logic is eliminated.

Contributor

@coderabbitai coderabbitai Bot left a comment

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
crates/arkflow-plugin/src/buffer/session_window.rs (1)

110-127: ⚠️ Potential issue | 🔴 Critical

Drain pending messages before returning None on close.

Line 110 treats any closed window as empty, so close() can drop buffered batches instead of emitting them. Also re-check cancellation inside the loop so a reader waiting on an empty queue exits after shutdown.

Proposed fix
-        if self.context.is_closed() {
-            return Ok(None);
-        }
-
         loop {
-            {
-                if !self.base_window.queue_is_empty().await {
-                    let last_time = *self.last_message_time.read().await;
-                    // Check if the session gap has elapsed since the last message
-                    let duration = last_time.elapsed();
-                    if duration >= self.config.gap {
-                        break;
-                    }
-                }
+            if self.base_window.queue_is_empty().await {
+                if self.context.is_closed() {
+                    return Ok(None);
+                }
+            } else {
+                let last_time = *self.last_message_time.read().await;
+                // Check if the session gap has elapsed since the last message
+                let duration = last_time.elapsed();
+                if self.context.is_closed() || duration >= self.config.gap {
+                    break;
+                }
             }
 
             // Wait for notification from timer or write operation
-            self.context.notify.notified().await;
+            tokio::select! {
+                _ = self.context.notify.notified() => {}
+                _ = self.context.close_token.cancelled() => {}
+            }
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/arkflow-plugin/src/buffer/session_window.rs` around lines 110 - 127,
The current early-return when self.context.is_closed() causes buffered batches
to be dropped; change the logic in the session window reader (the loop that
checks self.base_window.queue_is_empty().await,
self.last_message_time.read().await and self.config.gap) so that on shutdown you
first drain any pending messages from base_window before returning Ok(None), and
also re-check self.context.is_closed() inside the loop after awaiting
self.context.notify.notified(). In practice modify the initial close check to
only trigger the closed path after confirming base_window.queue_is_empty() (or
draining it via the existing pop/emit code), and add a cancellation check
immediately after the notified().await so readers waiting on an empty queue will
exit promptly when close() is called.
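
The per-iteration decision in the proposed fix can be written as a small pure function, which makes the drain-before-exit rule easy to check in isolation. `ReadStep` and `next_step` are hypothetical names for this sketch; the actual reader would evaluate `queue_is_empty()`, `is_closed()`, and the gap check inline.

```rust
/// What the reader loop should do on one iteration (hypothetical names).
#[derive(Debug, PartialEq, Eq)]
pub enum ReadStep {
    /// Queue is empty and the window is closed: return `Ok(None)`.
    Finish,
    /// Buffered messages should be emitted now.
    Emit,
    /// Nothing to do yet: wait for a notification or cancellation.
    Wait,
}

/// Mirrors the branch structure of the proposed fix:
/// a closed window still emits whatever is buffered before finishing.
pub fn next_step(queue_is_empty: bool, is_closed: bool, gap_elapsed: bool) -> ReadStep {
    if queue_is_empty {
        if is_closed {
            ReadStep::Finish
        } else {
            ReadStep::Wait
        }
    } else if is_closed || gap_elapsed {
        ReadStep::Emit
    } else {
        ReadStep::Wait
    }
}
```

The key case is `next_step(false, true, false)`: the window is closed with messages still buffered, and the result is `Emit` rather than `Finish`, so the final batch is not dropped.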
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Outside diff comments:
In `@crates/arkflow-plugin/src/buffer/session_window.rs`:
- Around line 110-127: The current early-return when self.context.is_closed()
causes buffered batches to be dropped; change the logic in the session window
reader (the loop that checks self.base_window.queue_is_empty().await,
self.last_message_time.read().await and self.config.gap) so that on shutdown you
first drain any pending messages from base_window before returning Ok(None), and
also re-check self.context.is_closed() inside the loop after awaiting
self.context.notify.notified(). In practice modify the initial close check to
only trigger the closed path after confirming base_window.queue_is_empty() (or
draining it via the existing pop/emit code), and add a cancellation check
immediately after the notified().await so readers waiting on an empty queue will
exit promptly when close() is called.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: f8b83f5b-a2af-4a94-9a55-a0fd5a8ee503

📥 Commits

Reviewing files that changed from the base of the PR and between a60aa7f and edb9fea.

📒 Files selected for processing (3)
  • crates/arkflow-plugin/src/buffer/session_window.rs
  • crates/arkflow-plugin/src/buffer/sliding_window.rs
  • crates/arkflow-plugin/src/buffer/tumbling_window.rs
✅ Files skipped from review due to trivial changes (1)
  • crates/arkflow-plugin/src/buffer/sliding_window.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • crates/arkflow-plugin/src/buffer/tumbling_window.rs

Contributor

@coderabbitai coderabbitai Bot left a comment

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
crates/arkflow-plugin/src/buffer/sliding_window.rs (2)

80-98: ⚠️ Potential issue | 🟠 Major

Timer can be indefinitely starved by write-triggered notifications.

The spawned loop recreates sleep(interval) on every iteration, and the _ = notify_clone.notified() branch wakes on every Buffer::write() call (since write() invokes self.notify.notify_waiters()). Under a steady write rate, each write cancels the pending sleep and restarts it from zero, so the timer branch may never fire and windows only emit when queue_lock.len() >= window_size already holds — effectively disabling the time-based flush and defeating the purpose of interval.

Consider using tokio::time::interval (which ticks on a schedule independent of other branches) and dropping the notified() branch in the timer task, or only using notified() as a cancellation signal rather than re-entering the loop.

♻️ Sketch
-        tokio::spawn(async move {
-            loop {
-                let timer = sleep(interval);
-                tokio::select! {
-                    _ = timer => {
-                        notify_clone.notify_waiters();
-                    }
-                    _ = close_clone.cancelled() => {
-                        notify_clone.notify_waiters();
-                        break;
-                    }
-                    _ = notify_clone.notified() => {
-                        if close_clone.is_cancelled(){
-                            break;
-                        }
-                    }
-                }
-            }
-        });
+        tokio::spawn(async move {
+            let mut ticker = tokio::time::interval(interval);
+            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
+            ticker.tick().await; // consume immediate first tick
+            loop {
+                tokio::select! {
+                    _ = ticker.tick() => {
+                        notify_clone.notify_waiters();
+                    }
+                    _ = close_clone.cancelled() => {
+                        notify_clone.notify_waiters();
+                        break;
+                    }
+                }
+            }
+        });
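
To make the starvation concrete, here is a small deterministic model of the two timer policies. It is only a sketch under stated assumptions: plain integer timestamps in milliseconds stand in for tokio timers, and `fires_with_resetting_sleep` and `fires_with_fixed_ticker` are hypothetical helper names, not part of arkflow or tokio.

```rust
/// Times (ms) at which the timer branch fires when the sleep is recreated
/// after every wakeup, as in the current loop. `writes` must be sorted.
/// Hypothetical model: integer timestamps stand in for real timers.
fn fires_with_resetting_sleep(writes: &[u64], interval: u64, end: u64) -> Vec<u64> {
    assert!(interval > 0, "interval must be positive");
    let mut fires = Vec::new();
    let mut deadline = interval; // the first sleep starts at t = 0
    let mut next = 0; // index of the next write to process
    loop {
        match writes.get(next).copied() {
            // A write lands before the pending deadline: the loop wakes,
            // drops the old sleep, and starts a fresh one at the write time.
            Some(w) if w < deadline && w <= end => {
                deadline = w + interval;
                next += 1;
            }
            // Otherwise the sleep completes, unless the observed period is over.
            _ => {
                if deadline > end {
                    break;
                }
                fires.push(deadline);
                deadline += interval;
            }
        }
    }
    fires
}

/// Times (ms) at which a fixed-schedule ticker fires; writes cannot delay it.
fn fires_with_fixed_ticker(interval: u64, end: u64) -> Vec<u64> {
    assert!(interval > 0, "interval must be positive");
    (1..=end / interval).map(|k| k * interval).collect()
}
```

In this model, writes every 50 ms against a 100 ms interval over one second produce no timer fires at all under the resetting policy, while the fixed schedule fires ten times.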
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/arkflow-plugin/src/buffer/sliding_window.rs` around lines 80 - 98, The
timer task can be starved because each notify_clone.notified() wake restarts
sleep(interval); replace the recreate-sleep pattern with a stable ticker and
remove the write-triggered notified() branch: create a tokio::time::interval
(e.g. let mut ticker = tokio::time::interval(interval)) in the tokio::spawn
closure and use ticker.tick() in the select, keeping only ticker.tick() to call
notify_clone.notify_waiters() and close_clone.cancelled() to break; remove or do
not re-enter the notify_clone.notified() branch so Buffer::write() notifications
do not reset the timer.

187-209: ⚠️ Potential issue | 🟠 Major

Lost wakeup race between queue check and notified() registration.

The read() function has a race condition: if notify_waiters() fires between dropping the queue read lock (line 203) and calling notified().await (line 205), the notification is lost and the reader blocks indefinitely even with sufficient buffered messages.

The fix is to create the Notified future and call enable() before inspecting the queue:

♻️ Proposed fix
         loop {
             if self.close.is_cancelled() {
                 return Ok(None);
             }
-            {
-                let queue_lock = self.queue.read().await;
-                // If there are enough messages to form a window, break the loop and process them
-                if queue_lock.len() >= self.config.window_size as usize {
-                    break;
-                }
-                // If the buffer is closed, return None
-            }
-            // Wait for notification from timer, write operation, or close
-            self.notify.notified().await;
+            let notified = self.notify.notified();
+            tokio::pin!(notified);
+            notified.as_mut().enable();
+            {
+                let queue_lock = self.queue.read().await;
+                if queue_lock.len() >= self.config.window_size as usize {
+                    break;
+                }
+            }
+            notified.await;
         }

This ensures the Notified future is already registered for wakeups before the queue is inspected, closing the gap where a notification could be lost.
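
The ordering issue can be illustrated with a toy model. This is a sketch, not tokio's implementation: `ToyNotify` and `ToyWaiter` are hypothetical types that only mimic one property, namely that a waiter observes broadcasts issued after it was created and none issued before.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical stand-in for a notifier: counts broadcast notifications.
struct ToyNotify {
    generation: AtomicUsize,
}

/// Hypothetical waiter: remembers the generation seen at creation time.
struct ToyWaiter<'a> {
    notify: &'a ToyNotify,
    seen: usize,
}

impl ToyNotify {
    fn new() -> Self {
        ToyNotify { generation: AtomicUsize::new(0) }
    }

    /// Broadcast to every waiter that already exists.
    fn notify_waiters(&self) {
        self.generation.fetch_add(1, Ordering::SeqCst);
    }

    /// Create a waiter that only observes later broadcasts.
    fn waiter(&self) -> ToyWaiter<'_> {
        ToyWaiter { notify: self, seen: self.generation.load(Ordering::SeqCst) }
    }
}

impl ToyWaiter<'_> {
    /// True if a broadcast happened after this waiter was created.
    fn is_notified(&self) -> bool {
        self.notify.generation.load(Ordering::SeqCst) != self.seen
    }
}

/// Reader checks the queue first; the writer notifies before the waiter exists.
fn check_then_register() -> bool {
    let notify = ToyNotify::new();
    // (reader inspects the queue here and finds too few messages)
    notify.notify_waiters(); // the writer's notification arrives in the gap
    let waiter = notify.waiter(); // registered too late
    waiter.is_notified()
}

/// Reader creates the waiter first; the same notification is observed.
fn register_then_check() -> bool {
    let notify = ToyNotify::new();
    let waiter = notify.waiter(); // registered before the queue check
    // (reader inspects the queue here and finds too few messages)
    notify.notify_waiters();
    waiter.is_notified()
}
```

In this model `check_then_register()` returns false (the wakeup is lost) and `register_then_check()` returns true, which is the ordering the proposed fix relies on.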

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/arkflow-plugin/src/buffer/sliding_window.rs` around lines 187 - 209,
The read() method has a lost-wakeup race between dropping the queue read lock
and awaiting self.notify.notified(); fix it by creating the Notified future
before inspecting the queue (e.g., let notified = self.notify.notified()), pin
it with tokio::pin!(notified), and call notified.as_mut().enable() to register
it for wakeups, then acquire/read self.queue (queue.read().await) and check
queue_lock.len() against self.config.window_size; if there are not enough
messages, drop the lock and await the pinned future (notified.await), otherwise
proceed to process_slide().await; update references in read() to use the created
notified variable instead of calling self.notify.notified().await directly.
🧹 Nitpick comments (1)
crates/arkflow-plugin/src/buffer/sliding_window.rs (1)

215-223: Holding the read lock across notify_waiters() is unnecessary.

Minor: the read guard is kept alive through self.notify.notify_waiters(). Not a correctness bug (readers take a read lock too, so they won't be blocked), but dropping the guard before notifying matches the pattern you just established in write() (line 175) and avoids surprises if the lock type changes.

-        let queue_lock = self.queue.read().await;
-        if !queue_lock.is_empty() {
-            // Notify any waiting readers to process remaining messages
-            self.notify.notify_waiters();
-        }
+        let non_empty = !self.queue.read().await.is_empty();
+        if non_empty {
+            self.notify.notify_waiters();
+        }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/arkflow-plugin/src/buffer/sliding_window.rs` around lines 215 - 223,
In flush(), don't hold the read guard across the notify call: after acquiring
queue_lock via self.queue.read().await check is_empty(), then explicitly drop
the guard (the queue_lock variable) before calling self.notify.notify_waiters();
mirror the pattern used in write() so the read lock is released prior to
notifying readers.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: f78eb684-9a7f-4639-a980-bfca6bef28c8

📥 Commits

Reviewing files that changed from the base of the PR and between edb9fea and 080a022.

📒 Files selected for processing (1)
  • crates/arkflow-plugin/src/buffer/sliding_window.rs
