Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/basic_memory/api/routers/project_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ async def add_project(
Response confirming the project was added
"""
try: # pragma: no cover
# The service layer now handles cloud mode validation and path sanitization
await project_service.add_project(
project_data.name, project_data.path, set_default=project_data.set_default
)
Expand Down
23 changes: 20 additions & 3 deletions src/basic_memory/services/project_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,30 @@ async def add_project(self, name: str, path: str, set_default: bool = False) ->
ValueError: If the project already exists
"""
# in cloud mode, don't allow arbitrary paths.
if config.cloud_mode:
if self.config_manager.config.cloud_mode_enabled:
basic_memory_home = os.getenv("BASIC_MEMORY_HOME")
assert basic_memory_home is not None
base_path = Path(basic_memory_home)

# Resolve to absolute path
resolved_path = Path(os.path.abspath(os.path.expanduser(base_path / path))).as_posix()
# Sanitize the input path for cloud mode
# Strip leading slashes, home directory references, and parent directory references
clean_path = path.lstrip("/").replace("~/", "").replace("~", "")

# Remove any parent directory traversal attempts
path_parts = []
for part in clean_path.split("/"):
if part and part != "." and part != "..":
path_parts.append(part)
clean_path = "/".join(path_parts) if path_parts else ""

# Construct path relative to BASIC_MEMORY_HOME
resolved_path = (base_path / clean_path).resolve().as_posix()

# Verify the resolved path is actually under BASIC_MEMORY_HOME
if not resolved_path.startswith(base_path.resolve().as_posix()):
raise ValueError(
f"Cloud mode requires projects under {basic_memory_home}. Invalid path: {path}"
)
else:
resolved_path = Path(os.path.abspath(os.path.expanduser(path))).as_posix()

Expand Down
3 changes: 0 additions & 3 deletions test-int/test_disable_permalinks_integration.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
"""Integration tests for the disable_permalinks configuration."""

import pytest
from pathlib import Path
from textwrap import dedent

from basic_memory.config import BasicMemoryConfig
from basic_memory.markdown import EntityParser, MarkdownProcessor
from basic_memory.models import Project
from basic_memory.repository import (
EntityRepository,
ObservationRepository,
Expand Down
154 changes: 154 additions & 0 deletions tests/services/test_project_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -714,3 +714,157 @@ async def test_synchronize_projects_handles_case_sensitivity_bug(
db_project = await project_service.repository.get_by_name(name)
if db_project:
await project_service.repository.delete(db_project.id)


@pytest.mark.skipif(os.name == "nt", reason="Cloud mode only runs on POSIX systems")
@pytest.mark.asyncio
async def test_add_project_cloud_mode_sanitizes_paths(
project_service: ProjectService, config_manager: ConfigManager, tmp_path, monkeypatch
):
"""Test that cloud mode sanitizes and validates project paths."""
# Set up cloud mode environment
cloud_home = tmp_path / "app" / "data" / "basic-memory"
cloud_home.mkdir(parents=True, exist_ok=True)

monkeypatch.setenv("BASIC_MEMORY_HOME", str(cloud_home))
monkeypatch.setenv("BASIC_MEMORY_CLOUD_MODE", "true")

# Force reload config to pick up cloud mode
from basic_memory.services import project_service as ps_module

monkeypatch.setattr(ps_module, "config", config_manager.load_config())

test_cases = [
# (input_path, expected_result_path, should_succeed)
("test", str(cloud_home / "test"), True), # Simple relative path
("~/Documents/test", str(cloud_home / "Documents" / "test"), True), # Home directory
(
"/tmp/test",
str(cloud_home / "tmp" / "test"),
True,
), # Absolute path (sanitized to relative)
(
"../../../etc/passwd",
str(cloud_home),
True,
), # Path traversal (all ../ removed, results in cloud_home)
("folder/subfolder", str(cloud_home / "folder" / "subfolder"), True), # Nested path
(
"~/folder/../test",
str(cloud_home / "test"),
True,
), # Mixed patterns (sanitized to just 'test')
]

for i, (input_path, expected_path, should_succeed) in enumerate(test_cases):
test_project_name = f"cloud-test-{i}"

try:
# Add the project
await project_service.add_project(test_project_name, input_path)

if should_succeed:
# Verify the path was sanitized correctly
assert test_project_name in project_service.projects
actual_path = project_service.projects[test_project_name]

# The path should be under cloud_home
assert actual_path.startswith(str(cloud_home)), (
f"Path {actual_path} should start with {cloud_home} for input {input_path}"
)

# Clean up
await project_service.remove_project(test_project_name)
else:
pytest.fail(f"Expected ValueError for input path: {input_path}")

except ValueError as e:
if should_succeed:
pytest.fail(f"Unexpected ValueError for input path {input_path}: {e}")
# Expected failure - continue to next test case


@pytest.mark.skipif(os.name == "nt", reason="Cloud mode only runs on POSIX systems")
@pytest.mark.asyncio
async def test_add_project_cloud_mode_rejects_escape_attempts(
project_service: ProjectService, config_manager: ConfigManager, tmp_path, monkeypatch
):
"""Test that cloud mode rejects paths that try to escape cloud storage."""
# Set up cloud mode environment
cloud_home = tmp_path / "app" / "data" / "basic-memory"
cloud_home.mkdir(parents=True, exist_ok=True)

# Create a directory outside cloud_home to verify it's not accessible
outside_dir = tmp_path / "outside"
outside_dir.mkdir(parents=True, exist_ok=True)

monkeypatch.setenv("BASIC_MEMORY_HOME", str(cloud_home))
monkeypatch.setenv("BASIC_MEMORY_CLOUD_MODE", "true")

# Force reload config to pick up cloud mode
from basic_memory.services import project_service as ps_module

monkeypatch.setattr(ps_module, "config", config_manager.load_config())

# All of these should succeed by being sanitized to paths under cloud_home
# The sanitization removes dangerous patterns, so they don't escape
safe_after_sanitization = [
"../../../etc/passwd",
"../../.env",
"../../../home/user/.ssh/id_rsa",
]

for i, attack_path in enumerate(safe_after_sanitization):
test_project_name = f"cloud-attack-test-{i}"

try:
# Add the project
await project_service.add_project(test_project_name, attack_path)

# Verify it was sanitized to be under cloud_home
actual_path = project_service.projects[test_project_name]
assert actual_path.startswith(str(cloud_home)), (
f"Sanitized path {actual_path} should be under {cloud_home}"
)

# Clean up
await project_service.remove_project(test_project_name)

except ValueError:
# If it raises ValueError, that's also acceptable for security
pass


@pytest.mark.skipif(os.name == "nt", reason="Cloud mode only runs on POSIX systems")
@pytest.mark.asyncio
async def test_add_project_local_mode_allows_arbitrary_paths(
project_service: ProjectService, config_manager: ConfigManager, tmp_path, monkeypatch
):
"""Test that local mode (non-cloud) still allows arbitrary paths."""
# Ensure cloud mode is disabled
monkeypatch.setenv("BASIC_MEMORY_CLOUD_MODE", "false")

# Force reload config to pick up local mode
from basic_memory.services import project_service as ps_module

monkeypatch.setattr(ps_module, "config", config_manager.load_config())

# Create a test directory
test_dir = tmp_path / "arbitrary-location"
test_dir.mkdir(parents=True, exist_ok=True)

test_project_name = "local-mode-test"

try:
# In local mode, we should be able to use arbitrary absolute paths
await project_service.add_project(test_project_name, str(test_dir))

# Verify the path was accepted as-is
assert test_project_name in project_service.projects
actual_path = project_service.projects[test_project_name]
assert actual_path == str(test_dir)

finally:
# Clean up
if test_project_name in project_service.projects:
await project_service.remove_project(test_project_name)
Loading