Skip to content

Commit 33b1654

Browse files
committed
route origin plugin streams through native dispatch
1 parent b3e1cae commit 33b1654

2 files changed

Lines changed: 372 additions & 13 deletions

File tree

garyx/src/channel_plugin_host.rs

Lines changed: 266 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,10 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
3030

3131
use async_trait::async_trait;
3232
use garyx_bridge::MultiProviderBridge;
33-
use garyx_channels::dispatcher::{ChannelDispatcher, OutboundMessage, SwappableDispatcher};
33+
use garyx_channels::dispatcher::{
34+
ChannelDispatcher, OutboundMessage, StreamDispatchRole, StreamingDispatchTarget,
35+
SwappableDispatcher, build_stream_dispatch_callback,
36+
};
3437
use garyx_channels::plugin::ChannelPluginManager;
3538
use garyx_channels::plugin_host::{
3639
AccountDescriptor, AttachmentRef, BackfillOutcome, HostContext, InboundHandler,
@@ -367,17 +370,6 @@ impl HostInboundHandler {
367370

368371
let thread_holder: Arc<StdMutex<Option<String>>> = Arc::new(StdMutex::new(None));
369372

370-
let response_callback = build_response_callback(StreamCallbackCtx {
371-
plugin_id: self.plugin_id.clone(),
372-
account_id: parsed.account_id.clone(),
373-
chat_id: parsed.thread_binding_key.clone(),
374-
stream_id: stream_id.clone(),
375-
swap: self.swap.clone(),
376-
streams: self.streams.clone(),
377-
thread_holder: thread_holder.clone(),
378-
live_streams: self.live_streams.clone(),
379-
});
380-
381373
// Ensure a stable, non-empty run id so the committed-stream replay
382374
// adapter can filter this run's records (plugins may omit run_id). The
383375
// subprocess protocol frames carry the local stream_id/seq, not run_id,
@@ -411,6 +403,38 @@ impl HostInboundHandler {
411403
&parsed.account_id,
412404
&parsed.thread_binding_key,
413405
);
406+
let origin_native_stream = self
407+
.swap
408+
.plugin_sender(&self.plugin_id)
409+
.filter(|sender| sender.capabilities().dispatch_stream_event)
410+
.map(|_| {
411+
DeferredOriginNativeStream::new(DeferredOriginNativeStreamCtx {
412+
plugin_id: self.plugin_id.clone(),
413+
account_id: parsed.account_id.clone(),
414+
chat_id: parsed.thread_binding_key.clone(),
415+
stream_id: stream_id.clone(),
416+
run_id: run_id.clone(),
417+
endpoint_identity: origin_endpoint_identity.clone(),
418+
router: self.router.clone(),
419+
dispatcher: self.swap.clone(),
420+
streams: self.streams.clone(),
421+
live_streams: self.live_streams.clone(),
422+
})
423+
});
424+
let response_callback = if let Some(origin_native_stream) = origin_native_stream.as_ref() {
425+
origin_native_stream.consumer()
426+
} else {
427+
build_response_callback(StreamCallbackCtx {
428+
plugin_id: self.plugin_id.clone(),
429+
account_id: parsed.account_id.clone(),
430+
chat_id: parsed.thread_binding_key.clone(),
431+
stream_id: stream_id.clone(),
432+
swap: self.swap.clone(),
433+
streams: self.streams.clone(),
434+
thread_holder: thread_holder.clone(),
435+
live_streams: self.live_streams.clone(),
436+
})
437+
};
414438
let deferred_fanout = garyx_channels::bound_fanout::DeferredBoundStreamFanout::new(
415439
self.router.clone(),
416440
self.swap.clone(),
@@ -468,16 +492,25 @@ impl HostInboundHandler {
468492
Ok(result) => result,
469493
Err(err) => {
470494
replay_subscription.abort();
495+
if let Some(origin_native_stream) = origin_native_stream.as_ref() {
496+
origin_native_stream.finish_without_stream();
497+
}
471498
return Err((PluginErrorCode::InternalError.as_i32(), err));
472499
}
473500
};
474501

475502
if let Ok(mut holder) = thread_holder.lock() {
476503
*holder = Some(result.thread_id.clone());
477504
}
505+
if let Some(origin_native_stream) = origin_native_stream.as_ref() {
506+
origin_native_stream.attach_thread(&result.thread_id).await;
507+
}
478508
deferred_fanout.attach_thread(&result.thread_id).await;
479509
if result.local_reply.is_some() {
480510
replay_subscription.abort();
511+
if let Some(origin_native_stream) = origin_native_stream.as_ref() {
512+
origin_native_stream.finish_without_stream();
513+
}
481514
} else {
482515
replay_subscription.detach();
483516
}
@@ -573,6 +606,227 @@ struct StreamCallbackCtx {
573606
live_streams: Arc<StdMutex<HashSet<String>>>,
574607
}
575608

609+
struct DeferredOriginNativeStreamCtx {
610+
plugin_id: String,
611+
account_id: String,
612+
chat_id: String,
613+
stream_id: String,
614+
run_id: String,
615+
endpoint_identity: String,
616+
router: Arc<Mutex<MessageRouter>>,
617+
dispatcher: Arc<dyn ChannelDispatcher>,
618+
streams: Arc<StreamRegistry>,
619+
live_streams: Arc<StdMutex<HashSet<String>>>,
620+
}
621+
622+
type StreamCallback = Arc<dyn Fn(StreamEvent) + Send + Sync>;
623+
624+
#[derive(Default)]
625+
struct DeferredOriginNativeStreamState {
626+
attached: bool,
627+
closed: bool,
628+
callback: Option<StreamCallback>,
629+
buffered: Vec<StreamEvent>,
630+
}
631+
632+
struct DeferredOriginNativeStreamInner {
633+
plugin_id: String,
634+
account_id: String,
635+
chat_id: String,
636+
stream_id: String,
637+
run_id: String,
638+
endpoint_identity: String,
639+
router: Arc<Mutex<MessageRouter>>,
640+
dispatcher: Arc<dyn ChannelDispatcher>,
641+
streams: Arc<StreamRegistry>,
642+
typed_stream_id: StreamId,
643+
live_streams: Arc<StdMutex<HashSet<String>>>,
644+
state: StdMutex<DeferredOriginNativeStreamState>,
645+
}
646+
647+
#[derive(Clone)]
648+
struct DeferredOriginNativeStream {
649+
inner: Arc<DeferredOriginNativeStreamInner>,
650+
}
651+
652+
impl DeferredOriginNativeStream {
653+
fn new(ctx: DeferredOriginNativeStreamCtx) -> Self {
654+
Self {
655+
inner: Arc::new(DeferredOriginNativeStreamInner {
656+
typed_stream_id: StreamId::from(ctx.stream_id.as_str()),
657+
plugin_id: ctx.plugin_id,
658+
account_id: ctx.account_id,
659+
chat_id: ctx.chat_id,
660+
stream_id: ctx.stream_id,
661+
run_id: ctx.run_id,
662+
endpoint_identity: ctx.endpoint_identity,
663+
router: ctx.router,
664+
dispatcher: ctx.dispatcher,
665+
streams: ctx.streams,
666+
live_streams: ctx.live_streams,
667+
state: StdMutex::new(DeferredOriginNativeStreamState::default()),
668+
}),
669+
}
670+
}
671+
672+
fn consumer(&self) -> StreamCallback {
673+
let inner = self.inner.clone();
674+
Arc::new(move |event| {
675+
inner.dispatch_or_buffer(event);
676+
})
677+
}
678+
679+
async fn attach_thread(&self, thread_id: &str) {
680+
self.inner.attach_thread(thread_id).await;
681+
}
682+
683+
fn finish_without_stream(&self) {
684+
self.inner.close_stream();
685+
}
686+
}
687+
688+
impl DeferredOriginNativeStreamInner {
689+
fn dispatch_or_buffer(&self, event: StreamEvent) {
690+
if self.streams.is_tombstoned(&self.typed_stream_id) {
691+
if matches!(event, StreamEvent::Done) {
692+
self.close_stream();
693+
}
694+
return;
695+
}
696+
697+
let callback = {
698+
let mut state = match self.state.lock() {
699+
Ok(state) => state,
700+
Err(_) => {
701+
warn!("deferred origin stream state lock poisoned");
702+
return;
703+
}
704+
};
705+
if state.closed {
706+
return;
707+
}
708+
if !state.attached {
709+
state.buffered.push(event);
710+
return;
711+
}
712+
state.callback.clone()
713+
};
714+
715+
self.dispatch_attached_event(callback.as_ref(), event);
716+
}
717+
718+
async fn attach_thread(&self, thread_id: &str) {
719+
let already_attached_or_closed = match self.state.lock() {
720+
Ok(state) => state.attached || state.closed,
721+
Err(_) => {
722+
warn!("deferred origin stream state lock poisoned");
723+
return;
724+
}
725+
};
726+
if already_attached_or_closed {
727+
return;
728+
}
729+
730+
let target = StreamingDispatchTarget {
731+
target_thread_id: thread_id.to_owned(),
732+
endpoint_identity: self.endpoint_identity.clone(),
733+
run_id: self.run_id.clone(),
734+
channel: self.plugin_id.clone(),
735+
account_id: self.account_id.clone(),
736+
chat_id: self.chat_id.clone(),
737+
delivery_target_type: "chat_id".to_owned(),
738+
delivery_target_id: self.chat_id.clone(),
739+
thread_id: None,
740+
};
741+
let callback = build_stream_dispatch_callback(
742+
self.dispatcher.clone(),
743+
target,
744+
self.router.clone(),
745+
StreamDispatchRole::Origin,
746+
);
747+
if callback.is_some() {
748+
info!(
749+
plugin_id = %self.plugin_id,
750+
stream_id = %self.stream_id,
751+
run_id = %self.run_id,
752+
thread_id = %thread_id,
753+
endpoint_identity = %self.endpoint_identity,
754+
"using native plugin origin stream dispatch_stream_event"
755+
);
756+
} else {
757+
warn!(
758+
plugin_id = %self.plugin_id,
759+
stream_id = %self.stream_id,
760+
run_id = %self.run_id,
761+
thread_id = %thread_id,
762+
endpoint_identity = %self.endpoint_identity,
763+
"plugin advertised native stream dispatch but no dispatch_stream_event callback was available"
764+
);
765+
}
766+
767+
loop {
768+
let buffered = {
769+
let mut state = match self.state.lock() {
770+
Ok(state) => state,
771+
Err(_) => {
772+
warn!("deferred origin stream state lock poisoned");
773+
return;
774+
}
775+
};
776+
if state.attached || state.closed {
777+
return;
778+
}
779+
if state.buffered.is_empty() {
780+
state.callback = callback.clone();
781+
state.attached = true;
782+
return;
783+
}
784+
std::mem::take(&mut state.buffered)
785+
};
786+
787+
for event in buffered {
788+
self.dispatch_attached_event(callback.as_ref(), event);
789+
}
790+
}
791+
}
792+
793+
fn dispatch_attached_event(&self, callback: Option<&StreamCallback>, event: StreamEvent) {
794+
if self.streams.is_tombstoned(&self.typed_stream_id) {
795+
if matches!(event, StreamEvent::Done) {
796+
self.close_stream();
797+
}
798+
return;
799+
}
800+
801+
if let Some(callback) = callback {
802+
callback(event.clone());
803+
}
804+
805+
if matches!(event, StreamEvent::Done) {
806+
info!(
807+
plugin_id = %self.plugin_id,
808+
stream_id = %self.stream_id,
809+
run_id = %self.run_id,
810+
"emitted plugin origin stream done via dispatch_stream_event"
811+
);
812+
self.close_stream();
813+
}
814+
}
815+
816+
fn close_stream(&self) {
817+
if let Ok(mut state) = self.state.lock() {
818+
if state.closed {
819+
return;
820+
}
821+
state.closed = true;
822+
state.buffered.clear();
823+
}
824+
if let Ok(mut guard) = self.live_streams.lock() {
825+
guard.remove(&self.stream_id);
826+
}
827+
}
828+
}
829+
576830
/// Build the stream callback that does TWO things on every agent
577831
/// event:
578832
///

0 commit comments

Comments
 (0)