-
Notifications
You must be signed in to change notification settings - Fork 310
Expand file tree
/
Copy pathfoundry_local_manager.py
More file actions
219 lines (170 loc) · 8.35 KB
/
foundry_local_manager.py
File metadata and controls
219 lines (170 loc) · 8.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
# --------------------------------------------------------------------------
from __future__ import annotations
import json
import logging
import threading
from typing import Callable, List, Optional
from pydantic import TypeAdapter
from .catalog import Catalog
from .configuration import Configuration
from .ep_types import EpDownloadResult, EpInfo
from .logging_helper import set_default_logger_severity
from .detail.core_interop import CoreInterop, InteropRequest
from .detail.model_load_manager import ModelLoadManager
from .exception import FoundryLocalException
from .openai.responses_client import ResponsesClient
logger = logging.getLogger(__name__)
class FoundryLocalManager:
"""Singleton manager for Foundry Local SDK operations.
Call ``FoundryLocalManager.initialize(config)`` once at startup, then access
the singleton via ``FoundryLocalManager.instance``.
Attributes:
instance: The singleton ``FoundryLocalManager`` instance (set after ``initialize``).
catalog: The model ``Catalog`` for discovering and managing models.
urls: Bound URL(s) after ``start_web_service()`` is called, or ``None``.
"""
_lock = threading.Lock()
instance: FoundryLocalManager = None
@staticmethod
def initialize(config: Configuration):
"""Initialize the Foundry Local SDK with the given configuration.
This method must be called before using any other part of the SDK.
Args:
config: Configuration object for the SDK.
"""
# Delegate singleton creation to the constructor, which enforces
# the singleton invariant under a lock and sets `instance`.
FoundryLocalManager(config)
def __init__(self, config: Configuration):
# Enforce singleton creation under a class-level lock and ensure
# that `FoundryLocalManager.instance` is set exactly once.
with FoundryLocalManager._lock:
if FoundryLocalManager.instance is not None:
raise FoundryLocalException(
"FoundryLocalManager is a singleton and has already been initialized."
)
config.validate()
self.config = config
self._initialize()
FoundryLocalManager.instance = self
self.urls = None
def _initialize(self):
set_default_logger_severity(self.config.log_level)
external_service_url = self.config.web.external_url if self.config.web else None
self._core_interop = CoreInterop(self.config)
self._model_load_manager = ModelLoadManager(self._core_interop, external_service_url)
self.catalog = Catalog(self._model_load_manager, self._core_interop)
def discover_eps(self) -> list[EpInfo]:
"""Discover available execution providers and their registration status.
Returns:
List of ``EpInfo`` entries for all discoverable EPs.
Raises:
FoundryLocalException: If EP discovery fails or response JSON is invalid.
"""
response = self._core_interop.execute_command("discover_eps")
if response.error is not None:
raise FoundryLocalException(f"Error discovering execution providers: {response.error}")
try:
adapter = TypeAdapter(List[EpInfo])
return adapter.validate_json(response.data or "[]")
except Exception as e:
raise FoundryLocalException(
f"Failed to decode JSON response from discover_eps: {e}. Response was: {response.data}"
) from e
def download_and_register_eps(
self,
names: Optional[list[str]] = None,
progress_callback: Optional[Callable[[str, float], None]] = None,
) -> EpDownloadResult:
"""Download and register execution providers.
Args:
names: Optional subset of EP names to download. If omitted or empty,
all discoverable EPs are downloaded.
progress_callback: Optional callback ``(ep_name: str, percent: float) -> None``
invoked as each EP downloads. ``percent`` is 0-100.
Returns:
``EpDownloadResult`` describing operation status and per-EP outcomes.
Raises:
FoundryLocalException: If the operation fails or response JSON is invalid.
"""
request = None
if names is not None and len(names) > 0:
request = InteropRequest(params={"Names": ",".join(names)})
if progress_callback is not None:
def _on_chunk(chunk: str) -> None:
sep = chunk.find("|")
if sep >= 0:
ep_name = chunk[:sep] or ""
try:
percent = float(chunk[sep + 1:])
progress_callback(ep_name, percent)
except ValueError:
pass
response = self._core_interop.execute_command_with_callback(
"download_and_register_eps", request, _on_chunk
)
else:
response = self._core_interop.execute_command("download_and_register_eps", request)
if response.error is not None:
raise FoundryLocalException(f"Error downloading execution providers: {response.error}")
if response.data:
try:
adapter = TypeAdapter(EpDownloadResult)
ep_result = adapter.validate_json(response.data)
except Exception as e:
raise FoundryLocalException(
"Failed to decode JSON response from download_and_register_eps: "
f"{e}. Response was: {response.data}"
) from e
else:
ep_result = EpDownloadResult(
Success=True, Status="Completed", RegisteredEps=[], FailedEps=[]
)
# Invalidate the catalog cache if any EP was newly registered so the next access
# re-fetches models with the updated set of available EPs.
if ep_result.success or len(ep_result.registered_eps) > 0:
self.catalog._invalidate_cache()
return ep_result
def start_web_service(self):
"""Start the optional web service.
If provided, the service will be bound to the value of Configuration.web.urls.
The default of http://127.0.0.1:0 will be used otherwise, which binds to a random ephemeral port.
FoundryLocalManager.urls will be updated with the actual URL/s the service is listening on.
"""
with FoundryLocalManager._lock:
response = self._core_interop.execute_command("start_service")
if response.error is not None:
raise FoundryLocalException(f"Error starting web service: {response.error}")
bound_urls = json.loads(response.data)
if bound_urls is None or len(bound_urls) == 0:
raise FoundryLocalException("Failed to get bound URLs from web service start response.")
self.urls = bound_urls
def stop_web_service(self):
"""Stop the optional web service."""
with FoundryLocalManager._lock:
if self.urls is None:
raise FoundryLocalException("Web service is not running.")
response = self._core_interop.execute_command("stop_service")
if response.error is not None:
raise FoundryLocalException(f"Error stopping web service: {response.error}")
self.urls = None
def create_responses_client(self, model_id: Optional[str] = None) -> ResponsesClient:
"""Create a :class:`ResponsesClient` bound to the running web service.
The Responses API is HTTP-only, so the web service must be started
before calling this. Use :meth:`start_web_service` first.
Args:
model_id: Optional default model ID baked into the client. May also
be supplied per-call via ``options['model']``.
Returns:
A new :class:`ResponsesClient`.
Raises:
FoundryLocalException: If the web service has not been started.
"""
if not self.urls:
raise FoundryLocalException(
"Web service is not running. Call start_web_service() first."
)
return ResponsesClient(self.urls[0], model_id)