Skip to content

Commit c84ad5d

Browse files
JasonW404Jason Wang
andauthored
fix: Fix the sync issue between DataMate & LabelStudio; Fix the tag update issue in DataMate (#399)
Co-authored-by: Jason Wang <wangjinglong8@huawei.com>
1 parent 6513450 commit c84ad5d

File tree

10 files changed

+4023
-63
lines changed

10 files changed

+4023
-63
lines changed

runtime/datamate-python/app/module/annotation/interface/auto.py

Lines changed: 49 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1202,9 +1202,9 @@ async def sync_auto_task_annotations_to_database(
12021202
12031203
行为:
12041204
- 根据自动标注任务找到或创建对应的 Label Studio 项目(与 /sync-label-studio-back 相同的解析逻辑);
1205-
- 遍历项目下所有 task,按 task.data.file_id 定位 t_dm_dataset_files 记录
1206-
- 把 annotations + predictions 写入 annotation 字段,并抽取标签写入 tags,更新 tags_updated_at
1207-
返回成功更新的文件数量
1205+
- 先执行 DM -> Label Studio 的文件差异同步(确保任务文件集合最新)
1206+
- 再执行双向标签同步(LS <-> DM),按时间戳保留较新标注
1207+
返回同步变更数量(LS->DM + DM->LS)
12081208
"""
12091209

12101210
# 1. 获取并校验自动标注任务
@@ -1217,16 +1217,19 @@ async def sync_auto_task_annotations_to_database(
12171217
mappings = await mapping_service.get_mappings_by_dataset_id(task.dataset_id)
12181218

12191219
project_id: Optional[str] = None
1220+
selected_mapping = None
12201221
for m in mappings:
12211222
cfg = getattr(m, "configuration", None) or {}
12221223
if isinstance(cfg, dict) and cfg.get("autoTaskId") == task.id:
12231224
project_id = str(m.labeling_project_id)
1225+
selected_mapping = m
12241226
break
12251227

12261228
if project_id is None:
12271229
for m in mappings:
12281230
if m.name == task.name:
12291231
project_id = str(m.labeling_project_id)
1232+
selected_mapping = m
12301233
break
12311234

12321235
if project_id is None:
@@ -1263,13 +1266,52 @@ async def sync_auto_task_annotations_to_database(
12631266
),
12641267
)
12651268

1266-
# 3. 调用通用的 LS -> DM 同步服务
1269+
selected_mapping = await mapping_service.get_mapping_by_labeling_project_id(project_id)
1270+
1271+
if selected_mapping is None and project_id is not None:
1272+
selected_mapping = await mapping_service.get_mapping_by_labeling_project_id(project_id)
1273+
1274+
if selected_mapping is None:
1275+
raise HTTPException(
1276+
status_code=500,
1277+
detail="Failed to resolve mapping for auto task when syncing files to Label Studio.",
1278+
)
1279+
12671280
ls_client = LabelStudioClient(
12681281
base_url=settings.label_studio_base_url,
12691282
token=settings.label_studio_user_token,
12701283
)
1271-
sync_service = LSAnnotationSyncService(db, ls_client)
12721284

1273-
updated = await sync_service.sync_project_annotations_to_dm(project_id=str(project_id))
1285+
# 3. 先执行文件差异同步,确保 LS 工程任务集合与当前自动标注任务文件集合一致
1286+
dm_client = DatasetManagementService(db)
1287+
sync_orchestrator = SyncService(dm_client, ls_client, mapping_service)
1288+
1289+
allowed_file_ids = None
1290+
if task.file_ids:
1291+
allowed_file_ids = {str(fid) for fid in task.file_ids}
1292+
1293+
file_sync_result = await sync_orchestrator.sync_files(
1294+
selected_mapping,
1295+
batch_size=50,
1296+
allowed_file_ids=allowed_file_ids,
1297+
delete_orphans=True,
1298+
)
1299+
logger.info(
1300+
"Auto sync-db pre file-sync done: task_id=%s, created=%s, deleted=%s, total=%s",
1301+
task_id,
1302+
file_sync_result.get("created", 0),
1303+
file_sync_result.get("deleted", 0),
1304+
file_sync_result.get("total", 0),
1305+
)
1306+
1307+
# 4. 执行双向标签同步(基于时间戳决策覆盖)
1308+
annotation_sync_result = await sync_orchestrator.sync_annotations_bidirectional(
1309+
selected_mapping,
1310+
batch_size=50,
1311+
overwrite=True,
1312+
overwrite_ls=True,
1313+
sync_files_first=False,
1314+
)
12741315

1275-
return StandardResponse(code="0", message="success", data=updated)
1316+
total_synced = annotation_sync_result.synced_to_dm + annotation_sync_result.synced_to_ls
1317+
return StandardResponse(code="0", message="success", data=total_synced)

runtime/datamate-python/app/module/annotation/interface/project.py

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -612,14 +612,13 @@ async def sync_manual_annotations_to_database(
612612
):
613613
"""从 Label Studio 项目同步当前手动标注结果到 DM 数据库。
614614
615-
行为:
616-
- 基于 mapping_id 定位 Label Studio 项目;
617-
- 遍历项目下所有 task,按 task.data.file_id 找到对应 t_dm_dataset_files 记录;
618-
- 读取每个 task 的 annotations + predictions,写入:
619-
* tags: 从 result 中提取的标签概要,供 DM 列表/预览展示;
620-
* annotation: 完整原始 JSON 结果;
621-
* tags_updated_at: 当前时间戳。
622-
返回值为成功更新的文件数量。
615+
行为:
616+
- 基于 mapping_id 定位 Label Studio 项目;
617+
- 先执行 DM -> Label Studio 的文件差异同步(增量创建新文件任务、删除孤儿任务);
618+
- 再执行双向标签同步(LS <-> DM),按时间戳保留较新标注:
619+
* LS 更新时间更新 -> 覆盖 DM;
620+
* DM tags_updated_at 更新 -> 覆盖 LS。
621+
返回值为同步变更数量(LS->DM + DM->LS)。
623622
"""
624623

625624
mapping_service = DatasetMappingService(db)
@@ -631,13 +630,28 @@ async def sync_manual_annotations_to_database(
631630
base_url=settings.label_studio_base_url,
632631
token=settings.label_studio_user_token,
633632
)
634-
sync_service = LSAnnotationSyncService(db, ls_client)
635633

636-
updated = await sync_service.sync_project_annotations_to_dm(
637-
project_id=str(mapping.labeling_project_id),
634+
dm_client = DatasetManagementService(db)
635+
sync_orchestrator = SyncService(dm_client, ls_client, mapping_service)
636+
file_sync_result = await sync_orchestrator.sync_files(mapping, batch_size=50)
637+
logger.info(
638+
"Manual sync-db pre file-sync done: mapping=%s, created=%s, deleted=%s, total=%s",
639+
mapping_id,
640+
file_sync_result.get("created", 0),
641+
file_sync_result.get("deleted", 0),
642+
file_sync_result.get("total", 0),
643+
)
644+
645+
annotation_sync_result = await sync_orchestrator.sync_annotations_bidirectional(
646+
mapping,
647+
batch_size=50,
648+
overwrite=True,
649+
overwrite_ls=True,
650+
sync_files_first=False,
638651
)
639652

640-
return StandardResponse(code="0", message="success", data=updated)
653+
total_synced = annotation_sync_result.synced_to_dm + annotation_sync_result.synced_to_ls
654+
return StandardResponse(code="0", message="success", data=total_synced)
641655

642656
@router.get("", response_model=StandardResponse[PaginatedData[DatasetMappingResponse]])
643657
async def list_mappings(

runtime/datamate-python/app/module/annotation/interface/task.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,20 +72,23 @@ async def sync_dataset_content(
7272
await sync_service.sync_annotations_from_ls_to_dm(
7373
mapping,
7474
request.batch_size,
75-
request.overwrite
75+
request.overwrite,
76+
sync_files_first=False,
7677
)
7778
elif request.annotation_direction == "dm_to_ls":
7879
await sync_service.sync_annotations_from_dm_to_ls(
7980
mapping,
8081
request.batch_size,
81-
request.overwrite_labeling_project
82+
request.overwrite_labeling_project,
83+
sync_files_first=False,
8284
)
8385
elif request.annotation_direction == "bidirectional":
8486
await sync_service.sync_annotations_bidirectional(
8587
mapping,
8688
request.batch_size,
8789
request.overwrite,
88-
request.overwrite_labeling_project
90+
request.overwrite_labeling_project,
91+
sync_files_first=False,
8992
)
9093

9194
logger.info(f"Sync completed: {result.synced_files}/{result.total_files} files")
@@ -139,20 +142,23 @@ async def sync_annotations(
139142
result = await sync_service.sync_annotations_from_ls_to_dm(
140143
mapping,
141144
request.batch_size,
142-
request.overwrite
145+
request.overwrite,
146+
sync_files_first=True,
143147
)
144148
elif request.direction == "dm_to_ls":
145149
result = await sync_service.sync_annotations_from_dm_to_ls(
146150
mapping,
147151
request.batch_size,
148-
request.overwrite_labeling_project
152+
request.overwrite_labeling_project,
153+
sync_files_first=True,
149154
)
150155
elif request.direction == "bidirectional":
151156
result = await sync_service.sync_annotations_bidirectional(
152157
mapping,
153158
request.batch_size,
154159
request.overwrite,
155-
request.overwrite_labeling_project
160+
request.overwrite_labeling_project,
161+
sync_files_first=True,
156162
)
157163
else:
158164
raise HTTPException(

runtime/datamate-python/app/module/annotation/service/ls_annotation_sync.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,20 +194,34 @@ def iter_results() -> List[Dict[str, Any]]:
194194

195195
for r in results:
196196
r_type = r.get("type")
197+
if isinstance(r_type, str):
198+
r_type = r_type.strip().lower()
199+
197200
from_name = r.get("from_name") or r.get("fromName")
201+
to_name = r.get("to_name") or r.get("toName")
198202
value_obj = r.get("value") or {}
199203
if not isinstance(value_obj, dict):
200204
continue
201205

202206
# 将 Label Studio 的 value 映射为 values,方便前端统一解析
203207
values: Dict[str, Any] = {}
204208
for key, v in value_obj.items():
205-
values[key] = v
209+
normalized_key = str(key).strip().lower() if key is not None else ""
210+
if normalized_key:
211+
values[normalized_key] = v
212+
213+
if isinstance(r_type, str) and r_type:
214+
if r_type in values:
215+
values = {r_type: values[r_type]}
216+
elif len(values) == 1:
217+
only_value = next(iter(values.values()))
218+
values = {r_type: only_value}
206219

207220
tag = {
208221
"id": r.get("id"),
209222
"type": r_type,
210223
"from_name": from_name,
224+
"to_name": to_name,
211225
"values": values,
212226
}
213227
normalized.append(tag)

0 commit comments

Comments
 (0)