Skip to content

Commit 26810cf

Browse files
committed
fix: fix the collection task creation
1 parent 8d07d36 commit 26810cf

4 files changed

Lines changed: 63 additions & 38 deletions

File tree

frontend/src/pages/DataCollection/Create/CreateTask.tsx

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ export default function CollectionTaskCreate() {
182182
});
183183

184184
return (
185-
<div className="grid grid-cols-1 md:grid-cols-2 gap-x-6">
185+
<div className="grid grid-cols-1 md:grid-cols-2 gap-x-4 gap-y-2">
186186
{items}
187187
</div>
188188
);
@@ -211,10 +211,11 @@ export default function CollectionTaskCreate() {
211211
</div>
212212

213213
<div className="flex-overflow-auto border-card">
214-
<div className="flex-1 overflow-auto p-6">
214+
<div className="flex-1 overflow-auto p-4">
215215
<Form
216216
form={form}
217217
layout="vertical"
218+
className="[&_.ant-form-item]:mb-3 [&_.ant-form-item-label]:pb-1"
218219
initialValues={newTask}
219220
onValuesChange={(_, allValues) => {
220221
setNewTask({ ...newTask, ...allValues });
@@ -223,33 +224,36 @@ export default function CollectionTaskCreate() {
223224
{/* 基本信息 */}
224225
<h2 className="font-medium text-gray-900 text-lg mb-2">基本信息</h2>
225226

226-
<Form.Item
227-
label="名称"
228-
name="name"
229-
rules={[{ required: true, message: "请输入任务名称" }]}
230-
>
231-
<Input placeholder="请输入任务名称" />
232-
</Form.Item>
233-
<Form.Item label="描述" name="description">
234-
<TextArea placeholder="请输入任务描述" rows={3} />
235-
</Form.Item>
227+
<div className="grid grid-cols-1 md:grid-cols-2 gap-x-4 gap-y-2">
228+
<Form.Item
229+
label="名称"
230+
name="name"
231+
rules={[{ required: true, message: "请输入任务名称" }]}
232+
>
233+
<Input placeholder="请输入任务名称" />
234+
</Form.Item>
236235

237-
<Form.Item
238-
label="超时时间(秒)"
239-
name="timeoutSeconds"
240-
rules={[{ required: true, message: "请输入超时时间" }]}
241-
initialValue={3600}
242-
>
243-
<InputNumber
244-
className="w-full"
245-
min={1}
246-
precision={0}
247-
placeholder="默认 3600"
248-
/>
249-
</Form.Item>
236+
<Form.Item
237+
label="超时时间(秒)"
238+
name="timeoutSeconds"
239+
rules={[{ required: true, message: "请输入超时时间" }]}
240+
initialValue={3600}
241+
>
242+
<InputNumber
243+
className="w-full"
244+
min={1}
245+
precision={0}
246+
placeholder="默认 3600"
247+
/>
248+
</Form.Item>
249+
250+
<Form.Item className="md:col-span-2" label="描述" name="description">
251+
<TextArea placeholder="请输入任务描述" rows={2} />
252+
</Form.Item>
253+
</div>
250254

251255
{/* 同步配置 */}
252-
<h2 className="font-medium text-gray-900 pt-6 mb-2 text-lg">
256+
<h2 className="font-medium text-gray-900 pt-2 mb-1 text-lg">
253257
同步配置
254258
</h2>
255259
<Form.Item name="syncMode" label="同步方式">
@@ -275,7 +279,7 @@ export default function CollectionTaskCreate() {
275279
rules={[{ required: true, message: "请输入Cron表达式" }]}
276280
>
277281
<SimpleCronScheduler
278-
className="px-2 rounded"
282+
className="px-2 py-1 rounded"
279283
value={scheduleExpression}
280284
onChange={(value) => {
281285
setScheduleExpression(value);
@@ -289,7 +293,7 @@ export default function CollectionTaskCreate() {
289293
)}
290294

291295
{/* 模板配置 */}
292-
<h2 className="font-medium text-gray-900 pt-6 mb-2 text-lg">
296+
<h2 className="font-medium text-gray-900 pt-4 mb-2 text-lg">
293297
模板配置
294298
</h2>
295299

@@ -372,7 +376,7 @@ export default function CollectionTaskCreate() {
372376
) : null}
373377
</Form>
374378
</div>
375-
<div className="flex gap-2 justify-end border-top p-6">
379+
<div className="flex gap-2 justify-end border-top p-4">
376380
<Button onClick={() => navigate("/data/collection")}>取消</Button>
377381
<Button type="primary" onClick={handleSubmit}>
378382
创建任务

runtime/datamate-python/app/module/collection/client/datax_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ def _run_process(self, cmd: list[str], log_f) -> int:
155155

156156
# 创建读取线程
157157
stdout_thread = threading.Thread(target=lambda stream=process.stdout: self.read_stream(stream, log_f))
158-
stderr_thread = threading.Thread(target=lambda stream=process.stdout: self.read_stream(stream, log_f))
158+
stderr_thread = threading.Thread(target=lambda stream=process.stderr: self.read_stream(stream, log_f))
159159

160160
stdout_thread.start()
161161
stderr_thread.start()

runtime/datamate-python/app/module/collection/interface/collection.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import uuid
33
from typing import Optional
44

5-
from fastapi import APIRouter, Depends, HTTPException, Query, BackgroundTasks
5+
from fastapi import APIRouter, Depends, HTTPException, Query
66
from sqlalchemy import select, func
77
from sqlalchemy.ext.asyncio import AsyncSession
88

@@ -25,7 +25,6 @@
2525
@router.post("", response_model=StandardResponse[CollectionTaskBase])
2626
async def create_task(
2727
request: CollectionTaskCreate,
28-
background_tasks: BackgroundTasks,
2928
db: AsyncSession = Depends(get_db)
3029
):
3130
"""创建归集任务"""
@@ -40,7 +39,7 @@ async def create_task(
4039
task = convert_for_create(request, task_id)
4140
task.template_name = template.name
4241

43-
task_service = CollectionTaskService(db, background_tasks)
42+
task_service = CollectionTaskService(db)
4443
task = await task_service.create_task(task)
4544

4645
task = await db.execute(select(CollectionTask).where(CollectionTask.id == task.id))

runtime/datamate-python/app/module/collection/service/collection.py

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import asyncio
2+
from dataclasses import dataclass
3+
from typing import Any, Optional
24

3-
from fastapi import BackgroundTasks
45
from sqlalchemy import select
56
from sqlalchemy.ext.asyncio import AsyncSession
67

@@ -13,10 +14,29 @@
1314

1415
logger = get_logger(__name__)
1516

17+
18+
@dataclass
19+
class _RuntimeTask:
20+
id: str
21+
config: str
22+
timeout_seconds: int
23+
sync_mode: str
24+
status: Optional[str] = None
25+
26+
27+
@dataclass
28+
class _RuntimeExecution:
29+
id: str
30+
log_path: str
31+
started_at: Optional[Any] = None
32+
completed_at: Optional[Any] = None
33+
duration_seconds: Optional[float] = None
34+
error_message: Optional[str] = None
35+
status: Optional[str] = None
36+
1637
class CollectionTaskService:
17-
def __init__(self, db: AsyncSession, background_tasks: BackgroundTasks = None):
38+
def __init__(self, db: AsyncSession):
1839
self.db = db
19-
self.background_tasks = background_tasks
2040

2141
async def create_task(self, task: CollectionTask) -> CollectionTask:
2242
self.db.add(task)
@@ -25,7 +45,7 @@ async def create_task(self, task: CollectionTask) -> CollectionTask:
2545
if task.sync_mode == SyncMode.ONCE:
2646
task.status = TaskStatus.RUNNING.name
2747
await self.db.commit()
28-
asyncio.create_task(self.run_async(task.id))
48+
asyncio.create_task(CollectionTaskService.run_async(task.id))
2949
return task
3050

3151
@staticmethod
@@ -44,5 +64,7 @@ async def run_async(task_id: str):
4464
task_execution = create_execute_record(task)
4565
session.add(task_execution)
4666
await session.commit()
47-
DataxClient(execution=task_execution, task=task).run_datax_job()
67+
await asyncio.to_thread(
68+
DataxClient(execution=task_execution, task=task).run_datax_job
69+
)
4870
await session.commit()

0 commit comments

Comments
 (0)