Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/doc-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
version: 8.10.5
- uses: actions/setup-node@v4
with:
node-version: '20'
node-version: '22'
cache: 'pnpm'
cache-dependency-path: ./docs/pnpm-lock.yaml

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test-doc-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
version: 8.10.5
- uses: actions/setup-node@v4
with:
node-version: '20'
node-version: '22'
cache: 'pnpm'
cache-dependency-path: ./docs/pnpm-lock.yaml

Expand Down
193 changes: 193 additions & 0 deletions crates/arkflow-plugin/src/buffer/common_window.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

//! Common window infrastructure
//!
//! This module provides shared utilities for window implementations
//! to reduce code duplication across different window types.

use crate::buffer::join::JoinConfig;
use crate::buffer::window::BaseWindow;
use arkflow_core::{Error, Resource};
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::Notify;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;

/// Shared state used by window buffer implementations.
///
/// Bundles the three pieces every window needs: a [`Notify`] used to wake
/// tasks waiting on the window, a [`CancellationToken`] that marks the window
/// as closed, and the instant at which the window last triggered.
///
/// `Debug` is derived so that types embedding this context can be debugged
/// or derive `Debug` themselves.
#[derive(Debug)]
pub struct CommonWindowContext {
    /// Wakes tasks that are awaiting `notified()` on this window
    /// (the timer task, and any readers sharing this handle).
    pub notify: Arc<Notify>,
    /// Cancelled by [`CommonWindowContext::close`]; background tasks watch it
    /// to know when to stop.
    pub close_token: CancellationToken,
    /// Instant at which the window was last triggered. Guarded by a
    /// synchronous `std::sync::RwLock`, so it is accessed without `.await`.
    pub last_trigger: Arc<std::sync::RwLock<Instant>>,
}

