Skip to content

Commit 4cab9ae

Browse files
Merge pull request #839 from symphony-enrico/improve-datafeed-stop
Better handling stop datafeed
2 parents 7eaee87 + 521b809 commit 4cab9ae

4 files changed

Lines changed: 12 additions & 14 deletions

File tree

symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/datafeed/impl/AbstractDatafeedLoop.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public void unsubscribe(RealTimeEventListener listener) {
7777
*/
7878
@Override
7979
public void start() throws AuthUnauthorizedException, ApiException {
80-
if (this.started.get()) {
80+
if (!this.started.compareAndSet(false, true)) {
8181
throw new IllegalStateException("The datafeed service is already started");
8282
}
8383

@@ -104,8 +104,11 @@ public void start() throws AuthUnauthorizedException, ApiException {
104104
*/
105105
@Override
106106
public void stop() {
107-
log.info("Stopping the datafeed loop (will happen once the current read is finished)...");
108-
this.started.set(false);
107+
if (this.started.compareAndSet(true, false)) {
108+
log.info("Stopping the datafeed loop (will happen once the current read is finished)...");
109+
} else {
110+
log.warn("Datafeed loop already stopping...");
111+
}
109112
}
110113

111114
private void updateLastPullTimestamp() {

symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/datafeed/impl/DatafeedLoopV1.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,9 @@ protected void runLoop() throws Throwable {
9797
this.datafeedId = this.datafeedId == null ? this.createDatafeed.execute() : this.datafeedId;
9898
log.info("Start reading events from datafeed {}", datafeedId);
9999

100-
this.started.set(true);
101-
do {
100+
while (this.started.get()) {
102101
this.readDatafeed.execute();
103-
} while (this.started.get());
102+
}
104103

105104
log.info("Datafeed loop successfully stopped.");
106105
}

symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/datafeed/impl/DatafeedLoopV2.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -97,12 +97,9 @@ protected void runLoop() throws Throwable {
9797
}
9898

9999
log.info("Start reading events from datafeed {}", this.datafeed.getId());
100-
this.started.set(true);
101-
do {
102-
100+
while (this.started.get()) {
103101
this.readDatafeed.execute();
104-
105-
} while (this.started.get());
102+
}
106103
log.info("Datafeed loop successfully stopped.");
107104
}
108105

symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/datafeed/impl/DatahoseLoopImpl.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,10 @@ public DatahoseLoopImpl(DatafeedApi datafeedApi, AuthSession authSession, BdkCon
5858
@Override
5959
protected void runLoop() throws Throwable {
6060
log.info("Start reading events from datahose loop");
61-
this.started.set(true);
6261

63-
do {
62+
while (this.started.get()) {
6463
this.readEvents.execute();
65-
} while (this.started.get());
64+
}
6665

6766
log.info("Datahose loop successfully stopped.");
6867
}

0 commit comments

Comments
 (0)