Skip to content

feat: do not reset semaphore when duplicate partitions#509

Merged
Serhii Lazebnyi (lazebnyi) merged 4 commits intomainfrom
maxi297/support-duplicate-partitions-in-per-partition
Apr 26, 2025
Merged

feat: do not reset semaphore when duplicate partitions#509
Serhii Lazebnyi (lazebnyi) merged 4 commits intomainfrom
maxi297/support-duplicate-partitions-in-per-partition

Conversation

@maxi297
Copy link
Copy Markdown
Contributor

@maxi297 Maxime Carbonneau-Leclerc (maxi297) commented Apr 26, 2025

Some parent streams may return duplicate record IDs in the response. As a result, we can end up with several identical partitions. After one partition is processed, the next one causes an error because it is already closed.

For example, in the source-stripe connector, the payout_balance_transactions stream depends on balance_transactions, which uses the events endpoint to fetch data incrementally. This can lead to multiple events for the same parent with the same state but different values in other fields.

This PR skip semaphore reset if duplicate partitions detected.

@maxi297
Copy link
Copy Markdown
Contributor Author

Maxime Carbonneau-Leclerc (maxi297) commented Apr 26, 2025

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formating issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

@lazebnyi Serhii Lazebnyi (lazebnyi) marked this pull request as ready for review April 26, 2025 17:12
@lazebnyi Serhii Lazebnyi (lazebnyi) changed the title do not reset semaphore when duplicate partitions feat: do not reset semaphore when duplicate partitions Apr 26, 2025
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai bot commented Apr 26, 2025

📝 Walkthrough

Walkthrough

A class-level boolean flag _IS_PARTITION_DUPLICATION_LOGGED has been added to the ConcurrentPerPartitionCursor class. The method responsible for generating slices from partitions now checks for duplicate partition keys before creating semaphores. If a duplicate partition key is detected and the warning has not yet been logged, a warning message is logged and the flag is set to prevent repeated warnings. The semaphore creation logic remains unchanged for unique partition keys.

Changes

File(s) Change Summary
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py Added a class-level flag to log partition duplication warnings only once. Modified logic to detect and log duplicate partition keys during semaphore creation.

Suggested labels

bug

Suggested reviewers

  • maxi297
  • brianjlai

Would you like to consider adding a test to ensure the warning is logged only once when partition duplication occurs, wdyt?

✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

‼️ IMPORTANT
Auto-reply has been disabled for this repository in the CodeRabbit settings. The CodeRabbit bot will not respond to your replies unless it is explicitly tagged.

  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)

284-290: Good approach to prevent semaphore reset, but consider enhancing logging

The implementation correctly addresses the issue by preventing semaphore reset for duplicate partitions while adding a warning log. This helps avoid errors when processing duplicate partitions.

To provide more debugging context, would you consider enhancing the warning message to include the partition key details? Something like:

-                logger.warning(f"Partition duplication detected for stream {self._stream_name}")
+                logger.warning(f"Partition duplication detected for stream {self._stream_name}, partition key: {partition_key}")

This would make troubleshooting easier by showing which specific partition is being duplicated.


284-290: Consider tracking duplicate partition metrics

While the current implementation prevents errors, it might be useful to track how frequently duplicate partitions occur.

Would it make sense to add a counter for duplicate partitions? This could help diagnose issues with upstream systems. For example:

        if partition_key in self._semaphore_per_partition:
+            if not hasattr(self, '_duplicate_partition_count'):
+                self._duplicate_partition_count = 0
+            self._duplicate_partition_count += 1
            if not self._IS_PARTITION_DUPLICATION_LOGGED:
-                logger.warning(f"Partition duplication detected for stream {self._stream_name}")
+                logger.warning(f"Partition duplication detected for stream {self._stream_name}. Continuing without resetting semaphore.")
                self._IS_PARTITION_DUPLICATION_LOGGED = True
+            # Log a debug message each time to help with troubleshooting
+            logger.debug(f"Duplicate partition {partition_key} encountered for stream {self._stream_name} (total: {self._duplicate_partition_count})")

What do you think?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2fd2492 and a4d853b.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2 hunks)

@lazebnyi Serhii Lazebnyi (lazebnyi) merged commit 4a96a2a into main Apr 26, 2025
34 of 36 checks passed
@lazebnyi Serhii Lazebnyi (lazebnyi) deleted the maxi297/support-duplicate-partitions-in-per-partition branch April 26, 2025 17:40
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants