Skip to content

Commit 688cba0

Browse files
committed
[DOP-31903] Include all found parents into job dependencies response
1 parent 6aefc67 commit 688cba0

2 files changed

Lines changed: 31 additions & 17 deletions

File tree

data_rentgen/server/services/job.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class JobServicePaginatedResult(PaginationDTO[JobServiceResult]):
6464

6565
@dataclass
6666
class JobDependenciesResult:
67-
parents: list[tuple[int, int]] = field(default_factory=list)
67+
parents: set[tuple[int, int]] = field(default_factory=set)
6868
dependencies: set[tuple[int, int, str | None]] = field(default_factory=set)
6969
jobs: list[JobServiceResult] = field(default_factory=list)
7070

@@ -113,21 +113,25 @@ async def get_job_dependencies(
113113
direction: Literal["UPSTREAM", "DOWNSTREAM", "BOTH"],
114114
) -> JobDependenciesResult:
115115
logger.info("Get Job dependencies with start at job with id %s and direction: %s", start_node_id, direction)
116-
job_ids = {start_node_id}
117116

118117
ancestor_relations = await self._uow.job.list_ancestor_relations([start_node_id])
119118
descendant_relations = await self._uow.job.list_descendant_relations([start_node_id])
120-
job_ids |= {p_id for p_id, _ in ancestor_relations}
121-
job_ids |= {c_id for _, c_id in descendant_relations}
119+
job_ids = (
120+
{start_node_id}
121+
| {p.parent_job_id for p in ancestor_relations}
122+
| {p.child_job_id for p in descendant_relations}
123+
)
122124

123125
dependencies = await self._uow.job_dependency.get_dependencies(job_ids=list(job_ids), direction=direction)
124-
for dependency in dependencies:
125-
job_ids.add(dependency.from_job_id)
126-
job_ids.add(dependency.to_job_id)
127-
jobs = await self._uow.job.list_by_ids(list(job_ids))
126+
dependency_job_ids = {d.from_job_id for d in dependencies} | {d.to_job_id for d in dependencies}
127+
job_ids |= dependency_job_ids
128128

129+
# return ancestors for all found jobs in the graph
130+
ancestor_relations += await self._uow.job.list_ancestor_relations(list(dependency_job_ids))
131+
job_ids |= {p.parent_job_id for p in ancestor_relations}
132+
jobs = await self._uow.job.list_by_ids(list(job_ids))
129133
return JobDependenciesResult(
130-
parents=ancestor_relations + descendant_relations,
134+
parents={(p.parent_job_id, p.child_job_id) for p in ancestor_relations + descendant_relations},
131135
dependencies={(d.from_job_id, d.to_job_id, d.type) for d in dependencies},
132136
jobs=[JobServiceResult.from_orm(job) for job in jobs],
133137
)

tests/test_server/test_jobs/test_job_dependencies.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -83,13 +83,15 @@ async def test_get_job_dependencies_with_direction_both(
8383
mocked_user: MockedUser,
8484
):
8585
(
86-
(_, dag2, _),
86+
(dag1, dag2, dag3),
8787
(task1, task2, task3),
8888
(_, spark2, _),
8989
) = job_dependency_chain
9090
expected_nodes = await enrich_jobs(
9191
[
92+
dag1,
9293
dag2,
94+
dag3,
9395
task1,
9496
task2,
9597
task3,
@@ -107,7 +109,7 @@ async def test_get_job_dependencies_with_direction_both(
107109
assert response.status_code == HTTPStatus.OK, response.json()
108110
assert response.json() == {
109111
"relations": {
110-
"parents": jobs_ancestors_to_json([task2, spark2]),
112+
"parents": jobs_ancestors_to_json(expected_nodes),
111113
"dependencies": [
112114
{
113115
"from": {"kind": "JOB", "id": str(from_id)},
@@ -132,9 +134,13 @@ async def test_get_job_dependencies_with_direction_upstream(
132134
async_session: AsyncSession,
133135
mocked_user: MockedUser,
134136
):
135-
(_, dag2, _), (task1, task2, _), (_, spark2, _) = job_dependency_chain
137+
(
138+
(dag1, dag2, _),
139+
(task1, task2, _),
140+
(_, spark2, _),
141+
) = job_dependency_chain
136142
expected_nodes = await enrich_jobs(
137-
[task1, dag2, task2, spark2],
143+
[dag1, task1, dag2, task2, spark2],
138144
async_session,
139145
)
140146

@@ -147,7 +153,7 @@ async def test_get_job_dependencies_with_direction_upstream(
147153
assert response.status_code == HTTPStatus.OK, response.json()
148154
assert response.json() == {
149155
"relations": {
150-
"parents": jobs_ancestors_to_json([task2, spark2]),
156+
"parents": jobs_ancestors_to_json(expected_nodes),
151157
"dependencies": [
152158
{
153159
"from": {"kind": "JOB", "id": str(task1.id)},
@@ -166,9 +172,13 @@ async def test_get_job_dependencies_with_direction_downstream(
166172
async_session: AsyncSession,
167173
mocked_user: MockedUser,
168174
):
169-
(_, dag2, _), (_, task2, task3), (_, spark2, _) = job_dependency_chain
175+
(
176+
(_, dag2, dag3),
177+
(_, task2, task3),
178+
(_, spark2, _),
179+
) = job_dependency_chain
170180
expected_nodes = await enrich_jobs(
171-
[dag2, task2, spark2, task3],
181+
[dag2, task2, spark2, dag3, task3],
172182
async_session,
173183
)
174184

@@ -181,7 +191,7 @@ async def test_get_job_dependencies_with_direction_downstream(
181191
assert response.status_code == HTTPStatus.OK, response.json()
182192
assert response.json() == {
183193
"relations": {
184-
"parents": jobs_ancestors_to_json([task2, spark2]),
194+
"parents": jobs_ancestors_to_json(expected_nodes),
185195
"dependencies": [
186196
{
187197
"from": {"kind": "JOB", "id": str(task2.id)},

0 commit comments

Comments
 (0)