Skip to content

Commit 2dec26f

Browse files
authored
fix(pb): backport serverless fix (#4611)
# Description Please include a summary of the changes and the related issue. Please also include relevant motivation and context. ## Type of change - [ ] Bug fix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected) - [ ] This change requires a documentation update ## How Has This Been Tested? Please describe the tests that you ran to verify your changes. ## Checklist: - [ ] My code follows the style guidelines of this project - [ ] I have performed a self-review of my code - [ ] I have commented my code, particularly in hard-to-understand areas - [ ] I have made corresponding changes to the documentation - [ ] My changes generate no new warnings - [ ] I have added tests that prove my fix is effective or that my feature works - [ ] New and existing unit tests pass locally with my changes
1 parent 605261e commit 2dec26f

File tree

2 files changed

+2
-11
lines changed

2 files changed

+2
-11
lines changed

engine/packages/pegboard/src/workflows/serverless/conn.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -156,16 +156,13 @@ enum OutboundReqOutput {
156156
#[timeout = u64::MAX]
157157
async fn outbound_req(ctx: &ActivityCtx, input: &OutboundReqInput) -> Result<OutboundReqOutput> {
158158
let mut term_signal = TermSignal::get();
159-
let mut drain_sub = ctx
160-
.subscribe::<Drain>(("workflow_id", ctx.workflow_id()))
161-
.await?;
162159

163160
loop {
164161
metrics::SERVERLESS_OUTBOUND_REQ_ACTIVE
165162
.with_label_values(&[&input.namespace_id.to_string(), &input.runner_name])
166163
.inc();
167164

168-
let res = outbound_req_inner(ctx, input, &mut term_signal, &mut drain_sub).await;
165+
let res = outbound_req_inner(ctx, input, &mut term_signal).await;
169166

170167
metrics::SERVERLESS_OUTBOUND_REQ_ACTIVE
171168
.with_label_values(&[&input.namespace_id.to_string(), &input.runner_name])
@@ -203,7 +200,6 @@ async fn outbound_req_inner(
203200
ctx: &ActivityCtx,
204201
input: &OutboundReqInput,
205202
term_signal: &mut TermSignal,
206-
drain_sub: &mut message::SubscriptionHandle<Drain>,
207203
) -> Result<OutboundReqOutput> {
208204
if is_runner_draining(ctx, input.receiver_wf_id).await? {
209205
return Ok(OutboundReqOutput::Draining { drain_sent: false });
@@ -438,7 +434,6 @@ async fn outbound_req_inner(
438434
}
439435
},
440436
_ = tokio::time::sleep(sleep_until_drain) => {}
441-
_ = drain_sub.next() => {}
442437
_ = term_signal.recv() => {}
443438
};
444439

engine/packages/pegboard/src/workflows/serverless/receiver.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,7 @@ pub async fn pegboard_serverless_receiver(ctx: &mut WorkflowCtx, input: &Input)
4848
.send()
4949
.await?;
5050

51-
// if the connection is currently running an outbound req, this will be received
52-
ctx.msg(conn::Drain {})
53-
.topic(("workflow_id", conn_wf_id))
54-
.send()
55-
.await?;
51+
ctx.removed::<Message<conn::Drain>>().await?;
5652

5753
// Wait for connection wf to complete so this wf's state remains readable
5854
ctx.workflow::<conn::Input>(conn_wf_id).output().await?;

0 commit comments

Comments
 (0)