Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 73 additions & 10 deletions airbyte_cdk/cli/airbyte_cdk/_secrets.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import os
from functools import lru_cache
from pathlib import Path
from typing import Any, cast
from typing import Any, List, Optional, Tuple, cast
Comment thread
aaronsteers marked this conversation as resolved.
Outdated

import requests
import rich_click as click
Expand All @@ -43,6 +43,7 @@
from rich.console import Console
from rich.table import Table

from airbyte_cdk.cli.airbyte_cdk.exceptions import ConnectorSecretWithNoValidVersionsError
from airbyte_cdk.utils.connector_paths import (
resolve_connector_name,
resolve_connector_name_and_directory,
Expand Down Expand Up @@ -131,25 +132,49 @@ def fetch(
)
# Fetch and write secrets
secret_count = 0
failed_secrets: List[str] = []

for secret in secrets:
secret_file_path = _get_secret_filepath(
secrets_dir=secrets_dir,
secret=secret,
)
_write_secret_file(
error = _write_secret_file(
secret=secret,
client=client,
file_path=secret_file_path,
)
click.echo(f"Secret written to: {secret_file_path.absolute()!s}", err=True)
secret_count += 1

if secret_count == 0:
if error:
secret_name = secret.name.split("/secrets/")[-1] # Removes project prefix
failed_secrets.append(secret_name)
click.echo(f"Failed to retrieve secret '{secret_name}': {error}", err=True)
else:
click.echo(f"Secret written to: {secret_file_path.absolute()!s}", err=True)
secret_count += 1

if secret_count == 0 and not failed_secrets:
click.echo(
f"No secrets found for connector: '{connector_name}'",
err=True,
)

if failed_secrets:
error_message = f"Failed to retrieve {len(failed_secrets)} secret(s)"
click.echo(
style(
error_message,
fg="red",
),
err=True,
)
if secret_count == 0:
raise ConnectorSecretWithNoValidVersionsError(
Comment thread
aaronsteers marked this conversation as resolved.
Outdated
connector_name=connector_name,
secret_names=failed_secrets,
gcp_project_id=gcp_project_id,
)

if not print_ci_secrets_masks:
return

Expand Down Expand Up @@ -231,8 +256,7 @@ def list_(
for secret in secrets:
full_secret_name = secret.name
secret_name = full_secret_name.split("/secrets/")[-1] # Removes project prefix
# E.g. https://console.cloud.google.com/security/secret-manager/secret/SECRET_SOURCE-SHOPIFY__CREDS/versions?hl=en&project=<gcp_project_id>
secret_url = f"https://console.cloud.google.com/security/secret-manager/secret/{secret_name}/versions?hl=en&project={gcp_project_id}"
secret_url = _get_secret_url(secret_name, gcp_project_id)
table.add_row(
f"[link={secret_url}]{secret_name}[/link]",
"\n".join([f"{k}={v}" for k, v in secret.labels.items()]),
Expand All @@ -242,6 +266,19 @@ def list_(
console.print(table)


def _get_secret_url(secret_name: str, gcp_project_id: str) -> str:
"""Generate a URL for a secret in the GCP Secret Manager console.

Args:
secret_name: The name of the secret
gcp_project_id: The GCP project ID

Returns:
str: URL to the secret in the GCP console
"""
return f"https://console.cloud.google.com/security/secret-manager/secret/{secret_name}/versions?hl=en&project={gcp_project_id}"
Comment thread
aaronsteers marked this conversation as resolved.


def _fetch_secret_handles(
connector_name: str,
gcp_project_id: str = AIRBYTE_INTERNAL_GCP_PROJECT,
Expand Down Expand Up @@ -272,11 +309,37 @@ def _write_secret_file(
secret: "Secret", # type: ignore
client: "secretmanager.SecretManagerServiceClient", # type: ignore
file_path: Path,
) -> None:
version_name = f"{secret.name}/versions/latest"
response = client.access_secret_version(name=version_name)
) -> Optional[str]:
"""Write the most recent enabled version of a secret to a file.

Lists all enabled versions of the secret and selects the most recent one.
Returns an error message if no enabled versions are found.

Args:
secret: The secret to write to a file
client: The Secret Manager client
file_path: The path to write the secret to

Returns:
Optional[str]: Error message if no enabled version is found, None otherwise
"""
# List all enabled versions of the secret
response = client.list_secret_versions(
request={"parent": secret.name, "filter": "state:ENABLED"}
)

versions = list(response)

if not versions:
secret_name = secret.name.split("/secrets/")[-1] # Removes project prefix
Comment thread
aaronsteers marked this conversation as resolved.
Outdated
return f"No enabled version found for secret: {secret_name}"

enabled_version = versions[0]
Comment thread
aaronsteers marked this conversation as resolved.

response = client.access_secret_version(name=enabled_version.name)
file_path.write_text(response.payload.data.decode("UTF-8"))
file_path.chmod(0o600) # default to owner read/write only
return None


def _get_secrets_dir(
Expand Down
30 changes: 30 additions & 0 deletions airbyte_cdk/cli/airbyte_cdk/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""Exceptions for the Airbyte CDK CLI."""

from dataclasses import dataclass
from typing import List

from airbyte_cdk.sql.exceptions import AirbyteConnectorError


@dataclass(kw_only=True)
class ConnectorSecretWithNoValidVersionsError(AirbyteConnectorError):
Comment thread
aaronsteers marked this conversation as resolved.
Outdated
"""Error when a connector secret has no valid versions."""

connector_name: str
secret_names: List[str]
gcp_project_id: str

def __str__(self) -> str:
"""Return a string representation of the exception."""
from airbyte_cdk.cli.airbyte_cdk._secrets import _get_secret_url

urls = [
_get_secret_url(secret_name, self.gcp_project_id) for secret_name in self.secret_names
]
urls_str = "\n".join([f"- {url}" for url in urls])
secrets_str = ", ".join(self.secret_names)
return (
f"No valid versions found for the following secrets in connector '{self.connector_name}': {secrets_str}. "
f"Please check the following URLs for more information:\n{urls_str}"
)
190 changes: 190 additions & 0 deletions unit_tests/cli/airbyte_cdk/test_secrets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
from __future__ import annotations

from pathlib import Path
from unittest.mock import MagicMock, patch

import pytest
from click.testing import CliRunner

from airbyte_cdk.cli.airbyte_cdk._secrets import (
_write_secret_file,
fetch,
secretmanager,
)
from airbyte_cdk.cli.airbyte_cdk.exceptions import ConnectorSecretWithNoValidVersionsError


class TestWriteSecretFile:
@pytest.fixture
def mock_client(self):
return MagicMock()

@pytest.fixture
def mock_secret(self):
secret = MagicMock()
secret.name = "projects/test-project/secrets/test-secret"
return secret

@pytest.fixture
def mock_file_path(self, tmp_path):
return tmp_path / "test_secret.json"

def test_write_secret_file_with_enabled_version(self, mock_client, mock_secret, mock_file_path):
# Mock list_secret_versions to return an enabled version
mock_version = MagicMock()
mock_version.name = f"{mock_secret.name}/versions/1"
mock_client.list_secret_versions.return_value = [mock_version]

# Mock access_secret_version to return a payload
mock_response = MagicMock()
mock_response.payload.data.decode.return_value = '{"key": "value"}'
mock_client.access_secret_version.return_value = mock_response

# Call the function
result = _write_secret_file(mock_secret, mock_client, mock_file_path)

# Verify that list_secret_versions was called with the correct parameters
mock_client.list_secret_versions.assert_called_once()
assert "state:ENABLED" in str(mock_client.list_secret_versions.call_args)

# Verify that access_secret_version was called with the correct version
mock_client.access_secret_version.assert_called_once_with(name=mock_version.name)

# Verify that the file was created with the correct content
assert mock_file_path.read_text() == '{"key": "value"}'

# Verify that no error was returned
assert result is None

def test_write_secret_file_with_no_enabled_versions(
self, mock_client, mock_secret, mock_file_path
):
# Mock list_secret_versions to return an empty list (no enabled versions)
mock_client.list_secret_versions.return_value = []

# Call the function
result = _write_secret_file(mock_secret, mock_client, mock_file_path)

# Verify that list_secret_versions was called with the correct parameters
mock_client.list_secret_versions.assert_called_once()
assert "state:ENABLED" in str(mock_client.list_secret_versions.call_args)

# Verify that access_secret_version was not called
mock_client.access_secret_version.assert_not_called()

# Verify that the file was not created
assert not mock_file_path.exists()

# Verify that an error was returned
assert result is not None
assert "No enabled version found for secret" in result
assert "test-secret" in result


@patch("airbyte_cdk.cli.airbyte_cdk._secrets._get_gsm_secrets_client")
@patch("airbyte_cdk.cli.airbyte_cdk._secrets.resolve_connector_name_and_directory")
@patch("airbyte_cdk.cli.airbyte_cdk._secrets._get_secrets_dir")
@patch("airbyte_cdk.cli.airbyte_cdk._secrets._fetch_secret_handles")
class TestFetch:
def test_fetch_with_some_failed_secrets(
self,
mock_fetch_secret_handles,
mock_get_secrets_dir,
mock_resolve,
mock_get_client,
tmp_path,
):
# Setup mocks
mock_client = MagicMock()
mock_get_client.return_value = mock_client

mock_resolve.return_value = ("test-connector", tmp_path)

secrets_dir = tmp_path / "secrets"
mock_get_secrets_dir.return_value = secrets_dir

# Create two secrets, one that will succeed and one that will fail
secret1 = MagicMock()
secret1.name = "projects/test-project/secrets/test-secret-1"
secret1.labels = {}

secret2 = MagicMock()
secret2.name = "projects/test-project/secrets/test-secret-2"
secret2.labels = {}

mock_fetch_secret_handles.return_value = [secret1, secret2]

# Mock _write_secret_file to succeed for secret1 and fail for secret2
with patch(
"airbyte_cdk.cli.airbyte_cdk._secrets._write_secret_file"
) as mock_write_secret_file:
mock_write_secret_file.side_effect = [
None, # Success for secret1
"No enabled version found for secret: test-secret-2", # Failure for secret2
]

# Call the function
runner = CliRunner()
result = runner.invoke(fetch)

# Verify that _write_secret_file was called twice
assert mock_write_secret_file.call_count == 2

# Verify that the error message was printed
assert "Failed to retrieve secret 'test-secret-2'" in result.output
assert "Failed to retrieve 1 secret(s)" in result.output

# Verify that the function did not raise an exception
assert result.exit_code == 0

def test_fetch_with_all_failed_secrets(
self,
mock_fetch_secret_handles,
mock_get_secrets_dir,
mock_resolve,
mock_get_client,
tmp_path,
):
# Setup mocks
mock_client = MagicMock()
mock_get_client.return_value = mock_client

mock_resolve.return_value = ("test-connector", tmp_path)

secrets_dir = tmp_path / "secrets"
mock_get_secrets_dir.return_value = secrets_dir

# Create two secrets that will both fail
secret1 = MagicMock()
secret1.name = "projects/test-project/secrets/test-secret-1"
secret1.labels = {}

secret2 = MagicMock()
secret2.name = "projects/test-project/secrets/test-secret-2"
secret2.labels = {}

mock_fetch_secret_handles.return_value = [secret1, secret2]

# Mock _write_secret_file to fail for both secrets
with patch(
"airbyte_cdk.cli.airbyte_cdk._secrets._write_secret_file"
) as mock_write_secret_file:
mock_write_secret_file.side_effect = [
"No enabled version found for secret: test-secret-1", # Failure for secret1
"No enabled version found for secret: test-secret-2", # Failure for secret2
]

# Call the function
runner = CliRunner()
result = runner.invoke(fetch)

# Verify that _write_secret_file was called twice
assert mock_write_secret_file.call_count == 2

# Verify that the error message was printed
assert "Failed to retrieve secret 'test-secret-1'" in result.output
assert "Failed to retrieve secret 'test-secret-2'" in result.output
assert "Failed to retrieve 2 secret(s)" in result.output

# Verify that the function raised an exception
assert result.exit_code != 0
Loading