diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/DatasetFileResponse.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/DatasetFileResponse.java index 9e7dfbeb..fae3b4f3 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/DatasetFileResponse.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/dto/DatasetFileResponse.java @@ -31,6 +31,8 @@ public class DatasetFileResponse { private String tags; /** 标签更新时间 */ private LocalDateTime tagsUpdatedAt; + /** 文件元数据(包含标注信息等,JSON 字符串) */ + private String metadata; /** 上传时间 */ private LocalDateTime uploadTime; /** 最后更新时间 */ diff --git a/frontend/src/components/business/DatasetFileTransfer.tsx b/frontend/src/components/business/DatasetFileTransfer.tsx index 22bfce16..7e89b68e 100644 --- a/frontend/src/components/business/DatasetFileTransfer.tsx +++ b/frontend/src/components/business/DatasetFileTransfer.tsx @@ -21,6 +21,12 @@ interface DatasetFileTransferProps onSelectedFilesChange: (filesMap: { [key: string]: DatasetFile }) => void; onDatasetSelect?: (dataset: Dataset | null) => void; datasetTypeFilter?: DatasetType; + /** + * 允许选择的文件扩展名白名单(小写,包含点号,例如 ".jpg")。 + * - 若不设置,则不过滤扩展名; + * - 若设置,则仅展示和选择这些扩展名的文件(包括“全选当前数据集”)。 + */ + allowedFileExtensions?: string[]; /** * 是否强制“单数据集模式”: * - 为 true 时,仅允许从同一个数据集选择文件; @@ -77,6 +83,7 @@ const DatasetFileTransfer: React.FC = ({ onSelectedFilesChange, onDatasetSelect, datasetTypeFilter, + allowedFileExtensions, singleDatasetOnly, fixedDatasetId, lockedFileIds, @@ -180,19 +187,28 @@ const DatasetFileTransfer: React.FC = ({ size: pageSize, keyword, }); - setFiles( - (data.content || []).map((item: DatasetFile) => ({ - ...item, - id: item.id, - key: String(item.id), // rowKey 使用字符串,确保与 selectedRowKeys 类型一致 - // 记录所属数据集,方便后续在“全不选”时只清空当前数据集的选择 - // DatasetFile 接口是后端模型,这里在前端扩展 datasetId 字段 - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-ignore - datasetId: selectedDataset.id, - datasetName: selectedDataset.name, - })) - ); + const mapped = (data.content || []).map((item: DatasetFile) => ({ + ...item, + id: item.id, + key: String(item.id), // rowKey 使用字符串,确保与 selectedRowKeys 类型一致 + // 记录所属数据集,方便后续在“全不选”时只清空当前数据集的选择 + // DatasetFile 接口是后端模型,这里在前端扩展 datasetId 字段 + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + datasetId: selectedDataset.id, + datasetName: selectedDataset.name, + })); + + const filtered = + allowedFileExtensions && allowedFileExtensions.length > 0 + ? 
mapped.filter((file) => { + const ext = + file.fileName?.toLowerCase().match(/\.[^.]+$/)?.[0] || ""; + return allowedFileExtensions.includes(ext); + }) + : mapped; + + setFiles(filtered); setFilesPagination((prev) => ({ ...prev, current: page, @@ -200,7 +216,7 @@ const DatasetFileTransfer: React.FC = ({ total: data.totalElements, })); }, - [selectedDataset, filesPagination.current, filesPagination.pageSize, filesSearch] + [selectedDataset, filesPagination.current, filesPagination.pageSize, filesSearch, allowedFileExtensions] ); useEffect(() => { @@ -269,7 +285,7 @@ const DatasetFileTransfer: React.FC = ({ size: pageSize, }); - const content: DatasetFile[] = (data.content || []).map( + const mapped: DatasetFile[] = (data.content || []).map( (item: DatasetFile) => ({ ...item, key: item.id, @@ -281,6 +297,15 @@ const DatasetFileTransfer: React.FC = ({ }), ); + const content: DatasetFile[] = + allowedFileExtensions && allowedFileExtensions.length > 0 + ? mapped.filter((file) => { + const ext = + file.fileName?.toLowerCase().match(/\.[^.]+$/)?.[0] || ""; + return allowedFileExtensions.includes(ext); + }) + : mapped; + if (!content.length) { break; } @@ -306,7 +331,7 @@ const DatasetFileTransfer: React.FC = ({ onSelectedFilesChange(newMap); - const count = total || allFiles.length; + const count = allFiles.length; if (count > 0) { message.success(`已选中当前数据集的全部 ${count} 个文件`); } else { diff --git a/frontend/src/pages/DataAnnotation/AutoAnnotation/AutoAnnotation.tsx b/frontend/src/pages/DataAnnotation/AutoAnnotation/AutoAnnotation.tsx index 8de4d190..06c30f28 100644 --- a/frontend/src/pages/DataAnnotation/AutoAnnotation/AutoAnnotation.tsx +++ b/frontend/src/pages/DataAnnotation/AutoAnnotation/AutoAnnotation.tsx @@ -18,6 +18,7 @@ import { deleteAutoAnnotationTaskByIdUsingDelete, downloadAutoAnnotationResultUsingGet, queryAnnotationTasksUsingGet, + syncAutoAnnotationToDatabaseUsingPost, } from "../annotation.api"; import CreateAutoAnnotationDialog from "./components/CreateAutoAnnotationDialog"; import EditAutoAnnotationDatasetDialog from "./components/EditAutoAnnotationDatasetDialog"; @@ -123,6 +124,32 @@ export default function AutoAnnotation() { setShowImportDialog(true); }; + const handleSyncToDatabase = (task: AutoAnnotationTask) => { + Modal.confirm({ + title: `确认将自动标注任务「${task.name}」在 Label Studio 中的标注结果同步到数据库吗?`, + content: ( +
        <div>
+          <div>此操作会根据 Label Studio 中的任务数据覆盖当前文件标签与标注信息。</div>
+          <div>同步完成后,可在数据管理的文件详情中查看最新标签与标注。</div>
+        </div>
+ ), + okText: "同步到数据库", + cancelText: "取消", + onOk: async () => { + const hide = message.loading("正在从 Label Studio 同步标注到数据库...", 0); + try { + await syncAutoAnnotationToDatabaseUsingPost(task.id); + hide(); + message.success("同步完成"); + } catch (e) { + console.error(e); + hide(); + message.error("同步失败,请稍后重试"); + } + }, + }); + }; + const handleDelete = (task: AutoAnnotationTask) => { Modal.confirm({ title: `确认删除自动标注任务「${task.name}」吗?`, @@ -307,14 +334,14 @@ export default function AutoAnnotation() { 编辑 - + @@ -344,6 +371,12 @@ export default function AutoAnnotation() { , + onClick: () => handleImportFromLabelStudio(record), + }, { key: "edit-dataset", label: "编辑任务数据集", diff --git a/frontend/src/pages/DataAnnotation/Create/components/CreateAnnotationTaskDialog.tsx b/frontend/src/pages/DataAnnotation/Create/components/CreateAnnotationTaskDialog.tsx index cc389ecf..d9cbf0c6 100644 --- a/frontend/src/pages/DataAnnotation/Create/components/CreateAnnotationTaskDialog.tsx +++ b/frontend/src/pages/DataAnnotation/Create/components/CreateAnnotationTaskDialog.tsx @@ -120,6 +120,7 @@ export default function CreateAnnotationTask({ const [selectedDataset, setSelectedDataset] = useState(null); const [imageFileCount, setImageFileCount] = useState(0); const [manualDatasetTypeFilter, setManualDatasetTypeFilter] = useState(undefined); + const [manualAllowedExtensions, setManualAllowedExtensions] = useState(undefined); useEffect(() => { if (!open) return; @@ -178,6 +179,11 @@ export default function CreateAnnotationTask({ setImageFileCount(count); }, [selectedFilesMap]); + const IMAGE_EXTENSIONS = [".jpg", ".jpeg", ".png", ".bmp", ".gif", ".tiff", ".webp"]; + const TEXT_EXTENSIONS = [".txt", ".md", ".csv", ".tsv", ".jsonl", ".log"]; + const AUDIO_EXTENSIONS = [".wav", ".mp3", ".flac", ".aac", ".ogg", ".m4a", ".wma"]; + const VIDEO_EXTENSIONS = [".mp4", ".avi", ".mov", ".mkv", ".flv", ".wmv", ".webm"]; + const mapTemplateDataTypeToDatasetType = (raw?: string): DatasetType | undefined => { if (!raw) return undefined; const v = String(raw).trim().toLowerCase(); @@ -218,6 +224,40 @@ export default function CreateAnnotationTask({ return undefined; }; + const getAllowedExtensionsForTemplateDataType = (raw?: string): string[] | undefined => { + if (!raw) return undefined; + const v = String(raw).trim().toLowerCase(); + + const textTokens = new Set([ + "text", + DataType.TEXT.toLowerCase(), + "文本", + ]); + const imageTokens = new Set([ + "image", + DataType.IMAGE.toLowerCase(), + "图像", + "图片", + ]); + const audioTokens = new Set([ + "audio", + DataType.AUDIO.toLowerCase(), + "音频", + ]); + const videoTokens = new Set([ + "video", + DataType.VIDEO.toLowerCase(), + "视频", + ]); + + if (textTokens.has(v)) return TEXT_EXTENSIONS; + if (imageTokens.has(v)) return IMAGE_EXTENSIONS; + if (audioTokens.has(v)) return AUDIO_EXTENSIONS; + if (videoTokens.has(v)) return VIDEO_EXTENSIONS; + + return undefined; + }; + const handleManualSubmit = async () => { try { const values = await manualForm.validateFields(); @@ -417,6 +457,9 @@ export default function CreateAnnotationTask({ const nextType = mapTemplateDataTypeToDatasetType(tpl?.dataType); setManualDatasetTypeFilter(nextType); + const nextExtensions = getAllowedExtensionsForTemplateDataType(tpl?.dataType); + setManualAllowedExtensions(nextExtensions); + // 若当前已选数据集类型与模板不匹配,则清空当前选择 if (selectedDataset && nextType && selectedDataset.datasetType !== nextType) { setSelectedDataset(null); @@ -459,6 +502,7 @@ export default function CreateAnnotationTask({ } }} 
datasetTypeFilter={manualDatasetTypeFilter} + allowedFileExtensions={manualAllowedExtensions} singleDatasetOnly disabled={!manualForm.getFieldValue("templateId")} /> @@ -512,6 +556,7 @@ export default function CreateAnnotationTask({ autoForm.setFieldsValue({ datasetId: dataset?.id ?? "" }); }} datasetTypeFilter={DatasetType.IMAGE} + allowedFileExtensions={IMAGE_EXTENSIONS} singleDatasetOnly /> {selectedDataset && ( diff --git a/frontend/src/pages/DataAnnotation/Home/DataAnnotation.tsx b/frontend/src/pages/DataAnnotation/Home/DataAnnotation.tsx index 8f045076..6129a039 100644 --- a/frontend/src/pages/DataAnnotation/Home/DataAnnotation.tsx +++ b/frontend/src/pages/DataAnnotation/Home/DataAnnotation.tsx @@ -22,6 +22,8 @@ import { deleteAutoAnnotationTaskByIdUsingDelete, getAutoAnnotationLabelStudioProjectUsingGet, loginAnnotationUsingGet, + syncManualAnnotationToDatabaseUsingPost, + syncAutoAnnotationToDatabaseUsingPost, } from "../annotation.api"; import { mapAnnotationTask } from "../annotation.const"; import CreateAnnotationTask from "../Create/components/CreateAnnotationTaskDialog"; @@ -261,6 +263,37 @@ export default function DataAnnotation() { setShowImportManualDialog(true); }; + const handleSyncManualToDatabase = async (task: AnnotationTask) => { + if (!task?.id) { + message.error("未找到标注任务"); + return; + } + + Modal.confirm({ + title: `确认将「${task.name}」在 Label Studio 中的标注结果同步到数据库吗?`, + content: ( +
        <div>
+          <div>此操作会根据 Label Studio 中的任务数据覆盖当前文件标签与标注信息。</div>
+          <div>同步完成后,可在数据管理的文件详情中查看最新标签与标注。</div>
+        </div>
+ ), + okText: "同步到数据库", + cancelText: "取消", + onOk: async () => { + const hide = message.loading("正在从 Label Studio 同步标注到数据库...", 0); + try { + await syncManualAnnotationToDatabaseUsingPost(task.id as any); + hide(); + message.success("同步完成"); + } catch (e) { + console.error(e); + hide(); + message.error("同步失败,请稍后重试"); + } + }, + }); + }; + const handleImportAutoFromLabelStudio = (row: any) => { if (!row?.id) { message.error("未找到对应的自动标注任务"); @@ -277,6 +310,37 @@ export default function DataAnnotation() { setShowImportAutoDialog(true); }; + const handleSyncAutoToDatabase = (row: any) => { + if (!row?.id) { + message.error("未找到对应的自动标注任务"); + return; + } + + Modal.confirm({ + title: `确认将自动标注任务「${row.name}」在 Label Studio 中的标注结果同步到数据库吗?`, + content: ( +
        <div>
+          <div>此操作会根据 Label Studio 中的任务数据覆盖当前文件标签与标注信息。</div>
+          <div>同步完成后,可在数据管理的文件详情中查看最新标签与标注。</div>
+        </div>
+ ), + okText: "同步到数据库", + cancelText: "取消", + onOk: async () => { + const hide = message.loading("正在从 Label Studio 同步标注到数据库...", 0); + try { + await syncAutoAnnotationToDatabaseUsingPost(row.id); + hide(); + message.success("同步完成"); + } catch (e) { + console.error(e); + hide(); + message.error("同步失败,请稍后重试"); + } + }, + }); + }; + const handleAnnotateAuto = (task: any) => { (async () => { try { @@ -430,9 +494,15 @@ export default function DataAnnotation() { onClick: handleAnnotate, }, { - key: "back-sync", - label: "同步", + key: "sync-db", + label: "同步到数据库", icon: , + onClick: handleSyncManualToDatabase, + }, + { + key: "export-result", + label: "导出标注结果", + icon: , // 导出/下载类图标 onClick: handleImportManualFromLabelStudio, }, { @@ -624,15 +694,21 @@ export default function DataAnnotation() { , + onClick: () => handleImportManualFromLabelStudio(task), + }, { key: "edit-dataset", label: "编辑任务数据集", @@ -668,16 +744,22 @@ export default function DataAnnotation() { {/* 二级功能:编辑任务数据集 + 删除任务(折叠菜单) */} , + onClick: () => handleImportAutoFromLabelStudio(task), + }, { key: "edit-dataset", label: "编辑任务数据集", diff --git a/frontend/src/pages/DataAnnotation/annotation.api.ts b/frontend/src/pages/DataAnnotation/annotation.api.ts index f0faa5e1..1fb21f36 100644 --- a/frontend/src/pages/DataAnnotation/annotation.api.ts +++ b/frontend/src/pages/DataAnnotation/annotation.api.ts @@ -79,6 +79,11 @@ export function importManualAnnotationFromLabelStudioUsingPost( return post(`/api/annotation/project/${mappingId}/sync-label-studio-back`, data); } +// 手动标注:将 Label Studio 中的标注结果同步回数据库(更新 t_dm_dataset_files.tags/annotation) +export function syncManualAnnotationToDatabaseUsingPost(mappingId: string) { + return post(`/api/annotation/project/${mappingId}/sync-db`); +} + export function downloadAutoAnnotationResultUsingGet(taskId: string) { return download(`/api/annotation/auto/${taskId}/download`); } @@ -96,6 +101,11 @@ export function importAutoAnnotationFromLabelStudioUsingPost( return post(`/api/annotation/auto/${taskId}/sync-label-studio-back`, data); } +// 自动标注:将 Label Studio 中的标注结果同步回数据库(更新 t_dm_dataset_files.tags/annotation) +export function syncAutoAnnotationToDatabaseUsingPost(taskId: string) { + return post(`/api/annotation/auto/${taskId}/sync-db`); +} + // 查询自动标注任务关联的 Label Studio 项目 export function getAutoAnnotationLabelStudioProjectUsingGet(taskId: string) { return get(`/api/annotation/auto/${taskId}/label-studio-project`); diff --git a/frontend/src/pages/DataManagement/Detail/components/Overview.tsx b/frontend/src/pages/DataManagement/Detail/components/Overview.tsx index 8983d462..8ed179e2 100644 --- a/frontend/src/pages/DataManagement/Detail/components/Overview.tsx +++ b/frontend/src/pages/DataManagement/Detail/components/Overview.tsx @@ -657,6 +657,27 @@ export default function Overview({ dataset, filesOperation, fetchDataset }) { {previewFileDetail.description} )} +
              <div>
+                <span>标注信息:</span>
+                <span>
+                  {(() => {
+                    const raw = (previewFileDetail?.annotation
+                      ?? previewFileDetail?.metadata
+                      ?? previewFileDetail?.tags) as any;
+                    if (!raw) return "-";
+
+                    if (typeof raw === "string") {
+                      return raw;
+                    }
+
+                    try {
+                      return JSON.stringify(raw);
+                    } catch {
+                      return "-";
+                    }
+                  })()}
+                </span>
+              </div>
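For reference, the preview fallback above prefers the full annotation payload, then metadata, then tags, and stringifies anything that is not already a string. A minimal standalone sketch of that logic follows; the helper name formatAnnotationPreview is illustrative only and not part of this change.

// Illustrative sketch: mirrors the annotation/metadata/tags fallback rendered in Overview.tsx above.
function formatAnnotationPreview(file: {
  annotation?: unknown;
  metadata?: unknown;
  tags?: unknown;
}): string {
  const raw = file.annotation ?? file.metadata ?? file.tags;
  if (!raw) return "-";                     // nothing synced yet
  if (typeof raw === "string") return raw;  // already a displayable string
  try {
    return JSON.stringify(raw);             // objects/arrays shown as compact JSON
  } catch {
    return "-";                             // defensive: unserializable values
  }
}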
diff --git a/runtime/datamate-python/app/db/models/dataset_management.py b/runtime/datamate-python/app/db/models/dataset_management.py index 70266332..7c64d218 100644 --- a/runtime/datamate-python/app/db/models/dataset_management.py +++ b/runtime/datamate-python/app/db/models/dataset_management.py @@ -61,6 +61,7 @@ class DatasetFiles(Base): check_sum = Column(String(64), nullable=True, comment="文件校验和") tags = Column(JSON, nullable=True, comment="文件标签信息") tags_updated_at = Column(TIMESTAMP, nullable=True, comment="标签最后更新时间") + annotation = Column(JSON, nullable=True, comment="完整标注结果(原始JSON)") dataset_filemetadata = Column("metadata", JSON, nullable=True, comment="文件元数据") status = Column(String(50), default='ACTIVE', comment="文件状态:ACTIVE/DELETED/PROCESSING") upload_time = Column(TIMESTAMP, server_default=func.current_timestamp(), comment="上传时间") diff --git a/runtime/datamate-python/app/module/annotation/interface/auto.py b/runtime/datamate-python/app/module/annotation/interface/auto.py index ba54ce7c..05fb3d1d 100644 --- a/runtime/datamate-python/app/module/annotation/interface/auto.py +++ b/runtime/datamate-python/app/module/annotation/interface/auto.py @@ -33,6 +33,7 @@ from ..service.mapping import DatasetMappingService from ..service.prediction import PredictionSyncService from ..service.sync import SyncService +from ..service.ls_annotation_sync import LSAnnotationSyncService from ..client import LabelStudioClient @@ -954,7 +955,13 @@ async def sync_auto_annotation_to_label_studio( if not os.path.isdir(output_dir): raise HTTPException(status_code=404, detail="Output directory not found") + # 兼容两种目录结构: + # 1) 旧版本:output_dir 为数据集根或中间目录,JSON 位于 output_dir/annotations/ + # 2) 新版本:JSON 直接位于 output_dir/ 下 annotations_dir = os.path.join(output_dir, "annotations") + if not os.path.isdir(annotations_dir): + annotations_dir = output_dir + if not os.path.isdir(annotations_dir): raise HTTPException(status_code=404, detail="Annotations directory not found") @@ -1184,3 +1191,85 @@ def _sanitize_base_name(raw: str) -> str: pass return StandardResponse(code=200, message="success", data=True) + + +@router.post("/{task_id}/sync-db", response_model=StandardResponse[int]) +async def sync_auto_task_annotations_to_database( + task_id: str = Path(..., description="任务ID"), + db: AsyncSession = Depends(get_db), +): + """将指定自动标注任务在 Label Studio 中的标注结果同步回 DM 数据库。 + + 行为: + - 根据自动标注任务找到或创建对应的 Label Studio 项目(与 /sync-label-studio-back 相同的解析逻辑); + - 遍历项目下所有 task,按 task.data.file_id 定位 t_dm_dataset_files 记录; + - 把 annotations + predictions 写入 annotation 字段,并抽取标签写入 tags,更新 tags_updated_at。 + 返回成功更新的文件数量。 + """ + + # 1. 获取并校验自动标注任务 + task = await service.get_task(db, task_id) + if not task: + raise HTTPException(status_code=404, detail="Task not found") + + # 2. 
查找或创建与该自动标注任务关联的 Label Studio 项目 + mapping_service = DatasetMappingService(db) + mappings = await mapping_service.get_mappings_by_dataset_id(task.dataset_id) + + project_id: Optional[str] = None + for m in mappings: + cfg = getattr(m, "configuration", None) or {} + if isinstance(cfg, dict) and cfg.get("autoTaskId") == task.id: + project_id = str(m.labeling_project_id) + break + + if project_id is None: + for m in mappings: + if m.name == task.name: + project_id = str(m.labeling_project_id) + break + + if project_id is None: + # 与前向/后向同步逻辑保持一致:如无现成项目则按自动标注配置自动创建。 + try: + auto_config = AutoAnnotationConfig.model_validate(task.config) + except Exception as e: # pragma: no cover + logger.warning( + "Failed to parse auto task config when creating LS project for db sync: %s", + e, + ) + auto_config = AutoAnnotationConfig( + model_size="l", + conf_threshold=0.5, + target_classes=[], + ) + + project_id = await _ensure_ls_mapping_for_auto_task( + db, + dataset_id=task.dataset_id, + dataset_name=task.dataset_name, + config=auto_config, + task_name=task.name, + file_ids=[str(fid) for fid in (task.file_ids or [])], + auto_task_id=task.id, + ) + + if not project_id: + raise HTTPException( + status_code=500, + detail=( + "Failed to create or resolve Label Studio project for this auto task " + "when syncing annotations to database." + ), + ) + + # 3. 调用通用的 LS -> DM 同步服务 + ls_client = LabelStudioClient( + base_url=settings.label_studio_base_url, + token=settings.label_studio_user_token, + ) + sync_service = LSAnnotationSyncService(db, ls_client) + + updated = await sync_service.sync_project_annotations_to_dm(project_id=str(project_id)) + + return StandardResponse(code=200, message="success", data=updated) diff --git a/runtime/datamate-python/app/module/annotation/interface/project.py b/runtime/datamate-python/app/module/annotation/interface/project.py index a74c28fb..2a17c721 100644 --- a/runtime/datamate-python/app/module/annotation/interface/project.py +++ b/runtime/datamate-python/app/module/annotation/interface/project.py @@ -16,6 +16,7 @@ from ..client import LabelStudioClient from ..service.mapping import DatasetMappingService from ..service.sync import SyncService +from ..service.ls_annotation_sync import LSAnnotationSyncService from ..service.template import AnnotationTemplateService from ..schema import ( DatasetMappingCreateRequest, @@ -603,6 +604,41 @@ def _sanitize_base_name(raw: str) -> str: return StandardResponse(code=200, message="success", data=True) + +@router.post("/{mapping_id}/sync-db", response_model=StandardResponse[int]) +async def sync_manual_annotations_to_database( + mapping_id: str = Path(..., description="映射ID (mapping UUID)"), + db: AsyncSession = Depends(get_db), +): + """从 Label Studio 项目同步当前手动标注结果到 DM 数据库。 + + 行为: + - 基于 mapping_id 定位 Label Studio 项目; + - 遍历项目下所有 task,按 task.data.file_id 找到对应 t_dm_dataset_files 记录; + - 读取每个 task 的 annotations + predictions,写入: + * tags: 从 result 中提取的标签概要,供 DM 列表/预览展示; + * annotation: 完整原始 JSON 结果; + * tags_updated_at: 当前时间戳。 + 返回值为成功更新的文件数量。 + """ + + mapping_service = DatasetMappingService(db) + mapping = await mapping_service.get_mapping_by_uuid(mapping_id) + if not mapping: + raise HTTPException(status_code=404, detail="Mapping not found") + + ls_client = LabelStudioClient( + base_url=settings.label_studio_base_url, + token=settings.label_studio_user_token, + ) + sync_service = LSAnnotationSyncService(db, ls_client) + + updated = await sync_service.sync_project_annotations_to_dm( + project_id=str(mapping.labeling_project_id), + ) + + 
return StandardResponse(code=200, message="success", data=updated) + @router.get("", response_model=StandardResponse[PaginatedData[DatasetMappingResponse]]) async def list_mappings( page: int = Query(1, ge=1, description="页码(从1开始)"), diff --git a/runtime/datamate-python/app/module/annotation/service/ls_annotation_sync.py b/runtime/datamate-python/app/module/annotation/service/ls_annotation_sync.py new file mode 100644 index 00000000..519f1a77 --- /dev/null +++ b/runtime/datamate-python/app/module/annotation/service/ls_annotation_sync.py @@ -0,0 +1,215 @@ +from __future__ import annotations + +from datetime import datetime +from typing import Any, Dict, List, Optional + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.core.config import settings +from app.core.logging import get_logger +from app.db.models import DatasetFiles + +from ..client import LabelStudioClient + +logger = get_logger(__name__) + + +class LSAnnotationSyncService: + """将 Label Studio 项目中的标注结果同步回 DM 数据库。 + + 约定: + - Label Studio task.data 中包含由 SyncService 写入的 DM 元信息: + {"file_id": "...", "dataset_id": "...", "file_path": "...", "original_name": "..."} + - 对于每个 task,我们会: + * 读取该 task 的 annotations 列表; + * 结合 task.predictions 一并写入 t_dm_dataset_files.annotation; + * 从 annotations/predictions 的 result 字段中提取标签,写入 t_dm_dataset_files.tags; + * 更新时间戳 tags_updated_at。 + """ + + def __init__(self, db: AsyncSession, ls_client: LabelStudioClient) -> None: + self.db = db + self.ls_client = ls_client + + async def sync_project_annotations_to_dm(self, project_id: str) -> int: + """从指定 Label Studio 项目中同步所有任务的标注到 DM 文件表。 + + Args: + project_id: Label Studio 项目 ID(字符串形式)。 + + Returns: + 成功更新的文件数量。 + """ + try: + page_size = getattr(settings, "ls_task_page_size", 1000) + result = await self.ls_client.get_project_tasks( + project_id=project_id, + page=None, + page_size=page_size, + ) + except Exception as e: # pragma: no cover - 防御性日志 + logger.error("Failed to fetch tasks from Label Studio project %s: %s", project_id, e) + return 0 + + if not result: + logger.warning("No tasks returned for project %s when syncing annotations", project_id) + return 0 + + tasks: List[Dict[str, Any]] = result.get("tasks", []) # type: ignore[assignment] + if not tasks: + logger.info("Project %s has no tasks to sync", project_id) + return 0 + + updated_files = 0 + + for task in tasks: + data: Dict[str, Any] = task.get("data") or {} + raw_file_id = data.get("file_id") + if not raw_file_id: + # 旧任务可能没有 DM file_id,跳过但记录日志 + logger.debug( + "Skip LS task %s because data.file_id is missing", + task.get("id"), + ) + continue + + file_id = str(raw_file_id) + + # 获取该任务的 annotations 列表 + annotations: Optional[List[Dict[str, Any]]] = None + try: + annotations = await self.ls_client.get_task_annotations(int(task["id"])) # type: ignore[arg-type] + except Exception as e: # pragma: no cover + logger.error( + "Failed to fetch annotations for LS task %s (file_id=%s): %s", + task.get("id"), + file_id, + e, + ) + + predictions: List[Dict[str, Any]] = task.get("predictions") or [] + + annotation_payload: Dict[str, Any] = { + "task": { + "id": task.get("id"), + "project": task.get("project"), + "data": data, + }, + "annotations": annotations or [], + "predictions": predictions, + } + + tags = self._extract_tags_from_results( + annotations or [], + predictions, + ) + + success = await self._update_dataset_file(file_id, tags, annotation_payload) + if success: + updated_files += 1 + + logger.info( + "Synced annotations from LS project %s to 
DM, updated %d files", + project_id, + updated_files, + ) + return updated_files + + async def _update_dataset_file( + self, + file_id: str, + tags: List[Dict[str, Any]], + annotation: Dict[str, Any], + ) -> bool: + """将提取出的 tags 与 annotation 写回 t_dm_dataset_files。 + + 直接更新 DatasetFiles.tags / tags_updated_at / annotation 字段, + 不走模板驱动的转换逻辑(模板转换在用户主动批量更新时使用)。 + """ + try: + result = await self.db.execute( + select(DatasetFiles).where(DatasetFiles.id == file_id) + ) + record = result.scalar_one_or_none() + + if not record: + logger.warning("Dataset file not found when syncing LS annotations: %s", file_id) + return False + + update_time = datetime.utcnow() + record.tags = tags # type: ignore[assignment] + record.tags_updated_at = update_time # type: ignore[assignment] + # annotation 可能较大,但为 JSONB,直接整体覆盖 + setattr(record, "annotation", annotation) + + await self.db.commit() + await self.db.refresh(record) + + logger.debug( + "Updated DM file %s with %d tags from LS annotations", + file_id, + len(tags), + ) + return True + + except Exception as e: # pragma: no cover + logger.error("Failed to update DM file %s from LS annotations: %s", file_id, e) + try: + await self.db.rollback() + except Exception: + pass + return False + + @staticmethod + def _extract_tags_from_results( + annotations: List[Dict[str, Any]], + predictions: List[Dict[str, Any]], + ) -> List[Dict[str, Any]]: + """从 annotations 和 predictions 的 result 字段中提取通用标签结构。 + + 输出格式与 DM 前端当前解析逻辑兼容: + - 每个元素包含: id/type/from_name/values; + - 其中 values 是一个字典,键通常与 type 一致,例如 "rectanglelabels"; + - 前端通过 tag.values[tag.type] 提取字符串标签集合。 + """ + + def iter_results() -> List[Dict[str, Any]]: + res: List[Dict[str, Any]] = [] + for ann in annotations or []: + for item in ann.get("result", []) or []: + if isinstance(item, dict): + res.append(item) + for pred in predictions or []: + for item in pred.get("result", []) or []: + if isinstance(item, dict): + res.append(item) + return res + + results = iter_results() + if not results: + return [] + + normalized: List[Dict[str, Any]] = [] + + for r in results: + r_type = r.get("type") + from_name = r.get("from_name") or r.get("fromName") + value_obj = r.get("value") or {} + if not isinstance(value_obj, dict): + continue + + # 将 Label Studio 的 value 映射为 values,方便前端统一解析 + values: Dict[str, Any] = {} + for key, v in value_obj.items(): + values[key] = v + + tag = { + "id": r.get("id"), + "type": r_type, + "from_name": from_name, + "values": values, + } + normalized.append(tag) + + return normalized diff --git a/runtime/datamate-python/app/module/dataset/schema/dataset_file.py b/runtime/datamate-python/app/module/dataset/schema/dataset_file.py index e387f9a0..e005329e 100644 --- a/runtime/datamate-python/app/module/dataset/schema/dataset_file.py +++ b/runtime/datamate-python/app/module/dataset/schema/dataset_file.py @@ -1,89 +1,90 @@ -from pydantic import BaseModel, Field -from typing import List, Optional, Dict, Any -from datetime import datetime - -class DatasetFileResponse(BaseModel): - """DM服务数据集文件响应模型""" - id: str = Field(..., description="文件ID") - fileName: str = Field(..., description="文件名") - fileType: str = Field(..., description="文件类型") - filePath: str = Field(..., description="文件路径") - originalName: Optional[str] = Field(None, description="原始文件名") - size: Optional[int] = Field(None, description="文件大小(字节)") - status: Optional[str] = Field(None, description="文件状态") - uploadedAt: Optional[datetime] = Field(None, description="上传时间") - description: Optional[str] = Field(None, description="文件描述") - uploadedBy: 
Optional[str] = Field(None, description="上传者") - lastAccessTime: Optional[datetime] = Field(None, description="最后访问时间") - tags: Optional[List[Dict[str, Any]]] = Field(None, description="文件标签/标注信息") - tags_updated_at: Optional[datetime] = Field(None, description="标签最后更新时间", alias="tagsUpdatedAt") - -class PagedDatasetFileResponse(BaseModel): - """DM服务分页文件响应模型""" - content: List[DatasetFileResponse] = Field(..., description="文件列表") - totalElements: int = Field(..., description="总元素数") - totalPages: int = Field(..., description="总页数") - page: int = Field(..., description="当前页码") - size: int = Field(..., description="每页大小") - -class DatasetFileTag(BaseModel): - id: str = Field(None, description="标签ID") - type: str = Field(None, description="类型") - from_name: str = Field(None, description="标签名称") - values: dict = Field(None, description="标签值") - - def get_tags(self) -> List[str]: - tags = [] - # 如果 values 是字典类型,根据 type 获取对应的值 - tag_values = self.values.get(self.type, []) - - # 处理标签值 - if isinstance(tag_values, list): - for tag in tag_values: - if isinstance(tag, str): - tags.append(str(tag)) - elif isinstance(tag_values, str): - tags.append(tag_values) - # 如果 from_name 不为空,添加前缀 - if self.from_name: - tags = [{"label": self.from_name, "value": tag} for tag in tags] - return tags - - -class FileTagUpdate(BaseModel): - """单个文件的标签更新请求""" - file_id: str = Field(..., alias="fileId", description="文件ID") - tags: List[Dict[str, Any]] = Field(..., description="要更新的标签列表(部分更新)") - - class Config: - populate_by_name = True - - -class BatchUpdateFileTagsRequest(BaseModel): - """批量更新文件标签请求""" - updates: List[FileTagUpdate] = Field(..., description="文件标签更新列表", min_length=1) - - class Config: - populate_by_name = True - - -class FileTagUpdateResult(BaseModel): - """单个文件标签更新结果""" - file_id: str = Field(..., alias="fileId", description="文件ID") - success: bool = Field(..., description="是否更新成功") - message: Optional[str] = Field(None, description="结果信息") - tags_updated_at: Optional[datetime] = Field(None, alias="tagsUpdatedAt", description="标签更新时间") - - class Config: - populate_by_name = True - - -class BatchUpdateFileTagsResponse(BaseModel): - """批量更新文件标签响应""" - results: List[FileTagUpdateResult] = Field(..., description="更新结果列表") - total: int = Field(..., description="总更新数量") - success_count: int = Field(..., alias="successCount", description="成功数量") - failure_count: int = Field(..., alias="failureCount", description="失败数量") - - class Config: - populate_by_name = True +from pydantic import BaseModel, Field +from typing import List, Optional, Dict, Any +from datetime import datetime + +class DatasetFileResponse(BaseModel): + """DM服务数据集文件响应模型""" + id: str = Field(..., description="文件ID") + fileName: str = Field(..., description="文件名") + fileType: str = Field(..., description="文件类型") + filePath: str = Field(..., description="文件路径") + originalName: Optional[str] = Field(None, description="原始文件名") + size: Optional[int] = Field(None, description="文件大小(字节)") + status: Optional[str] = Field(None, description="文件状态") + uploadedAt: Optional[datetime] = Field(None, description="上传时间") + description: Optional[str] = Field(None, description="文件描述") + uploadedBy: Optional[str] = Field(None, description="上传者") + lastAccessTime: Optional[datetime] = Field(None, description="最后访问时间") + tags: Optional[List[Dict[str, Any]]] = Field(None, description="文件标签/标注信息(简要结构)") + tags_updated_at: Optional[datetime] = Field(None, description="标签最后更新时间", alias="tagsUpdatedAt") + annotation: Optional[Dict[str, Any]] = Field(None, 
description="完整标注结果(原始JSON)") + +class PagedDatasetFileResponse(BaseModel): + """DM服务分页文件响应模型""" + content: List[DatasetFileResponse] = Field(..., description="文件列表") + totalElements: int = Field(..., description="总元素数") + totalPages: int = Field(..., description="总页数") + page: int = Field(..., description="当前页码") + size: int = Field(..., description="每页大小") + +class DatasetFileTag(BaseModel): + id: str = Field(None, description="标签ID") + type: str = Field(None, description="类型") + from_name: str = Field(None, description="标签名称") + values: dict = Field(None, description="标签值") + + def get_tags(self) -> List[str]: + tags = [] + # 如果 values 是字典类型,根据 type 获取对应的值 + tag_values = self.values.get(self.type, []) + + # 处理标签值 + if isinstance(tag_values, list): + for tag in tag_values: + if isinstance(tag, str): + tags.append(str(tag)) + elif isinstance(tag_values, str): + tags.append(tag_values) + # 如果 from_name 不为空,添加前缀 + if self.from_name: + tags = [{"label": self.from_name, "value": tag} for tag in tags] + return tags + + +class FileTagUpdate(BaseModel): + """单个文件的标签更新请求""" + file_id: str = Field(..., alias="fileId", description="文件ID") + tags: List[Dict[str, Any]] = Field(..., description="要更新的标签列表(部分更新)") + + class Config: + populate_by_name = True + + +class BatchUpdateFileTagsRequest(BaseModel): + """批量更新文件标签请求""" + updates: List[FileTagUpdate] = Field(..., description="文件标签更新列表", min_length=1) + + class Config: + populate_by_name = True + + +class FileTagUpdateResult(BaseModel): + """单个文件标签更新结果""" + file_id: str = Field(..., alias="fileId", description="文件ID") + success: bool = Field(..., description="是否更新成功") + message: Optional[str] = Field(None, description="结果信息") + tags_updated_at: Optional[datetime] = Field(None, alias="tagsUpdatedAt", description="标签更新时间") + + class Config: + populate_by_name = True + + +class BatchUpdateFileTagsResponse(BaseModel): + """批量更新文件标签响应""" + results: List[FileTagUpdateResult] = Field(..., description="更新结果列表") + total: int = Field(..., description="总更新数量") + success_count: int = Field(..., alias="successCount", description="成功数量") + failure_count: int = Field(..., alias="failureCount", description="失败数量") + + class Config: + populate_by_name = True diff --git a/runtime/datamate-python/app/module/dataset/service/service.py b/runtime/datamate-python/app/module/dataset/service/service.py index 6bccc54d..290bd6f9 100644 --- a/runtime/datamate-python/app/module/dataset/service/service.py +++ b/runtime/datamate-python/app/module/dataset/service/service.py @@ -116,7 +116,8 @@ async def get_dataset_files( uploadedBy=None, lastAccessTime=f.last_access_time, # type: ignore tags=f.tags, # type: ignore - tags_updated_at=f.tags_updated_at # type: ignore + tags_updated_at=f.tags_updated_at, # type: ignore + annotation=getattr(f, "annotation", None), # type: ignore ) for f in files ] diff --git a/runtime/ops/annotation/image_object_detection_bounding_box/process.py b/runtime/ops/annotation/image_object_detection_bounding_box/process.py index 66e6a587..a7b4f3e2 100644 --- a/runtime/ops/annotation/image_object_detection_bounding_box/process.py +++ b/runtime/ops/annotation/image_object_detection_bounding_box/process.py @@ -180,18 +180,17 @@ def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: else: output_dir = os.path.dirname(image_path) - # 创建输出子目录:仅保留 annotations 目录,不再额外保存标注图像, - # 以减少冗余占用;检测结果将直接写入 JSON 文件。 - annotations_dir = os.path.join(output_dir, "annotations") - os.makedirs(annotations_dir, exist_ok=True) - + # 创建 / 使用输出目录:直接在 output_dir 下保存 JSON, + # 
约定最终路径为 <output_dir>/<name_without_ext>.json,避免 "annotations/annotations" 嵌套。 + os.makedirs(output_dir, exist_ok=True) + # 保持原始文件名(不添加后缀),确保一一对应 base_name = os.path.basename(image_path) name_without_ext = os.path.splitext(base_name)[0] - + # 保存标注 JSON(文件名与图像对应) json_filename = f"{name_without_ext}.json" - json_path = os.path.join(annotations_dir, json_filename) + json_path = os.path.join(output_dir, json_filename) with open(json_path, "w", encoding="utf-8") as f: json.dump(annotations, f, indent=2, ensure_ascii=False) diff --git a/scripts/db/data-management-init.sql b/scripts/db/data-management-init.sql index 4a6316cf..5551faa3 100644 --- a/scripts/db/data-management-init.sql +++ b/scripts/db/data-management-init.sql @@ -85,6 +85,7 @@ CREATE TABLE IF NOT EXISTS t_dm_dataset_files ( check_sum VARCHAR(64), tags JSONB, tags_updated_at TIMESTAMP, + annotation JSONB, metadata JSONB, status VARCHAR(50) DEFAULT 'ACTIVE', upload_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, @@ -101,8 +102,9 @@ COMMENT ON COLUMN t_dm_dataset_files.file_path IS '文件路径'; COMMENT ON COLUMN t_dm_dataset_files.file_type IS '文件格式:JPG/PNG/DCM/TXT等'; COMMENT ON COLUMN t_dm_dataset_files.file_size IS '文件大小(字节)'; COMMENT ON COLUMN t_dm_dataset_files.check_sum IS '文件校验和'; -COMMENT ON COLUMN t_dm_dataset_files.tags IS '文件标签信息'; +COMMENT ON COLUMN t_dm_dataset_files.tags IS '文件标签信息(结构化标签/标注概要)'; COMMENT ON COLUMN t_dm_dataset_files.tags_updated_at IS '标签最后更新时间'; +COMMENT ON COLUMN t_dm_dataset_files.annotation IS '完整标注结果(原始JSON)'; COMMENT ON COLUMN t_dm_dataset_files.metadata IS '文件元数据'; COMMENT ON COLUMN t_dm_dataset_files.status IS '文件状态:ACTIVE/DELETED/PROCESSING'; COMMENT ON COLUMN t_dm_dataset_files.upload_time IS '上传时间';
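Taken together, the new /sync-db endpoints let the frontend pull the current Label Studio annotations back into t_dm_dataset_files.tags / annotation on demand, returning the number of updated files. Below is a hedged usage sketch of the two API wrappers added in annotation.api.ts; it assumes the shared post helper resolves to the StandardResponse body ({ code, message, data }), and the import path alias and function names syncManualTaskToDatabase / syncAutoTaskToDatabase are illustrative only.

import { message } from "antd";
import {
  syncManualAnnotationToDatabaseUsingPost,
  syncAutoAnnotationToDatabaseUsingPost,
} from "@/pages/DataAnnotation/annotation.api"; // path alias is illustrative

// Manual annotation: mappingId is the dataset-mapping UUID behind the Label Studio project.
export async function syncManualTaskToDatabase(mappingId: string): Promise<void> {
  const res: any = await syncManualAnnotationToDatabaseUsingPost(mappingId);
  // Backend responds with StandardResponse[int]; data is the count of updated files (assumed shape).
  message.success(`同步完成,共更新 ${res?.data ?? 0} 个文件`);
}

// Auto annotation: taskId is the auto-annotation task ID; the backend resolves or creates the LS project.
export async function syncAutoTaskToDatabase(taskId: string): Promise<void> {
  const res: any = await syncAutoAnnotationToDatabaseUsingPost(taskId);
  message.success(`同步完成,共更新 ${res?.data ?? 0} 个文件`);
}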