-
Notifications
You must be signed in to change notification settings - Fork 44
Expand file tree
/
Copy pathabstract_parser.py
More file actions
129 lines (111 loc) · 4.08 KB
/
abstract_parser.py
File metadata and controls
129 lines (111 loc) · 4.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
"""
Abstract Parser
抽象解析器基类
"""
import json
import yaml
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional
from app.module.operator.schema import OperatorDto, OperatorReleaseDto
from app.module.operator.constants import CATEGORY_MAP, CATEGORY_OTHER_VENDOR_ID, CATEGORY_CUSTOMIZED_ID, \
CATEGORY_CLEANING_ID
from app.module.operator.exceptions import FieldNotFoundError
class AbstractParser(ABC):
    """Abstract base class for operator package parsers.

    Subclasses provide the archive-specific operations (reading a single
    entry, extracting the whole archive); the YAML-to-DTO mapping in
    ``parse_yaml`` is shared by all of them.
    """

    @abstractmethod
    def parse_yaml_from_archive(
        self,
        archive_path: str,
        entry_path: str,
        file_name: Optional[str] = None,
        file_size: Optional[int] = None,
    ) -> "OperatorDto":
        """Read a YAML file from inside an archive and parse it into an OperatorDto.

        Args:
            archive_path: Path to the archive (zip or tar).
            entry_path: Path of the file inside the archive,
                e.g. "config/app.yaml".
            file_name: Optional file name; ``parse_yaml`` accepts the same
                argument and stores it on the DTO.
            file_size: Optional file size; ``parse_yaml`` accepts the same
                argument and stores it on the DTO.

        Returns:
            The parsed OperatorDto.
        """

    @abstractmethod
    def extract_to(self, archive_path: str, target_dir: str) -> None:
        """Extract the archive into the target directory, keeping relative paths.

        Args:
            archive_path: Path to the archive.
            target_dir: Destination directory.
        """

    def parse_yaml(
        self,
        yaml_content: str,
        file_name: Optional[str] = None,
        file_size: Optional[int] = None,
    ) -> "OperatorDto":
        """Parse operator YAML text into an OperatorDto.

        Args:
            yaml_content: YAML document text; its top level must be a mapping.
            file_name: Stored unchanged on the DTO's ``file_name``.
            file_size: Stored unchanged on the DTO's ``file_size``.

        Returns:
            An OperatorDto with a single release entry and its category
            list populated.

        Raises:
            ValueError: If the document is empty or its top level is not a
                mapping, or if a structured field cannot be serialized to
                JSON.
            FieldNotFoundError: If ``raw_id``, ``name``, ``description``,
                ``version``, ``language``, ``modal`` or ``vendor`` is absent
                or null, or if ``types`` contains a null entry.
        """
        content: Any = yaml.safe_load(yaml_content)
        if not isinstance(content, dict):
            # safe_load yields None for an empty document and a list/scalar
            # for other shapes; fail clearly instead of with an
            # AttributeError on ``.get`` below.
            raise ValueError("Operator YAML must be a mapping at the top level")

        operator = OperatorDto(
            id=self._to_string(content.get("raw_id"), "raw_id"),
            name=self._to_string(content.get("name"), "name"),
            description=self._to_string(content.get("description"), "description"),
            version=self._to_string(content.get("version"), "version"),
            inputs=self._to_json(content.get("inputs")),
            outputs=self._to_json(content.get("outputs")),
            runtime=self._to_json(content.get("runtime")),
            settings=self._to_json(content.get("settings")),
            metrics=self._to_json(content.get("metrics")),
            file_name=file_name,
            file_size=file_size,
        )

        # "release" is used as the changelog only when it is a list; any
        # other value (including a missing key) gives an empty changelog.
        release_notes = content.get("release")
        operator.releases = [
            OperatorReleaseDto(
                id=operator.id,
                version=operator.version,
                changelog=release_notes if isinstance(release_notes, list) else [],
            )
        ]

        # Categories: every recognised entry of "types" first, falling back
        # to the cleaning category when none is recognised.
        categories = []
        types = content.get("types")
        if isinstance(types, list):
            for entry in types:
                key = self._to_lower(entry, "types")
                if key in CATEGORY_MAP:
                    categories.append(CATEGORY_MAP[key])
        if not categories:
            categories.append(CATEGORY_CLEANING_ID)

        # Then language, modal and vendor (unknown language/modal map to "",
        # unknown vendor to the "other vendor" id), and finally the
        # customized category, which is always appended.
        categories.extend([
            CATEGORY_MAP.get(self._to_lower(content.get("language"), "language"), ""),
            CATEGORY_MAP.get(self._to_lower(content.get("modal"), "modal"), ""),
            CATEGORY_MAP.get(
                self._to_lower(content.get("vendor"), "vendor"),
                CATEGORY_OTHER_VENDOR_ID,
            ),
            CATEGORY_CUSTOMIZED_ID,
        ])
        operator.categories = categories
        return operator

    def _to_string(self, obj: Any, field: str = "field") -> str:
        """Return ``str(obj)``, raising when the value is missing.

        Args:
            obj: Value to convert.
            field: Name handed to FieldNotFoundError when ``obj`` is None.
                Defaults to the generic "field" used before this argument
                existed, so existing one-argument calls behave the same.

        Raises:
            FieldNotFoundError: If ``obj`` is None.
        """
        if obj is None:
            raise FieldNotFoundError(field)
        return str(obj)

    def _to_lower(self, obj: Any, field: str = "field") -> str:
        """Return ``str(obj)`` lower-cased, raising when the value is missing.

        Args:
            obj: Value to convert.
            field: Name handed to FieldNotFoundError when ``obj`` is None;
                defaults to the generic "field".

        Raises:
            FieldNotFoundError: If ``obj`` is None.
        """
        if obj is None:
            raise FieldNotFoundError(field)
        return str(obj).lower()

    def _to_json(self, obj: Any) -> Optional[str]:
        """Serialize a value to JSON text, or return None for None.

        Leading and trailing double quotes, then single quotes, are stripped
        from the serialized text, so a plain string value comes back without
        its enclosing JSON quotes. Serialized lists and mappings start and
        end with brackets/braces and are therefore returned unchanged.

        Raises:
            ValueError: If ``obj`` cannot be serialized; the original error
                is attached as the cause.
        """
        if obj is None:
            return None
        try:
            return json.dumps(obj).strip('"').strip("'")
        except (TypeError, ValueError) as e:
            raise ValueError(f"Failed to serialize to JSON: {e}") from e