Skip to content

Commit 2ede0d9

Browse files
committed
Adding Partition Execution Logging
Signed-off-by: Alexander Christensen <alchrisk@amazon.com>
1 parent 1f3c152 commit 2ede0d9

2 files changed

Lines changed: 10 additions & 1 deletion

File tree

data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ public Instant crawl(LeaderPartition leaderPartition, EnhancedSourceCoordinator
8787

8888
@Override
8989
public void executePartition(DimensionalTimeSliceWorkerProgressState state, Buffer<Record<Event>> buffer, AcknowledgementSet acknowledgementSet) {
90+
log.debug("Processing partition - DimensionType: {}, TimeRange: {} to {}",
91+
state.getDimensionType(), state.getStartTime(), state.getEndTime());
9092
partitionWaitTimeTimer.record(Duration.between(state.getPartitionCreationTime(), Instant.now()));
9193
partitionProcessLatencyTimer.record(() -> client.executePartition(state, buffer, acknowledgementSet));
9294
}

data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/LeaderOnlyTokenCrawler.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,9 @@ public Instant crawl(LeaderPartition leaderPartition,
123123

124124
@Override
125125
public void executePartition(PaginationCrawlerWorkerProgressState state, Buffer buffer, AcknowledgementSet acknowledgementSet) {
126+
String firstToken = !state.getItemIds().isEmpty() ? state.getItemIds().get(0) : "";
127+
String lastToken = !state.getItemIds().isEmpty() ? state.getItemIds().get(state.getItemIds().size()-1) : "";
128+
log.debug("Processing partition - FirstToken: {}, LastToken: {}", firstToken, lastToken);
126129
client.executePartition(state, buffer, acknowledgementSet);
127130
}
128131

@@ -140,6 +143,10 @@ private List<ItemInfo> collectBatch(Iterator<ItemInfo> iterator) {
140143
private void processBatch(List<ItemInfo> batch,
141144
LeaderPartition leaderPartition,
142145
EnhancedSourceCoordinator coordinator) {
146+
String firstToken = !batch.isEmpty() ? batch.get(0).getItemId() : "(No First Token in this batch)";
147+
String lastToken = !batch.isEmpty() ? batch.get(batch.size()-1).getItemId() : "(No Last Token in this batch)";
148+
log.debug("Processing batch - FirstToken: {}, LastToken: {}", firstToken, lastToken);
149+
143150
if (acknowledgementsEnabled) {
144151
AtomicBoolean ackReceived = new AtomicBoolean(false);
145152
long createTimestamp = System.currentTimeMillis();
@@ -237,4 +244,4 @@ private void updateLeaderProgressState(LeaderPartition leaderPartition,
237244
void setNoAckTimeout(Duration timeout) {
238245
this.noAckTimeout = timeout;
239246
}
240-
}
247+
}

0 commit comments

Comments
 (0)