Skip to content

Commit ae72193

Browse files
committed
restore worker models
1 parent ff9e20f commit ae72193

2 files changed

Lines changed: 85 additions & 4 deletions

File tree

xinference/core/supervisor.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1860,6 +1860,77 @@ async def add_worker(self, worker_address: str):
18601860
self._worker_address_to_worker[worker_address] = worker_ref
18611861
logger.debug("Worker %s has been added successfully", worker_address)
18621862

1863+
@log_async(logger=logger)
async def ensure_worker(
    self, worker_address: str
) -> xo.ActorRefType["WorkerActor"]:
    """Resolve the worker actor at ``worker_address`` and (re)register it.

    A fresh actor reference is always looked up and stored in
    ``self._worker_address_to_worker`` under ``worker_address``, replacing
    any reference that was already recorded for that address.

    Args:
        worker_address: Address of the worker whose actor should be tracked.

    Returns:
        The newly resolved actor reference for the worker.
    """
    # Imported locally, as in the original code (presumably to avoid a
    # circular import between supervisor and worker modules -- confirm).
    from .worker import WorkerActor

    ref = await xo.actor_ref(
        address=worker_address, uid=WorkerActor.default_uid()
    )
    # Decide which message to log before the mapping is overwritten.
    already_known = worker_address in self._worker_address_to_worker
    self._worker_address_to_worker[worker_address] = ref
    if already_known:
        logger.debug("Worker %s already registered, refreshed ref", worker_address)
    else:
        logger.debug("Worker %s has been added successfully", worker_address)
    return ref
1879+
1880+
@log_async(logger=logger)
async def restore_worker_models(
    self, worker_address: str, models: Dict[str, Dict[str, Any]]
):
    """Re-attach the models reported by a worker to the supervisor's state.

    For every replica model uid in ``models`` this method:

    * makes sure a ``ReplicaInfo`` exists for the parsed model uid and that
      its ``replica`` count covers the parsed replica id (rebuilding the
      round-robin ``scheduler`` whenever the count changes);
    * records the worker reference under that replica id unless a reference
      with the same address is already listed;
    * records the worker in ``self._replica_model_uid_to_worker``, turning a
      single entry into a list when a second, different worker shows up.

    Nothing is done when ``models`` is empty.

    Args:
        worker_address: Address of the worker that owns the models.
        models: Mapping keyed by replica model uid; only the keys are used.
    """
    if not models:
        return

    worker_ref = await self.ensure_worker(worker_address)
    worker_addr = worker_ref.address
    restored = 0

    for replica_model_uid in models:
        model_uid, rep_id = parse_replica_model_uid(replica_model_uid)
        # Negative replica ids are normalised to replica 0.
        rep_id = max(rep_id, 0)
        needed = rep_id + 1

        info = self._model_uid_to_replica_info.get(model_uid)
        if info is None:
            info = ReplicaInfo(
                replica=needed,
                scheduler=itertools.cycle(range(needed)),
            )
            self._model_uid_to_replica_info[model_uid] = info
        elif needed > info.replica:
            # A higher replica id was seen: widen the count and rebuild the
            # scheduler so it cycles over the full range.
            info.replica = needed
            info.scheduler = itertools.cycle(range(needed))

        # NOTE(review): indexing presumably auto-creates the list for a new
        # replica id (e.g. a defaultdict) -- confirm against ReplicaInfo.
        holders = info.replica_to_worker_refs[rep_id]
        if not any(w.address == worker_addr for w in holders):
            holders.append(worker_ref)

        current = self._replica_model_uid_to_worker.get(replica_model_uid)
        if current is None:
            self._replica_model_uid_to_worker[replica_model_uid] = worker_ref
        elif isinstance(current, tuple):
            # Tuples are immutable, so store a new list with the extra ref.
            if all(w.address != worker_addr for w in current):
                self._replica_model_uid_to_worker[replica_model_uid] = [
                    *current,
                    worker_ref,
                ]
        elif isinstance(current, list):
            if all(w.address != worker_addr for w in current):
                current.append(worker_ref)
        elif current.address != worker_addr:
            # A single, different worker was recorded: promote to a list.
            self._replica_model_uid_to_worker[replica_model_uid] = [
                current,
                worker_ref,
            ]
        restored += 1

    logger.info(
        "Restored %s model replicas for worker %s", restored, worker_address
    )
1933+
18631934
@log_async(logger=logger)
18641935
async def remove_worker(self, worker_address: str):
18651936
uids_to_remove = []

xinference/core/worker.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -456,10 +456,20 @@ async def get_supervisor_ref(self, add_worker: bool = True) -> xo.ActorRefType:
456456
if self._supervisor_ref is not None:
457457
return self._supervisor_ref
458458
self._supervisor_ref = supervisor_ref
459-
if add_worker and len(self._model_uid_to_model) == 0:
460-
# Newly started (or restarted), has no model, notify supervisor
461-
await self._supervisor_ref.add_worker(self.address)
462-
logger.info("Connected to supervisor as a fresh worker")
459+
if add_worker:
460+
await self._supervisor_ref.ensure_worker(self.address)
461+
if len(self._model_uid_to_model) == 0:
462+
logger.info("Connected to supervisor as a fresh worker")
463+
else:
464+
try:
465+
models = await self.list_models()
466+
await self._supervisor_ref.restore_worker_models(
467+
self.address, models
468+
)
469+
except Exception:
470+
logger.exception(
471+
"Failed to restore worker models to supervisor"
472+
)
463473

464474
self._status_guard_ref = await xo.actor_ref(
465475
address=self._supervisor_address, uid=StatusGuardActor.default_uid()

0 commit comments

Comments
 (0)