Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions docs/source/python-api-reference/transfer-engine.md
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,25 @@ Gets the list of pending transfer notifications received from other nodes.
**Returns:**
- `List[TransferNotify]`: List of notification objects containing name and message

#### send_probe()

```python
send_probe(peer_server_name)
```

Sends a lightweight JSON-RPC probe to a peer to verify reachability. Used to
test whether a previously-unreachable peer has recovered (e.g. SGLang's
`MooncakeKVManager` uses this to clear entries from its `failed_sessions`
blacklist).

**Parameters:**
- `peer_server_name` (str): Peer server name in `host:port` form, exactly as
registered with the metadata server.

**Returns:**
- `int`: 0 on success, non-zero on failure (peer unknown, unreachable, or
RPC error).

## Environment Variables

The Transfer Engine respects the following environment variables:
Expand Down
11 changes: 11 additions & 0 deletions mooncake-integration/transfer_engine/transfer_engine_py.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,12 @@ std::vector<TransferEnginePy::TransferNotify> TransferEnginePy::getNotifies() {
return result;
}

/// Probes a peer through the engine's metadata object to check reachability.
///
/// @param peer_server_name Name of the peer to probe; forwarded unchanged.
/// @return -1 when no engine exists or its metadata handle is null; otherwise
///         whatever the metadata object's sendProbe() returns.
int TransferEnginePy::sendProbe(const std::string& peer_server_name) {
    if (!engine_) return -1;

    // Keep the handle in a local and verify it before dereferencing, so a
    // missing metadata object yields an error code rather than a crash.
    // NOTE(review): presumably null when engine initialization did not
    // complete - confirm against TransferEngine::getMetadata().
    auto metadata = engine_->getMetadata();
    if (!metadata) return -1;

    // Release the GIL for the duration of the probe so other Python threads
    // can keep running while this call is in flight.
    pybind11::gil_scoped_release release;
    return metadata->sendProbe(peer_server_name);
}
Comment thread
kflansburg marked this conversation as resolved.

namespace py = pybind11;

// Implementation of coro_rpc_interface binding function
Expand Down Expand Up @@ -1087,6 +1093,11 @@ PYBIND11_MODULE(engine, m) {
.def("get_first_buffer_address",
&TransferEnginePy::getFirstBufferAddress)
.def("get_notifies", &TransferEnginePy::getNotifies)
.def("send_probe", &TransferEnginePy::sendProbe,
py::arg("peer_server_name"),
"Send a JSON-RPC probe to peer to verify reachability. "
"Returns 0 on success, non-zero on failure. Used by "
"SGLang's failed-session blacklist recovery.")
.def("get_engine", &TransferEnginePy::getEngine)
.def("get_engine_ptr", &TransferEnginePy::getEnginePtr);

Expand Down
2 changes: 2 additions & 0 deletions mooncake-integration/transfer_engine/transfer_engine_py.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ class TransferEnginePy {

std::vector<TransferNotify> getNotifies();

// Probes `peer_server_name` through the engine's metadata object and returns
// that call's result; returns -1 when no engine has been created.
int sendProbe(const std::string &peer_server_name);

// Hands back the engine_ member as a shared_ptr (may be empty).
std::shared_ptr<TransferEngine> getEngine() const {
    return engine_;
}

// Returns the address of the object held by engine_ as an integer value.
// Uses a named cast so the pointer-to-integer conversion is explicit and
// greppable; the resulting value is the same as with the C-style cast.
uintptr_t getEnginePtr() const {
    return reinterpret_cast<uintptr_t>(engine_.get());
}
Expand Down
20 changes: 20 additions & 0 deletions mooncake-wheel/tests/transfer_engine_initiator_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,26 @@ def generate_random_string(length):
f"[✓] {circles} rounds of batch_write_async_read passed, batch size {batch_size}."
)

def test_send_probe_reachable_peer(self):
"""send_probe against a registered, reachable peer returns 0."""
rc = self.adaptor.send_probe(self.target_server_name)
self.assertEqual(
rc,
0,
f"send_probe to reachable peer {self.target_server_name} returned {rc}",
)

def test_send_probe_unknown_peer(self):
"""send_probe against an unknown peer returns non-zero (does not crash)."""
# "unreachable_peer:9" is not registered with the metadata server, so the
# metadata lookup itself should fail and sendProbe returns ERR_METADATA.
rc = self.adaptor.send_probe("unreachable_peer:9")
self.assertNotEqual(
rc,
0,
"send_probe to unknown peer unexpectedly succeeded",
)


if __name__ == "__main__":
unittest.main()
Loading