Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions airbyte_cdk/manifest_server/api_models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
CheckResponse,
DiscoverRequest,
DiscoverResponse,
ErrorResponse,
FullResolveRequest,
ManifestResponse,
RequestContext,
Expand Down Expand Up @@ -40,6 +41,7 @@
"CheckResponse",
"DiscoverRequest",
"DiscoverResponse",
"ErrorResponse",
# Stream models
"AuxiliaryRequest",
"HttpRequest",
Expand Down
6 changes: 6 additions & 0 deletions airbyte_cdk/manifest_server/api_models/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,9 @@ class FullResolveRequest(BaseModel):
config: ConnectorConfig
stream_limit: int = Field(default=100, ge=1, le=100)
context: Optional[RequestContext] = None


class ErrorResponse(BaseModel):
"""Error response for API requests."""

detail: str
2 changes: 1 addition & 1 deletion airbyte_cdk/manifest_server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
app = FastAPI(
title="Manifest Server",
description="A service for running low-code Airbyte connectors",
version="0.1.0",
version="0.2.0",
contact={
"name": "Airbyte",
"url": "https://airbyte.com",
Expand Down
42 changes: 41 additions & 1 deletion airbyte_cdk/manifest_server/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ info:
contact:
name: Airbyte
url: https://airbyte.com/
version: 0.1.0
version: 0.2.0
paths:
/health/:
get:
Expand Down Expand Up @@ -62,6 +62,12 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/StreamReadResponse'
'400':
description: Bad Request - Error processing request
content:
application/json:
schema:
$ref: '#/components/schemas/ErrorResponse'
'422':
description: Validation Error
content:
Expand Down Expand Up @@ -90,6 +96,12 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/CheckResponse'
'400':
description: Bad Request - Error processing request
content:
application/json:
schema:
$ref: '#/components/schemas/ErrorResponse'
'422':
description: Validation Error
content:
Expand Down Expand Up @@ -118,6 +130,12 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/DiscoverResponse'
'400':
description: Bad Request - Error processing request
content:
application/json:
schema:
$ref: '#/components/schemas/ErrorResponse'
'422':
description: Validation Error
content:
Expand Down Expand Up @@ -146,6 +164,12 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/ManifestResponse'
'400':
description: Bad Request - Error processing request
content:
application/json:
schema:
$ref: '#/components/schemas/ErrorResponse'
'422':
description: Validation Error
content:
Expand Down Expand Up @@ -180,6 +204,12 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/ManifestResponse'
'400':
description: Bad Request - Error processing request
content:
application/json:
schema:
$ref: '#/components/schemas/ErrorResponse'
'422':
description: Validation Error
content:
Expand Down Expand Up @@ -353,6 +383,16 @@ components:
- catalog
title: DiscoverResponse
description: Response to discover a manifest.
ErrorResponse:
properties:
detail:
type: string
title: Detail
type: object
required:
- detail
title: ErrorResponse
description: Error response for API requests.
FullResolveRequest:
properties:
manifest:
Expand Down
166 changes: 114 additions & 52 deletions airbyte_cdk/manifest_server/routers/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
INJECTED_COMPONENTS_PY,
INJECTED_COMPONENTS_PY_CHECKSUMS,
)
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets

from ..api_models import (
CheckRequest,
CheckResponse,
DiscoverRequest,
DiscoverResponse,
ErrorResponse,
FullResolveRequest,
Manifest,
ManifestResponse,
Expand Down Expand Up @@ -64,7 +66,13 @@ def safe_build_source(
)


@router.post("/test_read", operation_id="testRead")
@router.post(
"/test_read",
operation_id="testRead",
responses={
400: {"description": "Bad Request - Error processing request", "model": ErrorResponse}
},
)
def test_read(request: StreamTestReadRequest) -> StreamReadResponse:
"""
Test reading from a specific stream in the manifest.
Expand Down Expand Up @@ -109,18 +117,29 @@ def test_read(request: StreamTestReadRequest) -> StreamReadResponse:
)

runner = ManifestCommandProcessor(source)
cdk_result = runner.test_read(
config_dict,
catalog,
converted_state,
request.record_limit,
request.page_limit,
request.slice_limit,
)
return StreamReadResponse.model_validate(asdict(cdk_result))


@router.post("/check", operation_id="check")
try:
cdk_result = runner.test_read(
config_dict,
catalog,
converted_state,
request.record_limit,
request.page_limit,
request.slice_limit,
)
return StreamReadResponse.model_validate(asdict(cdk_result))
except Exception as exc:
# Filter secrets from error message before returning to client
sanitized_message = filter_secrets(f"Error reading stream: {str(exc)}")
raise HTTPException(status_code=400, detail=sanitized_message)


@router.post(
"/check",
operation_id="check",
responses={
400: {"description": "Bad Request - Error processing request", "model": ErrorResponse}
},
)
def check(request: CheckRequest) -> CheckResponse:
"""Check configuration against a manifest"""
# Apply trace tags from context if provided
Expand All @@ -130,13 +149,24 @@ def check(request: CheckRequest) -> CheckResponse:
project_id=request.context.project_id,
)

source = safe_build_source(request.manifest.model_dump(), request.config.model_dump())
runner = ManifestCommandProcessor(source)
success, message = runner.check_connection(request.config.model_dump())
return CheckResponse(success=success, message=message)


@router.post("/discover", operation_id="discover")
try:
source = safe_build_source(request.manifest.model_dump(), request.config.model_dump())
runner = ManifestCommandProcessor(source)
success, message = runner.check_connection(request.config.model_dump())
return CheckResponse(success=success, message=message)
except Exception as exc:
# Filter secrets from error message before returning to client
sanitized_message = filter_secrets(f"Error checking connection: {str(exc)}")
raise HTTPException(status_code=400, detail=sanitized_message)


@router.post(
"/discover",
operation_id="discover",
responses={
400: {"description": "Bad Request - Error processing request", "model": ErrorResponse}
},
)
def discover(request: DiscoverRequest) -> DiscoverResponse:
"""Discover streams from a manifest"""
# Apply trace tags from context if provided
Expand All @@ -146,15 +176,31 @@ def discover(request: DiscoverRequest) -> DiscoverResponse:
project_id=request.context.project_id,
)

source = safe_build_source(request.manifest.model_dump(), request.config.model_dump())
runner = ManifestCommandProcessor(source)
catalog = runner.discover(request.config.model_dump())
if catalog is None:
raise HTTPException(status_code=422, detail="Connector did not return a discovered catalog")
return DiscoverResponse(catalog=catalog)


@router.post("/resolve", operation_id="resolve")
try:
source = safe_build_source(request.manifest.model_dump(), request.config.model_dump())
runner = ManifestCommandProcessor(source)
catalog = runner.discover(request.config.model_dump())
if catalog is None:
raise HTTPException(
status_code=422, detail="Connector did not return a discovered catalog"
)
return DiscoverResponse(catalog=catalog)
except HTTPException:
# Re-raise HTTPExceptions as-is (like the catalog None check above)
Comment thread
ChristoGrab marked this conversation as resolved.
raise
except Exception as exc:
# Filter secrets from error message before returning to client
sanitized_message = filter_secrets(f"Error discovering streams: {str(exc)}")
raise HTTPException(status_code=400, detail=sanitized_message)


@router.post(
"/resolve",
operation_id="resolve",
responses={
400: {"description": "Bad Request - Error processing request", "model": ErrorResponse}
},
)
def resolve(request: ResolveRequest) -> ManifestResponse:
"""Resolve a manifest to its final configuration."""
# Apply trace tags from context if provided
Expand All @@ -164,11 +210,22 @@ def resolve(request: ResolveRequest) -> ManifestResponse:
project_id=request.context.project_id,
)

source = safe_build_source(request.manifest.model_dump(), {})
return ManifestResponse(manifest=Manifest(**source.resolved_manifest))


@router.post("/full_resolve", operation_id="fullResolve")
try:
source = safe_build_source(request.manifest.model_dump(), {})
return ManifestResponse(manifest=Manifest(**source.resolved_manifest))
except Exception as exc:
# Filter secrets from error message before returning to client
sanitized_message = filter_secrets(f"Error resolving manifest: {str(exc)}")
raise HTTPException(status_code=400, detail=sanitized_message)


@router.post(
"/full_resolve",
operation_id="fullResolve",
responses={
400: {"description": "Bad Request - Error processing request", "model": ErrorResponse}
},
)
def full_resolve(request: FullResolveRequest) -> ManifestResponse:
"""
Fully resolve a manifest, including dynamic streams.
Expand All @@ -182,21 +239,26 @@ def full_resolve(request: FullResolveRequest) -> ManifestResponse:
project_id=request.context.project_id,
)

source = safe_build_source(request.manifest.model_dump(), request.config.model_dump())
manifest = {**source.resolved_manifest}
streams = manifest.get("streams", [])
for stream in streams:
stream["dynamic_stream_name"] = None

mapped_streams: Dict[str, List[Dict[str, Any]]] = {}
for stream in source.dynamic_streams:
generated_streams = mapped_streams.setdefault(stream["dynamic_stream_name"], [])

if len(generated_streams) < request.stream_limit:
generated_streams += [stream]

for generated_streams_list in mapped_streams.values():
streams.extend(generated_streams_list)

manifest["streams"] = streams
return ManifestResponse(manifest=Manifest(**manifest))
try:
source = safe_build_source(request.manifest.model_dump(), request.config.model_dump())
manifest = {**source.resolved_manifest}
streams = manifest.get("streams", [])
for stream in streams:
stream["dynamic_stream_name"] = None

mapped_streams: Dict[str, List[Dict[str, Any]]] = {}
for stream in source.dynamic_streams:
generated_streams = mapped_streams.setdefault(stream["dynamic_stream_name"], [])

if len(generated_streams) < request.stream_limit:
generated_streams += [stream]

for generated_streams_list in mapped_streams.values():
streams.extend(generated_streams_list)

manifest["streams"] = streams
return ManifestResponse(manifest=Manifest(**manifest))
except Exception as exc:
# Filter secrets from error message before returning to client
sanitized_message = filter_secrets(f"Error full resolving manifest: {str(exc)}")
raise HTTPException(status_code=400, detail=sanitized_message)
Loading
Loading