-
Notifications
You must be signed in to change notification settings - Fork 710
Expand file tree
/
Copy pathdatabase_base.py
More file actions
234 lines (187 loc) · 6.7 KB
/
Copy pathdatabase_base.py
File metadata and controls
234 lines (187 loc) · 6.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
"""Database base class for managing database operations."""
# pylint: disable=unnecessary-pass
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Type
import v3.models.messages as messages
from ..models.messages_kernel import (
AgentMessageData,
BaseDataModel,
Plan,
Step,
TeamConfiguration,
UserCurrentTeam,
)
class DatabaseBase(ABC):
    """Abstract interface that every database backend must implement.

    All operations are coroutines. Concrete subclasses supply the storage
    logic for the generic CRUD helpers and for the plan, step, team,
    current-team, mplan and agent-message operations declared below.

    The class is also an async context manager: entering it awaits
    ``initialize()`` and leaving it awaits ``close()``.
    """

    @abstractmethod
    async def initialize(self) -> None:
        """Initialize the database client and create containers if needed."""
        pass

    @abstractmethod
    async def close(self) -> None:
        """Close database connection."""
        pass

    # Core CRUD Operations
    @abstractmethod
    async def add_item(self, item: BaseDataModel) -> None:
        """Add an item to the database."""
        pass

    @abstractmethod
    async def update_item(self, item: BaseDataModel) -> None:
        """Update an item in the database."""
        pass

    @abstractmethod
    async def get_item_by_id(
        self, item_id: str, partition_key: str, model_class: Type[BaseDataModel]
    ) -> Optional[BaseDataModel]:
        """Retrieve an item by its ID and partition key.

        ``model_class`` names the model type of the requested item; the
        result is optional, so implementations may return ``None``.
        """
        pass

    @abstractmethod
    async def query_items(
        self,
        query: str,
        parameters: List[Dict[str, Any]],
        model_class: Type[BaseDataModel],
    ) -> List[BaseDataModel]:
        """Query items from the database and return a list of model instances."""
        pass

    @abstractmethod
    async def delete_item(self, item_id: str, partition_key: str) -> None:
        """Delete an item from the database."""
        pass

    # Plan Operations
    @abstractmethod
    async def add_plan(self, plan: Plan) -> None:
        """Add a plan to the database."""
        pass

    @abstractmethod
    async def update_plan(self, plan: Plan) -> None:
        """Update a plan in the database."""
        pass

    @abstractmethod
    async def get_plan_by_plan_id(self, plan_id: str) -> Optional[Plan]:
        """Retrieve a plan by plan_id."""
        pass

    # NOTE(review): get_plan has the same signature and description as
    # get_plan_by_plan_id above - confirm whether the two are meant to differ.
    @abstractmethod
    async def get_plan(self, plan_id: str) -> Optional[Plan]:
        """Retrieve a plan by plan_id."""
        pass

    @abstractmethod
    async def get_all_plans(self) -> List[Plan]:
        """Retrieve all plans for the user."""
        pass

    @abstractmethod
    async def get_all_plans_by_team_id(self, team_id: str) -> List[Plan]:
        """Retrieve all plans for a specific team."""
        pass

    @abstractmethod
    async def get_all_plans_by_team_id_status(
        self, user_id: str, team_id: str, status: str
    ) -> List[Plan]:
        """Retrieve the plans matching the given user, team and status."""
        pass

    # Step Operations
    @abstractmethod
    async def add_step(self, step: Step) -> None:
        """Add a step to the database."""
        pass

    @abstractmethod
    async def update_step(self, step: Step) -> None:
        """Update a step in the database."""
        pass

    @abstractmethod
    async def get_steps_by_plan(self, plan_id: str) -> List[Step]:
        """Retrieve all steps for a plan."""
        pass

    @abstractmethod
    async def get_step(self, step_id: str, session_id: str) -> Optional[Step]:
        """Retrieve a step by step_id and session_id."""
        pass

    # Team Operations
    @abstractmethod
    async def add_team(self, team: TeamConfiguration) -> None:
        """Add a team configuration to the database."""
        pass

    @abstractmethod
    async def update_team(self, team: TeamConfiguration) -> None:
        """Update a team configuration in the database."""
        pass

    @abstractmethod
    async def get_team(self, team_id: str) -> Optional[TeamConfiguration]:
        """Retrieve a team configuration by team_id."""
        pass

    @abstractmethod
    async def get_team_by_id(self, team_id: str) -> Optional[TeamConfiguration]:
        """Retrieve a team configuration by internal id."""
        pass

    @abstractmethod
    async def get_all_teams(self) -> List[TeamConfiguration]:
        """Retrieve all team configurations.

        No user argument is taken; any per-user scoping is left to the
        implementation.
        """
        pass

    @abstractmethod
    async def delete_team(self, team_id: str) -> bool:
        """Delete a team configuration by team_id and return True if deleted."""
        pass

    # Data Management Operations
    @abstractmethod
    async def get_data_by_type(self, data_type: str) -> List[BaseDataModel]:
        """Retrieve all data of a specific type."""
        pass

    @abstractmethod
    async def get_all_items(self) -> List[Dict[str, Any]]:
        """Retrieve all items as dictionaries."""
        pass

    # Context Manager Support
    async def __aenter__(self):
        """Async context manager entry: initialize the backend and return it."""
        await self.initialize()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Async context manager exit: close the backend.

        Returns None, so an exception raised inside the ``async with`` body
        is not suppressed.
        """
        await self.close()

    # Compatibility Aliases
    async def get_steps_for_plan(self, plan_id: str) -> List[Step]:
        """Convenience method aliasing get_steps_by_plan for compatibility.

        Implemented here as a concrete default that delegates to
        ``get_steps_by_plan`` so backends do not each have to re-implement
        the alias; subclasses that already override it keep working.
        """
        return await self.get_steps_by_plan(plan_id)

    # Current Team Operations
    @abstractmethod
    async def get_current_team(self, user_id: str) -> Optional[UserCurrentTeam]:
        """Retrieve the current team for a user."""
        pass

    # NOTE(review): the declared return type is Optional[UserCurrentTeam],
    # unlike the bool returned by delete_team - confirm what callers expect.
    @abstractmethod
    async def delete_current_team(self, user_id: str) -> Optional[UserCurrentTeam]:
        """Delete the current team for a user."""
        pass

    @abstractmethod
    async def set_current_team(self, current_team: UserCurrentTeam) -> None:
        """Set the current team for a user."""
        pass

    @abstractmethod
    async def update_current_team(self, current_team: UserCurrentTeam) -> None:
        """Update the current team for a user."""
        pass

    @abstractmethod
    async def delete_plan_by_plan_id(self, plan_id: str) -> bool:
        """Delete a plan by plan_id and report the outcome as a bool."""
        pass

    # MPlan Operations
    @abstractmethod
    async def add_mplan(self, mplan: messages.MPlan) -> None:
        """Add an mplan to the database."""
        pass

    @abstractmethod
    async def update_mplan(self, mplan: messages.MPlan) -> None:
        """Update an mplan in the database."""
        pass

    @abstractmethod
    async def get_mplan(self, plan_id: str) -> Optional[messages.MPlan]:
        """Retrieve an mplan by plan_id."""
        pass

    # Agent Message Operations
    @abstractmethod
    async def add_agent_message(self, message: AgentMessageData) -> None:
        """Add an agent message to the database."""
        pass

    @abstractmethod
    async def update_agent_message(self, message: AgentMessageData) -> None:
        """Update an agent message in the database."""
        pass

    # NOTE(review): the name is plural but the declared return type is a
    # single Optional[AgentMessageData] - confirm which one is intended.
    @abstractmethod
    async def get_agent_messages(self, plan_id: str) -> Optional[AgentMessageData]:
        """Retrieve agent message data for a plan by plan_id."""
        pass