Feat/new model manager#2251
ModelManagerProcess.__init__ has no manager parameter; it builds its own ModelManager internally. Passing manager=ModelManager() from main() raised TypeError before the ROUTER/SHM pool could start, breaking . Removed the kwarg and the unused import; updated the module docstring example to match.
Skip logic used registry.get_entry_for_class(cls, task_name), which walks the MRO. Once ObjectDetectionModel.infer was registered, a later OpenVocabularyObjectDetectionModel.infer registration was skipped — the subclass silently inherited the base validator and lost the requirement. Switched to registered_tasks(cls), which only looks at exact-class entries. Added regression test.
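A minimal sketch of why the MRO-walking lookup skipped the subclass registration. The Registry class and its internals are hypothetical stand-ins; only the method names get_entry_for_class and registered_tasks and the two model class names come from the commit message.

```python
from typing import Callable, Dict, Optional, Set, Tuple


class Registry:
    """Hypothetical stand-in for the real registry; internals are assumed."""

    def __init__(self) -> None:
        self._entries: Dict[Tuple[type, str], Callable] = {}

    def register(self, cls: type, task: str, validator: Callable) -> None:
        self._entries[(cls, task)] = validator

    def get_entry_for_class(self, cls: type, task: str) -> Optional[Callable]:
        # Walks the MRO, so a subclass "finds" its parent's entry.
        for klass in cls.__mro__:
            entry = self._entries.get((klass, task))
            if entry is not None:
                return entry
        return None

    def registered_tasks(self, cls: type) -> Set[str]:
        # Exact-class lookup only: inherited entries are not reported.
        return {task for (klass, task) in self._entries if klass is cls}


class ObjectDetectionModel:
    pass


class OpenVocabularyObjectDetectionModel(ObjectDetectionModel):
    pass


registry = Registry()
registry.register(ObjectDetectionModel, "infer", lambda **kwargs: None)

sub = OpenVocabularyObjectDetectionModel
# Old skip check: already "registered" via the base class, so it is skipped.
skipped_by_mro = registry.get_entry_for_class(sub, "infer") is not None
# New skip check: nothing registered on the exact class, so it proceeds.
skipped_by_exact = "infer" in registry.registered_tasks(sub)
```

With the exact-class check the subclass registration goes ahead and can attach its own validator.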
… path process_async()'s subprocess branch returned the raw worker result and skipped the registry serialization that process() applies after submit().result(). Same logical API produced different shapes depending on sync/async caller. Collapsed process_async() to delegate to self.process() in the default executor. Subprocess + direct backends both go through the same serialization tail. Threading model is unchanged — the direct path already used run_in_executor; subprocess now does the same.
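The delegation described above could look roughly like this. ModelManagerSketch, _run_backend and _serialize are hypothetical names; the point is only that the async path reuses process(), so both callers get the same serialized shape.

```python
import asyncio
from functools import partial


class ModelManagerSketch:
    """Hypothetical stand-in; only process()/process_async() mirror the PR."""

    def _run_backend(self, model_id, **kwargs):
        # Stands in for submit().result() on either backend.
        return {"model_id": model_id, "kwargs": kwargs}

    def _serialize(self, raw):
        # Stands in for the registry serialization tail.
        return {"serialized": True, **raw}

    def process(self, model_id, **kwargs):
        return self._serialize(self._run_backend(model_id, **kwargs))

    async def process_async(self, model_id, **kwargs):
        # Delegate to the sync path in the default executor so the
        # serialization tail is shared by both entry points.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, partial(self.process, model_id, **kwargs)
        )


manager = ModelManagerSketch()
sync_result = manager.process("m", task="infer")
async_result = asyncio.run(manager.process_async("m", task="infer"))
```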
_schedule_reload() called _load_model(model_id) with default api_key= and device=. Original values from _handle_ensure_loaded (and admin T_LOAD) were not stored anywhere, so a worker that died after loading a private model or a GPU-pinned model would reload without credentials or land on the wrong device. Added api_key/device fields to ModelState. _handle_ensure_loaded and _handle_load now persist them on first load; _schedule_reload reads them back when scheduling the recovery load. Added regression test.
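A rough sketch of the fix, assuming ModelState is a plain dataclass and using a hypothetical ManagerSketch in place of the real process. The load arguments are stored on first load and read back when the recovery load is scheduled.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class ModelState:
    model_id: str
    api_key: Optional[str] = None  # persisted so a reload keeps credentials
    device: Optional[str] = None   # persisted so a reload keeps GPU pinning


class ManagerSketch:
    """Hypothetical stand-in for the process that owns ModelState."""

    def __init__(self) -> None:
        self.states: Dict[str, ModelState] = {}
        self.load_calls: List[Tuple[str, Optional[str], Optional[str]]] = []

    def _load_model(self, model_id, api_key=None, device=None):
        self.load_calls.append((model_id, api_key, device))

    def handle_ensure_loaded(self, model_id, api_key=None, device=None):
        # Persist the original arguments on first load only.
        if model_id not in self.states:
            self.states[model_id] = ModelState(model_id, api_key, device)
        self._load_model(model_id, api_key=api_key, device=device)

    def schedule_reload(self, model_id):
        # Read the stored values back instead of using the defaults.
        state = self.states[model_id]
        self._load_model(model_id, api_key=state.api_key, device=state.device)


mgr = ManagerSketch()
mgr.handle_ensure_loaded("private/1", api_key="secret", device="cuda:1")
mgr.schedule_reload("private/1")  # simulates recovery after a worker death
```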
…pletion _handle_ensure_loaded() stored a per-waiter deadline but only consulted it inside _flush_load_waiters(), which runs after load completes (or fails). If a load hung — slow disk, stuck worker init, GPU contention — clients never received T_LOAD_TIMEOUT at wait_ms and sat until the client socket timed out. Each waiter now spawns an _expire_waiter() task that sleeps until its deadline and sends T_LOAD_TIMEOUT iff the waiter is still pending. Success path: _flush_load_waiters() consumes the waiter first, the expiry task wakes, finds itself absent, and no-ops. Added regression tests for both the hang path and the no-double-send invariant.
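The per-waiter expiry pattern can be sketched with plain asyncio. LoadWaiters, flush() and the T_LOADED message name are assumptions for illustration; T_LOAD_TIMEOUT and the "wake, find itself absent, no-op" behaviour come from the commit message.

```python
import asyncio


class LoadWaiters:
    """Hypothetical stand-in for the waiter bookkeeping."""

    def __init__(self) -> None:
        self.pending = set()
        self.sent = []     # (waiter_id, message) in send order
        self._tasks = []   # keep references so tasks are not garbage collected

    def add(self, waiter_id: str, wait_s: float) -> None:
        self.pending.add(waiter_id)
        self._tasks.append(
            asyncio.create_task(self._expire_waiter(waiter_id, wait_s))
        )

    async def _expire_waiter(self, waiter_id: str, wait_s: float) -> None:
        await asyncio.sleep(wait_s)
        # Only time out waiters that were not already answered.
        if waiter_id in self.pending:
            self.pending.discard(waiter_id)
            self.sent.append((waiter_id, "T_LOAD_TIMEOUT"))

    def flush(self, message: str = "T_LOADED") -> None:
        # Consumes every pending waiter first; expiry tasks then no-op.
        for waiter_id in sorted(self.pending):
            self.sent.append((waiter_id, message))
        self.pending.clear()


async def demo():
    waiters = LoadWaiters()
    waiters.add("fast", wait_s=0.05)
    waiters.add("slow", wait_s=0.5)
    await asyncio.sleep(0.15)  # the load hangs past the first deadline
    waiters.flush()            # the load finally completes
    await asyncio.sleep(0.5)   # let the remaining expiry task wake up
    return waiters.sent


sent = asyncio.run(demo())
```

Each waiter receives exactly one message: the timeout if its deadline passes first, otherwise the load result.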
All inference now flows through ModelManager.process()/submit(); backends have no public inference entry point. Aligns DirectBackend and SubprocessBackend — both raised or dropped kwargs from those methods inconsistently. Removes a footgun: DirectBackend.infer_sync(img, conf=0.7) silently ignored conf=0.7 while the same call on SubprocessBackend raised. Backend ABC: removed abstract infer_sync/infer_async/submit. DirectBackend: removed the public methods and the BatchCollector wiring that they were the only entry point for. Kept batch_max_size / batch_max_delay_ms ctor args as no-ops to preserve ModelManager.load call sites — they remain meaningful for SubprocessBackend. SubprocessBackend: removed the three raising stubs (no contract left). Tests rewritten to drive inference through ModelManager.process().
PassthroughModel returned numpy arrays which lack .cpu(); the subproc worker's result normalization (result.xyxy.cpu() etc.) crashed in benchmark mode. Switched the dummy fields to CPU torch tensors so the existing .cpu() calls are no-ops. Hot path stays branch-free — no hasattr guard in production.
…sult path
f'...{len(mv)}B' evaluated after mv.release() raised ValueError, aborting
_write_error_to_slot + the T_RESULT notify. Caller waited forever.
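The failure mode is easy to reproduce with a plain memoryview. The function names below are illustrative only; the fix shown (capturing the length before release) is one plausible shape and not necessarily what the commit did.

```python
def describe_buggy(buf: bytearray) -> str:
    mv = memoryview(buf)
    mv.release()
    # len() on a released memoryview raises ValueError.
    return f"wrote {len(mv)}B"


def describe_fixed(buf: bytearray) -> str:
    mv = memoryview(buf)
    size = len(mv)  # capture the size while the view is still valid
    mv.release()
    return f"wrote {size}B"


try:
    describe_buggy(bytearray(16))
    buggy_raised = False
except ValueError:
    buggy_raised = True

fixed_message = describe_fixed(bytearray(16))
```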
Lift is_accepting check + registry validation + record_inference into submit()'s direct branch. Future still resolves to the raw model output.
- proxies/mmp_client.py: MMPClient class wraps today's state.py logic
  - module globals → instance attrs; ZMQ/SHM helpers private
  - start()/shutdown() replace inline lifespan code
  - state.py untouched; routers still import it
- proxies/mm_wrapper.py: MMWrapper wraps ModelManager directly
  - sync MM methods (load/unload/stats) → run_in_executor
  - infer → manager.process_async(model_id, task=..., images=image)
  - in-process: no instance routing, no client-disconnect race
- routers/v2_server.py: state.fetch_stats → mm.stats() via Depends
- dependencies.py: new file holds get_model_manager (avoids circular
import between app.py and routers)
- app.py: get_model_manager moved out
- state.alloc/write/submit/free → mm.infer (collapses _submit_one_slot
into _infer_one; exceptions map to HTTP status codes)
- state.lifecycle_req(T_LOAD/T_UNLOAD) → mm.load / mm.unload
- state.fetch_stats → mm.stats; new /interface uses mm.interface
- inline pipeline-timing CSV dropped; restored inside MMPClient later
- payload-size pre-check moved into mm.infer (ValueError → 413)
- routers/infer.py: state.* helpers → mm.infer / mm.ensure_loaded
- body buffered to bytes (1 extra memcpy vs direct stream-to-SHM);
pickle format re-pickles deserialized prediction (perf regression
vs raw-bytes passthrough — TODO restore if benchmarks demand)
- app.py: drop transitional state.py setup; lifespan is just
construct + proxy.start() / proxy.shutdown()
- nothing references state.py now (deletable in next commit)
- ModelManagerProxy.infer: add raw_pickle param
- MMPClient: raw_pickle=True skips pickle.loads, returns SHM bytes
- MMWrapper: raw_pickle=True pickle.dumps the typed result once
- routers/infer.py: format=pickle → raw_pickle=True, zero router-side
pickle work; format=json unchanged
- proxies/mmp_client.py is the only owner of ZMQ+SHM state now
- mmp_client.py docstring: drop reference to removed state.py
- tests/integration_tests/test_v2_endpoints.py: construct MMPClient
and attach to app.state directly (no state.py shim)
        headers={"Retry-After": "1"},
        content=b"server busy, try again",
    )
return Response(status_code=500, content=msg)
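A framework-free sketch of the exception-to-status mapping mentioned in the commit notes. ServerBusyError and to_http are hypothetical names, and the 503 for the busy case is an assumption, since the hunk above does not show that status code. ValueError → 413, the Retry-After header, and the 500 fallback come from the text.

```python
from typing import Dict, Tuple


class ServerBusyError(RuntimeError):
    """Hypothetical exception for a saturated backend."""


def to_http(exc: Exception) -> Tuple[int, Dict[str, str], bytes]:
    """Map an inference error to (status, headers, body)."""
    if isinstance(exc, ValueError):
        # Payload-size pre-check failures surface as 413.
        return 413, {}, str(exc).encode()
    if isinstance(exc, ServerBusyError):
        # Status code assumed; only the header and body appear in the diff.
        return 503, {"Retry-After": "1"}, b"server busy, try again"
    return 500, {}, str(exc).encode()
```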
- configuration: drop cached INFERENCE_DEPLOYMENT_MODE constant;
keep ENV name + DEFAULT (= MODE_BUNDLED) as constants only
- launcher.launch(mode): mode is required; no env read
- app.py lifespan: reads env once at the boundary
- tests/unit_tests/test_launcher.py: drop monkeypatch env tests;
direct launch(MODE_BUNDLED|MODE_MMP) tests stay
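One way to read "mode is required; no env read": the launcher takes the mode explicitly and the environment is consulted once by the caller. The constant values, the env variable name, and mode_from_env are assumptions for illustration.

```python
import os
from typing import Mapping

MODE_BUNDLED = "bundled"  # value assumed
MODE_MMP = "mmp"          # value assumed
DEPLOYMENT_MODE_ENV = "INFERENCE_DEPLOYMENT_MODE"  # name assumed
DEPLOYMENT_MODE_DEFAULT = MODE_BUNDLED


def launch(mode: str) -> str:
    # The launcher never reads the environment; the caller decides.
    if mode not in (MODE_BUNDLED, MODE_MMP):
        raise ValueError(f"unknown deployment mode: {mode!r}")
    return f"launched:{mode}"


def mode_from_env(environ: Mapping[str, str] = os.environ) -> str:
    # Single read at the application boundary (e.g. in the lifespan hook).
    return environ.get(DEPLOYMENT_MODE_ENV, DEPLOYMENT_MODE_DEFAULT)


started = launch(mode_from_env({}))
```

Tests can then call launch(MODE_BUNDLED) or launch(MODE_MMP) directly without patching the environment.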
- framework/input_parsers/: raw_body, multipart, json_base64, url_fetch
- framework/input_parsers/dispatch: content-type → parser
- routers/v2_models.py: import from framework, drop inline impls
- tests: update imports to framework.input_parsers
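A content-type → parser dispatch can be sketched as a lookup table. The parser signatures, the "image" JSON key, and the media types chosen here are assumptions; only the parser names raw_body and json_base64 appear in the commit message.

```python
import base64
import json
from typing import Callable, Dict


def parse_raw_body(body: bytes) -> bytes:
    return body


def parse_json_base64(body: bytes) -> bytes:
    # Assumes a JSON object with a base64 string under "image".
    return base64.b64decode(json.loads(body)["image"])


PARSERS: Dict[str, Callable[[bytes], bytes]] = {
    "application/octet-stream": parse_raw_body,
    "application/json": parse_json_base64,
}


def dispatch(content_type: str) -> Callable[[bytes], bytes]:
    # Drop parameters such as "; charset=utf-8" before the lookup.
    media_type = content_type.split(";", 1)[0].strip().lower()
    try:
        return PARSERS[media_type]
    except KeyError:
        raise ValueError(f"unsupported content type: {media_type!r}") from None


payload = json.dumps({"image": base64.b64encode(b"abc").decode()}).encode()
decoded = dispatch("application/json; charset=utf-8")(payload)
```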
dkosowski87
left a comment
1/6 with the pace I'm currently at ;)
self._device_str = device
self._decoder_name = decoder
self._executor = executor
self._state_value: str = "loading"
Enum instead of string?
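A sketch of what the suggestion could look like, assuming a str-mixin Enum so existing string comparisons keep working. The class name and every member except "loading" are hypothetical.

```python
from enum import Enum


class ModelLifecycleState(str, Enum):
    LOADING = "loading"
    LOADED = "loaded"   # hypothetical member
    FAILED = "failed"   # hypothetical member


state = ModelLifecycleState.LOADING
```

A typo such as ModelLifecycleState("loadng") raises ValueError at construction time instead of silently producing an unknown state string.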
    for proc in pynvml.nvmlDeviceGetComputeRunningProcesses(handle):
        if proc.pid == os.getpid():
            return proc.usedGpuMemory or 0
except Exception:
Wondering if we should have a specific message on except pynvml.NVMLError_FunctionNotFound, as this might mean that older NVML bindings are installed on the OS.
import torch

if torch.cuda.is_available():
    return torch.cuda.memory_allocated()
torch.cuda.memory_reserved() would be more conservative, as it includes memory held by CUDA's caching allocator. But this will only work IMO if we resolve to the torch backend. I think ONNX models running on onnxruntime, even when using the CUDA execution provider, won't expose this.
    "shared" if executor else "none",
)

def _detect_device(self) -> str:
Does this work?
self._model = AutoModel.from_pretrained(
model_id, api_key=api_key, **load_kwargs
)
resolves to a model wrapper like ResNetForClassificationTorch, which itself isn't a torch.nn.Module and thus has neither #parameters nor #buffers. Its model attribute has those, and I don't see any passthrough.
Plus, for onnx and trt, parameters and buffers are not available.
    list(self._model.parameters()) if hasattr(self._model, "parameters") else []
)
if params:
    return str(params[0].device)
Even for torch models it might be the case that some params were kept on CPU even if we put the device on GPU. So this is a little bit of a misrepresentation.
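One possible direction for the two device-detection concerns above, sketched with torch-free stand-ins so it runs anywhere: look through the wrapper for an inner module, and report every device seen instead of the first parameter's. The inner attribute names and all Fake* classes are assumptions.

```python
from typing import Iterable, List, Set


class FakeParam:
    """Stand-in for a tensor; only exposes .device."""

    def __init__(self, device: str) -> None:
        self.device = device


class FakeTorchModule:
    """Stand-in for a torch module with parameters() and buffers()."""

    def __init__(self, devices: Iterable[str]) -> None:
        self._devices = list(devices)

    def parameters(self) -> List[FakeParam]:
        return [FakeParam(d) for d in self._devices]

    def buffers(self) -> List[FakeParam]:
        return []


class FakeWrapper:
    """Stand-in for a model wrapper that holds the module as an attribute."""

    def __init__(self, module: FakeTorchModule) -> None:
        self._model = module


def detect_devices(model, inner_attrs=("_model", "model")) -> Set[str]:
    # Check the object itself, then any inner module it wraps.
    candidates = [model]
    candidates += [getattr(model, a) for a in inner_attrs if hasattr(model, a)]
    for candidate in candidates:
        if hasattr(candidate, "parameters"):
            tensors = list(candidate.parameters())
            if hasattr(candidate, "buffers"):
                tensors += list(candidate.buffers())
            return {str(t.device) for t in tensors}
    # ONNX/TRT-style models expose nothing; the caller needs a fallback.
    return set()


mixed = detect_devices(FakeWrapper(FakeTorchModule(["cuda:0", "cpu"])))
opaque = detect_devices(object())
```

An empty set signals that the caller should fall back to the configured device string; a set with more than one entry makes split placement visible.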
except Exception:
    self._model = None
    raise
self._gpu_memory_delta_mb = (self._gpu_mem_snapshot() - gpu_before) / (
This gives us the change in the current process's memory utilisation. Do we want this, or the change in overall GPU utilisation?
    batch_max_size,
)

b = self._create_backend(
Wondering about the names backend and task. It's a different domain than inference_models, so normally I would say that using a similar nomenclature isn't a problem. Here, though, it is quite connected, as we are loading models that have their own backend and task definitions: "trt", "classification". Maybe we could change the names. WDYT?
    backend: str,
    **kwargs,
) -> Backend:
    if backend == "direct":
Enum instead of string?
What does this PR do?
Adding inference_model_manager
Type of Change
Testing
Initial suite of tests added
Checklist
Additional Context
N/A