Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 107 additions & 13 deletions airbyte_cdk/cli/airbyte_cdk/_secrets.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from rich.console import Console
from rich.table import Table

from airbyte_cdk.cli.airbyte_cdk.exceptions import ConnectorSecretWithNoValidVersionsError
from airbyte_cdk.utils.connector_paths import (
resolve_connector_name,
resolve_connector_name_and_directory,
Expand Down Expand Up @@ -131,24 +132,46 @@ def fetch(
)
# Fetch and write secrets
secret_count = 0
exceptions = []

for secret in secrets:
secret_file_path = _get_secret_filepath(
secrets_dir=secrets_dir,
secret=secret,
)
_write_secret_file(
secret=secret,
client=client,
file_path=secret_file_path,
try:
_write_secret_file(
secret=secret,
client=client,
file_path=secret_file_path,
connector_name=connector_name,
gcp_project_id=gcp_project_id,
)
click.echo(f"Secret written to: {secret_file_path.absolute()!s}", err=True)
secret_count += 1
except ConnectorSecretWithNoValidVersionsError as e:
exceptions.append(e)
click.echo(
f"Failed to retrieve secret '{e.secret_name}': No enabled version found", err=True
)

if secret_count == 0 and not exceptions:
click.echo(
f"No secrets found for connector: '{connector_name}'",
err=True,
)
click.echo(f"Secret written to: {secret_file_path.absolute()!s}", err=True)
secret_count += 1

if secret_count == 0:
if exceptions:
error_message = f"Failed to retrieve {len(exceptions)} secret(s)"
click.echo(
f"No secrets found for connector: '{connector_name}'",
style(
error_message,
fg="red",
),
err=True,
)
if secret_count == 0:
raise exceptions[0]

if not print_ci_secrets_masks:
return
Expand Down Expand Up @@ -230,9 +253,8 @@ def list_(
table.add_column("Created", justify="left", style="blue", overflow="fold")
for secret in secrets:
full_secret_name = secret.name
secret_name = full_secret_name.split("/secrets/")[-1] # Removes project prefix
# E.g. https://console.cloud.google.com/security/secret-manager/secret/SECRET_SOURCE-SHOPIFY__CREDS/versions?hl=en&project=<gcp_project_id>
secret_url = f"https://console.cloud.google.com/security/secret-manager/secret/{secret_name}/versions?hl=en&project={gcp_project_id}"
secret_name = _extract_secret_name(full_secret_name)
secret_url = _get_secret_url(secret_name, gcp_project_id)
table.add_row(
f"[link={secret_url}]{secret_name}[/link]",
"\n".join([f"{k}={v}" for k, v in secret.labels.items()]),
Expand All @@ -242,6 +264,43 @@ def list_(
console.print(table)


def _extract_secret_name(secret_name: str) -> str:
"""Extract the secret name from a fully qualified secret path.

Handles different formats of secret names:
- Full path: "projects/project-id/secrets/SECRET_NAME"
- Already extracted: "SECRET_NAME"

Args:
secret_name: The secret name or path

Returns:
str: The extracted secret name without project prefix
"""
if "/secrets/" in secret_name:
return secret_name.split("/secrets/")[-1]
return secret_name


def _get_secret_url(secret_name: str, gcp_project_id: str) -> str:
"""Generate a URL for a secret in the GCP Secret Manager console.

Note: This URL itself does not contain secrets or sensitive information.
The URL itself is only useful for valid logged-in users of the project, and it
safe to print this URL in logs.

Args:
secret_name: The name of the secret in GCP.
gcp_project_id: The GCP project ID.

Returns:
str: URL to the secret in the GCP console
"""
# Ensure we have just the secret name without the project prefix
secret_name = _extract_secret_name(secret_name)
return f"https://console.cloud.google.com/security/secret-manager/secret/{secret_name}/versions?hl=en&project={gcp_project_id}"


def _fetch_secret_handles(
connector_name: str,
gcp_project_id: str = AIRBYTE_INTERNAL_GCP_PROJECT,
Expand Down Expand Up @@ -272,9 +331,44 @@ def _write_secret_file(
secret: "Secret", # type: ignore
client: "secretmanager.SecretManagerServiceClient", # type: ignore
file_path: Path,
connector_name: str,
gcp_project_id: str,
) -> None:
version_name = f"{secret.name}/versions/latest"
response = client.access_secret_version(name=version_name)
"""Write the most recent enabled version of a secret to a file.

Lists all enabled versions of the secret and selects the most recent one.
Raises ConnectorSecretWithNoValidVersionsError if no enabled versions are found.

Args:
secret: The secret to write to a file
client: The Secret Manager client
file_path: The path to write the secret to
connector_name: The name of the connector
gcp_project_id: The GCP project ID

Raises:
ConnectorSecretWithNoValidVersionsError: If no enabled version is found
"""
# List all enabled versions of the secret.
response = client.list_secret_versions(
request={"parent": secret.name, "filter": "state:ENABLED"}
)

# The API returns versions pre-sorted in descending order, with the
# 0th item being the latest version.
versions = list(response)

if not versions:
secret_name = _extract_secret_name(secret.name)
raise ConnectorSecretWithNoValidVersionsError(
connector_name=connector_name,
secret_name=secret_name,
gcp_project_id=gcp_project_id,
)

enabled_version = versions[0]
Comment thread
aaronsteers marked this conversation as resolved.

response = client.access_secret_version(name=enabled_version.name)
file_path.write_text(response.payload.data.decode("UTF-8"))
file_path.chmod(0o600) # default to owner read/write only

Expand Down
23 changes: 23 additions & 0 deletions airbyte_cdk/cli/airbyte_cdk/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""Exceptions for the Airbyte CDK CLI."""

from dataclasses import dataclass


@dataclass(kw_only=True)
class ConnectorSecretWithNoValidVersionsError(Exception):
"""Error when a connector secret has no valid versions."""

connector_name: str
secret_name: str
gcp_project_id: str

def __str__(self) -> str:
"""Return a string representation of the exception."""
from airbyte_cdk.cli.airbyte_cdk._secrets import _get_secret_url

url = _get_secret_url(self.secret_name, self.gcp_project_id)
return (
f"No valid versions found for secret '{self.secret_name}' in connector '{self.connector_name}'. "
f"Please check the following URL for more information:\n- {url}"
)
Loading
Loading