diff --git a/src/basic_memory/cli/commands/status.py b/src/basic_memory/cli/commands/status.py index 9353508ff..e67e2a042 100644 --- a/src/basic_memory/cli/commands/status.py +++ b/src/basic_memory/cli/commands/status.py @@ -94,7 +94,7 @@ def display_changes( """Display changes using Rich for better visualization.""" tree = Tree(f"{project_name}: {title}") - if changes.total == 0: + if changes.total == 0 and not changes.skipped_files: tree.add("No changes") console.print(Panel(tree, expand=False)) return @@ -114,6 +114,13 @@ def display_changes( if changes.deleted: del_branch = tree.add("[red]Deleted[/red]") add_files_to_tree(del_branch, changes.deleted, "red") + if changes.skipped_files: + skip_branch = tree.add("[red]⚠️ Skipped (Circuit Breaker)[/red]") + for skipped in sorted(changes.skipped_files, key=lambda x: x.path): + skip_branch.add( + f"[red]{skipped.path}[/red] " + f"(failures: {skipped.failure_count}, reason: {skipped.reason})" + ) else: # Show directory summaries by_dir = group_changes_by_directory(changes) @@ -122,6 +129,14 @@ def display_changes( if summary: # Only show directories with changes tree.add(f"[bold]{dir_name}/[/bold] {summary}") + # Show skipped files summary in non-verbose mode + if changes.skipped_files: + skip_count = len(changes.skipped_files) + tree.add( + f"[red]⚠️ {skip_count} file{'s' if skip_count != 1 else ''} " + f"skipped due to repeated failures[/red]" + ) + console.print(Panel(tree, expand=False)) diff --git a/src/basic_memory/importers/claude_conversations_importer.py b/src/basic_memory/importers/claude_conversations_importer.py index de7dc9580..d516a149a 100644 --- a/src/basic_memory/importers/claude_conversations_importer.py +++ b/src/basic_memory/importers/claude_conversations_importer.py @@ -155,7 +155,8 @@ def _format_chat_markdown( if msg.get("content"): # Filter out None values before joining content = " ".join( - str(c.get("text", "")) for c in msg["content"] + str(c.get("text", "")) + for c in msg["content"] if c and c.get("text") is not None ) lines.append(content) diff --git a/src/basic_memory/schemas/sync_report.py b/src/basic_memory/schemas/sync_report.py index 267bf669a..edb83bd6d 100644 --- a/src/basic_memory/schemas/sync_report.py +++ b/src/basic_memory/schemas/sync_report.py @@ -1,6 +1,7 @@ """Pydantic schemas for sync report responses.""" -from typing import TYPE_CHECKING, Dict, Set +from datetime import datetime +from typing import TYPE_CHECKING, Dict, List, Set from pydantic import BaseModel, Field @@ -9,6 +10,17 @@ from basic_memory.sync.sync_service import SyncReport +class SkippedFileResponse(BaseModel): + """Information about a file that was skipped due to repeated failures.""" + + path: str = Field(description="File path relative to project root") + reason: str = Field(description="Error message from last failure") + failure_count: int = Field(description="Number of consecutive failures") + first_failed: datetime = Field(description="Timestamp of first failure") + + model_config = {"from_attributes": True} + + class SyncReportResponse(BaseModel): """Report of file changes found compared to database state. @@ -24,6 +36,9 @@ class SyncReportResponse(BaseModel): checksums: Dict[str, str] = Field( default_factory=dict, description="Current file checksums (path -> checksum)" ) + skipped_files: List[SkippedFileResponse] = Field( + default_factory=list, description="Files skipped due to repeated failures" + ) total: int = Field(description="Total number of changes") @classmethod @@ -42,6 +57,15 @@ def from_sync_report(cls, report: "SyncReport") -> "SyncReportResponse": deleted=report.deleted, moves=report.moves, checksums=report.checksums, + skipped_files=[ + SkippedFileResponse( + path=skipped.path, + reason=skipped.reason, + failure_count=skipped.failure_count, + first_failed=skipped.first_failed, + ) + for skipped in report.skipped_files + ], total=report.total, ) diff --git a/src/basic_memory/sync/sync_service.py b/src/basic_memory/sync/sync_service.py index bb64c7206..0080da9b4 100644 --- a/src/basic_memory/sync/sync_service.py +++ b/src/basic_memory/sync/sync_service.py @@ -7,7 +7,7 @@ from dataclasses import dataclass, field from datetime import datetime from pathlib import Path -from typing import Dict, Optional, Set, Tuple +from typing import Dict, List, Optional, Set, Tuple from loguru import logger from sqlalchemy import select @@ -26,6 +26,45 @@ from basic_memory.services.search_service import SearchService from basic_memory.services.sync_status_service import sync_status_tracker, SyncStatus +# Circuit breaker configuration +MAX_CONSECUTIVE_FAILURES = 3 + + +@dataclass +class FileFailureInfo: + """Track failure information for a file that repeatedly fails to sync. + + Attributes: + count: Number of consecutive failures + first_failure: Timestamp of first failure in current sequence + last_failure: Timestamp of most recent failure + last_error: Error message from most recent failure + last_checksum: Checksum of file when it last failed (for detecting file changes) + """ + + count: int + first_failure: datetime + last_failure: datetime + last_error: str + last_checksum: str + + +@dataclass +class SkippedFile: + """Information about a file that was skipped due to repeated failures. + + Attributes: + path: File path relative to project root + reason: Error message from last failure + failure_count: Number of consecutive failures + first_failed: Timestamp of first failure + """ + + path: str + reason: str + failure_count: int + first_failed: datetime + @dataclass class SyncReport: @@ -38,6 +77,7 @@ class SyncReport: deleted: Files that exist in database but not on disk moves: Files that have been moved from one location to another checksums: Current checksums for files on disk + skipped_files: Files that were skipped due to repeated failures """ # We keep paths as strings in sets/dicts for easier serialization @@ -46,6 +86,7 @@ class SyncReport: deleted: Set[str] = field(default_factory=set) moves: Dict[str, str] = field(default_factory=dict) # old_path -> new_path checksums: Dict[str, str] = field(default_factory=dict) # path -> checksum + skipped_files: List[SkippedFile] = field(default_factory=list) @property def total(self) -> int: @@ -90,6 +131,8 @@ def __init__( self._thread_pool = ThreadPoolExecutor(max_workers=app_config.sync_thread_pool_size) # Load ignore patterns once at initialization for performance self._ignore_patterns = load_bmignore_patterns() + # Circuit breaker: track file failures to prevent infinite retry loops + self._file_failures: Dict[str, FileFailureInfo] = {} async def _read_file_async(self, file_path: Path) -> str: """Read file content in thread pool to avoid blocking the event loop.""" @@ -125,6 +168,102 @@ def __del__(self): if hasattr(self, "_thread_pool"): self._thread_pool.shutdown(wait=False) + async def _should_skip_file(self, path: str) -> bool: + """Check if file should be skipped due to repeated failures. + + Computes current file checksum and compares with last failed checksum. + If checksums differ, file has changed and we should retry. + + Args: + path: File path to check + + Returns: + True if file should be skipped, False otherwise + """ + if path not in self._file_failures: + return False + + failure_info = self._file_failures[path] + + # Check if failure count exceeds threshold + if failure_info.count < MAX_CONSECUTIVE_FAILURES: + return False + + # Compute current checksum to see if file changed + try: + current_checksum = await self._compute_checksum_async(path) + + # If checksum changed, file was modified - reset and retry + if current_checksum != failure_info.last_checksum: + logger.info( + f"File {path} changed since last failure (checksum differs), " + f"resetting failure count and retrying" + ) + del self._file_failures[path] + return False + except Exception as e: + # If we can't compute checksum, log but still skip to avoid infinite loops + logger.warning(f"Failed to compute checksum for {path}: {e}") + + # File unchanged and exceeded threshold - skip it + return True + + async def _record_failure(self, path: str, error: str) -> None: + """Record a file sync failure for circuit breaker tracking. + + Args: + path: File path that failed + error: Error message from the failure + """ + now = datetime.now() + + # Compute checksum for failure tracking + try: + checksum = await self._compute_checksum_async(path) + except Exception: + # If checksum fails, use empty string (better than crashing) + checksum = "" + + if path in self._file_failures: + # Update existing failure record + failure_info = self._file_failures[path] + failure_info.count += 1 + failure_info.last_failure = now + failure_info.last_error = error + failure_info.last_checksum = checksum + + logger.warning( + f"File sync failed (attempt {failure_info.count}/{MAX_CONSECUTIVE_FAILURES}): " + f"path={path}, error={error}" + ) + + # Log when threshold is reached + if failure_info.count >= MAX_CONSECUTIVE_FAILURES: + logger.error( + f"File {path} has failed {MAX_CONSECUTIVE_FAILURES} times and will be skipped. " + f"First failure: {failure_info.first_failure}, Last error: {error}" + ) + else: + # Create new failure record + self._file_failures[path] = FileFailureInfo( + count=1, + first_failure=now, + last_failure=now, + last_error=error, + last_checksum=checksum, + ) + logger.debug(f"Recording first failure for {path}: {error}") + + def _clear_failure(self, path: str) -> None: + """Clear failure tracking for a file after successful sync. + + Args: + path: File path that successfully synced + """ + if path in self._file_failures: + logger.info(f"Clearing failure history for {path} after successful sync") + del self._file_failures[path] + async def sync(self, directory: Path, project_name: Optional[str] = None) -> SyncReport: """Sync all files with database.""" @@ -192,7 +331,20 @@ async def sync(self, directory: Path, project_name: Optional[str] = None) -> Syn # then new and modified for path in report.new: - await self.sync_file(path, new=True) + entity, _ = await self.sync_file(path, new=True) + + # Track if file was skipped + if entity is None and await self._should_skip_file(path): + failure_info = self._file_failures[path] + report.skipped_files.append( + SkippedFile( + path=path, + reason=failure_info.last_error, + failure_count=failure_info.count, + first_failed=failure_info.first_failure, + ) + ) + files_processed += 1 if project_name: sync_status_tracker.update_project_progress( @@ -203,7 +355,20 @@ async def sync(self, directory: Path, project_name: Optional[str] = None) -> Syn ) for path in report.modified: - await self.sync_file(path, new=False) + entity, _ = await self.sync_file(path, new=False) + + # Track if file was skipped + if entity is None and await self._should_skip_file(path): + failure_info = self._file_failures[path] + report.skipped_files.append( + SkippedFile( + path=path, + reason=failure_info.last_error, + failure_count=failure_info.count, + first_failed=failure_info.first_failure, + ) + ) + files_processed += 1 if project_name: sync_status_tracker.update_project_progress( # pragma: no cover @@ -220,9 +385,24 @@ async def sync(self, directory: Path, project_name: Optional[str] = None) -> Syn sync_status_tracker.complete_project_sync(project_name) duration_ms = int((time.time() - start_time) * 1000) - logger.info( - f"Sync operation completed: directory={directory}, total_changes={report.total}, duration_ms={duration_ms}" - ) + + # Log summary with skipped files if any + if report.skipped_files: + logger.warning( + f"Sync completed with {len(report.skipped_files)} skipped files: " + f"directory={directory}, total_changes={report.total}, " + f"skipped={len(report.skipped_files)}, duration_ms={duration_ms}" + ) + for skipped in report.skipped_files: + logger.warning( + f"Skipped file: path={skipped.path}, " + f"failures={skipped.failure_count}, reason={skipped.reason}" + ) + else: + logger.info( + f"Sync operation completed: directory={directory}, " + f"total_changes={report.total}, duration_ms={duration_ms}" + ) return report @@ -298,15 +478,20 @@ async def get_db_file_state(self) -> Dict[str, str]: async def sync_file( self, path: str, new: bool = True ) -> Tuple[Optional[Entity], Optional[str]]: - """Sync a single file. + """Sync a single file with circuit breaker protection. Args: path: Path to file to sync new: Whether this is a new file Returns: - Tuple of (entity, checksum) or (None, None) if sync fails + Tuple of (entity, checksum) or (None, None) if sync fails or file is skipped """ + # Check if file should be skipped due to repeated failures + if await self._should_skip_file(path): + logger.warning(f"Skipping file due to repeated failures: {path}") + return None, None + try: logger.debug( f"Syncing file path={path} is_new={new} is_markdown={self.file_service.is_markdown(path)}" @@ -320,13 +505,21 @@ async def sync_file( if entity is not None: await self.search_service.index_entity(entity) + # Clear failure tracking on successful sync + self._clear_failure(path) + logger.debug( f"File sync completed, path={path}, entity_id={entity.id}, checksum={checksum[:8]}" ) return entity, checksum - except Exception as e: # pragma: no cover - logger.error(f"Failed to sync file: path={path}, error={str(e)}") + except Exception as e: + error_msg = str(e) + logger.error(f"Failed to sync file: path={path}, error={error_msg}") + + # Record failure for circuit breaker + await self._record_failure(path, error_msg) + return None, None async def sync_markdown_file(self, path: str, new: bool = True) -> Tuple[Optional[Entity], str]: diff --git a/tests/sync/test_sync_service.py b/tests/sync/test_sync_service.py index c4ff06dd3..7bef4c9c3 100644 --- a/tests/sync/test_sync_service.py +++ b/tests/sync/test_sync_service.py @@ -1316,3 +1316,252 @@ async def mock_update(entity_id, updates): await sync_service.sync_regular_file( str(test_file.relative_to(project_config.home)), new=True ) + + +@pytest.mark.asyncio +async def test_circuit_breaker_skips_after_three_failures( + sync_service: SyncService, project_config: ProjectConfig +): + """Test that circuit breaker skips file after 3 consecutive failures.""" + from unittest.mock import patch + + project_dir = project_config.home + test_file = project_dir / "failing_file.md" + + # Create a file with malformed content that will fail to parse + await create_test_file(test_file, "invalid markdown content") + + # Mock sync_markdown_file to always fail + async def mock_sync_markdown_file(*args, **kwargs): + raise ValueError("Simulated sync failure") + + with patch.object(sync_service, "sync_markdown_file", side_effect=mock_sync_markdown_file): + # First sync - should fail and record (1/3) + report1 = await sync_service.sync(project_dir) + assert len(report1.skipped_files) == 0 # Not skipped yet + + # Second sync - should fail and record (2/3) + report2 = await sync_service.sync(project_dir) + assert len(report2.skipped_files) == 0 # Still not skipped + + # Third sync - should fail, record (3/3), and be added to skipped list + report3 = await sync_service.sync(project_dir) + assert len(report3.skipped_files) == 1 + assert report3.skipped_files[0].path == "failing_file.md" + assert report3.skipped_files[0].failure_count == 3 + assert "Simulated sync failure" in report3.skipped_files[0].reason + + # Fourth sync - should be skipped immediately without attempting + report4 = await sync_service.sync(project_dir) + assert len(report4.skipped_files) == 1 # Still skipped + + +@pytest.mark.asyncio +async def test_circuit_breaker_resets_on_file_change( + sync_service: SyncService, project_config: ProjectConfig, entity_service: EntityService +): + """Test that circuit breaker resets when file content changes.""" + from unittest.mock import patch + + project_dir = project_config.home + test_file = project_dir / "changing_file.md" + + # Create initial failing content + await create_test_file(test_file, "initial bad content") + + # Mock sync_markdown_file to fail + call_count = 0 + + async def mock_sync_markdown_file(*args, **kwargs): + nonlocal call_count + call_count += 1 + raise ValueError("Simulated sync failure") + + with patch.object(sync_service, "sync_markdown_file", side_effect=mock_sync_markdown_file): + # Fail 3 times to hit circuit breaker threshold + await sync_service.sync(project_dir) # Fail 1 + await sync_service.sync(project_dir) # Fail 2 + report3 = await sync_service.sync(project_dir) # Fail 3 - now skipped + assert len(report3.skipped_files) == 1 + + # Now change the file content + valid_content = dedent( + """ + --- + title: Fixed Content + type: knowledge + --- + # Fixed Content + This should work now. + """ + ).strip() + await create_test_file(test_file, valid_content) + + # Circuit breaker should reset and allow retry + report = await sync_service.sync(project_dir) + assert len(report.skipped_files) == 0 # Should not be skipped anymore + + # Verify entity was created successfully + entity = await entity_service.get_by_permalink("changing-file") + assert entity is not None + assert entity.title == "Fixed Content" + + +@pytest.mark.asyncio +async def test_circuit_breaker_clears_on_success( + sync_service: SyncService, project_config: ProjectConfig, entity_service: EntityService +): + """Test that circuit breaker clears failure history after successful sync.""" + from unittest.mock import patch + + project_dir = project_config.home + test_file = project_dir / "sometimes_failing.md" + + valid_content = dedent( + """ + --- + title: Test File + type: knowledge + --- + # Test File + Test content + """ + ).strip() + await create_test_file(test_file, valid_content) + + # Mock to fail twice, then succeed + call_count = 0 + original_sync_markdown_file = sync_service.sync_markdown_file + + async def mock_sync_markdown_file(path, new): + nonlocal call_count + call_count += 1 + if call_count <= 2: + raise ValueError("Temporary failure") + # On third call, use the real implementation + return await original_sync_markdown_file(path, new) + + # Patch and fail twice + with patch.object(sync_service, "sync_markdown_file", side_effect=mock_sync_markdown_file): + await sync_service.sync(project_dir) # Fail 1 + await sync_service.sync(project_dir) # Fail 2 + await sync_service.sync(project_dir) # Succeed + + # Verify failure history was cleared + assert "sometimes_failing.md" not in sync_service._file_failures + + # Verify entity was created + entity = await entity_service.get_by_permalink("sometimes-failing") + assert entity is not None + + +@pytest.mark.asyncio +async def test_circuit_breaker_tracks_multiple_files( + sync_service: SyncService, project_config: ProjectConfig +): + """Test that circuit breaker tracks multiple failing files independently.""" + from unittest.mock import patch + + project_dir = project_config.home + + # Create multiple files with valid markdown + await create_test_file( + project_dir / "file1.md", + """ +--- +type: knowledge +--- +# File 1 +Content 1 +""", + ) + await create_test_file( + project_dir / "file2.md", + """ +--- +type: knowledge +--- +# File 2 +Content 2 +""", + ) + await create_test_file( + project_dir / "file3.md", + """ +--- +type: knowledge +--- +# File 3 +Content 3 +""", + ) + + # Mock to make file1 and file2 fail, but file3 succeed + original_sync_markdown_file = sync_service.sync_markdown_file + + async def mock_sync_markdown_file(path, new): + if "file1.md" in path or "file2.md" in path: + raise ValueError(f"Failure for {path}") + # file3 succeeds - use real implementation + return await original_sync_markdown_file(path, new) + + with patch.object(sync_service, "sync_markdown_file", side_effect=mock_sync_markdown_file): + # Fail 3 times for file1 and file2 (file3 succeeds each time) + await sync_service.sync(project_dir) # Fail count: file1=1, file2=1 + await sync_service.sync(project_dir) # Fail count: file1=2, file2=2 + report3 = await sync_service.sync(project_dir) # Fail count: file1=3, file2=3, now skipped + + # Both files should be skipped on third sync + assert len(report3.skipped_files) == 2 + skipped_paths = {f.path for f in report3.skipped_files} + assert "file1.md" in skipped_paths + assert "file2.md" in skipped_paths + + # Verify file3 is not in failures dict + assert "file3.md" not in sync_service._file_failures + + +@pytest.mark.asyncio +async def test_circuit_breaker_handles_checksum_computation_failure( + sync_service: SyncService, project_config: ProjectConfig +): + """Test circuit breaker behavior when checksum computation fails.""" + from unittest.mock import patch + + project_dir = project_config.home + test_file = project_dir / "checksum_fail.md" + await create_test_file(test_file, "content") + + # Mock sync_markdown_file to fail + async def mock_sync_markdown_file(*args, **kwargs): + raise ValueError("Sync failure") + + # Mock checksum computation to fail only during _record_failure (not during scan) + original_compute_checksum = sync_service._compute_checksum_async + call_count = 0 + + async def mock_compute_checksum(path): + nonlocal call_count + call_count += 1 + # First call is during scan - let it succeed + if call_count == 1: + return await original_compute_checksum(path) + # Second call is during _record_failure - make it fail + raise IOError("Cannot read file") + + with ( + patch.object(sync_service, "sync_markdown_file", side_effect=mock_sync_markdown_file), + patch.object( + sync_service, + "_compute_checksum_async", + side_effect=mock_compute_checksum, + ), + ): + # Should still record failure even if checksum fails + await sync_service.sync(project_dir) + + # Check that failure was recorded with empty checksum + assert "checksum_fail.md" in sync_service._file_failures + failure_info = sync_service._file_failures["checksum_fail.md"] + assert failure_info.count == 1 + assert failure_info.last_checksum == "" # Empty when checksum fails