Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/restic_compose_backup/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def main():
def status(config, containers):
"""Outputs the backup config for the compose setup"""
logger.info("Status for compose project '%s'", containers.project_name)
logger.info("Repository: '%s'", config.repository)
logger.info("Repository: '%s'", utils.redact_repo_url(config.repository))
logger.info("Backup currently running?: %s", containers.backup_process_running)
logger.info(
"Include project name in backup path?: %s",
Expand Down Expand Up @@ -100,7 +100,10 @@ def status(config, containers):
logger.info("Repository is not initialized. Attempting to initialize it.")
result = restic.init_repo(config.repository)
if result == 0:
logger.info("Successfully initialized repository: %s", config.repository)
logger.info(
"Successfully initialized repository: %s",
utils.redact_repo_url(config.repository),
)
else:
logger.error("Failed to initialize repository")

Expand Down
16 changes: 13 additions & 3 deletions src/restic_compose_backup/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,16 @@
logger = logging.getLogger(__name__)


def _redacted_cmd(cmd: List[str]) -> str:
"""Join cmd into a log-safe string, masking any embedded secret.

Each argument is passed through utils.redact_repo_url(), which masks the
password in a repository URL (e.g. rest:http://user:pass@host/...) and
leaves all other arguments unchanged.
"""
return " ".join(utils.redact_repo_url(arg) for arg in cmd)


def test():
return run(["ls", "/volumes"])

Expand Down Expand Up @@ -64,7 +74,7 @@ def docker_exec(
) -> int:
"""Execute a command within the given container"""
client = utils.docker_client()
logger.debug("docker exec inside %s: %s", container_id, " ".join(cmd))
logger.debug("docker exec inside %s: %s", container_id, _redacted_cmd(cmd))
exit_code, (stdout, stderr) = client.containers.get(container_id).exec_run(
cmd, demux=True, environment=environment
)
Expand All @@ -84,7 +94,7 @@ def docker_exec(

def run(cmd: List[str]) -> int:
"""Run a command with parameters"""
logger.debug("cmd: %s", " ".join(cmd))
logger.debug("cmd: %s", _redacted_cmd(cmd))
child = Popen(cmd, stdout=PIPE, stderr=PIPE)
stdoutdata, stderrdata = child.communicate()

Expand All @@ -104,7 +114,7 @@ def run(cmd: List[str]) -> int:

def run_capture_std(cmd: List[str]) -> Tuple[str, str]:
"""Run a command with parameters and return stdout, stderr"""
logger.debug("cmd: %s", " ".join(cmd))
logger.debug("cmd: %s", _redacted_cmd(cmd))
child = Popen(cmd, stdout=PIPE, stderr=PIPE)
return child.communicate()

Expand Down
43 changes: 43 additions & 0 deletions src/restic_compose_backup/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import re
import logging
from typing import List, TYPE_CHECKING
from contextlib import contextmanager
Expand Down Expand Up @@ -116,6 +117,48 @@ def strip_root(path):
return path


def redact_repo_url(repository):
"""
Mask any password embedded in a restic repository URL so it is safe to log.

A repository string can carry the password inline as HTTP basic auth, e.g.
``rest:http://user:password@host:8000/path`` (the ``rest:`` prefix is
restic's backend tag in front of a real URL). The password is replaced with
``***``, including any ``/`` or ``@`` characters it may contain.

Repository strings without a ``scheme://`` URL (s3, local paths) or without
embedded credentials (key-based sftp, ...) are returned unchanged. Note that
restic's sftp backend authenticates via ssh keys/agent and does not support
inline URL passwords, so a bare ``sftp:`` spec is intentionally not redacted.

>>> redact_repo_url('rest:http://user:s3cr3t@backup.example.com:8000/repo')
'rest:http://user:***@backup.example.com:8000/repo'
"""
if not repository:
return repository
# restic prefixes some backends (e.g. "rest:") before a real URL. Locate the
# embedded "scheme://" URL; anything before it is left untouched.
match = re.search(r"[a-zA-Z][a-zA-Z0-9+.-]*://", repository)
if not match:
return repository
prefix = repository[: match.start()]
scheme = match.group(0)
after = repository[match.end() :]
if "@" not in after:
return repository
# The userinfo (user:password) sits between the scheme and the LAST "@"
# before the host. Using the last "@" matches how URL parsers split userinfo,
# and — unlike cutting the authority at the first "/" — keeps a password
# that contains "/" (or "@") from leaking through.
last_at = after.rfind("@")
userinfo = after[:last_at]
hostpath = after[last_at + 1 :]
if ":" not in userinfo:
return repository
user = userinfo.split(":", 1)[0]
return prefix + scheme + f"{user}:***@{hostpath}"


@contextmanager
def environment(name, value):
"""Tempset env var"""
Expand Down
49 changes: 49 additions & 0 deletions src/tests/unit/test_command_redaction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""Unit tests for redaction of secrets in command logging"""

from unittest import mock

import pytest

from restic_compose_backup import commands
from .conftest import BaseTestCase

pytestmark = pytest.mark.unit

REPO_WITH_PASSWORD = "rest:http://user:s3cr3t@host:8000/repo"


class CommandRedactionTests(BaseTestCase):
"""restic invocations must never log the repository password, even at DEBUG."""

def _fake_process(self):
proc = mock.MagicMock()
proc.communicate.return_value = (b"", b"")
proc.returncode = 0
return proc

def test_run_does_not_log_repository_password(self):
"""commands.run() must redact the repository URL in its debug log."""
cmd = ["restic", "-r", REPO_WITH_PASSWORD, "snapshots"]
with mock.patch(
"restic_compose_backup.commands.Popen", return_value=self._fake_process()
):
with self.assertLogs(
"restic_compose_backup.commands", level="DEBUG"
) as log:
commands.run(cmd)
output = "\n".join(log.output)
self.assertNotIn("s3cr3t", output)
self.assertIn("***", output)

def test_run_capture_std_does_not_log_repository_password(self):
"""commands.run_capture_std() must redact the repository URL in its debug log."""
cmd = ["restic", "-r", REPO_WITH_PASSWORD, "snapshots"]
with mock.patch(
"restic_compose_backup.commands.Popen", return_value=self._fake_process()
):
with self.assertLogs(
"restic_compose_backup.commands", level="DEBUG"
) as log:
commands.run_capture_std(cmd)
output = "\n".join(log.output)
self.assertNotIn("s3cr3t", output)
65 changes: 65 additions & 0 deletions src/tests/unit/test_repository_redaction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""Unit tests for repository URL redaction in log output"""

import pytest

from restic_compose_backup.utils import redact_repo_url
from .conftest import BaseTestCase

pytestmark = pytest.mark.unit


class RepositoryRedactionTests(BaseTestCase):
"""Ensure passwords embedded in the repository URL are never logged in clear text."""

def test_rest_http_basic_auth_is_redacted(self):
"""HTTP basic auth credentials in a rest: backend are masked."""
repo = "rest:http://user:s3cr3t@backup.example.com:8000/repo"
self.assertEqual(
redact_repo_url(repo),
"rest:http://user:***@backup.example.com:8000/repo",
)

def test_rest_https_basic_auth_is_redacted(self):
self.assertEqual(
redact_repo_url("rest:https://u:p@host:8000/repo"),
"rest:https://u:***@host:8000/repo",
)

def test_password_with_slash_is_redacted(self):
"""A '/' in the password must still be masked (no silent leak)."""
self.assertEqual(
redact_repo_url("rest:http://u:p/ass@host:8000/x"),
"rest:http://u:***@host:8000/x",
)

def test_password_with_at_is_redacted(self):
"""An '@' in the password must still be masked (no silent leak)."""
self.assertEqual(
redact_repo_url("rest:http://u:p@ss@host:8000/x"),
"rest:http://u:***@host:8000/x",
)

def test_sftp_without_url_scheme_is_unchanged(self):
"""sftp backends authenticate via ssh keys/agent and carry no inline URL
password, so a bare ``sftp:`` spec (no scheme://) is returned unchanged."""
self.assertEqual(
redact_repo_url("sftp:user:pass@host:/srv/restic-repo"),
"sftp:user:pass@host:/srv/restic-repo",
)

def test_repository_without_credentials_is_unchanged(self):
"""Repository strings without embedded secrets must be returned verbatim."""
for repo in [
"rest:http://backup.example.com:8000/repo",
"sftp:user@host:/srv/restic-repo",
"s3:s3.amazonaws.com/bucket",
"azure:container/path",
"/mnt/restic",
"",
]:
with self.subTest(repo=repo):
self.assertEqual(redact_repo_url(repo), repo)

def test_secret_is_not_leaked(self):
"""The cleartext password must never appear in the redacted output."""
self.assertNotIn("s3cr3t", redact_repo_url("rest:http://u:s3cr3t@h:8000/x"))