Skip to content

Fix how failures are handled for shard partitions in DDB source#6184

Merged
graytaylor0 merged 6 commits into
opensearch-project:mainfrom
graytaylor0:DdbAckFix
Oct 22, 2025
Merged

Fix how failures are handled for shard partitions in DDB source#6184
graytaylor0 merged 6 commits into
opensearch-project:mainfrom
graytaylor0:DdbAckFix

Conversation

@graytaylor0

@graytaylor0 graytaylor0 commented Oct 16, 2025

Copy link
Copy Markdown
Member

Description

This change fixes how the DynamoDB source handles failures with acknowledgments, including ack timeouts, negative acknowledgments, and failure to update partition state.

When these failure cases happen, we will now signal to the shardConsumer to stop processing the shard.

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Taylor Gray <tylgry@amazon.com>
… condition

Signed-off-by: Taylor Gray <tylgry@amazon.com>
}

@VisibleForTesting
public ShardAcknowledgementManager(final AcknowledgementSetManager acknowledgementSetManager,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make this package protected.

Also, update the constructor above to use this.

public ShardAcknowledgementManager(final AcknowledgementSetManager acknowledgementSetManager,
                                       final EnhancedSourceCoordinator sourceCoordinator,
                                       final DynamoDBSourceConfig dynamoDBSourceConfig,
                                       final Consumer<StreamPartition> stopWorkerConsumer) {
  this(acknowledgementSetManager, sourceCoordinator, dynamoDBSourceConfig, createExecutorService(stopWorkConsumer));
}

private static ExecutorService createExecutorService(final Consumer<StreamPartition> stopWorkerConsumer) {
  ExecutorService executorService = Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("dynamodb-shard-ack-monitor"));
  executorService.submit(() -> monitorAcknowledgements(stopWorkConsumer));
  return executorService.
}

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private static ExecutorService createExecutorService(final Consumer<StreamPartition> stopWorkerConsumer) {
  ExecutorService executorService = Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("dynamodb-shard-ack-monitor"));
  executorService.submit(() -> monitorAcknowledgements(stopWorkConsumer));
  return executorService.
}

monitorAcknowledgments method can't be used in a static method

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is just a Runnable. So you should be able to add a Runnable as a parameter to the static method.

Then:

this(acknowledgementSetManager, sourceCoordinator, dynamoDBSourceConfig, createExecutorService(stopWorkConsumer), () -> monitorAcknowledgements(stopWorkConsumer))

callback1.accept(true); // Positive ack
callback2.accept(true);

// doThrow(PartitionUpdateException.class).when(sourceCoordinator).saveProgressStateForPartition(eq(streamPartition),

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this commented out code.


if (shardAcknowledgementManager != null && !shardAcknowledgementManager.isStillTrackingShard(streamPartition)) {
LOG.warn("Shard {} is no longer being tracked by the acknowledgment manager, exiting", streamPartition.getShardId());
break;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe for a future PR, but this and the exception seem to just result in ShardConsumer threads just going away without any real tracking of them.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would the ideal behavior be here? This does just have the ShardConsumer exit

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we may be ok here. I found this code that handles exceptions:

} else {
LOG.error("Received an exception while processing shard {}, giving up shard: {}", streamPartition.getShardId(), ex);
if (dynamoDBSourceConfig.isAcknowledgmentsEnabled()) {
shardAcknowledgementManager.giveUpPartition(streamPartition);
} else {
coordinator.giveUpPartition(streamPartition);
}
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is giveUpPartition idempotent? If not, we might need a change here since we have already lost the partition in this particular case.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes this is what happens when ShardConsumer throws an error.

And giveUpPartition would work if called twice as long as another worker hasn't already grabbed ownership of the shard. However, I tried to handle this to only be called once by putting this check in the giveUpPartition method of shardAcknowledgmentManager

public void giveUpPartition(final StreamPartition streamPartition) {
        if (isStillTrackingShard(streamPartition)) {
            LOG.debug("Adding partition {} to give up list", streamPartition.getPartitionKey());
            partitionsToGiveUp.add(streamPartition);
            partitionsToRemove.add(streamPartition);
        }
    }

So basically this call will only mark the partition to be given up by the acknowledgment manager and isn't actually calling give up partition. So it will only be called once.

Signed-off-by: Taylor Gray <tylgry@amazon.com>
…when checkpointing fails

Signed-off-by: Taylor Gray <tylgry@amazon.com>
checkpoints.remove(streamPartition);
ackStatuses.remove(streamPartition);
});
lock.lock();

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks like all the collections the logic is dealing with - are already synchronized. Do we still need this lock?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any given collection is synchronized. But, this block of code is modifying multiple collections together.

san81
san81 previously approved these changes Oct 21, 2025
Signed-off-by: Taylor Gray <tylgry@amazon.com>
try {
acknowledgementSet = shardAcknowledgementManager.createAcknowledgmentSet(streamPartition, sequenceNumber, shardIterator == null);
} catch (final ShardNotTrackedException e) {
LOG.warn(e.getMessage());

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improve this log message:

LOG.warn("Not creating acknowledgment set since shard is not tracked: {}", e.getMessage());

There are two reasons for this:

  • When somebody sees this warning it will be hard to know where is originated unless line logging is unabled.
  • Unknown strings shouldn't be passed to a logger since it is treated as a format string.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense will change this

Signed-off-by: Taylor Gray <tylgry@amazon.com>
@graytaylor0 graytaylor0 merged commit 5af2fdc into opensearch-project:main Oct 22, 2025
46 of 47 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants