Skip to content

Commit 6aefc67

Browse files
committed
[DOP-31903] Make job_dependency tests more realistic
1 parent c1f7012 commit 6aefc67

2 files changed

Lines changed: 126 additions & 136 deletions

File tree

tests/test_server/fixtures/factories/job.py

Lines changed: 44 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -330,92 +330,90 @@ async def jobs_with_same_parent_job(
330330
@pytest_asyncio.fixture
331331
async def job_dependency_chain(
332332
async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]],
333-
) -> AsyncGenerator[tuple[tuple[Job, Job, Job]], None]:
333+
) -> AsyncGenerator[tuple[tuple[Job, Job, Job], ...], None]:
334334
"""
335335
Fixture that creates:
336-
- Parent-child hierarchy: root -> middle -> leaf via parent_job_id.
337-
- For each chain job - two items in job_dependency table:
338-
source_root -> root and root -> target_root
339-
source_middle -> middle and middle -> target_middle
340-
source_leaf -> leaf and leaf -> target_leaf
341-
342-
Returns (job_root, job_middle, job_leaf). Root/middle/leaf are not directly connected
343-
by dependency edges, only by parent-child links.
336+
- Parent-child hierarchy: dag -> task -> spark via parent_job_id
337+
- Job dependency edges: task1 -> task2 and task2 -> task3
338+
339+
There are no relations like Dag -> Dag and Spark -> Spark.
344340
"""
345341
async with async_session_maker() as async_session:
346342
location = await create_location(async_session)
347-
job_type = await create_job_type(async_session)
348-
job_root = await create_job(
343+
344+
job_type_dag = await create_job_type(async_session, {"type": "AIRFLOW_DAG"})
345+
dag1 = await create_job(
349346
async_session,
350347
location_id=location.id,
351-
job_type_id=job_type.id,
352-
job_kwargs={"name": "job-root"},
348+
job_type_id=job_type_dag.id,
349+
job_kwargs={"name": "dag1"},
353350
)
354-
job_middle = await create_job(
351+
dag2 = await create_job(
355352
async_session,
356353
location_id=location.id,
357-
job_type_id=job_type.id,
358-
job_kwargs={"name": "job-middle", "parent_job_id": job_root.id},
354+
job_type_id=job_type_dag.id,
355+
job_kwargs={"name": "dag2"},
359356
)
360-
job_leaf = await create_job(
357+
dag3 = await create_job(
361358
async_session,
362359
location_id=location.id,
363-
job_type_id=job_type.id,
364-
job_kwargs={"name": "job-leaf", "parent_job_id": job_middle.id},
360+
job_type_id=job_type_dag.id,
361+
job_kwargs={"name": "dag3"},
365362
)
366-
source_root = await create_job(
363+
364+
job_type_task = await create_job_type(async_session, {"type": "AIRFLOW_TASK"})
365+
task1 = await create_job(
367366
async_session,
368367
location_id=location.id,
369-
job_type_id=job_type.id,
370-
job_kwargs={"name": "source-root"},
368+
job_type_id=job_type_task.id,
369+
job_kwargs={"name": "task1", "parent_job_id": dag1.id},
371370
)
372-
target_root = await create_job(
371+
task2 = await create_job(
373372
async_session,
374373
location_id=location.id,
375-
job_type_id=job_type.id,
376-
job_kwargs={"name": "target-root"},
374+
job_type_id=job_type_task.id,
375+
job_kwargs={"name": "task2", "parent_job_id": dag2.id},
377376
)
378-
source_middle = await create_job(
377+
task3 = await create_job(
379378
async_session,
380379
location_id=location.id,
381-
job_type_id=job_type.id,
382-
job_kwargs={"name": "source-middle"},
380+
job_type_id=job_type_task.id,
381+
job_kwargs={"name": "task3", "parent_job_id": dag3.id},
383382
)
384-
target_middle = await create_job(
383+
384+
job_type_spark = await create_job_type(async_session, {"type": "SPARK_APPLICATION"})
385+
spark1 = await create_job(
385386
async_session,
386387
location_id=location.id,
387-
job_type_id=job_type.id,
388-
job_kwargs={"name": "target-middle"},
388+
job_type_id=job_type_spark.id,
389+
job_kwargs={"name": "spark1", "parent_job_id": task1.id},
389390
)
390-
source_leaf = await create_job(
391+
spark2 = await create_job(
391392
async_session,
392393
location_id=location.id,
393-
job_type_id=job_type.id,
394-
job_kwargs={"name": "source-leaf"},
394+
job_type_id=job_type_spark.id,
395+
job_kwargs={"name": "spark2", "parent_job_id": task2.id},
395396
)
396-
target_leaf = await create_job(
397+
spark3 = await create_job(
397398
async_session,
398399
location_id=location.id,
399-
job_type_id=job_type.id,
400-
job_kwargs={"name": "target-leaf"},
400+
job_type_id=job_type_spark.id,
401+
job_kwargs={"name": "spark3", "parent_job_id": task3.id},
401402
)
403+
402404
async_session.add_all(
403405
[
404-
JobDependency(from_job_id=source_root.id, to_job_id=job_root.id, type="DIRECT_DEPENDENCY"),
405-
JobDependency(from_job_id=job_root.id, to_job_id=target_root.id, type="DIRECT_DEPENDENCY"),
406-
JobDependency(from_job_id=source_middle.id, to_job_id=job_middle.id, type="DIRECT_DEPENDENCY"),
407-
JobDependency(from_job_id=job_middle.id, to_job_id=target_middle.id, type="DIRECT_DEPENDENCY"),
408-
JobDependency(from_job_id=source_leaf.id, to_job_id=job_leaf.id, type="DIRECT_DEPENDENCY"),
409-
JobDependency(from_job_id=job_leaf.id, to_job_id=target_leaf.id, type="DIRECT_DEPENDENCY"),
406+
JobDependency(from_job_id=task1.id, to_job_id=task2.id, type="DIRECT_DEPENDENCY"),
407+
JobDependency(from_job_id=task2.id, to_job_id=task3.id, type="DIRECT_DEPENDENCY"),
410408
],
411409
)
412410
await async_session.commit()
413411
async_session.expunge_all()
414412

415413
yield (
416-
(source_root, job_root, target_root),
417-
(source_middle, job_middle, target_middle),
418-
(source_leaf, job_leaf, target_leaf),
414+
(dag1, dag2, dag3),
415+
(task1, task2, task3),
416+
(spark1, spark2, spark3),
419417
)
420418

421419
async with async_session_maker() as async_session:

tests/test_server/test_jobs/test_job_dependencies.py

Lines changed: 82 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,11 @@ async def test_get_job_dependencies_isolated_job(
6060

6161
async def test_get_job_dependencies_unauthorized(
6262
test_client: AsyncClient,
63-
job_dependency_chain: tuple[tuple[Job, Job, Job], ...],
63+
job: Job,
6464
):
65-
(_, job_middle, _) = job_dependency_chain[1]
6665
response = await test_client.get(
6766
"v1/jobs/dependencies",
68-
params={"start_node_id": job_middle.id},
67+
params={"start_node_id": job.id},
6968
)
7069
assert response.status_code == HTTPStatus.UNAUTHORIZED, response.json()
7170
assert response.json() == {
@@ -77,57 +76,54 @@ async def test_get_job_dependencies_unauthorized(
7776
}
7877

7978

80-
async def test_get_job_dependencies_default_request(
79+
async def test_get_job_dependencies_with_direction_both(
8180
test_client: AsyncClient,
8281
job_dependency_chain: tuple[tuple[Job, Job, Job], ...],
8382
async_session: AsyncSession,
8483
mocked_user: MockedUser,
8584
):
8685
(
87-
(source_root, job_root, target_root),
88-
(source_middle, job_middle, target_middle),
89-
(source_leaf, job_leaf, target_leaf),
86+
(_, dag2, _),
87+
(task1, task2, task3),
88+
(_, spark2, _),
9089
) = job_dependency_chain
91-
all_jobs = await enrich_jobs(
90+
expected_nodes = await enrich_jobs(
9291
[
93-
job_root,
94-
job_middle,
95-
job_leaf,
96-
source_root,
97-
target_root,
98-
source_middle,
99-
target_middle,
100-
source_leaf,
101-
target_leaf,
92+
dag2,
93+
task1,
94+
task2,
95+
task3,
96+
spark2,
10297
],
10398
async_session,
10499
)
105100

106-
response = await test_client.get(
107-
"v1/jobs/dependencies",
108-
headers={"Authorization": f"Bearer {mocked_user.access_token}"},
109-
params={"start_node_id": job_middle.id},
110-
)
111-
assert response.status_code == HTTPStatus.OK, response.json()
112-
assert response.json() == {
113-
"relations": {
114-
"parents": jobs_ancestors_to_json([job_root, job_middle, job_leaf]),
115-
"dependencies": [
116-
{"from": {"kind": "JOB", "id": str(from_id)}, "to": {"kind": "JOB", "id": str(to_id)}, "type": type_}
117-
for from_id, to_id, type_ in sorted(
118-
[
119-
(source_root.id, job_root.id, "DIRECT_DEPENDENCY"),
120-
(job_root.id, target_root.id, "DIRECT_DEPENDENCY"),
121-
(source_middle.id, job_middle.id, "DIRECT_DEPENDENCY"),
122-
(job_middle.id, target_middle.id, "DIRECT_DEPENDENCY"),
123-
(source_leaf.id, job_leaf.id, "DIRECT_DEPENDENCY"),
124-
(job_leaf.id, target_leaf.id, "DIRECT_DEPENDENCY"),
125-
]
126-
)
127-
],
128-
},
129-
"nodes": {"jobs": jobs_to_json(all_jobs)},
130-
}
101+
for start_node in [dag2, task2, spark2]:
102+
response = await test_client.get(
103+
"v1/jobs/dependencies",
104+
headers={"Authorization": f"Bearer {mocked_user.access_token}"},
105+
params={"start_node_id": start_node.id},
106+
)
107+
assert response.status_code == HTTPStatus.OK, response.json()
108+
assert response.json() == {
109+
"relations": {
110+
"parents": jobs_ancestors_to_json([task2, spark2]),
111+
"dependencies": [
112+
{
113+
"from": {"kind": "JOB", "id": str(from_id)},
114+
"to": {"kind": "JOB", "id": str(to_id)},
115+
"type": type_,
116+
}
117+
for from_id, to_id, type_ in sorted(
118+
[
119+
(task1.id, task2.id, "DIRECT_DEPENDENCY"),
120+
(task2.id, task3.id, "DIRECT_DEPENDENCY"),
121+
]
122+
)
123+
],
124+
},
125+
"nodes": {"jobs": jobs_to_json(expected_nodes)},
126+
}
131127

132128

133129
async def test_get_job_dependencies_with_direction_upstream(
@@ -136,34 +132,32 @@ async def test_get_job_dependencies_with_direction_upstream(
136132
async_session: AsyncSession,
137133
mocked_user: MockedUser,
138134
):
139-
(source_root, job_root, _), (source_middle, job_middle, _), (source_leaf, job_leaf, _) = job_dependency_chain
135+
(_, dag2, _), (task1, task2, _), (_, spark2, _) = job_dependency_chain
140136
expected_nodes = await enrich_jobs(
141-
[job_root, job_middle, job_leaf, source_root, source_middle, source_leaf],
137+
[task1, dag2, task2, spark2],
142138
async_session,
143139
)
144140

145-
response = await test_client.get(
146-
"v1/jobs/dependencies",
147-
headers={"Authorization": f"Bearer {mocked_user.access_token}"},
148-
params={"start_node_id": job_middle.id, "direction": "UPSTREAM"},
149-
)
150-
assert response.status_code == HTTPStatus.OK, response.json()
151-
assert response.json() == {
152-
"relations": {
153-
"parents": jobs_ancestors_to_json([job_root, job_middle, job_leaf]),
154-
"dependencies": [
155-
{"from": {"kind": "JOB", "id": str(from_id)}, "to": {"kind": "JOB", "id": str(to_id)}, "type": type_}
156-
for from_id, to_id, type_ in sorted(
157-
[
158-
(source_root.id, job_root.id, "DIRECT_DEPENDENCY"),
159-
(source_middle.id, job_middle.id, "DIRECT_DEPENDENCY"),
160-
(source_leaf.id, job_leaf.id, "DIRECT_DEPENDENCY"),
161-
]
162-
)
163-
],
164-
},
165-
"nodes": {"jobs": jobs_to_json(expected_nodes)},
166-
}
141+
for start_node in [dag2, task2, spark2]:
142+
response = await test_client.get(
143+
"v1/jobs/dependencies",
144+
headers={"Authorization": f"Bearer {mocked_user.access_token}"},
145+
params={"start_node_id": start_node.id, "direction": "UPSTREAM"},
146+
)
147+
assert response.status_code == HTTPStatus.OK, response.json()
148+
assert response.json() == {
149+
"relations": {
150+
"parents": jobs_ancestors_to_json([task2, spark2]),
151+
"dependencies": [
152+
{
153+
"from": {"kind": "JOB", "id": str(task1.id)},
154+
"to": {"kind": "JOB", "id": str(task2.id)},
155+
"type": "DIRECT_DEPENDENCY",
156+
},
157+
],
158+
},
159+
"nodes": {"jobs": jobs_to_json(expected_nodes)},
160+
}
167161

168162

169163
async def test_get_job_dependencies_with_direction_downstream(
@@ -172,31 +166,29 @@ async def test_get_job_dependencies_with_direction_downstream(
172166
async_session: AsyncSession,
173167
mocked_user: MockedUser,
174168
):
175-
(_, job_root, target_root), (_, job_middle, target_middle), (_, job_leaf, target_leaf) = job_dependency_chain
169+
(_, dag2, _), (_, task2, task3), (_, spark2, _) = job_dependency_chain
176170
expected_nodes = await enrich_jobs(
177-
[job_root, job_middle, job_leaf, target_root, target_middle, target_leaf],
171+
[dag2, task2, spark2, task3],
178172
async_session,
179173
)
180174

181-
response = await test_client.get(
182-
"v1/jobs/dependencies",
183-
headers={"Authorization": f"Bearer {mocked_user.access_token}"},
184-
params={"start_node_id": job_middle.id, "direction": "DOWNSTREAM"},
185-
)
186-
assert response.status_code == HTTPStatus.OK, response.json()
187-
assert response.json() == {
188-
"relations": {
189-
"parents": jobs_ancestors_to_json([job_root, job_middle, job_leaf]),
190-
"dependencies": [
191-
{"from": {"kind": "JOB", "id": str(from_id)}, "to": {"kind": "JOB", "id": str(to_id)}, "type": type_}
192-
for from_id, to_id, type_ in sorted(
193-
[
194-
(job_root.id, target_root.id, "DIRECT_DEPENDENCY"),
195-
(job_middle.id, target_middle.id, "DIRECT_DEPENDENCY"),
196-
(job_leaf.id, target_leaf.id, "DIRECT_DEPENDENCY"),
197-
]
198-
)
199-
],
200-
},
201-
"nodes": {"jobs": jobs_to_json(expected_nodes)},
202-
}
175+
for start_node in [dag2, task2, spark2]:
176+
response = await test_client.get(
177+
"v1/jobs/dependencies",
178+
headers={"Authorization": f"Bearer {mocked_user.access_token}"},
179+
params={"start_node_id": start_node.id, "direction": "DOWNSTREAM"},
180+
)
181+
assert response.status_code == HTTPStatus.OK, response.json()
182+
assert response.json() == {
183+
"relations": {
184+
"parents": jobs_ancestors_to_json([task2, spark2]),
185+
"dependencies": [
186+
{
187+
"from": {"kind": "JOB", "id": str(task2.id)},
188+
"to": {"kind": "JOB", "id": str(task3.id)},
189+
"type": "DIRECT_DEPENDENCY",
190+
},
191+
],
192+
},
193+
"nodes": {"jobs": jobs_to_json(expected_nodes)},
194+
}

0 commit comments

Comments
 (0)