-
Notifications
You must be signed in to change notification settings - Fork 140
Expand file tree
/
Copy path__init__.py
More file actions
380 lines (324 loc) · 13.7 KB
/
Copy path__init__.py
File metadata and controls
380 lines (324 loc) · 13.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
import inspect
from abc import ABC, abstractmethod
from collections.abc import Generator, Mapping
from typing import Any, Generic, TypeVar, final

from typing_extensions import deprecated
from werkzeug import Request

from dify_plugin.core.runtime import Session
from dify_plugin.entities import ParameterOption
from dify_plugin.entities.invoke_message import InvokeMessage
from dify_plugin.entities.oauth import ToolOAuthCredentials
from dify_plugin.entities.provider_config import LogMetadata
from dify_plugin.entities.tool import ToolInvokeMessage, ToolParameter, ToolRuntime, ToolSelector
from dify_plugin.file.constants import DIFY_FILE_IDENTITY, DIFY_TOOL_SELECTOR_IDENTITY
from dify_plugin.file.entities import FileType
from dify_plugin.file.file import File
from dify_plugin.protocol.oauth import OAuthCredentials
# Message class produced by a ToolLike implementation; constrained to InvokeMessage subclasses.
T = TypeVar("T", bound=InvokeMessage)
class ToolLike(ABC, Generic[T]):
    """
    Shared base for tool-like plugin interfaces.

    Provides factory helpers that build messages of type ``T`` and a converter that
    turns serialized file / tool-selector parameters into their object form.
    """

    # Message class instantiated by every create_* / finish_* helper below.
    # It is only annotated here, so it has to be assigned before a helper is used.
    response_type: type[T]

    ############################################################
    #        For plugin implementation use only                #
    ############################################################

    def create_text_message(self, text: str) -> T:
        """
        create a text message

        :param text: the text
        :return: the text message
        """
        return self.response_type(
            type=InvokeMessage.MessageType.TEXT,
            message=InvokeMessage.TextMessage(text=text),
        )

    def create_json_message(self, json: Mapping | list) -> T:
        """
        create a json message

        :param json: the json object (a mapping or a list)
        :return: the json message
        """
        return self.response_type(
            type=InvokeMessage.MessageType.JSON,
            message=InvokeMessage.JsonMessage(json_object=json),
        )

    def create_image_message(self, image_url: str) -> T:
        """
        create an image message

        :param image_url: the url of the image
        :return: the image message
        """
        # the url travels as the text of a TextMessage; only the message type marks it as an image
        return self.response_type(
            type=InvokeMessage.MessageType.IMAGE,
            message=InvokeMessage.TextMessage(text=image_url),
        )

    def create_link_message(self, link: str) -> T:
        """
        create a link message

        :param link: the url of the link
        :return: the link message
        """
        # same carrier as images: a TextMessage tagged with the LINK message type
        return self.response_type(
            type=InvokeMessage.MessageType.LINK,
            message=InvokeMessage.TextMessage(text=link),
        )

    def create_blob_message(self, blob: bytes, meta: dict | None = None) -> T:
        """
        create a blob message

        :param blob: the blob
        :param meta: optional metadata, passed through unchanged as the message's ``meta``
        :return: the blob message
        """
        return self.response_type(
            type=InvokeMessage.MessageType.BLOB,
            message=InvokeMessage.BlobMessage(blob=blob),
            meta=meta,
        )

    def create_variable_message(self, variable_name: str, variable_value: Any) -> T:
        """
        create a variable message

        :param variable_name: the name of the variable
        :param variable_value: the value of the variable
        :return: the variable message
        """
        return self.response_type(
            type=InvokeMessage.MessageType.VARIABLE,
            message=InvokeMessage.VariableMessage(variable_name=variable_name, variable_value=variable_value),
        )

    def create_stream_variable_message(self, variable_name: str, variable_value: str) -> T:
        """
        create a variable message that will be streamed to the frontend

        NOTE: variable value should be a string, only string is streaming supported now

        :param variable_name: the name of the variable
        :param variable_value: the value of the variable
        :return: the variable message
        """
        return self.response_type(
            type=InvokeMessage.MessageType.VARIABLE,
            message=InvokeMessage.VariableMessage(
                variable_name=variable_name,
                variable_value=variable_value,
                stream=True,
            ),
        )

    def create_log_message(
        self,
        label: str,
        data: Mapping[str, Any],
        status: InvokeMessage.LogMessage.LogStatus = InvokeMessage.LogMessage.LogStatus.SUCCESS,
        parent: T | None = None,
        metadata: Mapping[LogMetadata, Any] | None = None,
    ) -> T:
        """
        create a log message

        :param label: the label of the log
        :param data: the data attached to the log
        :param status: the status of the log, SUCCESS when omitted
        :param parent: optional parent log message; its id is used as ``parent_id``
            only when it actually wraps a ``LogMessage``
        :param metadata: optional metadata attached to the log
        :return: the log message
        """
        # a parent that is missing or is not a log message yields no parent link
        if parent and isinstance(parent.message, InvokeMessage.LogMessage):
            parent_id = parent.message.id
        else:
            parent_id = None

        return self.response_type(
            type=InvokeMessage.MessageType.LOG,
            message=InvokeMessage.LogMessage(
                label=label,
                data=data,
                status=status,
                parent_id=parent_id,
                metadata=metadata,
            ),
        )

    def create_retriever_resource_message(
        self,
        retriever_resources: list[InvokeMessage.RetrieverResourceMessage.RetrieverResource],
        context: str,
    ) -> T:
        """
        create a retriever resource message

        :param retriever_resources: the retriever resources
        :param context: the context
        :return: the retriever resource message
        """
        return self.response_type(
            type=InvokeMessage.MessageType.RETRIEVER_RESOURCES,
            message=InvokeMessage.RetrieverResourceMessage(
                retriever_resources=retriever_resources,
                context=context,
            ),
        )

    def finish_log_message(
        self,
        log: T,
        status: InvokeMessage.LogMessage.LogStatus = InvokeMessage.LogMessage.LogStatus.SUCCESS,
        error: str | None = None,
        data: Mapping[str, Any] | None = None,
        metadata: Mapping[LogMetadata, Any] | None = None,
    ) -> T:
        """
        mark log as finished

        Builds a new log message that reuses the id, label and parent_id of ``log``.

        :param log: the log message to finish, must wrap a ``LogMessage``
        :param status: the final status, SUCCESS when omitted
        :param error: optional error text
        :param data: replacement data; when falsy (None or empty) the data of ``log`` is kept
        :param metadata: replacement metadata; when falsy the metadata of ``log`` is kept
        :return: the finished log message
        """
        # only log messages carry the id / label / parent_id copied below
        assert isinstance(log.message, InvokeMessage.LogMessage)
        return self.response_type(
            type=InvokeMessage.MessageType.LOG,
            message=InvokeMessage.LogMessage(
                id=log.message.id,
                label=log.message.label,
                data=data or log.message.data,
                status=status,
                parent_id=log.message.parent_id,
                error=error,
                metadata=metadata or log.message.metadata,
            ),
        )

    @deprecated("This feature is deprecated, will soon be replaced by dynamic select parameter")
    def _get_runtime_parameters(self) -> list[ToolParameter]:
        """
        get the runtime parameters of the tool

        The default implementation returns an empty list.

        :return: the runtime parameters
        """
        return []

    @classmethod
    def _is_get_runtime_parameters_overridden(cls) -> bool:
        """
        check if the _get_runtime_parameters method is overridden by subclass

        The attribute is resolved through the MRO and compared with the base
        implementation, so an override declared on an intermediate base class is
        detected as well (inspecting only ``cls.__dict__`` would miss it, and would
        report the base class itself as overridden).

        :return: True if overridden, False otherwise
        """
        return cls._get_runtime_parameters is not ToolLike._get_runtime_parameters

    @staticmethod
    def _is_tagged_dict(value: Any, identity: Any) -> bool:
        """
        check if value is a dict whose "dify_model_identity" entry equals identity
        """
        return isinstance(value, dict) and value.get("dify_model_identity") == identity

    @staticmethod
    def _file_from_payload(payload: dict) -> File:
        """
        build a File from its serialized dict form

        "url" is required; the remaining keys are read with ``get``.
        """
        return File(
            url=payload["url"],
            mime_type=payload.get("mime_type"),
            type=FileType(payload.get("type")),
            filename=payload.get("filename"),
            extension=payload.get("extension"),
            size=payload.get("size"),
        )

    @classmethod
    def _convert_parameters(cls, tool_parameters: dict) -> dict:
        """
        convert parameters into correct types

        Values tagged as files become ``File`` objects and values tagged as tool
        selectors become ``ToolSelector`` objects; lists are converted when every
        item carries the same tag. Everything else is left untouched.

        :param tool_parameters: the raw parameters, updated in place
        :return: the same dict, after conversion
        """
        # only existing keys are reassigned, so the dict never changes size while iterating
        for name, value in tool_parameters.items():
            if cls._is_tagged_dict(value, DIFY_FILE_IDENTITY):
                tool_parameters[name] = cls._file_from_payload(value)
            elif isinstance(value, list) and all(cls._is_tagged_dict(item, DIFY_FILE_IDENTITY) for item in value):
                # note: all() is True for an empty list, which is then replaced by a new empty list
                tool_parameters[name] = [cls._file_from_payload(item) for item in value]
            elif cls._is_tagged_dict(value, DIFY_TOOL_SELECTOR_IDENTITY):
                tool_parameters[name] = ToolSelector.model_validate(value)
            elif isinstance(value, list) and all(
                cls._is_tagged_dict(item, DIFY_TOOL_SELECTOR_IDENTITY) for item in value
            ):
                tool_parameters[name] = [ToolSelector.model_validate(item) for item in value]
        return tool_parameters
