Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions procrastinate/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,31 @@ async def list_jobs_async(
)
return [jobs_module.Job.from_row(row) for row in rows]

async def list_jobs_by_ids_async(
self, job_ids: Iterable[int]
) -> list[jobs_module.Job]:
"""
List procrastinate jobs matching the provided IDs in a single query.

Parameters
----------
job_ids:
Iterable of job IDs to fetch. Missing IDs are ignored and results are
ordered by ascending job ID.

Returns
-------
:
"""
job_ids = list(dict.fromkeys(job_ids))
if not job_ids:
return []

rows = await self.connector.execute_query_all_async(
query=sql.queries["list_jobs_by_ids"], job_ids=job_ids
)
return [jobs_module.Job.from_row(row) for row in rows]

def list_jobs(
self,
id: int | None = None,
Expand All @@ -684,6 +709,19 @@ def list_jobs(
)
return [jobs_module.Job.from_row(row) for row in rows]

def list_jobs_by_ids(self, job_ids: Iterable[int]) -> list[jobs_module.Job]:
"""
Sync version of `list_jobs_by_ids_async`.
"""
job_ids = list(dict.fromkeys(job_ids))
if not job_ids:
return []

rows = self.connector.get_sync_connector().execute_query_all(
query=sql.queries["list_jobs_by_ids"], job_ids=job_ids
)
return [jobs_module.Job.from_row(row) for row in rows]

async def list_queues_async(
self,
queue: str | None = None,
Expand Down
18 changes: 18 additions & 0 deletions procrastinate/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,24 @@ SELECT id,
AND (%(worker_id)s::bigint IS NULL OR worker_id = %(worker_id)s)
ORDER BY id ASC;

-- list_jobs_by_ids --
-- Get list of jobs filtered by multiple ids
SELECT id,
queue_name,
task_name,
priority,
lock,
queueing_lock,
args,
status,
scheduled_at,
attempts,
abort_requested,
worker_id
FROM procrastinate_jobs
WHERE id = ANY(%(job_ids)s::bigint[])
ORDER BY id ASC;

-- list_queues --
-- Get list of queues and number of jobs per queue
WITH jobs AS (
Expand Down
8 changes: 8 additions & 0 deletions procrastinate/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,14 @@ async def list_jobs_all(self, **kwargs: Any):
jobs.append(job)
return iter(jobs)

async def list_jobs_by_ids_all(self, job_ids: Iterable[int]):
unique_job_ids = {int(job_id) for job_id in job_ids}
return [
self.jobs[job_id]
for job_id in sorted(unique_job_ids)
if job_id in self.jobs
]

async def list_queues_all(self, **kwargs: Any):
result: list[dict[str, Any]] = []
jobs = list(await self.list_jobs_all(**kwargs))
Expand Down
36 changes: 36 additions & 0 deletions tests/integration/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,42 @@ async def test_list_jobs(fixture_jobs, kwargs, expected, pg_job_manager):
] == expected


async def test_list_jobs_by_ids(fixture_jobs, pg_job_manager):
result = await pg_job_manager.list_jobs_by_ids_async([4, 999, 2, 2, 1])

assert [job.id for job in result] == [1, 2, 4]


def test_list_jobs_by_ids_sync(sync_psycopg_connector, job_factory):
pg_job_manager = manager.JobManager(connector=sync_psycopg_connector)
job1 = pg_job_manager.defer_job(
job=job_factory(
id=None,
queue="queue_a",
task_name="task_a",
task_kwargs={},
lock="lock_a",
queueing_lock=None,
)
)
job2 = pg_job_manager.defer_job(
job=job_factory(
id=None,
queue="queue_b",
task_name="task_b",
task_kwargs={},
lock="lock_b",
queueing_lock=None,
)
)
assert job1.id is not None
assert job2.id is not None

result = pg_job_manager.list_jobs_by_ids([job2.id, 999, job1.id, job1.id])

assert [job.id for job in result] == [job1.id, job2.id]


async def test_list_queues_dict(fixture_jobs, pg_job_manager):
assert (await pg_job_manager.list_queues_async())[0] == {
"name": "q1",
Expand Down
32 changes: 32 additions & 0 deletions tests/unit/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,38 @@ def test_list_jobs(job_manager, job_factory):
assert job_manager.list_jobs() == [job]


async def test_list_jobs_by_ids_async(job_manager, job_factory):
job1 = await job_manager.defer_job_async(job=job_factory(id=None))
job2 = await job_manager.defer_job_async(job=job_factory(id=None))
job3 = await job_manager.defer_job_async(job=job_factory(id=None))

result = await job_manager.list_jobs_by_ids_async(
[job3.id, 999, job1.id, job1.id, job2.id]
)

assert result == [job1, job2, job3]


def test_list_jobs_by_ids(job_manager, job_factory):
job1 = job_manager.defer_job(job=job_factory(id=None))
job2 = job_manager.defer_job(job=job_factory(id=None))
job3 = job_manager.defer_job(job=job_factory(id=None))

result = job_manager.list_jobs_by_ids([job3.id, 999, job1.id, job1.id, job2.id])

assert result == [job1, job2, job3]


async def test_list_jobs_by_ids_async_empty(job_manager, connector):
assert await job_manager.list_jobs_by_ids_async([]) == []
assert connector.queries == []


def test_list_jobs_by_ids_empty(job_manager, connector):
assert job_manager.list_jobs_by_ids([]) == []
assert connector.queries == []


async def test_list_queues_async(job_manager, job_factory):
await job_manager.defer_job_async(job=job_factory(queue="foo"))

Expand Down