Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 33 additions & 9 deletions src/langbot/pkg/storage/providers/localstorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,23 @@
LOCAL_STORAGE_PATH = os.path.join('data', 'storage')


def _safe_resolve(base: str, key: str) -> str:
"""Resolve *key* under *base* and ensure the result stays inside *base*.

Raises ``ValueError`` if the resolved path escapes the storage root
(e.g. via absolute paths, ``..`` components, or symlinks).
"""
# os.path.realpath resolves symlinks and normalises the path.
resolved = os.path.realpath(os.path.join(base, key))
canonical_base = os.path.realpath(base)
# The resolved path must be *strictly* inside the base directory (or equal
# to it only for directory operations). We append os.sep so that a base of
# "/data/storage" does not match "/data/storage_evil".
if not (resolved == canonical_base or resolved.startswith(canonical_base + os.sep)):
raise ValueError(f'Path traversal detected: key {key!r} resolves outside storage root')
return resolved


class LocalStorageProvider(provider.StorageProvider):
def __init__(self, ap: app.Application):
super().__init__(ap)
Expand All @@ -23,40 +40,47 @@ async def save(
key: str,
value: bytes,
):
if not os.path.exists(os.path.join(LOCAL_STORAGE_PATH, os.path.dirname(key))):
os.makedirs(os.path.join(LOCAL_STORAGE_PATH, os.path.dirname(key)))
async with aiofiles.open(os.path.join(LOCAL_STORAGE_PATH, f'{key}'), 'wb') as f:
resolved = _safe_resolve(LOCAL_STORAGE_PATH, key)
parent = os.path.dirname(resolved)
if not os.path.exists(parent):
os.makedirs(parent)
async with aiofiles.open(resolved, 'wb') as f:
await f.write(value)

async def load(
self,
key: str,
) -> bytes:
async with aiofiles.open(os.path.join(LOCAL_STORAGE_PATH, f'{key}'), 'rb') as f:
resolved = _safe_resolve(LOCAL_STORAGE_PATH, key)
async with aiofiles.open(resolved, 'rb') as f:
return await f.read()

async def exists(
self,
key: str,
) -> bool:
return os.path.exists(os.path.join(LOCAL_STORAGE_PATH, f'{key}'))
resolved = _safe_resolve(LOCAL_STORAGE_PATH, key)
return os.path.exists(resolved)

async def delete(
self,
key: str,
):
os.remove(os.path.join(LOCAL_STORAGE_PATH, f'{key}'))
resolved = _safe_resolve(LOCAL_STORAGE_PATH, key)
os.remove(resolved)

async def size(
self,
key: str,
) -> int:
return os.path.getsize(os.path.join(LOCAL_STORAGE_PATH, f'{key}'))
resolved = _safe_resolve(LOCAL_STORAGE_PATH, key)
return os.path.getsize(resolved)

async def delete_dir_recursive(
self,
dir_path: str,
):
resolved = _safe_resolve(LOCAL_STORAGE_PATH, dir_path)
# 直接删除整个目录
if os.path.exists(os.path.join(LOCAL_STORAGE_PATH, dir_path)):
shutil.rmtree(os.path.join(LOCAL_STORAGE_PATH, dir_path))
if os.path.exists(resolved):
shutil.rmtree(resolved)
181 changes: 181 additions & 0 deletions tests/unit_tests/storage/test_localstorage_path_traversal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
"""
PoC test for CWE-22 path traversal in LocalStorageProvider.

The LocalStorageProvider uses os.path.join(LOCAL_STORAGE_PATH, key) without
validating that the resulting path stays inside LOCAL_STORAGE_PATH.

When `key` is an absolute path (e.g. '/etc/passwd'), os.path.join discards
all previous components and returns the absolute path directly, allowing
arbitrary file reads, writes, and deletes.

This test must FAIL before the fix and PASS after.
"""

import os
import pytest
from unittest.mock import Mock, patch

from langbot.pkg.storage.providers.localstorage import LocalStorageProvider


@pytest.fixture
def storage_provider(tmp_path):
"""Create a LocalStorageProvider with a temporary storage path."""
storage_path = str(tmp_path / "storage")
with patch("langbot.pkg.storage.providers.localstorage.LOCAL_STORAGE_PATH", storage_path):
mock_app = Mock()
provider = LocalStorageProvider(mock_app)
yield provider, storage_path


class TestPathTraversalPrevention:
"""Test that LocalStorageProvider rejects path traversal attempts."""

@pytest.mark.asyncio
async def test_absolute_path_save_rejected(self, storage_provider, tmp_path):
"""Saving with an absolute path key must be blocked."""
provider, storage_path = storage_provider
target_file = str(tmp_path / "pwned.txt")

with patch("langbot.pkg.storage.providers.localstorage.LOCAL_STORAGE_PATH", storage_path):
with pytest.raises((ValueError, PermissionError)):
await provider.save(target_file, b"malicious content")

# The file must NOT exist outside the storage directory
assert not os.path.exists(target_file), (
f"Path traversal succeeded: file was written outside storage to {target_file}"
)

