-
Notifications
You must be signed in to change notification settings - Fork 179
Expand file tree
/
Copy pathcloud_utils.py
More file actions
149 lines (121 loc) · 4.46 KB
/
cloud_utils.py
File metadata and controls
149 lines (121 loc) · 4.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
"""Shared utilities for cloud operations."""
from basic_memory.cli.commands.cloud.api_client import make_api_request
from basic_memory.config import ConfigManager
from basic_memory.mcp.async_client import resolve_configured_workspace
from basic_memory.schemas.cloud import (
CloudProjectList,
CloudProjectCreateRequest,
CloudProjectCreateResponse,
ProjectVisibility,
)
from basic_memory.utils import generate_permalink
class CloudUtilsError(Exception):
"""Exception raised for cloud utility errors."""
pass
def _workspace_headers(
*,
project_name: str | None = None,
workspace: str | None = None,
) -> dict[str, str]:
"""Build optional workspace headers using the CLI config resolution chain."""
resolved_workspace = resolve_configured_workspace(
project_name=project_name,
workspace=workspace,
)
if resolved_workspace is None:
return {}
return {"X-Workspace-ID": resolved_workspace}
async def fetch_cloud_projects(
*,
project_name: str | None = None,
workspace: str | None = None,
api_request=make_api_request,
) -> CloudProjectList:
"""Fetch list of projects from cloud API.
Returns:
CloudProjectList with projects from cloud
"""
try:
config_manager = ConfigManager()
config = config_manager.config
host_url = config.cloud_host.rstrip("/")
response = await api_request(
method="GET",
url=f"{host_url}/proxy/v2/projects/",
headers=_workspace_headers(project_name=project_name, workspace=workspace),
)
return CloudProjectList.model_validate(response.json())
except Exception as e:
raise CloudUtilsError(f"Failed to fetch cloud projects: {e}") from e
async def create_cloud_project(
project_name: str,
*,
workspace: str | None = None,
visibility: ProjectVisibility = "workspace",
api_request=make_api_request,
) -> CloudProjectCreateResponse:
"""Create a new project on cloud.
Args:
project_name: Name of project to create
workspace: Optional workspace override for tenant-scoped project creation
visibility: Visibility for the created cloud project
Returns:
CloudProjectCreateResponse with project details from API
"""
try:
config_manager = ConfigManager()
config = config_manager.config
host_url = config.cloud_host.rstrip("/")
# Use generate_permalink to ensure consistent naming
project_path = generate_permalink(project_name)
project_data = CloudProjectCreateRequest(
name=project_name,
path=project_path,
set_default=False,
visibility=visibility,
)
response = await api_request(
method="POST",
url=f"{host_url}/proxy/v2/projects/",
headers={
"Content-Type": "application/json",
**_workspace_headers(project_name=project_name, workspace=workspace),
},
json_data=project_data.model_dump(),
)
return CloudProjectCreateResponse.model_validate(response.json())
except Exception as e:
raise CloudUtilsError(f"Failed to create cloud project '{project_name}': {e}") from e
async def sync_project(project_name: str, force_full: bool = False) -> None:
"""Trigger sync for a specific project on cloud.
Args:
project_name: Name of project to sync
force_full: If True, force a full scan bypassing watermark optimization
"""
try:
from basic_memory.cli.commands.command_utils import run_sync
await run_sync(project=project_name, force_full=force_full)
except Exception as e:
raise CloudUtilsError(f"Failed to sync project '{project_name}': {e}") from e
async def project_exists(
project_name: str,
*,
workspace: str | None = None,
api_request=make_api_request,
) -> bool:
"""Check if a project exists on cloud.
Args:
project_name: Name of project to check
workspace: Optional workspace override for tenant-scoped project lookup
Returns:
True if project exists, False otherwise
Raises:
CloudUtilsError: If the project list cannot be fetched from cloud
"""
projects = await fetch_cloud_projects(
project_name=project_name,
workspace=workspace,
api_request=api_request,
)
project_names = {p.name for p in projects.projects}
return project_name in project_names