Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
7937514
feat: make psutil, rapidfuzz, and google-cloud-secret-manager optiona…
devin-ai-integration[bot] Jul 23, 2025
8bc5492
style: auto-format declarative component schema
devin-ai-integration[bot] Jul 23, 2025
5d46da6
fix: completely remove psutil and rapidfuzz dependencies
devin-ai-integration[bot] Jul 23, 2025
68490dc
feat: Add WASM compatibility fallback from serpyco-rs to serpyco
devin-ai-integration[bot] Jul 23, 2025
14f1ce3
revert formatting on generated file
aaronsteers Jul 23, 2025
20a43d6
clean up pyproject diff
aaronsteers Jul 23, 2025
07f0b2a
move install condition to a constant
aaronsteers Jul 23, 2025
ebc5412
Merge remote-tracking branch 'origin/devin/1753304576-wasm-compatibil…
aaronsteers Jul 23, 2025
ca46edf
resolve pytz and pandas version conflicts in wasm
aaronsteers Jul 23, 2025
62925ee
widen numpy version
aaronsteers Jul 23, 2025
bda9765
poetry lock
aaronsteers Jul 23, 2025
c5b0767
try conditional whenever install (temporary)
aaronsteers Jul 23, 2025
7468e2e
remove 'serpyco' compatibility import (no pure wheels)
aaronsteers Jul 23, 2025
a648429
allow newer version of cryptography
aaronsteers Jul 23, 2025
bc1fb21
fall back to basic custom serializer
aaronsteers Jul 23, 2025
814136b
remove more custom serpyco logic
aaronsteers Jul 24, 2025
4db3c8b
add missing callable class import
aaronsteers Jul 24, 2025
ca18c56
rework whenever dependency
aaronsteers Jul 24, 2025
b924fb2
allow dynamic inputs to run()
aaronsteers Jul 24, 2025
b18e7c2
cherry-pick-me: replace serializer classes with helper functions
aaronsteers Jul 24, 2025
b3807e3
fix circular ref
aaronsteers Jul 24, 2025
cbe0ba4
use dacite, add a wasm test html file
aaronsteers Jul 24, 2025
833429c
update script
aaronsteers Jul 24, 2025
e389081
improve script logic
aaronsteers Jul 24, 2025
df11705
split into three buttons
aaronsteers Jul 24, 2025
70f7f43
whitespace cleanup
aaronsteers Jul 24, 2025
94e073f
add airbyte_cdk.connector_builder.main:run
aaronsteers Jul 24, 2025
974d7d4
updated wasm test page
aaronsteers Jul 24, 2025
595581b
switch airbyte-protocol package back to pydantic
aaronsteers Jul 24, 2025
1ac75c6
remove stray import
aaronsteers Jul 24, 2025
005b079
remove old 'airbyte_protocol_dataclasses' refs
aaronsteers Jul 24, 2025
d62072a
fix refs
aaronsteers Jul 24, 2025
cfeda8c
fix missing import
aaronsteers Jul 24, 2025
f33e4a3
try forcing reduced concurrency
aaronsteers Jul 24, 2025
e0a1dac
swap to rick-and-morty via gist
aaronsteers Jul 24, 2025
b90f3f5
reword
aaronsteers Jul 29, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,6 @@ dist

# ASDF tool versions files
.tool-versions

# Pyodide build artifacts
.pyodide-xbuildenv
65 changes: 30 additions & 35 deletions airbyte_cdk/cli/source_declarative_manifest/_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@
from airbyte_cdk.models import (
AirbyteErrorTraceMessage,
AirbyteMessage,
AirbyteMessageSerializer,
AirbyteStateMessage,
AirbyteTraceMessage,
ConfiguredAirbyteCatalog,
ConnectorSpecificationSerializer,
TraceType,
Type,
ab_connector_spec_from_string,
ab_message_to_string,
)
from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
ConcurrentDeclarativeSource,
Expand Down Expand Up @@ -105,21 +105,19 @@ def _get_local_yaml_source(args: list[str]) -> SourceLocalYaml:
)
except Exception as error:
print(
orjson.dumps(
AirbyteMessageSerializer.dump(
AirbyteMessage(
type=Type.TRACE,
trace=AirbyteTraceMessage(
type=TraceType.ERROR,
emitted_at=ab_datetime_now().to_epoch_millis(),
error=AirbyteErrorTraceMessage(
message=f"Error starting the sync. This could be due to an invalid configuration or catalog. Please contact Support for assistance. Error: {error}",
stack_trace=traceback.format_exc(),
),
ab_message_to_string(
AirbyteMessage(
type=Type.TRACE,
trace=AirbyteTraceMessage(
type=TraceType.ERROR,
emitted_at=ab_datetime_now().to_epoch_millis(),
error=AirbyteErrorTraceMessage(
message=f"Error starting the sync. This could be due to an invalid configuration or catalog. Please contact Support for assistance. Error: {error}",
stack_trace=traceback.format_exc(),
),
)
)
).decode()
),
),
)
)
raise error

Expand Down Expand Up @@ -149,11 +147,10 @@ def handle_remote_manifest_command(args: list[str]) -> None:
"Could not find `spec.json` file for source-declarative-manifest"
)