class ToolProvider:
    """
    Provider-level hooks for credential validation and OAuth.

    Each public method delegates to an underscore-prefixed hook; the default hooks
    raise ``NotImplementedError`` until a plugin implements them.
    """

    def validate_credentials(self, credentials: dict):
        """
        Validate the credentials

        :param credentials: credentials to validate
        :return: whatever `_validate_credentials` returns
        """
        return self._validate_credentials(credentials)

    def _validate_credentials(self, credentials: dict):
        """
        Hook for credentials validation, raises NotImplementedError by default
        """
        message = (
            "The tool you are using does not support credentials validation, "
            "please implement `_validate_credentials` method"
        )
        raise NotImplementedError(message)

    def oauth_get_authorization_url(self, redirect_uri: str, system_credentials: Mapping[str, Any]) -> str:
        """
        Get the authorization url

        :param redirect_uri: redirect uri
        :param system_credentials: system credentials
        :return: authorization url
        """
        authorization_url = self._oauth_get_authorization_url(redirect_uri, system_credentials)
        return authorization_url

    def _oauth_get_authorization_url(self, redirect_uri: str, system_credentials: Mapping[str, Any]) -> str:
        """
        Hook that builds the authorization url, raises NotImplementedError by default
        """
        message = "The tool you are using does not support OAuth, please implement `_oauth_get_authorization_url` method"
        raise NotImplementedError(message)

    def oauth_get_credentials(
        self, redirect_uri: str, system_credentials: Mapping[str, Any], request: Request
    ) -> OAuthCredentials:
        """
        Get the credentials

        :param redirect_uri: redirect uri
        :param system_credentials: system credentials
        :param request: raw http request
        :return: credentials
        """
        obtained = self._oauth_get_credentials(redirect_uri, system_credentials, request)
        # repackage the hook result: only credentials and expires_at are carried over
        return OAuthCredentials(
            credentials=obtained.credentials,
            expires_at=obtained.expires_at,
        )

    def _oauth_get_credentials(
        self, redirect_uri: str, system_credentials: Mapping[str, Any], request: Request
    ) -> ToolOAuthCredentials:
        """
        Hook that obtains the credentials, raises NotImplementedError by default
        """
        message = "The tool you are using does not support OAuth, please implement `_oauth_get_credentials` method"
        raise NotImplementedError(message)

    def oauth_refresh_credentials(
        self, redirect_uri: str, system_credentials: Mapping[str, Any], credentials: Mapping[str, Any]
    ) -> OAuthCredentials:
        """
        Refresh the credentials

        :param redirect_uri: redirect uri
        :param system_credentials: system credentials
        :param credentials: credentials
        :return: refreshed credentials
        """
        refreshed = self._oauth_refresh_credentials(redirect_uri, system_credentials, credentials)
        # repackage the hook result: only credentials and expires_at are carried over
        return OAuthCredentials(
            credentials=refreshed.credentials,
            expires_at=refreshed.expires_at,
        )

    def _oauth_refresh_credentials(
        self, redirect_uri: str, system_credentials: Mapping[str, Any], credentials: Mapping[str, Any]
    ) -> ToolOAuthCredentials:
        """
        Hook that refreshes the credentials, raises NotImplementedError by default
        """
        message = "The tool you are using does not support OAuth, please implement `_oauth_refresh_credentials` method"
        raise NotImplementedError(message)
