Skip to content

Commit 49a81d3

Browse files
committed
Fix NPE on s3 source stopping without sqs, stop s3 scan worker thread on stopping of the s3 source
Signed-off-by: Taylor Gray <tylgry@amazon.com>
1 parent e3b425e commit 49a81d3

4 files changed

Lines changed: 15 additions & 5 deletions

File tree

data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ public synchronized void shutdown() {
274274
stopRequested.set(true);
275275
} catch (Exception ex) {
276276
LOG.error("Pipeline [{}] - Encountered exception while stopping the source, " +
277-
"proceeding with termination of process workers", name);
277+
"proceeding with termination of process workers", name, ex);
278278
}
279279

280280
shutdownExecutorService(processorExecutorService, processorShutdownTimeout.toMillis(), "processor");

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ScanService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ public void start() {
6565
scanObjectWorkerThread.start();
6666
}
6767

68+
public void stop() {
69+
scanObjectWorkerThread.interrupt();
70+
}
71+
6872
/**
6973
* This Method Used to fetch the scan options details from {@link S3SourceConfig} amd build the
7074
* all the s3 scan buckets information in list.

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3Source.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,13 @@ public void start(Buffer<Record<Event>> buffer) {
131131

132132
@Override
133133
public void stop() {
134-
sqsService.stop();
135-
if (Objects.nonNull(sourceCoordinator)) {
134+
135+
if (Objects.nonNull(sqsService)) {
136+
sqsService.stop();
137+
}
138+
139+
if (Objects.nonNull(s3ScanService) && Objects.nonNull(sourceCoordinator)) {
140+
s3ScanService.stop();
136141
sourceCoordinator.giveUpPartitions();
137142
}
138143
}

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/ScanObjectWorker.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public class ScanObjectWorker implements Runnable{
6565
private final AcknowledgementSetManager acknowledgementSetManager;
6666

6767
// Should there be a duration or time that is configured in the source to stop processing? Otherwise will only stop when data prepper is stopped
68-
private final boolean shouldStopProcessing = false;
68+
private boolean shouldStopProcessing = false;
6969
private final boolean deleteS3ObjectsOnRead;
7070
private final S3ObjectDeleteWorker s3ObjectDeleteWorker;
7171
private final PluginMetrics pluginMetrics;
@@ -109,6 +109,7 @@ public void run() {
109109
Thread.sleep(RETRY_BACKOFF_ON_EXCEPTION_MILLIS);
110110
} catch (InterruptedException ex) {
111111
LOG.error("S3 Scan worker thread interrupted while backing off.", ex);
112+
return;
112113
}
113114
}
114115

@@ -129,7 +130,7 @@ private void startProcessingObject(final int waitTimeMillis) {
129130
try {
130131
Thread.sleep(waitTimeMillis);
131132
} catch (InterruptedException e) {
132-
e.printStackTrace();
133+
shouldStopProcessing = true;
133134
}
134135
return;
135136
}

0 commit comments

Comments
 (0)