Skip to content

Commit 8dcda8a

Browse files
dablauranusjr
andauthored
Make set_xcom route in API server DRY by reusing logic of XComModel.set method (#55289)
Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
1 parent 2b77043 commit 8dcda8a

2 files changed

Lines changed: 17 additions & 33 deletions

File tree

  • airflow-core/src/airflow

airflow-core/src/airflow/api_fastapi/execution_api/routes/xcoms.py

Lines changed: 5 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -385,39 +385,20 @@ def set_xcom(
385385
# TODO: Can/should we check if a client _hasn't_ provided this for an upstream of a mapped task? That
386386
# means loading the serialized dag and that seems like a relatively costly operation for minimal benefit
387387
# (the mapped task would fail in a moment as it can't be expanded anyway.)
388-
from airflow.models.dagrun import DagRun
389-
390-
if not run_id:
391-
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Run with ID: `{run_id}` was not found")
392-
393-
dag_run_id = session.query(DagRun.id).filter_by(dag_id=dag_id, run_id=run_id).scalar()
394-
if dag_run_id is None:
395-
raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG run not found on DAG {dag_id} with ID {run_id}")
396-
397-
# Remove duplicate XComs and insert a new one.
398-
session.execute(
399-
delete(XComModel).where(
400-
XComModel.key == key,
401-
XComModel.run_id == run_id,
402-
XComModel.task_id == task_id,
403-
XComModel.dag_id == dag_id,
404-
XComModel.map_index == map_index,
405-
)
406-
)
407-
408388
try:
409389
# We expect serialised value from the caller - sdk, do not serialise in here
410-
new = XComModel(
411-
dag_run_id=dag_run_id,
390+
XComModel.set(
412391
key=key,
413392
value=value,
414393
run_id=run_id,
415394
task_id=task_id,
416395
dag_id=dag_id,
417396
map_index=map_index,
397+
serialize=False,
398+
session=session,
418399
)
419-
session.add(new)
420-
session.flush()
400+
except ValueError as e:
401+
raise HTTPException(status.HTTP_404_NOT_FOUND, str(e))
421402
except TypeError as e:
422403
raise HTTPException(
423404
status_code=status.HTTP_400_BAD_REQUEST,

airflow-core/src/airflow/models/xcom.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ def set(
167167
task_id: str,
168168
run_id: str,
169169
map_index: int = -1,
170+
serialize: bool = True,
170171
session: Session = NEW_SESSION,
171172
) -> None:
172173
"""
@@ -178,7 +179,8 @@ def set(
178179
:param task_id: Task ID.
179180
:param run_id: DAG run ID for the task.
180181
:param map_index: Optional map index to assign XCom for a mapped task.
181-
The default is ``-1`` (set for a non-mapped task).
182+
:param serialize: Optional parameter to specify if value should be serialized or not.
183+
The default is ``True``.
182184
:param session: Database session. If not given, a new session will be
183185
created for this function.
184186
"""
@@ -215,14 +217,15 @@ def set(
215217
)
216218
value = list(value)
217219

218-
value = cls.serialize_value(
219-
value=value,
220-
key=key,
221-
task_id=task_id,
222-
dag_id=dag_id,
223-
run_id=run_id,
224-
map_index=map_index,
225-
)
220+
if serialize:
221+
value = cls.serialize_value(
222+
value=value,
223+
key=key,
224+
task_id=task_id,
225+
dag_id=dag_id,
226+
run_id=run_id,
227+
map_index=map_index,
228+
)
226229

227230
# Remove duplicate XComs and insert a new one.
228231
session.execute(

0 commit comments

Comments
 (0)