class Tool(ToolLike[ToolInvokeMessage]):
    """
    Tool interface whose messages are ``ToolInvokeMessage`` instances.

    Plugins implement `_invoke` (and optionally `_fetch_parameter_options`);
    the executor calls the public `invoke` / `fetch_parameter_options` wrappers.
    """

    runtime: ToolRuntime
    session: Session

    @final
    def __init__(
        self,
        runtime: ToolRuntime,
        session: Session,
    ):
        """
        Initialize the tool

        NOTE:
        - This method has been marked as final, DO NOT OVERRIDE IT.

        :param runtime: the tool runtime
        :param session: the session
        """
        self.runtime = runtime
        self.session = session
        # message class used by the inherited create_* helpers
        self.response_type = ToolInvokeMessage

    @classmethod
    def from_credentials(
        cls,
        credentials: dict,
        user_id: str | None = None,
    ):
        """
        Create a tool from credentials only, with an empty session and no session id

        :param credentials: the credentials
        :param user_id: optional user id
        :return: the tool instance
        """
        return cls(
            runtime=ToolRuntime(credentials=credentials, user_id=user_id, session_id=None),
            session=Session.empty_session(),  # TODO could not fetch session here
        )

    ############################################################
    #        Methods that can be implemented by plugin         #
    ############################################################

    @abstractmethod
    def _invoke(self, tool_parameters: dict) -> Generator[ToolInvokeMessage, None, None]:
        """
        Run the tool, to be implemented by subclasses

        An implementation may additionally declare a ``passthrough`` keyword
        parameter (or ``**kwargs``); `invoke` forwards its ``passthrough`` value then.

        :param tool_parameters: the parameters, already converted by `_convert_parameters`
        :return: a generator of tool invoke messages
        """
        pass

    def _fetch_parameter_options(self, parameter: str) -> list[ParameterOption]:
        """
        Fetch the parameter options of the tool.

        To be implemented by subclasses.

        Also, it's optional to implement, that's why it's not an abstract method.
        """
        raise NotImplementedError(
            "This plugin should implement `_fetch_parameter_options` method to enable dynamic select parameter"
        )

    ############################################################
    #                 For executor use only                    #
    ############################################################

    def _invoke_accepts_passthrough(self) -> bool:
        """
        Check if `_invoke` can be called with a ``passthrough`` keyword argument

        :return: True if `_invoke` declares ``passthrough`` as a keyword-capable
            parameter or accepts ``**kwargs``, False otherwise
        """
        try:
            parameters = inspect.signature(self._invoke).parameters
        except (TypeError, ValueError):
            # signature cannot be introspected, stick to the one-argument contract
            return False

        declared = parameters.get("passthrough")
        if declared is not None and declared.kind in (
            inspect.Parameter.POSITIONAL_OR_KEYWORD,
            inspect.Parameter.KEYWORD_ONLY,
        ):
            return True
        return any(param.kind is inspect.Parameter.VAR_KEYWORD for param in parameters.values())

    def invoke(self, tool_parameters: dict, passthrough: str | None = None) -> Generator[ToolInvokeMessage, None, None]:
        """
        Convert the parameters and run the tool

        ``passthrough`` is forwarded only when the implementation's signature accepts
        it. The decision is made from the signature instead of catching ``TypeError``
        around the call, so a ``TypeError`` raised by the implementation itself is
        never swallowed and never causes a second invocation.

        :param tool_parameters: the raw parameters
        :param passthrough: optional value handed to `_invoke` when it supports it
        :return: whatever `_invoke` returns
        """
        # convert parameters into correct types
        tool_parameters = self._convert_parameters(tool_parameters)

        if self._invoke_accepts_passthrough():
            return self._invoke(tool_parameters, passthrough=passthrough)  # type: ignore[call-arg]
        return self._invoke(tool_parameters)

    @deprecated("This feature is deprecated, will soon be replaced by dynamic select parameter")
    def get_runtime_parameters(self) -> list[ToolParameter]:
        """
        get the runtime parameters of the tool

        :return: the runtime parameters returned by `_get_runtime_parameters`
        """
        return self._get_runtime_parameters()

    def fetch_parameter_options(self, parameter: str) -> list[ParameterOption]:
        """
        Fetch the parameter options of the tool.

        Delegates to `_fetch_parameter_options`, which raises NotImplementedError
        unless the plugin implements it.

        :param parameter: the name of the parameter
        :return: the parameter options
        """
        return self._fetch_parameter_options(parameter)