Skip to content

Commit cdeeffd

Browse files
committed
fix(builder): #585
1 parent 56fca1f commit cdeeffd

2 files changed

Lines changed: 43 additions & 10 deletions

File tree

  • builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/physical/sink/impl
  • cloudext/interface/graph-store/src/main/java/com/antgroup/openspg/cloudext/interfaces/graphstore/util

builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/physical/sink/impl/Neo4jSinkWriter.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,15 +51,15 @@ public class Neo4jSinkWriter extends BaseSinkWriter<Neo4jSinkNodeConfig> {
5151
private Project project;
5252
private static final String DOT = ".";
5353

54-
private static RejectedExecutionHandler handler =
54+
private static final RejectedExecutionHandler handler =
5555
(r, executor) -> {
5656
try {
5757
executor.getQueue().put(r);
5858
} catch (InterruptedException e) {
5959
Thread.currentThread().interrupt();
6060
}
6161
};
62-
private static ExecutorService executor =
62+
private static final ExecutorService executor =
6363
new ThreadPoolExecutor(
6464
NUM_THREADS,
6565
NUM_THREADS,
@@ -169,7 +169,7 @@ private void awaitAllTasks(List<Future<Void>> futures)
169169

170170
private void writeNode(SubGraphRecord.Node node) {
171171
try {
172-
Long statr = System.currentTimeMillis();
172+
long start = System.currentTimeMillis();
173173
RecordAlterOperationEnum operation = context.getOperation();
174174
if (StringUtils.isBlank(node.getId())
175175
|| StringUtils.isBlank(node.getName())
@@ -184,7 +184,7 @@ private void writeNode(SubGraphRecord.Node node) {
184184
List<LPGPropertyRecord> properties = Lists.newArrayList();
185185
for (Map.Entry<String, Object> entry : node.getProperties().entrySet()) {
186186
Object entryValue = entry.getValue();
187-
if (!TypeChecker.isArrayOrCollectionOfPrimitives(entryValue)) {
187+
if (!TypeChecker.isBasicType(entryValue)) {
188188
entryValue = JSON.toJSONString(entryValue);
189189
}
190190
properties.add(new LPGPropertyRecord(entry.getKey(), entryValue));
@@ -199,7 +199,7 @@ private void writeNode(SubGraphRecord.Node node) {
199199
log.info(
200200
String.format(
201201
"write Node succeed id:%s cons:%s",
202-
node.getId(), System.currentTimeMillis() - statr));
202+
node.getId(), System.currentTimeMillis() - start));
203203
} catch (Exception e) {
204204
throw new RuntimeException(e);
205205
}
@@ -214,7 +214,7 @@ private String labelPrefix(String label) {
214214

215215
private void writeEdge(SubGraphRecord.Edge edge) {
216216
try {
217-
Long statr = System.currentTimeMillis();
217+
long start = System.currentTimeMillis();
218218
RecordAlterOperationEnum operation = context.getOperation();
219219
if (StringUtils.isBlank(edge.getFrom())
220220
|| StringUtils.isBlank(edge.getTo())
@@ -226,7 +226,7 @@ private void writeEdge(SubGraphRecord.Edge edge) {
226226
List<LPGPropertyRecord> properties = Lists.newArrayList();
227227
for (Map.Entry<String, Object> entry : edge.getProperties().entrySet()) {
228228
Object entryValue = entry.getValue();
229-
if (!TypeChecker.isArrayOrCollectionOfPrimitives(entryValue)) {
229+
if (!TypeChecker.isBasicType(entryValue)) {
230230
entryValue = JSON.toJSONString(entryValue);
231231
}
232232
properties.add(new LPGPropertyRecord(entry.getKey(), entryValue));
@@ -244,9 +244,10 @@ private void writeEdge(SubGraphRecord.Edge edge) {
244244
client.deleteEdge(edge.getLabel(), edgeRecords);
245245
}
246246
log.info(
247-
String.format(
248-
"write Edge succeed from:%s to:%s cons:%s",
249-
edge.getFrom(), edge.getTo(), System.currentTimeMillis() - statr));
247+
"write Edge succeed from:{} to:{} cons:{}",
248+
edge.getFrom(),
249+
edge.getTo(),
250+
System.currentTimeMillis() - start);
250251
} catch (Exception e) {
251252
throw new RuntimeException(e);
252253
}

cloudext/interface/graph-store/src/main/java/com/antgroup/openspg/cloudext/interfaces/graphstore/util/TypeChecker.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,4 +66,36 @@ public static boolean isArrayOrCollectionOfPrimitives(Object obj) {
6666
}
6767
return false;
6868
}
69+
70+
public static boolean isBasicType(Object obj) {
71+
if (obj == null) {
72+
return false;
73+
}
74+
if (obj instanceof Object[]) {
75+
for (Object element : (Object[]) obj) {
76+
if (!isBasicType(element)) {
77+
return false;
78+
}
79+
}
80+
return true;
81+
82+
} else if (obj instanceof Collection<?>) {
83+
for (Object element : (Collection<?>) obj) {
84+
if (!isBasicType(element)) {
85+
return false;
86+
}
87+
}
88+
return true;
89+
}
90+
91+
return obj.getClass().isPrimitive()
92+
|| obj instanceof Integer
93+
|| obj instanceof Double
94+
|| obj instanceof Float
95+
|| obj instanceof Long
96+
|| obj instanceof Byte
97+
|| obj instanceof Boolean
98+
|| obj instanceof Character
99+
|| obj instanceof CharSequence;
100+
}
69101
}

0 commit comments

Comments
 (0)