Learn how to leverage advanced NeMo Microservices Platform capabilities for model customization, evaluation, and deployment within the Data Flywheel Blueprint.
The Data Flywheel Blueprint integrates with five core NeMo services:
- NeMo Customizer: Fine-tuning and model adaptation
- NeMo Evaluator: Comprehensive model evaluation
- NeMo Deployment Manager (DMS): Model deployment and serving
- NeMo Data Service: Dataset management and storage
- NeMo Embedding Service: Text embedding generation and similarity operations
Note: The default config/config.yaml contains development/test URLs. For production deployment, update with your actual service endpoints.
Development Configuration (config/config.yaml:1-10):
nmp_config:
nemo_base_url: "http://nemo.test" # Development endpoint
nim_base_url: "http://nim.test" # Development endpoint
datastore_base_url: "http://data-store.test" # Development endpoint
nmp_namespace: "dfwbp"Production Configuration Example:
nmp_config:
nemo_base_url: "http://nemo-service.your-cluster"
nim_base_url: "http://nim-service.your-cluster"
datastore_base_url: "http://data-store.your-cluster"
nmp_namespace: "your-namespace"Source: src/lib/nemo/customizer.py:1-441
The Customizer service enables fine-tuning of foundation models using your production data with parameter-efficient techniques like LoRA.
graph TD
A[Create Config] --> B[Upload Dataset]
B --> C[Start Training Job]
C --> D[Monitor Progress]
D --> E[Retrieve Model]
E --> F[Deploy Custom Model]
Creating Customizer Configurations:
def create_customizer_config(
self,
base_model: str,
customizer_config: CustomizerConfig,
training_config: TrainingConfig,
) -> str:
"""Creates a reusable customization configuration."""
config_name = NIMConfig.generate_config_name(base_model)
config_payload = {
"name": config_name,
"namespace": self.namespace,
"target": customizer_config.target,
"training_options": [{
"training_type": training_config.training_type,
"finetuning_type": training_config.finetuning_type,
"num_gpus": customizer_config.gpus,
"num_nodes": customizer_config.num_nodes,
"tensor_parallel_size": customizer_config.tensor_parallel_size,
"data_parallel_size": customizer_config.data_parallel_size,
"use_sequence_parallel": customizer_config.use_sequence_parallel,
"micro_batch_size": customizer_config.micro_batch_size,
}],
"training_precision": customizer_config.training_precision,
"max_seq_length": customizer_config.max_seq_length,
}Starting Training Jobs:
def start_training_job(
self,
name: str,
base_model: str,
output_model_name: str,
dataset_name: str,
training_config: TrainingConfig,
nim_config: NIMConfig,
) -> tuple[str, str]:
"""Initiates a fine-tuning job and returns job_id and config_name."""
# Create or get the config name
config_name = self.create_customizer_config(base_model, nim_config.customizer_configs, training_config)
training_params = {
"name": name,
"output_model": f"{self.namespace}/{output_model_name}",
"config": f"{self.namespace}/{config_name}",
"dataset": {"name": dataset_name, "namespace": self.namespace},
"hyperparameters": {
"training_type": training_config.training_type,
"finetuning_type": training_config.finetuning_type,
"epochs": training_config.epochs,
"batch_size": training_config.batch_size,
"learning_rate": training_config.learning_rate,
"lora": {
"adapter_dim": training_config.lora.adapter_dim,
"adapter_dropout": training_config.lora.adapter_dropout,
},
},
}Source: src/lib/nemo/evaluator.py:1-500
The Evaluator service provides comprehensive model assessment capabilities with multiple evaluation types and metrics.
LLM-as-Judge Setup:
class Evaluator:
def __init__(
self,
judge_model_config: Any | None = None,
include_tools: bool = False,
include_tool_choice: bool = False,
include_nvext: bool = False,
):
"""Initialize evaluator with judge model configuration."""
self.nemo_url = settings.nmp_config.nemo_base_url
assert self.nemo_url, "nemo_base_url must be set in config"
self.namespace = settings.nmp_config.nmp_namespaceJudge Prompt System:
JUDGE_SYSTEM_PROMPT = """You are an expert evaluator.
Read the task, reference answer, and candidate answer.
Score similarity on a scale of 1 to 10 using this rubric:
10 - Nearly identical meaning; wording differences only.
8-9 - Same key info, maybe one minor detail off.
6-7 - Rough overlap but missing or altering several points.
4-5 - Some relation but wrong on most core content.
2-3 - Largely irrelevant or incorrect.
1 - Completely irrelevant or empty.
Return ONLY the integer (1-10). No other text."""Function Call Validation:
TOOL_CALLING_JUDGE_SYSTEM_PROMPT = """
You are an expert evaluator. Your task is to compare two tool call objects: a ground truth and a prediction. Each object contains metadata about a function call, including the function name and its arguments. Your goal is to determine whether the predicted tool call is correct according to the following strict criteria:
Evaluation Rules:
1. If the prediction is null, empty, or structurally invalid, return a rating of 0.
2. The prediction must include a valid `function.name` field that exactly matches the `function.name` in the ground truth.
3. The prediction must include a `function.arguments` field containing a valid JSON object, or a string that can be parsed into one.
4. Each argument key in the ground truth must also appear in the prediction and satisfy one of the following matching criteria:
- **Strict-match arguments**: These are typically short identifiers (e.g., user_id, product_id). They must match the ground truth exactly.
- **Semantic-match arguments**: These are typically longer, natural-language strings (e.g., queries, messages, questions). They should be semantically similar to the ground truth but may differ in wording.
5. If any required argument is missing or fails to meet its matching criteria, the rating must be 0.
6. Only return a rating of 1 if:
- The function name matches exactly, AND
- All required arguments are present and correctly matched (strict or semantic as appropriate).
Assume you're familiar with typical tool function usage patterns and can infer which argument types require exact matching versus semantic similarity.
Return your result in the following format:
RATING: 0 or 1
EXPLANATION: Explanation of the rating.
"""Required Imports:
from src.lib.nemo.evaluator import Evaluator
from src.api.models import WorkloadClassification, ToolEvalTypeParallel Evaluation Strategy:
def run_comprehensive_evaluation(nim_config: dict, datasets: dict, workload_type: WorkloadClassification) -> list:
"""Run multiple evaluation types in parallel."""
# Initialize evaluator with judge model config
evaluator = Evaluator(judge_model_config=judge_config)
eval_job_ids = []
# Base accuracy evaluation
job_id = evaluator.run_evaluation(
dataset_name=datasets["base"],
workload_type=workload_type,
target_model=nim_config["model_name"],
test_file="eval_data.jsonl"
)
eval_job_ids.append(job_id)
# ICL evaluation with examples
job_id = evaluator.run_evaluation(
dataset_name=datasets["icl"],
workload_type=workload_type,
target_model=nim_config["model_name"],
test_file="eval_data.jsonl"
)
eval_job_ids.append(job_id)
# Tool calling evaluation (if applicable)
if workload_type == WorkloadClassification.TOOL_CALLING:
job_id = evaluator.run_evaluation(
dataset_name=datasets["base"],
workload_type=workload_type,
target_model=nim_config["model_name"],
test_file="eval_data.jsonl",
tool_eval_type=ToolEvalType.TOOL_CALLING_METRIC
)
eval_job_ids.append(job_id)
return eval_job_idsSource: src/lib/nemo/llm_as_judge.py:1-99
The LLM-as-Judge service provides automated evaluation capabilities using large language models as evaluators.
Judge Service Initialization:
from src.lib.nemo.llm_as_judge import LLMAsJudge
judge = LLMAsJudge()
# Uses settings.llm_judge_config for configurationSource: src/lib/nemo/dms_client.py:1-179
The DMS client manages NVIDIA Inference Microservice (NIM) deployments for serving models.
Standard Deployment:
def deploy_model(self) -> dict[str, Any] | None:
"""Deploy a model using the DMS API."""
url = f"{self.nmp_config.nemo_base_url}/v1/deployment/model-deployments"
payload = self.nim.to_dms_config()
response = requests.post(url, json=payload)
# Bug in DMS API, if the model deployment already exists, it will return a 500
if "model deployment already exists" in response.text:
return
else:
response.raise_for_status()
return response.json()Multi-Model Serving:
nims:
- model_name: "meta/llama-3.2-1b-instruct"
model_type: "llm"
context_length: 8192
gpus: 1
pvc_size: 25Gi
tag: "1.8.3"
customization_enabled: true
customizer_configs:
target: "meta/llama-3.2-1b-instruct@2.0"
gpus: 1
max_seq_length: 8192
# - model_name: "meta/llama-3.2-3b-instruct"
# model_type: "llm"
# context_length: 8192
# gpus: 2
# customization_enabled: trueDeployment Status Checking:
def get_deployment_status(self) -> str:
"""Get the status of a model deployment."""
response = self._call_deployment_endpoint()
response.raise_for_status()
return response.json()["status_details"]["status"]
def is_deployed(self) -> bool:
"""Check if a model is deployed."""
try:
return self.get_deployment_status() == "deployed" # Note: wait_for_deployment checks for "ready"
except Exception:
return FalseHealth Checking:
def wait_for_deployment(
self,
flywheel_run_id: str,
progress_callback: Callable[[str], None] | None = None,
timeout: int = 3600,
):
"""Wait for a deployment to complete.
Args:
flywheel_run_id: Unique identifier for the flywheel run
progress_callback: Optional callback function to report progress.
timeout: Maximum time to wait in seconds (default: 3600)
"""
start_time = time.time()
while time.time() - start_time < timeout:
check_cancellation(flywheel_run_id)
status = self.get_deployment_status()
if status == "ready":
return status
elif status == "failed":
raise Exception("Deployment failed")
time.sleep(30)Source: src/lib/nemo/data_uploader.py:1-346
The Data Service manages datasets for training and evaluation workflows.
Structured Data Upload:
def upload_file(
self,
data_fp: str,
data_type: str,
) -> str:
"""Upload a single data file to the repository."""
# Validate data type
valid_types = ["training", "validation", "testing"]
assert data_type in valid_types, f"data_type must be one of {valid_types}"
# Validate file exists
assert os.path.exists(data_fp), f"Data file at '{data_fp}' does not exist"
# Create namespaces if not already done
if not hasattr(self, "repo_id"):
self._create_namespaces()
self.repo_id = self._create_repo()
# Determine filename in repo
filename = os.path.basename(data_fp)OpenAI Format Validation:
from src.lib.integration.openai_format_validator import OpenAIFormatValidator
def validate_records(conversation: dict) -> bool:
"""Validate OpenAI chat completion format."""
validator = OpenAIFormatValidator()
return validator.validate_chat_completion_format(conversation)Note: The following sections contain example implementations and patterns for advanced NeMo integration. These are illustrative code samples that demonstrate best practices rather than functions available in the current codebase.
Retry Logic with Exponential Backoff (Example Implementation):
def resilient_nemo_request(func, max_retries=3):
"""Execute NeMo API calls with retry logic."""
for attempt in range(max_retries):
try:
return func()
except requests.exceptions.ConnectionError:
if attempt == max_retries - 1:
raise
delay = (2 ** attempt) + random.uniform(0, 1)
time.sleep(min(delay, 30))
except requests.exceptions.HTTPError as e:
if e.response.status_code in [429, 503]: # Rate limit or service unavailable
time.sleep(60 * (attempt + 1))
else:
raiseSymptoms: Timeouts or connection refused errors Solution:
# Test service connectivity
curl -v "http://nemo-service/v1/health"
# Check Kubernetes service discovery
kubectl get svc -n nemo-namespace
# Verify network policies
kubectl describe networkpolicy -n nemo-namespaceSymptoms: 401/403 HTTP errors Solution:
# Verify NGC API key
echo $NGC_API_KEY | base64 -d
# Check Kubernetes secrets
kubectl get secret ngc-api -o yaml
# Test authentication manually
curl -H "Authorization: Bearer $NGC_API_KEY" "http://nemo-service/v1/models"Symptoms: Training jobs fail with OOM or insufficient GPU errors Solution:
# Check GPU availability
nvidia-smi
# Monitor Kubernetes resource usage
kubectl top nodes
kubectl describe node <node-name>
# Adjust resource requests
kubectl patch deployment <deployment> -p '{"spec":{"template":{"spec":{"containers":[{"name":"<container>","resources":{"requests":{"nvidia.com/gpu":"2"}}}]}}}}'# Optimize training parameters for speed vs. quality
training_config = {
"micro_batch_size": 8, # Larger batches for better GPU utilization
"batch_size": 16, # Training batch size
"learning_rate": 3e-4, # Aggressive learning rate
"epochs": 3, # Fewer epochs for faster iteration
"training_type": "sft", # Supervised fine-tuning
"finetuning_type": "lora", # LoRA fine-tuning
}# Batch evaluation requests for efficiency (Example Implementation)
def batch_evaluation_requests(items: list, batch_size: int = 32):
"""Process evaluation requests in batches."""
for i in range(0, len(items), batch_size):
batch = items[i:i + batch_size]
yield run_evaluation_batch(batch)Source: src/lib/nemo/embedding.py:1-94
The Embedding service provides text embedding generation capabilities for semantic similarity ICL example selection.
Embedding Service Usage:
from src.lib.nemo.embedding import Embedding
# Initialize embedding service
embedding_service = Embedding(
endpoint_url="http://nim.test/v1/embeddings", # Uses nim_base_url from config
model_name="nvidia/llama-3.2-nv-embedqa-1b-v2",
api_key=None # Optional API key for remote endpoint
)
# Get embeddings for single text or batch
embeddings = embedding_service.get_embedding("Your text here", input_type="query")
batch_embeddings = embedding_service.get_embeddings_batch(
["text1", "text2", "text3"],
input_type="passage",
batch_size=32
)This comprehensive integration guide enables advanced usage of NeMo services within the Data Flywheel Blueprint. For basic configuration, refer to the Configuration Guide.