Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion chancy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
"Limit",
"Reference",
"job",
"ConcurrencyRule",
)

from chancy.app import Chancy
from chancy.queue import Queue
from chancy.worker import Worker
from chancy.job import Limit, Job, QueuedJob, Reference, job
from chancy.job import Limit, Job, QueuedJob, Reference, job, ConcurrencyRule
129 changes: 106 additions & 23 deletions chancy/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,14 +674,27 @@ async def push_many_ex(
:param jobs: The jobs to push onto the queue.
:return: A list of references to the jobs in the queue.
"""
references = []
for job in jobs:
await cursor.execute(
self._push_job_sql(),
self._get_job_params(job),
# Insert concurrency configurations
concurrency_params = self._concurrency_params_iterator(jobs)
if concurrency_params:
await cursor.executemany(
self._push_concurrency_config_sql(),
concurrency_params,
)

# Insert jobs
await cursor.executemany(
self._push_job_sql(),
self._job_params_iterator(jobs),
returning=True,
)
references = []
while True:
record = await cursor.fetchone()
references.append(Reference(record["id"]))
if record:
references.append(Reference(record["id"]))
elif not cursor.nextset():
break

if self.notifications:
for queue in set(
Expand All @@ -693,7 +706,7 @@ async def push_many_ex(
return references

def sync_push_many_ex(
self, cursor: Cursor, jobs: list[Job]
self, cursor: Cursor, jobs: list[Job | IsAJob[..., Any]]
) -> list[Reference]:
"""
Synchronously push multiple jobs onto the queue using a specific cursor.
Expand All @@ -710,16 +723,31 @@ def sync_push_many_ex(
:param jobs: The jobs to push onto the queue.
:return: A list of references to the jobs in the queue.
"""
references = []
for job in jobs:
cursor.execute(
self._push_job_sql(),
self._get_job_params(job),
# Insert concurrency configurations
concurrency_params = self._concurrency_params_iterator(jobs)
if concurrency_params:
cursor.executemany(
self._push_concurrency_config_sql(),
concurrency_params,
)
record = cursor.fetchone()
references.append(Reference(record["id"]))

for queue in set(job.queue for job in jobs):
# Insert jobs
cursor.executemany(
self._push_job_sql(),
self._job_params_iterator(jobs),
returning=True,
)
references = []
while True:
record = cursor.fetchone()
if record:
references.append(Reference(record["id"]))
elif not cursor.nextset():
break

for queue in set(
job.queue if isinstance(job, Job) else job.job.queue for job in jobs
):
self.sync_notify(cursor, "queue.pushed", {"q": queue})

return references
Expand Down Expand Up @@ -1345,7 +1373,8 @@ def _push_job_sql(self):
priority,
max_attempts,
scheduled_at,
unique_key
unique_key,
concurrency_key
)
VALUES (
%(id)s,
Expand All @@ -1357,7 +1386,8 @@ def _push_job_sql(self):
%(priority)s,
%(max_attempts)s,
%(scheduled_at)s,
%(unique_key)s
%(unique_key)s,
%(concurrency_key)s
)
ON CONFLICT (unique_key)
WHERE
Expand Down Expand Up @@ -1464,17 +1494,43 @@ def _declare_sql(self, upsert: bool):
action=action,
)

def _push_concurrency_config_sql(self):
return sql.SQL(
"""
INSERT INTO {concurrency_configs}
(concurrency_key, concurrency_max, updated_at)
VALUES (%s, %s, NOW())
ON CONFLICT (concurrency_key) DO UPDATE SET
concurrency_max = EXCLUDED.concurrency_max,
updated_at = NOW()
"""
).format(
concurrency_configs=sql.Identifier(
f"{self.prefix}concurrency_configs"
)
)

@staticmethod
def _get_job_params(job: Job | IsAJob[..., Any]) -> dict:
def _get_concurrency_params(job: Job) -> tuple:
"""
Get the parameters for storing concurrency configuration.

:param job: The job containing concurrency configuration.
:return: A tuple of parameters for the concurrency config.
"""
return (
job.evaluate_concurrency_key(), # prefixed concurrency key
job.concurrency_rule.max if job.concurrency_rule else None,
)

@staticmethod
def _get_job_params(job: Job) -> dict:
"""
Get the parameters for a job to be inserted into the database.

:param job: The job to get parameters for.
:return: A dictionary of parameters for the job.
"""
if callable(job):
job = job.job

return {
"id": chancy_uuid(),
"queue": job.queue,
Expand All @@ -1486,11 +1542,38 @@ def _get_job_params(job: Job | IsAJob[..., Any]) -> dict:
"max_attempts": job.max_attempts,
"scheduled_at": job.scheduled_at,
"unique_key": job.unique_key,
"concurrency_key": job.evaluate_concurrency_key(),
}

def _concurrency_params_iterator(
self, jobs: list[Job | IsAJob[..., Any]]
) -> Iterator[tuple] | None:
"""
Collect and deduplicate concurrency configurations from jobs.
Create an iterator over the unique concurrency parameters.
"""
concurrency_configs = {}

for job in jobs:
if callable(job):
job = job.job
if job.concurrency_rule:
params = self._get_concurrency_params(job)
concurrency_configs[params[0]] = params

if concurrency_configs:
yield from concurrency_configs.values()

def _job_params_iterator(self, jobs: list[Job | IsAJob[..., Any]]):
"""Create iterator for job parameters."""
for job in jobs:
if callable(job):
job = job.job
yield self._get_job_params(job)


from chancy.plugins.pruner import Pruner # noqa: E402
from chancy.plugins.recovery import Recovery # noqa: E402
from chancy.plugins.leadership import Leadership # noqa: E402
from chancy.plugins.metrics import Metrics # noqa: E402
from chancy.plugins.pruner import Pruner # noqa: E402
from chancy.plugins.recovery import Recovery # noqa: E402
from chancy.plugins.workflow import WorkflowPlugin # noqa: E402
Loading
Loading