
Implement global concurrency per job/partition #50

Open
PaulM5406 wants to merge 11 commits into TkTech:26_ux from PaulM5406:implement-concurrency

Conversation

@PaulM5406
Contributor

Hey @TkTech,

I worked on an implementation that enforces global concurrency per job and partition. I tackled this before rate limiting because it is more useful for my team and looks easier to implement.

I tried to design a SQL query in fetch_jobs that respects global concurrency while staying performant. It is not perfect: there is an edge case where many jobs subject to the same concurrency constraint could block a queue. Still, it gives us a basis for discussion.

I liked your idea of faking it with temporary queues too. Maybe we could use that for rate limiting? My intuition is that we can solve the concurrency problem in SQL and stay performant.

Comment thread chancy/app.py Outdated
job = job.job
if job.concurrency_key:
    cursor.execute(
        self._push_concurrency_config_sql(),
Owner

@TkTech TkTech Aug 15, 2025

I've got some concerns about doing this per-job - if users are doing this the "right" way they may be batching tens of thousands of jobs at a time, which is going to issue an extra query per job with a concurrency_key. IMO we should be gathering up all the unique concurrency keys and issuing 1 bulk upsert.

Alternatively, and which might be better, we should require the user to call a function to push a concurrency rule before they take effect - a concurrency_key with no rule set would just be ignored when fetching jobs. This way we can remove this upsert here entirely and never have to process individual jobs as they get inserted.
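As a rough illustration of the first option (gather the unique concurrency keys, then issue one batched upsert), here is a minimal sketch. `PendingJob`, `unique_concurrency_rules`, the `concurrency_limit` column, and the SQL text are assumptions made for illustration; they are not taken from the PR.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional, Tuple


@dataclass(frozen=True)
class PendingJob:
    """Hypothetical stand-in for a job that is about to be pushed."""

    func: str
    concurrency_key: Optional[str] = None
    concurrency_limit: Optional[int] = None


# Hypothetical statement: table and column names are assumptions.
UPSERT_SQL = """
INSERT INTO concurrency_configs (concurrency_key, concurrency_limit)
VALUES (%s, %s)
ON CONFLICT (concurrency_key)
DO UPDATE SET concurrency_limit = EXCLUDED.concurrency_limit
"""


def unique_concurrency_rules(jobs: Iterable[PendingJob]) -> List[Tuple[str, int]]:
    """Collapse a batch of jobs into one (key, limit) row per distinct key.

    If the same key appears with different limits, the last one wins.
    Rows are sorted so that concurrent batches touch keys in the same order.
    """
    rules = {}
    for job in jobs:
        if job.concurrency_key is not None and job.concurrency_limit is not None:
            rules[job.concurrency_key] = job.concurrency_limit
    return sorted(rules.items())


# With a DB-API cursor, the rows could then be sent in one batched call:
#     cursor.executemany(UPSERT_SQL, unique_concurrency_rules(jobs))
```

With this shape, the number of upserted rows depends on the number of distinct keys in the batch rather than on the number of jobs.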

Contributor Author

@PaulM5406 PaulM5406 Aug 16, 2025

You're right about the bulk insert; it would be easy to do. Should we insert the jobs in one bulk insert as well?

I went with this implementation because I found the API simpler for the user, but maybe you're right that pushing a concurrency rule ahead of time would be clearer.

Contributor Author

@PaulM5406 PaulM5406 Aug 16, 2025

The thing is, having a table with concurrency keys as rows that can be locked while workers fetch jobs looks like the easiest way to implement concurrency and avoid race conditions. Job push is the first moment we know the full concurrency key when a partition id is used.
We could use a trigger too, if we do not support methods as concurrency keys. If we use a bulk insert for jobs, the trigger would be more performant as it avoids a round trip.

Contributor Author

I have implemented deduplication of concurrency keys and a bulk insert. Performance should be fine even when inserting a large number of jobs.

Comment thread docs/howto/jobs.rst Outdated
Concurrency
-----------------------

Control the number of jobs that can run simultaneously using concurrency:
Owner

We should probably reword this a bit to make it clear immediately that this impacts global concurrency. I see you've got the note down below but it's the core reason to use this. Something like:

"Control the number of jobs with the same concurrency key that can run simultaneously across all workers and queues using with_concurrency():"?

Contributor

I have added more documentation about how concurrency is implemented and why this solution was chosen.

Comment thread chancy/migrations/v6.py Outdated
await cursor.execute(
    sql.SQL(
        """
        CREATE TABLE {concurrency_configs} (
Owner

I'm worried about high cardinality in the concurrency_key (like <func_name>_<user ID>), it might quickly result in a lot of rows. But, that gotcha might just need to be documented to be A-OK.

Contributor Author

@PaulM5406 PaulM5406 Aug 16, 2025

To keep the table from growing indefinitely, we could have the pruner plugin clean it on a regular basis.

Contributor Author

I have extended the pruner plugin to prune the new concurrency_configs table.
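To make the pruning idea concrete, here is a minimal sketch of one possible rule: delete configs whose key is no longer referenced by any job. It runs against an in-memory SQLite database purely so that it is self-contained; the schema, the `concurrency_limit` column, and the function names are assumptions, and the actual pruner in the PR may use different criteria.

```python
import sqlite3

# Assumed, simplified schema; the real tables in the PR may differ.
SCHEMA = """
CREATE TABLE jobs (id INTEGER PRIMARY KEY, concurrency_key TEXT);
CREATE TABLE concurrency_configs (
    concurrency_key TEXT PRIMARY KEY,
    concurrency_limit INTEGER NOT NULL
);
"""


def prune_unused_configs(conn: sqlite3.Connection) -> int:
    """Delete configs that no remaining job references; return rows deleted."""
    cur = conn.execute(
        """
        DELETE FROM concurrency_configs
        WHERE NOT EXISTS (
            SELECT 1 FROM jobs
            WHERE jobs.concurrency_key = concurrency_configs.concurrency_key
        )
        """
    )
    conn.commit()
    return cur.rowcount


def demo_connection() -> sqlite3.Connection:
    """Build a tiny database with one referenced key and one stale key."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    conn.execute("INSERT INTO jobs (concurrency_key) VALUES ('send_email_42')")
    conn.executemany(
        "INSERT INTO concurrency_configs VALUES (?, ?)",
        [("send_email_42", 1), ("send_email_7", 1)],
    )
    conn.commit()
    return conn
```

In the demo database only `send_email_7` is stale, so a single pruning pass removes that one row and leaves the referenced key in place.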

@PaulM5406
Contributor Author

PaulM5406 commented Aug 28, 2025

Update: work is still ongoing. Need to find some time.

@PaulM5406 PaulM5406 force-pushed the implement-concurrency branch from 9e04bf0 to 92fa930 December 29, 2025 16:24
@PaulM5406
Contributor Author

PaulM5406 commented Dec 29, 2025

Hey @TkTech, I found the time to continue this work after thinking about it extensively.

Trade-offs I made:

  • simplicity of the API for adding concurrency limits
    It couples the definition of concurrency limits with job push. I admit this is not ideal, but it simplifies the API and easily supports concurrency limits on a partition key.
  • simplicity of the implementation

I looked at how other products implement global concurrency.

hatchet: uses a scheduler that runs in the hatchet service. The Chancy leader could take on this responsibility. I chose not to go in this direction to keep things simple, and I am not sure that adding such a responsibility to a leader that is also a worker is a good idea...

temporal, pgqueuer: not implemented; it is a feature request

kafka: if I may draw a parallel, the equivalent in Chancy would be creating dynamic, ephemeral queues in workers for each distinct concurrency key. I don't think this is a proper solution that would scale, because of the high cardinality of concurrency keys when using a partition key, for example, with each queue polling its jobs in its own while True loop.

Neither of these two alternatives looked easy or fit well into Chancy's current architecture, so I decided to resolve concurrency limits when fetching jobs. It has downsides for sure:

  • adding overhead to the fetch query. From what I could observe, the overhead looks acceptable for the use cases I tested.
  • many jobs with the same concurrency key and a low concurrency limit could block fetching other jobs, because we need to scan for more jobs but it is hard to know in advance how many more... For this use case, I think creating a new queue is better. We could add that to the documentation? We could also implement an adaptive way of determining how many jobs to fetch?
  • hard enforcement of concurrency limits across all workers
    This implies locking on a concurrency key, which could create contention. That said, I think this is a minor trade-off given Chancy's queue-driven architecture.
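To illustrate the blocking edge case from the second bullet, here is a small in-memory model of fetch-time resolution. It is only a sketch: `QueuedJob`, `select_runnable`, and the `scan_limit` parameter are hypothetical names, and the PR does this work inside the SQL fetch query rather than in Python.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class QueuedJob:
    """Hypothetical pending job; only the fields needed for the sketch."""

    id: int
    concurrency_key: Optional[str] = None


def select_runnable(
    pending: List[QueuedJob],
    running_counts: Dict[str, int],
    limits: Dict[str, int],
    want: int,
    scan_limit: int,
) -> List[QueuedJob]:
    """Pick up to `want` jobs from the first `scan_limit` pending jobs.

    Jobs whose key has reached its limit are skipped. Keys without a
    configured limit are treated as unconstrained.
    """
    counts = Counter(running_counts)
    picked: List[QueuedJob] = []
    for job in pending[:scan_limit]:
        if len(picked) == want:
            break
        key = job.concurrency_key
        limit = limits.get(key) if key is not None else None
        if limit is not None:
            if counts[key] >= limit:
                continue  # key is saturated, leave this job in the queue
            counts[key] += 1
        picked.append(job)
    return picked


# Five jobs share one key with limit 1, followed by an unconstrained job.
pending = [QueuedJob(i, "report") for i in range(1, 6)] + [QueuedJob(6)]
narrow = select_runnable(pending, {}, {"report": 1}, want=2, scan_limit=3)
wide = select_runnable(pending, {}, {"report": 1}, want=2, scan_limit=6)
```

In this model `narrow` contains only job 1, because the scan window is filled with saturated `report` jobs, while `wide` also reaches job 6. That is the reason a fixed scan size is hard to choose in advance.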

Please tell me what you think about this work, and whether you could see it in chancy in the future.

Thanks again for all the work on chancy!

EDIT:
steady-queue implements global concurrency another way: it checks concurrency limits at enqueue time and flags each task either as ready to be picked up by a worker or as blocked. When a task under concurrency control finishes (success or error), a blocked task is switched to ready.
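The enqueue-time approach described above can be modelled in a few lines. This is a sketch of the general idea only, not steady-queue's actual code; `EnqueueTimeGate` and its methods are hypothetical names, and a real implementation would keep this state in the database.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, Optional


class EnqueueTimeGate:
    """In-memory model: decide ready/blocked when a task is enqueued."""

    def __init__(self, limits: Dict[str, int]) -> None:
        self._limits = dict(limits)
        # key -> number of tasks currently ready or running
        self._active: Dict[str, int] = defaultdict(int)
        # key -> task ids waiting for a slot, in arrival order
        self._blocked: Dict[str, Deque[int]] = defaultdict(deque)

    def enqueue(self, task_id: int, key: Optional[str] = None) -> str:
        """Return "ready" if a slot is free for this key, else "blocked"."""
        limit = self._limits.get(key) if key is not None else None
        if limit is None:
            return "ready"  # no rule for this key: unconstrained
        if self._active[key] < limit:
            self._active[key] += 1
            return "ready"
        self._blocked[key].append(task_id)
        return "blocked"

    def finish(self, key: Optional[str]) -> Optional[int]:
        """Call on success or error; return the task promoted to ready, if any."""
        if key is None or key not in self._limits:
            return None
        if self._blocked[key]:
            # The finished task's slot passes directly to the oldest blocked task.
            return self._blocked[key].popleft()
        self._active[key] = max(0, self._active[key] - 1)
        return None
```

Compared with resolving limits at fetch time, this keeps the fetch query simple, at the cost of extra bookkeeping on enqueue and on every task completion.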

@PaulM5406
Contributor Author

Hi @TkTech,
I'm just checking in to see if you've had a moment to look at this PR. I’m still very interested in getting this feature merged.

Please let me know if you disagree with the choices I have made, or if there are any adjustments or further testing you'd like me to do. I'm happy to refine the code based on your feedback. Thanks for all your work on the project!

@codecov

codecov Bot commented Jan 17, 2026

Codecov Report

❌ Patch coverage is 92.02454% with 13 lines in your changes missing coverage. Please review.
⚠️ Please upload report for BASE (26_ux@b0e3e94). Learn more about missing BASE report.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| chancy/plugins/pruner.py | 72.72% | 9 Missing ⚠️ |
| chancy/job.py | 93.75% | 3 Missing ⚠️ |
| chancy/rule.py | 95.00% | 1 Missing ⚠️ |

Additional details and impacted files
@@           Coverage Diff            @@
##             26_ux      #50   +/-   ##
========================================
  Coverage         ?   73.91%           
========================================
  Files            ?       59           
  Lines            ?     3316           
  Branches         ?        0           
========================================
  Hits             ?     2451           
  Misses           ?      865           
  Partials         ?        0           
Flag Coverage Δ
unittests 73.91% <92.02%> (?)

@TkTech
Owner

TkTech commented Jan 19, 2026

Can't see anything I'd disagree with right off the bat. Two of the Windows tests stalled out; I've just rerun them to see whether it was spurious or a code issue.

@TkTech TkTech assigned TkTech and unassigned TkTech Jan 19, 2026
@TkTech TkTech added the enhancement New feature or request label Jan 19, 2026
@TkTech TkTech changed the base branch from main to 26_ux January 20, 2026 09:45
@TkTech
Owner

TkTech commented Jan 28, 2026

Hm, we're consistently deadlocking on two of the tests. I don't particularly think this is an issue with your change (it might be coming from v0.26) but I'll need to hunt it down.

