feat: do not reset semaphore when duplicate partitions #509
Conversation
/autofix
📝 Walkthrough

A class-level boolean flag was added so that, when a duplicate partition is detected, a warning is logged once instead of resetting the partition's semaphore.
Would you like to consider adding a test to ensure the warning is logged only once when partition duplication occurs, wdyt?
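A minimal sketch of such a test, using a hypothetical stand-in class: the flag name `_IS_PARTITION_DUPLICATION_LOGGED` is taken from the review diff, but everything else (class name, method name, counting handler) is illustrative and not the actual CDK code:

```python
import logging

class _CountingHandler(logging.Handler):
    """Counts WARNING records so the test can assert the one-time behavior."""
    def __init__(self):
        super().__init__()
        self.warnings = 0

    def emit(self, record):
        if record.levelno == logging.WARNING:
            self.warnings += 1

logger = logging.getLogger("cursor_sketch")
handler = _CountingHandler()
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

class CursorSketch:
    # Class-level flag mirroring the one added in this PR
    # (name assumed from the review diff).
    _IS_PARTITION_DUPLICATION_LOGGED = False

    def __init__(self, stream_name):
        self._stream_name = stream_name
        self._semaphore_per_partition = {}

    def register_partition(self, partition_key):
        if partition_key in self._semaphore_per_partition:
            # Duplicate partition: log once, skip the semaphore reset.
            if not CursorSketch._IS_PARTITION_DUPLICATION_LOGGED:
                logger.warning(
                    f"Partition duplication detected for stream {self._stream_name}"
                )
                CursorSketch._IS_PARTITION_DUPLICATION_LOGGED = True
            return
        self._semaphore_per_partition[partition_key] = object()

cursor = CursorSketch("payout_balance_transactions")
for key in ["p1", "p1", "p2", "p2", "p1"]:
    cursor.register_partition(key)

assert handler.warnings == 1  # three duplicates, but only one warning
```

The key assertion is that repeated duplicates after the first do not produce additional warnings, since the flag is set on the first occurrence.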
Actionable comments posted: 2
🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)
284-290: Good approach to prevent semaphore reset, but consider enhancing logging

The implementation correctly addresses the issue by preventing the semaphore reset for duplicate partitions while adding a warning log. This avoids errors when processing duplicate partitions.
To provide more debugging context, would you consider enhancing the warning message to include the partition key? Something like:

```diff
- logger.warning(f"Partition duplication detected for stream {self._stream_name}")
+ logger.warning(f"Partition duplication detected for stream {self._stream_name}, partition key: {partition_key}")
```

This would make troubleshooting easier by showing which specific partition is being duplicated.
284-290: Consider tracking duplicate partition metrics

While the current implementation prevents errors, it might be useful to track how frequently duplicate partitions occur.
Would it make sense to add a counter for duplicate partitions? This could help diagnose issues with upstream systems. For example:

```diff
 if partition_key in self._semaphore_per_partition:
+    if not hasattr(self, '_duplicate_partition_count'):
+        self._duplicate_partition_count = 0
+    self._duplicate_partition_count += 1
     if not self._IS_PARTITION_DUPLICATION_LOGGED:
-        logger.warning(f"Partition duplication detected for stream {self._stream_name}")
+        logger.warning(f"Partition duplication detected for stream {self._stream_name}. Continuing without resetting semaphore.")
         self._IS_PARTITION_DUPLICATION_LOGGED = True
+    # Log a debug message each time to help with troubleshooting
+    logger.debug(f"Duplicate partition {partition_key} encountered for stream {self._stream_name} (total: {self._duplicate_partition_count})")
```

What do you think?
Some parent streams may return duplicate record IDs in their responses. As a result, we can end up with several identical partitions. After the first such partition is processed, the next one causes an error because it is already closed.
For example, in the `source-stripe` connector, the `payout_balance_transactions` stream depends on `balance_transactions`, which uses the `events` endpoint to fetch data incrementally. This can produce multiple events for the same parent with the same state but different values in other fields.

This PR skips the semaphore reset when a duplicate partition is detected.
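The guard described above can be sketched as follows. This is a simplified illustration under assumed names (`semaphore_per_partition`, `track_partition`), not the actual `concurrent_partition_cursor.py` code:

```python
import threading

# Simplified sketch: duplicate partition keys keep their existing semaphore
# instead of being replaced ("reset"), so closing the partition a second
# time cannot hit a semaphore the cursor no longer tracks.
semaphore_per_partition = {}

def track_partition(partition_key):
    # Hypothetical helper: only genuinely new partitions get a semaphore.
    if partition_key in semaphore_per_partition:
        return  # duplicate detected: skip the reset
    semaphore_per_partition[partition_key] = threading.Semaphore(0)

# Duplicate parent records (e.g. repeated events for the same parent)
# yield duplicate partition keys.
for key in ["evt_1", "evt_1", "evt_2"]:
    track_partition(key)

assert len(semaphore_per_partition) == 2  # "evt_1" tracked only once
```

The design choice is to treat the duplicate as already-tracked state rather than fresh work, which matches the PR's goal of tolerating upstream duplication without erroring.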