Skip to content

Commit 43203a8

Browse files
authored
feat(all): version 0.7 (#539)
1 parent 11be8cd commit 43203a8

87 files changed

Lines changed: 2764 additions & 541 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

builder/core/src/main/java/com/antgroup/openspg/builder/core/physical/operator/PythonOperatorFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public class PythonOperatorFactory implements OperatorFactory {
2424

2525
private String pythonExec;
2626
private String pythonPaths;
27+
private String pythonEnv;
2728
private String hostAddr;
2829
private Long projectId;
2930

@@ -37,6 +38,7 @@ public static OperatorFactory getInstance() {
3738
public void init(BuilderContext context) {
3839
pythonExec = context.getPythonExec();
3940
pythonPaths = context.getPythonPaths();
41+
pythonEnv = context.getPythonEnv();
4042
hostAddr = context.getSchemaUrl();
4143
projectId = context.getProjectId();
4244
log.info("pythonExec={}, pythonPaths={}", pythonExec, pythonPaths);
@@ -51,6 +53,7 @@ public Object invoke(OperatorConfig config, Object... input) {
5153
new PemjaConfig(
5254
pythonExec,
5355
pythonPaths,
56+
pythonEnv,
5457
hostAddr,
5558
projectId,
5659
config.getModulePath(),

builder/core/src/main/java/com/antgroup/openspg/builder/core/physical/process/ParagraphSplitProcessor.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.antgroup.openspg.builder.model.record.StringRecord;
2828
import com.antgroup.openspg.common.constants.BuilderConstant;
2929
import com.antgroup.openspg.common.util.pemja.PythonInvokeMethod;
30+
import com.antgroup.openspg.server.common.model.bulider.BuilderJob;
3031
import com.antgroup.openspg.server.common.model.project.Project;
3132
import com.fasterxml.jackson.databind.ObjectMapper;
3233
import java.util.ArrayList;
@@ -57,20 +58,22 @@ public List<BaseRecord> process(List<BaseRecord> inputs) {
5758
node.addTraceLog("Start split document...");
5859
List<BaseRecord> results = new ArrayList<>();
5960
JSONObject pyConfig = new JSONObject();
60-
JSONObject extension = JSON.parseObject(config.getExtension());
61+
BuilderJob job = config.getJob();
62+
JSONObject extension = JSON.parseObject(job.getExtension());
6163
CommonUtils.getSplitterConfig(
6264
pyConfig,
6365
context.getPythonExec(),
6466
context.getPythonPaths(),
67+
context.getPythonEnv(),
6568
context.getSchemaUrl(),
6669
project,
6770
extension);
6871
for (BaseRecord record : inputs) {
6972
StringRecord stringRecord = (StringRecord) record;
7073

7174
String fileUrl = stringRecord.getDocument();
72-
String token = config.getToken();
73-
List<ChunkRecord.Chunk> chunks = readSource(fileUrl, token);
75+
node.addTraceLog("invoke split fileUrl:%s", fileUrl);
76+
List<ChunkRecord.Chunk> chunks = readSource(job);
7477
node.addTraceLog("invoke split operator:%s", config.getOperatorConfig().getClassName());
7578
for (ChunkRecord.Chunk chunk : chunks) {
7679
node.addTraceLog("invoke split chunk:%s", chunk.getName());
@@ -100,16 +103,17 @@ public List<BaseRecord> process(List<BaseRecord> inputs) {
100103
return results;
101104
}
102105

103-
public List<ChunkRecord.Chunk> readSource(String url, String token) {
106+
public List<ChunkRecord.Chunk> readSource(BuilderJob job) {
104107
node.addTraceLog("invoke read operator:%s", PythonInvokeMethod.BRIDGE_READER.getMethod());
105108
List<ChunkRecord.Chunk> chunkList =
106109
CommonUtils.readSource(
107110
context.getPythonExec(),
108111
context.getPythonPaths(),
112+
context.getPythonEnv(),
109113
context.getSchemaUrl(),
110114
project,
111-
url,
112-
token);
115+
job,
116+
null);
113117
node.addTraceLog(
114118
"invoke read operator:%s chunks:%s succeed",
115119
PythonInvokeMethod.BRIDGE_READER.getMethod(), chunkList.size());

builder/core/src/main/java/com/antgroup/openspg/builder/core/physical/utils/CommonUtils.java

Lines changed: 58 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
package com.antgroup.openspg.builder.core.physical.utils;
1515

1616
import com.alibaba.fastjson.JSON;
17-
import com.alibaba.fastjson.JSONArray;
1817
import com.alibaba.fastjson.JSONObject;
1918
import com.alibaba.fastjson.TypeReference;
2019
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
@@ -46,22 +45,18 @@
4645
import com.antgroup.openspg.server.api.http.client.HttpSchemaFacade;
4746
import com.antgroup.openspg.server.api.http.client.util.ConnectionInfo;
4847
import com.antgroup.openspg.server.api.http.client.util.HttpClientBootstrap;
49-
import com.antgroup.openspg.server.common.model.CommonConstants;
48+
import com.antgroup.openspg.server.common.model.bulider.BuilderJob;
5049
import com.antgroup.openspg.server.common.model.project.Project;
51-
import com.google.common.collect.Lists;
5250
import com.google.common.collect.Maps;
5351
import java.util.ArrayList;
52+
import java.util.Date;
5453
import java.util.List;
5554
import java.util.Map;
5655
import java.util.stream.Collectors;
5756
import org.apache.commons.collections4.CollectionUtils;
58-
import org.apache.commons.io.FilenameUtils;
59-
import org.apache.commons.lang3.StringUtils;
6057

6158
public class CommonUtils {
6259

63-
private static final String DOT = ".";
64-
6560
private static final SPGTypeRef TEXT_REF =
6661
new SPGTypeRef(new BasicInfo<>(SPGTypeIdentifier.parse("Text")), SPGTypeEnum.BASIC_TYPE);
6762

@@ -109,10 +104,10 @@ public static List<BaseSPGRecord> convertNodes(
109104
}
110105

111106
public static String labelPrefix(String namespace, String label) {
112-
if (label.contains(DOT)) {
107+
if (label.contains(BuilderConstant.DOT)) {
113108
return label;
114109
}
115-
return namespace + DOT + label;
110+
return namespace + BuilderConstant.DOT + label;
116111
}
117112

118113
public static void addLabelPrefix(String namespace, List<SubGraphRecord> records) {
@@ -190,110 +185,85 @@ private static void replaceUnSpreadableStandardProperty(BaseSPGRecord record) {
190185
public static List<ChunkRecord.Chunk> readSource(
191186
String pythonExec,
192187
String pythonPaths,
188+
String pythonEnv,
193189
String hostAddr,
194190
Project project,
195-
String url,
196-
String token) {
191+
BuilderJob job,
192+
Date bizDate) {
197193
PythonInvokeMethod bridgeReader = PythonInvokeMethod.BRIDGE_READER;
198-
Long projectId = project.getId();
199-
JSONObject llm = JSONObject.parseObject(project.getConfig()).getJSONObject(CommonConstants.LLM);
200194
JSONObject pyConfig = new JSONObject();
201195
JSONObject scanner = new JSONObject();
202-
pyConfig.put(BuilderConstant.SCANNER, scanner);
203196
JSONObject reader = new JSONObject();
197+
com.antgroup.openspg.common.util.CommonUtils.getScannerReaderConfig(
198+
project, job, scanner, reader);
199+
pyConfig.put(BuilderConstant.SCANNER, scanner);
204200
pyConfig.put(BuilderConstant.READER, reader);
205-
206-
if (StringUtils.isNotBlank(token)) {
207-
scanner.put(BuilderConstant.TYPE, BuilderConstant.YU_QUE);
208-
scanner.put(BuilderConstant.TOKEN, token);
209-
reader.put(BuilderConstant.TYPE, BuilderConstant.YU_QUE);
210-
reader.put(BuilderConstant.CUT_DEPTH, 1);
211-
} else {
212-
String extension = FilenameUtils.getExtension(url).toLowerCase();
213-
switch (extension) {
214-
case BuilderConstant.CSV:
215-
scanner.put(BuilderConstant.TYPE, BuilderConstant.CSV);
216-
scanner.put(BuilderConstant.HEADER, true);
217-
JSONArray colNames = new JSONArray();
218-
colNames.add(BuilderConstant.CONTENT);
219-
scanner.put(BuilderConstant.COL_NAMES, colNames);
220-
reader.put(BuilderConstant.TYPE, BuilderConstant.DICT);
221-
break;
222-
case BuilderConstant.JSON:
223-
scanner.put(BuilderConstant.TYPE, BuilderConstant.JSON);
224-
reader.put(BuilderConstant.TYPE, BuilderConstant.DICT);
225-
reader.put(BuilderConstant.ID_COL, BuilderConstant.ID);
226-
reader.put(BuilderConstant.NAME_COL, BuilderConstant.NAME);
227-
reader.put(BuilderConstant.CONTENT_COL, BuilderConstant.CONTENT);
228-
break;
229-
case BuilderConstant.TXT:
230-
scanner.put(BuilderConstant.TYPE, BuilderConstant.FILE);
231-
reader.put(BuilderConstant.TYPE, BuilderConstant.TXT);
232-
break;
233-
case BuilderConstant.PDF:
234-
scanner.put(BuilderConstant.TYPE, BuilderConstant.FILE);
235-
reader.put(BuilderConstant.TYPE, BuilderConstant.PDF);
236-
reader.put(BuilderConstant.CUT_DEPTH, 1);
237-
reader.put(BuilderConstant.LLM, llm);
238-
break;
239-
case BuilderConstant.MD:
240-
scanner.put(BuilderConstant.TYPE, BuilderConstant.FILE);
241-
reader.put(BuilderConstant.TYPE, BuilderConstant.MD);
242-
reader.put(BuilderConstant.CUT_DEPTH, 1);
243-
reader.put(BuilderConstant.LLM, llm);
244-
break;
245-
case BuilderConstant.DOC:
246-
case BuilderConstant.DOCX:
247-
scanner.put(BuilderConstant.TYPE, BuilderConstant.FILE);
248-
reader.put(BuilderConstant.TYPE, BuilderConstant.DOCX);
249-
reader.put(BuilderConstant.LLM, llm);
250-
break;
251-
}
252-
}
201+
String input = com.antgroup.openspg.common.util.CommonUtils.getKagBuilderInput(job, bizDate);
253202
PemjaConfig config =
254203
new PemjaConfig(
255-
pythonExec, pythonPaths, hostAddr, projectId, bridgeReader, Maps.newHashMap());
256-
List<Object> result;
257-
if (StringUtils.isNotBlank(token)) {
258-
List<String> urls = Lists.newArrayList();
259-
urls.add(url);
260-
result = (List<Object>) PemjaUtils.invoke(config, pyConfig.toJSONString(), urls);
261-
} else {
262-
result = (List<Object>) PemjaUtils.invoke(config, pyConfig.toJSONString(), url);
263-
}
204+
pythonExec,
205+
pythonPaths,
206+
pythonEnv,
207+
hostAddr,
208+
project.getId(),
209+
bridgeReader,
210+
Maps.newHashMap());
211+
List<Object> result = (List<Object>) PemjaUtils.invoke(config, pyConfig.toJSONString(), input);
264212
List<ChunkRecord.Chunk> chunkList =
265213
JSON.parseObject(
266214
JSON.toJSONString(result), new TypeReference<List<ChunkRecord.Chunk>>() {});
267215
return chunkList;
268216
}
269217

218+
public static List<Map<String, Object>> scanSource(
219+
String pythonExec,
220+
String pythonPaths,
221+
String pythonEnv,
222+
String hostAddr,
223+
Project project,
224+
BuilderJob job,
225+
Date bizDate) {
226+
PythonInvokeMethod bridgeReader = PythonInvokeMethod.BRIDGE_SCANNER;
227+
228+
JSONObject pyConfig = new JSONObject();
229+
JSONObject scanner = new JSONObject();
230+
JSONObject reader = new JSONObject();
231+
com.antgroup.openspg.common.util.CommonUtils.getScannerReaderConfig(
232+
project, job, scanner, reader);
233+
pyConfig.put(BuilderConstant.SCANNER, scanner);
234+
String input = com.antgroup.openspg.common.util.CommonUtils.getKagBuilderInput(job, bizDate);
235+
236+
PemjaConfig config =
237+
new PemjaConfig(
238+
pythonExec,
239+
pythonPaths,
240+
pythonEnv,
241+
hostAddr,
242+
project.getId(),
243+
bridgeReader,
244+
Maps.newHashMap());
245+
List<Object> result = (List<Object>) PemjaUtils.invoke(config, pyConfig.toJSONString(), input);
246+
List<Map<String, Object>> datas =
247+
JSON.parseObject(
248+
JSON.toJSONString(result), new TypeReference<List<Map<String, Object>>>() {});
249+
return datas;
250+
}
251+
270252
public static PemjaConfig getSplitterConfig(
271253
JSONObject pyConfig,
272254
String pythonExec,
273255
String pythonPaths,
256+
String pythonEnv,
274257
String hostAddr,
275258
Project project,
276259
JSONObject builderExtension) {
277260
Long projectId = project.getId();
278-
JSONObject llm = JSONObject.parseObject(project.getConfig()).getJSONObject(CommonConstants.LLM);
279-
JSONObject config = builderExtension.getJSONObject(BuilderConstant.SPLIT_CONFIG);
280-
Boolean semanticSplit = config.getBoolean(BuilderConstant.SEMANTIC_SPLIT);
281-
282-
Long splitLength = config.getLong(BuilderConstant.SPLIT_LENGTH);
283-
284261
PythonInvokeMethod splitter = PythonInvokeMethod.BRIDGE_COMPONENT;
285-
if (semanticSplit != null && semanticSplit) {
286-
pyConfig.put(BuilderConstant.TYPE, BuilderConstant.SEMANTIC);
287-
pyConfig.put(BuilderConstant.LLM, llm);
288-
pyConfig.put(BuilderConstant.PY_SPLIT_LENGTH, splitLength);
289-
} else {
290-
pyConfig.put(BuilderConstant.TYPE, BuilderConstant.LENGTH);
291-
pyConfig.put(BuilderConstant.PY_SPLIT_LENGTH, splitLength);
292-
pyConfig.put(BuilderConstant.PY_WINDOW_LENGTH, 0);
293-
}
294-
262+
com.antgroup.openspg.common.util.CommonUtils.getSplitterConfig(
263+
project, builderExtension, pyConfig);
295264
PemjaConfig pemjaConfig =
296-
new PemjaConfig(pythonExec, pythonPaths, hostAddr, projectId, splitter, Maps.newHashMap());
265+
new PemjaConfig(
266+
pythonExec, pythonPaths, pythonEnv, hostAddr, projectId, splitter, Maps.newHashMap());
297267

298268
return pemjaConfig;
299269
}

builder/core/src/main/java/com/antgroup/openspg/builder/core/runtime/BuilderContext.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public class BuilderContext implements Serializable {
3636

3737
private String pythonExec;
3838
private String pythonPaths;
39+
private String pythonEnv;
3940
private String graphStoreUrl;
4041
private String searchEngineUrl;
4142
private String cacheUrl;

builder/model/src/main/java/com/antgroup/openspg/builder/model/BuilderConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ public class BuilderConstants {
1919
public static final String PIPELINE_OPTION = "pipeline";
2020
public static final String PYTHON_EXEC_OPTION = "pythonExec";
2121
public static final String PYTHON_PATHS_OPTION = "pythonPaths";
22+
public static final String PYTHON_ENV_OPTION = "pythonEnv";
2223
public static final String SCHEMA_URL_OPTION = "schemaUrl";
2324
public static final String PARALLELISM_OPTION = "parallelism";
2425
public static final String ALTER_OPERATION_OPTION = "alterOperation";

builder/model/src/main/java/com/antgroup/openspg/builder/model/pipeline/PipelineUtils.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,11 @@ public static Pipeline getKagDefaultPipeline(BuilderJob job) {
4444
String splitId = UUID.randomUUID().toString();
4545
PythonInvokeMethod splitMethod = PythonInvokeMethod.BRIDGE_COMPONENT;
4646
JSONObject config = extension.getJSONObject(BuilderConstant.YU_QUE_CONFIG);
47-
String token = (config == null) ? null : config.getString(BuilderConstant.TOKEN);
47+
String token = (config == null) ? null : config.getString(BuilderConstant.YU_QUE_TOKEN);
4848
OperatorConfig operatorConfigSplit = new OperatorConfig(splitMethod, Maps.newHashMap());
4949
Node split =
5050
new Node(
51-
splitId,
52-
"Splitter",
53-
new ParagraphSplitNodeConfig(operatorConfigSplit, token, job.getExtension()));
51+
splitId, "Splitter", new ParagraphSplitNodeConfig(operatorConfigSplit, token, job));
5452
nodes.add(split);
5553
edges.add(new Edge(sourceId, splitId));
5654

@@ -83,10 +81,9 @@ public static Pipeline getKagDefaultPipeline(BuilderJob job) {
8381
edges.add(new Edge(vectorizerId, alignmentId));
8482

8583
String sinkId = UUID.randomUUID().toString();
84+
JSONObject extractConfig = extension.getJSONObject(BuilderConstant.EXTRACT_CONFIG);
8685
Boolean autoWrite =
87-
extension
88-
.getJSONObject(BuilderConstant.EXTRACT_CONFIG)
89-
.getBoolean(BuilderConstant.AUTO_WRITE);
86+
(extractConfig == null) ? true : extractConfig.getBoolean(BuilderConstant.AUTO_WRITE);
9087
Node sink = new Node(sinkId, "Writer", new Neo4jSinkNodeConfig(autoWrite));
9188
nodes.add(sink);
9289
edges.add(new Edge(alignmentId, sinkId));

builder/model/src/main/java/com/antgroup/openspg/builder/model/pipeline/config/ParagraphSplitNodeConfig.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,18 @@
1414
package com.antgroup.openspg.builder.model.pipeline.config;
1515

1616
import com.antgroup.openspg.builder.model.pipeline.enums.NodeTypeEnum;
17+
import com.antgroup.openspg.server.common.model.bulider.BuilderJob;
1718
import lombok.Getter;
1819

1920
@Getter
2021
public class ParagraphSplitNodeConfig extends BasePythonNodeConfig {
2122

2223
private final String token;
23-
private final String extension;
24+
private final BuilderJob job;
2425

25-
public ParagraphSplitNodeConfig(OperatorConfig operatorConfig, String token, String extension) {
26+
public ParagraphSplitNodeConfig(OperatorConfig operatorConfig, String token, BuilderJob job) {
2627
super(NodeTypeEnum.PARAGRAPH_SPLIT, operatorConfig);
2728
this.token = token;
28-
this.extension = extension;
29+
this.job = job;
2930
}
3031
}

builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/LocalBuilderMain.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ private static void run(CommandLine commandLine) throws Exception {
140140

141141
String pythonExec = commandLine.getOptionValue(BuilderConstants.PYTHON_EXEC_OPTION);
142142
String pythonPaths = commandLine.getOptionValue(BuilderConstants.PYTHON_PATHS_OPTION);
143+
String pythonEnv = commandLine.getOptionValue(BuilderConstants.PYTHON_ENV_OPTION);
143144
String schemaUrl = commandLine.getOptionValue(BuilderConstants.SCHEMA_URL_OPTION);
144145

145146
String parallelismStr = commandLine.getOptionValue(BuilderConstants.PARALLELISM_OPTION);
@@ -169,6 +170,7 @@ private static void run(CommandLine commandLine) throws Exception {
169170
.setCatalog(new DefaultBuilderCatalog(projectSchema, conceptLists))
170171
.setPythonExec(pythonExec)
171172
.setPythonPaths(pythonPaths)
173+
.setPythonEnv(pythonEnv)
172174
.setOperation(alterOperationEnum)
173175
.setEnableLeadTo(enableLeadTo)
174176
.setGraphStoreUrl(graphStoreUrl)

builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/physical/sink/impl/Neo4jSinkWriter.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.antgroup.openspg.cloudext.interfaces.graphstore.model.lpg.schema.EdgeTypeName;
3232
import com.antgroup.openspg.server.common.model.project.Project;
3333
import com.google.common.collect.Lists;
34+
import java.math.BigDecimal;
3435
import java.util.ArrayList;
3536
import java.util.List;
3637
import java.util.Map;
@@ -144,6 +145,9 @@ private void convertProperties(Map<String, Object> properties) {
144145
}
145146
entry.setValue(doubleList);
146147
}
148+
if (entry.getValue() instanceof BigDecimal) {
149+
entry.setValue(((BigDecimal) entry.getValue()).doubleValue());
150+
}
147151
}
148152
}
149153

0 commit comments

Comments
 (0)