Skip to content

Commit d1edaba

Browse files
committed
Query Executor integrated in the Digital Adapter
1 parent 5f0e978 commit d1edaba

3 files changed

Lines changed: 103 additions & 7 deletions

File tree

src/main/java/it/wldt/adapter/digital/DigitalAdapter.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import it.wldt.exception.EventBusException;
1313
import it.wldt.exception.WldtDigitalTwinStateEventException;
1414
import it.wldt.exception.WldtRuntimeException;
15+
import it.wldt.storage.query.QueryExecutor;
1516
import org.slf4j.Logger;
1617
import org.slf4j.LoggerFactory;
1718

@@ -22,20 +23,17 @@
2223
* Authors:
2324
* Marco Picone, Ph.D. (picone.m@gmail.com)
2425
* Marta Spadoni (marta.spadoni2@studio.unibo.it)
25-
*
26-
* Date: 01/02/2023
26+
* Date: 05/08/2024
2727
* Project: White Label Digital Twin Java Framework - (whitelabel-digitaltwin)
28-
*
2928
* This class defines the core capabilities and responsibilities of a Digital Adapter in the Digital Twin's Instance
3029
* A Digital Adapter is the architectural component in charge of handling the interaction of the Digital Twin
3130
* with the external digital world exposing information coming from the Digital Twin State and receiving information and
3231
* action from other digital applications or twins.
33-
*
3432
* This class can be extended in order to create custom Digital Adapter shaping specific behaviours of the twin and
3533
* allowing a simplified interaction with the external digital world.
36-
*
3734
* Multiple Digital Adapters can be active at the same time on the Digital Twin with the aim to handle different
3835
* interaction with the digital layer according to the nature of the twin and its operation context.
36+
* The class integrates a Query Executor to send query to the storage layer in both synchronous and asynchronous way.
3937
*/
4038
public abstract class DigitalAdapter<C> extends DigitalTwinWorker implements WldtEventListener, LifeCycleListener {
4139

@@ -67,17 +65,21 @@ public abstract class DigitalAdapter<C> extends DigitalTwinWorker implements Wld
6765

6866
private DigitalAdapterLifeCycleListener digitalAdapterLifeCycleListener;
6967

68+
// Query Executor to send query to the storage layer in both synchronous and asynchronous way
69+
protected QueryExecutor queryExecutor = null;
70+
7071
private DigitalAdapter() {
72+
super();
7173
}
7274

7375
public DigitalAdapter(String id, C configuration) {
74-
super();
76+
this(id);
7577
this.id = id;
7678
this.configuration = configuration;
7779
}
7880

7981
public DigitalAdapter(String id) {
80-
super();
82+
this();
8183
this.id = id;
8284
}
8385

@@ -314,6 +316,11 @@ abstract protected void onStateUpdate(DigitalTwinState newDigitalTwinState,
314316
@Override
315317
public void onWorkerStart() throws WldtRuntimeException {
316318
try{
319+
320+
// Init the Query Executor
321+
if(this.queryExecutor == null)
322+
this.queryExecutor = new QueryExecutor(this.digitalTwinId, String.format("query-executor-%s", this.id));
323+
317324
onAdapterStart();
318325
}catch (Exception e){
319326
throw new WldtRuntimeException(e.getLocalizedMessage());

src/test/java/it/wldt/process/digital/DemoDigitalAdapter.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77
import it.wldt.core.state.DigitalTwinStateEventNotification;
88
import it.wldt.exception.EventBusException;
99
import it.wldt.process.metrics.SharedTestMetrics;
10+
import it.wldt.storage.query.IQueryResultListener;
11+
import it.wldt.storage.query.QueryRequest;
12+
import it.wldt.storage.query.QueryResult;
1013
import org.slf4j.Logger;
1114
import org.slf4j.LoggerFactory;
1215
import java.util.ArrayList;
@@ -95,4 +98,28 @@ public <T> void invokeAction(String actionKey, T body){
9598
e.printStackTrace();
9699
}
97100
}
101+
102+
/**
103+
* This method is used to test the sync query execution
104+
* @param queryRequest - The query request
105+
* @return - The query result
106+
*/
107+
public QueryResult<?> testSyncQuery(QueryRequest queryRequest){
108+
if(this.queryExecutor != null){
109+
return this.queryExecutor.syncQueryExecute(queryRequest);
110+
}
111+
else
112+
return null;
113+
}
114+
115+
/**
116+
* This method is used to test the async query execution
117+
* @param queryRequest - The query request
118+
* @param queryResultListener - The query result listener
119+
*/
120+
public void testAsyncQuery(QueryRequest queryRequest, IQueryResultListener queryResultListener){
121+
if(this.queryExecutor != null){
122+
this.queryExecutor.asyncQueryExecute(queryRequest, queryResultListener);
123+
}
124+
}
98125
}

src/test/java/it/wldt/storage/StorageQueryTester.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1174,4 +1174,66 @@ public void testSyncLifeCycleQuery() throws InterruptedException {
11741174
// Check the last Life Cycle State is SYNCHRONIZED
11751175
assertEquals(LifeCycleState.SYNCHRONIZED, ((LifeCycleVariationRecord)resultList.get(0)).getLifeCycleState());
11761176
}
1177+
1178+
@Test
1179+
@Order(14)
1180+
public void testDigitalAdapterQueryExecutorSync() throws WldtConfigurationException, EventBusException, ModelException, InterruptedException, WldtRuntimeException, StorageException {
1181+
1182+
//Set EventBus Logger
1183+
WldtEventBus.getInstance().setEventLogger(new DefaultWldtEventLogger());
1184+
1185+
//Wait until all the messages have been received
1186+
Thread.sleep((DemoPhysicalAdapter.DEFAULT_MESSAGE_SLEEP_PERIOD_MS + ((DemoPhysicalAdapter.DEFAULT_TARGET_PHYSICAL_ASSET_PROPERTY_UPDATE_MESSAGES + DemoPhysicalAdapter.DEFAULT_TARGET_PHYSICAL_ASSET_EVENT_UPDATES) * DemoPhysicalAdapter.DEFAULT_MESSAGE_SLEEP_PERIOD_MS)));
1187+
1188+
Thread.sleep(5000);
1189+
1190+
// Create Query Request to the Storage Manager for the Last Digital Twin State
1191+
QueryRequest queryRequest = new QueryRequest();
1192+
queryRequest.setResourceType(QueryResourceType.DIGITAL_TWIN_STATE);
1193+
queryRequest.setRequestType(QueryRequestType.LAST_VALUE);
1194+
1195+
// Send the Query Request to the Storage Manager for the target DT
1196+
QueryResult<?> queryResult = digitalAdapter.testSyncQuery(queryRequest);
1197+
1198+
assertNotNull(queryResult);
1199+
assertEquals(queryResult.getOriginalRequest().getRequestType(), QueryRequestType.LAST_VALUE);
1200+
assertEquals(queryResult.getOriginalRequest().getResourceType(), QueryResourceType.DIGITAL_TWIN_STATE);
1201+
1202+
}
1203+
1204+
@Test
1205+
@Order(15)
1206+
public void testDigitalAdapterQueryExecutorAsync() throws InterruptedException {
1207+
1208+
//Set EventBus Logger
1209+
WldtEventBus.getInstance().setEventLogger(new DefaultWldtEventLogger());
1210+
1211+
//Wait until all the messages have been received
1212+
Thread.sleep((DemoPhysicalAdapter.DEFAULT_MESSAGE_SLEEP_PERIOD_MS + ((DemoPhysicalAdapter.DEFAULT_TARGET_PHYSICAL_ASSET_PROPERTY_UPDATE_MESSAGES + DemoPhysicalAdapter.DEFAULT_TARGET_PHYSICAL_ASSET_EVENT_UPDATES) * DemoPhysicalAdapter.DEFAULT_MESSAGE_SLEEP_PERIOD_MS)));
1213+
1214+
Thread.sleep(5000);
1215+
1216+
// Create Query Request to the Storage Manager for the Last Digital Twin State
1217+
QueryRequest queryRequest = new QueryRequest();
1218+
queryRequest.setResourceType(QueryResourceType.DIGITAL_TWIN_STATE);
1219+
queryRequest.setRequestType(QueryRequestType.LAST_VALUE);
1220+
1221+
final QueryResult<?>[] receivedQueryResult = {null};
1222+
1223+
// Send the Query Request to the Storage Manager for the target DT
1224+
digitalAdapter.testAsyncQuery(queryRequest, new IQueryResultListener() {
1225+
@Override
1226+
public void onQueryResult(QueryResult<?> queryResult) {
1227+
receivedQueryResult[0] = queryResult;
1228+
}
1229+
});
1230+
1231+
Thread.sleep(5000);
1232+
1233+
assertNotNull(receivedQueryResult[0]);
1234+
assertEquals(receivedQueryResult[0].getOriginalRequest().getRequestType(), QueryRequestType.LAST_VALUE);
1235+
assertEquals(receivedQueryResult[0].getOriginalRequest().getResourceType(), QueryResourceType.DIGITAL_TWIN_STATE);
1236+
1237+
Thread.sleep(10000);
1238+
}
11771239
}

0 commit comments

Comments
 (0)