Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 42 additions & 5 deletions src/basic_memory/mcp/tools/read_note.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
"""Read note tool for Basic Memory MCP server."""

from textwrap import dedent
from typing import Optional

from loguru import logger

from basic_memory.mcp.async_client import client
from basic_memory.mcp.server import mcp
from basic_memory.mcp.tools.search import search_notes
from basic_memory.mcp.tools.utils import call_get
from basic_memory.mcp.project_session import get_active_project
from basic_memory.schemas.memory import memory_url_path
from basic_memory.utils import validate_project_path


@mcp.tool(
description="Read a markdown note by title or permalink.",
)
async def read_note(identifier: str, page: int = 1, page_size: int = 10) -> str:
async def read_note(
identifier: str, page: int = 1, page_size: int = 10, project: Optional[str] = None
) -> str:

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

adding project parameter back

"""Read a markdown note from the knowledge base.

This tool finds and retrieves a note by its title, permalink, or content search,
Expand All @@ -26,6 +31,7 @@ async def read_note(identifier: str, page: int = 1, page_size: int = 10) -> str:
Can be a full memory:// URL, a permalink, a title, or search text
page: Page number for paginated results (default: 1)
page_size: Number of items per page (default: 10)
project: Optional project name to read from. If not provided, uses current active project.

Returns:
The full markdown content of the note if found, or helpful guidance if not found.
Expand All @@ -42,10 +48,41 @@ async def read_note(identifier: str, page: int = 1, page_size: int = 10) -> str:

# Read with pagination
read_note("Project Updates", page=2, page_size=5)

# Read from specific project
read_note("Meeting Notes", project="work-project")
"""

# Get the active project first to check project-specific sync status
active_project = get_active_project(project)

# Validate identifier to prevent path traversal attacks
# We need to check both the raw identifier and the processed path
processed_path = memory_url_path(identifier)
project_path = active_project.home

if not validate_project_path(identifier, project_path) or not validate_project_path(processed_path, project_path):
logger.warning(
"Attempted path traversal attack blocked",
identifier=identifier,
processed_path=processed_path,
project=active_project.name,
)
return f"# Error\n\nIdentifier '{identifier}' is not allowed - paths must stay within project boundaries"

# Check migration status and wait briefly if needed
from basic_memory.mcp.tools.utils import wait_for_migration_or_return_status

migration_status = await wait_for_migration_or_return_status(
timeout=5.0, project_name=active_project.name
)
if migration_status: # pragma: no cover
return f"# System Status\n\n{migration_status}\n\nPlease wait for migration to complete before reading notes."
project_url = active_project.project_url

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can remove all of this migration_status cruft - in another PR


# Get the file via REST API - first try direct permalink lookup
entity_path = memory_url_path(identifier)
path = f"/resource/{entity_path}"
path = f"{project_url}/resource/{entity_path}"
logger.info(f"Attempting to read note from URL: {path}")

try:
Expand All @@ -62,14 +99,14 @@ async def read_note(identifier: str, page: int = 1, page_size: int = 10) -> str:

# Fallback 1: Try title search via API
logger.info(f"Search title for: {identifier}")
title_results = await search_notes(query=identifier, search_type="title")
title_results = await search_notes.fn(query=identifier, search_type="title", project=project)

if title_results and title_results.results:
result = title_results.results[0] # Get the first/best match
if result.permalink:
try:
# Try to fetch the content using the found permalink
path = f"/resource/{result.permalink}"
path = f"{project_url}/resource/{result.permalink}"
response = await call_get(
client, path, params={"page": page, "page_size": page_size}
)
Expand All @@ -86,7 +123,7 @@ async def read_note(identifier: str, page: int = 1, page_size: int = 10) -> str:

# Fallback 2: Text search as a last resort
logger.info(f"Title search failed, trying text search for: {identifier}")
text_results = await search_notes(query=identifier, search_type="text")
text_results = await search_notes.fn(query=identifier, search_type="text", project=project)

