Feat/new model manager #2251

Draft
grzegorz-roboflow wants to merge 161 commits into main from feat/new-model-manager

Conversation

@grzegorz-roboflow
Collaborator

What does this PR do?

Adding inference_model_manager

Type of Change

  • New feature (non-breaking change that adds functionality)

Testing

Initial suite of tests added

Checklist

  • My code follows the style guidelines of this project
  • I have performed a self-review of my own code
  • I have commented my code where necessary, particularly in hard-to-understand areas
  • My changes generate no new warnings or errors
  • I have updated the documentation accordingly (if applicable)

Additional Context

N/A

Comment thread inference_server/inference_server/app.py Fixed
Comment thread inference_server/inference_server/server.py Fixed
Comment thread inference_server/inference_server/server.py Fixed
Comment thread inference_server/inference_server/server.py Fixed
Comment thread inference_server/inference_server/server.py Fixed
Comment thread inference_server/inference_server/server.py Fixed
Comment thread inference_server/inference_server/server.py Fixed
Comment thread inference_server/inference_server/server.py Fixed
Comment thread inference_server/inference_server/server.py Fixed
Comment thread inference_server/inference_server/server.py Fixed
Comment thread inference_server/inference_server/server.py Fixed
Comment thread inference_server/inference_server/app.py Fixed
Comment thread inference_server/inference_server/server.py Fixed
Comment thread inference_server/inference_server/server.py Fixed
Comment thread inference_server/inference_server/auth.py Dismissed
Comment thread inference_models/inference_models/models/base/classification.py Outdated
Comment thread inference_models/inference_models/models/auto_loaders/core.py Outdated
Comment thread inference_models/inference_models/models/auto_loaders/core.py Outdated
Comment thread inference_models/inference_models/models/base/documents_parsing.py Outdated
Comment thread inference_models/inference_models/models/moondream2/moondream2_hf.py Outdated
@socket-security

socket-security Bot commented May 12, 2026

ModelManagerProcess.__init__ has no  parameter — it builds its
own ModelManager internally. Passing manager=ModelManager() from main()
raised TypeError before the ROUTER/SHM pool could start, breaking
. Removed the
kwarg + unused import; updated module docstring example to match.

Skip logic used registry.get_entry_for_class(cls, task_name), which
walks the MRO. Once ObjectDetectionModel.infer was registered, a later
OpenVocabularyObjectDetectionModel.infer registration was skipped —
the subclass silently inherited the base validator and lost the
 requirement. Switched to registered_tasks(cls), which only
looks at exact-class entries. Added regression test.
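
The skip-logic bug above can be reproduced with a toy registry. This is only a sketch: the method names get_entry_for_class and registered_tasks come from the commit message, but the Registry class, its storage layout, and the signatures shown here are assumptions, not the project's actual code.

```python
class Registry:
    """Toy validator registry keyed by (class, task name). Hypothetical layout."""

    def __init__(self):
        self._entries = {}

    def register(self, cls, task_name, validator):
        self._entries[(cls, task_name)] = validator

    def get_entry_for_class(self, cls, task_name):
        # Walks the MRO, so a subclass "finds" the entry of its base class.
        for klass in cls.__mro__:
            entry = self._entries.get((klass, task_name))
            if entry is not None:
                return entry
        return None

    def registered_tasks(self, cls):
        # Exact-class lookup only; inherited entries are not reported.
        return {task for (klass, task) in self._entries if klass is cls}


class ObjectDetectionModel:
    pass


class OpenVocabularyObjectDetectionModel(ObjectDetectionModel):
    pass


registry = Registry()
registry.register(ObjectDetectionModel, "infer", "base-validator")

sub = OpenVocabularyObjectDetectionModel
# MRO-based check: claims the subclass is already registered (the bug).
skipped_by_mro = registry.get_entry_for_class(sub, "infer") is not None
# Exact-class check: correctly reports that the subclass has no own entry.
skipped_by_exact = "infer" in registry.registered_tasks(sub)
```

With the MRO-based check the subclass registration would be skipped; with the exact-class check it would proceed.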
… path

process_async()'s subprocess branch returned the raw worker result and
skipped the registry serialization that process() applies after
submit().result(). Same logical API produced different shapes
depending on sync/async caller.

Collapsed process_async() to delegate to self.process() in the default
executor. Subprocess + direct backends both go through the same
serialization tail. Threading model is unchanged — the direct path
already used run_in_executor; subprocess now does the same.
_schedule_reload() called _load_model(model_id) with default
api_key= and device=. Original values from _handle_ensure_loaded
(and admin T_LOAD) were not stored anywhere, so a worker that died
after loading a private model or a GPU-pinned model would reload
without credentials or land on the wrong device.

Added api_key/device fields to ModelState. _handle_ensure_loaded and
_handle_load now persist them on first load; _schedule_reload reads
them back when scheduling the recovery load. Added regression test.
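
The persistence fix can be sketched as follows. ModelState, api_key, device, _load_model, and the reload idea come from the commit message; ToySupervisor, the None defaults, and the method signatures are assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ModelState:
    model_id: str
    api_key: Optional[str] = None  # persisted on first load (default is assumed)
    device: Optional[str] = None   # persisted on first load (default is assumed)


class ToySupervisor:
    """Hypothetical owner of per-model state and the load call."""

    def __init__(self):
        self.states = {}
        self.load_calls = []

    def _load_model(self, model_id, api_key=None, device=None):
        # Record the arguments instead of loading anything.
        self.load_calls.append((model_id, api_key, device))

    def handle_load(self, model_id, api_key=None, device=None):
        # Remember credentials and device the first time the model is seen.
        self.states.setdefault(model_id, ModelState(model_id, api_key, device))
        self._load_model(model_id, api_key, device)

    def schedule_reload(self, model_id):
        # Recovery load reuses the persisted values rather than defaults.
        state = self.states[model_id]
        self._load_model(model_id, api_key=state.api_key, device=state.device)


sup = ToySupervisor()
sup.handle_load("private/model", api_key="secret", device="cuda:1")
sup.schedule_reload("private/model")
```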
…pletion

_handle_ensure_loaded() stored a per-waiter deadline but only consulted it
inside _flush_load_waiters(), which runs after load completes (or fails).
If a load hung — slow disk, stuck worker init, GPU contention — clients
never received T_LOAD_TIMEOUT at wait_ms and sat until the client socket
timed out.

Each waiter now spawns an _expire_waiter() task that sleeps until its
deadline and sends T_LOAD_TIMEOUT iff the waiter is still pending. Success
path: _flush_load_waiters() consumes the waiter first, the expiry task
wakes, finds itself absent, and no-ops. Added regression tests for both
the hang path and the no-double-send invariant.
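
The per-waiter expiry pattern can be sketched with plain asyncio. _expire_waiter and T_LOAD_TIMEOUT are named in the commit message; ToyLoader, the "T_LOADED" success message, and the use of seconds instead of wait_ms are assumptions for this example.

```python
import asyncio


class ToyLoader:
    def __init__(self):
        self.waiters = {}  # waiter_id -> True while pending
        self.sent = []     # (waiter_id, message) pairs, in send order

    def add_waiter(self, waiter_id, wait_s):
        self.waiters[waiter_id] = True
        return asyncio.create_task(self._expire_waiter(waiter_id, wait_s))

    async def _expire_waiter(self, waiter_id, wait_s):
        await asyncio.sleep(wait_s)
        # Send the timeout only if nobody consumed this waiter first.
        if self.waiters.pop(waiter_id, None):
            self.sent.append((waiter_id, "T_LOAD_TIMEOUT"))

    def flush_load_waiters(self):
        # Load finished: answer and remove every pending waiter.
        for waiter_id in list(self.waiters):
            del self.waiters[waiter_id]
            self.sent.append((waiter_id, "T_LOADED"))  # hypothetical message name


async def demo():
    loader = ToyLoader()
    fast = loader.add_waiter("a", 0.05)
    await asyncio.sleep(0.01)
    loader.flush_load_waiters()          # load completed: "a" is consumed
    slow = loader.add_waiter("b", 0.02)  # load hangs: nobody flushes "b"
    await asyncio.gather(fast, slow)
    return loader.sent


sent = asyncio.run(demo())
```

Waiter "a" gets exactly one message (the success reply), and waiter "b" gets the timeout at its own deadline without waiting for the load to finish.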
All inference now flows through ModelManager.process()/submit(); backends
have no public inference entry point. Aligns DirectBackend and
SubprocessBackend — both raised or dropped kwargs from those methods
inconsistently. Removes a footgun: DirectBackend.infer_sync(img, conf=0.7)
silently ignored conf=0.7 while the same call on SubprocessBackend raised.

Backend ABC: removed abstract infer_sync/infer_async/submit.
DirectBackend: removed the public methods and the BatchCollector wiring
  that they were the only entry point for. Kept batch_max_size /
  batch_max_delay_ms ctor args as no-ops to preserve ModelManager.load
  call sites — they remain meaningful for SubprocessBackend.
SubprocessBackend: removed the three raising stubs (no contract left).
Tests rewritten to drive inference through ModelManager.process().

PassthroughModel returned numpy arrays which lack .cpu(); the subproc
worker's result normalization (result.xyxy.cpu() etc.) crashed in
benchmark mode. Switched the dummy fields to CPU torch tensors so the
existing .cpu() calls are no-ops. Hot path stays branch-free — no
hasattr guard in production.
…sult path

f'...{len(mv)}B' evaluated after mv.release() raised ValueError, aborting
_write_error_to_slot + the T_RESULT notify. Caller waited forever.
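
The failure mode is easy to reproduce with the standard library: any operation on a released memoryview, including len(), raises ValueError. The sketch below uses made-up variable names and a made-up log message; it only illustrates capturing the length before release().

```python
buf = bytearray(b"payload")
mv = memoryview(buf)

size = len(mv)  # capture anything needed for logging before releasing
mv.release()

raised = False
try:
    message = f"wrote {len(mv)}B"  # len() on a released view is forbidden
except ValueError:
    raised = True
    message = f"wrote {size}B"     # fall back to the value captured earlier
```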
Lift is_accepting check + registry validation + record_inference into
submit()'s direct branch. Future still resolves to the raw model output.
  - proxies/mmp_client.py: MMPClient class wraps today's state.py logic
  - module globals → instance attrs; ZMQ/SHM helpers private
  - start()/shutdown() replace inline lifespan code
  - state.py untouched; routers still import it
  - proxies/mm_wrapper.py: MMWrapper wraps ModelManager directly
  - sync MM methods (load/unload/stats) → run_in_executor
  - infer → manager.process_async(model_id, task=..., images=image)
  - in-process: no instance routing, no client-disconnect race
  - routers/v2_server.py: state.fetch_stats → mm.stats() via Depends
  - dependencies.py: new file holds get_model_manager (avoids circular
    import between app.py and routers)
  - app.py: get_model_manager moved out
  - state.alloc/write/submit/free → mm.infer (collapses _submit_one_slot
    into _infer_one; exceptions map to HTTP status codes)
  - state.lifecycle_req(T_LOAD/T_UNLOAD) → mm.load / mm.unload
  - state.fetch_stats → mm.stats; new /interface uses mm.interface
  - inline pipeline-timing CSV dropped; restored inside MMPClient later
  - payload-size pre-check moved into mm.infer (ValueError → 413)
  - routers/infer.py: state.* helpers → mm.infer / mm.ensure_loaded
  - body buffered to bytes (1 extra memcpy vs direct stream-to-SHM);
    pickle format re-pickles deserialized prediction (perf regression
    vs raw-bytes passthrough — TODO restore if benchmarks demand)
  - app.py: drop transitional state.py setup; lifespan is just
    construct + proxy.start() / proxy.shutdown()
  - nothing references state.py now (deletable in next commit)
  - ModelManagerProxy.infer: add raw_pickle param
  - MMPClient: raw_pickle=True skips pickle.loads, returns SHM bytes
  - MMWrapper: raw_pickle=True pickle.dumps the typed result once
  - routers/infer.py: format=pickle → raw_pickle=True, zero router-side
    pickle work; format=json unchanged
  - proxies/mmp_client.py is the only owner of ZMQ+SHM state now
  - mmp_client.py docstring: drop reference to removed state.py
  - tests/integration_tests/test_v2_endpoints.py: construct MMPClient
    and attach to app.state directly (no state.py shim)
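
The raw_pickle behaviour of the two proxies can be sketched as below. The parameter name and the two behaviours (skip pickle.loads in one proxy, pickle.dumps once in the other) come from the commit notes; the Toy classes, the synchronous infer signature, and the prediction payload are assumptions.

```python
import pickle


class ToyMMPClient:
    """Stand-in for a proxy that receives already-pickled bytes from a worker."""

    def __init__(self, worker_bytes):
        self._worker_bytes = worker_bytes

    def infer(self, raw_pickle=False):
        if raw_pickle:
            return self._worker_bytes            # pass the bytes through untouched
        return pickle.loads(self._worker_bytes)  # typed result for the JSON path


class ToyMMWrapper:
    """Stand-in for an in-process proxy that holds a typed result."""

    def __init__(self, typed_result):
        self._typed_result = typed_result

    def infer(self, raw_pickle=False):
        if raw_pickle:
            return pickle.dumps(self._typed_result)  # pickled exactly once
        return self._typed_result


prediction = {"class": "cat", "confidence": 0.9}  # made-up payload
client = ToyMMPClient(pickle.dumps(prediction))
wrapper = ToyMMWrapper(prediction)
```

Either proxy then hands the router bytes for format=pickle and a typed object for format=json, so the router itself does no pickle work.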
headers={"Retry-After": "1"},
content=b"server busy, try again",
)
return Response(status_code=500, content=msg)
  - configuration: drop cached INFERENCE_DEPLOYMENT_MODE constant;
    keep ENV name + DEFAULT (= MODE_BUNDLED) as constants only
  - launcher.launch(mode): mode is required; no env read
  - app.py lifespan: reads env once at the boundary
  - tests/unit_tests/test_launcher.py: drop monkeypatch env tests;
    direct launch(MODE_BUNDLED|MODE_MMP) tests stay
  - framework/input_parsers/: raw_body, multipart, json_base64, url_fetch
  - framework/input_parsers/dispatch: content-type → parser
  - routers/v2_models.py: import from framework, drop inline impls
  - tests: update imports to framework.input_parsers
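
A content-type to parser dispatch of the kind listed above might look like the following. This is a sketch under assumptions: the parser functions, the PARSERS table, the "image" JSON key, and the error handling are all hypothetical and are not taken from framework/input_parsers.

```python
import base64
import json


def parse_raw_body(body):
    # Raw uploads are already image bytes.
    return body


def parse_json_base64(body):
    payload = json.loads(body)
    return base64.b64decode(payload["image"])  # "image" key is hypothetical


# Hypothetical mapping from media type to parser.
PARSERS = {
    "application/octet-stream": parse_raw_body,
    "image/jpeg": parse_raw_body,
    "application/json": parse_json_base64,
}


def dispatch(content_type, body):
    # Drop parameters such as "; charset=utf-8" before the lookup.
    media_type = content_type.split(";", 1)[0].strip().lower()
    try:
        parser = PARSERS[media_type]
    except KeyError:
        raise ValueError(f"unsupported content type: {media_type}") from None
    return parser(body)


image = b"\xff\xd8fake-jpeg"
json_body = json.dumps({"image": base64.b64encode(image).decode("ascii")}).encode()
from_raw = dispatch("image/jpeg", image)
from_json = dispatch("application/json; charset=utf-8", json_body)
```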
Contributor

@dkosowski87 left a comment

1/6 with the pace I'm currently at ;)

self._device_str = device
self._decoder_name = decoder
self._executor = executor
self._state_value: str = "loading"
Contributor

Enum instead of string?
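
One way the suggestion could look, as a sketch only: a str-mixin Enum keeps comparisons with existing string values working. Only "loading" appears in the diff; the class name and the other members are hypothetical.

```python
from enum import Enum


class ModelLifecycleState(str, Enum):
    LOADING = "loading"  # the only value visible in the diff
    READY = "ready"      # hypothetical
    FAILED = "failed"    # hypothetical


state = ModelLifecycleState.LOADING
# The str mixin keeps equality with plain strings, so existing checks still pass.
still_matches_string = state == "loading"
```

An unknown value such as ModelLifecycleState("laoding") raises ValueError, which is the main gain over a bare string.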

    for proc in pynvml.nvmlDeviceGetComputeRunningProcesses(handle):
        if proc.pid == os.getpid():
            return proc.usedGpuMemory or 0
except Exception:
Contributor

Wondering if we should have a specific message on except pynvml.NVMLError_FunctionNotFound, as this might mean that older NVML bindings are installed on the OS.

import torch

if torch.cuda.is_available():
    return torch.cuda.memory_allocated()
Contributor

torch.cuda.memory_reserved() would be more conservative, as it includes memory held by CUDA's caching allocator. But IMO this will only work if we resolve to the torch backend. I think ONNX models running on onnxruntime, even when using the CUDA execution provider, won't expose this.

"shared" if executor else "none",
)

def _detect_device(self) -> str:
Contributor

Does this work?

self._model = AutoModel.from_pretrained(
    model_id, api_key=api_key, **load_kwargs
)

resolves to a model wrapper like ResNetForClassificationTorch, which itself isn't a torch.nn.Module and thus has neither #parameters nor #buffers. Its model attribute has those. I don't see any passthrough.

Plus, for ONNX and TRT, parameters and buffers are not available.

    list(self._model.parameters()) if hasattr(self._model, "parameters") else []
)
if params:
    return str(params[0].device)
Contributor

Even for torch models, some params may stay on the CPU even when we put the model on the GPU, so reporting the device of the first parameter is a bit of a misrepresentation.

except Exception:
    self._model = None
    raise
self._gpu_memory_delta_mb = (self._gpu_mem_snapshot() - gpu_before) / (
Contributor

This will give us the change in the current process's GPU memory utilisation. Do we want this, or the change in overall GPU utilisation?

batch_max_size,
)

b = self._create_backend(
Contributor

Wondering about the names backend and task. It's a different domain than inference_models, so normally I would say that using similar nomenclature isn't a problem. Here, though, the two are closely connected, as we are loading models that have their own backend and task definitions, e.g. "trt", "classification". Maybe we could change the names. WDYT?

    backend: str,
    **kwargs,
) -> Backend:
    if backend == "direct":
Contributor

Enum instead of string?

4 participants