Skip to content

Commit cc6561e

Browse files
committed
DO NOT MERGE: queue diagnose
1 parent 904ee37 commit cc6561e

3 files changed

Lines changed: 61 additions & 2 deletions

File tree

  • rivetkit-rust/packages/rivetkit-core/src/actor
  • rivetkit-typescript/packages

rivetkit-rust/packages/rivetkit-core/src/actor/queue.rs

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,12 @@ impl ActorContext {
274274
self.clear_preloaded_messages();
275275

276276
let config = self.config();
277+
tracing::warn!(
278+
name,
279+
body_len = body.len(),
280+
max_queue_size = config.max_queue_size,
281+
"DEBUG_QUEUE enqueue_message: config"
282+
);
277283
if encoded_message.len() > config.max_queue_message_size as usize {
278284
return Err(QueueMessageTooLarge {
279285
size: encoded_message.len(),
@@ -283,6 +289,12 @@ impl ActorContext {
283289
}
284290

285291
let mut metadata = self.0.queue_metadata.lock().await;
292+
tracing::warn!(
293+
metadata_size = metadata.size,
294+
max_queue_size = config.max_queue_size,
295+
will_reject = metadata.size >= config.max_queue_size,
296+
"DEBUG_QUEUE enqueue_message: pre-enqueue check"
297+
);
286298
if metadata.size >= config.max_queue_size {
287299
return Err(QueueFull {
288300
limit: config.max_queue_size,
@@ -374,17 +386,30 @@ impl ActorContext {
374386
let deadline = opts.timeout.map(|timeout| Instant::now() + timeout);
375387
let names = normalize_names(opts.names);
376388

389+
tracing::warn!(
390+
count,
391+
timeout_ms = opts.timeout.map(|t| t.as_millis() as u64),
392+
?names,
393+
completable = opts.completable,
394+
"DEBUG_QUEUE next_batch: enter"
395+
);
396+
377397
loop {
378398
let messages = self
379399
.try_receive_batch(names.as_ref(), count, opts.completable)
380400
.await?;
381401
if !messages.is_empty() {
402+
tracing::warn!(
403+
received = messages.len(),
404+
"DEBUG_QUEUE next_batch: returning messages"
405+
);
382406
return Ok(messages);
383407
}
384408

385409
let remaining_timeout =
386410
deadline.map(|deadline| deadline.saturating_duration_since(Instant::now()));
387411
if matches!(remaining_timeout, Some(timeout) if timeout.is_zero()) {
412+
tracing::warn!("DEBUG_QUEUE next_batch: returning empty, timeout zero");
388413
return Ok(Vec::new());
389414
}
390415

@@ -560,6 +585,7 @@ impl ActorContext {
560585
.queue_initialize
561586
.get_or_try_init(|| async {
562587
let preload = self.0.queue_preloaded_kv.lock().take();
588+
let has_preload = preload.is_some();
563589
let metadata = if let Some(preloaded) = preload.as_ref() {
564590
self.configure_preloaded_messages(preloaded);
565591
if let Some(metadata) = self.load_metadata_from_preload(preloaded).await? {
@@ -570,6 +596,12 @@ impl ActorContext {
570596
} else {
571597
self.load_or_create_metadata().await?
572598
};
599+
tracing::warn!(
600+
has_preload,
601+
size = metadata.size,
602+
next_id = metadata.next_id,
603+
"DEBUG_QUEUE ensure_initialized"
604+
);
573605
let mut state = self.0.queue_metadata.lock().await;
574606
*state = metadata;
575607
self.0.metrics.set_queue_depth(state.size);
@@ -667,6 +699,15 @@ impl ActorContext {
667699
let _receive_guard = self.0.queue_receive_lock.lock().await;
668700

669701
let messages = self.list_messages().await?;
702+
tracing::warn!(
703+
kv_message_count = messages.len(),
704+
?names,
705+
count,
706+
completable,
707+
message_names = ?messages.iter().map(|m| &m.name).collect::<Vec<_>>(),
708+
message_ids = ?messages.iter().map(|m| m.id).collect::<Vec<_>>(),
709+
"DEBUG_QUEUE try_receive_batch: listed messages"
710+
);
670711
let mut selected = Vec::new();
671712
for message in messages {
672713
if let Some(names) = names
@@ -726,10 +767,15 @@ impl ActorContext {
726767

727768
async fn list_message_entries(&self) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
728769
if let Some(entries) = self.0.queue_preloaded_message_entries.lock().take() {
770+
tracing::warn!(
771+
entry_count = entries.len(),
772+
"DEBUG_QUEUE list_message_entries: using preloaded"
773+
);
729774
return Ok(entries);
730775
}
731776

732-
self.0
777+
let entries = self
778+
.0
733779
.kv
734780
.list_prefix(
735781
&QUEUE_MESSAGES_PREFIX,
@@ -739,7 +785,12 @@ impl ActorContext {
739785
},
740786
)
741787
.await
742-
.context("list queue messages")
788+
.context("list queue messages")?;
789+
tracing::warn!(
790+
entry_count = entries.len(),
791+
"DEBUG_QUEUE list_message_entries: read from kv"
792+
);
793+
Ok(entries)
743794
}
744795

745796
fn clear_preloaded_messages(&self) {

rivetkit-typescript/packages/rivetkit/src/registry/native.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3110,12 +3110,14 @@ class NativeWorkflowRuntimeAdapter {
31103110
_abortSignal,
31113111
completable,
31123112
) => {
3113+
logger().debug({ msg: "DEBUG_QUEUE native receive: calling nextBatch", names, count, timeout, completable, effectiveTimeout: timeout ?? 0 });
31133114
const messages = await this.#ctx.queue.nextBatch({
31143115
names,
31153116
count,
31163117
timeout: timeout ?? 0,
31173118
completable,
31183119
});
3120+
logger().debug({ msg: "DEBUG_QUEUE native receive: nextBatch returned", messageCount: messages.length });
31193121
return messages.map((message) =>
31203122
this.#wrapQueueMessage(message),
31213123
);

rivetkit-typescript/packages/workflow-engine/src/context.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1648,6 +1648,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface {
16481648

16491649
if (existingCount && existingCount.kind.type === "message") {
16501650
const replayCount = existingCount.kind.data.data as number;
1651+
this.log("debug", { msg: "DEBUG_QUEUE executeQueueNextBatch: replaying from history", name, replayCount, completable });
16511652
return await this.readReplayQueueMessages<T>(
16521653
name,
16531654
replayCount,
@@ -1657,6 +1658,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface {
16571658

16581659
const now = Date.now();
16591660
if (deadline !== undefined && now >= deadline) {
1661+
this.log("debug", { msg: "DEBUG_QUEUE executeQueueNextBatch: deadline already passed", name, deadline, now });
16601662
if (deadlineEntry && deadlineEntry.kind.type === "sleep") {
16611663
deadlineEntry.kind.data.state = "completed";
16621664
deadlineEntry.dirty = true;
@@ -1669,11 +1671,13 @@ export class WorkflowContextImpl implements WorkflowContextInterface {
16691671
return [];
16701672
}
16711673

1674+
this.log("debug", { msg: "DEBUG_QUEUE executeQueueNextBatch: calling receiveMessagesNow", name, messageNames, count, completable });
16721675
const received = await this.receiveMessagesNow(
16731676
messageNames,
16741677
count,
16751678
completable,
16761679
);
1680+
this.log("debug", { msg: "DEBUG_QUEUE executeQueueNextBatch: receiveMessagesNow returned", name, receivedCount: received.length, receivedNames: received.map(m => m.name) });
16771681
if (received.length > 0) {
16781682
const historyMessages = received.map((message) =>
16791683
this.toWorkflowQueueMessage<T>(message),
@@ -1701,8 +1705,10 @@ export class WorkflowContextImpl implements WorkflowContextInterface {
17011705
}
17021706

17031707
if (deadline === undefined) {
1708+
this.log("debug", { msg: "DEBUG_QUEUE executeQueueNextBatch: no messages and no deadline, throwing MessageWaitError", name, messageNames });
17041709
throw new MessageWaitError(messageNames);
17051710
}
1711+
this.log("debug", { msg: "DEBUG_QUEUE executeQueueNextBatch: no messages, throwing SleepError", name, messageNames, deadline });
17061712
throw new SleepError(deadline, messageNames);
17071713
}
17081714

0 commit comments

Comments
 (0)