-
Notifications
You must be signed in to change notification settings - Fork 180
Expand file tree
/
Copy pathcloud_utils.py
More file actions
134 lines (105 loc) · 3.86 KB
/
cloud_utils.py
File metadata and controls
134 lines (105 loc) · 3.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
"""Shared utilities for cloud operations."""
from basic_memory.cli.commands.cloud.api_client import make_api_request
from basic_memory.config import ConfigManager
from basic_memory.schemas.cloud import (
CloudProjectList,
CloudProjectCreateRequest,
CloudProjectCreateResponse,
)
from basic_memory.utils import generate_permalink
class CloudUtilsError(Exception):
    """Error type raised by the cloud helper functions in this module.

    The helpers here wrap lower-level failures in this exception and chain
    the original error as the cause.
    """
def _workspace_headers(workspace: str | None = None) -> dict[str, str]:
"""Build workspace header if workspace is specified."""
if workspace:
return {"X-Workspace-ID": workspace}
return {}
async def fetch_cloud_projects(
    *,
    workspace: str | None = None,
    api_request=make_api_request,
) -> CloudProjectList:
    """Retrieve the list of projects from the cloud API.

    Issues a GET against ``<cloud_host>/proxy/v2/projects/`` (trailing slashes
    on the configured host are stripped first) and validates the JSON body
    as a ``CloudProjectList``.

    Args:
        workspace: Cloud workspace tenant_id to list projects from. When
            given, it is forwarded through the workspace header.
        api_request: Awaitable callable that performs the HTTP request; it is
            called with ``method``, ``url`` and ``headers`` keyword arguments.
            Defaults to ``make_api_request``.

    Returns:
        CloudProjectList with the projects reported by the cloud.

    Raises:
        CloudUtilsError: If reading the config, performing the request, or
            validating the response fails; the original exception is chained.
    """
    try:
        base_url = ConfigManager().config.cloud_host.rstrip("/")

        reply = await api_request(
            method="GET",
            url=f"{base_url}/proxy/v2/projects/",
            headers=_workspace_headers(workspace),
        )

        payload = reply.json()
        return CloudProjectList.model_validate(payload)
    except Exception as err:
        # Surface every failure under the module's own exception type.
        raise CloudUtilsError(f"Failed to fetch cloud projects: {err}") from err
async def create_cloud_project(
    project_name: str,
    *,
    workspace: str | None = None,
    api_request=make_api_request,
) -> CloudProjectCreateResponse:
    """Create a project on the cloud and return the API's response.

    POSTs a ``CloudProjectCreateRequest`` to ``<cloud_host>/proxy/v2/projects/``.
    The project path is derived from the name with ``generate_permalink`` and
    the new project is never marked as default (``set_default=False``).

    Args:
        project_name: Name of the project to create.
        workspace: Cloud workspace tenant_id to create the project in. When
            given, it is forwarded through the workspace header.
        api_request: Awaitable callable that performs the HTTP request; it is
            called with ``method``, ``url``, ``headers`` and ``json_data``
            keyword arguments. Defaults to ``make_api_request``.

    Returns:
        CloudProjectCreateResponse validated from the response JSON.

    Raises:
        CloudUtilsError: If any step fails; the original exception is chained.
    """
    try:
        base_url = ConfigManager().config.cloud_host.rstrip("/")

        # The path comes from generate_permalink so naming stays consistent.
        request_model = CloudProjectCreateRequest(
            name=project_name,
            path=generate_permalink(project_name),
            set_default=False,
        )

        # JSON content type first, then the optional workspace header.
        request_headers = {
            "Content-Type": "application/json",
            **_workspace_headers(workspace),
        }

        reply = await api_request(
            method="POST",
            url=f"{base_url}/proxy/v2/projects/",
            headers=request_headers,
            json_data=request_model.model_dump(),
        )

        payload = reply.json()
        return CloudProjectCreateResponse.model_validate(payload)
    except Exception as err:
        raise CloudUtilsError(
            f"Failed to create cloud project '{project_name}': {err}"
        ) from err
async def sync_project(project_name: str, force_full: bool = False) -> None:
    """Run a sync for one project on the cloud.

    Delegates to ``run_sync`` from ``basic_memory.cli.commands.command_utils``.

    Args:
        project_name: Name of the project to sync.
        force_full: If True, force a full scan bypassing watermark optimization.

    Raises:
        CloudUtilsError: If importing or running the sync fails; the original
            exception is chained.
    """
    try:
        # Function-local import (presumably to avoid a circular import at
        # module load - confirm). Kept inside the try so an import failure
        # is wrapped as well.
        from basic_memory.cli.commands.command_utils import run_sync

        sync_call = run_sync(project=project_name, force_full=force_full)
        await sync_call
    except Exception as err:
        raise CloudUtilsError(f"Failed to sync project '{project_name}': {err}") from err
async def project_exists(
    project_name: str,
    *,
    workspace: str | None = None,
    api_request=make_api_request,
) -> bool:
    """Report whether a project with the given name exists on the cloud.

    Fetches the project list via ``fetch_cloud_projects`` and looks for an
    exact match on the project's ``name``.

    Args:
        project_name: Name of the project to look for.
        workspace: Cloud workspace tenant_id to check in.
        api_request: Request callable passed through to
            ``fetch_cloud_projects``. Defaults to ``make_api_request``.

    Returns:
        True if a project with that name is listed, False otherwise. Any
        exception during the lookup also yields False, so False can mean
        either "not present" or "could not be determined".
    """
    try:
        listing = await fetch_cloud_projects(workspace=workspace, api_request=api_request)
        known_names = set()
        for entry in listing.projects:
            known_names.add(entry.name)
        found = project_name in known_names
    except Exception:
        # Best-effort check: lookup failures are reported as "does not exist".
        found = False
    return found