spec_obj = json.loads(json_spec)
spec = ConnectorSpecificationSerializer.load(spec_obj)
spec = ab_connector_spec_from_string(json_spec.decode("utf-8"))

message = AirbyteMessage(type=Type.SPEC, spec=spec)
print(AirbyteEntrypoint.airbyte_message_to_string(message))
print(ab_message_to_string(message))
else:
source = create_declarative_source(args)
launch(
Expand Down Expand Up @@ -215,21 +212,19 @@ def create_declarative_source(
)
except Exception as error:
print(
orjson.dumps(
AirbyteMessageSerializer.dump(
AirbyteMessage(
type=Type.TRACE,
trace=AirbyteTraceMessage(
type=TraceType.ERROR,
emitted_at=ab_datetime_now().to_epoch_millis(),
error=AirbyteErrorTraceMessage(
message=f"Error starting the sync. This could be due to an invalid configuration or catalog. Please contact Support for assistance. Error: {error}",
stack_trace=traceback.format_exc(),
),
ab_message_to_string(
AirbyteMessage(
type=Type.TRACE,
trace=AirbyteTraceMessage(
type=TraceType.ERROR,
emitted_at=ab_datetime_now().to_epoch_millis(),
error=AirbyteErrorTraceMessage(
message=f"Error starting the sync. This could be due to an invalid configuration or catalog. Please contact Support for assistance. Error: {error}",
stack_trace=traceback.format_exc(),
),
)
)
).decode()
),
),
),
)
raise error

Expand Down Expand Up @@ -298,10 +293,10 @@ def _register_components_from_file(filepath: str) -> None:
spec.loader.exec_module(module)


def run() -> None:
def run(args: list[str] | None = None) -> None:
"""Run the `source-declarative-manifest` CLI.

Args are detected from the command line, and the appropriate command is executed.
"""
args: list[str] = sys.argv[1:]
args = args or sys.argv[1:]
handle_command(args)
4 changes: 2 additions & 2 deletions airbyte_cdk/config_observation.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
AirbyteControlConnectorConfigMessage,
AirbyteControlMessage,
AirbyteMessage,
AirbyteMessageSerializer,
OrchestratorType,
Type,
ab_message_to_string,
)


Expand Down Expand Up @@ -92,7 +92,7 @@ def emit_configuration_as_airbyte_control_message(config: MutableMapping[str, An
See the airbyte_cdk.sources.message package
"""
airbyte_message = create_connector_config_control_message(config)
print(orjson.dumps(AirbyteMessageSerializer.dump(airbyte_message)).decode())
print(ab_message_to_string(airbyte_message))


def create_connector_config_control_message(config: MutableMapping[str, Any]) -> AirbyteMessage:
Expand Down
3 changes: 1 addition & 2 deletions airbyte_cdk/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from airbyte_cdk.models import (
AirbyteConnectionStatus,
ConnectorSpecification,
ConnectorSpecificationSerializer,
)


Expand Down Expand Up @@ -95,7 +94,7 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification:
else:
raise FileNotFoundError("Unable to find spec.yaml or spec.json in the package.")

return ConnectorSpecificationSerializer.load(spec_obj)
return ConnectorSpecification.model_validate(spec_obj)

@abstractmethod
def check(self, logger: logging.Logger, config: TConfig) -> AirbyteConnectionStatus:
Expand Down
29 changes: 18 additions & 11 deletions airbyte_cdk/connector_builder/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@
from airbyte_cdk.entrypoint import AirbyteEntrypoint
from airbyte_cdk.models import (
AirbyteMessage,
AirbyteMessageSerializer,
AirbyteStateMessage,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteCatalogSerializer,
)
from airbyte_cdk.models.airbyte_protocol_serializers import ab_message_to_string
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.source import Source
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
Expand Down Expand Up @@ -53,7 +52,7 @@ def get_config_and_catalog_from_args(

command = config["__command"]
if command == "test_read":
catalog = ConfiguredAirbyteCatalogSerializer.load(BaseConnector.read_config(catalog_path))
catalog = ConfiguredAirbyteCatalog.model_validate(BaseConnector.read_config(catalog_path))
state = Source.read_state(state_path)
else:
catalog = None
Expand Down Expand Up @@ -92,19 +91,27 @@ def handle_request(args: List[str]) -> str:
command, config, catalog, state = get_config_and_catalog_from_args(args)
limits = get_limits(config)
source = create_source(config, limits)
return orjson.dumps(
AirbyteMessageSerializer.dump(
handle_connector_builder_request(source, command, config, catalog, state, limits)
)
).decode() # type: ignore[no-any-return] # Serializer.dump() always returns AirbyteMessage
return ab_message_to_string(
handle_connector_builder_request(source, command, config, catalog, state, limits)
)

def run(args: list[str] | None) -> None:
"""Run the connector builder handler."""
if args is None:
args = sys.argv[1:]

if __name__ == "__main__":
try:
print(handle_request(sys.argv[1:]))
result = handle_request(args)
print(result)
except Exception as exc:
error = AirbyteTracedException.from_exception(
exc, message=f"Error handling request: {str(exc)}"
)
m = error.as_airbyte_message()
print(orjson.dumps(AirbyteMessageSerializer.dump(m)).decode())
print(ab_message_to_string(m))
sys.exit(1)


if __name__ == "__main__":
run(sys.argv[1:])
sys.exit(1)
14 changes: 7 additions & 7 deletions airbyte_cdk/destinations/destination.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
from airbyte_cdk.exception_handler import init_uncaught_exception_handler
from airbyte_cdk.models import (
AirbyteMessage,
AirbyteMessageSerializer,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteCatalogSerializer,
Type,
ab_configured_catalog_from_string,
ab_configured_catalog_to_string,
ab_message_from_string,
ab_message_to_string,
)
from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
Expand Down Expand Up @@ -46,7 +48,7 @@ def _parse_input_stream(self, input_stream: io.TextIOWrapper) -> Iterable[Airbyt
"""Reads from stdin, converting to Airbyte messages"""
for line in input_stream:
try:
yield AirbyteMessageSerializer.load(orjson.loads(line))
yield ab_message_from_string(line)
except orjson.JSONDecodeError:
logger.info(
f"ignoring input which can't be deserialized as Airbyte Message: {line}"
Expand All @@ -58,9 +60,7 @@ def _run_write(
configured_catalog_path: str,
input_stream: io.TextIOWrapper,
) -> Iterable[AirbyteMessage]:
catalog = ConfiguredAirbyteCatalogSerializer.load(
orjson.loads(open(configured_catalog_path).read())
)
catalog = ab_configured_catalog_from_string(open(configured_catalog_path).read())
input_messages = self._parse_input_stream(input_stream)
logger.info("Begin writing to the destination...")
yield from self.write(
Expand Down Expand Up @@ -151,4 +151,4 @@ def run(self, args: List[str]) -> None:
parsed_args = self.parse_args(args)
output_messages = self.run_cmd(parsed_args)
for message in output_messages:
print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())
print(ab_message_to_string(message))
31 changes: 8 additions & 23 deletions airbyte_cdk/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@
from airbyte_cdk.models import (
AirbyteConnectionStatus,
AirbyteMessage,
AirbyteMessageSerializer,
AirbyteStateStats,
ConnectorSpecification,
FailureType,
Status,
Type,
ab_message_to_string,
)
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.connector_state_manager import HashableStreamDescriptor
Expand All @@ -47,7 +47,6 @@

VALID_URL_SCHEMES = ["https"]
CLOUD_DEPLOYMENT_MODE = "cloud"
_HAS_LOGGED_FOR_SERIALIZATION_ERROR = False


class AirbyteEntrypoint(object):
Expand Down Expand Up @@ -178,41 +177,41 @@ def run(self, parsed_args: argparse.Namespace) -> Iterable[str]:
if cmd == "spec":
message = AirbyteMessage(type=Type.SPEC, spec=source_spec)
yield from [
self.airbyte_message_to_string(queued_message)
ab_message_to_string(queued_message)
for queued_message in self._emit_queued_messages(self.source)
]
yield self.airbyte_message_to_string(message)
yield ab_message_to_string(message)
else:
raw_config = self.source.read_config(parsed_args.config)
config = self.source.configure(raw_config, temp_dir)

yield from [
self.airbyte_message_to_string(queued_message)
ab_message_to_string(queued_message)
for queued_message in self._emit_queued_messages(self.source)
]
if cmd == "check":
yield from map(
AirbyteEntrypoint.airbyte_message_to_string,
ab_message_to_string,
self.check(source_spec, config),
)
elif cmd == "discover":
yield from map(
AirbyteEntrypoint.airbyte_message_to_string,
ab_message_to_string,
self.discover(source_spec, config),
)
elif cmd == "read":
config_catalog = self.source.read_catalog(parsed_args.catalog)
state = self.source.read_state(parsed_args.state)

yield from map(
AirbyteEntrypoint.airbyte_message_to_string,
ab_message_to_string,
self.read(source_spec, config, config_catalog, state),
)
else:
raise Exception("Unexpected command " + cmd)
finally:
yield from [
self.airbyte_message_to_string(queued_message)
ab_message_to_string(queued_message)
for queued_message in self._emit_queued_messages(self.source)
]

Expand Down Expand Up @@ -327,20 +326,6 @@ def set_up_secret_filter(config: TConfig, connection_specification: Mapping[str,
config_secrets = get_secrets(connection_specification, config)
update_secrets(config_secrets)

@staticmethod
def airbyte_message_to_string(airbyte_message: AirbyteMessage) -> str:
global _HAS_LOGGED_FOR_SERIALIZATION_ERROR
serialized_message = AirbyteMessageSerializer.dump(airbyte_message)
try:
return orjson.dumps(serialized_message).decode()
except Exception as exception:
if not _HAS_LOGGED_FOR_SERIALIZATION_ERROR:
logger.warning(
f"There was an error during the serialization of an AirbyteMessage: `{exception}`. This might impact the sync performances."
)
_HAS_LOGGED_FOR_SERIALIZATION_ERROR = True
return json.dumps(serialized_message)

@classmethod
def extract_state(cls, args: List[str]) -> Optional[Any]:
parsed_args = cls.parse_args(args)
Expand Down
4 changes: 2 additions & 2 deletions airbyte_cdk/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
from airbyte_cdk.models import (
AirbyteLogMessage,
AirbyteMessage,
AirbyteMessageSerializer,
Level,
Type,
ab_message_to_string,
)
from airbyte_cdk.utils import PrintBuffer
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
Expand Down Expand Up @@ -81,7 +81,7 @@ def format(self, record: logging.LogRecord) -> str:
log_message = AirbyteMessage(
type=Type.LOG, log=AirbyteLogMessage(level=airbyte_level, message=message)
)
return orjson.dumps(AirbyteMessageSerializer.dump(log_message)).decode()
return ab_message_to_string(log_message)

@staticmethod
def extract_extra_args_from_record(record: logging.LogRecord) -> Mapping[str, Any]:
Expand Down
13 changes: 7 additions & 6 deletions airbyte_cdk/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,13 @@
Type,
)
from .airbyte_protocol_serializers import (
AirbyteMessageSerializer,
AirbyteStateMessageSerializer,
AirbyteStreamStateSerializer,
ConfiguredAirbyteCatalogSerializer,
ConfiguredAirbyteStreamSerializer,
ConnectorSpecificationSerializer,
ab_configured_catalog_from_string,
ab_configured_catalog_to_string,
ab_connector_spec_from_string,
ab_connector_spec_to_string,
ab_message_from_string,
ab_message_to_string,
ab_state_message_to_string,
)
from .well_known_types import (
BinaryData,
Expand Down
Loading
Loading