Skip to content

Commit 360b613

Browse files
committed
donotmerge: rivetkit test
1 parent c58ef25 commit 360b613

8 files changed

Lines changed: 429 additions & 43 deletions

File tree

engine/packages/pegboard-gateway2/src/lib.rs

Lines changed: 104 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,12 @@ impl PegboardGateway2 {
100100
// Use the actor ID from the gateway instance
101101
let actor_id = self.actor_id.to_string();
102102
let request_id = req_ctx.in_flight_request_id()?;
103+
tracing::warn!(
104+
request_id=%protocol::util::id_to_string(&request_id),
105+
actor_id=%self.actor_id,
106+
path=%self.path,
107+
"DEBUG-TRACE: handle_request_inner entered",
108+
);
103109

104110
// Extract request parts
105111
let headers = req
@@ -124,6 +130,10 @@ impl PegboardGateway2 {
124130
let mut stopped_sub = ctx
125131
.subscribe::<pegboard::workflows::actor2::Stopped>(("actor_id", self.actor_id))
126132
.await?;
133+
tracing::warn!(
134+
request_id=%protocol::util::id_to_string(&request_id),
135+
"DEBUG-TRACE: subscribed to actor2::Stopped",
136+
);
127137

128138
// Build subject to publish to
129139
let tunnel_subject = pegboard::pubsub_subjects::EnvoyReceiverSubject::new(
@@ -133,14 +143,24 @@ impl PegboardGateway2 {
133143
.to_string();
134144

135145
// Start listening for request responses
146+
tracing::warn!(
147+
request_id=%protocol::util::id_to_string(&request_id),
148+
%tunnel_subject,
149+
"DEBUG-TRACE: calling create_or_wake_in_flight_request",
150+
);
136151
let InFlightRequestCtx {
137152
mut msg_rx,
138153
mut drop_rx,
139154
handle: in_flight_req,
140155
} = self
141156
.shared_state
142157
.create_or_wake_in_flight_request(tunnel_subject, request_id, false)
143-
.await?;
158+
.await
159+
.inspect_err(|e| tracing::warn!(?e, request_id=%protocol::util::id_to_string(&request_id), "DEBUG-TRACE: create_or_wake_in_flight_request returned Err"))?;
160+
tracing::warn!(
161+
request_id=%protocol::util::id_to_string(&request_id),
162+
"DEBUG-TRACE: create_or_wake_in_flight_request returned Ok",
163+
);
144164

145165
// Start request
146166
let message = protocol::ToEnvoyTunnelMessageKind::ToEnvoyRequestStart(
@@ -157,10 +177,24 @@ impl PegboardGateway2 {
157177
stream: false,
158178
},
159179
);
160-
in_flight_req.send_message(message).await?;
180+
tracing::warn!(
181+
request_id=%protocol::util::id_to_string(&request_id),
182+
"DEBUG-TRACE: about to send_message(ReqStart)",
183+
);
184+
in_flight_req
185+
.send_message(message)
186+
.await
187+
.inspect_err(|e| tracing::warn!(?e, request_id=%protocol::util::id_to_string(&request_id), "DEBUG-TRACE: send_message(ReqStart) returned Err"))?;
188+
tracing::warn!(
189+
request_id=%protocol::util::id_to_string(&request_id),
190+
"DEBUG-TRACE: send_message(ReqStart) returned Ok",
191+
);
161192

162193
// Wait for response
163-
tracing::debug!("gateway waiting for response from tunnel");
194+
tracing::warn!(
195+
request_id=%protocol::util::id_to_string(&request_id),
196+
"DEBUG-TRACE: gateway waiting for response from tunnel",
197+
);
164198
let fut = async {
165199
loop {
166200
tokio::select! {
@@ -170,30 +204,51 @@ impl PegboardGateway2 {
170204
protocol::ToRivetTunnelMessageKind::ToRivetResponseStart(
171205
response_start,
172206
) => {
207+
tracing::warn!(
208+
request_id=%protocol::util::id_to_string(&request_id),
209+
status=response_start.status,
210+
"DEBUG-TRACE: wait_for_tunnel_response got ResponseStart",
211+
);
173212
return anyhow::Ok(response_start);
174213
}
175214
protocol::ToRivetTunnelMessageKind::ToRivetResponseAbort => {
176-
tracing::warn!("request aborted");
215+
tracing::warn!(
216+
request_id=%protocol::util::id_to_string(&request_id),
217+
"DEBUG-TRACE: wait_for_tunnel_response got ResponseAbort",
218+
);
177219
return Err(TunnelRequestAborted.build());
178220
}
179-
_ => {
180-
tracing::warn!("received non-response message from pubsub");
221+
other => {
222+
tracing::warn!(
223+
request_id=%protocol::util::id_to_string(&request_id),
224+
variant=?std::mem::discriminant(&other),
225+
"DEBUG-TRACE: wait_for_tunnel_response got non-response message from pubsub",
226+
);
181227
}
182228
}
183229
} else {
184230
tracing::warn!(
185231
request_id=%protocol::util::id_to_string(&request_id),
186-
"received no message response during request init",
232+
"DEBUG-TRACE: wait_for_tunnel_response msg_rx closed (TunnelResponseClosed)",
187233
);
188234
return Err(TunnelResponseClosed.build());
189235
}
190236
}
191-
_ = stopped_sub.next() => {
192-
tracing::debug!("actor stopped while waiting for request response");
237+
stopped = stopped_sub.next() => {
238+
tracing::warn!(
239+
request_id=%protocol::util::id_to_string(&request_id),
240+
?stopped,
241+
"DEBUG-TRACE: wait_for_tunnel_response stopped_sub fired (ActorStoppedWhileWaiting)",
242+
);
193243
return Err(ActorStoppedWhileWaiting.build());
194244
}
195-
_ = drop_rx.changed() => {
196-
tracing::warn!(reason=?drop_rx.borrow(), "tunnel message timeout");
245+
changed = drop_rx.changed() => {
246+
tracing::warn!(
247+
request_id=%protocol::util::id_to_string(&request_id),
248+
?changed,
249+
reason=?drop_rx.borrow(),
250+
"DEBUG-TRACE: wait_for_tunnel_response drop_rx fired (TunnelMessageTimeout)",
251+
);
197252
return Err(TunnelMessageTimeout.build());
198253
}
199254
}
@@ -209,11 +264,17 @@ impl PegboardGateway2 {
209264
let response_start = tokio::time::timeout(response_start_timeout, fut)
210265
.await
211266
.map_err(|_| {
212-
tracing::warn!("timed out waiting for response start from envoy");
267+
tracing::warn!(
268+
request_id=%protocol::util::id_to_string(&request_id),
269+
"DEBUG-TRACE: outer tokio::time::timeout fired (GatewayResponseStartTimeout)",
270+
);
213271

214272
GatewayResponseStartTimeout.build()
215273
})??;
216-
tracing::debug!("response handler task ended");
274+
tracing::warn!(
275+
request_id=%protocol::util::id_to_string(&request_id),
276+
"DEBUG-TRACE: response handler task ended",
277+
);
217278

218279
// Build HTTP response
219280
let mut response_builder =
@@ -228,7 +289,15 @@ impl PegboardGateway2 {
228289
let body = response_start.body.unwrap_or_default();
229290
let response = response_builder.body(ResponseBody::Full(Full::new(Bytes::from(body))))?;
230291

292+
tracing::warn!(
293+
request_id=%protocol::util::id_to_string(&request_id),
294+
"DEBUG-TRACE: about to call stop() on success path",
295+
);
231296
in_flight_req.stop().await;
297+
tracing::warn!(
298+
request_id=%protocol::util::id_to_string(&request_id),
299+
"DEBUG-TRACE: stop() returned, returning Ok response",
300+
);
232301

233302
Ok(response)
234303
}
@@ -585,6 +654,16 @@ impl CustomServeTrait for PegboardGateway2 {
585654
let ctx = self.ctx.with_ray(req_ctx.ray_id(), req_ctx.req_id())?;
586655
let req_body_size_hint = req.body().size_hint();
587656

657+
let in_flight_request_id_str = req_ctx
658+
.in_flight_request_id()
659+
.ok()
660+
.map(|id| protocol::util::id_to_string(&id));
661+
tracing::warn!(
662+
?in_flight_request_id_str,
663+
actor_id=%self.actor_id,
664+
req_id=%req_ctx.req_id(),
665+
"DEBUG-TRACE: handle_request invoked (CustomServeTrait entry point)",
666+
);
588667
let (res, metrics_res) = tokio::join!(
589668
self.handle_request_inner(&ctx, req, req_ctx),
590669
record_req_metrics(
@@ -598,6 +677,18 @@ impl CustomServeTrait for PegboardGateway2 {
598677
),
599678
),
600679
);
680+
match &res {
681+
Ok(resp) => tracing::warn!(
682+
?in_flight_request_id_str,
683+
status=%resp.status(),
684+
"DEBUG-TRACE: handle_request_inner returned Ok",
685+
),
686+
Err(err) => tracing::warn!(
687+
?in_flight_request_id_str,
688+
?err,
689+
"DEBUG-TRACE: handle_request_inner returned Err (this is what guard sees for retry decision)",
690+
),
691+
}
601692

602693
let response_size = match &res {
603694
Ok(res) => res.size_hint().upper().unwrap_or(res.size_hint().lower()),

0 commit comments

Comments
 (0)