Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
de76583
this is ok except for websocket server hella ugly
enisdenjo Apr 2, 2026
9b787a6
of course I didnt run tests
enisdenjo Apr 2, 2026
6d64f8e
dedupe what can be deduped
enisdenjo Apr 2, 2026
00dbae2
haha
enisdenjo Apr 2, 2026
8377ac6
broadcast capacity
enisdenjo Apr 2, 2026
d1e3160
haha
enisdenjo Apr 2, 2026
85e528d
he he
enisdenjo Apr 2, 2026
ced2195
format nad long lived client limit
enisdenjo Apr 2, 2026
4e732d3
of course race condition
enisdenjo Apr 3, 2026
6f8e441
ok attempt to clean this up but lets rebase first
enisdenjo Apr 3, 2026
2622dc8
ok
enisdenjo Apr 3, 2026
efccf7c
remove unused
enisdenjo Apr 3, 2026
7403f60
begin super unification the great joiner
enisdenjo Apr 3, 2026
4a44797
switch to ulid
enisdenjo Apr 3, 2026
d2192df
the great enforcer 2
enisdenjo Apr 3, 2026
6923ad4
excuse me please work its still great
enisdenjo Apr 3, 2026
0ff7156
close the gap
enisdenjo Apr 3, 2026
c898f7d
WIP
enisdenjo Apr 3, 2026
7cbbc15
now we're talking, the great divide
enisdenjo Apr 3, 2026
4fbaa84
lets talk naming
enisdenjo Apr 3, 2026
b428344
thats right keep that thang simple
enisdenjo Apr 3, 2026
37f4a28
promotion
enisdenjo Apr 3, 2026
6c2d847
WS server dedupe start hehe
enisdenjo Apr 3, 2026
7e684a5
ok this looks good right
enisdenjo Apr 3, 2026
c5764b3
lol of course
enisdenjo Apr 3, 2026
436a276
right, needs a spawn
enisdenjo Apr 3, 2026
50734f0
WS dedupe hehe
enisdenjo Apr 3, 2026
ec9f98a
big man thing across transports
enisdenjo Apr 3, 2026
91ff2dd
dedupe across boundaries fixed
enisdenjo Apr 4, 2026
12b97a5
leave a comment
enisdenjo Apr 4, 2026
843bbd7
sure clippy
enisdenjo Apr 4, 2026
3278e05
docs: update documentation
theguild-bot Apr 4, 2026
6b0cea0
chill out clippy
enisdenjo Apr 4, 2026
30260fd
long lived limit wrapping
enisdenjo Apr 4, 2026
36043b2
revert inflight map, testing
enisdenjo Apr 4, 2026
e5509db
no clone guard
enisdenjo Apr 4, 2026
383b356
with or without guard
enisdenjo Apr 4, 2026
0fa4d1e
Revert "with or without guard"
enisdenjo Apr 4, 2026
0329884
`_` drop immed `_guard` bind and drop at end
enisdenjo Apr 4, 2026
91e7938
Reapply "with or without guard"
enisdenjo Apr 4, 2026
7e8c6ae
👻
enisdenjo Apr 4, 2026
8314410
chill out with redundancy
enisdenjo Apr 4, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 15 additions & 4 deletions bin/router/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ use crate::{
},
jwt::JwtAuthRuntime,
pipeline::{
active_subscriptions::ActiveSubscriptions,
error::handle_pipeline_error,
graphql_request_handler,
header::ResponseMode,
http_callback::handler,
long_lived_client_limit::LongLivedClientLimitService,
request_extensions::{
read_graphql_operation_metric_identity, read_graphql_response_metric_status,
read_request_body_size, write_graphql_response_metric_status,
Expand Down Expand Up @@ -212,7 +214,7 @@ pub async fn router_entrypoint(plugin_registry: PluginRegistry) -> Result<(), Ro
.await?;

let shared_state_clone = shared_state.clone();
let active_subs = schema_state.active_callback_subscriptions.clone();
let callback_subscriptions_for_handler = schema_state.callback_subscriptions.clone();

// when `listen` is set, the callback route lives on a dedicated server bound to that address
// otherwise, the callback route is mounted on the main server on the `callback_path`
Expand All @@ -224,12 +226,12 @@ pub async fn router_entrypoint(plugin_registry: PluginRegistry) -> Result<(), Ro
}) => {
let cb_path = path.to_string();
let cb_addr = listen.to_string();
let cb_active_subs = active_subs.clone();
let cb_subs = callback_subscriptions_for_handler.clone();
let cb_server = web::HttpServer::new(async move || {
let cb_active_subs = cb_active_subs.clone();
let cb_subs = cb_subs.clone();
let cb_path = cb_path.clone();
web::App::new()
.state(cb_active_subs)
.state(cb_subs)
.configure(move |m| add_callback_handler(m, &cb_path))
})
.bind(&cb_addr)
Expand All @@ -248,10 +250,15 @@ pub async fn router_entrypoint(plugin_registry: PluginRegistry) -> Result<(), Ro
let paths = RouterPaths::new(graphql_path.clone(), websocket_path, callback_path);
paths.detect_conflicts(&prometheus)?;

let long_lived_client_limit_service =
LongLivedClientLimitService::new(&shared_state.router_config);

let maybe_error = web::HttpServer::new(async move || {
let landing_page_path = graphql_path.clone();
let prometheus = prometheus.clone();
let long_lived_client_limit_service = long_lived_client_limit_service.clone();
web::App::new()
.middleware(long_lived_client_limit_service)
.middleware(PluginService)
.state(shared_state.clone())
.state(schema_state.clone())
Expand Down Expand Up @@ -310,6 +317,8 @@ pub async fn configure_app_from_config(
};
let plugins_arc = plugin_registry.initialize_plugins(&router_config, bg_tasks_manager)?;

let active_subscriptions =
ActiveSubscriptions::new(router_config.subscriptions.broadcast_capacity);
let router_config_arc = Arc::new(router_config);
let telemetry_context_arc = Arc::new(telemetry_context);
let cache_state = Arc::new(CacheState::new());
Expand All @@ -324,6 +333,7 @@ pub async fn configure_app_from_config(
router_config_arc.clone(),
plugins_arc.clone(),
cache_state.clone(),
active_subscriptions.clone(),
)
.await?;
let schema_state_arc = Arc::new(schema_state);
Expand Down Expand Up @@ -351,6 +361,7 @@ pub async fn configure_app_from_config(
telemetry_context_arc,
plugins_arc,
cache_state,
active_subscriptions.clone(),
)?);

Ok((shared_state, schema_state_arc))
Expand Down
98 changes: 98 additions & 0 deletions bin/router/src/pipeline/active_subscriptions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use std::sync::Arc;

use bytes::Bytes;
use dashmap::DashMap;
use hive_router_plan_executor::response::graphql_error::GraphQLError;
use tokio::sync::broadcast;
use tracing::trace;
use ulid::Ulid;

use crate::shared_state::SharedRouterResponseGuard;

pub type SubscriptionId = String;

/// An item carried over a subscription's `tokio::sync::broadcast` channel.
///
/// The channel hands every receiver its own copy of each value, which is why
/// this type is `Clone`.
#[derive(Clone, Debug)]
pub enum SubscriptionEvent {
/// A normal subscription event from the upstream, already serialized.
/// Uses Bytes for zero-copy cloning across broadcast receivers.
Raw(Bytes),
/// An error pushed externally (e.g. supergraph reload, shutdown).
/// Consumers should yield this as the final event and then stop.
Error(Vec<GraphQLError>),
}

/// Registry of the currently active subscriptions.
///
/// Each entry maps a generated [`SubscriptionId`] to the broadcast sender of that
/// subscription. The map lives behind an `Arc`, so cloning this struct yields another
/// handle to the same registry rather than a copy of it.
#[derive(Clone)]
pub struct ActiveSubscriptions {
// subscription id -> sender side of that subscription's broadcast channel
map: Arc<DashMap<SubscriptionId, broadcast::Sender<SubscriptionEvent>>>,
// capacity handed to `broadcast::channel` for every newly registered subscription
broadcast_capacity: usize,
}

impl ActiveSubscriptions {
    /// Creates an empty registry.
    ///
    /// `broadcast_capacity` is forwarded to `broadcast::channel` for every subscription
    /// registered through [`ActiveSubscriptions::register`].
    pub fn new(broadcast_capacity: usize) -> Self {
        Self {
            map: Arc::new(DashMap::new()),
            broadcast_capacity,
        }
    }

    /// Register a new active subscription. Returns a producer handle for the upstream pump
    /// and a pre-subscribed receiver for the leader consumer. The pump task owns the handle
    /// for the full lifetime of the upstream stream - when the handle drops (pump done or all
    /// receivers gone) the broadcast channel closes and all consumer receivers terminate.
    ///
    /// The subscription is stored under a freshly generated ULID. `guard`, when provided,
    /// is moved into the returned handle and released together with it.
    ///
    /// # Panics
    ///
    /// `tokio::sync::broadcast::channel` panics on a capacity of `0`, so the configured
    /// broadcast capacity must be at least `1`.
    pub fn register(
        &self,
        guard: Option<SharedRouterResponseGuard>,
    ) -> (ProducerHandle, broadcast::Receiver<SubscriptionEvent>) {
        let (sender, receiver) = broadcast::channel(self.broadcast_capacity);
        let id = Ulid::new().to_string();
        // the registry keeps its own sender clone so `close_all_with_error` can reach
        // the consumers without going through the pump task
        self.map.insert(id.clone(), sender.clone());

        trace!(subscription_id = %id, "registered new subscription");

        let handle = ProducerHandle {
            id,
            map: Arc::clone(&self.map),
            sender,
            _guard: guard,
        };

        (handle, receiver)
    }

    /// Close all active subscriptions with an error and clear the registry.
    ///
    /// Every entry is notified and removed in the same pass. Sending to all entries first
    /// and clearing the map afterwards would leave a window in which a subscription
    /// registered in between is dropped from the registry without ever receiving the
    /// error, making it unreachable for any later close.
    pub fn close_all_with_error(&self, errors: Vec<GraphQLError>) {
        let item = SubscriptionEvent::Error(errors);
        self.map.retain(|_, sender| {
            // a failed send only means this subscription has no receivers left
            let _ = sender.send(item.clone());
            // returning false removes the entry that was just notified
            false
        });
    }
}

/// Held by the upstream pump task for the full lifetime of the stream. Dropping it removes
/// the subscription from the registry, closes the broadcast channel, and drops the inflight
/// cleanup guard - which removes the dedupe entry so new requests start a fresh upstream.
///
/// Fields are dropped in declaration order once `Drop::drop` has run, so `sender` is
/// released before `_guard`.
pub struct ProducerHandle {
// key of this subscription in the shared registry
id: SubscriptionId,
// the registry this handle removes itself from on drop
map: Arc<DashMap<SubscriptionId, broadcast::Sender<SubscriptionEvent>>>,
// sender side of this subscription's broadcast channel
sender: broadcast::Sender<SubscriptionEvent>,
// never read, kept only so that it is dropped together with the handle
_guard: Option<SharedRouterResponseGuard>,
}

impl ProducerHandle {
    /// Borrows the broadcast sender that backs this subscription.
    pub fn sender(&self) -> &broadcast::Sender<SubscriptionEvent> {
        let Self { sender, .. } = self;
        sender
    }

    /// Publishes `item` to the subscription's consumers.
    ///
    /// Yields `true` when the channel accepted the event and `false` when every consumer
    /// is gone and the event could not be delivered.
    pub fn send(&self, item: SubscriptionEvent) -> bool {
        match self.sender.send(item) {
            Ok(_receiver_count) => true,
            Err(_undelivered) => false,
        }
    }
}

impl Drop for ProducerHandle {
    /// Takes this subscription out of the shared registry and logs the teardown.
    fn drop(&mut self) {
        let subscription_id = &self.id;
        // the removed entry, if it was still present, is discarded right away
        let _ = self.map.remove(subscription_id);
        trace!(subscription_id = %subscription_id, "producer dropped, upstream closed");
    }
}
2 changes: 1 addition & 1 deletion bin/router/src/pipeline/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl SingleContentType {
// IMPORTANT: make sure that the serialized string representations are valid because
// there is an unwrap in the StreamContentType::media_types() method.
/// Streamable content types for GraphQL responses.
#[derive(PartialEq, Default, Debug, IntoStaticStr, EnumString, AsRefStr, EnumIter)]
#[derive(PartialEq, Default, Debug, IntoStaticStr, EnumString, AsRefStr, EnumIter, Clone)]
pub enum StreamContentType {
// The order of the variants here matters for negotiation with `Accept: */*`.
/// Incremental Delivery over HTTP (`multipart/mixed`)
Expand Down
26 changes: 13 additions & 13 deletions bin/router/src/pipeline/http_callback.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use bytes::Bytes as BytesLib;
use dashmap::mapref::one::Ref;
use hive_router_plan_executor::executors::http_callback::{
ActiveSubscription, ActiveSubscriptionsMap, CallbackMessage, CALLBACK_PROTOCOL_VERSION,
CallbackMessage, CallbackSubscription, CallbackSubscriptionsMap, CALLBACK_PROTOCOL_VERSION,
SUBSCRIPTION_PROTOCOL_HEADER,
};
use hive_router_plan_executor::response::graphql_error::GraphQLError;
Expand Down Expand Up @@ -142,16 +142,16 @@ fn validate_payload(
Ok(())
}

fn handle_check(subscription_id: &str, subscription: &Ref<'_, String, ActiveSubscription>) {
fn handle_check(subscription_id: &str, subscription: &Ref<'_, String, CallbackSubscription>) {
trace!(subscription_id = %subscription_id, "Received check message");
subscription.record_heartbeat();
}

fn handle_next(
subscription_id: &str,
payload: &CallbackPayload<'_>,
subscription: Ref<'_, String, ActiveSubscription>,
active_subscriptions: &ActiveSubscriptionsMap,
subscription: Ref<'_, String, CallbackSubscription>,
callback_subscriptions: &CallbackSubscriptionsMap,
) -> Result<(), CallbackError> {
trace!(subscription_id = %subscription_id, "Received next message");

Expand All @@ -174,15 +174,15 @@ fn handle_next(
// up. we terminate the subscription without an error message because it could not be delivered anyway
warn!(subscription_id = %subscription_id, "Subscription client is too slow");
drop(subscription);
active_subscriptions.remove(subscription_id);
callback_subscriptions.remove(subscription_id);
Err(CallbackError::ClientTooSlow {
subscription_id: subscription_id.to_string(),
})
}
Err(mpsc::error::TrySendError::Closed(_)) => {
debug!(subscription_id = %subscription_id, "Subscription receiver dropped");
drop(subscription);
active_subscriptions.remove(subscription_id);
callback_subscriptions.remove(subscription_id);
Err(CallbackError::SubscriptionDropped {
subscription_id: subscription_id.to_string(),
})
Expand All @@ -193,8 +193,8 @@ fn handle_next(
fn handle_complete(
subscription_id: &str,
payload: &CallbackPayload<'_>,
subscription: Ref<'_, String, ActiveSubscription>,
active_subscriptions: &ActiveSubscriptionsMap,
subscription: Ref<'_, String, CallbackSubscription>,
callback_subscriptions: &CallbackSubscriptionsMap,
) {
trace!(subscription_id = %subscription_id, "Received complete message");
// if the buffer is full or closed we ignore and remove the subscription, we dont send
Expand All @@ -203,14 +203,14 @@ fn handle_complete(
errors: payload.errors.clone(),
});
drop(subscription);
active_subscriptions.remove(subscription_id);
callback_subscriptions.remove(subscription_id);
}

pub async fn handler(
req: HttpRequest,
path: Path<String>,
body: Bytes,
active_subscriptions: web::types::State<ActiveSubscriptionsMap>,
callback_subscriptions: web::types::State<CallbackSubscriptionsMap>,
) -> Result<HttpResponse, CallbackError> {
let subscription_id_from_path = path.into_inner();

Expand All @@ -220,7 +220,7 @@ pub async fn handler(

validate_payload(&payload, &subscription_id_from_path)?;

let subscription = match active_subscriptions.get(&payload.id) {
let subscription = match callback_subscriptions.get(&payload.id) {
Some(sub) => sub,
None => {
return Err(CallbackError::SubscriptionNotFound {
Expand All @@ -238,10 +238,10 @@ pub async fn handler(
match payload.action {
CallbackAction::Check => handle_check(&payload.id, &subscription),
CallbackAction::Next => {
handle_next(&payload.id, &payload, subscription, &active_subscriptions)?;
handle_next(&payload.id, &payload, subscription, &callback_subscriptions)?;
}
CallbackAction::Complete => {
handle_complete(&payload.id, &payload, subscription, &active_subscriptions)
handle_complete(&payload.id, &payload, subscription, &callback_subscriptions)
}
};

Expand Down
Loading
Loading