@pytest.mark.asyncio
async def test_absolute_path_load_rejected(self, storage_provider, tmp_path):
"""Loading with an absolute path key must be blocked."""
provider, storage_path = storage_provider

# Create a file outside the storage directory
target_file = str(tmp_path / "secret.txt")
with open(target_file, "wb") as f:
f.write(b"secret data")

with patch("langbot.pkg.storage.providers.localstorage.LOCAL_STORAGE_PATH", storage_path):
with pytest.raises((ValueError, PermissionError, FileNotFoundError)):
data = await provider.load(target_file)
assert data != b"secret data", (
"Path traversal succeeded: read file outside storage"
)

@pytest.mark.asyncio
async def test_absolute_path_exists_rejected(self, storage_provider, tmp_path):
"""Exists check with an absolute path key must be blocked or return False."""
provider, storage_path = storage_provider

target_file = str(tmp_path / "check_me.txt")
with open(target_file, "wb") as f:
f.write(b"data")

with patch("langbot.pkg.storage.providers.localstorage.LOCAL_STORAGE_PATH", storage_path):
try:
result = await provider.exists(target_file)
assert result is False, (
"Path traversal succeeded: exists() returned True for file outside storage"
)
except (ValueError, PermissionError):
pass # Expected

@pytest.mark.asyncio
async def test_absolute_path_delete_rejected(self, storage_provider, tmp_path):
"""Deleting with an absolute path key must be blocked."""
provider, storage_path = storage_provider

target_file = str(tmp_path / "do_not_delete.txt")
with open(target_file, "wb") as f:
f.write(b"important data")

with patch("langbot.pkg.storage.providers.localstorage.LOCAL_STORAGE_PATH", storage_path):
with pytest.raises((ValueError, PermissionError, FileNotFoundError)):
await provider.delete(target_file)

assert os.path.exists(target_file), (
"Path traversal succeeded: file outside storage was deleted"
)

@pytest.mark.asyncio
async def test_absolute_path_size_rejected(self, storage_provider, tmp_path):
"""Size check with an absolute path key must be blocked."""
provider, storage_path = storage_provider

target_file = str(tmp_path / "measure_me.txt")
with open(target_file, "wb") as f:
f.write(b"some data")

with patch("langbot.pkg.storage.providers.localstorage.LOCAL_STORAGE_PATH", storage_path):
with pytest.raises((ValueError, PermissionError, FileNotFoundError)):
await provider.size(target_file)

@pytest.mark.asyncio
async def test_dot_dot_path_traversal_rejected(self, storage_provider, tmp_path):
"""Relative path traversal with '..' must be blocked."""
provider, storage_path = storage_provider

target_file = str(tmp_path / "above_storage.txt")
with open(target_file, "wb") as f:
f.write(b"above storage secret")

with patch("langbot.pkg.storage.providers.localstorage.LOCAL_STORAGE_PATH", storage_path):
relative_key = os.path.join("..", "above_storage.txt")
with pytest.raises((ValueError, PermissionError, FileNotFoundError)):
data = await provider.load(relative_key)
assert data != b"above storage secret"

@pytest.mark.asyncio
async def test_delete_dir_recursive_traversal_rejected(self, storage_provider, tmp_path):
"""delete_dir_recursive with traversal path must be blocked."""
provider, storage_path = storage_provider

outside_dir = tmp_path / "outside_dir"
outside_dir.mkdir()
(outside_dir / "file.txt").write_text("important")

with patch("langbot.pkg.storage.providers.localstorage.LOCAL_STORAGE_PATH", storage_path):
with pytest.raises((ValueError, PermissionError)):
await provider.delete_dir_recursive(str(outside_dir))

assert outside_dir.exists(), (
"Path traversal succeeded: directory outside storage was deleted"
)

@pytest.mark.asyncio
async def test_legitimate_key_works(self, storage_provider):
"""Normal keys without traversal must still work."""
provider, storage_path = storage_provider

with patch("langbot.pkg.storage.providers.localstorage.LOCAL_STORAGE_PATH", storage_path):
key = "test_image_abc123.png"
content = b"PNG image data"

await provider.save(key, content)
assert await provider.exists(key) is True
loaded = await provider.load(key)
assert loaded == content
size = await provider.size(key)
assert size == len(content)
await provider.delete(key)
assert await provider.exists(key) is False

@pytest.mark.asyncio
async def test_legitimate_subdirectory_key_works(self, storage_provider):
"""Keys with legitimate subdirectories must still work."""
provider, storage_path = storage_provider

with patch("langbot.pkg.storage.providers.localstorage.LOCAL_STORAGE_PATH", storage_path):
key = "bot_log_images/img_001.png"
content = b"PNG image data"

await provider.save(key, content)
assert await provider.exists(key) is True
loaded = await provider.load(key)
assert loaded == content
await provider.delete(key)


if __name__ == "__main__":
pytest.main([__file__, "-v"])
Loading