Skip to content

Commit e624be0

Browse files
committed
feat: implement task executor for background processing and enhance task management
1 parent 012dc61 commit e624be0

File tree

1 file changed

+16
-0
lines changed

1 file changed

+16
-0
lines changed

runtime/datamate-python/app/module/generation/service/generation_service.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,20 @@ async def process_task(self, task_id: str):
102102

103103
# 逐个文件处理
104104
processed_count = 0
105+
total_task_chunks = 0
106+
processed_task_chunks = 0
105107
for file_id in file_ids:
106108
try:
109+
file_task = await self._get_or_create_file_instance(
110+
session, str(synth_task.id), file_id
111+
)
107112
success = await self._process_single_file(
108113
session, synth_task, file_id, max_qa_pairs=max_qa_pairs
109114
)
115+
# 累加任务级别的切片统计
116+
if file_task:
117+
total_task_chunks += file_task.total_chunks or 0
118+
processed_task_chunks += file_task.processed_chunks or 0
110119
except Exception as e:
111120
logger.exception(
112121
f"Unexpected error when processing file {file_id} for task {task_id}: {e}"
@@ -119,13 +128,20 @@ async def process_task(self, task_id: str):
119128
if success:
120129
processed_count += 1
121130
synth_task.processed_files = processed_count
131+
# 更新任务级别的切片统计
132+
synth_task.total_chunks = total_task_chunks
133+
synth_task.processed_chunks = processed_task_chunks
122134
await session.commit()
123135

124136
# 更新最终任务状态
125137
if processed_count == len(file_ids):
126138
synth_task.status = "completed"
127139
else:
128140
synth_task.status = "partially_completed"
141+
# 确保任务完成时切片统计准确
142+
if synth_task.status in ("completed", "partially_completed"):
143+
synth_task.total_chunks = total_task_chunks
144+
synth_task.processed_chunks = processed_task_chunks
129145
await session.commit()
130146

131147
logger.info(f"Finished processing synthesis task {synth_task.id}")

0 commit comments

Comments
 (0)