From 57e7bff88c98ced2f29c240122ca980873263806 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Wed, 16 Apr 2025 12:44:03 +0300 Subject: [PATCH 01/17] add --- .../manifest_declarative_source.py | 20 +- .../parsers/manifest_component_transformer.py | 4 +- manifest_migrations/README.md | 66 ++ manifest_migrations/__init__.py | 0 manifest_migrations/exceptions.py | 12 + manifest_migrations/manifest_migration.py | 137 ++++ manifest_migrations/migration_handler.py | 62 ++ manifest_migrations/migrations/__init__.py | 0 .../http_requester_path_to_url_v6_45_2__1.py | 42 ++ ...tp_requester_url_base_to_url_v6_45_2__0.py | 26 + manifest_migrations/migrations_registry.py | 53 ++ unit_tests/manifest_migrations/conftest.py | 600 ++++++++++++++++++ .../test_manifest_migration.py | 45 ++ 13 files changed, 1061 insertions(+), 6 deletions(-) create mode 100644 manifest_migrations/README.md create mode 100644 manifest_migrations/__init__.py create mode 100644 manifest_migrations/exceptions.py create mode 100644 manifest_migrations/manifest_migration.py create mode 100644 manifest_migrations/migration_handler.py create mode 100644 manifest_migrations/migrations/__init__.py create mode 100644 manifest_migrations/migrations/http_requester_path_to_url_v6_45_2__1.py create mode 100644 manifest_migrations/migrations/http_requester_url_base_to_url_v6_45_2__0.py create mode 100644 manifest_migrations/migrations_registry.py create mode 100644 unit_tests/manifest_migrations/conftest.py create mode 100644 unit_tests/manifest_migrations/test_manifest_migration.py diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index cfd258c6c..1520cb689 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -55,6 +55,9 @@ SliceLogger, ) from airbyte_cdk.utils.traced_exception import AirbyteTracedException +from 
manifest_migrations.migration_handler import ( + ManifestMigrationHandler, +) class ManifestDeclarativeSource(DeclarativeSource): @@ -68,16 +71,19 @@ def __init__( debug: bool = False, emit_connector_builder_messages: bool = False, component_factory: Optional[ModelToComponentFactory] = None, - ): + migrate_manifest: Optional[bool] = False, + ) -> None: """ Args: config: The provided config dict. source_config: The manifest of low-code components that describe the source connector. - debug: True if debug mode is enabled. - emit_connector_builder_messages: True if messages should be emitted to the connector builder. - component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked. + debug: bool True if debug mode is enabled. + emit_connector_builder_messages: Optional[bool] True if messages should be emitted to the connector builder. + component_factory: Optional factory if ModelToComponentFactory's default behavior needs to be tweaked. + migrate_manifest: Optional[bool] if the manifest should be migrated to pick up the latest declarative component schema changes at runtime. 
""" self.logger = logging.getLogger(f"airbyte.{self.name}") + # For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing manifest = dict(source_config) if "type" not in manifest: @@ -90,6 +96,12 @@ def __init__( propagated_source_config = ManifestComponentTransformer().propagate_types_and_parameters( "", resolved_source_config, {} ) + + if migrate_manifest: + propagated_source_config = ManifestMigrationHandler( + propagated_source_config + ).apply_migrations() + self._source_config = propagated_source_config self._debug = debug self._emit_connector_builder_messages = emit_connector_builder_messages diff --git a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py index 6779b54ab..44f414343 100644 --- a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py +++ b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py @@ -4,7 +4,7 @@ import copy import typing -from typing import Any, Mapping, Optional +from typing import Any, Dict, Mapping, Optional PARAMETERS_STR = "$parameters" @@ -95,7 +95,7 @@ def propagate_types_and_parameters( declarative_component: Mapping[str, Any], parent_parameters: Mapping[str, Any], use_parent_parameters: Optional[bool] = None, - ) -> Mapping[str, Any]: + ) -> Dict[str, Any]: """ Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the default component type if it was not already present. The resulting transformed components are a deep copy of the input diff --git a/manifest_migrations/README.md b/manifest_migrations/README.md new file mode 100644 index 000000000..47f13cb4c --- /dev/null +++ b/manifest_migrations/README.md @@ -0,0 +1,66 @@ +# Manifest Migrations + +This directory contains the logic and registry for manifest migrations in the Airbyte CDK. 
Migrations are used to update or transform manifest components to newer formats or schemas as the CDK evolves. + +## Adding a New Migration + +1. **Create a Migration File:** + - Add a new Python file in the `migrations/` subdirectory. + - Name the file using the pattern: `<description>_v<major>_<minor>_<patch>__<order>.py`. + - Example: `http_requester_url_base_to_url_v6_45_2__0.py` + - The `<order>` integer is used to determine the order of migrations for the same version. + +2. **Define the Migration Class:** + - The migration class must inherit from `ManifestMigration`. + - Name the class using the pattern: `V_<major>_<minor>_<patch>_ManifestMigration_<Description>`. + - Example: `V_6_45_2_ManifestMigration_HttpRequesterUrlBaseToUrl` + - Implement the following methods: + - `should_migrate(self, manifest: ManifestType) -> bool`: Return `True` if the migration should be applied to the given manifest. + - `migrate(self, manifest: ManifestType) -> None`: Perform the migration in-place. + +3. **Migration Versioning:** + - The migration version is extracted from the class name and used to determine applicability. + - Only manifests with a version less than or equal to the migration version will be migrated. + +4. **Component Type:** + - Use the `TYPE_TAG` constant to check the component type in your migration logic. + +5. **Examples:** + - See `migrations/http_requester_url_base_to_url_v6_45_2__0.py` and `migrations/http_requester_path_to_url_v6_45_2__1.py` for reference implementations. + +## Migration Registry + +- All migration classes in the `migrations/` folder are automatically discovered and registered in `migrations_registry.py`. +- Migrations are applied in order, determined by the `<order>` suffix in the filename. + +## Testing + +- Ensure your migration is covered by unit tests. +- Tests should verify both `should_migrate` and `migrate` behaviors. 
+ +## Example Migration Skeleton + +```python +from airbyte_cdk.sources.declarative.migrations.manifest.manifest_migration import TYPE_TAG, ManifestMigration, ManifestType + +class V_1_2_3_ManifestMigration_Example(ManifestMigration): + component_type = "ExampleComponent" + original_key = "old_key" + replacement_key = "new_key" + + def should_migrate(self, manifest: ManifestType) -> bool: + return manifest[TYPE_TAG] == self.component_type and self.original_key in manifest + + def migrate(self, manifest: ManifestType) -> None: + manifest[self.replacement_key] = manifest[self.original_key] + manifest.pop(self.original_key, None) +``` + +## Additional Notes + +- Do not modify the migration registry manually; it will pick up all valid migration classes automatically. +- If you need to skip certain component types, use the `NON_MIGRATABLE_TYPES` list in `manifest_migration.py`. + +--- + +For more details, see the docstrings in `manifest_migration.py` and the examples in the `migrations/` folder. \ No newline at end of file diff --git a/manifest_migrations/__init__.py b/manifest_migrations/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/manifest_migrations/exceptions.py b/manifest_migrations/exceptions.py new file mode 100644 index 000000000..7a140706f --- /dev/null +++ b/manifest_migrations/exceptions.py @@ -0,0 +1,12 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + + +class ManifestMigrationException(Exception): + """ + Raised when a migration error occurs in the manifest. + """ + + def __init__(self, message: str) -> None: + super().__init__(f"Failed to migrate the manifest: {message}") diff --git a/manifest_migrations/manifest_migration.py b/manifest_migrations/manifest_migration.py new file mode 100644 index 000000000..6e4b3bb2f --- /dev/null +++ b/manifest_migrations/manifest_migration.py @@ -0,0 +1,137 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
+ +import re +from abc import abstractmethod +from typing import Any, Dict + +ManifestType = Dict[str, Any] + + +TYPE_TAG = "type" + +NON_MIGRATABLE_TYPES = [ + "DynamicDeclarativeStream", +] + + +class ManifestMigration: + @abstractmethod + def should_migrate(self, manifest: ManifestType) -> bool: + """ + Check if the manifest should be migrated. + + :param manifest: The manifest to potentially migrate + :param kwargs: Additional arguments for migration + + :return: true if the manifest is of the expected format and should be migrated. False otherwise. + """ + + @abstractmethod + def migrate(self, manifest: ManifestType) -> None: + """ + Migrate the manifest. Assumes should_migrate(manifest) returned True. + + :param manifest: The manifest to migrate + :param kwargs: Additional arguments for migration + """ + + @property + def migration_version(self) -> str: + """ + Get the migration version. + + :return: The migration version as a string + """ + return self._get_migration_version() + + def _is_component(self, obj: Dict[str, Any]) -> bool: + """ + Check if the object is a component. + + :param obj: The object to check + :return: True if the object is a component, False otherwise + """ + return TYPE_TAG in obj.keys() + + def _is_migratable(self, obj: Dict[str, Any]) -> bool: + """ + Check if the object is a migratable component, + based on the Type of the component and the migration version. + + :param obj: The object to check + :return: True if the object is a migratable component, False otherwise + """ + return ( + obj[TYPE_TAG] not in NON_MIGRATABLE_TYPES + and self._get_manifest_version(obj) <= self.migration_version + ) + + def _process_manifest(self, obj: Any) -> None: + """ + Recursively processes a manifest object, migrating components that match the migration criteria. + + This method traverses the entire manifest structure (dictionaries and lists) and applies + migrations to components that: + 1. Have a type tag + 2. 
Are not in the list of non-migratable types + 3. Meet the conditions defined in the should_migrate method + + Parameters: + obj (Any): The object to process, which can be a dictionary, list, or any other type. + Dictionary objects are checked for component type tags and potentially migrated. + List objects have each of their items processed recursively. + Other types are ignored. + + Returns: + None, since we process the manifest in place. + """ + if isinstance(obj, dict): + # Check if the object is a component + if self._is_component(obj): + # Check if the object is allowed to be migrated + if not self._is_migratable(obj): + return + + # Check if the object should be migrated + if self.should_migrate(obj): + # Perform the migration, if needed + self.migrate(obj) + + # Process all values in the dictionary + for value in list(obj.values()): + self._process_manifest(value) + + elif isinstance(obj, list): + # Process all items in the list + for item in obj: + self._process_manifest(item) + + def _get_manifest_version(self, manifest: ManifestType) -> str: + """ + Get the manifest version from the manifest. + + :param manifest: The manifest to get the version from + :return: The manifest version + """ + return str(manifest.get("version", "0.0.0")) + + def _get_migration_version(self) -> str: + """ + Get the migration version from the class name. + The migration version is extracted from the class name using a regular expression. + The expected format is "V_<major>_<minor>_<patch>_<migration_name>". 
+ + For example, "V_6_45_2_ManifestMigration_HttpRequesterPathToUrl" -> "6.45.2" + + :return: The migration version as a string in the format "major.minor.patch" + :raises ValueError: If the class name does not match the expected format + """ + + class_name = self.__class__.__name__ + migration_version = re.search(r"V_(\d+_\d+_\d+)", class_name) + if migration_version: + return migration_version.group(1).replace("_", ".") + else: + raise ValueError( + f"Invalid migration class name, make sure the class name has the version (e.g `V_0_0_0_`): {class_name}" + ) diff --git a/manifest_migrations/migration_handler.py b/manifest_migrations/migration_handler.py new file mode 100644 index 000000000..87ec33716 --- /dev/null +++ b/manifest_migrations/migration_handler.py @@ -0,0 +1,62 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +import copy +from typing import Type + +from manifest_migrations.exceptions import ( + ManifestMigrationException, +) +from manifest_migrations.manifest_migration import ( + ManifestMigration, + ManifestType, +) +from manifest_migrations.migrations_registry import ( + MIGRATIONS, +) + + +class ManifestMigrationHandler: + """ + This class is responsible for handling migrations in the manifest. + """ + + def __init__(self, manifest: ManifestType) -> None: + self._manifest = manifest + self._migrated_manifest: ManifestType = copy.deepcopy(self._manifest) + + def apply_migrations(self) -> ManifestType: + """ + Apply all registered migrations to the manifest. + + This method iterates through all migrations in the migrations registry and applies + them sequentially to the current manifest. If any migration fails with a + ManifestMigrationException, the original unmodified manifest is returned instead. + + Returns: + ManifestType: The migrated manifest if all migrations succeeded, or the original + manifest if any migration failed. 
+ """ + try: + for migration_cls in MIGRATIONS: + self._handle_migration(migration_cls) + return self._migrated_manifest + except ManifestMigrationException: + # if any errors occur we return the original resolved manifest + return self._manifest + + def _handle_migration(self, migration_class: Type[ManifestMigration]) -> None: + """ + Handles a single manifest migration by instantiating the migration class and processing the manifest. + + Args: + migration_class (Type[ManifestMigration]): The migration class to apply to the manifest. + + Raises: + ManifestMigrationException: If the migration process encounters any errors. + """ + try: + migration_class()._process_manifest(self._migrated_manifest) + except Exception as e: + raise ManifestMigrationException(f"Failed to migrate the manifest: {e}") from e diff --git a/manifest_migrations/migrations/__init__.py b/manifest_migrations/migrations/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/manifest_migrations/migrations/http_requester_path_to_url_v6_45_2__1.py b/manifest_migrations/migrations/http_requester_path_to_url_v6_45_2__1.py new file mode 100644 index 000000000..d0693ec43 --- /dev/null +++ b/manifest_migrations/migrations/http_requester_path_to_url_v6_45_2__1.py @@ -0,0 +1,42 @@ +from urllib.parse import urljoin + +from airbyte_cdk.sources.types import EmptyString +from manifest_migrations.manifest_migration import ( + TYPE_TAG, + ManifestMigration, + ManifestType, +) + + +class V_6_45_2_ManifestMigration_HttpRequesterPathToUrl(ManifestMigration): + """ + This migration is responsible for migrating the `path` key to `url` in the HttpRequester component. + The `path` key is expected to be a relative path, and the `url` key is expected to be a full URL. + The migration will concatenate the `url_base` and `path` to form a full URL. 
+ """ + + component_type = "HttpRequester" + original_key = "path" + replacement_key = "url" + + def should_migrate(self, manifest: ManifestType) -> bool: + return manifest[TYPE_TAG] == self.component_type and self.original_key in list( + manifest.keys() + ) + + def migrate(self, manifest: ManifestType) -> None: + original_key_value = manifest[self.original_key].lstrip("/") + replacement_key_value = manifest[self.replacement_key] + + # return a full-url if provided directly from interpolation context + if original_key_value == EmptyString or original_key_value is None: + manifest[self.replacement_key] = replacement_key_value + manifest.pop(self.original_key, None) + else: + # since we didn't provide a full-url, the url_base might not have a trailing slash + # so we join the url_base and path correctly + if not replacement_key_value.endswith("/"): + replacement_key_value += "/" + + manifest[self.replacement_key] = urljoin(replacement_key_value, original_key_value) + manifest.pop(self.original_key, None) diff --git a/manifest_migrations/migrations/http_requester_url_base_to_url_v6_45_2__0.py b/manifest_migrations/migrations/http_requester_url_base_to_url_v6_45_2__0.py new file mode 100644 index 000000000..a269db9fa --- /dev/null +++ b/manifest_migrations/migrations/http_requester_url_base_to_url_v6_45_2__0.py @@ -0,0 +1,26 @@ +from manifest_migrations.manifest_migration import ( + TYPE_TAG, + ManifestMigration, + ManifestType, +) + + +class V_6_45_2_ManifestMigration_HttpRequesterUrlBaseToUrl(ManifestMigration): + """ + This migration is responsible for migrating the `url_base` key to `url` in the HttpRequester component. + The `url_base` key is expected to be a base URL, and the `url` key is expected to be a full URL. + The migration will copy the value of `url_base` to `url`. 
+ """ + + component_type = "HttpRequester" + original_key = "url_base" + replacement_key = "url" + + def should_migrate(self, manifest: ManifestType) -> bool: + return manifest[TYPE_TAG] == self.component_type and self.original_key in list( + manifest.keys() + ) + + def migrate(self, manifest: ManifestType) -> None: + manifest[self.replacement_key] = manifest[self.original_key] + manifest.pop(self.original_key, None) diff --git a/manifest_migrations/migrations_registry.py b/manifest_migrations/migrations_registry.py new file mode 100644 index 000000000..34a28e907 --- /dev/null +++ b/manifest_migrations/migrations_registry.py @@ -0,0 +1,53 @@ +import importlib +import inspect +import pkgutil +import re +import sys +from typing import List, Type + +import manifest_migrations.migrations as migrations_pkg +from manifest_migrations.manifest_migration import ( + ManifestMigration, +) + +# Dynamically import all modules in the migrations package +for _, module_name, is_pkg in pkgutil.iter_modules(migrations_pkg.__path__): + if not is_pkg: + importlib.import_module(f"{migrations_pkg.__name__}.{module_name}") + + +def _migration_order_key(cls: object) -> int: + # Extract the migration order from the module name, e.g., http_requester_url_base_to_url_v6_45_2__0 + # The order is the integer after the double underscore at the end of the module name + module_name = cls.__module__.split(".")[-1] + match = re.search(r"__(\d+)$", module_name) + return int(match.group(1)) if match else 0 + + +def _discover_migrations() -> List[Type[ManifestMigration]]: + migration_classes = [] + for name, obj in inspect.getmembers(sys.modules[migrations_pkg.__name__], inspect.isclass): + if ( + issubclass(obj, ManifestMigration) + and obj is not ManifestMigration + and obj not in migration_classes + ): + migration_classes.append(obj) + + for _, module_name, _ in pkgutil.iter_modules(migrations_pkg.__path__): + module = sys.modules.get(f"{migrations_pkg.__name__}.{module_name}") + if module: + for 
name, obj in inspect.getmembers(module, inspect.isclass): + if ( + issubclass(obj, ManifestMigration) + and obj is not ManifestMigration + and obj not in migration_classes + ): + migration_classes.append(obj) + + # Sort by migration order key + migration_classes.sort(key=_migration_order_key) + return migration_classes + + +MIGRATIONS: List[Type[ManifestMigration]] = _discover_migrations() diff --git a/unit_tests/manifest_migrations/conftest.py b/unit_tests/manifest_migrations/conftest.py new file mode 100644 index 000000000..38cdbdfb1 --- /dev/null +++ b/unit_tests/manifest_migrations/conftest.py @@ -0,0 +1,600 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +from typing import Any, Dict + +import pytest + + +@pytest.fixture +def manifest_with_url_base_to_migrate_to_url() -> Dict[str, Any]: + return { + "version": "0.0.0", + "type": "DeclarativeSource", + "check": { + "type": "CheckStream", + "stream_names": ["A"], + }, + "definitions": { + "streams": { + "A": { + "type": "DeclarativeStream", + "name": "A", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "$ref": "#/definitions/requester_A", + "path": "/path_to_A", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/A"}, + }, + }, + "B": { + "type": "DeclarativeStream", + "name": "B", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "$ref": "#/definitions/requester_A", + "path": "path_to_A", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/B"}, + }, + }, + "C": { + "type": "DeclarativeStream", + "name": "C", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "$ref": 
"#/definitions/requester_B", + "path": "path_to_B", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/C"}, + }, + }, + "D": { + "type": "DeclarativeStream", + "name": "D", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "$ref": "#/definitions/requester_B", + # ! the double-slash is intentional here for the test. + "path": "//path_to_B", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/D"}, + }, + }, + "E": { + "type": "DeclarativeStream", + "name": "E", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "$ref": "#/definitions/requester_B", + "path": "/path_to_B", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/E"}, + }, + }, + }, + # both requesters have duplicated `url_base`, + # which should be migrated to `url` in the new format + # and the `url_base` and `path` key should be removed + "requester_A": { + "type": "HttpRequester", + "url_base": "https://example.com/v1/", + }, + "requester_B": { + "type": "HttpRequester", + "url_base": "https://example.com/v2/", + }, + }, + "streams": [ + {"$ref": "#/definitions/streams/A"}, + {"$ref": "#/definitions/streams/B"}, + {"$ref": "#/definitions/streams/C"}, + {"$ref": "#/definitions/streams/D"}, + {"$ref": "#/definitions/streams/E"}, + ], + "schemas": { + "A": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": { + "field_a1": { + "type": "string", + }, + }, + }, + 
"B": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": { + "field_b1": { + "type": "string", + }, + }, + }, + "C": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": { + "field_c1": { + "type": "string", + }, + }, + }, + "D": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": { + "field_d1": { + "type": "string", + }, + }, + }, + "E": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": { + "field_e1": { + "type": "string", + }, + }, + }, + }, + } + + +@pytest.fixture +def expected_manifest_with_url_base_migrated_to_url() -> Dict[str, Any]: + return { + "version": "0.0.0", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["A"]}, + "definitions": { + "streams": { + "A": { + "type": "DeclarativeStream", + "name": "A", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v1/path_to_A", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_a1": {"type": "string"}}, + }, + }, + }, + "B": { + "type": "DeclarativeStream", + "name": "B", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v1/path_to_A", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + 
"type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_b1": {"type": "string"}}, + }, + }, + }, + "C": { + "type": "DeclarativeStream", + "name": "C", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v2/path_to_B", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_c1": {"type": "string"}}, + }, + }, + }, + "D": { + "type": "DeclarativeStream", + "name": "D", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v2/path_to_B", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_d1": {"type": "string"}}, + }, + }, + }, + "E": { + "type": "DeclarativeStream", + "name": "E", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v2/path_to_B", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_e1": {"type": "string"}}, + }, + }, + }, + }, + "requester_A": {"type": "HttpRequester", "url": "https://example.com/v1/"}, + 
"requester_B": {"type": "HttpRequester", "url": "https://example.com/v2/"}, + }, + "streams": [ + { + "type": "DeclarativeStream", + "name": "A", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v1/path_to_A", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_a1": {"type": "string"}}, + }, + }, + }, + { + "type": "DeclarativeStream", + "name": "B", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v1/path_to_A", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_b1": {"type": "string"}}, + }, + }, + }, + { + "type": "DeclarativeStream", + "name": "C", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v2/path_to_B", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_c1": {"type": "string"}}, + }, + }, + }, + { + "type": "DeclarativeStream", + "name": "D", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": 
"https://example.com/v2/path_to_B", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_d1": {"type": "string"}}, + }, + }, + }, + { + "type": "DeclarativeStream", + "name": "E", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v2/path_to_B", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_e1": {"type": "string"}}, + }, + }, + }, + ], + "schemas": { + "A": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_a1": {"type": "string"}}, + }, + "B": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_b1": {"type": "string"}}, + }, + "C": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_c1": {"type": "string"}}, + }, + "D": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_d1": {"type": "string"}}, + }, + "E": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_e1": {"type": "string"}}, + }, + }, + } + + +@pytest.fixture +def manifest_with_migrated_url_base_and_path_is_joined_to_url() -> Dict[str, Any]: + return { + "version": 
"0.0.0", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["A"]}, + "definitions": {}, + "streams": [ + { + "type": "DeclarativeStream", + "name": "A", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v1/path_to_A", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_a1": {"type": "string"}}, + }, + }, + }, + { + "type": "DeclarativeStream", + "name": "B", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v2/path_to_B", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_b1": {"type": "string"}}, + }, + }, + }, + { + "type": "DeclarativeStream", + "name": "C", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v2/path_to_B", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_c1": {"type": "string"}}, + }, + }, + }, + ], + "schemas": { + "A": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, 
+ "properties": {"field_a1": {"type": "string"}}, + }, + "B": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_b1": {"type": "string"}}, + }, + "C": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_c1": {"type": "string"}}, + }, + }, + } diff --git a/unit_tests/manifest_migrations/test_manifest_migration.py b/unit_tests/manifest_migrations/test_manifest_migration.py new file mode 100644 index 000000000..5fa1ec7db --- /dev/null +++ b/unit_tests/manifest_migrations/test_manifest_migration.py @@ -0,0 +1,45 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + + +from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import ( + ManifestReferenceResolver, +) +from manifest_migrations.migration_handler import ( + ManifestMigrationHandler, +) + +resolver = ManifestReferenceResolver() + + +def test_manifest_resolve_migrate( + manifest_with_url_base_to_migrate_to_url, + expected_manifest_with_url_base_migrated_to_url, +) -> None: + """ + This test is to check that the manifest is migrated and normalized + when the `url_base` is migrated to `url` and the `path` is joined to `url`. + """ + + resolved_manifest = resolver.preprocess_manifest(manifest_with_url_base_to_migrate_to_url) + migrated_manifest = ManifestMigrationHandler(dict(resolved_manifest)).apply_migrations() + + assert migrated_manifest == expected_manifest_with_url_base_migrated_to_url + + +def test_manifest_resolve_do_not_migrate( + manifest_with_migrated_url_base_and_path_is_joined_to_url, +) -> None: + """ + This test is to check that the manifest remains migrated already, + after the `url_base` and `path` is joined to `url`. 
+ """ + + resolved_manifest = resolver.preprocess_manifest( + manifest_with_migrated_url_base_and_path_is_joined_to_url + ) + migrated_manifest = ManifestMigrationHandler(dict(resolved_manifest)).apply_migrations() + + # it's expected that the manifest is the same after the processing + assert migrated_manifest == manifest_with_migrated_url_base_and_path_is_joined_to_url From d36bb0130706bc20a0a1fa4252f6489a64d9118c Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Wed, 16 Apr 2025 14:09:45 +0300 Subject: [PATCH 02/17] updated structure --- .../manifest_migrations}/README.md | 0 .../manifest_migrations}/__init__.py | 0 .../manifest_migrations}/exceptions.py | 0 .../manifest_migrations}/manifest_migration.py | 0 .../manifest_migrations}/migration_handler.py | 6 +++--- .../manifest_migrations}/migrations/__init__.py | 0 .../migrations/http_requester_path_to_url_v6_45_2__1.py | 4 ++-- .../http_requester_url_base_to_url_v6_45_2__0.py | 2 +- .../manifest_migrations}/migrations_registry.py | 4 ++-- .../sources/declarative/manifest_declarative_source.py | 6 +++--- unit_tests/manifest_migrations/test_manifest_migration.py | 7 ++++--- 11 files changed, 15 insertions(+), 14 deletions(-) rename {manifest_migrations => airbyte_cdk/manifest_migrations}/README.md (100%) rename {manifest_migrations => airbyte_cdk/manifest_migrations}/__init__.py (100%) rename {manifest_migrations => airbyte_cdk/manifest_migrations}/exceptions.py (100%) rename {manifest_migrations => airbyte_cdk/manifest_migrations}/manifest_migration.py (100%) rename {manifest_migrations => airbyte_cdk/manifest_migrations}/migration_handler.py (91%) rename {manifest_migrations => airbyte_cdk/manifest_migrations}/migrations/__init__.py (100%) rename {manifest_migrations => airbyte_cdk/manifest_migrations}/migrations/http_requester_path_to_url_v6_45_2__1.py (96%) rename {manifest_migrations => airbyte_cdk/manifest_migrations}/migrations/http_requester_url_base_to_url_v6_45_2__0.py (93%) rename 
{manifest_migrations => airbyte_cdk/manifest_migrations}/migrations_registry.py (93%) diff --git a/manifest_migrations/README.md b/airbyte_cdk/manifest_migrations/README.md similarity index 100% rename from manifest_migrations/README.md rename to airbyte_cdk/manifest_migrations/README.md diff --git a/manifest_migrations/__init__.py b/airbyte_cdk/manifest_migrations/__init__.py similarity index 100% rename from manifest_migrations/__init__.py rename to airbyte_cdk/manifest_migrations/__init__.py diff --git a/manifest_migrations/exceptions.py b/airbyte_cdk/manifest_migrations/exceptions.py similarity index 100% rename from manifest_migrations/exceptions.py rename to airbyte_cdk/manifest_migrations/exceptions.py diff --git a/manifest_migrations/manifest_migration.py b/airbyte_cdk/manifest_migrations/manifest_migration.py similarity index 100% rename from manifest_migrations/manifest_migration.py rename to airbyte_cdk/manifest_migrations/manifest_migration.py diff --git a/manifest_migrations/migration_handler.py b/airbyte_cdk/manifest_migrations/migration_handler.py similarity index 91% rename from manifest_migrations/migration_handler.py rename to airbyte_cdk/manifest_migrations/migration_handler.py index 87ec33716..32746560b 100644 --- a/manifest_migrations/migration_handler.py +++ b/airbyte_cdk/manifest_migrations/migration_handler.py @@ -5,14 +5,14 @@ import copy from typing import Type -from manifest_migrations.exceptions import ( +from airbyte_cdk.manifest_migrations.exceptions import ( ManifestMigrationException, ) -from manifest_migrations.manifest_migration import ( +from airbyte_cdk.manifest_migrations.manifest_migration import ( ManifestMigration, ManifestType, ) -from manifest_migrations.migrations_registry import ( +from airbyte_cdk.manifest_migrations.migrations_registry import ( MIGRATIONS, ) diff --git a/manifest_migrations/migrations/__init__.py b/airbyte_cdk/manifest_migrations/migrations/__init__.py similarity index 100% rename from 
manifest_migrations/migrations/__init__.py rename to airbyte_cdk/manifest_migrations/migrations/__init__.py diff --git a/manifest_migrations/migrations/http_requester_path_to_url_v6_45_2__1.py b/airbyte_cdk/manifest_migrations/migrations/http_requester_path_to_url_v6_45_2__1.py similarity index 96% rename from manifest_migrations/migrations/http_requester_path_to_url_v6_45_2__1.py rename to airbyte_cdk/manifest_migrations/migrations/http_requester_path_to_url_v6_45_2__1.py index d0693ec43..9a7c1de9e 100644 --- a/manifest_migrations/migrations/http_requester_path_to_url_v6_45_2__1.py +++ b/airbyte_cdk/manifest_migrations/migrations/http_requester_path_to_url_v6_45_2__1.py @@ -1,11 +1,11 @@ from urllib.parse import urljoin -from airbyte_cdk.sources.types import EmptyString -from manifest_migrations.manifest_migration import ( +from airbyte_cdk.manifest_migrations.manifest_migration import ( TYPE_TAG, ManifestMigration, ManifestType, ) +from airbyte_cdk.sources.types import EmptyString class V_6_45_2_ManifestMigration_HttpRequesterPathToUrl(ManifestMigration): diff --git a/manifest_migrations/migrations/http_requester_url_base_to_url_v6_45_2__0.py b/airbyte_cdk/manifest_migrations/migrations/http_requester_url_base_to_url_v6_45_2__0.py similarity index 93% rename from manifest_migrations/migrations/http_requester_url_base_to_url_v6_45_2__0.py rename to airbyte_cdk/manifest_migrations/migrations/http_requester_url_base_to_url_v6_45_2__0.py index a269db9fa..842b6ad26 100644 --- a/manifest_migrations/migrations/http_requester_url_base_to_url_v6_45_2__0.py +++ b/airbyte_cdk/manifest_migrations/migrations/http_requester_url_base_to_url_v6_45_2__0.py @@ -1,4 +1,4 @@ -from manifest_migrations.manifest_migration import ( +from airbyte_cdk.manifest_migrations.manifest_migration import ( TYPE_TAG, ManifestMigration, ManifestType, diff --git a/manifest_migrations/migrations_registry.py b/airbyte_cdk/manifest_migrations/migrations_registry.py similarity index 93% rename from 
manifest_migrations/migrations_registry.py rename to airbyte_cdk/manifest_migrations/migrations_registry.py index 34a28e907..7c238af22 100644 --- a/manifest_migrations/migrations_registry.py +++ b/airbyte_cdk/manifest_migrations/migrations_registry.py @@ -5,8 +5,8 @@ import sys from typing import List, Type -import manifest_migrations.migrations as migrations_pkg -from manifest_migrations.manifest_migration import ( +import airbyte_cdk.manifest_migrations.migrations as migrations_pkg +from airbyte_cdk.manifest_migrations.manifest_migration import ( ManifestMigration, ) diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index 1520cb689..2478fdac6 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -15,6 +15,9 @@ from jsonschema.validators import validate from packaging.version import InvalidVersion, Version +from airbyte_cdk.manifest_migrations.migration_handler import ( + ManifestMigrationHandler, +) from airbyte_cdk.models import ( AirbyteConnectionStatus, AirbyteMessage, @@ -55,9 +58,6 @@ SliceLogger, ) from airbyte_cdk.utils.traced_exception import AirbyteTracedException -from manifest_migrations.migration_handler import ( - ManifestMigrationHandler, -) class ManifestDeclarativeSource(DeclarativeSource): diff --git a/unit_tests/manifest_migrations/test_manifest_migration.py b/unit_tests/manifest_migrations/test_manifest_migration.py index 5fa1ec7db..ae44358db 100644 --- a/unit_tests/manifest_migrations/test_manifest_migration.py +++ b/unit_tests/manifest_migrations/test_manifest_migration.py @@ -3,12 +3,13 @@ # +from airbyte_cdk.manifest_migrations.migration_handler import ( + ManifestMigrationHandler, +) +from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver 
import ( ManifestReferenceResolver, ) -from manifest_migrations.migration_handler import ( - ManifestMigrationHandler, -) resolver = ManifestReferenceResolver() From 88f9e302227bcdd1442a1fd59747ab4b05945304 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Wed, 16 Apr 2025 14:13:59 +0300 Subject: [PATCH 03/17] removed __ManifestMigration_ from class names --- airbyte_cdk/manifest_migrations/manifest_migration.py | 4 ++-- .../migrations/http_requester_path_to_url_v6_45_2__1.py | 2 +- .../migrations/http_requester_url_base_to_url_v6_45_2__0.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/manifest_migrations/manifest_migration.py b/airbyte_cdk/manifest_migrations/manifest_migration.py index 6e4b3bb2f..bbba9ff9b 100644 --- a/airbyte_cdk/manifest_migrations/manifest_migration.py +++ b/airbyte_cdk/manifest_migrations/manifest_migration.py @@ -121,7 +121,7 @@ def _get_migration_version(self) -> str: The migration version is extracted from the class name using a regular expression. The expected format is "V____". 
- For example, "V_6_45_2_ManifestMigration_HttpRequesterPathToUrl" -> "6.45.2" + For example, "V_6_45_2_HttpRequesterPathToUrl" -> "6.45.2" :return: The migration version as a string in the format "major.minor.patch" :raises ValueError: If the class name does not match the expected format @@ -133,5 +133,5 @@ def _get_migration_version(self) -> str: return migration_version.group(1).replace("_", ".") else: raise ValueError( - f"Invalid migration class name, make sure the class name has the version (e.g `V_0_0_0_`): {class_name}" + f"Invalid migration class name, make sure the class name has the version (e.g `V____`): {class_name}" ) diff --git a/airbyte_cdk/manifest_migrations/migrations/http_requester_path_to_url_v6_45_2__1.py b/airbyte_cdk/manifest_migrations/migrations/http_requester_path_to_url_v6_45_2__1.py index 9a7c1de9e..cb36a886c 100644 --- a/airbyte_cdk/manifest_migrations/migrations/http_requester_path_to_url_v6_45_2__1.py +++ b/airbyte_cdk/manifest_migrations/migrations/http_requester_path_to_url_v6_45_2__1.py @@ -8,7 +8,7 @@ from airbyte_cdk.sources.types import EmptyString -class V_6_45_2_ManifestMigration_HttpRequesterPathToUrl(ManifestMigration): +class V_6_45_2_HttpRequesterPathToUrl(ManifestMigration): """ This migration is responsible for migrating the `path` key to `url` in the HttpRequester component. The `path` key is expected to be a relative path, and the `url` key is expected to be a full URL. 
diff --git a/airbyte_cdk/manifest_migrations/migrations/http_requester_url_base_to_url_v6_45_2__0.py b/airbyte_cdk/manifest_migrations/migrations/http_requester_url_base_to_url_v6_45_2__0.py index 842b6ad26..34dfc41c9 100644 --- a/airbyte_cdk/manifest_migrations/migrations/http_requester_url_base_to_url_v6_45_2__0.py +++ b/airbyte_cdk/manifest_migrations/migrations/http_requester_url_base_to_url_v6_45_2__0.py @@ -5,7 +5,7 @@ ) -class V_6_45_2_ManifestMigration_HttpRequesterUrlBaseToUrl(ManifestMigration): +class V_6_45_2_HttpRequesterUrlBaseToUrl(ManifestMigration): """ This migration is responsible for migrating the `url_base` key to `url` in the HttpRequester component. The `url_base` key is expected to be a base URL, and the `url` key is expected to be a full URL. From ebc854d39b1b1b9f222e7d22519d273c87b31cc4 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Wed, 16 Apr 2025 14:51:41 +0300 Subject: [PATCH 04/17] updated version checks and error messaging --- .../manifest_migrations/manifest_migration.py | 26 ++++--------- .../manifest_migrations/migration_handler.py | 26 ++++++++++++- .../migrations_registry.py | 37 +++++++++++++++++-- 3 files changed, 66 insertions(+), 23 deletions(-) diff --git a/airbyte_cdk/manifest_migrations/manifest_migration.py b/airbyte_cdk/manifest_migrations/manifest_migration.py index bbba9ff9b..249d1fd6b 100644 --- a/airbyte_cdk/manifest_migrations/manifest_migration.py +++ b/airbyte_cdk/manifest_migrations/manifest_migration.py @@ -4,6 +4,8 @@ from abc import abstractmethod from typing import Any, Dict +from packaging.version import Version + ManifestType = Dict[str, Any] @@ -36,13 +38,13 @@ def migrate(self, manifest: ManifestType) -> None: """ @property - def migration_version(self) -> str: + def migration_version(self) -> Version: """ Get the migration version. 
- :return: The migration version as a string + :return: The migration version as a Version object """ - return self._get_migration_version() + return Version(self._get_migration_version()) def _is_component(self, obj: Dict[str, Any]) -> bool: """ @@ -53,7 +55,7 @@ def _is_component(self, obj: Dict[str, Any]) -> bool: """ return TYPE_TAG in obj.keys() - def _is_migratable(self, obj: Dict[str, Any]) -> bool: + def _is_migratable_type(self, obj: Dict[str, Any]) -> bool: """ Check if the object is a migratable component, based on the Type of the component and the migration version. @@ -61,10 +63,7 @@ def _is_migratable(self, obj: Dict[str, Any]) -> bool: :param obj: The object to check :return: True if the object is a migratable component, False otherwise """ - return ( - obj[TYPE_TAG] not in NON_MIGRATABLE_TYPES - and self._get_manifest_version(obj) <= self.migration_version - ) + return obj[TYPE_TAG] not in NON_MIGRATABLE_TYPES def _process_manifest(self, obj: Any) -> None: """ @@ -89,7 +88,7 @@ def _process_manifest(self, obj: Any) -> None: # Check if the object is a component if self._is_component(obj): # Check if the object is allowed to be migrated - if not self._is_migratable(obj): + if not self._is_migratable_type(obj): return # Check if the object should be migrated @@ -106,15 +105,6 @@ def _process_manifest(self, obj: Any) -> None: for item in obj: self._process_manifest(item) - def _get_manifest_version(self, manifest: ManifestType) -> str: - """ - Get the manifest version from the manifest. - - :param manifest: The manifest to get the version from - :return: The manifest version - """ - return str(manifest.get("version", "0.0.0")) - def _get_migration_version(self) -> str: """ Get the migration version from the class name. 
diff --git a/airbyte_cdk/manifest_migrations/migration_handler.py b/airbyte_cdk/manifest_migrations/migration_handler.py index 32746560b..b0f794b74 100644 --- a/airbyte_cdk/manifest_migrations/migration_handler.py +++ b/airbyte_cdk/manifest_migrations/migration_handler.py @@ -3,8 +3,11 @@ # import copy +import logging from typing import Type +from packaging.version import Version + from airbyte_cdk.manifest_migrations.exceptions import ( ManifestMigrationException, ) @@ -16,6 +19,8 @@ MIGRATIONS, ) +LOGGER = logging.getLogger("airbyte.cdk.manifest_migrations") + class ManifestMigrationHandler: """ @@ -25,6 +30,7 @@ class ManifestMigrationHandler: def __init__(self, manifest: ManifestType) -> None: self._manifest = manifest self._migrated_manifest: ManifestType = copy.deepcopy(self._manifest) + self._manifest_version: Version = self._get_manifest_version() def apply_migrations(self) -> ManifestType: """ @@ -57,6 +63,22 @@ def _handle_migration(self, migration_class: Type[ManifestMigration]) -> None: ManifestMigrationException: If the migration process encounters any errors. """ try: - migration_class()._process_manifest(self._migrated_manifest) + migration_instance = migration_class() + # check if the migration is supported for the given manifest version + if self._manifest_version <= migration_instance.migration_version: + migration_instance._process_manifest(self._migrated_manifest) + else: + LOGGER.info( + f"Manifest migration: `{migration_class.__name__}` is not supported for the given manifest version `{self._manifest_version}`.", + ) except Exception as e: - raise ManifestMigrationException(f"Failed to migrate the manifest: {e}") from e + raise ManifestMigrationException(str(e)) from e + + def _get_manifest_version(self) -> Version: + """ + Get the manifest version from the manifest. 
+ + The version is read from the `version` key of the manifest and defaults to `0.0.0` when the key is absent. + :return: The manifest version + """ + return Version(str(self._migrated_manifest.get("version", "0.0.0"))) diff --git a/airbyte_cdk/manifest_migrations/migrations_registry.py b/airbyte_cdk/manifest_migrations/migrations_registry.py index 7c238af22..9b8c4ada9 100644 --- a/airbyte_cdk/manifest_migrations/migrations_registry.py +++ b/airbyte_cdk/manifest_migrations/migrations_registry.py @@ -17,16 +17,45 @@ def _migration_order_key(cls: object) -> int: + """ + Determines the migration order key for a given migration class based on its module name. + + The function expects the module name to end with a double underscore followed by an integer (e.g., '__0'). + This integer is extracted and returned as the migration order key. + + Args: + cls (object): The migration class whose module name encodes the migration order. + + Returns: + int: The migration order extracted from the module name. + + Raises: + ValueError: If the module name does not contain the expected order suffix. + """ # Extract the migration order from the module name, e.g., http_requester_url_base_to_url_v6_45_2__0 # The order is the integer after the double underscore at the end of the module name module_name = cls.__module__.split(".")[-1] match = re.search(r"__(\d+)$", module_name) - return int(match.group(1)) if match else 0 + if match: + return int(match.group(1)) + else: + message = f"Migration `{cls.__module__}` doesn't have the `order` in the module name: {module_name}. Did you miss to add `__` to the module name?" + raise ValueError(message) def _discover_migrations() -> List[Type[ManifestMigration]]: + """ + Discovers and returns a sorted list of all ManifestMigration subclasses available in the migrations package. + This function inspects the main migrations package and its submodules to find all classes that are subclasses of ManifestMigration, + excluding the ManifestMigration base class itself and any duplicates. 
The discovered migration classes are then sorted using the + _migration_order_key function to ensure they are returned in the correct order. + + Returns: + List[Type[ManifestMigration]]: A list of discovered ManifestMigration subclasses, sorted by migration order. + """ + migration_classes = [] - for name, obj in inspect.getmembers(sys.modules[migrations_pkg.__name__], inspect.isclass): + for _, obj in inspect.getmembers(sys.modules[migrations_pkg.__name__], inspect.isclass): if ( issubclass(obj, ManifestMigration) and obj is not ManifestMigration @@ -37,7 +66,7 @@ def _discover_migrations() -> List[Type[ManifestMigration]]: for _, module_name, _ in pkgutil.iter_modules(migrations_pkg.__path__): module = sys.modules.get(f"{migrations_pkg.__name__}.{module_name}") if module: - for name, obj in inspect.getmembers(module, inspect.isclass): + for _, obj in inspect.getmembers(module, inspect.isclass): if ( issubclass(obj, ManifestMigration) and obj is not ManifestMigration @@ -47,7 +76,9 @@ def _discover_migrations() -> List[Type[ManifestMigration]]: # Sort by migration order key migration_classes.sort(key=_migration_order_key) + return migration_classes +# registered migrations MIGRATIONS: List[Type[ManifestMigration]] = _discover_migrations() From d30ae265fec17a5ecd2f073eafdfb706320d5328 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Wed, 16 Apr 2025 15:06:56 +0300 Subject: [PATCH 05/17] handled missing original key issue for path_to_url migration --- .../migrations/http_requester_path_to_url_v6_45_2__1.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/manifest_migrations/migrations/http_requester_path_to_url_v6_45_2__1.py b/airbyte_cdk/manifest_migrations/migrations/http_requester_path_to_url_v6_45_2__1.py index cb36a886c..3454fe72d 100644 --- a/airbyte_cdk/manifest_migrations/migrations/http_requester_path_to_url_v6_45_2__1.py +++ b/airbyte_cdk/manifest_migrations/migrations/http_requester_path_to_url_v6_45_2__1.py @@ -25,7 
+25,7 @@ def should_migrate(self, manifest: ManifestType) -> bool: ) def migrate(self, manifest: ManifestType) -> None: - original_key_value = manifest[self.original_key].lstrip("/") + original_key_value = manifest.get(self.original_key, EmptyString).lstrip("/") replacement_key_value = manifest[self.replacement_key] # return a full-url if provided directly from interpolation context From 56681806f0acb2139ed55c3d45ed329890a61211 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Thu, 17 Apr 2025 16:13:48 +0300 Subject: [PATCH 06/17] add __should_migrate flag handling --- .../connector_builder/connector_builder_handler.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/airbyte_cdk/connector_builder/connector_builder_handler.py b/airbyte_cdk/connector_builder/connector_builder_handler.py index 6b6b31111..8c92e5c69 100644 --- a/airbyte_cdk/connector_builder/connector_builder_handler.py +++ b/airbyte_cdk/connector_builder/connector_builder_handler.py @@ -55,6 +55,14 @@ def get_limits(config: Mapping[str, Any]) -> TestLimits: max_streams = command_config.get(MAX_STREAMS_KEY) or DEFAULT_MAXIMUM_STREAMS return TestLimits(max_records, max_pages_per_slice, max_slices, max_streams) +def should_migrate_manifest(config: Mapping[str, Any]) -> bool: + """ + Determines whether the manifest should be migrated, + based on the presence of the "__should_migrate" key in the config. + + This flag is set by the UI. 
+ """ + return config.get("__should_migrate", False) def create_source(config: Mapping[str, Any], limits: TestLimits) -> ManifestDeclarativeSource: manifest = config["__injected_declarative_manifest"] @@ -62,6 +70,7 @@ def create_source(config: Mapping[str, Any], limits: TestLimits) -> ManifestDecl config=config, emit_connector_builder_messages=True, source_config=manifest, + migrate_manifest=should_migrate_manifest(config), component_factory=ModelToComponentFactory( emit_connector_builder_messages=True, limit_pages_fetched_per_slice=limits.max_pages_per_slice, From c4949345a17207eaabfb7b4a5bd53fed2d06d779 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Thu, 17 Apr 2025 16:14:21 +0300 Subject: [PATCH 07/17] formatted --- airbyte_cdk/connector_builder/connector_builder_handler.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/connector_builder/connector_builder_handler.py b/airbyte_cdk/connector_builder/connector_builder_handler.py index 8c92e5c69..5f5fdfbb5 100644 --- a/airbyte_cdk/connector_builder/connector_builder_handler.py +++ b/airbyte_cdk/connector_builder/connector_builder_handler.py @@ -55,15 +55,17 @@ def get_limits(config: Mapping[str, Any]) -> TestLimits: max_streams = command_config.get(MAX_STREAMS_KEY) or DEFAULT_MAXIMUM_STREAMS return TestLimits(max_records, max_pages_per_slice, max_slices, max_streams) + def should_migrate_manifest(config: Mapping[str, Any]) -> bool: """ Determines whether the manifest should be migrated, based on the presence of the "__should_migrate" key in the config. - + This flag is set by the UI. 
""" return config.get("__should_migrate", False) + def create_source(config: Mapping[str, Any], limits: TestLimits) -> ManifestDeclarativeSource: manifest = config["__injected_declarative_manifest"] return ManifestDeclarativeSource( From 9e58f5b2cd242cca722e4fbc250a4b4744a36398 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Tue, 22 Apr 2025 11:48:01 +0300 Subject: [PATCH 08/17] correct readme.md --- airbyte_cdk/manifest_migrations/README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/manifest_migrations/README.md b/airbyte_cdk/manifest_migrations/README.md index 47f13cb4c..843aaf351 100644 --- a/airbyte_cdk/manifest_migrations/README.md +++ b/airbyte_cdk/manifest_migrations/README.md @@ -12,8 +12,8 @@ This directory contains the logic and registry for manifest migrations in the Ai 2. **Define the Migration Class:** - The migration class must inherit from `ManifestMigration`. - - Name the class using the pattern: `V____ManifestMigration_`. - - Example: `V_6_45_2_ManifestMigration_HttpRequesterUrlBaseToUrl` + - Name the class using the pattern: `V____`. + - Example: `V_6_45_2_HttpRequesterUrlBaseToUrl` - Implement the following methods: - `should_migrate(self, manifest: ManifestType) -> bool`: Return `True` if the migration should be applied to the given manifest. - `migrate(self, manifest: ManifestType) -> None`: Perform the migration in-place. 
@@ -43,7 +43,7 @@ This directory contains the logic and registry for manifest migrations in the Ai ```python from airbyte_cdk.sources.declarative.migrations.manifest.manifest_migration import TYPE_TAG, ManifestMigration, ManifestType -class V_1_2_3_ManifestMigration_Example(ManifestMigration): +class V_1_2_3_Example(ManifestMigration): component_type = "ExampleComponent" original_key = "old_key" replacement_key = "new_key" From c6b31ccca94b18f17d369718a15a9e7310e103a4 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Tue, 22 Apr 2025 11:53:46 +0300 Subject: [PATCH 09/17] fixed the imports for readme.md --- airbyte_cdk/manifest_migrations/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/manifest_migrations/README.md b/airbyte_cdk/manifest_migrations/README.md index 843aaf351..c5037dc98 100644 --- a/airbyte_cdk/manifest_migrations/README.md +++ b/airbyte_cdk/manifest_migrations/README.md @@ -41,7 +41,7 @@ This directory contains the logic and registry for manifest migrations in the Ai ## Example Migration Skeleton ```python -from airbyte_cdk.sources.declarative.migrations.manifest.manifest_migration import TYPE_TAG, ManifestMigration, ManifestType +from airbyte_cdk.manifest_migrations.manifest_migration import TYPE_TAG, ManifestMigration, ManifestType class V_1_2_3_Example(ManifestMigration): component_type = "ExampleComponent" From ab08a079f8fff8259ca5918ddd99430eae65536d Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Tue, 22 Apr 2025 16:28:43 +0300 Subject: [PATCH 10/17] add request_body_* > request_body migration + unit test --- ...dy_json_data_to_request_body_v6_45_2__2.py | 28 + unit_tests/manifest_migrations/conftest.py | 540 ++++++++++++++++++ .../test_manifest_migration.py | 19 +- 3 files changed, 586 insertions(+), 1 deletion(-) create mode 100644 airbyte_cdk/manifest_migrations/migrations/http_requester_request_body_json_data_to_request_body_v6_45_2__2.py diff --git 
a/airbyte_cdk/manifest_migrations/migrations/http_requester_request_body_json_data_to_request_body_v6_45_2__2.py b/airbyte_cdk/manifest_migrations/migrations/http_requester_request_body_json_data_to_request_body_v6_45_2__2.py new file mode 100644 index 000000000..f5645aa88 --- /dev/null +++ b/airbyte_cdk/manifest_migrations/migrations/http_requester_request_body_json_data_to_request_body_v6_45_2__2.py @@ -0,0 +1,28 @@ +from airbyte_cdk.manifest_migrations.manifest_migration import ( + TYPE_TAG, + ManifestMigration, + ManifestType, +) + + +class V_6_45_2_HttpRequesterRequestBodyJsonDataToRequestBody(ManifestMigration): + """ + This migration is responsible for migrating the `request_body_json` and `request_body_data` keys to `request_body` in the HttpRequester component. + Each original key that is present has its value moved to `request_body`, and the original key is then removed. + If both original keys are present, the value of `request_body_data` is the one kept, because it is processed last. + """ + + component_type = "HttpRequester" + original_keys = ["request_body_json", "request_body_data"] + replacement_key = "request_body" + + def should_migrate(self, manifest: ManifestType) -> bool: + return manifest[TYPE_TAG] == self.component_type and any( + key in list(manifest.keys()) for key in self.original_keys + ) + + def migrate(self, manifest: ManifestType) -> None: + for key in self.original_keys: + if key in manifest: + manifest[self.replacement_key] = manifest[key] + manifest.pop(key, None) diff --git a/unit_tests/manifest_migrations/conftest.py b/unit_tests/manifest_migrations/conftest.py index 38cdbdfb1..0bd143c37 100644 --- a/unit_tests/manifest_migrations/conftest.py +++ b/unit_tests/manifest_migrations/conftest.py @@ -598,3 +598,543 @@ def manifest_with_migrated_url_base_and_path_is_joined_to_url() -> Dict[str, Any }, }, } + + +@pytest.fixture +def manifest_with_request_body_json_and_data_to_migrate_to_request_body() -> Dict[str, Any]: + return { + "version": "0.0.0", + "type": "DeclarativeSource", + "check": { + "type": "CheckStream", + "stream_names": ["A"], + }, 
+ "definitions": { + "streams": { + "A": { + "type": "DeclarativeStream", + "name": "A", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "$ref": "#/definitions/requester_A", + "path": "/path_to_A", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/A"}, + }, + }, + "B": { + "type": "DeclarativeStream", + "name": "B", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "$ref": "#/definitions/requester_A", + "path": "path_to_A", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/B"}, + }, + }, + "C": { + "type": "DeclarativeStream", + "name": "C", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "$ref": "#/definitions/requester_B", + "path": "path_to_B", + "http_method": "GET", + # the `request_body_json` is expected to be migrated to the `request_body` key + "request_body_json": { + "reportType": "test_report", + "groupBy": "GROUP", + "metrics": "{{ ','.join( ['a-b','cd','e-f-g-h'] ) }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/C"}, + }, + }, + "D": { + "type": "DeclarativeStream", + "name": "D", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "$ref": "#/definitions/requester_B", + # ! the double-slash is intentional here for the test. 
+ "path": "//path_to_B", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/D"}, + }, + }, + "E": { + "type": "DeclarativeStream", + "name": "E", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "$ref": "#/definitions/requester_B", + "path": "/path_to_B", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/E"}, + }, + }, + }, + # both requesters have duplicated `url_base`, + # which should be migrated to `url` in the new format + # and the `url_base` and `path` key should be removed + "requester_A": { + "type": "HttpRequester", + "url_base": "https://example.com/v1/", + # this requester has a `request_body_data` key, + # to be migrated to the `request_body` key + "request_body_data": { + "test_key": "{{ config['config_key'] }}", + "test_key_2": "test_value_2", + "test_key_3": 123, + }, + }, + "requester_B": { + "type": "HttpRequester", + "url_base": "https://example.com/v2/", + # for this requester, neither `request_body_json` nor `request_body_data` is present, + # but the `request_body_json` key is present in the stream `C` itself. 
+ # it should also be migrated to the `request_body` key + }, + }, + "streams": [ + {"$ref": "#/definitions/streams/A"}, + {"$ref": "#/definitions/streams/B"}, + {"$ref": "#/definitions/streams/C"}, + {"$ref": "#/definitions/streams/D"}, + {"$ref": "#/definitions/streams/E"}, + ], + "schemas": { + "A": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": { + "field_a1": { + "type": "string", + }, + }, + }, + "B": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": { + "field_b1": { + "type": "string", + }, + }, + }, + "C": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": { + "field_c1": { + "type": "string", + }, + }, + }, + "D": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": { + "field_d1": { + "type": "string", + }, + }, + }, + "E": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": { + "field_e1": { + "type": "string", + }, + }, + }, + }, + } + + +@pytest.fixture +def expected_manifest_with_migrated_to_request_body() -> Dict[str, Any]: + return { + "version": "0.0.0", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["A"]}, + "definitions": { + "streams": { + "A": { + "type": "DeclarativeStream", + "name": "A", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v1/path_to_A", + "request_body": { + "test_key": "{{ config['config_key'] }}", + "test_key_2": "test_value_2", + "test_key_3": 123, + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": 
"InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_a1": {"type": "string"}}, + }, + }, + }, + "B": { + "type": "DeclarativeStream", + "name": "B", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v1/path_to_A", + "request_body": { + "test_key": "{{ config['config_key'] }}", + "test_key_2": "test_value_2", + "test_key_3": 123, + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_b1": {"type": "string"}}, + }, + }, + }, + "C": { + "type": "DeclarativeStream", + "name": "C", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v2/path_to_B", + "request_body": { + "reportType": "test_report", + "groupBy": "GROUP", + "metrics": "{{ ','.join( ['a-b','cd','e-f-g-h'] ) }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_c1": {"type": "string"}}, + }, + }, + }, + "D": { + "type": "DeclarativeStream", + "name": "D", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v2/path_to_B", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": 
"InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_d1": {"type": "string"}}, + }, + }, + }, + "E": { + "type": "DeclarativeStream", + "name": "E", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v2/path_to_B", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_e1": {"type": "string"}}, + }, + }, + }, + }, + "requester_A": { + "type": "HttpRequester", + "url": "https://example.com/v1/", + "request_body": { + "test_key": "{{ config['config_key'] }}", + "test_key_2": "test_value_2", + "test_key_3": 123, + }, + }, + "requester_B": {"type": "HttpRequester", "url": "https://example.com/v2/"}, + }, + "streams": [ + { + "type": "DeclarativeStream", + "name": "A", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v1/path_to_A", + "request_body": { + "test_key": "{{ config['config_key'] }}", + "test_key_2": "test_value_2", + "test_key_3": 123, + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_a1": {"type": "string"}}, + }, + }, + }, + { + "type": "DeclarativeStream", + "name": "B", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": 
"https://example.com/v1/path_to_A", + "request_body": { + "test_key": "{{ config['config_key'] }}", + "test_key_2": "test_value_2", + "test_key_3": 123, + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_b1": {"type": "string"}}, + }, + }, + }, + { + "type": "DeclarativeStream", + "name": "C", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v2/path_to_B", + "request_body": { + "reportType": "test_report", + "groupBy": "GROUP", + "metrics": "{{ ','.join( ['a-b','cd','e-f-g-h'] ) }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_c1": {"type": "string"}}, + }, + }, + }, + { + "type": "DeclarativeStream", + "name": "D", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v2/path_to_B", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_d1": {"type": "string"}}, + }, + }, + }, + { + "type": "DeclarativeStream", + "name": "E", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": 
"https://example.com/v2/path_to_B", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_e1": {"type": "string"}}, + }, + }, + }, + ], + "schemas": { + "A": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_a1": {"type": "string"}}, + }, + "B": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_b1": {"type": "string"}}, + }, + "C": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_c1": {"type": "string"}}, + }, + "D": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_d1": {"type": "string"}}, + }, + "E": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_e1": {"type": "string"}}, + }, + }, + } diff --git a/unit_tests/manifest_migrations/test_manifest_migration.py b/unit_tests/manifest_migrations/test_manifest_migration.py index ae44358db..abc68cdfc 100644 --- a/unit_tests/manifest_migrations/test_manifest_migration.py +++ b/unit_tests/manifest_migrations/test_manifest_migration.py @@ -14,7 +14,7 @@ resolver = ManifestReferenceResolver() -def test_manifest_resolve_migrate( +def test_manifest_resolve_migrate_url_base_to_url( manifest_with_url_base_to_migrate_to_url, expected_manifest_with_url_base_migrated_to_url, ) -> None: @@ -44,3 +44,20 @@ def test_manifest_resolve_do_not_migrate( # it's expected that the manifest is the same after the processing assert migrated_manifest == 
manifest_with_migrated_url_base_and_path_is_joined_to_url + + +def test_manifest_resolve_migrate_request_body_json_and_data_to_request_body( + manifest_with_request_body_json_and_data_to_migrate_to_request_body, + expected_manifest_with_migrated_to_request_body, +) -> None: + """ + This test is to check that the manifest is migrated correctly, + after the `request_body_json` and `request_body_data` are migrated to `request_body`. + """ + + resolved_manifest = resolver.preprocess_manifest( + manifest_with_request_body_json_and_data_to_migrate_to_request_body + ) + migrated_manifest = ManifestMigrationHandler(dict(resolved_manifest)).apply_migrations() + + assert migrated_manifest == expected_manifest_with_migrated_to_request_body From 0308817428bbfcc93cbf3a2e3e580d067aea3a90 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Tue, 22 Apr 2025 16:34:04 +0300 Subject: [PATCH 11/17] updated docstring --- ...ter_request_body_json_data_to_request_body_v6_45_2__2.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/manifest_migrations/migrations/http_requester_request_body_json_data_to_request_body_v6_45_2__2.py b/airbyte_cdk/manifest_migrations/migrations/http_requester_request_body_json_data_to_request_body_v6_45_2__2.py index f5645aa88..59f8a0efe 100644 --- a/airbyte_cdk/manifest_migrations/migrations/http_requester_request_body_json_data_to_request_body_v6_45_2__2.py +++ b/airbyte_cdk/manifest_migrations/migrations/http_requester_request_body_json_data_to_request_body_v6_45_2__2.py @@ -7,9 +7,9 @@ class V_6_45_2_HttpRequesterRequestBodyJsonDataToRequestBody(ManifestMigration): """ - This migration is responsible for migrating the `url_base` key to `url` in the HttpRequester component. - The `url_base` key is expected to be a base URL, and the `url` key is expected to be a full URL. - The migration will copy the value of `url_base` to `url`. 
+ This migration is responsible for migrating the `request_body_json` and `request_body_data` keys + to a unified `request_body` key in the HttpRequester component. + The migration will copy the value of either original key to `request_body` and remove the original key. """ component_type = "HttpRequester" From 38b7362b11b88bdc4e12902e772bf05792cdd0d3 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Wed, 23 Apr 2025 18:47:51 +0300 Subject: [PATCH 12/17] updated the structure to address the versioning and the order of the migrations registered, add the metadata.applied_migrations object to be able to trace the applied MigrationTrace --- airbyte_cdk/manifest_migrations/README.md | 73 ++++++----- airbyte_cdk/manifest_migrations/__init__.py | 3 + airbyte_cdk/manifest_migrations/exceptions.py | 2 +- .../manifest_migrations/manifest_migration.py | 75 ++++++----- .../manifest_migrations/migration_handler.py | 81 ++++++++++-- .../migrations/__init__.py | 4 + ..._2__1.py => http_requester_path_to_url.py} | 17 ++- ...request_body_json_data_to_request_body.py} | 15 ++- ...0.py => http_requester_url_base_to_url.py} | 17 ++- .../migrations/registry.yaml | 22 ++++ .../migrations_registry.py | 119 ++++++++---------- unit_tests/manifest_migrations/conftest.py | 44 ++++++- .../test_manifest_migration.py | 32 ++--- 13 files changed, 341 insertions(+), 163 deletions(-) rename airbyte_cdk/manifest_migrations/migrations/{http_requester_path_to_url_v6_45_2__1.py => http_requester_path_to_url.py} (78%) rename airbyte_cdk/manifest_migrations/migrations/{http_requester_request_body_json_data_to_request_body_v6_45_2__2.py => http_requester_request_body_json_data_to_request_body.py} (68%) rename airbyte_cdk/manifest_migrations/migrations/{http_requester_url_base_to_url_v6_45_2__0.py => http_requester_url_base_to_url.py} (64%) create mode 100644 airbyte_cdk/manifest_migrations/migrations/registry.yaml diff --git a/airbyte_cdk/manifest_migrations/README.md 
b/airbyte_cdk/manifest_migrations/README.md index c5037dc98..12ec41837 100644 --- a/airbyte_cdk/manifest_migrations/README.md +++ b/airbyte_cdk/manifest_migrations/README.md @@ -6,44 +6,53 @@ This directory contains the logic and registry for manifest migrations in the Ai 1. **Create a Migration File:** - Add a new Python file in the `migrations/` subdirectory. - - Name the file using the pattern: `_v____.py`. - - Example: `http_requester_url_base_to_url_v6_45_2__0.py` - - The `` integer is used to determine the order of migrations for the same version. + - Name the file using the pattern: `.py`. + - Example: `http_requester_url_base_to_url.py` + - The filename should be unique and descriptive. 2. **Define the Migration Class:** - The migration class must inherit from `ManifestMigration`. - - Name the class using the pattern: `V____`. - - Example: `V_6_45_2_HttpRequesterUrlBaseToUrl` + - Name the class using a descriptive name (e.g., `HttpRequesterUrlBaseToUrl`). - Implement the following methods: - - `should_migrate(self, manifest: ManifestType) -> bool`: Return `True` if the migration should be applied to the given manifest. - - `migrate(self, manifest: ManifestType) -> None`: Perform the migration in-place. - -3. **Migration Versioning:** - - The migration version is extracted from the class name and used to determine applicability. - - Only manifests with a version less than or equal to the migration version will be migrated. - -4. **Component Type:** - - Use the `TYPE_TAG` constant to check the component type in your migration logic. - -5. **Examples:** - - See `migrations/http_requester_url_base_to_url_v6_45_2__0.py` and `migrations/http_requester_path_to_url_v6_45_2__1.py` for reference implementations. - -## Migration Registry - -- All migration classes in the `migrations/` folder are automatically discovered and registered in `migrations_registry.py`. -- Migrations are applied in order, determined by the `` suffix in the filename. 
- + -## Testing - -- Ensure your migration is covered by unit tests. -- Tests should verify both `should_migrate` and `migrate` behaviors. + - `should_migrate(self, manifest: ManifestType) -> bool` + - `migrate(self, manifest: ManifestType) -> None` + - `validate(self, manifest: ManifestType) -> bool` + +3. **Register the Migration:** + - Open `migrations/registry.yaml`. + - Add an entry under the appropriate version, or create a new version section if needed. + - Each migration entry should include: + - `name`: The filename (without `.py`) + - `order`: The order in which this migration should be applied for the version + - `description`: A short description of the migration + + Example: + ```yaml + manifest_migrations: + - version: 6.45.2 + migrations: + - name: http_requester_url_base_to_url + order: 1 + description: | + This migration updates the `url_base` field in the `HttpRequester` component spec to `url`. + ``` + +4. **Testing:** + - Ensure your migration is covered by unit tests. + - Tests should verify the `should_migrate`, `migrate`, and `validate` behaviors. + +## Migration Discovery + +- Migrations are discovered and registered automatically based on the entries in `migrations/registry.yaml`. +- Do not modify the migration registry in code manually. +- If you need to skip certain component types, use the `NON_MIGRATABLE_TYPES` list in `manifest_migration.py`. 
## Example Migration Skeleton ```python from airbyte_cdk.manifest_migrations.manifest_migration import TYPE_TAG, ManifestMigration, ManifestType -class V_1_2_3_Example(ManifestMigration): +class ExampleMigration(ManifestMigration): component_type = "ExampleComponent" original_key = "old_key" replacement_key = "new_key" @@ -54,12 +63,10 @@ class V_1_2_3_Example(ManifestMigration): def migrate(self, manifest: ManifestType) -> None: manifest[self.replacement_key] = manifest[self.original_key] manifest.pop(self.original_key, None) -``` -## Additional Notes - -- Do not modify the migration registry manually; it will pick up all valid migration classes automatically. -- If you need to skip certain component types, use the `NON_MIGRATABLE_TYPES` list in `manifest_migration.py`. + def validate(self, manifest: ManifestType) -> bool: + return self.replacement_key in manifest and self.original_key not in manifest +``` --- diff --git a/airbyte_cdk/manifest_migrations/__init__.py b/airbyte_cdk/manifest_migrations/__init__.py index e69de29bb..2acb8555b 100644 --- a/airbyte_cdk/manifest_migrations/__init__.py +++ b/airbyte_cdk/manifest_migrations/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# diff --git a/airbyte_cdk/manifest_migrations/exceptions.py b/airbyte_cdk/manifest_migrations/exceptions.py index 7a140706f..43c88a334 100644 --- a/airbyte_cdk/manifest_migrations/exceptions.py +++ b/airbyte_cdk/manifest_migrations/exceptions.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. # diff --git a/airbyte_cdk/manifest_migrations/manifest_migration.py b/airbyte_cdk/manifest_migrations/manifest_migration.py index 249d1fd6b..70952cfe9 100644 --- a/airbyte_cdk/manifest_migrations/manifest_migration.py +++ b/airbyte_cdk/manifest_migrations/manifest_migration.py @@ -1,10 +1,11 @@ -# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
+# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# -import re -from abc import abstractmethod -from typing import Any, Dict -from packaging.version import Version +from abc import ABC, abstractmethod +from dataclasses import asdict, dataclass +from typing import Any, Dict ManifestType = Dict[str, Any] @@ -12,18 +13,45 @@ TYPE_TAG = "type" NON_MIGRATABLE_TYPES = [ + # more info here: https://github.com/airbytehq/airbyte-internal-issues/issues/12423 "DynamicDeclarativeStream", ] -class ManifestMigration: +@dataclass +class MigrationTrace: + """ + This class represents a migration that has been applied to the manifest. + It contains information about the migration, including the version it was applied from, + the version it was applied to, and the time it was applied. + """ + + from_version: str + to_version: str + migration: str + migrated_at: str + + def as_dict(self) -> dict: + return asdict(self) + + +class ManifestMigration(ABC): + """ + Base class for manifest migrations. + This class provides a framework for migrating manifest components. + It defines the structure for migration classes, including methods for checking if a migration is needed, + performing the migration, and validating the migration. + """ + + def __init__(self) -> None: + self.is_migrated: bool = False + @abstractmethod def should_migrate(self, manifest: ManifestType) -> bool: """ Check if the manifest should be migrated. :param manifest: The manifest to potentially migrate - :param kwargs: Additional arguments for migration :return: true if the manifest is of the expected format and should be migrated. False otherwise. """ @@ -34,17 +62,15 @@ def migrate(self, manifest: ManifestType) -> None: Migrate the manifest. Assumes should_migrate(manifest) returned True. 
:param manifest: The manifest to migrate - :param kwargs: Additional arguments for migration """ - @property - def migration_version(self) -> Version: + @abstractmethod + def validate(self, manifest: ManifestType) -> bool: """ - Get the migration version. + Validate the manifest to ensure the migration was successfully applied. - :return: The migration version as a Version object + :param manifest: The manifest to validate """ - return Version(self._get_migration_version()) def _is_component(self, obj: Dict[str, Any]) -> bool: """ @@ -95,6 +121,8 @@ def _process_manifest(self, obj: Any) -> None: if self.should_migrate(obj): # Perform the migration, if needed self.migrate(obj) + # validate the migration + self.is_migrated = self.validate(obj) # Process all values in the dictionary for value in list(obj.values()): @@ -104,24 +132,3 @@ def _process_manifest(self, obj: Any) -> None: # Process all items in the list for item in obj: self._process_manifest(item) - - def _get_migration_version(self) -> str: - """ - Get the migration version from the class name. - The migration version is extracted from the class name using a regular expression. - The expected format is "V____". 
- - For example, "V_6_45_2_HttpRequesterPathToUrl" -> "6.45.2" - - :return: The migration version as a string in the format "major.minor.patch" - :raises ValueError: If the class name does not match the expected format - """ - - class_name = self.__class__.__name__ - migration_version = re.search(r"V_(\d+_\d+_\d+)", class_name) - if migration_version: - return migration_version.group(1).replace("_", ".") - else: - raise ValueError( - f"Invalid migration class name, make sure the class name has the version (e.g `V____`): {class_name}" - ) diff --git a/airbyte_cdk/manifest_migrations/migration_handler.py b/airbyte_cdk/manifest_migrations/migration_handler.py index b0f794b74..ffbe97028 100644 --- a/airbyte_cdk/manifest_migrations/migration_handler.py +++ b/airbyte_cdk/manifest_migrations/migration_handler.py @@ -1,9 +1,11 @@ # -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. # + import copy import logging +from datetime import datetime, timezone from typing import Type from packaging.version import Version @@ -14,11 +16,16 @@ from airbyte_cdk.manifest_migrations.manifest_migration import ( ManifestMigration, ManifestType, + MigrationTrace, ) from airbyte_cdk.manifest_migrations.migrations_registry import ( - MIGRATIONS, + MANIFEST_MIGRATIONS, ) +METADATA_TAG = "metadata" +MANIFEST_VERSION_TAG = "version" +APPLIED_MIGRATIONS_TAG = "applied_migrations" + LOGGER = logging.getLogger("airbyte.cdk.manifest_migrations") @@ -30,7 +37,6 @@ class ManifestMigrationHandler: def __init__(self, manifest: ManifestType) -> None: self._manifest = manifest self._migrated_manifest: ManifestType = copy.deepcopy(self._manifest) - self._manifest_version: Version = self._get_manifest_version() def apply_migrations(self) -> ManifestType: """ @@ -45,14 +51,21 @@ def apply_migrations(self) -> ManifestType: manifest if any migration failed. 
""" try: - for migration_cls in MIGRATIONS: - self._handle_migration(migration_cls) + manifest_version = self._get_manifest_version() + for migration_version, migrations in MANIFEST_MIGRATIONS.items(): + for migration_cls in migrations: + self._handle_migration(migration_cls, manifest_version, migration_version) return self._migrated_manifest except ManifestMigrationException: # if any errors occur we return the original resolved manifest return self._manifest - def _handle_migration(self, migration_class: Type[ManifestMigration]) -> None: + def _handle_migration( + self, + migration_class: Type[ManifestMigration], + manifest_version: str, + migration_version: str, + ) -> None: """ Handles a single manifest migration by instantiating the migration class and processing the manifest. @@ -64,21 +77,67 @@ def _handle_migration(self, migration_class: Type[ManifestMigration]) -> None: """ try: migration_instance = migration_class() - # check if the migration is supported for the given manifest version - if self._manifest_version <= migration_instance.migration_version: + if self._version_is_valid_for_migration(manifest_version, migration_version): migration_instance._process_manifest(self._migrated_manifest) + if migration_instance.is_migrated: + # set the updated manifest version, after migration has been applied + self._set_manifest_version(migration_version) + # set the migration trace + self._set_migration_trace(migration_class, manifest_version, migration_version) else: LOGGER.info( - f"Manifest migration: `{migration_class.__name__}` is not supported for the given manifest version `{self._manifest_version}`.", + f"Manifest migration: `{migration_instance.__name__}` is not supported for the given manifest version `{manifest_version}`.", ) except Exception as e: raise ManifestMigrationException(str(e)) from e - def _get_manifest_version(self) -> Version: + def _get_manifest_version(self) -> str: """ Get the manifest version from the manifest. 
:param manifest: The manifest to get the version from :return: The manifest version """ - return Version(str(self._migrated_manifest.get("version", "0.0.0"))) + return str(self._migrated_manifest.get(MANIFEST_VERSION_TAG, "0.0.0")) + + def _version_is_valid_for_migration( + self, manifest_version: str, migration_version: str + ) -> bool: + return Version(manifest_version) <= Version(migration_version) + + def _set_manifest_version(self, version: str) -> None: + """ + Set the manifest version in the manifest. + + :param version: The version to set + """ + self._migrated_manifest[MANIFEST_VERSION_TAG] = version + + def _set_migration_trace( + self, + migration_instance: Type[ManifestMigration], + manifest_version: str, + migration_version: str, + ) -> None: + """ + Set the migration trace in the manifest. + + :param migration_instance: The migration instance to set + :param manifest_version: The manifest version before migration + :param migration_version: The manifest version after migration + """ + + if METADATA_TAG not in self._migrated_manifest: + self._migrated_manifest[METADATA_TAG] = {} + if APPLIED_MIGRATIONS_TAG not in self._migrated_manifest[METADATA_TAG]: + self._migrated_manifest[METADATA_TAG][APPLIED_MIGRATIONS_TAG] = [] + + migration_trace = MigrationTrace( + from_version=manifest_version, + to_version=migration_version, + migration=migration_instance.__name__, + migrated_at=datetime.now(tz=timezone.utc).isoformat(), + ).as_dict() + + if migration_version not in self._migrated_manifest[METADATA_TAG][APPLIED_MIGRATIONS_TAG]: + self._migrated_manifest[METADATA_TAG][APPLIED_MIGRATIONS_TAG].append(migration_trace) diff --git a/airbyte_cdk/manifest_migrations/migrations/__init__.py b/airbyte_cdk/manifest_migrations/migrations/__init__.py index e69de29bb..5a567c032 100644 --- a/airbyte_cdk/manifest_migrations/migrations/__init__.py +++ b/airbyte_cdk/manifest_migrations/migrations/__init__.py @@ -0,0 +1,4 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights 
reserved. +# + diff --git a/airbyte_cdk/manifest_migrations/migrations/http_requester_path_to_url_v6_45_2__1.py b/airbyte_cdk/manifest_migrations/migrations/http_requester_path_to_url.py similarity index 78% rename from airbyte_cdk/manifest_migrations/migrations/http_requester_path_to_url_v6_45_2__1.py rename to airbyte_cdk/manifest_migrations/migrations/http_requester_path_to_url.py index 3454fe72d..12d7d2b75 100644 --- a/airbyte_cdk/manifest_migrations/migrations/http_requester_path_to_url_v6_45_2__1.py +++ b/airbyte_cdk/manifest_migrations/migrations/http_requester_path_to_url.py @@ -1,3 +1,8 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + + from urllib.parse import urljoin from airbyte_cdk.manifest_migrations.manifest_migration import ( @@ -8,7 +13,7 @@ from airbyte_cdk.sources.types import EmptyString -class V_6_45_2_HttpRequesterPathToUrl(ManifestMigration): +class HttpRequesterPathToUrl(ManifestMigration): """ This migration is responsible for migrating the `path` key to `url` in the HttpRequester component. The `path` key is expected to be a relative path, and the `url` key is expected to be a full URL. @@ -40,3 +45,13 @@ def migrate(self, manifest: ManifestType) -> None: manifest[self.replacement_key] = urljoin(replacement_key_value, original_key_value) manifest.pop(self.original_key, None) + + def validate(self, manifest: ManifestType) -> bool: + """ + Validate the migration by checking if the `url` key is present and the `path` key is not. 
+ """ + return ( + self.replacement_key in manifest + and self.original_key not in manifest + and manifest[self.replacement_key] is not None + ) diff --git a/airbyte_cdk/manifest_migrations/migrations/http_requester_request_body_json_data_to_request_body_v6_45_2__2.py b/airbyte_cdk/manifest_migrations/migrations/http_requester_request_body_json_data_to_request_body.py similarity index 68% rename from airbyte_cdk/manifest_migrations/migrations/http_requester_request_body_json_data_to_request_body_v6_45_2__2.py rename to airbyte_cdk/manifest_migrations/migrations/http_requester_request_body_json_data_to_request_body.py index 59f8a0efe..cab6e6747 100644 --- a/airbyte_cdk/manifest_migrations/migrations/http_requester_request_body_json_data_to_request_body_v6_45_2__2.py +++ b/airbyte_cdk/manifest_migrations/migrations/http_requester_request_body_json_data_to_request_body.py @@ -1,3 +1,8 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + + from airbyte_cdk.manifest_migrations.manifest_migration import ( TYPE_TAG, ManifestMigration, @@ -5,7 +10,7 @@ ) -class V_6_45_2_HttpRequesterRequestBodyJsonDataToRequestBody(ManifestMigration): +class HttpRequesterRequestBodyJsonDataToRequestBody(ManifestMigration): """ This migration is responsible for migrating the `request_body_json` and `request_body_data` keys to a unified `request_body` key in the HttpRequester component. @@ -26,3 +31,11 @@ def migrate(self, manifest: ManifestType) -> None: if key in manifest: manifest[self.replacement_key] = manifest[key] manifest.pop(key, None) + + def validate(self, manifest: ManifestType) -> bool: + """ + Validate the migration by checking if the `request_body` key is present and none of the original keys are. 
+ """ + return self.replacement_key in manifest and all( + key not in manifest for key in self.original_keys + ) diff --git a/airbyte_cdk/manifest_migrations/migrations/http_requester_url_base_to_url_v6_45_2__0.py b/airbyte_cdk/manifest_migrations/migrations/http_requester_url_base_to_url.py similarity index 64% rename from airbyte_cdk/manifest_migrations/migrations/http_requester_url_base_to_url_v6_45_2__0.py rename to airbyte_cdk/manifest_migrations/migrations/http_requester_url_base_to_url.py index 34dfc41c9..14ffa4141 100644 --- a/airbyte_cdk/manifest_migrations/migrations/http_requester_url_base_to_url_v6_45_2__0.py +++ b/airbyte_cdk/manifest_migrations/migrations/http_requester_url_base_to_url.py @@ -1,3 +1,8 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + + from airbyte_cdk.manifest_migrations.manifest_migration import ( TYPE_TAG, ManifestMigration, @@ -5,7 +10,7 @@ ) -class V_6_45_2_HttpRequesterUrlBaseToUrl(ManifestMigration): +class HttpRequesterUrlBaseToUrl(ManifestMigration): """ This migration is responsible for migrating the `url_base` key to `url` in the HttpRequester component. The `url_base` key is expected to be a base URL, and the `url` key is expected to be a full URL. @@ -24,3 +29,13 @@ def should_migrate(self, manifest: ManifestType) -> bool: def migrate(self, manifest: ManifestType) -> None: manifest[self.replacement_key] = manifest[self.original_key] manifest.pop(self.original_key, None) + + def validate(self, manifest: ManifestType) -> bool: + """ + Validate the migration by checking if the `url` key is present and the `url_base` key is not. 
+ """ + return ( + self.replacement_key in manifest + and self.original_key not in manifest + and manifest[self.replacement_key] is not None + ) diff --git a/airbyte_cdk/manifest_migrations/migrations/registry.yaml b/airbyte_cdk/manifest_migrations/migrations/registry.yaml new file mode 100644 index 000000000..e0cf5510c --- /dev/null +++ b/airbyte_cdk/manifest_migrations/migrations/registry.yaml @@ -0,0 +1,22 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + +manifest_migrations: + - version: 6.45.2 + migrations: + - name: http_requester_url_base_to_url + order: 1 + description: | + This migration updates the `url_base` field in the `http_requester` spec to `url`. + The `url_base` field is deprecated and will be removed in a future version. + - name: http_requester_path_to_url + order: 2 + description: | + This migration updates the `path` field in the `http_requester` spec to `url`. + The `path` field is deprecated and will be removed in a future version. + - name: http_requester_request_body_json_data_to_request_body + order: 3 + description: | + This migration updates the `request_body_json_data` field in the `http_requester` spec to `request_body`. + The `request_body_json_data` field is deprecated and will be removed in a future version. diff --git a/airbyte_cdk/manifest_migrations/migrations_registry.py b/airbyte_cdk/manifest_migrations/migrations_registry.py index 9b8c4ada9..994d86372 100644 --- a/airbyte_cdk/manifest_migrations/migrations_registry.py +++ b/airbyte_cdk/manifest_migrations/migrations_registry.py @@ -1,84 +1,75 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 
+# + + import importlib import inspect -import pkgutil -import re -import sys -from typing import List, Type +import os +from pathlib import Path +from typing import Dict, List, Type + +import yaml -import airbyte_cdk.manifest_migrations.migrations as migrations_pkg from airbyte_cdk.manifest_migrations.manifest_migration import ( ManifestMigration, ) -# Dynamically import all modules in the migrations package -for _, module_name, is_pkg in pkgutil.iter_modules(migrations_pkg.__path__): - if not is_pkg: - importlib.import_module(f"{migrations_pkg.__name__}.{module_name}") - - -def _migration_order_key(cls: object) -> int: - """ - Determines the migration order key for a given migration class based on its module name. - - The function expects the module name to end with a double underscore followed by an integer (e.g., '__0'). - This integer is extracted and returned as the migration order key. +DiscoveredMigrations = Dict[str, List[Type[ManifestMigration]]] - Args: - cls (object): The migration class whose module name encodes the migration order. +MIGRATIONS_PATH = Path(__file__).parent / "migrations" +REGISTRY_PATH = MIGRATIONS_PATH / "registry.yaml" - Returns: - int: The migration order extracted from the module name. - Raises: - ValueError: If the module name does not contain the expected order suffix. +def _find_migration_module(name: str) -> str | None: """ - # Extract the migration order from the module name, e.g., http_requester_url_base_to_url_v6_45_2__0 - # The order is the integer after the double underscore at the end of the module name - module_name = cls.__module__.split(".")[-1] - match = re.search(r"__(\d+)$", module_name) - if match: - return int(match.group(1)) - else: - message = f"Migration `{cls.__module__}` doesn't have the `order` in the module name: {module_name}. Did you miss to add `__` to the module name?" 
- raise ValueError(message) - - -def _discover_migrations() -> List[Type[ManifestMigration]]: + Finds the migration module by name in the migrations directory. + The name should match the file name of the migration module (without the .py extension). + Raises ImportError if the module is not found. """ - Discovers and returns a sorted list of all ManifestMigration subclasses available in the migrations package. - This function inspects the main migrations package and its submodules to find all classes that are subclasses of ManifestMigration, - excluding the ManifestMigration base class itself and any duplicates. The discovered migration classes are then sorted using the - _migration_order_key function to ensure they are returned in the correct order. + try: + for migration_file in os.listdir(MIGRATIONS_PATH): + migration_name = name + ".py" + if migration_file == migration_name: + return migration_file.replace(".py", "") + except ImportError as e: + raise ImportError(f"Migration module '{name}' not found in {MIGRATIONS_PATH}.") from e - Returns: - List[Type[ManifestMigration]]: A list of discovered ManifestMigration subclasses, sorted by migration order. - """ - migration_classes = [] - for _, obj in inspect.getmembers(sys.modules[migrations_pkg.__name__], inspect.isclass): - if ( - issubclass(obj, ManifestMigration) - and obj is not ManifestMigration - and obj not in migration_classes - ): - migration_classes.append(obj) +def _get_migration_class(module) -> type: + """ + Returns the ManifestMigration subclass defined in the module. 
+ """ + for _, obj in inspect.getmembers(module, inspect.isclass): + if issubclass(obj, ManifestMigration): + return obj - for _, module_name, _ in pkgutil.iter_modules(migrations_pkg.__path__): - module = sys.modules.get(f"{migrations_pkg.__name__}.{module_name}") - if module: - for _, obj in inspect.getmembers(module, inspect.isclass): - if ( - issubclass(obj, ManifestMigration) - and obj is not ManifestMigration - and obj not in migration_classes - ): - migration_classes.append(obj) + raise ImportError(f"No ManifestMigration subclass found in module {module.__name__}.") - # Sort by migration order key - migration_classes.sort(key=_migration_order_key) - return migration_classes +def _discover_migrations() -> DiscoveredMigrations: + """ + Discovers and returns a list of ManifestMigration subclasses in the order specified by registry.yaml. + """ + with open(REGISTRY_PATH, "r") as f: + registry = yaml.safe_load(f) + migrations = {} + # Iterate through the registry and import the migration classes + # based on the version and order specified in the registry.yaml + for version_entry in registry.get("manifest_migrations", []): + migration_version = version_entry.get("version", "0.0.0") + if not migration_version in migrations: + migrations[migration_version] = [] + + for migration in sorted(version_entry.get("migrations", []), key=lambda m: m["order"]): + module = importlib.import_module( + f"airbyte_cdk.manifest_migrations.migrations.{_find_migration_module(migration['name'])}" + ) + migration_class = _get_migration_class(module) + migrations[migration_version].append(migration_class) + + return migrations # registered migrations -MIGRATIONS: List[Type[ManifestMigration]] = _discover_migrations() +MANIFEST_MIGRATIONS: DiscoveredMigrations = _discover_migrations() diff --git a/unit_tests/manifest_migrations/conftest.py b/unit_tests/manifest_migrations/conftest.py index 0bd143c37..2ea89a789 100644 --- a/unit_tests/manifest_migrations/conftest.py +++ 
b/unit_tests/manifest_migrations/conftest.py @@ -197,7 +197,7 @@ def manifest_with_url_base_to_migrate_to_url() -> Dict[str, Any]: @pytest.fixture def expected_manifest_with_url_base_migrated_to_url() -> Dict[str, Any]: return { - "version": "0.0.0", + "version": "6.45.2", "type": "DeclarativeSource", "check": {"type": "CheckStream", "stream_names": ["A"]}, "definitions": { @@ -490,13 +490,29 @@ def expected_manifest_with_url_base_migrated_to_url() -> Dict[str, Any]: "properties": {"field_e1": {"type": "string"}}, }, }, + "metadata": { + "applied_migrations": [ + { + "from_version": "0.0.0", + "to_version": "6.45.2", + "migration": "HttpRequesterUrlBaseToUrl", + "migrated_at": "2025-04-01T00:00:00+00:00", # time freezed in the test + }, + { + "from_version": "0.0.0", + "to_version": "6.45.2", + "migration": "HttpRequesterPathToUrl", + "migrated_at": "2025-04-01T00:00:00+00:00", # time freezed in the test + }, + ] + }, } @pytest.fixture def manifest_with_migrated_url_base_and_path_is_joined_to_url() -> Dict[str, Any]: return { - "version": "0.0.0", + "version": "6.45.3", "type": "DeclarativeSource", "check": {"type": "CheckStream", "stream_names": ["A"]}, "definitions": {}, @@ -806,7 +822,7 @@ def manifest_with_request_body_json_and_data_to_migrate_to_request_body() -> Dic @pytest.fixture def expected_manifest_with_migrated_to_request_body() -> Dict[str, Any]: return { - "version": "0.0.0", + "version": "6.45.2", "type": "DeclarativeSource", "check": {"type": "CheckStream", "stream_names": ["A"]}, "definitions": { @@ -1137,4 +1153,26 @@ def expected_manifest_with_migrated_to_request_body() -> Dict[str, Any]: "properties": {"field_e1": {"type": "string"}}, }, }, + "metadata": { + "applied_migrations": [ + { + "from_version": "0.0.0", + "to_version": "6.45.2", + "migration": "HttpRequesterUrlBaseToUrl", + "migrated_at": "2025-04-01T00:00:00+00:00", + }, + { + "from_version": "0.0.0", + "to_version": "6.45.2", + "migration": "HttpRequesterPathToUrl", + "migrated_at": 
"2025-04-01T00:00:00+00:00", + }, + { + "from_version": "0.0.0", + "to_version": "6.45.2", + "migration": "HttpRequesterRequestBodyJsonDataToRequestBody", + "migrated_at": "2025-04-01T00:00:00+00:00", + }, + ] + }, } diff --git a/unit_tests/manifest_migrations/test_manifest_migration.py b/unit_tests/manifest_migrations/test_manifest_migration.py index abc68cdfc..6fba8da5e 100644 --- a/unit_tests/manifest_migrations/test_manifest_migration.py +++ b/unit_tests/manifest_migrations/test_manifest_migration.py @@ -2,6 +2,7 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +from freezegun import freeze_time from airbyte_cdk.manifest_migrations.migration_handler import ( ManifestMigrationHandler, @@ -14,6 +15,7 @@ resolver = ManifestReferenceResolver() +@freeze_time("2025-04-01") def test_manifest_resolve_migrate_url_base_to_url( manifest_with_url_base_to_migrate_to_url, expected_manifest_with_url_base_migrated_to_url, @@ -29,35 +31,37 @@ def test_manifest_resolve_migrate_url_base_to_url( assert migrated_manifest == expected_manifest_with_url_base_migrated_to_url -def test_manifest_resolve_do_not_migrate( - manifest_with_migrated_url_base_and_path_is_joined_to_url, +@freeze_time("2025-04-01") +def test_manifest_resolve_migrate_request_body_json_and_data_to_request_body( + manifest_with_request_body_json_and_data_to_migrate_to_request_body, + expected_manifest_with_migrated_to_request_body, ) -> None: """ - This test is to check that the manifest remains migrated already, - after the `url_base` and `path` is joined to `url`. + This test is to check that the manifest is migrated correctly, + after the `request_body_json` and `request_body_data` are migrated to `request_body`. 
""" resolved_manifest = resolver.preprocess_manifest( - manifest_with_migrated_url_base_and_path_is_joined_to_url + manifest_with_request_body_json_and_data_to_migrate_to_request_body ) migrated_manifest = ManifestMigrationHandler(dict(resolved_manifest)).apply_migrations() - # it's expected that the manifest is the same after the processing - assert migrated_manifest == manifest_with_migrated_url_base_and_path_is_joined_to_url + assert migrated_manifest == expected_manifest_with_migrated_to_request_body -def test_manifest_resolve_migrate_request_body_json_and_data_to_request_body( - manifest_with_request_body_json_and_data_to_migrate_to_request_body, - expected_manifest_with_migrated_to_request_body, +@freeze_time("2025-04-01") +def test_manifest_resolve_do_not_migrate( + manifest_with_migrated_url_base_and_path_is_joined_to_url, ) -> None: """ - This test is to check that the manifest is migrated correctly, - after the `request_body_json` and `request_body_data` are migrated to `request_body`. + This test is to check that the manifest remains migrated already, + after the `url_base` and `path` is joined to `url`. """ resolved_manifest = resolver.preprocess_manifest( - manifest_with_request_body_json_and_data_to_migrate_to_request_body + manifest_with_migrated_url_base_and_path_is_joined_to_url ) migrated_manifest = ManifestMigrationHandler(dict(resolved_manifest)).apply_migrations() - assert migrated_manifest == expected_manifest_with_migrated_to_request_body + # it's expected that the manifest is the same after the processing, because the manifest version is higher. 
+ assert migrated_manifest == manifest_with_migrated_url_base_and_path_is_joined_to_url From c3ee5144af9f18f1f46a77eb7dcb265d66a06211 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Wed, 23 Apr 2025 19:12:44 +0300 Subject: [PATCH 13/17] fixed linters issues --- .../manifest_migrations/manifest_migration.py | 2 +- .../manifest_migrations/migration_handler.py | 30 +++++++++++++++---- .../migrations_registry.py | 21 ++++++------- 3 files changed, 37 insertions(+), 16 deletions(-) diff --git a/airbyte_cdk/manifest_migrations/manifest_migration.py b/airbyte_cdk/manifest_migrations/manifest_migration.py index 70952cfe9..2cf26f6d5 100644 --- a/airbyte_cdk/manifest_migrations/manifest_migration.py +++ b/airbyte_cdk/manifest_migrations/manifest_migration.py @@ -31,7 +31,7 @@ class MigrationTrace: migration: str migrated_at: str - def as_dict(self) -> dict: + def as_dict(self) -> Dict[str, Any]: return asdict(self) diff --git a/airbyte_cdk/manifest_migrations/migration_handler.py b/airbyte_cdk/manifest_migrations/migration_handler.py index ffbe97028..f843c25ce 100644 --- a/airbyte_cdk/manifest_migrations/migration_handler.py +++ b/airbyte_cdk/manifest_migrations/migration_handler.py @@ -82,15 +82,23 @@ def _handle_migration( if migration_instance.is_migrated: # set the updated manifest version, after migration has been applied self._set_manifest_version(migration_version) - # set the migration trace self._set_migration_trace(migration_class, manifest_version, migration_version) else: LOGGER.info( - f"Manifest migration: `{migration_instance.__name__}` is not supported for the given manifest version `{manifest_version}`.", + f"Manifest migration: `{self._get_migration_name(migration_class)}` is not supported for the given manifest version `{manifest_version}`.", ) except Exception as e: raise ManifestMigrationException(str(e)) from e + def _get_migration_name(self, migration_class: Type[ManifestMigration]) -> str: + """ + Get the name of the migration instance. 
+ + Returns: + str: The name of the migration. + """ + return migration_class.__name__ + def _get_manifest_version(self) -> str: """ Get the manifest version from the manifest. @@ -101,8 +109,20 @@ def _get_manifest_version(self) -> str: return str(self._migrated_manifest.get(MANIFEST_VERSION_TAG, "0.0.0")) def _version_is_valid_for_migration( - self, manifest_version: str, migration_version: str + self, + manifest_version: str, + migration_version: str, ) -> bool: + """ + Checks if the given manifest version is less than or equal to the specified migration version. + + Args: + manifest_version (str): The version of the manifest to check. + migration_version (str): The migration version to compare against. + + Returns: + bool: True if the manifest version is less than or equal to the migration version, False otherwise. + """ return Version(manifest_version) <= Version(migration_version) def _set_manifest_version(self, version: str) -> None: @@ -120,7 +140,7 @@ def _set_migration_trace( migration_version: str, ) -> None: """ - Set the migration trace in the manifest. + Set the migration trace in the manifest, under the `metadata.applied_migrations` property object. 
:param migration_instance: The migration instance to set :param manifest_version: The manifest version before migration @@ -135,7 +155,7 @@ def _set_migration_trace( migration_trace = MigrationTrace( from_version=manifest_version, to_version=migration_version, - migration=migration_instance.__name__, + migration=self._get_migration_name(migration_instance), migrated_at=datetime.now(tz=timezone.utc).isoformat(), ).as_dict() diff --git a/airbyte_cdk/manifest_migrations/migrations_registry.py b/airbyte_cdk/manifest_migrations/migrations_registry.py index 994d86372..4a57e2a35 100644 --- a/airbyte_cdk/manifest_migrations/migrations_registry.py +++ b/airbyte_cdk/manifest_migrations/migrations_registry.py @@ -7,6 +7,7 @@ import inspect import os from pathlib import Path +from types import ModuleType from typing import Dict, List, Type import yaml @@ -21,22 +22,22 @@ REGISTRY_PATH = MIGRATIONS_PATH / "registry.yaml" -def _find_migration_module(name: str) -> str | None: +def _find_migration_module(name: str) -> str: """ Finds the migration module by name in the migrations directory. The name should match the file name of the migration module (without the .py extension). Raises ImportError if the module is not found. """ - try: - for migration_file in os.listdir(MIGRATIONS_PATH): - migration_name = name + ".py" - if migration_file == migration_name: - return migration_file.replace(".py", "") - except ImportError as e: - raise ImportError(f"Migration module '{name}' not found in {MIGRATIONS_PATH}.") from e + for migration_file in os.listdir(MIGRATIONS_PATH): + migration_name = name + ".py" + if migration_file == migration_name: + return migration_file.replace(".py", "") -def _get_migration_class(module) -> type: + raise ImportError(f"Migration module '{name}' not found in {MIGRATIONS_PATH}.") + + +def _get_migration_class(module: ModuleType) -> Type[ManifestMigration]: """ Returns the ManifestMigration subclass defined in the module. 
""" @@ -53,7 +54,7 @@ def _discover_migrations() -> DiscoveredMigrations: """ with open(REGISTRY_PATH, "r") as f: registry = yaml.safe_load(f) - migrations = {} + migrations: DiscoveredMigrations = {} # Iterate through the registry and import the migration classes # based on the version and order specified in the registry.yaml for version_entry in registry.get("manifest_migrations", []): From b202be87832bb25e190d8ad53b7e8f9392d4ef19 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Fri, 25 Apr 2025 16:26:04 +0300 Subject: [PATCH 14/17] updated --- .../connector_builder/connector_builder_handler.py | 1 + .../declarative/manifest_declarative_source.py | 11 ++++------- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/airbyte_cdk/connector_builder/connector_builder_handler.py b/airbyte_cdk/connector_builder/connector_builder_handler.py index c5b85b061..bb6b0929a 100644 --- a/airbyte_cdk/connector_builder/connector_builder_handler.py +++ b/airbyte_cdk/connector_builder/connector_builder_handler.py @@ -65,6 +65,7 @@ def should_migrate_manifest(config: Mapping[str, Any]) -> bool: """ return config.get("__should_migrate", False) + def should_normalize_manifest(config: Mapping[str, Any]) -> bool: """ Check if the manifest should be normalized. diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index 7fc063f28..03e488895 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -106,7 +106,7 @@ def __init__( component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked. normalize_manifest: Optional flag to indicate if the manifest should be normalized. 
""" - self.logger = logging.getLogger(f"airbyte.{self.name}") + self.logger = logging.getLogger(f"airbyte.{self.name}") self._should_normalize = normalize_manifest self._should_migrate = migrate_manifest self._declarative_component_schema = _get_declarative_component_schema() @@ -128,7 +128,7 @@ def __init__( AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger() ) self._config = config or {} - + # resolve all components in the manifest self._source_config = self._pre_process_manifest(dict(source_config)) # validate resolved manifest against the declarative component schema @@ -170,9 +170,8 @@ def _pre_process_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]: propagated_manifest = ManifestComponentTransformer().propagate_types_and_parameters( "", resolved_manifest, {} ) - + return propagated_manifest - def _post_process_manifest(self) -> None: """ @@ -185,8 +184,6 @@ def _post_process_manifest(self) -> None: # apply manifest normalization, if required self._normalize_manifest() - - def _normalize_manifest(self) -> None: """ This method is used to normalize the manifest. It should be called after the manifest has been validated. @@ -203,7 +200,7 @@ def _migrate_manifest(self) -> None: """ This method is used to migrate the manifest. It should be called after the manifest has been validated. The migration is done in place, so the original manifest is modified. - + The original manifest is returned if any error occurs during migration. 
""" if self._should_migrate: From e2f6fd1be0b73903eb5767c55588e8dac2ef0d34 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Mon, 28 Apr 2025 13:47:16 +0300 Subject: [PATCH 15/17] changed the name of the test --- unit_tests/manifest_migrations/test_manifest_migration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/unit_tests/manifest_migrations/test_manifest_migration.py b/unit_tests/manifest_migrations/test_manifest_migration.py index 6fba8da5e..0ad897375 100644 --- a/unit_tests/manifest_migrations/test_manifest_migration.py +++ b/unit_tests/manifest_migrations/test_manifest_migration.py @@ -16,7 +16,7 @@ @freeze_time("2025-04-01") -def test_manifest_resolve_migrate_url_base_to_url( +def test_manifest_resolve_migrate_url_base_and_path_to_url( manifest_with_url_base_to_migrate_to_url, expected_manifest_with_url_base_migrated_to_url, ) -> None: From 480f5f74c01bdece04aad2c97093e86414113c2f Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Mon, 28 Apr 2025 18:44:29 +0300 Subject: [PATCH 16/17] updated request_body_* migration --- ..._request_body_json_data_to_request_body.py | 22 +++++-- unit_tests/manifest_migrations/conftest.py | 63 ++++++++++++------- 2 files changed, 58 insertions(+), 27 deletions(-) diff --git a/airbyte_cdk/manifest_migrations/migrations/http_requester_request_body_json_data_to_request_body.py b/airbyte_cdk/manifest_migrations/migrations/http_requester_request_body_json_data_to_request_body.py index cab6e6747..648b8d9a3 100644 --- a/airbyte_cdk/manifest_migrations/migrations/http_requester_request_body_json_data_to_request_body.py +++ b/airbyte_cdk/manifest_migrations/migrations/http_requester_request_body_json_data_to_request_body.py @@ -18,7 +18,11 @@ class HttpRequesterRequestBodyJsonDataToRequestBody(ManifestMigration): """ component_type = "HttpRequester" - original_keys = ["request_body_json", "request_body_data"] + + body_json_key = "request_body_json" + body_data_key = "request_body_data" + original_keys 
= (body_json_key, body_data_key) + replacement_key = "request_body" def should_migrate(self, manifest: ManifestType) -> bool: @@ -28,14 +32,20 @@ def should_migrate(self, manifest: ManifestType) -> bool: def migrate(self, manifest: ManifestType) -> None: for key in self.original_keys: - if key in manifest: - manifest[self.replacement_key] = manifest[key] + if key == self.body_json_key and key in manifest: + manifest[self.replacement_key] = { + "type": "RequestBodyJson", + "value": manifest[key], + } + manifest.pop(key, None) + elif key == self.body_data_key and key in manifest: + manifest[self.replacement_key] = { + "type": "RequestBodyData", + "value": manifest[key], + } manifest.pop(key, None) def validate(self, manifest: ManifestType) -> bool: - """ - Validate the migration by checking if the `request_body` key is present and none of the original keys are. - """ return self.replacement_key in manifest and all( key not in manifest for key in self.original_keys ) diff --git a/unit_tests/manifest_migrations/conftest.py b/unit_tests/manifest_migrations/conftest.py index 2ea89a789..9a7b7aad5 100644 --- a/unit_tests/manifest_migrations/conftest.py +++ b/unit_tests/manifest_migrations/conftest.py @@ -837,9 +837,12 @@ def expected_manifest_with_migrated_to_request_body() -> Dict[str, Any]: "http_method": "GET", "url": "https://example.com/v1/path_to_A", "request_body": { - "test_key": "{{ config['config_key'] }}", - "test_key_2": "test_value_2", - "test_key_3": 123, + "type": "RequestBodyData", + "value": { + "test_key": "{{ config['config_key'] }}", + "test_key_2": "test_value_2", + "test_key_3": 123, + }, }, }, "record_selector": { @@ -867,9 +870,12 @@ def expected_manifest_with_migrated_to_request_body() -> Dict[str, Any]: "http_method": "GET", "url": "https://example.com/v1/path_to_A", "request_body": { - "test_key": "{{ config['config_key'] }}", - "test_key_2": "test_value_2", - "test_key_3": 123, + "type": "RequestBodyData", + "value": { + "test_key": "{{ 
config['config_key'] }}", + "test_key_2": "test_value_2", + "test_key_3": 123, + }, }, }, "record_selector": { @@ -897,9 +903,12 @@ def expected_manifest_with_migrated_to_request_body() -> Dict[str, Any]: "http_method": "GET", "url": "https://example.com/v2/path_to_B", "request_body": { - "reportType": "test_report", - "groupBy": "GROUP", - "metrics": "{{ ','.join( ['a-b','cd','e-f-g-h'] ) }}", + "type": "RequestBodyJson", + "value": { + "reportType": "test_report", + "groupBy": "GROUP", + "metrics": "{{ ','.join( ['a-b','cd','e-f-g-h'] ) }}", + }, }, }, "record_selector": { @@ -972,9 +981,12 @@ def expected_manifest_with_migrated_to_request_body() -> Dict[str, Any]: "type": "HttpRequester", "url": "https://example.com/v1/", "request_body": { - "test_key": "{{ config['config_key'] }}", - "test_key_2": "test_value_2", - "test_key_3": 123, + "type": "RequestBodyData", + "value": { + "test_key": "{{ config['config_key'] }}", + "test_key_2": "test_value_2", + "test_key_3": 123, + }, }, }, "requester_B": {"type": "HttpRequester", "url": "https://example.com/v2/"}, @@ -990,9 +1002,12 @@ def expected_manifest_with_migrated_to_request_body() -> Dict[str, Any]: "http_method": "GET", "url": "https://example.com/v1/path_to_A", "request_body": { - "test_key": "{{ config['config_key'] }}", - "test_key_2": "test_value_2", - "test_key_3": 123, + "type": "RequestBodyData", + "value": { + "test_key": "{{ config['config_key'] }}", + "test_key_2": "test_value_2", + "test_key_3": 123, + }, }, }, "record_selector": { @@ -1020,9 +1035,12 @@ def expected_manifest_with_migrated_to_request_body() -> Dict[str, Any]: "http_method": "GET", "url": "https://example.com/v1/path_to_A", "request_body": { - "test_key": "{{ config['config_key'] }}", - "test_key_2": "test_value_2", - "test_key_3": 123, + "type": "RequestBodyData", + "value": { + "test_key": "{{ config['config_key'] }}", + "test_key_2": "test_value_2", + "test_key_3": 123, + }, }, }, "record_selector": { @@ -1050,9 +1068,12 @@ def 
expected_manifest_with_migrated_to_request_body() -> Dict[str, Any]: "http_method": "GET", "url": "https://example.com/v2/path_to_B", "request_body": { - "reportType": "test_report", - "groupBy": "GROUP", - "metrics": "{{ ','.join( ['a-b','cd','e-f-g-h'] ) }}", + "type": "RequestBodyJson", + "value": { + "reportType": "test_report", + "groupBy": "GROUP", + "metrics": "{{ ','.join( ['a-b','cd','e-f-g-h'] ) }}", + }, }, }, "record_selector": { From 6e729283072dd4c72d8a04f14a5aa00dbef7cee7 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Mon, 28 Apr 2025 20:04:30 +0300 Subject: [PATCH 17/17] updated migrations to the latest CDK version --- .../manifest_migrations/migrations/registry.yaml | 2 +- unit_tests/manifest_migrations/conftest.py | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/airbyte_cdk/manifest_migrations/migrations/registry.yaml b/airbyte_cdk/manifest_migrations/migrations/registry.yaml index e0cf5510c..6beb1667c 100644 --- a/airbyte_cdk/manifest_migrations/migrations/registry.yaml +++ b/airbyte_cdk/manifest_migrations/migrations/registry.yaml @@ -3,7 +3,7 @@ # manifest_migrations: - - version: 6.45.2 + - version: 6.47.1 migrations: - name: http_requester_url_base_to_url order: 1 diff --git a/unit_tests/manifest_migrations/conftest.py b/unit_tests/manifest_migrations/conftest.py index 9a7b7aad5..303e31178 100644 --- a/unit_tests/manifest_migrations/conftest.py +++ b/unit_tests/manifest_migrations/conftest.py @@ -197,7 +197,7 @@ def manifest_with_url_base_to_migrate_to_url() -> Dict[str, Any]: @pytest.fixture def expected_manifest_with_url_base_migrated_to_url() -> Dict[str, Any]: return { - "version": "6.45.2", + "version": "6.47.1", "type": "DeclarativeSource", "check": {"type": "CheckStream", "stream_names": ["A"]}, "definitions": { @@ -494,13 +494,13 @@ def expected_manifest_with_url_base_migrated_to_url() -> Dict[str, Any]: "applied_migrations": [ { "from_version": "0.0.0", - "to_version": "6.45.2", + 
"to_version": "6.47.1", "migration": "HttpRequesterUrlBaseToUrl", "migrated_at": "2025-04-01T00:00:00+00:00", # time freezed in the test }, { "from_version": "0.0.0", - "to_version": "6.45.2", + "to_version": "6.47.1", "migration": "HttpRequesterPathToUrl", "migrated_at": "2025-04-01T00:00:00+00:00", # time freezed in the test }, @@ -512,7 +512,7 @@ def expected_manifest_with_url_base_migrated_to_url() -> Dict[str, Any]: @pytest.fixture def manifest_with_migrated_url_base_and_path_is_joined_to_url() -> Dict[str, Any]: return { - "version": "6.45.3", + "version": "6.47.1", "type": "DeclarativeSource", "check": {"type": "CheckStream", "stream_names": ["A"]}, "definitions": {}, @@ -822,7 +822,7 @@ def manifest_with_request_body_json_and_data_to_migrate_to_request_body() -> Dic @pytest.fixture def expected_manifest_with_migrated_to_request_body() -> Dict[str, Any]: return { - "version": "6.45.2", + "version": "6.47.1", "type": "DeclarativeSource", "check": {"type": "CheckStream", "stream_names": ["A"]}, "definitions": { @@ -1178,19 +1178,19 @@ def expected_manifest_with_migrated_to_request_body() -> Dict[str, Any]: "applied_migrations": [ { "from_version": "0.0.0", - "to_version": "6.45.2", + "to_version": "6.47.1", "migration": "HttpRequesterUrlBaseToUrl", "migrated_at": "2025-04-01T00:00:00+00:00", }, { "from_version": "0.0.0", - "to_version": "6.45.2", + "to_version": "6.47.1", "migration": "HttpRequesterPathToUrl", "migrated_at": "2025-04-01T00:00:00+00:00", }, { "from_version": "0.0.0", - "to_version": "6.45.2", + "to_version": "6.47.1", "migration": "HttpRequesterRequestBodyJsonDataToRequestBody", "migrated_at": "2025-04-01T00:00:00+00:00", },