feat(source-pinterest): Update CDK to v6#65960

Merged
Anatolii Yatsuk (tolik0) merged 21 commits into master from tolik0/source-pinterest/update-cdk-to-v6
Sep 10, 2025

Conversation

Contributor

@tolik0 Anatolii Yatsuk (tolik0) commented Sep 5, 2025

What

Update CDK to v6.
Migration to v7 is currently blocked, as version 7 expects all streams — including those in Python implementations — to align with the concurrent CDK.

{"type":"LOG","log":{"level":"FATAL","message":"'NoneType' object has no attribute 'ensure_at_least_one_state_emitted'\nTraceback (most recent call last):\n  File \"/home/anatolii/airbytehq/airbyte/airbyte-integrations/connectors/source-pinterest/main.py\", line 9, in <module>\n    run()\n  File \"/home/anatolii/airbytehq/airbyte/airbyte-integrations/connectors/source-pinterest/source_pinterest/run.py\", line 54, in run\n    launch(source, _args)\n  File \"/home/anatolii/.cache/pypoetry/virtualenvs/source-pinterest-_YSa_W83-py3.11/lib/python3.11/site-packages/airbyte_cdk/entrypoint.py\", line 377, in launch\n    for message in source_entrypoint.run(parsed_args):\n  File \"/home/anatolii/.cache/pypoetry/virtualenvs/source-pinterest-_YSa_W83-py3.11/lib/python3.11/site-packages/airbyte_cdk/entrypoint.py\", line 207, in run\n    yield from map(\n  File \"/home/anatolii/.cache/pypoetry/virtualenvs/source-pinterest-_YSa_W83-py3.11/lib/python3.11/site-packages/airbyte_cdk/entrypoint.py\", line 280, in read\n    for message in self.source.read(self.logger, config, catalog, state):\n  File \"/home/anatolii/.cache/pypoetry/virtualenvs/source-pinterest-_YSa_W83-py3.11/lib/python3.11/site-packages/airbyte_cdk/sources/declarative/concurrent_declarative_source.py\", line 379, in read\n    yield from self._concurrent_source.read(selected_concurrent_streams)\n  File \"/home/anatolii/.cache/pypoetry/virtualenvs/source-pinterest-_YSa_W83-py3.11/lib/python3.11/site-packages/airbyte_cdk/sources/concurrent_source/concurrent_source.py\", line 126, in read\n    yield from self._consume_from_queue(\n  File \"/home/anatolii/.cache/pypoetry/virtualenvs/source-pinterest-_YSa_W83-py3.11/lib/python3.11/site-packages/airbyte_cdk/sources/concurrent_source/concurrent_source.py\", line 147, in _consume_from_queue\n    yield from self._handle_item(\n  File 
\"/home/anatolii/.cache/pypoetry/virtualenvs/source-pinterest-_YSa_W83-py3.11/lib/python3.11/site-packages/airbyte_cdk/sources/concurrent_source/concurrent_source.py\", line 167, in _handle_item\n    yield from concurrent_stream_processor.on_partition_generation_completed(queue_item)\n  File \"/home/anatolii/.cache/pypoetry/virtualenvs/source-pinterest-_YSa_W83-py3.11/lib/python3.11/site-packages/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py\", line 86, in on_partition_generation_completed\n    yield from self._on_stream_is_done(stream_name)\n  File \"/home/anatolii/.cache/pypoetry/virtualenvs/source-pinterest-_YSa_W83-py3.11/lib/python3.11/site-packages/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py\", line 239, in _on_stream_is_done\n    stream.cursor.ensure_at_least_one_state_emitted()\n    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\nAttributeError: 'NoneType' object has no attribute 'ensure_at_least_one_state_emitted'"}}
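The failure mode in the traceback can be reproduced in isolation. The following is a minimal sketch with hypothetical stand-in classes (`ToyStream`, `ToyCursor` and `on_stream_is_done` are illustrative names, not CDK APIs). It assumes only what the log shows: the concurrent read processor calls `ensure_at_least_one_state_emitted()` on `stream.cursor`, and the stream that failed had `cursor` set to `None`.

```python
from typing import Optional


class ToyCursor:
    """Hypothetical cursor; the real CDK cursor does far more than this."""

    def __init__(self) -> None:
        self.state_emitted = False

    def ensure_at_least_one_state_emitted(self) -> None:
        self.state_emitted = True


class ToyStream:
    """Hypothetical stream wrapper that may or may not carry a cursor."""

    def __init__(self, name: str, cursor: Optional[ToyCursor] = None) -> None:
        self.name = name
        self.cursor = cursor


def on_stream_is_done(stream: ToyStream) -> None:
    # Mirrors the call at the bottom of the traceback: no None check on cursor.
    stream.cursor.ensure_at_least_one_state_emitted()


def try_finish(stream: ToyStream) -> str:
    """Return 'ok' or the AttributeError message for the given stream."""
    try:
        on_stream_is_done(stream)
        return "ok"
    except AttributeError as error:
        return str(error)


with_cursor = try_finish(ToyStream("reports", ToyCursor()))
without_cursor = try_finish(ToyStream("ad_accounts"))
```

A stream that provides a cursor finishes cleanly, while one without a cursor fails with the same `AttributeError` seen in the log.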

Resolves: https://github.com/airbytehq/airbyte-internal-issues/issues/14217

How

Review guide

User Impact

Can this PR be safely reverted and rolled back?

  • YES 💚
  • NO ❌

@tolik0 Anatolii Yatsuk (tolik0) marked this pull request as ready for review September 5, 2025 16:56
Contributor

github-actions Bot commented Sep 5, 2025

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Helpful Resources

PR Slash Commands

Airbyte Maintainers (that's you!) can execute the following slash commands on your PR:

  • /format-fix - Fixes most formatting issues.
  • /bump-version - Bumps connector versions.
    • You can specify a custom changelog by passing changelog. Example: /bump-version changelog="My cool update"
    • Leaving the changelog arg blank will auto-populate the changelog from the PR title.
  • /run-cat-tests - Runs legacy CAT tests (Connector Acceptance Tests)
  • /build-connector-images - Builds and publishes a pre-release docker image for the modified connector(s).
  • /poe connector source-example lock - Run the Poe lock task on the source-example connector, committing the results back to the branch.
  • /poe source example lock - Alias for /poe connector source-example lock.
  • /poe source example use-cdk-branch my/branch - Pin the source-example CDK reference to the branch name specified.
  • /poe source example use-cdk-latest - Update the source-example CDK dependency to the latest available version.


Contributor Author

Anatolii Yatsuk (tolik0) commented Sep 5, 2025

/format-fix

Format-fix job started... Check job output.

✅ Changes applied successfully. (7d8b8c9)

Contributor

github-actions Bot commented Sep 5, 2025

source-pinterest Connector Test Results

83 tests   78 ✅  55s ⏱️
 2 suites   5 💤
 2 files     0 ❌

Results for commit 412b505.

♻️ This comment has been updated with latest results.

Contributor Author

Anatolii Yatsuk (tolik0) commented Sep 5, 2025

/bump-version

Bump Version job started... Check job output.

✅ Changes applied successfully. (d174c5c)

Contributor

github-actions Bot commented Sep 5, 2025

Deploy preview for airbyte-docs ready!

✅ Preview
https://airbyte-docs-5o12vdfyt-airbyte-growth.vercel.app

Built with commit 412b505.
This pull request is being automatically deployed with vercel-action

Contributor Author

Anatolii Yatsuk (tolik0) commented Sep 8, 2025

/format-fix

Format-fix job started... Check job output.

✅ Changes applied successfully. (7ddb72b)

@tolik0 Anatolii Yatsuk (tolik0) changed the title feat(source-pinterest): Update CDK to v6 feat(source-pinterest): Update CDK to v7 Sep 8, 2025
Contributor Author

Anatolii Yatsuk (tolik0) commented Sep 8, 2025

/poe source pinterest use-cdk-latest

Running poe source pinterest use-cdk-latest...

Link to job logs.

🤖 Auto-commit successful: f08bbaa

🟦 Poe command source pinterest use-cdk-latest completed successfully.

Contributor Author

Anatolii Yatsuk (tolik0) commented Sep 8, 2025

/format-fix

Format-fix job started... Check job output.

✅ Changes applied successfully. (ad32260)

@tolik0 Anatolii Yatsuk (tolik0) requested a review from a team September 8, 2025 15:39
Contributor Author

Anatolii Yatsuk (tolik0) commented Sep 8, 2025

/poe source pinterest use-cdk-latest

Running poe source pinterest use-cdk-latest...

Link to job logs.

🤖 Auto-commit successful: 00fba70

🟦 Poe command source pinterest use-cdk-latest completed successfully.

Contributor

@brianjlai Brian Lai (brianjlai) left a comment

Added some comments around the shift back to the v6 CDK. Feel free to ping me if you want to discuss the changes further. But yeah, as Max mentioned, v7 would have some shenanigans. Once it's fully ready to migrate to manifest-only, most of the pain points should no longer be relevant.

  python = "^3.10,<3.12"
  pendulum = "==2.1.2"
- airbyte-cdk = "^4"
+ airbyte-cdk = "^7.0.1"
Contributor

this would work, but we've usually just used ^7. Did you have to do the full version for some reason to get the latest CDK?

Contributor Author

It is just how the /poe source pinterest use-cdk-latest updated it, because I had a failed CI check with ^7
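For context on `^7` versus `^7.0.1`: under Poetry's caret rule, both allow any 7.x release below 8.0.0 and differ only in the lower bound. Below is a small sketch of that rule, assuming plain numeric `major.minor.patch` versions. The helper `caret_bounds` is hypothetical and is not part of Poetry.

```python
from typing import Tuple

Version = Tuple[int, int, int]


def caret_bounds(spec: str) -> Tuple[Version, Version]:
    """Return (inclusive lower bound, exclusive upper bound) for a caret spec.

    Hypothetical helper: handles only numeric components, no pre-release tags.
    """
    parts = [int(piece) for piece in spec.lstrip("^").split(".")]
    padded = parts + [0] * (3 - len(parts))
    lower = (padded[0], padded[1], padded[2])

    # The caret rule keeps the leftmost non-zero component fixed; if every
    # given component is zero, the last given component is the one that bumps.
    bump_index = next(
        (index for index, value in enumerate(parts) if value != 0),
        len(parts) - 1,
    )
    upper_parts = [0, 0, 0]
    upper_parts[bump_index] = parts[bump_index] + 1
    upper = (upper_parts[0], upper_parts[1], upper_parts[2])
    return lower, upper


def allows(spec: str, version: Version) -> bool:
    """Check whether a version tuple falls inside the caret range."""
    lower, upper = caret_bounds(spec)
    return lower <= version < upper
```

With this reading, `allows("^7", (7, 0, 0))` is true and `allows("^7.0.1", (7, 0, 0))` is false, so the only release excluded by the longer form is 7.0.0 itself.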

title: "Account ID"
description: "The Pinterest account ID you want to fetch data for. This ID must be provided to filter the data for a specific account."
examples: ["1234567890"]
num_workers:
Contributor

As per the conversation during retro about the confusion between Airbyte workers and connector workers, let's change this to num_threads, and for the title and description let's just say "Number of concurrent threads" and "The number of parallel threads to use for the sync."

concurrency_level:
  type: ConcurrencyLevel
  default_concurrency: "{{ config['num_workers'] or 2 }}"
  max_concurrency: 20
Contributor

Can you add a comment explaining how the boundaries for the concurrency level were determined?
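To make the boundaries concrete, here is a rough sketch of how the value from the manifest snippet above would resolve. It assumes the evaluated default is capped at `max_concurrency`; that capping behavior and the helper name `resolve_concurrency` are assumptions for illustration and are not taken from this PR.

```python
from typing import Any, Mapping

DEFAULT_CONCURRENCY = 2   # fallback from "{{ config['num_workers'] or 2 }}"
MAX_CONCURRENCY = 20      # max_concurrency from the manifest snippet


def resolve_concurrency(config: Mapping[str, Any]) -> int:
    """Hypothetical helper: pick the thread count a sync would run with."""
    # A missing, None or 0 value is falsy, so the fallback applies,
    # matching the `or 2` in the interpolated expression.
    requested = config.get("num_workers") or DEFAULT_CONCURRENCY
    # Assumption: values above the ceiling are clamped rather than rejected.
    return min(int(requested), MAX_CONCURRENCY)


unset = resolve_concurrency({})
in_range = resolve_concurrency({"num_workers": 5})
too_high = resolve_concurrency({"num_workers": 50})
```

Under these assumptions an unset option falls back to 2 threads and any request above 20 is reduced to 20.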


return declarative_streams + report_streams

def _create_ad_accounts_stream(self, config):
Contributor

Unfortunately, I suspect that because we're still going to be using v6 of the CDK here, we might still run into this same issue, since resumable full refresh (RFR) on Python streams is still auto-enabled.

If we were going to v7, then you are right that RFR w/ synthetic cursors is not used anymore and we could share the streams. It may be safer to leave this and once we go to v7 + manifest-only, this whole section will get deleted anyway

Contributor Author

I encountered the same issue with version 6 — AdAccounts was created as a DefaultStream, but we’re iterating through it using stream_slices. I tested it and confirmed that we’re still receiving all AdAccounts records when running a sync for multiple report streams.
Would it be safer to explicitly set is_resumable = False?

Contributor

Ah yeah, you're right: part of the changes to return a DefaultStream are still in v6, so that is still a real scenario, especially for a simple stream like ad accounts. It's fine how you have it.

Would it be safer to explicitly set is_resumable = False?

Yeah I think it would be. So let's disable it. Either way once we go to v7 that setting won't matter any more (once on v7 we'll resume incremental syncs running as full refresh, but full refresh streams will only checkpoint once at the end). This should hopefully just be a short term behavior anyway

It's possible the records matched because this would only appear with more than one page of ad accounts, and I imagine most customers don't have many pages of ad accounts.
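As a toy illustration of why disabling resumability is the safer choice for a stream that is read several times in one sync: the sketch below uses hypothetical classes (`ToyStream`, `ToyAdAccounts`, `read_all`) and a simplified page-number checkpoint, not the CDK's actual RFR implementation. It assumes the risk discussed above is that a saved checkpoint makes a later read of the shared stream start partway through instead of from the first page.

```python
from typing import Dict, List


class ToyStream:
    """Hypothetical paginated stream; resumable by default."""

    pages: List[List[str]] = [["acc-1", "acc-2"], ["acc-3"]]

    @property
    def is_resumable(self) -> bool:
        return True


class ToyAdAccounts(ToyStream):
    """Same data, but with resumability explicitly switched off."""

    @property
    def is_resumable(self) -> bool:
        return False


def read_all(stream: ToyStream, state: Dict[str, int]) -> List[str]:
    """Read every page, checkpointing the next page only if resumable."""
    start_page = state.get("page", 0) if stream.is_resumable else 0
    records: List[str] = []
    for page_number in range(start_page, len(stream.pages)):
        records.extend(stream.pages[page_number])
        if stream.is_resumable:
            state["page"] = page_number + 1
    return records


# Two report streams reading the same parent, sharing one state object.
shared_state: Dict[str, int] = {}
resumable = ToyStream()
first_read = read_all(resumable, shared_state)
second_read = read_all(resumable, shared_state)

fresh_state: Dict[str, int] = {}
non_resumable = ToyAdAccounts()
first_safe = read_all(non_resumable, fresh_state)
second_safe = read_all(non_resumable, fresh_state)
```

In this toy model the second read of the resumable stream picks up after the last checkpoint and returns nothing, while the non-resumable variant returns all three accounts on every read.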

dockerImageTag: 2.1.9-rc.1
dockerRepository: airbyte/source-pinterest
connectorBuildOptions:
  baseImage: docker.io/airbyte/python-connector-base:4.0.0@sha256:d9894b6895923b379f3006fa251147806919c62b7d9021b5cd125bb67d7bbe22
Contributor

small nit, but I think we should also be bumping the base image to 4.0.2 since we had some recent updates to the image to get some additional security fixes: https://hub.docker.com/layers/airbyte/python-connector-base/4.0.2/images/sha256-f24aa7ddd043ec3c21851d814e95736deb3e1e657b74def0f1dec14e4d51504e

#62909

yield record


class AdAccounts(PinterestStream):
Contributor

pending the above comment about not being able to reuse the same AdAccounts stream, would it make sense to continue to use the low-code version of the stream instead?

I assume we needed this change since in v7 we would get back a DefaultStream, which might be incompatible with the Python streams, but now back on v6 it should be a DeclarativeStream.

Contributor Author

Anatolii Yatsuk (tolik0) commented Sep 9, 2025

/format-fix

Format-fix job started... Check job output.

✅ Changes applied successfully. (0a4ed49)

Contributor

@brianjlai Brian Lai (brianjlai) left a comment

lgtm!


  description = "Source implementation for Pinterest."
  authors = [ "Airbyte <contact@airbyte.io>",]
- license = "MIT"
+ license = "ELv2"
Contributor

good catch!

@tolik0 Anatolii Yatsuk (tolik0) merged commit aadfd9a into master Sep 10, 2025
29 of 30 checks passed
@tolik0 Anatolii Yatsuk (tolik0) deleted the tolik0/source-pinterest/update-cdk-to-v6 branch September 10, 2025 20:32
@maxi297 Maxime Carbonneau-Leclerc (maxi297) changed the title feat(source-pinterest): Update CDK to v7 feat(source-pinterest): Update CDK to v6 Sep 11, 2025
3 participants