impl CommonWindowContext {
/// Build a context in its initial, open state.
///
/// The context starts with a fresh notifier, an un-cancelled close token,
/// and a last-trigger time equal to the moment of construction.
///
/// # Returns
///
/// The newly constructed `CommonWindowContext`.
pub fn new() -> Self {
    let notify = Arc::new(Notify::new());
    let close_token = CancellationToken::new();
    // The window counts as "last triggered" at creation time.
    let last_trigger = Arc::new(std::sync::RwLock::new(Instant::now()));

    Self {
        notify,
        close_token,
        last_trigger,
    }
}

/// Start the background timer task
///
/// This spawns a background task that periodically notifies waiters
/// based on the check interval. The task runs until cancelled.
///
/// # Arguments
///
/// * `check_interval` - How often to check if window should trigger
pub fn start_timer(&self, check_interval: std::time::Duration) {
let notify = Arc::clone(&self.notify);
let close = self.close_token.clone();

tokio::spawn(async move {
loop {
let timer = sleep(check_interval);
tokio::select! {
_ = timer => {
notify.notify_waiters();
}
_ = close.cancelled() => {
notify.notify_waiters();
break;
}
_ = notify.notified() => {
if close.is_cancelled(){
break;
}
}
}
}
});
}

/// Record a new last-trigger time.
///
/// If the lock was poisoned by a panic in another holder, the guard is
/// recovered and the write still happens: the protected value is a plain
/// `Instant`, which cannot be left half-updated, so dropping the update
/// silently would only leave callers with a stale time.
///
/// # Arguments
///
/// * `time` - The instant to store as the last trigger time
pub fn update_last_trigger(&self, time: Instant) {
    let mut last = self
        .last_trigger
        .write()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    *last = time;
}

/// Read the stored last-trigger time.
///
/// If the lock was poisoned, the guard is recovered and the stored value is
/// returned anyway. Returning `Instant::now()` in that case would make the
/// window look as if it had just triggered, which is not what was recorded.
///
/// # Returns
///
/// The instant most recently stored by `new` or `update_last_trigger`.
pub fn get_last_trigger(&self) -> Instant {
    let last = self
        .last_trigger
        .read()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    *last
}

/// Report whether this window context has been closed.
///
/// # Returns
///
/// * `bool` - `true` once the close token has been cancelled, `false` before
pub fn is_closed(&self) -> bool {
    let token = &self.close_token;
    token.is_cancelled()
}

/// Close the window context.
///
/// Cancels the close token, which stops the background timer task if one is
/// running, and then wakes every task currently waiting on the notifier.
/// The explicit wake-up matters when no timer task was started: without it,
/// nothing would release those waiters after cancellation.
pub fn close(&self) {
    self.close_token.cancel();
    // Cancel first so that woken waiters already observe `is_closed() == true`.
    self.notify.notify_waiters();
}
Comment on lines +124 to +126
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Wake waiters directly in close() to avoid shutdown stalls when no timer is running.

Current shutdown notification is indirect (via timer task). If start_timer was never called, waiters on notify may not be released.

🔧 Suggested fix
 pub fn close(&self) {
     self.close_token.cancel();
+    self.notify.notify_waiters();
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/arkflow-plugin/src/buffer/common_window.rs` around lines 124 - 126,
The close() method currently only cancels close_token which can leave tasks
awaiting notify blocked if start_timer() was never started; update close() to
both cancel close_token and explicitly wake the notify waiters (e.g., call
notify.notify_waiters() or notify.notify_one() on the same Notify used by
waiting tasks) so that any tasks awaiting the notification are released
immediately; locate the close() function and the Notify instance used by
start_timer()/waiters to add the notify wake after canceling close_token.


/// Construct a `BaseWindow` wired to this context.
///
/// The new window receives handles to this context's notifier and close
/// token, so it shares both with the context.
///
/// # Arguments
///
/// * `join_config` - Optional join configuration, passed through unchanged
/// * `gap` - Duration passed through to `BaseWindow::new`
/// * `resource` - Resource reference, passed through unchanged
///
/// # Returns
///
/// The `BaseWindow`, or the error returned by `BaseWindow::new`
pub fn create_base_window(
    &self,
    join_config: Option<JoinConfig>,
    gap: std::time::Duration,
    resource: &Resource,
) -> Result<BaseWindow, Error> {
    let shared_notify = Arc::clone(&self.notify);
    let shared_token = self.close_token.clone();

    BaseWindow::new(join_config, shared_notify, shared_token, gap, resource)
}
}

impl Default for CommonWindowContext {
fn default() -> Self {
Self::new()
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// A fresh context is open and its trigger time is not in the future.
    #[test]
    fn test_common_window_context_creation() {
        let ctx = CommonWindowContext::new();
        assert!(!ctx.is_closed());
        assert!(ctx.get_last_trigger() <= Instant::now());
    }

    /// `close` flips the context from open to closed.
    #[test]
    fn test_common_window_context_close() {
        let ctx = CommonWindowContext::new();
        assert!(!ctx.is_closed());
        ctx.close();
        assert!(ctx.is_closed());
    }

    /// `update_last_trigger` stores exactly the instant it is given.
    #[test]
    fn test_common_window_context_update_trigger() {
        let ctx = CommonWindowContext::new();
        // Use an instant clearly different from the construction time so the
        // assertion cannot pass unless the update really took effect.
        let target = Instant::now() + Duration::from_secs(60);
        ctx.update_last_trigger(target);
        assert_eq!(ctx.get_last_trigger(), target);
    }

    /// `Default` produces an open context, like `new`.
    #[test]
    fn test_common_window_context_default() {
        let ctx = CommonWindowContext::default();
        assert!(!ctx.is_closed());
    }
}
2 changes: 2 additions & 0 deletions crates/arkflow-plugin/src/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
pub mod common_window;
mod join;
pub mod memory;
pub mod session_window;
pub mod sliding_window;
pub mod tumbling_window;
pub(crate) mod window;
pub mod window_strategy;

use arkflow_core::Error;

Expand Down
39 changes: 14 additions & 25 deletions crates/arkflow-plugin/src/buffer/session_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
//! if they arrive within the gap duration of each other. When the gap duration elapses
//! without new messages, the session is closed and all accumulated messages are emitted.

use crate::buffer::common_window::CommonWindowContext;
use crate::buffer::join::JoinConfig;
use crate::buffer::window::BaseWindow;
use crate::time::deserialize_duration;
Expand All @@ -30,9 +31,8 @@ use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::sync::Arc;
use std::time;
use tokio::sync::{Notify, RwLock};
use tokio::sync::RwLock;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;

/// Configuration for the session window buffer
#[derive(Debug, Clone, Serialize, Deserialize)]
Expand All @@ -52,10 +52,8 @@ struct SessionWindow {
/// Configuration parameters for the session window
config: SessionWindowConfig,
base_window: BaseWindow,
/// Notification mechanism for signaling between threads
notify: Arc<Notify>,
/// Token for cancellation of background tasks
close: CancellationToken,
/// Common window context for timer and notification management
context: CommonWindowContext,
/// Timestamp of the last received message, used to determine session boundaries
last_message_time: Arc<RwLock<Instant>>,
}
Expand All @@ -69,26 +67,16 @@ impl SessionWindow {
/// # Returns
/// * `Result<Self, Error>` - A new session window instance or an error
fn new(config: SessionWindowConfig, resource: &Resource) -> Result<Self, Error> {
let notify = Arc::new(Notify::new());
let notify_clone = Arc::clone(&notify);
let gap = config.gap;
let close = CancellationToken::new();
let close_clone = close.clone();
let last_message_time = Arc::new(RwLock::new(Instant::now()));
let base_window = BaseWindow::new(
config.join.clone(),
notify_clone,
close_clone,
gap,
resource,
)?;
let context = CommonWindowContext::new();

// BaseWindow already starts a background timer, no need to start another one here
let base_window = context.create_base_window(config.join.clone(), config.gap, resource)?;

Ok(Self {
close,
notify,
config,
base_window,
last_message_time,
context,
last_message_time: Arc::new(RwLock::new(Instant::now())),
})
}
}
Expand All @@ -107,6 +95,8 @@ impl Buffer for SessionWindow {
self.base_window.write(msg, ack).await?;
// Update the last message timestamp to track session activity
*self.last_message_time.write().await = Instant::now();
// Notify waiting readers that a new message has arrived
self.context.notify.notify_waiters();
Ok(())
}

Expand All @@ -117,7 +107,7 @@ impl Buffer for SessionWindow {
/// * `Result<Option<(MessageBatchRef, Arc<dyn Ack>)>, Error>` - The merged message batch and combined acknowledgment,
/// or None if the buffer is closed and empty
async fn read(&self) -> Result<Option<(MessageBatchRef, Arc<dyn Ack>)>, Error> {
if self.close.is_cancelled() {
if self.context.is_closed() {
return Ok(None);
}

Expand All @@ -134,8 +124,7 @@ impl Buffer for SessionWindow {
}

// Wait for notification from timer or write operation
let notify = Arc::clone(&self.notify);
notify.notified().await;
self.context.notify.notified().await;
}
// Process and return the current session
self.base_window.process_window().await
Expand Down
23 changes: 13 additions & 10 deletions crates/arkflow-plugin/src/buffer/sliding_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ impl SlidingWindow {
let close = CancellationToken::new();
let close_clone = close.clone();

// SlidingWindow needs its own timer since it doesn't use BaseWindow
tokio::spawn(async move {
loop {
let timer = sleep(interval);
Expand All @@ -97,10 +98,10 @@ impl SlidingWindow {
});

Ok(Self {
close,
notify,
config,
queue: Arc::new(Default::default()),
notify,
close,
})
}

Expand Down Expand Up @@ -171,6 +172,9 @@ impl Buffer for SlidingWindow {
async fn write(&self, msg: MessageBatchRef, ack: Arc<dyn Ack>) -> Result<(), Error> {
let mut queue_lock = self.queue.write().await;
queue_lock.push_back((msg, ack));
drop(queue_lock);
// Notify waiting readers that a new message has arrived
self.notify.notify_waiters();
Ok(())
}

Expand All @@ -186,18 +190,19 @@ impl Buffer for SlidingWindow {
}

loop {
if self.close.is_cancelled() {
return Ok(None);
}
{
let queue_arc = Arc::clone(&self.queue);
let queue_lock = queue_arc.read().await;
let queue_lock = self.queue.read().await;
// If there are enough messages to form a window, break the loop and process them
if queue_lock.len() >= self.config.window_size as usize {
break;
}
// If the buffer is closed, return None
}
// Wait for notification from timer, write operation, or close
let notify = Arc::clone(&self.notify);
notify.notified().await;
self.notify.notified().await;
}
// Process the current window and slide forward
self.process_slide().await
Expand All @@ -209,12 +214,10 @@ impl Buffer for SlidingWindow {
/// * `Result<(), Error>` - Success or an error
async fn flush(&self) -> Result<(), Error> {
self.close.cancel();
let queue_arc = Arc::clone(&self.queue);
let queue_lock = queue_arc.read().await;
let queue_lock = self.queue.read().await;
if !queue_lock.is_empty() {
// Notify any waiting readers to process remaining messages
let notify = Arc::clone(&self.notify);
notify.notify_waiters();
self.notify.notify_waiters();
}
Ok(())
}
Expand Down
Loading
Loading