Skip to content

Commit 186c0a9

Browse files
committed
Improve OPC UA server logging
1 parent aab9152 commit 186c0a9

4 files changed

Lines changed: 87 additions & 28 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,10 +335,18 @@ private void customizeServer(final PipeParameters parameters) {
335335
.setEnableAnonymousAccess(enableAnonymousAccess)
336336
.setSecurityPolicies(securityPolicies)
337337
.setDebounceTimeMs(debounceTimeMs);
338+
LOGGER.info(
339+
"Starting Apache IoTDB OPC UA server: tcpPort={}, httpsPort={}.",
340+
tcpBindPort,
341+
httpsBindPort);
338342
final OpcUaServer newServer = builder.build();
339343
nameSpace = new OpcUaNameSpace(newServer, builder);
340344
nameSpace.startup();
341345
newServer.startup().get();
346+
LOGGER.info(
347+
"Apache IoTDB OPC UA server started: tcpPort={}, httpsPort={}.",
348+
tcpBindPort,
349+
httpsBindPort);
342350
return new Pair<>(new AtomicInteger(0), nameSpace);
343351
} else {
344352
oldValue
@@ -567,7 +575,9 @@ public void close() throws Exception {
567575

568576
if (pair.getLeft().decrementAndGet() <= 0) {
569577
try {
578+
LOGGER.info("Shutting down Apache IoTDB OPC UA server: serverKey={}.", serverKey);
570579
pair.getRight().shutdown();
580+
LOGGER.info("Apache IoTDB OPC UA server stopped: serverKey={}.", serverKey);
571581
} finally {
572582
SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP.remove(serverKey);
573583
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
package org.apache.iotdb.db.pipe.sink.protocol.opcua.server;
2121

22-
import org.apache.iotdb.commons.utils.FileUtils;
2322
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
2423

2524
import com.google.common.collect.Sets;
@@ -35,6 +34,7 @@
3534
import java.io.OutputStream;
3635
import java.nio.file.Files;
3736
import java.nio.file.Path;
37+
import java.nio.file.StandardOpenOption;
3838
import java.security.Key;
3939
import java.security.KeyPair;
4040
import java.security.KeyStore;
@@ -62,17 +62,27 @@ OpcUaKeyStoreLoader load(final Path baseDir, final char[] password) throws Excep
6262
final File serverKeyStore = baseDir.resolve("iotdb-server.pfx").toFile();
6363

6464
LOGGER.info(DataNodePipeMessages.LOADING_KEYSTORE_AT, serverKeyStore);
65+
boolean needRewrite = false;
6566

6667
if (serverKeyStore.exists()) {
6768
try (InputStream is = Files.newInputStream(serverKeyStore.toPath())) {
6869
keyStore.load(is, password);
6970
} catch (final IOException e) {
70-
LOGGER.warn(DataNodePipeMessages.LOAD_KEYSTORE_FAILED_THE_EXISTING_KEYSTORE_MAY);
71-
FileUtils.deleteFileOrDirectory(serverKeyStore);
71+
LOGGER.warn(
72+
"Load keyStore {} failed, the existing keyStore may be stale, re-constructing.",
73+
serverKeyStore,
74+
e);
75+
if (!serverKeyStore.delete()) {
76+
LOGGER.warn(
77+
"Delete stale keyStore {} failed. The file will be overwritten if possible.",
78+
serverKeyStore);
79+
needRewrite = true;
80+
}
7281
}
7382
}
7483

75-
if (!serverKeyStore.exists()) {
84+
if (!serverKeyStore.exists() || needRewrite) {
85+
LOGGER.info("Generating new server keyStore at {}", serverKeyStore);
7686
keyStore.load(null, password);
7787

7888
final KeyPair keyPair = SelfSignedCertificateGenerator.generateRsaKeyPair(2048);
@@ -108,7 +118,12 @@ OpcUaKeyStoreLoader load(final Path baseDir, final char[] password) throws Excep
108118

109119
keyStore.setKeyEntry(
110120
SERVER_ALIAS, keyPair.getPrivate(), password, new X509Certificate[] {certificate});
111-
try (final OutputStream os = Files.newOutputStream(serverKeyStore.toPath())) {
121+
try (final OutputStream os =
122+
Files.newOutputStream(
123+
serverKeyStore.toPath(),
124+
StandardOpenOption.CREATE,
125+
StandardOpenOption.WRITE,
126+
StandardOpenOption.TRUNCATE_EXISTING)) {
112127
keyStore.store(os, password);
113128
}
114129
}
@@ -119,6 +134,7 @@ OpcUaKeyStoreLoader load(final Path baseDir, final char[] password) throws Excep
119134

120135
final PublicKey serverPublicKey = serverCertificate.getPublicKey();
121136
serverKeyPair = new KeyPair(serverPublicKey, (PrivateKey) serverPrivateKey);
137+
LOGGER.info("Loaded server certificate from keyStore alias {}.", SERVER_ALIAS);
122138
} else {
123139
throw new Exception(
124140
"Invalid keyStore, the serverPrivateKey is "

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java

Lines changed: 45 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -305,9 +305,7 @@ private void transferTabletRowForClientServerModel(
305305
new DateTime(utcTimestamp),
306306
new DateTime());
307307
measurementNode = addNode(name, currentFolder, folderNode, dataValue, type);
308-
if (Objects.isNull(measurementNode.getValue())
309-
|| Objects.isNull(measurementNode.getValue().getSourceTime())
310-
|| measurementNode.getValue().getSourceTime().getUtcTime() < utcTimestamp) {
308+
if (shouldNotifyNodeValueChange(measurementNode, utcTimestamp)) {
311309
notifyNodeValueChange(measurementNode.getNodeId(), dataValue, measurementNode);
312310
}
313311
} else {
@@ -325,9 +323,7 @@ private void transferTabletRowForClientServerModel(
325323
new DataValue(
326324
new Variant(value), currentQuality, new DateTime(timestamp), new DateTime()),
327325
dataType);
328-
if (Objects.isNull(valueNode.getValue())
329-
|| Objects.isNull(valueNode.getValue().getSourceTime())
330-
|| valueNode.getValue().getSourceTime().getUtcTime() < timestamp) {
326+
if (shouldNotifyNodeValueChange(valueNode, timestamp)) {
331327
notifyNodeValueChange(
332328
valueNode.getNodeId(),
333329
new DataValue(
@@ -337,6 +333,26 @@ private void transferTabletRowForClientServerModel(
337333
}
338334
}
339335

336+
private boolean shouldNotifyNodeValueChange(
337+
final UaVariableNode variableNode, final long candidateUtcTime) {
338+
final DataValue currentValue = variableNode.getValue();
339+
if (Objects.isNull(currentValue) || Objects.isNull(currentValue.getSourceTime())) {
340+
return true;
341+
}
342+
final long currentUtcTime = currentValue.getSourceTime().getUtcTime();
343+
if (currentUtcTime < candidateUtcTime) {
344+
return true;
345+
}
346+
if (candidateUtcTime < currentUtcTime) {
347+
LOGGER.debug(
348+
"Reject stale value update: nodeId={}, candidateSourceTime={}, currentSourceTime={}.",
349+
variableNode.getNodeId(),
350+
candidateUtcTime,
351+
currentUtcTime);
352+
}
353+
return false;
354+
}
355+
340356
private UaVariableNode addNode(
341357
final String nodeName,
342358
final String currentFolder,
@@ -639,14 +655,19 @@ public void onDataItemsCreated(final List<DataItem> dataItems) {
639655
final NodeId nodeId = readValueId.getNodeId();
640656

641657
// 1. Add the new subscription item to the subscription mapping
642-
nodeSubscriptions.compute(
658+
final List<DataItem> subscribedItems =
659+
nodeSubscriptions.compute(
660+
nodeId,
661+
(k, existingList) -> {
662+
List<DataItem> list =
663+
existingList != null ? existingList : new CopyOnWriteArrayList<>();
664+
list.add(item);
665+
return list;
666+
});
667+
LOGGER.debug(
668+
"Registered data item subscription: nodeId={}, subscriptionCount={}.",
643669
nodeId,
644-
(k, existingList) -> {
645-
List<DataItem> list =
646-
existingList != null ? existingList : new CopyOnWriteArrayList<>();
647-
list.add(item);
648-
return list;
649-
});
670+
subscribedItems.size());
650671

651672
// 2. 【Key Optimization】Proactively push the current node's initial value when the new
652673
// subscription item is created
@@ -680,13 +701,18 @@ public void onDataItemsDeleted(final List<DataItem> dataItems) {
680701
final NodeId nodeId = readValueId.getNodeId();
681702

682703
// When the client cancels the subscription, remove this subscription item from the mapping
683-
nodeSubscriptions.computeIfPresent(
704+
final List<DataItem> remainingItems =
705+
nodeSubscriptions.computeIfPresent(
706+
nodeId,
707+
(k, existingList) -> {
708+
existingList.remove(item);
709+
// Automatically clean up the key when there are no subscribers, save memory
710+
return existingList.isEmpty() ? null : existingList;
711+
});
712+
LOGGER.debug(
713+
"Removed data item subscription: nodeId={}, subscriptionCount={}.",
684714
nodeId,
685-
(k, existingList) -> {
686-
existingList.remove(item);
687-
// Automatically clean up the key when there are no subscribers, save memory
688-
return existingList.isEmpty() ? null : existingList;
689-
});
715+
Objects.isNull(remainingItems) ? 0 : remainingItems.size());
690716
}
691717
}
692718

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,10 +142,8 @@ public OpcUaServer build() throws Exception {
142142

143143
final File pkiDir = securityDir.resolve("pki").toFile();
144144

145-
LoggerFactory.getLogger(OpcUaServerBuilder.class)
146-
.info("Security dir: {}", securityDir.toAbsolutePath());
147-
LoggerFactory.getLogger(OpcUaServerBuilder.class)
148-
.info("Security pki dir: {}", pkiDir.getAbsolutePath());
145+
LOGGER.info("Security dir: {}", securityDir.toAbsolutePath());
146+
LOGGER.info("Security pki dir: {}", pkiDir.getAbsolutePath());
149147

150148
final OpcUaKeyStoreLoader loader =
151149
new OpcUaKeyStoreLoader().load(securityDir, password.toCharArray());
@@ -197,8 +195,17 @@ public OpcUaServer build() throws Exception {
197195
StatusCodes.Bad_ConfigurationError,
198196
"Certificate is missing the application URI"));
199197

198+
final Set<SecurityPolicy> configuredSecurityPolicies = new LinkedHashSet<>(securityPolicies);
200199
final Set<EndpointConfiguration> endpointConfigurations =
201200
createEndpointConfigurations(certificate, tcpBindPort, httpsBindPort);
201+
LOGGER.info(
202+
"Built OPC UA server endpoints: tcpPort={}, httpsPort={}, anonymousAccess={}, securityPolicies={}, endpointCount={}, debounceTimeMs={}.",
203+
tcpBindPort,
204+
httpsBindPort,
205+
enableAnonymousAccess,
206+
configuredSecurityPolicies,
207+
endpointConfigurations.size(),
208+
debounceTimeMs);
202209

203210
serverConfig =
204211
OpcUaServerConfig.builder()

0 commit comments

Comments
 (0)