# Testing
This document describes the test infrastructure for validating Dynamo's fault tolerance mechanisms. The testing framework supports request cancellation, migration, etcd HA, and hardware fault injection scenarios.
Dynamo's fault tolerance test suite is located in `tests/fault_tolerance/` and includes:
| Test Category | Location | Purpose |
|---|---|---|
| Cancellation | `cancellation/` | Request cancellation during in-flight operations |
| Migration | `migration/` | Request migration when workers fail |
| etcd HA | `etcd_ha/` | etcd failover and recovery |
| Hardware | `hardware/` | GPU and network fault injection |
| Deployment | `deploy/` | End-to-end deployment testing |
```text
tests/fault_tolerance/
├── cancellation/
│   ├── test_vllm.py
│   ├── test_trtllm.py
│   ├── test_sglang.py
│   └── utils.py
├── migration/
│   ├── test_vllm.py
│   ├── test_trtllm.py
│   ├── test_sglang.py
│   └── utils.py
├── etcd_ha/
│   ├── test_vllm.py
│   ├── test_trtllm.py
│   ├── test_sglang.py
│   └── utils.py
├── hardware/
│   └── fault_injection_service/
│       ├── api_service/
│       └── agents/
├── deploy/
│   ├── test_deployment.py
│   ├── scenarios.py
│   ├── base_checker.py
│   └── ...
└── client.py
```
Test that in-flight requests can be properly canceled.
```bash
# Run all cancellation tests
pytest tests/fault_tolerance/cancellation/ -v

# Run for specific backend
pytest tests/fault_tolerance/cancellation/test_vllm.py -v
```

The `cancellation/utils.py` module provides:
Thread-safe request cancellation via TCP socket manipulation:

```python
import time
from threading import Thread

from tests.fault_tolerance.cancellation.utils import CancellableRequest

request = CancellableRequest()

# Send request in separate thread
thread = Thread(target=send_request, args=(request,))
thread.start()

# Cancel after some time
time.sleep(1)
request.cancel()  # Closes underlying socket
```

Send cancellable completion requests:
```python
from tests.fault_tolerance.cancellation.utils import (
    send_completion_request,
    send_chat_completion_request
)

# Non-streaming
response = send_completion_request(
    base_url="http://localhost:8000",
    model="Qwen/Qwen3-0.6B",
    prompt="Hello, world!",
    max_tokens=100
)

# Streaming with cancellation
responses = send_chat_completion_request(
    base_url="http://localhost:8000",
    model="Qwen/Qwen3-0.6B",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
    cancellable_request=request
)
```

Wait for specific patterns in logs:
```python
from tests.fault_tolerance.cancellation.utils import poll_for_pattern

# Wait for cancellation confirmation
found = poll_for_pattern(
    log_file="/var/log/dynamo/worker.log",
    pattern="Request cancelled",
    timeout=30,
    interval=0.5
)
```

Test that requests migrate to healthy workers when failures occur.
```bash
# Run all migration tests
pytest tests/fault_tolerance/migration/ -v

# Run for specific backend
pytest tests/fault_tolerance/migration/test_vllm.py -v
```

The `migration/utils.py` module provides:
- Frontend wrapper with configurable request planes
- Long-running request spawning for migration scenarios
- Health check disabling for controlled testing
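The long-running request spawning above can be sketched as a result-capturing thread, so a test can `join()` a blocking request and inspect the response after injecting a fault. This is a minimal illustration; the `ResultThread` name is hypothetical and not the actual `utils.py` implementation.

```python
import threading

class ResultThread(threading.Thread):
    """Thread that captures its target's return value, so a test can
    join() a long-running request and inspect the result afterwards."""

    def __init__(self, target, args=()):
        super().__init__()
        self._target_fn = target
        self._args = args
        self.result = None

    def run(self):
        # Store the return value instead of discarding it like Thread.run.
        self.result = self._target_fn(*self._args)

# Usage: the lambda stands in for a blocking completion request.
t = ResultThread(target=lambda n: n * 2, args=(21,))
t.start()
t.join()
print(t.result)  # → 42
```

The point of the wrapper is that a worker can be killed while the thread is blocked in the request, and the test still gets the final response object back when the call returns.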
```python
def test_migration_on_worker_failure():
    # Start deployment with 2 workers
    deployment = start_deployment(workers=2)

    # Send long-running request
    request_thread = spawn_long_request(max_tokens=1000)

    # Kill one worker mid-generation
    kill_worker(deployment.workers[0])

    # Verify request completes on remaining worker
    response = request_thread.join()
    assert response.status_code == 200
    assert len(response.tokens) > 0
```

Test system behavior during etcd failures and recovery.
```bash
pytest tests/fault_tolerance/etcd_ha/ -v
```

- Leader failover: etcd leader node fails, cluster elects new leader
- Network partition: etcd node becomes unreachable
- Recovery: System recovers after etcd becomes available
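The recovery scenario reduces to a poll-until-healthy loop: after the fault, keep probing until the system answers again or a timeout expires. A generic sketch, where `fake_health_check` stands in for a real etcd health probe (e.g. an HTTP GET against the cluster's health endpoint):

```python
import time

def wait_for_recovery(check, timeout=30.0, interval=0.5):
    """Poll check() until it returns True or the timeout expires.
    Returns True on recovery, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if check():
            return True
        time.sleep(interval)
    return False

# Example: succeed on the third poll, as if etcd just elected a new leader.
attempts = {"n": 0}
def fake_health_check():
    attempts["n"] += 1
    return attempts["n"] >= 3

print(wait_for_recovery(fake_health_check, timeout=5, interval=0.01))  # → True
```

The same helper covers the failure path: a probe that never succeeds makes `wait_for_recovery` return `False` once the timeout elapses, which a test can assert on.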
The fault injection service enables testing under simulated hardware failures.
Located at `tests/fault_tolerance/hardware/fault_injection_service/`, this FastAPI service orchestrates fault injection:
```bash
# Start the fault injection service
cd tests/fault_tolerance/hardware/fault_injection_service
python -m api_service.main
```

| Fault Type | Description |
|---|---|
| `XID_ERROR` | Simulate GPU XID error (various codes) |
| `THROTTLE` | GPU thermal throttling |
| `MEMORY_PRESSURE` | GPU memory exhaustion |
| `OVERHEAT` | GPU overheating condition |
| `COMPUTE_OVERLOAD` | GPU compute saturation |
| Fault Type | Description |
|---|---|
| `FRONTEND_WORKER` | Partition between frontend and workers |
| `WORKER_NATS` | Partition between workers and NATS |
| `WORKER_WORKER` | Partition between workers |
| `CUSTOM` | Custom network partition |
```bash
curl -X POST http://localhost:8080/api/v1/faults/gpu/inject \
  -H "Content-Type: application/json" \
  -d '{
    "target_pod": "vllm-worker-0",
    "fault_type": "XID_ERROR",
    "severity": "HIGH"
  }'
```

```bash
# Inject XID 79 (GPU memory page fault)
curl -X POST http://localhost:8080/api/v1/faults/gpu/inject/xid-79 \
  -H "Content-Type: application/json" \
  -d '{"target_pod": "vllm-worker-0"}'
```

Supported XID codes: 43, 48, 74, 79, 94, 95, 119, 120
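For tests that inject faults programmatically rather than via `curl`, a small helper can build the request from the supported XID list. This is a hypothetical convenience wrapper mirroring the HTTP endpoints shown in the examples, not a published client API; it only constructs the URL and JSON body.

```python
import json

# XID codes the service accepts, per the list above.
SUPPORTED_XIDS = {43, 48, 74, 79, 94, 95, 119, 120}

def build_xid_injection(base_url, target_pod, xid):
    """Build (url, json_body) for a GPU XID injection request.
    Rejects XID codes the fault injection service does not support."""
    if xid not in SUPPORTED_XIDS:
        raise ValueError(f"unsupported XID code: {xid}")
    url = f"{base_url}/api/v1/faults/gpu/inject/xid-{xid}"
    body = json.dumps({"target_pod": target_pod})
    return url, body

url, body = build_xid_injection("http://localhost:8080", "vllm-worker-0", 79)
print(url)  # → http://localhost:8080/api/v1/faults/gpu/inject/xid-79
```

Validating the XID code client-side gives a clear test failure instead of an opaque HTTP error from the service.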
```bash
curl -X POST http://localhost:8080/api/v1/faults/network/inject \
  -H "Content-Type: application/json" \
  -d '{
    "partition_type": "FRONTEND_WORKER",
    "duration_seconds": 30
  }'
```

Recover a fault:

```bash
curl -X POST http://localhost:8080/api/v1/faults/{fault_id}/recover
```

List active faults:

```bash
curl http://localhost:8080/api/v1/faults
```

The GPU fault injector runs as a DaemonSet on worker nodes:
```yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: gpu-fault-injector
spec:
  selector:
    matchLabels:
      app: gpu-fault-injector
  template:
    metadata:
      labels:
        app: gpu-fault-injector
    spec:
      containers:
      - name: agent
        image: dynamo/gpu-fault-injector:latest
        securityContext:
          privileged: true
        volumeMounts:
        - name: dev
          mountPath: /dev
      volumes:
      - name: dev
        hostPath:
          path: /dev
```

The agent injects fake XID messages via /dev/kmsg to trigger NVSentinel detection.
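The fake XID line the agent emits can be sketched as a formatted kernel-log message. The exact `NVRM: Xid` format used here is an assumption modeled on typical NVIDIA driver log lines; the real agent's message may differ.

```python
# Sketch of a fake kernel-log line an agent might write to /dev/kmsg.
# The NVRM message format below is an assumption, not the agent's actual output.
def format_fake_xid(pci_bus_id: str, xid: int) -> str:
    return f"NVRM: Xid (PCI:{pci_bus_id}): {xid}, GPU fault injected for testing"

line = format_fake_xid("0000:3b:00", 79)
print(line)

# Writing to /dev/kmsg requires root (hence the privileged DaemonSet), e.g.:
#   with open("/dev/kmsg", "w") as kmsg:
#       kmsg.write(line)
```

Because detection tooling matches on the `NVRM: Xid` pattern in the kernel log, injecting the line through `/dev/kmsg` exercises the same code path as a real driver-reported XID error.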
The deploy/ directory contains an end-to-end testing framework.
Tests run through three phases:
| Phase | Description |
|---|---|
| `STANDARD` | Baseline performance under normal conditions |
| `OVERFLOW` | System behavior during fault/overload |
| `RECOVERY` | System recovery after fault resolution |
Define test scenarios in `scenarios.py`:

```python
from tests.fault_tolerance.deploy.scenarios import Scenario, Load, Failure

scenario = Scenario(
    name="worker_failure_migration",
    backend="vllm",
    load=Load(
        clients=10,
        requests_per_client=100,
        max_tokens=256
    ),
    failure=Failure(
        type="pod_kill",
        target="vllm-worker-0",
        trigger_after_requests=50
    )
)
```

```bash
# Run all deployment tests
pytest tests/fault_tolerance/deploy/test_deployment.py -v

# Run specific scenario
pytest tests/fault_tolerance/deploy/test_deployment.py::test_worker_failure -v
```

The framework includes pluggable validators:
```python
from tests.fault_tolerance.deploy.base_checker import BaseChecker, ValidationContext

class MigrationChecker(BaseChecker):
    def check(self, context: ValidationContext) -> bool:
        # Verify migrations occurred
        migrations = context.metrics.get("migrations_total", 0)
        return migrations > 0
```

Parse test results for analysis:
```python
from tests.fault_tolerance.deploy.parse_results import process_overflow_recovery_test

results = process_overflow_recovery_test(log_dir="/path/to/logs")
print(f"Success rate: {results['success_rate']}")
print(f"P99 latency: {results['p99_latency_ms']}ms")
```

The `client.py` module provides shared client functionality:
```python
from tests.fault_tolerance.client import client

# Generate load with multiple clients
results = client(
    base_url="http://localhost:8000",
    num_clients=10,
    requests_per_client=100,
    model="Qwen/Qwen3-0.6B",
    max_tokens=256,
    log_dir="/tmp/test_logs"
)
```

| Parameter | Description |
|---|---|
| `base_url` | Frontend URL |
| `num_clients` | Number of concurrent clients |
| `requests_per_client` | Requests per client |
| `model` | Model name |
| `max_tokens` | Max tokens per request |
| `log_dir` | Directory for client logs |
| `endpoint` | `completions` or `chat/completions` |
- Kubernetes cluster with GPU nodes
- Dynamo deployment
- etcd cluster (for HA tests)
- Fault injection service (for hardware tests)
```bash
export KUBECONFIG=/path/to/kubeconfig
export DYNAMO_NAMESPACE=dynamo-test
export FRONTEND_URL=http://localhost:8000
```

```bash
# Install test dependencies
pip install pytest pytest-asyncio

# Run all fault tolerance tests
pytest tests/fault_tolerance/ -v --tb=short

# Run with specific markers
pytest tests/fault_tolerance/ -v -m "not slow"
```

| Marker | Description |
|---|---|
| `slow` | Long-running tests (> 5 minutes) |
| `gpu` | Requires GPU resources |
| `k8s` | Requires Kubernetes cluster |
| `etcd_ha` | Requires multi-node etcd |
Run fault tolerance tests in dedicated namespaces:
```bash
kubectl create namespace dynamo-fault-test
```

Ensure fault injection is recovered:
```bash
# List and recover all active faults
curl http://localhost:8080/api/v1/faults | jq -r '.[].id' | \
  xargs -I {} curl -X POST http://localhost:8080/api/v1/faults/{}/recover
```

Preserve logs for debugging:
```bash
pytest tests/fault_tolerance/ -v \
  --log-dir=/tmp/fault_test_logs \
  --capture=no
```

Watch system state during tests:
```bash
# Terminal 1: Watch pods
watch kubectl get pods -n dynamo-test

# Terminal 2: Watch metrics
watch 'curl -s localhost:8000/metrics | grep -E "(migration|rejection)"'
```

- Request Migration - Migration implementation details
- Request Cancellation - Cancellation implementation
- Health Checks - Health monitoring
- Metrics - Available metrics for monitoring