# We didn't find a direct match, construct a helpful error message
if not text_results or not text_results.results:
Expand Down
4 changes: 2 additions & 2 deletions src/basic_memory/repository/search_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,8 +523,8 @@ async def index_item(
async with db.scoped_session(self.session_maker) as session:
# Delete existing record if any
await session.execute(
text("DELETE FROM search_index WHERE permalink = :permalink"),
{"permalink": search_index_row.permalink},
text("DELETE FROM search_index WHERE permalink = :permalink AND project_id = :project_id"),
{"permalink": search_index_row.permalink, "project_id": self.project_id},

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bugfix

)

# Prepare data for insert with project_id
Expand Down
152 changes: 142 additions & 10 deletions src/basic_memory/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,26 +53,53 @@ def generate_permalink(file_path: Union[Path, str, PathLike]) -> str:
# Remove extension
base = os.path.splitext(path_str)[0]

# Check if we have non-ASCII characters that should be preserved
has_non_ascii = any(ord(char) > 127 for char in base)
# Check if we have CJK characters that should be preserved
# CJK ranges: \u4e00-\u9fff (CJK Unified Ideographs), \u3000-\u303f (CJK symbols),
# \u3400-\u4dbf (CJK Extension A), \uff00-\uffef (Fullwidth forms)
has_cjk_chars = any(
'\u4e00' <= char <= '\u9fff' or
'\u3000' <= char <= '\u303f' or
'\u3400' <= char <= '\u4dbf' or
'\uff00' <= char <= '\uffef'
for char in base
)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

adding chinese character handling back


if has_non_ascii:
# Preserve non-ASCII characters like Chinese while still processing ASCII parts
result = base
if has_cjk_chars:
# For text with CJK characters, selectively transliterate only Latin accented chars
result = ""
for char in base:
if ('\u4e00' <= char <= '\u9fff' or
'\u3000' <= char <= '\u303f' or
'\u3400' <= char <= '\u4dbf'):
# Preserve CJK ideographs and symbols
result += char
elif ('\uff00' <= char <= '\uffef'):
# Remove Chinese fullwidth punctuation entirely (like ,!?)
continue
else:
# Transliterate Latin accented characters to ASCII
result += unidecode(char)

# Insert hyphens between CJK and Latin character transitions
# Match: CJK followed by Latin letter/digit, or Latin letter/digit followed by CJK
result = re.sub(r'([\u4e00-\u9fff\u3000-\u303f\u3400-\u4dbf])([a-zA-Z0-9])', r'\1-\2', result)
result = re.sub(r'([a-zA-Z0-9])([\u4e00-\u9fff\u3000-\u303f\u3400-\u4dbf])', r'\1-\2', result)

# Insert dash between camelCase
result = re.sub(r"([a-z0-9])([A-Z])", r"\1-\2", result)

# Convert only ASCII letters to lowercase, preserve non-ASCII
# Convert ASCII letters to lowercase, preserve CJK
lower_text = "".join(c.lower() if c.isascii() and c.isalpha() else c for c in result)

# Replace underscores with hyphens
text_with_hyphens = lower_text.replace("_", "-")

# Replace spaces and unsafe ASCII chars with hyphens, preserve non-ASCII chars
# Includes Chinese character ranges (CJK Unified Ideographs, CJK symbols, etc.)
# Remove apostrophes entirely (don't replace with hyphens)
text_no_apostrophes = text_with_hyphens.replace("'", "")

# Replace unsafe chars with hyphens, but preserve CJK characters
clean_text = re.sub(
r"[^a-z0-9\u4e00-\u9fff\u3000-\u303f\u3400-\u4dbf/\-]", "-", text_with_hyphens
r"[^a-z0-9\u4e00-\u9fff\u3000-\u303f\u3400-\u4dbf/\-]", "-", text_no_apostrophes
)
else:
# Original ASCII-only processing for backward compatibility
Expand All @@ -88,8 +115,11 @@ def generate_permalink(file_path: Union[Path, str, PathLike]) -> str:
# replace underscores with hyphens
text_with_hyphens = lower_text.replace("_", "-")

# Remove apostrophes entirely (don't replace with hyphens)
text_no_apostrophes = text_with_hyphens.replace("'", "")

# Replace remaining invalid chars with hyphens
clean_text = re.sub(r"[^a-z0-9/\-]", "-", text_with_hyphens)
clean_text = re.sub(r"[^a-z0-9/\-]", "-", text_no_apostrophes)

# Collapse multiple hyphens
clean_text = re.sub(r"-+", "-", clean_text)
Expand Down Expand Up @@ -187,3 +217,105 @@ def parse_tags(tags: Union[List[str], str, None]) -> List[str]:
except (ValueError, TypeError): # pragma: no cover
logger.warning(f"Couldn't parse tags from input of type {type(tags)}: {tags}")
return []


def normalize_file_path_for_comparison(file_path: str) -> str:

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add back utility functions that were removed

"""Normalize a file path for conflict detection.

This function normalizes file paths to help detect potential conflicts:
- Converts to lowercase for case-insensitive comparison
- Normalizes Unicode characters
- Handles path separators consistently

Args:
file_path: The file path to normalize

Returns:
Normalized file path for comparison purposes
"""
import unicodedata

# Convert to lowercase for case-insensitive comparison
normalized = file_path.lower()

# Normalize Unicode characters (NFD normalization)
normalized = unicodedata.normalize('NFD', normalized)

# Replace path separators with forward slashes
normalized = normalized.replace('\\', '/')

# Remove multiple slashes
normalized = re.sub(r'/+', '/', normalized)

return normalized


def detect_potential_file_conflicts(file_path: str, existing_paths: List[str]) -> List[str]:
"""Detect potential conflicts between a file path and existing paths.

This function checks for various types of conflicts:
- Case sensitivity differences
- Unicode normalization differences
- Path separator differences
- Permalink generation conflicts

Args:
file_path: The file path to check
existing_paths: List of existing file paths to check against

Returns:
List of existing paths that might conflict with the given file path
"""
conflicts = []

# Normalize the input file path
normalized_input = normalize_file_path_for_comparison(file_path)
input_permalink = generate_permalink(file_path)

for existing_path in existing_paths:
# Skip identical paths
if existing_path == file_path:
continue

# Check for case-insensitive path conflicts
normalized_existing = normalize_file_path_for_comparison(existing_path)
if normalized_input == normalized_existing:
conflicts.append(existing_path)
continue

# Check for permalink conflicts
existing_permalink = generate_permalink(existing_path)
if input_permalink == existing_permalink:
conflicts.append(existing_path)
continue

return conflicts


def validate_project_path(path: str, project_path: Path) -> bool:
"""Ensure path stays within project boundaries."""
# Allow empty strings as they resolve to the project root
if not path:
return True

# Check for obvious path traversal patterns first
if ".." in path or "~" in path:
return False

# Check for Windows-style path traversal (even on Unix systems)
if "\\.." in path or path.startswith("\\"):
return False

# Block absolute paths (Unix-style starting with / or Windows-style with drive letters)
if path.startswith("/") or (len(path) >= 2 and path[1] == ":"):
return False

# Block paths with control characters (but allow whitespace that will be stripped)
if path.strip() and any(ord(c) < 32 and c not in [" ", "\t"] for c in path):
return False

try:
resolved = (project_path / path).resolve()
return resolved.is_relative_to(project_path.resolve())
except (ValueError, OSError):
return False
Loading
Loading