From 00f8d976d6cb3f73ea34c466c0da578358647c9c Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 29 Aug 2025 02:55:31 +0000 Subject: [PATCH 01/10] fix(concurrent_declarative_source): add default value to optional state parameter - Add default value of None to state parameter in ConcurrentDeclarativeSource.__init__() - Remove Generic[TState] from class definition as it adds no meaningful value - Update all type annotations throughout codebase to use concrete Optional[List[AirbyteStateMessage]] type - Fix test parameter order to match updated constructor signature - Resolves breaking change introduced in PR #704 where Optional state parameter lacked default value Fixes: airbytehq/airbyte-python-cdk#704 Co-Authored-By: AJ Steers --- .../connector_builder/connector_builder_handler.py | 8 ++++---- airbyte_cdk/connector_builder/main.py | 2 +- airbyte_cdk/connector_builder/test_reader/reader.py | 4 ++-- .../manifest_server/command_processor/processor.py | 6 ++---- airbyte_cdk/manifest_server/command_processor/utils.py | 2 +- airbyte_cdk/manifest_server/routers/manifest.py | 2 +- .../sources/declarative/concurrent_declarative_source.py | 4 ++-- .../sources/declarative/yaml_declarative_source.py | 2 +- .../declarative/test_manifest_declarative_source.py | 2 +- 9 files changed, 15 insertions(+), 17 deletions(-) diff --git a/airbyte_cdk/connector_builder/connector_builder_handler.py b/airbyte_cdk/connector_builder/connector_builder_handler.py index 0d3e2052b..00823d0b8 100644 --- a/airbyte_cdk/connector_builder/connector_builder_handler.py +++ b/airbyte_cdk/connector_builder/connector_builder_handler.py @@ -65,7 +65,7 @@ def create_source( limits: TestLimits, catalog: Optional[ConfiguredAirbyteCatalog], state: Optional[List[AirbyteStateMessage]], -) -> ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]]: +) -> ConcurrentDeclarativeSource: manifest = config["__injected_declarative_manifest"] # We 
enforce a concurrency level of 1 so that the stream is processed on a single thread @@ -88,7 +88,7 @@ def create_source( def read_stream( - source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]], + source: ConcurrentDeclarativeSource, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, state: List[AirbyteStateMessage], @@ -127,7 +127,7 @@ def read_stream( def resolve_manifest( - source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]], + source: ConcurrentDeclarativeSource, ) -> AirbyteMessage: try: return AirbyteMessage( @@ -146,7 +146,7 @@ def resolve_manifest( def full_resolve_manifest( - source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]], limits: TestLimits + source: ConcurrentDeclarativeSource, limits: TestLimits ) -> AirbyteMessage: try: manifest = {**source.resolved_manifest} diff --git a/airbyte_cdk/connector_builder/main.py b/airbyte_cdk/connector_builder/main.py index 207831c3c..ba9ded0dd 100644 --- a/airbyte_cdk/connector_builder/main.py +++ b/airbyte_cdk/connector_builder/main.py @@ -70,7 +70,7 @@ def get_config_and_catalog_from_args( def handle_connector_builder_request( - source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]], + source: ConcurrentDeclarativeSource, command: str, config: Mapping[str, Any], catalog: Optional[ConfiguredAirbyteCatalog], diff --git a/airbyte_cdk/connector_builder/test_reader/reader.py b/airbyte_cdk/connector_builder/test_reader/reader.py index aab700951..db7d2d14a 100644 --- a/airbyte_cdk/connector_builder/test_reader/reader.py +++ b/airbyte_cdk/connector_builder/test_reader/reader.py @@ -85,7 +85,7 @@ def __init__( def run_test_read( self, - source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]], + source: ConcurrentDeclarativeSource, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, stream_name: str, @@ -383,7 +383,7 @@ def _get_latest_config_update( def _read_stream( self, - source: 
ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]], + source: ConcurrentDeclarativeSource, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, state: List[AirbyteStateMessage], diff --git a/airbyte_cdk/manifest_server/command_processor/processor.py b/airbyte_cdk/manifest_server/command_processor/processor.py index 16d14a799..34816ec55 100644 --- a/airbyte_cdk/manifest_server/command_processor/processor.py +++ b/airbyte_cdk/manifest_server/command_processor/processor.py @@ -21,12 +21,10 @@ class ManifestCommandProcessor: - _source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]] + _source: ConcurrentDeclarativeSource _logger = logging.getLogger("airbyte.manifest-server") - def __init__( - self, source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]] - ) -> None: + def __init__(self, source: ConcurrentDeclarativeSource) -> None: self._source = source def test_read( diff --git a/airbyte_cdk/manifest_server/command_processor/utils.py b/airbyte_cdk/manifest_server/command_processor/utils.py index 125a977c3..3aef171e0 100644 --- a/airbyte_cdk/manifest_server/command_processor/utils.py +++ b/airbyte_cdk/manifest_server/command_processor/utils.py @@ -63,7 +63,7 @@ def build_source( record_limit: Optional[int] = None, page_limit: Optional[int] = None, slice_limit: Optional[int] = None, -) -> ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]]: +) -> ConcurrentDeclarativeSource: # We enforce a concurrency level of 1 so that the stream is processed on a single thread # to retain ordering for the grouping of the builder message responses. 
definition = copy.deepcopy(manifest) diff --git a/airbyte_cdk/manifest_server/routers/manifest.py b/airbyte_cdk/manifest_server/routers/manifest.py index 48799ddc1..035058ec1 100644 --- a/airbyte_cdk/manifest_server/routers/manifest.py +++ b/airbyte_cdk/manifest_server/routers/manifest.py @@ -40,7 +40,7 @@ def safe_build_source( page_limit: Optional[int] = None, slice_limit: Optional[int] = None, record_limit: Optional[int] = None, -) -> ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]]: +) -> ConcurrentDeclarativeSource: """Wrapper around build_source that converts ValidationError to HTTPException.""" try: return build_source( diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 16ff94abf..e9b48e1dd 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -162,7 +162,7 @@ def _get_declarative_component_schema() -> Dict[str, Any]: # is no longer inherited from since the only external dependency is from that class. # # todo: It is worth investigating removal of the Generic[TState] since it will always be Optional[List[AirbyteStateMessage]] -class ConcurrentDeclarativeSource(AbstractSource, Generic[TState]): +class ConcurrentDeclarativeSource(AbstractSource): # By default, we defer to a value of 2. A value lower than could cause a PartitionEnqueuer to be stuck in a state of deadlock # because it has hit the limit of futures but not partition reader is consuming them. 
_LOWEST_SAFE_CONCURRENCY_LEVEL = 2 @@ -171,8 +171,8 @@ def __init__( self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional[Mapping[str, Any]], - state: TState, source_config: ConnectionDefinition, + state: Optional[List[AirbyteStateMessage]] = None, debug: bool = False, emit_connector_builder_messages: bool = False, migrate_manifest: bool = False, diff --git a/airbyte_cdk/sources/declarative/yaml_declarative_source.py b/airbyte_cdk/sources/declarative/yaml_declarative_source.py index 003578738..7af07a5c8 100644 --- a/airbyte_cdk/sources/declarative/yaml_declarative_source.py +++ b/airbyte_cdk/sources/declarative/yaml_declarative_source.py @@ -14,7 +14,7 @@ from airbyte_cdk.sources.types import ConnectionDefinition -class YamlDeclarativeSource(ConcurrentDeclarativeSource[List[AirbyteStateMessage]]): +class YamlDeclarativeSource(ConcurrentDeclarativeSource): """Declarative source defined by a yaml file""" def __init__( diff --git a/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py b/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py index ad6735201..7341fe4ed 100644 --- a/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py +++ b/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py @@ -2219,7 +2219,7 @@ def _run_read(manifest: Mapping[str, Any], stream_name: str) -> List[AirbyteMess ) config = {} state = {} - source = ConcurrentDeclarativeSource(catalog, config, state, manifest) + source = ConcurrentDeclarativeSource(catalog, config, manifest, state) return list(source.read(logger, {}, catalog, state)) From 26208998225bac4213169f3b67f257aec8a1c817 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Thu, 28 Aug 2025 20:35:28 -0700 Subject: [PATCH 02/10] Apply suggestion from @aaronsteers --- .../declarative/test_manifest_declarative_source.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git 
a/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py b/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py index 7341fe4ed..2b7c34fb4 100644 --- a/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py +++ b/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py @@ -2219,7 +2219,12 @@ def _run_read(manifest: Mapping[str, Any], stream_name: str) -> List[AirbyteMess ) config = {} state = {} - source = ConcurrentDeclarativeSource(catalog, config, manifest, state) + source = ConcurrentDeclarativeSource( + catalog=catalog, + config=config, + source_config=manifest, + state=state, + ) return list(source.read(logger, {}, catalog, state)) From 03fb2d8a639919c2616de5833e61e74aa8b83f15 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 29 Aug 2025 03:36:46 +0000 Subject: [PATCH 03/10] style: use named parameters in ConcurrentDeclarativeSource constructor call - Address GitHub PR comment by @aaronsteers requesting explicit parameter names - Improve code readability by using named arguments instead of positional arguments - Apply automatic formatting fixes to maintain code style consistency Co-Authored-By: AJ Steers --- airbyte_cdk/connector_builder/main.py | 6 ++-- .../concurrent_source/concurrent_source.py | 6 ++-- .../sources/file_based/file_based_source.py | 6 ++-- .../file_based/stream/concurrent/adapters.py | 6 ++-- airbyte_cdk/sql/shared/sql_processor.py | 6 ++-- .../test_connector_builder_handler.py | 12 +++---- unit_tests/destinations/test_destination.py | 6 ++-- .../test_parent_state_stream.py | 15 ++++---- ..._source_declarative_w_custom_components.py | 6 ++-- .../test_concurrent_perpartitioncursor.py | 21 ++++++----- .../interpolation/test_interpolated_string.py | 6 ++-- .../test_model_to_component_factory.py | 6 ++-- .../test_grouping_partition_router.py | 18 +++++----- .../test_substream_partition_router.py | 12 +++---- 
.../test_concurrent_declarative_source.py | 6 ++-- .../sources/file_based/test_scenarios.py | 12 +++---- unit_tests/sources/streams/test_call_rate.py | 12 +++---- .../sources/streams/test_stream_read.py | 6 ++-- unit_tests/test_entrypoint.py | 6 ++-- unit_tests/test_exception_handler.py | 36 +++++++++---------- unit_tests/test_secure_logger.py | 24 ++++++------- unit_tests/utils/test_secret_utils.py | 6 ++-- 22 files changed, 119 insertions(+), 121 deletions(-) diff --git a/airbyte_cdk/connector_builder/main.py b/airbyte_cdk/connector_builder/main.py index ba9ded0dd..b813f602b 100644 --- a/airbyte_cdk/connector_builder/main.py +++ b/airbyte_cdk/connector_builder/main.py @@ -80,9 +80,9 @@ def handle_connector_builder_request( if command == "resolve_manifest": return resolve_manifest(source) elif command == "test_read": - assert catalog is not None, ( - "`test_read` requires a valid `ConfiguredAirbyteCatalog`, got None." - ) + assert ( + catalog is not None + ), "`test_read` requires a valid `ConfiguredAirbyteCatalog`, got None." 
return read_stream(source, config, catalog, state, limits) elif command == "full_resolve_manifest": return full_resolve_manifest(source, limits) diff --git a/airbyte_cdk/sources/concurrent_source/concurrent_source.py b/airbyte_cdk/sources/concurrent_source/concurrent_source.py index de2d93523..453593bb8 100644 --- a/airbyte_cdk/sources/concurrent_source/concurrent_source.py +++ b/airbyte_cdk/sources/concurrent_source/concurrent_source.py @@ -51,9 +51,9 @@ def create( too_many_generator = ( not is_single_threaded and initial_number_of_partitions_to_generate >= num_workers ) - assert not too_many_generator, ( - "It is required to have more workers than threads generating partitions" - ) + assert ( + not too_many_generator + ), "It is required to have more workers than threads generating partitions" threadpool = ThreadPoolManager( concurrent.futures.ThreadPoolExecutor( max_workers=num_workers, thread_name_prefix="workerpool" diff --git a/airbyte_cdk/sources/file_based/file_based_source.py b/airbyte_cdk/sources/file_based/file_based_source.py index 2d34fe5dc..17a7ee957 100644 --- a/airbyte_cdk/sources/file_based/file_based_source.py +++ b/airbyte_cdk/sources/file_based/file_based_source.py @@ -282,9 +282,9 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: and hasattr(self, "_concurrency_level") and self._concurrency_level is not None ): - assert state_manager is not None, ( - "No ConnectorStateManager was created, but it is required for incremental syncs. This is unexpected. Please contact Support." - ) + assert ( + state_manager is not None + ), "No ConnectorStateManager was created, but it is required for incremental syncs. This is unexpected. Please contact Support." 
cursor = self.cursor_cls( stream_config, diff --git a/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py b/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py index fd8eef9b0..16f8b0075 100644 --- a/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py +++ b/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py @@ -277,9 +277,9 @@ def read(self) -> Iterable[Record]: def to_slice(self) -> Optional[Mapping[str, Any]]: if self._slice is None: return None - assert len(self._slice["files"]) == 1, ( - f"Expected 1 file per partition but got {len(self._slice['files'])} for stream {self.stream_name()}" - ) + assert ( + len(self._slice["files"]) == 1 + ), f"Expected 1 file per partition but got {len(self._slice['files'])} for stream {self.stream_name()}" file = self._slice["files"][0] return {"files": [file]} diff --git a/airbyte_cdk/sql/shared/sql_processor.py b/airbyte_cdk/sql/shared/sql_processor.py index 238ff6c69..f0cda3945 100644 --- a/airbyte_cdk/sql/shared/sql_processor.py +++ b/airbyte_cdk/sql/shared/sql_processor.py @@ -326,9 +326,9 @@ def _ensure_schema_exists( if DEBUG_MODE: found_schemas = schemas_list - assert schema_name in found_schemas, ( - f"Schema {schema_name} was not created. Found: {found_schemas}" - ) + assert ( + schema_name in found_schemas + ), f"Schema {schema_name} was not created. 
Found: {found_schemas}" def _quote_identifier(self, identifier: str) -> str: """Return the given identifier, quoted.""" diff --git a/unit_tests/connector_builder/test_connector_builder_handler.py b/unit_tests/connector_builder/test_connector_builder_handler.py index 643878eec..d547aabd8 100644 --- a/unit_tests/connector_builder/test_connector_builder_handler.py +++ b/unit_tests/connector_builder/test_connector_builder_handler.py @@ -1267,9 +1267,9 @@ def test_handle_read_external_requests(deployment_mode, url_base, expected_error source, config, catalog, _A_PER_PARTITION_STATE, limits ).record.data if expected_error: - assert len(output_data["logs"]) > 0, ( - "Expected at least one log message with the expected error" - ) + assert ( + len(output_data["logs"]) > 0 + ), "Expected at least one log message with the expected error" error_message = output_data["logs"][0] assert error_message["level"] == "ERROR" assert expected_error in error_message["stacktrace"] @@ -1363,9 +1363,9 @@ def test_handle_read_external_oauth_request(deployment_mode, token_url, expected source, config, catalog, _A_PER_PARTITION_STATE, limits ).record.data if expected_error: - assert len(output_data["logs"]) > 0, ( - "Expected at least one log message with the expected error" - ) + assert ( + len(output_data["logs"]) > 0 + ), "Expected at least one log message with the expected error" error_message = output_data["logs"][0] assert error_message["level"] == "ERROR" assert expected_error in error_message["stacktrace"] diff --git a/unit_tests/destinations/test_destination.py b/unit_tests/destinations/test_destination.py index 1f8f6573f..14f52be15 100644 --- a/unit_tests/destinations/test_destination.py +++ b/unit_tests/destinations/test_destination.py @@ -58,9 +58,9 @@ def test_successful_parse( self, arg_list: List[str], expected_output: Mapping[str, Any], destination: Destination ): parsed_args = vars(destination.parse_args(arg_list)) - assert parsed_args == expected_output, ( - f"Expected 
parsing {arg_list} to return parsed args {expected_output} but instead found {parsed_args}" - ) + assert ( + parsed_args == expected_output + ), f"Expected parsing {arg_list} to return parsed args {expected_output} but instead found {parsed_args}" @pytest.mark.parametrize( ("arg_list"), diff --git a/unit_tests/legacy/sources/declarative/partition_routers/test_parent_state_stream.py b/unit_tests/legacy/sources/declarative/partition_routers/test_parent_state_stream.py index c3cab5500..f69855ccc 100644 --- a/unit_tests/legacy/sources/declarative/partition_routers/test_parent_state_stream.py +++ b/unit_tests/legacy/sources/declarative/partition_routers/test_parent_state_stream.py @@ -340,11 +340,10 @@ def run_incremental_parent_state_test( expected_records_set = list( {orjson.dumps(record): record for record in expected_records}.values() ) - assert sorted( - cumulative_records_state_deduped, key=lambda x: orjson.dumps(x) - ) == sorted(expected_records_set, key=lambda x: orjson.dumps(x)), ( - f"Records mismatch with intermediate state {state}. Expected {expected_records}, got {cumulative_records_state_deduped}" - ) + assert ( + sorted(cumulative_records_state_deduped, key=lambda x: orjson.dumps(x)) + == sorted(expected_records_set, key=lambda x: orjson.dumps(x)) + ), f"Records mismatch with intermediate state {state}. Expected {expected_records}, got {cumulative_records_state_deduped}" # Store the final state after each intermediate read final_state_intermediate = [ @@ -356,9 +355,9 @@ def run_incremental_parent_state_test( # Assert that the final state matches the expected state for all runs for i, final_state in enumerate(final_states): - assert final_state in expected_states, ( - f"Final state mismatch at run {i + 1}. Expected {expected_states}, got {final_state}" - ) + assert ( + final_state in expected_states + ), f"Final state mismatch at run {i + 1}. 
Expected {expected_states}, got {final_state}" @pytest.mark.parametrize( diff --git a/unit_tests/source_declarative_manifest/test_source_declarative_w_custom_components.py b/unit_tests/source_declarative_manifest/test_source_declarative_w_custom_components.py index e7819624d..d821ce3e6 100644 --- a/unit_tests/source_declarative_manifest/test_source_declarative_w_custom_components.py +++ b/unit_tests/source_declarative_manifest/test_source_declarative_w_custom_components.py @@ -97,9 +97,9 @@ def get_py_components_config_dict( manifest_dict = yaml.safe_load(manifest_yaml_path.read_text()) assert manifest_dict, "Failed to load the manifest file." - assert isinstance(manifest_dict, Mapping), ( - f"Manifest file is type {type(manifest_dict).__name__}, not a mapping: {manifest_dict}" - ) + assert isinstance( + manifest_dict, Mapping + ), f"Manifest file is type {type(manifest_dict).__name__}, not a mapping: {manifest_dict}" custom_py_code = custom_py_code_path.read_text() combined_config_dict = { diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py index cb774bda7..e6b05c51c 100644 --- a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -389,9 +389,9 @@ def run_mocked_test( request_count = len( [req for req in m.request_history if unquote(req.url) == unquote(url)] ) - assert request_count == 1, ( - f"URL {url} was called {request_count} times, expected exactly once." - ) + assert ( + request_count == 1 + ), f"URL {url} was called {request_count} times, expected exactly once." 
def _run_read( @@ -1137,11 +1137,10 @@ def run_incremental_parent_state_test( expected_records_set = list( {orjson.dumps(record): record for record in expected_records}.values() ) - assert sorted(cumulative_records_state_deduped, key=lambda x: x["id"]) == sorted( - expected_records_set, key=lambda x: x["id"] - ), ( - f"Records mismatch with intermediate state {state}. Expected {expected_records}, got {cumulative_records_state_deduped}" - ) + assert ( + sorted(cumulative_records_state_deduped, key=lambda x: x["id"]) + == sorted(expected_records_set, key=lambda x: x["id"]) + ), f"Records mismatch with intermediate state {state}. Expected {expected_records}, got {cumulative_records_state_deduped}" # Store the final state after each intermediate read final_state_intermediate = [ @@ -1152,9 +1151,9 @@ def run_incremental_parent_state_test( # Assert that the final state matches the expected state for all runs for i, final_state in enumerate(final_states): - assert final_state in expected_states, ( - f"Final state mismatch at run {i + 1}. Expected {expected_states}, got {final_state}" - ) + assert ( + final_state in expected_states + ), f"Final state mismatch at run {i + 1}. 
Expected {expected_states}, got {final_state}" @pytest.mark.parametrize( diff --git a/unit_tests/sources/declarative/interpolation/test_interpolated_string.py b/unit_tests/sources/declarative/interpolation/test_interpolated_string.py index 52738b33b..6114c3dba 100644 --- a/unit_tests/sources/declarative/interpolation/test_interpolated_string.py +++ b/unit_tests/sources/declarative/interpolation/test_interpolated_string.py @@ -62,6 +62,6 @@ def test_parsing_record_data(test_name, input_string, record_value, expected_val if expected_value is None: assert val is None, f"Expected None for value {record_value} in test {test_name}" else: - assert float == type(val), ( - f"Expected float, got {type(val)} for value {val} in test {test_name}" - ) + assert float == type( + val + ), f"Expected float, got {type(val)} for value {val} in test {test_name}" diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index b543354f7..520031e62 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -3437,9 +3437,9 @@ def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]: stream_state_migrations=[DummyStateMigration()], ) assert cursor.state["lookback_window"] != 10, "State migration wasn't called" - assert cursor.state["lookback_window"] == 20, ( - "State migration was called, but actual state don't match expected" - ) + assert ( + cursor.state["lookback_window"] == 20 + ), "State migration was called, but actual state don't match expected" def test_create_concurrent_cursor_uses_min_max_datetime_format_if_defined(): diff --git a/unit_tests/sources/declarative/partition_routers/test_grouping_partition_router.py b/unit_tests/sources/declarative/partition_routers/test_grouping_partition_router.py index 9bea606e4..a75a48966 100644 --- 
a/unit_tests/sources/declarative/partition_routers/test_grouping_partition_router.py +++ b/unit_tests/sources/declarative/partition_routers/test_grouping_partition_router.py @@ -269,9 +269,9 @@ def __next__(self): cursor_slice={}, extra_fields={"name": ["Board 0", "Board 1"], "owner": ["User0", "User1"]}, ) - assert controlled_iter.yield_count == 2, ( - "Only 2 slices should be yielded to form the first group" - ) + assert ( + controlled_iter.yield_count == 2 + ), "Only 2 slices should be yielded to form the first group" # Get the second slice second_slice = next(slices_iter) @@ -280,9 +280,9 @@ def __next__(self): cursor_slice={}, extra_fields={"name": ["Board 2", "Board 3"], "owner": ["User2", "User3"]}, ) - assert controlled_iter.yield_count == 4, ( - "Only 4 slices should be yielded up to the second group" - ) + assert ( + controlled_iter.yield_count == 4 + ), "Only 4 slices should be yielded up to the second group" # Exhaust the iterator remaining_slices = list(slices_iter) @@ -293,9 +293,9 @@ def __next__(self): extra_fields={"name": ["Board 4"], "owner": ["User4"]}, ) ] - assert controlled_iter.yield_count == 5, ( - "All 5 slices should be yielded after exhausting the iterator" - ) + assert ( + controlled_iter.yield_count == 5 + ), "All 5 slices should be yielded after exhausting the iterator" def test_set_initial_state_delegation(mock_config, mock_underlying_router): diff --git a/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py b/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py index 122c8dfae..fce217136 100644 --- a/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py +++ b/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py @@ -1118,13 +1118,13 @@ def test_substream_using_resumable_full_refresh_parent_stream_slices(use_increme # validate final state for closed substream slices final_state = 
substream_cursor_slicer.get_stream_state() if not use_incremental_dependency: - assert final_state["states"] == expected_substream_state["states"], ( - "State for substreams is not valid!" - ) + assert ( + final_state["states"] == expected_substream_state["states"] + ), "State for substreams is not valid!" else: - assert final_state == expected_substream_state, ( - "State for substreams with incremental dependency is not valid!" - ) + assert ( + final_state == expected_substream_state + ), "State for substreams with incremental dependency is not valid!" @pytest.mark.parametrize( diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 8fadc04fd..1102468b7 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -1471,9 +1471,9 @@ def test_concurrent_declarative_source_runs_state_migrations_provided_in_manifes source_config=manifest, config=_CONFIG, catalog=_CATALOG, state=state ) concurrent_streams, synchronous_streams = source._group_streams(_CONFIG) - assert concurrent_streams[0].cursor.state.get("state") != state_blob.__dict__, ( - "State was not migrated." - ) + assert ( + concurrent_streams[0].cursor.state.get("state") != state_blob.__dict__ + ), "State was not migrated." 
assert concurrent_streams[0].cursor.state.get("states") == [ {"cursor": {"updated_at": "2024-08-21"}, "partition": {"type": "type_1"}}, {"cursor": {"updated_at": "2024-08-21"}, "partition": {"type": "type_2"}}, diff --git a/unit_tests/sources/file_based/test_scenarios.py b/unit_tests/sources/file_based/test_scenarios.py index d70b7f4ef..214dd0872 100644 --- a/unit_tests/sources/file_based/test_scenarios.py +++ b/unit_tests/sources/file_based/test_scenarios.py @@ -122,9 +122,9 @@ def _verify_read_output(output: EntrypointOutput, scenario: TestScenario[Abstrac expected_states = list(filter(lambda e: "data" not in e, expected_records)) states = list(filter(lambda r: r.state, records_and_state_messages)) - assert len(states) > 0, ( - "No state messages emitted. Successful syncs should emit at least one stream state." - ) + assert ( + len(states) > 0 + ), "No state messages emitted. Successful syncs should emit at least one stream state." _verify_state_record_counts(sorted_records, states) if hasattr(scenario.source, "cursor_cls") and issubclass( @@ -182,9 +182,9 @@ def _verify_analytics( expected_analytics: Optional[List[AirbyteAnalyticsTraceMessage]], ) -> None: if expected_analytics: - assert len(analytics) == len(expected_analytics), ( - f"Number of actual analytics messages ({len(analytics)}) did not match expected ({len(expected_analytics)})" - ) + assert ( + len(analytics) == len(expected_analytics) + ), f"Number of actual analytics messages ({len(analytics)}) did not match expected ({len(expected_analytics)})" for actual, expected in zip(analytics, expected_analytics): actual_type, actual_value = actual.trace.analytics.type, actual.trace.analytics.value expected_type = expected.type diff --git a/unit_tests/sources/streams/test_call_rate.py b/unit_tests/sources/streams/test_call_rate.py index b99905870..853e2997e 100644 --- a/unit_tests/sources/streams/test_call_rate.py +++ b/unit_tests/sources/streams/test_call_rate.py @@ -268,9 +268,9 @@ def 
test_limit_rate(self): with pytest.raises(CallRateLimitHit) as excinfo2: policy.try_acquire("call", weight=1), "call over limit" - assert excinfo2.value.time_to_wait < excinfo1.value.time_to_wait, ( - "time to wait must decrease over time" - ) + assert ( + excinfo2.value.time_to_wait < excinfo1.value.time_to_wait + ), "time to wait must decrease over time" def test_limit_rate_support_custom_weight(self): """try_acquire must take into account provided weight and throw CallRateLimitHit when hit the limit.""" @@ -279,9 +279,9 @@ def test_limit_rate_support_custom_weight(self): policy.try_acquire("call", weight=2), "1st call with weight of 2" with pytest.raises(CallRateLimitHit) as excinfo: policy.try_acquire("call", weight=9), "2nd call, over limit since 2 + 9 = 11 > 10" - assert excinfo.value.time_to_wait.total_seconds() == pytest.approx(60, 0.1), ( - "should wait 1 minute before next call" - ) + assert excinfo.value.time_to_wait.total_seconds() == pytest.approx( + 60, 0.1 + ), "should wait 1 minute before next call" def test_multiple_limit_rates(self): """try_acquire must take into all call rates and apply stricter.""" diff --git a/unit_tests/sources/streams/test_stream_read.py b/unit_tests/sources/streams/test_stream_read.py index cf550f8cf..8181925e8 100644 --- a/unit_tests/sources/streams/test_stream_read.py +++ b/unit_tests/sources/streams/test_stream_read.py @@ -758,9 +758,9 @@ def test_configured_json_schema_with_invalid_properties(): assert old_user_insights not in configured_json_schema_properties assert old_feature_info not in configured_json_schema_properties for stream_schema_property in stream_schema["properties"]: - assert stream_schema_property in configured_json_schema_properties, ( - f"Stream schema property: {stream_schema_property} missing in configured schema" - ) + assert ( + stream_schema_property in configured_json_schema_properties + ), f"Stream schema property: {stream_schema_property} missing in configured schema" assert ( 
stream_schema["properties"][stream_schema_property] == configured_json_schema_properties[stream_schema_property] diff --git a/unit_tests/test_entrypoint.py b/unit_tests/test_entrypoint.py index 520131881..7a4afd968 100644 --- a/unit_tests/test_entrypoint.py +++ b/unit_tests/test_entrypoint.py @@ -830,9 +830,9 @@ def test_handle_record_counts( assert message_count == expected_records_by_stream[stream_descriptor] if actual_message.type == Type.STATE: - assert isinstance(actual_message.state.sourceStats.recordCount, float), ( - "recordCount value should be expressed as a float" - ) + assert isinstance( + actual_message.state.sourceStats.recordCount, float + ), "recordCount value should be expressed as a float" def test_given_serialization_error_using_orjson_then_fallback_on_json( diff --git a/unit_tests/test_exception_handler.py b/unit_tests/test_exception_handler.py index 1a5f70d8b..3efc6d57e 100644 --- a/unit_tests/test_exception_handler.py +++ b/unit_tests/test_exception_handler.py @@ -62,24 +62,24 @@ def test_uncaught_exception_handler(): log_output, trace_output = stdout_lines out_log_message = AirbyteMessageSerializer.load(json.loads(log_output)) - assert traceback_start in out_log_message.log.message, ( - "Log message should contain traceback start" - ) - assert file_reference in out_log_message.log.message, ( - "Log message should contain file reference" - ) - assert exception_message in out_log_message.log.message, ( - "Log message should contain expected exception message" - ) + assert ( + traceback_start in out_log_message.log.message + ), "Log message should contain traceback start" + assert ( + file_reference in out_log_message.log.message + ), "Log message should contain file reference" + assert ( + exception_message in out_log_message.log.message + ), "Log message should contain expected exception message" out_trace_message = AirbyteMessageSerializer.load(json.loads(trace_output)) assert out_trace_message.trace.emitted_at > 0 - assert traceback_start in 
out_trace_message.trace.error.stack_trace, ( - "Trace message should contain traceback start" - ) - assert file_reference in out_trace_message.trace.error.stack_trace, ( - "Trace message should contain file reference" - ) - assert out_trace_message.trace.error.internal_message == exception_message, ( - "Trace message should contain expected exception message" - ) + assert ( + traceback_start in out_trace_message.trace.error.stack_trace + ), "Trace message should contain traceback start" + assert ( + file_reference in out_trace_message.trace.error.stack_trace + ), "Trace message should contain file reference" + assert ( + out_trace_message.trace.error.internal_message == exception_message + ), "Trace message should contain expected exception message" diff --git a/unit_tests/test_secure_logger.py b/unit_tests/test_secure_logger.py index 757a069c7..0237091fe 100644 --- a/unit_tests/test_secure_logger.py +++ b/unit_tests/test_secure_logger.py @@ -203,12 +203,12 @@ def read( list(entrypoint.run(parsed_args)) except Exception: sys.excepthook(*sys.exc_info()) - assert I_AM_A_SECRET_VALUE not in capsys.readouterr().out, ( - "Should have filtered non-secret value from exception trace message" - ) - assert I_AM_A_SECRET_VALUE not in caplog.text, ( - "Should have filtered secret value from exception log message" - ) + assert ( + I_AM_A_SECRET_VALUE not in capsys.readouterr().out + ), "Should have filtered non-secret value from exception trace message" + assert ( + I_AM_A_SECRET_VALUE not in caplog.text + ), "Should have filtered secret value from exception log message" def test_non_airbyte_secrets_are_not_masked_on_uncaught_exceptions(mocker, caplog, capsys): @@ -257,9 +257,9 @@ def read( list(entrypoint.run(parsed_args)) except Exception: sys.excepthook(*sys.exc_info()) - assert NOT_A_SECRET_VALUE in capsys.readouterr().out, ( - "Should not have filtered non-secret value from exception trace message" - ) - assert NOT_A_SECRET_VALUE in caplog.text, ( - "Should not have 
filtered non-secret value from exception log message" - ) + assert ( + NOT_A_SECRET_VALUE in capsys.readouterr().out + ), "Should not have filtered non-secret value from exception trace message" + assert ( + NOT_A_SECRET_VALUE in caplog.text + ), "Should not have filtered non-secret value from exception log message" diff --git a/unit_tests/utils/test_secret_utils.py b/unit_tests/utils/test_secret_utils.py index d6f4f4563..73c93e670 100644 --- a/unit_tests/utils/test_secret_utils.py +++ b/unit_tests/utils/test_secret_utils.py @@ -150,9 +150,9 @@ def test_get_secret_paths(spec, expected): ], ) def test_get_secrets(spec, config, expected): - assert get_secrets(spec, config) == expected, ( - f"Expected the spec {spec} and config {config} to produce {expected}" - ) + assert ( + get_secrets(spec, config) == expected + ), f"Expected the spec {spec} and config {config} to produce {expected}" def test_secret_filtering(): From aaee7398648efd96b9d82ba96e7e3294df926aa8 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 29 Aug 2025 03:57:15 +0000 Subject: [PATCH 04/10] fix: apply ruff 0.11.5 formatting to resolve CI format check discrepancy Co-Authored-By: AJ Steers --- airbyte_cdk/connector_builder/main.py | 6 ++-- .../concurrent_source/concurrent_source.py | 6 ++-- .../sources/file_based/file_based_source.py | 6 ++-- .../file_based/stream/concurrent/adapters.py | 6 ++-- airbyte_cdk/sql/shared/sql_processor.py | 6 ++-- .../test_connector_builder_handler.py | 12 +++---- unit_tests/destinations/test_destination.py | 6 ++-- .../test_parent_state_stream.py | 15 ++++---- ..._source_declarative_w_custom_components.py | 6 ++-- .../test_concurrent_perpartitioncursor.py | 21 +++++------ .../interpolation/test_interpolated_string.py | 6 ++-- .../test_model_to_component_factory.py | 6 ++-- .../test_grouping_partition_router.py | 18 +++++----- .../test_substream_partition_router.py | 12 +++---- 
.../test_concurrent_declarative_source.py | 6 ++-- .../sources/file_based/test_scenarios.py | 12 +++---- unit_tests/sources/streams/test_call_rate.py | 12 +++---- .../sources/streams/test_stream_read.py | 6 ++-- unit_tests/test_entrypoint.py | 6 ++-- unit_tests/test_exception_handler.py | 36 +++++++++---------- unit_tests/test_secure_logger.py | 24 ++++++------- unit_tests/utils/test_secret_utils.py | 6 ++-- 22 files changed, 121 insertions(+), 119 deletions(-) diff --git a/airbyte_cdk/connector_builder/main.py b/airbyte_cdk/connector_builder/main.py index b813f602b..ba9ded0dd 100644 --- a/airbyte_cdk/connector_builder/main.py +++ b/airbyte_cdk/connector_builder/main.py @@ -80,9 +80,9 @@ def handle_connector_builder_request( if command == "resolve_manifest": return resolve_manifest(source) elif command == "test_read": - assert ( - catalog is not None - ), "`test_read` requires a valid `ConfiguredAirbyteCatalog`, got None." + assert catalog is not None, ( + "`test_read` requires a valid `ConfiguredAirbyteCatalog`, got None." 
+ ) return read_stream(source, config, catalog, state, limits) elif command == "full_resolve_manifest": return full_resolve_manifest(source, limits) diff --git a/airbyte_cdk/sources/concurrent_source/concurrent_source.py b/airbyte_cdk/sources/concurrent_source/concurrent_source.py index 453593bb8..de2d93523 100644 --- a/airbyte_cdk/sources/concurrent_source/concurrent_source.py +++ b/airbyte_cdk/sources/concurrent_source/concurrent_source.py @@ -51,9 +51,9 @@ def create( too_many_generator = ( not is_single_threaded and initial_number_of_partitions_to_generate >= num_workers ) - assert ( - not too_many_generator - ), "It is required to have more workers than threads generating partitions" + assert not too_many_generator, ( + "It is required to have more workers than threads generating partitions" + ) threadpool = ThreadPoolManager( concurrent.futures.ThreadPoolExecutor( max_workers=num_workers, thread_name_prefix="workerpool" diff --git a/airbyte_cdk/sources/file_based/file_based_source.py b/airbyte_cdk/sources/file_based/file_based_source.py index 17a7ee957..2d34fe5dc 100644 --- a/airbyte_cdk/sources/file_based/file_based_source.py +++ b/airbyte_cdk/sources/file_based/file_based_source.py @@ -282,9 +282,9 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: and hasattr(self, "_concurrency_level") and self._concurrency_level is not None ): - assert ( - state_manager is not None - ), "No ConnectorStateManager was created, but it is required for incremental syncs. This is unexpected. Please contact Support." + assert state_manager is not None, ( + "No ConnectorStateManager was created, but it is required for incremental syncs. This is unexpected. Please contact Support." 
+ ) cursor = self.cursor_cls( stream_config, diff --git a/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py b/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py index 16f8b0075..fd8eef9b0 100644 --- a/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py +++ b/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py @@ -277,9 +277,9 @@ def read(self) -> Iterable[Record]: def to_slice(self) -> Optional[Mapping[str, Any]]: if self._slice is None: return None - assert ( - len(self._slice["files"]) == 1 - ), f"Expected 1 file per partition but got {len(self._slice['files'])} for stream {self.stream_name()}" + assert len(self._slice["files"]) == 1, ( + f"Expected 1 file per partition but got {len(self._slice['files'])} for stream {self.stream_name()}" + ) file = self._slice["files"][0] return {"files": [file]} diff --git a/airbyte_cdk/sql/shared/sql_processor.py b/airbyte_cdk/sql/shared/sql_processor.py index f0cda3945..238ff6c69 100644 --- a/airbyte_cdk/sql/shared/sql_processor.py +++ b/airbyte_cdk/sql/shared/sql_processor.py @@ -326,9 +326,9 @@ def _ensure_schema_exists( if DEBUG_MODE: found_schemas = schemas_list - assert ( - schema_name in found_schemas - ), f"Schema {schema_name} was not created. Found: {found_schemas}" + assert schema_name in found_schemas, ( + f"Schema {schema_name} was not created. 
Found: {found_schemas}" + ) def _quote_identifier(self, identifier: str) -> str: """Return the given identifier, quoted.""" diff --git a/unit_tests/connector_builder/test_connector_builder_handler.py b/unit_tests/connector_builder/test_connector_builder_handler.py index d547aabd8..643878eec 100644 --- a/unit_tests/connector_builder/test_connector_builder_handler.py +++ b/unit_tests/connector_builder/test_connector_builder_handler.py @@ -1267,9 +1267,9 @@ def test_handle_read_external_requests(deployment_mode, url_base, expected_error source, config, catalog, _A_PER_PARTITION_STATE, limits ).record.data if expected_error: - assert ( - len(output_data["logs"]) > 0 - ), "Expected at least one log message with the expected error" + assert len(output_data["logs"]) > 0, ( + "Expected at least one log message with the expected error" + ) error_message = output_data["logs"][0] assert error_message["level"] == "ERROR" assert expected_error in error_message["stacktrace"] @@ -1363,9 +1363,9 @@ def test_handle_read_external_oauth_request(deployment_mode, token_url, expected source, config, catalog, _A_PER_PARTITION_STATE, limits ).record.data if expected_error: - assert ( - len(output_data["logs"]) > 0 - ), "Expected at least one log message with the expected error" + assert len(output_data["logs"]) > 0, ( + "Expected at least one log message with the expected error" + ) error_message = output_data["logs"][0] assert error_message["level"] == "ERROR" assert expected_error in error_message["stacktrace"] diff --git a/unit_tests/destinations/test_destination.py b/unit_tests/destinations/test_destination.py index 14f52be15..1f8f6573f 100644 --- a/unit_tests/destinations/test_destination.py +++ b/unit_tests/destinations/test_destination.py @@ -58,9 +58,9 @@ def test_successful_parse( self, arg_list: List[str], expected_output: Mapping[str, Any], destination: Destination ): parsed_args = vars(destination.parse_args(arg_list)) - assert ( - parsed_args == expected_output - ), 
f"Expected parsing {arg_list} to return parsed args {expected_output} but instead found {parsed_args}" + assert parsed_args == expected_output, ( + f"Expected parsing {arg_list} to return parsed args {expected_output} but instead found {parsed_args}" + ) @pytest.mark.parametrize( ("arg_list"), diff --git a/unit_tests/legacy/sources/declarative/partition_routers/test_parent_state_stream.py b/unit_tests/legacy/sources/declarative/partition_routers/test_parent_state_stream.py index f69855ccc..c3cab5500 100644 --- a/unit_tests/legacy/sources/declarative/partition_routers/test_parent_state_stream.py +++ b/unit_tests/legacy/sources/declarative/partition_routers/test_parent_state_stream.py @@ -340,10 +340,11 @@ def run_incremental_parent_state_test( expected_records_set = list( {orjson.dumps(record): record for record in expected_records}.values() ) - assert ( - sorted(cumulative_records_state_deduped, key=lambda x: orjson.dumps(x)) - == sorted(expected_records_set, key=lambda x: orjson.dumps(x)) - ), f"Records mismatch with intermediate state {state}. Expected {expected_records}, got {cumulative_records_state_deduped}" + assert sorted( + cumulative_records_state_deduped, key=lambda x: orjson.dumps(x) + ) == sorted(expected_records_set, key=lambda x: orjson.dumps(x)), ( + f"Records mismatch with intermediate state {state}. Expected {expected_records}, got {cumulative_records_state_deduped}" + ) # Store the final state after each intermediate read final_state_intermediate = [ @@ -355,9 +356,9 @@ def run_incremental_parent_state_test( # Assert that the final state matches the expected state for all runs for i, final_state in enumerate(final_states): - assert ( - final_state in expected_states - ), f"Final state mismatch at run {i + 1}. Expected {expected_states}, got {final_state}" + assert final_state in expected_states, ( + f"Final state mismatch at run {i + 1}. 
Expected {expected_states}, got {final_state}" + ) @pytest.mark.parametrize( diff --git a/unit_tests/source_declarative_manifest/test_source_declarative_w_custom_components.py b/unit_tests/source_declarative_manifest/test_source_declarative_w_custom_components.py index d821ce3e6..e7819624d 100644 --- a/unit_tests/source_declarative_manifest/test_source_declarative_w_custom_components.py +++ b/unit_tests/source_declarative_manifest/test_source_declarative_w_custom_components.py @@ -97,9 +97,9 @@ def get_py_components_config_dict( manifest_dict = yaml.safe_load(manifest_yaml_path.read_text()) assert manifest_dict, "Failed to load the manifest file." - assert isinstance( - manifest_dict, Mapping - ), f"Manifest file is type {type(manifest_dict).__name__}, not a mapping: {manifest_dict}" + assert isinstance(manifest_dict, Mapping), ( + f"Manifest file is type {type(manifest_dict).__name__}, not a mapping: {manifest_dict}" + ) custom_py_code = custom_py_code_path.read_text() combined_config_dict = { diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py index e6b05c51c..cb774bda7 100644 --- a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -389,9 +389,9 @@ def run_mocked_test( request_count = len( [req for req in m.request_history if unquote(req.url) == unquote(url)] ) - assert ( - request_count == 1 - ), f"URL {url} was called {request_count} times, expected exactly once." + assert request_count == 1, ( + f"URL {url} was called {request_count} times, expected exactly once." 
+ ) def _run_read( @@ -1137,10 +1137,11 @@ def run_incremental_parent_state_test( expected_records_set = list( {orjson.dumps(record): record for record in expected_records}.values() ) - assert ( - sorted(cumulative_records_state_deduped, key=lambda x: x["id"]) - == sorted(expected_records_set, key=lambda x: x["id"]) - ), f"Records mismatch with intermediate state {state}. Expected {expected_records}, got {cumulative_records_state_deduped}" + assert sorted(cumulative_records_state_deduped, key=lambda x: x["id"]) == sorted( + expected_records_set, key=lambda x: x["id"] + ), ( + f"Records mismatch with intermediate state {state}. Expected {expected_records}, got {cumulative_records_state_deduped}" + ) # Store the final state after each intermediate read final_state_intermediate = [ @@ -1151,9 +1152,9 @@ def run_incremental_parent_state_test( # Assert that the final state matches the expected state for all runs for i, final_state in enumerate(final_states): - assert ( - final_state in expected_states - ), f"Final state mismatch at run {i + 1}. Expected {expected_states}, got {final_state}" + assert final_state in expected_states, ( + f"Final state mismatch at run {i + 1}. 
Expected {expected_states}, got {final_state}" + ) @pytest.mark.parametrize( diff --git a/unit_tests/sources/declarative/interpolation/test_interpolated_string.py b/unit_tests/sources/declarative/interpolation/test_interpolated_string.py index 6114c3dba..52738b33b 100644 --- a/unit_tests/sources/declarative/interpolation/test_interpolated_string.py +++ b/unit_tests/sources/declarative/interpolation/test_interpolated_string.py @@ -62,6 +62,6 @@ def test_parsing_record_data(test_name, input_string, record_value, expected_val if expected_value is None: assert val is None, f"Expected None for value {record_value} in test {test_name}" else: - assert float == type( - val - ), f"Expected float, got {type(val)} for value {val} in test {test_name}" + assert float == type(val), ( + f"Expected float, got {type(val)} for value {val} in test {test_name}" + ) diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 520031e62..b543354f7 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -3437,9 +3437,9 @@ def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]: stream_state_migrations=[DummyStateMigration()], ) assert cursor.state["lookback_window"] != 10, "State migration wasn't called" - assert ( - cursor.state["lookback_window"] == 20 - ), "State migration was called, but actual state don't match expected" + assert cursor.state["lookback_window"] == 20, ( + "State migration was called, but actual state don't match expected" + ) def test_create_concurrent_cursor_uses_min_max_datetime_format_if_defined(): diff --git a/unit_tests/sources/declarative/partition_routers/test_grouping_partition_router.py b/unit_tests/sources/declarative/partition_routers/test_grouping_partition_router.py index a75a48966..9bea606e4 100644 --- 
a/unit_tests/sources/declarative/partition_routers/test_grouping_partition_router.py +++ b/unit_tests/sources/declarative/partition_routers/test_grouping_partition_router.py @@ -269,9 +269,9 @@ def __next__(self): cursor_slice={}, extra_fields={"name": ["Board 0", "Board 1"], "owner": ["User0", "User1"]}, ) - assert ( - controlled_iter.yield_count == 2 - ), "Only 2 slices should be yielded to form the first group" + assert controlled_iter.yield_count == 2, ( + "Only 2 slices should be yielded to form the first group" + ) # Get the second slice second_slice = next(slices_iter) @@ -280,9 +280,9 @@ def __next__(self): cursor_slice={}, extra_fields={"name": ["Board 2", "Board 3"], "owner": ["User2", "User3"]}, ) - assert ( - controlled_iter.yield_count == 4 - ), "Only 4 slices should be yielded up to the second group" + assert controlled_iter.yield_count == 4, ( + "Only 4 slices should be yielded up to the second group" + ) # Exhaust the iterator remaining_slices = list(slices_iter) @@ -293,9 +293,9 @@ def __next__(self): extra_fields={"name": ["Board 4"], "owner": ["User4"]}, ) ] - assert ( - controlled_iter.yield_count == 5 - ), "All 5 slices should be yielded after exhausting the iterator" + assert controlled_iter.yield_count == 5, ( + "All 5 slices should be yielded after exhausting the iterator" + ) def test_set_initial_state_delegation(mock_config, mock_underlying_router): diff --git a/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py b/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py index fce217136..122c8dfae 100644 --- a/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py +++ b/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py @@ -1118,13 +1118,13 @@ def test_substream_using_resumable_full_refresh_parent_stream_slices(use_increme # validate final state for closed substream slices final_state = 
substream_cursor_slicer.get_stream_state() if not use_incremental_dependency: - assert ( - final_state["states"] == expected_substream_state["states"] - ), "State for substreams is not valid!" + assert final_state["states"] == expected_substream_state["states"], ( + "State for substreams is not valid!" + ) else: - assert ( - final_state == expected_substream_state - ), "State for substreams with incremental dependency is not valid!" + assert final_state == expected_substream_state, ( + "State for substreams with incremental dependency is not valid!" + ) @pytest.mark.parametrize( diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 1102468b7..8fadc04fd 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -1471,9 +1471,9 @@ def test_concurrent_declarative_source_runs_state_migrations_provided_in_manifes source_config=manifest, config=_CONFIG, catalog=_CATALOG, state=state ) concurrent_streams, synchronous_streams = source._group_streams(_CONFIG) - assert ( - concurrent_streams[0].cursor.state.get("state") != state_blob.__dict__ - ), "State was not migrated." + assert concurrent_streams[0].cursor.state.get("state") != state_blob.__dict__, ( + "State was not migrated." 
+ ) assert concurrent_streams[0].cursor.state.get("states") == [ {"cursor": {"updated_at": "2024-08-21"}, "partition": {"type": "type_1"}}, {"cursor": {"updated_at": "2024-08-21"}, "partition": {"type": "type_2"}}, diff --git a/unit_tests/sources/file_based/test_scenarios.py b/unit_tests/sources/file_based/test_scenarios.py index 214dd0872..d70b7f4ef 100644 --- a/unit_tests/sources/file_based/test_scenarios.py +++ b/unit_tests/sources/file_based/test_scenarios.py @@ -122,9 +122,9 @@ def _verify_read_output(output: EntrypointOutput, scenario: TestScenario[Abstrac expected_states = list(filter(lambda e: "data" not in e, expected_records)) states = list(filter(lambda r: r.state, records_and_state_messages)) - assert ( - len(states) > 0 - ), "No state messages emitted. Successful syncs should emit at least one stream state." + assert len(states) > 0, ( + "No state messages emitted. Successful syncs should emit at least one stream state." + ) _verify_state_record_counts(sorted_records, states) if hasattr(scenario.source, "cursor_cls") and issubclass( @@ -182,9 +182,9 @@ def _verify_analytics( expected_analytics: Optional[List[AirbyteAnalyticsTraceMessage]], ) -> None: if expected_analytics: - assert ( - len(analytics) == len(expected_analytics) - ), f"Number of actual analytics messages ({len(analytics)}) did not match expected ({len(expected_analytics)})" + assert len(analytics) == len(expected_analytics), ( + f"Number of actual analytics messages ({len(analytics)}) did not match expected ({len(expected_analytics)})" + ) for actual, expected in zip(analytics, expected_analytics): actual_type, actual_value = actual.trace.analytics.type, actual.trace.analytics.value expected_type = expected.type diff --git a/unit_tests/sources/streams/test_call_rate.py b/unit_tests/sources/streams/test_call_rate.py index 853e2997e..b99905870 100644 --- a/unit_tests/sources/streams/test_call_rate.py +++ b/unit_tests/sources/streams/test_call_rate.py @@ -268,9 +268,9 @@ def 
test_limit_rate(self): with pytest.raises(CallRateLimitHit) as excinfo2: policy.try_acquire("call", weight=1), "call over limit" - assert ( - excinfo2.value.time_to_wait < excinfo1.value.time_to_wait - ), "time to wait must decrease over time" + assert excinfo2.value.time_to_wait < excinfo1.value.time_to_wait, ( + "time to wait must decrease over time" + ) def test_limit_rate_support_custom_weight(self): """try_acquire must take into account provided weight and throw CallRateLimitHit when hit the limit.""" @@ -279,9 +279,9 @@ def test_limit_rate_support_custom_weight(self): policy.try_acquire("call", weight=2), "1st call with weight of 2" with pytest.raises(CallRateLimitHit) as excinfo: policy.try_acquire("call", weight=9), "2nd call, over limit since 2 + 9 = 11 > 10" - assert excinfo.value.time_to_wait.total_seconds() == pytest.approx( - 60, 0.1 - ), "should wait 1 minute before next call" + assert excinfo.value.time_to_wait.total_seconds() == pytest.approx(60, 0.1), ( + "should wait 1 minute before next call" + ) def test_multiple_limit_rates(self): """try_acquire must take into all call rates and apply stricter.""" diff --git a/unit_tests/sources/streams/test_stream_read.py b/unit_tests/sources/streams/test_stream_read.py index 8181925e8..cf550f8cf 100644 --- a/unit_tests/sources/streams/test_stream_read.py +++ b/unit_tests/sources/streams/test_stream_read.py @@ -758,9 +758,9 @@ def test_configured_json_schema_with_invalid_properties(): assert old_user_insights not in configured_json_schema_properties assert old_feature_info not in configured_json_schema_properties for stream_schema_property in stream_schema["properties"]: - assert ( - stream_schema_property in configured_json_schema_properties - ), f"Stream schema property: {stream_schema_property} missing in configured schema" + assert stream_schema_property in configured_json_schema_properties, ( + f"Stream schema property: {stream_schema_property} missing in configured schema" + ) assert ( 
stream_schema["properties"][stream_schema_property] == configured_json_schema_properties[stream_schema_property] diff --git a/unit_tests/test_entrypoint.py b/unit_tests/test_entrypoint.py index 7a4afd968..520131881 100644 --- a/unit_tests/test_entrypoint.py +++ b/unit_tests/test_entrypoint.py @@ -830,9 +830,9 @@ def test_handle_record_counts( assert message_count == expected_records_by_stream[stream_descriptor] if actual_message.type == Type.STATE: - assert isinstance( - actual_message.state.sourceStats.recordCount, float - ), "recordCount value should be expressed as a float" + assert isinstance(actual_message.state.sourceStats.recordCount, float), ( + "recordCount value should be expressed as a float" + ) def test_given_serialization_error_using_orjson_then_fallback_on_json( diff --git a/unit_tests/test_exception_handler.py b/unit_tests/test_exception_handler.py index 3efc6d57e..1a5f70d8b 100644 --- a/unit_tests/test_exception_handler.py +++ b/unit_tests/test_exception_handler.py @@ -62,24 +62,24 @@ def test_uncaught_exception_handler(): log_output, trace_output = stdout_lines out_log_message = AirbyteMessageSerializer.load(json.loads(log_output)) - assert ( - traceback_start in out_log_message.log.message - ), "Log message should contain traceback start" - assert ( - file_reference in out_log_message.log.message - ), "Log message should contain file reference" - assert ( - exception_message in out_log_message.log.message - ), "Log message should contain expected exception message" + assert traceback_start in out_log_message.log.message, ( + "Log message should contain traceback start" + ) + assert file_reference in out_log_message.log.message, ( + "Log message should contain file reference" + ) + assert exception_message in out_log_message.log.message, ( + "Log message should contain expected exception message" + ) out_trace_message = AirbyteMessageSerializer.load(json.loads(trace_output)) assert out_trace_message.trace.emitted_at > 0 - assert ( - 
traceback_start in out_trace_message.trace.error.stack_trace - ), "Trace message should contain traceback start" - assert ( - file_reference in out_trace_message.trace.error.stack_trace - ), "Trace message should contain file reference" - assert ( - out_trace_message.trace.error.internal_message == exception_message - ), "Trace message should contain expected exception message" + assert traceback_start in out_trace_message.trace.error.stack_trace, ( + "Trace message should contain traceback start" + ) + assert file_reference in out_trace_message.trace.error.stack_trace, ( + "Trace message should contain file reference" + ) + assert out_trace_message.trace.error.internal_message == exception_message, ( + "Trace message should contain expected exception message" + ) diff --git a/unit_tests/test_secure_logger.py b/unit_tests/test_secure_logger.py index 0237091fe..757a069c7 100644 --- a/unit_tests/test_secure_logger.py +++ b/unit_tests/test_secure_logger.py @@ -203,12 +203,12 @@ def read( list(entrypoint.run(parsed_args)) except Exception: sys.excepthook(*sys.exc_info()) - assert ( - I_AM_A_SECRET_VALUE not in capsys.readouterr().out - ), "Should have filtered non-secret value from exception trace message" - assert ( - I_AM_A_SECRET_VALUE not in caplog.text - ), "Should have filtered secret value from exception log message" + assert I_AM_A_SECRET_VALUE not in capsys.readouterr().out, ( + "Should have filtered non-secret value from exception trace message" + ) + assert I_AM_A_SECRET_VALUE not in caplog.text, ( + "Should have filtered secret value from exception log message" + ) def test_non_airbyte_secrets_are_not_masked_on_uncaught_exceptions(mocker, caplog, capsys): @@ -257,9 +257,9 @@ def read( list(entrypoint.run(parsed_args)) except Exception: sys.excepthook(*sys.exc_info()) - assert ( - NOT_A_SECRET_VALUE in capsys.readouterr().out - ), "Should not have filtered non-secret value from exception trace message" - assert ( - NOT_A_SECRET_VALUE in caplog.text - ), 
"Should not have filtered non-secret value from exception log message" + assert NOT_A_SECRET_VALUE in capsys.readouterr().out, ( + "Should not have filtered non-secret value from exception trace message" + ) + assert NOT_A_SECRET_VALUE in caplog.text, ( + "Should not have filtered non-secret value from exception log message" + ) diff --git a/unit_tests/utils/test_secret_utils.py b/unit_tests/utils/test_secret_utils.py index 73c93e670..d6f4f4563 100644 --- a/unit_tests/utils/test_secret_utils.py +++ b/unit_tests/utils/test_secret_utils.py @@ -150,9 +150,9 @@ def test_get_secret_paths(spec, expected): ], ) def test_get_secrets(spec, config, expected): - assert ( - get_secrets(spec, config) == expected - ), f"Expected the spec {spec} and config {config} to produce {expected}" + assert get_secrets(spec, config) == expected, ( + f"Expected the spec {spec} and config {config} to produce {expected}" + ) def test_secret_filtering(): From ed9ab555f0d7fd4fabb67d3d6b2989e8da262c4b Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 29 Aug 2025 04:03:46 +0000 Subject: [PATCH 05/10] fix: add missing source_config parameter to ConcurrentDeclarativeSource constructor in test Co-Authored-By: AJ Steers --- .../sources/declarative/test_manifest_declarative_source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py b/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py index 2b7c34fb4..0a03c2e53 100644 --- a/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py +++ b/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py @@ -2222,7 +2222,7 @@ def _run_read(manifest: Mapping[str, Any], stream_name: str) -> List[AirbyteMess source = ConcurrentDeclarativeSource( catalog=catalog, config=config, - manifest=manifest, + source_config=manifest, state=state, ) return 
list(source.read(logger, {}, catalog, state)) From f07166e618e55ab16397cf920763ce9a65efb34a Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Thu, 28 Aug 2025 22:14:51 -0700 Subject: [PATCH 06/10] preserve positional order of constructor, force kwargs where necessary for correctness --- .../sources/declarative/concurrent_declarative_source.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index e9b48e1dd..585073abf 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -171,8 +171,9 @@ def __init__( self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional[Mapping[str, Any]], - source_config: ConnectionDefinition, state: Optional[List[AirbyteStateMessage]] = None, + *, + source_config: ConnectionDefinition, debug: bool = False, emit_connector_builder_messages: bool = False, migrate_manifest: bool = False, From 35e9e5035829515b8b3818d4b12d8f7844c6c4f3 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Thu, 28 Aug 2025 22:17:28 -0700 Subject: [PATCH 07/10] explcit default to None for catalog and config --- .../sources/declarative/concurrent_declarative_source.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 585073abf..2ebd8fec4 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -169,8 +169,8 @@ class ConcurrentDeclarativeSource(AbstractSource): def __init__( self, - catalog: Optional[ConfiguredAirbyteCatalog], - config: Optional[Mapping[str, Any]], + catalog: Optional[ConfiguredAirbyteCatalog] = None, + config: Optional[Mapping[str, 
Any]] = None, state: Optional[List[AirbyteStateMessage]] = None, *, source_config: ConnectionDefinition, From 47577eff1e55dc3c000bd562564505dc3002b54e Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Fri, 29 Aug 2025 09:40:00 -0700 Subject: [PATCH 08/10] update create_source() to properly declare optional inputs --- airbyte_cdk/connector_builder/connector_builder_handler.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/connector_builder/connector_builder_handler.py b/airbyte_cdk/connector_builder/connector_builder_handler.py index 00823d0b8..927202ce4 100644 --- a/airbyte_cdk/connector_builder/connector_builder_handler.py +++ b/airbyte_cdk/connector_builder/connector_builder_handler.py @@ -62,9 +62,9 @@ def should_normalize_manifest(config: Mapping[str, Any]) -> bool: def create_source( config: Mapping[str, Any], - limits: TestLimits, - catalog: Optional[ConfiguredAirbyteCatalog], - state: Optional[List[AirbyteStateMessage]], + limits: TestLimits | None = None, + catalog: ConfiguredAirbyteCatalog | None = None, + state: AirbyteStateMessage | None = None, ) -> ConcurrentDeclarativeSource: manifest = config["__injected_declarative_manifest"] From d1d64c34c54c290d697a526e891e15625561ddd6 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 29 Aug 2025 18:59:10 +0000 Subject: [PATCH 09/10] fix: correct return type annotation in get_config_and_catalog_from_args to resolve MyPy error Co-Authored-By: AJ Steers --- airbyte_cdk/connector_builder/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/connector_builder/main.py b/airbyte_cdk/connector_builder/main.py index ba9ded0dd..06e0e3d09 100644 --- a/airbyte_cdk/connector_builder/main.py +++ b/airbyte_cdk/connector_builder/main.py @@ -34,7 +34,7 @@ def get_config_and_catalog_from_args( args: List[str], -) -> Tuple[str, Mapping[str, Any], Optional[ConfiguredAirbyteCatalog], Any]: +) 
-> Tuple[str, Mapping[str, Any], Optional[ConfiguredAirbyteCatalog], List[AirbyteStateMessage]]: # TODO: Add functionality for the `debug` logger. # Currently, no one `debug` level log will be displayed during `read` a stream for a connector created through `connector-builder`. parsed_args = AirbyteEntrypoint.parse_args(args) From d1b1e4baf65d1b4f22f66dd845ca7f5521e8b229 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 29 Aug 2025 19:02:53 +0000 Subject: [PATCH 10/10] fix: correct state parameter type annotation in create_source to match ConcurrentDeclarativeSource signature Co-Authored-By: AJ Steers --- airbyte_cdk/connector_builder/connector_builder_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/connector_builder/connector_builder_handler.py b/airbyte_cdk/connector_builder/connector_builder_handler.py index 927202ce4..e63c200c1 100644 --- a/airbyte_cdk/connector_builder/connector_builder_handler.py +++ b/airbyte_cdk/connector_builder/connector_builder_handler.py @@ -64,7 +64,7 @@ def create_source( config: Mapping[str, Any], limits: TestLimits | None = None, catalog: ConfiguredAirbyteCatalog | None = None, - state: AirbyteStateMessage | None = None, + state: List[AirbyteStateMessage] | None = None, ) -> ConcurrentDeclarativeSource: manifest = config["__injected_declarative_manifest"]