This guide covers preparing real-world monitoring data for training the TFT model. To train Tachyon Argus on your own data, you need to:
- Extract metrics from your monitoring system (database, API, files)
- Map your column names to the expected schema
- Transform data types and units
- Partition into time-chunked Parquet files
- Validate the final dataset
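Each of these steps maps onto a function in the script later in this guide. If you'd rather drive the pipeline from Python than through the CLI, the `prepare_dataset` entry point can be called directly (a minimal sketch, assuming the script is saved as `data_prep.py`; the CSV filename is illustrative):

```python
from data_prep import prepare_dataset  # the script shown later in this guide

# Runs extract -> map -> transform -> validate -> partition -> save
manifest_path = prepare_dataset(
    source_type='csv',
    output_dir='./training/',
    input_file='metrics.csv',  # your exported metrics
)
print('Manifest written to', manifest_path)
```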
The training engine expects exactly these 17 columns:
| Column | Type | Range | Description |
|---|---|---|---|
| timestamp | datetime | - | ISO 8601 format |
| server_name | string | - | Unique server identifier |
| status | string | enum | Server state (see below) |
| cpu_user_pct | float | 0-100 | User CPU percentage |
| cpu_sys_pct | float | 0-100 | System CPU percentage |
| cpu_iowait_pct | float | 0-100 | I/O wait percentage |
| cpu_idle_pct | float | 0-100 | Idle CPU percentage |
| java_cpu_pct | float | 0-100 | Java process CPU (0 if N/A) |
| mem_used_pct | float | 0-100 | Memory usage percentage |
| swap_used_pct | float | 0-100 | Swap usage percentage |
| disk_usage_pct | float | 0-100 | Disk usage percentage |
| net_in_mb_s | float | 0+ | Network in (MB/s) |
| net_out_mb_s | float | 0+ | Network out (MB/s) |
| back_close_wait | int | 0+ | Backend CLOSE_WAIT connections |
| front_close_wait | int | 0+ | Frontend CLOSE_WAIT connections |
| load_average | float | 0+ | System load average (1-min) |
| uptime_days | int | 0-365 | Days since last reboot |
Valid status values: `healthy`, `critical_issue`, `heavy_load`, `idle`, `maintenance`, `morning_spike`, `offline`, `recovery`
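For orientation, here's a single conforming row (a sketch; the values are illustrative):

```python
import pandas as pd

# One illustrative row matching the 17-column schema
row = pd.DataFrame([{
    'timestamp': pd.Timestamp('2025-01-01T08:00:00'),
    'server_name': 'web-01',
    'status': 'healthy',
    'cpu_user_pct': 23.5, 'cpu_sys_pct': 7.2,
    'cpu_iowait_pct': 1.1, 'cpu_idle_pct': 68.2,   # four CPU columns sum to 100
    'java_cpu_pct': 12.0, 'mem_used_pct': 61.4,
    'swap_used_pct': 2.3, 'disk_usage_pct': 47.9,
    'net_in_mb_s': 5.6, 'net_out_mb_s': 3.1,
    'back_close_wait': 4, 'front_close_wait': 2,
    'load_average': 1.8, 'uptime_days': 42,
}])
```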
Here's a complete Python script to transform your data:
```python
#!/usr/bin/env python3
"""
data_prep.py - Prepare real-world metrics for Tachyon Argus training

Usage:
    python data_prep.py --source database --output ./training/
    python data_prep.py --source csv --input metrics.csv --output ./training/
"""
import pandas as pd
import numpy as np
from pathlib import Path
from datetime import datetime, timedelta
import json
import argparse

# =============================================================================
# COLUMN MAPPING - CUSTOMIZE THIS FOR YOUR DATA
# =============================================================================

# Map YOUR column names to the expected schema
COLUMN_MAPPING = {
    # Your column name -> Expected column name

    # Required identifiers
    'collection_time': 'timestamp',    # or 'ts', 'datetime', 'time', etc.
    'hostname': 'server_name',         # or 'host', 'server', 'instance', etc.
    'state': 'status',                 # or 'health', 'condition', etc.

    # CPU metrics
    'cpu_user': 'cpu_user_pct',        # or 'user_cpu', 'cpu_usr', etc.
    'cpu_system': 'cpu_sys_pct',       # or 'sys_cpu', 'cpu_kernel', etc.
    'cpu_iowait': 'cpu_iowait_pct',    # or 'iowait', 'cpu_wait', etc.
    'cpu_idle': 'cpu_idle_pct',        # or 'idle_cpu', etc.
    'java_cpu': 'java_cpu_pct',        # or 'jvm_cpu', 'process_cpu', etc.

    # Memory metrics
    'memory_used': 'mem_used_pct',     # or 'mem_pct', 'memory_percent', etc.
    'swap_used': 'swap_used_pct',      # or 'swap_pct', etc.

    # Disk metrics
    'disk_used': 'disk_usage_pct',     # or 'disk_pct', 'fs_used', etc.

    # Network metrics
    'net_rx_mbps': 'net_in_mb_s',      # or 'network_in', 'rx_bytes', etc.
    'net_tx_mbps': 'net_out_mb_s',     # or 'network_out', 'tx_bytes', etc.

    # Connection metrics
    'backend_close_wait': 'back_close_wait',    # or 'close_wait_backend', etc.
    'frontend_close_wait': 'front_close_wait',  # or 'close_wait_frontend', etc.

    # System metrics
    'load_1min': 'load_average',       # or 'loadavg', 'load1', etc.
    'uptime': 'uptime_days',           # or 'days_up', 'server_uptime', etc.
}

# Status value mapping (map your values to expected values)
STATUS_MAPPING = {
    # Your status value -> Expected status value
    'OK': 'healthy',
    'GOOD': 'healthy',
    'UP': 'healthy',
    'NORMAL': 'healthy',
    'WARNING': 'heavy_load',
    'WARN': 'heavy_load',
    'HIGH_LOAD': 'heavy_load',
    'CRITICAL': 'critical_issue',
    'ERROR': 'critical_issue',
    'ALERT': 'critical_issue',
    'DOWN': 'offline',
    'UNREACHABLE': 'offline',
    'MAINT': 'maintenance',
    'MAINTENANCE_MODE': 'maintenance',
    'IDLE': 'idle',
    'LOW': 'idle',
    'RECOVERING': 'recovery',
    'STARTING': 'recovery',
    'PEAK': 'morning_spike',
}

# =============================================================================
# DATA EXTRACTION FUNCTIONS
# =============================================================================

def extract_from_database(connection_string: str, query: str = None) -> pd.DataFrame:
    """
    Extract metrics from a database.

    Supports: PostgreSQL, MySQL, SQL Server, Oracle
    """
    import sqlalchemy
    engine = sqlalchemy.create_engine(connection_string)

    if query is None:
        # Default query - customize for your schema
        query = """
            SELECT
                collection_time,
                hostname,
                state,
                cpu_user,
                cpu_system,
                cpu_iowait,
                cpu_idle,
                java_cpu,
                memory_used,
                swap_used,
                disk_used,
                net_rx_mbps,
                net_tx_mbps,
                backend_close_wait,
                frontend_close_wait,
                load_1min,
                uptime
            FROM server_metrics
            WHERE collection_time >= NOW() - INTERVAL '30 days'
            ORDER BY collection_time, hostname
        """

    print("Executing query...")
    df = pd.read_sql(query, engine)
    print(f"Extracted {len(df):,} rows")
    return df


def extract_from_csv(file_path: str) -> pd.DataFrame:
    """Extract metrics from CSV file."""
    print(f"Reading {file_path}...")
    df = pd.read_csv(file_path)
    print(f"Loaded {len(df):,} rows")
    return df

def extract_from_prometheus(prometheus_url: str, servers: list,
                            start_time: datetime, end_time: datetime,
                            step: str = '5m') -> pd.DataFrame:
    """
    Extract metrics from Prometheus.

    Args:
        prometheus_url: Prometheus server URL
        servers: List of server names/instances
        start_time: Start of time range
        end_time: End of time range
        step: Query resolution (default 5 minutes)
    """
    import requests

    records = []

    # Prometheus queries for each metric
    queries = {
        'cpu_user_pct': 'avg by (instance) (rate(node_cpu_seconds_total{mode="user"}[5m])) * 100',
        'cpu_sys_pct': 'avg by (instance) (rate(node_cpu_seconds_total{mode="system"}[5m])) * 100',
        'cpu_iowait_pct': 'avg by (instance) (rate(node_cpu_seconds_total{mode="iowait"}[5m])) * 100',
        'cpu_idle_pct': 'avg by (instance) (rate(node_cpu_seconds_total{mode="idle"}[5m])) * 100',
        'mem_used_pct': '(1 - node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes) * 100',
        'disk_usage_pct': '(1 - node_filesystem_avail_bytes{mountpoint="/"} / node_filesystem_size_bytes{mountpoint="/"}) * 100',
        'load_average': 'node_load1',
        'net_in_mb_s': 'rate(node_network_receive_bytes_total[5m]) / 1048576',
        'net_out_mb_s': 'rate(node_network_transmit_bytes_total[5m]) / 1048576',
    }

    for metric_name, query in queries.items():
        print(f"Querying {metric_name}...")
        response = requests.get(
            f"{prometheus_url}/api/v1/query_range",
            params={
                'query': query,
                'start': start_time.timestamp(),
                'end': end_time.timestamp(),
                'step': step
            }
        )
        data = response.json()

        if data['status'] == 'success':
            for result in data['data']['result']:
                instance = result['metric'].get('instance', 'unknown')
                for timestamp, value in result['values']:
                    records.append({
                        'timestamp': datetime.fromtimestamp(timestamp),
                        'server_name': instance,
                        metric_name: float(value)
                    })

    df = pd.DataFrame(records)
    # Pivot to get one row per timestamp/server
    df = df.groupby(['timestamp', 'server_name']).first().reset_index()
    print(f"Extracted {len(df):,} rows from Prometheus")
    return df

def extract_from_elasticsearch(es_url: str, index: str,
                               start_time: datetime, end_time: datetime) -> pd.DataFrame:
    """Extract metrics from Elasticsearch."""
    from elasticsearch import Elasticsearch

    es = Elasticsearch([es_url])

    query = {
        "query": {
            "range": {
                "@timestamp": {
                    "gte": start_time.isoformat(),
                    "lte": end_time.isoformat()
                }
            }
        },
        "size": 10000,
        "sort": [{"@timestamp": "asc"}]
    }

    records = []

    # Scroll through results
    response = es.search(index=index, body=query, scroll='5m')
    scroll_id = response['_scroll_id']

    while True:
        hits = response['hits']['hits']
        if not hits:
            break
        for hit in hits:
            records.append(hit['_source'])
        response = es.scroll(scroll_id=scroll_id, scroll='5m')

    df = pd.DataFrame(records)
    print(f"Extracted {len(df):,} rows from Elasticsearch")
    return df

# =============================================================================
# DATA TRANSFORMATION
# =============================================================================

def apply_column_mapping(df: pd.DataFrame, mapping: dict) -> pd.DataFrame:
    """Rename columns based on mapping."""
    # Find which mappings apply to this dataframe
    rename_dict = {}
    for src_col, dest_col in mapping.items():
        if src_col in df.columns:
            rename_dict[src_col] = dest_col
        elif src_col.lower() in [c.lower() for c in df.columns]:
            # Case-insensitive match
            actual_col = [c for c in df.columns if c.lower() == src_col.lower()][0]
            rename_dict[actual_col] = dest_col

    print(f"Mapping {len(rename_dict)} columns...")
    df = df.rename(columns=rename_dict)
    return df


def apply_status_mapping(df: pd.DataFrame, mapping: dict) -> pd.DataFrame:
    """Map status values to expected values."""
    if 'status' not in df.columns:
        print("No status column found, defaulting to 'healthy'")
        df['status'] = 'healthy'
        return df

    valid_statuses = {'healthy', 'critical_issue', 'heavy_load', 'idle',
                      'maintenance', 'morning_spike', 'offline', 'recovery'}

    def map_status(value):
        s = str(value)
        if s in valid_statuses:
            # Already a valid status - keep it (don't remap 'critical_issue' etc.)
            return s
        # Apply mapping (case-insensitive), defaulting unknowns to 'healthy'
        return mapping.get(s.upper(), mapping.get(s, 'healthy'))

    df['status'] = df['status'].apply(map_status)

    # Safety net: flag anything that still falls outside the valid set
    invalid = df[~df['status'].isin(valid_statuses)]['status'].unique()
    if len(invalid) > 0:
        print(f"WARNING: Invalid status values found: {invalid}")
        print("         Defaulting invalid values to 'healthy'")
        df.loc[~df['status'].isin(valid_statuses), 'status'] = 'healthy'

    return df

def convert_units(df: pd.DataFrame) -> pd.DataFrame:
    """Convert units to expected format."""
    # Network: Convert bytes/s or KB/s to MB/s if values seem too large
    for col in ['net_in_mb_s', 'net_out_mb_s']:
        if col in df.columns:
            median_val = df[col].median()
            if median_val > 1000:  # Likely in bytes/s
                print(f"Converting {col} from bytes/s to MB/s")
                df[col] = df[col] / 1_048_576
            elif median_val > 100:  # Likely in KB/s
                print(f"Converting {col} from KB/s to MB/s")
                df[col] = df[col] / 1024

    # Uptime: Convert seconds or hours to days if values seem too large
    if 'uptime_days' in df.columns:
        median_val = df['uptime_days'].median()
        if median_val > 365 * 24:  # More than a year even in hours - likely seconds
            print("Converting uptime from seconds to days")
            df['uptime_days'] = (df['uptime_days'] / 86400).fillna(0).astype(int)
        elif median_val > 365:  # More than a year in days - likely hours
            print("Converting uptime from hours to days")
            df['uptime_days'] = (df['uptime_days'] / 24).fillna(0).astype(int)

    # Percentages: Convert 0-1 fractions to 0-100 if values seem too small
    # (heuristic: a genuine percentage column rarely tops out at <= 1)
    pct_cols = ['cpu_user_pct', 'cpu_sys_pct', 'cpu_iowait_pct', 'cpu_idle_pct',
                'java_cpu_pct', 'mem_used_pct', 'swap_used_pct', 'disk_usage_pct']
    for col in pct_cols:
        if col in df.columns and df[col].max() <= 1.0:
            print(f"Converting {col} from 0-1 fraction to percentage")
            df[col] = df[col] * 100

    # Memory: Convert bytes to percentage if needed
    # (This requires knowing total memory, so usually done at source)

    return df

def add_missing_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Add missing columns with sensible defaults."""
    required_columns = {
        'timestamp': None,    # Required, no default
        'server_name': None,  # Required, no default
        'status': 'healthy',
        'cpu_user_pct': 0.0,
        'cpu_sys_pct': 0.0,
        'cpu_iowait_pct': 0.0,
        'cpu_idle_pct': 100.0,
        'java_cpu_pct': 0.0,
        'mem_used_pct': 0.0,
        'swap_used_pct': 0.0,
        'disk_usage_pct': 0.0,
        'net_in_mb_s': 0.0,
        'net_out_mb_s': 0.0,
        'back_close_wait': 0,
        'front_close_wait': 0,
        'load_average': 0.0,
        'uptime_days': 0,
    }

    for col, default in required_columns.items():
        if col not in df.columns:
            if default is None:
                raise ValueError(f"Required column '{col}' is missing and has no default")
            print(f"Adding missing column '{col}' with default value: {default}")
            df[col] = default

    return df


def derive_missing_metrics(df: pd.DataFrame) -> pd.DataFrame:
    """Derive metrics that can be calculated from others."""
    # If cpu_idle is missing but we have the others, calculate it
    if 'cpu_idle_pct' not in df.columns or df['cpu_idle_pct'].isna().all():
        if all(c in df.columns for c in ['cpu_user_pct', 'cpu_sys_pct', 'cpu_iowait_pct']):
            print("Deriving cpu_idle_pct from other CPU metrics")
            df['cpu_idle_pct'] = 100 - df['cpu_user_pct'] - df['cpu_sys_pct'] - df['cpu_iowait_pct']
            df['cpu_idle_pct'] = df['cpu_idle_pct'].clip(0, 100)

    # If we have total CPU but not breakdown, estimate
    if 'cpu_total' in df.columns:
        if 'cpu_user_pct' not in df.columns or df['cpu_user_pct'].isna().all():
            print("Estimating CPU breakdown from total")
            df['cpu_user_pct'] = df['cpu_total'] * 0.7
            df['cpu_sys_pct'] = df['cpu_total'] * 0.2
            df['cpu_iowait_pct'] = df['cpu_total'] * 0.1
            df['cpu_idle_pct'] = 100 - df['cpu_total']

    return df

def fix_data_types(df: pd.DataFrame) -> pd.DataFrame:
    """Ensure correct data types."""
    # Timestamp
    if 'timestamp' in df.columns:
        df['timestamp'] = pd.to_datetime(df['timestamp'])

    # String columns
    df['server_name'] = df['server_name'].astype(str)
    df['status'] = df['status'].astype(str)

    # Float columns (percentages and rates)
    float_cols = ['cpu_user_pct', 'cpu_sys_pct', 'cpu_iowait_pct', 'cpu_idle_pct',
                  'java_cpu_pct', 'mem_used_pct', 'swap_used_pct', 'disk_usage_pct',
                  'net_in_mb_s', 'net_out_mb_s', 'load_average']
    for col in float_cols:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0.0)

    # Integer columns
    int_cols = ['back_close_wait', 'front_close_wait', 'uptime_days']
    for col in int_cols:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0).astype(int)

    return df


def validate_ranges(df: pd.DataFrame) -> pd.DataFrame:
    """Validate and clip values to expected ranges."""
    # Percentage columns (0-100)
    pct_cols = ['cpu_user_pct', 'cpu_sys_pct', 'cpu_iowait_pct', 'cpu_idle_pct',
                'java_cpu_pct', 'mem_used_pct', 'swap_used_pct', 'disk_usage_pct']
    for col in pct_cols:
        if col in df.columns:
            out_of_range = ((df[col] < 0) | (df[col] > 100)).sum()
            if out_of_range > 0:
                print(f"Clipping {out_of_range} out-of-range values in {col}")
                df[col] = df[col].clip(0, 100)

    # Non-negative columns
    nonneg_cols = ['net_in_mb_s', 'net_out_mb_s', 'back_close_wait',
                   'front_close_wait', 'load_average', 'uptime_days']
    for col in nonneg_cols:
        if col in df.columns:
            negative = (df[col] < 0).sum()
            if negative > 0:
                print(f"Clipping {negative} negative values in {col}")
                df[col] = df[col].clip(lower=0)

    # Uptime days (0-365)
    if 'uptime_days' in df.columns:
        df['uptime_days'] = df['uptime_days'].clip(0, 365)

    return df

# =============================================================================
# PARTITIONING AND OUTPUT
# =============================================================================

def partition_by_time(df: pd.DataFrame, chunk_hours: int = 2) -> dict:
    """
    Partition dataframe into time chunks.

    Args:
        df: DataFrame with timestamp column
        chunk_hours: Hours per chunk (default: 2)

    Returns:
        Dictionary mapping chunk_id to DataFrame
    """
    df = df.sort_values('timestamp')

    # Create chunk boundaries
    min_time = df['timestamp'].min()
    max_time = df['timestamp'].max()

    chunks = {}
    current_start = min_time.replace(minute=0, second=0, microsecond=0)
    # Align to chunk boundary
    hour = current_start.hour
    current_start = current_start.replace(hour=(hour // chunk_hours) * chunk_hours)

    while current_start <= max_time:
        current_end = current_start + timedelta(hours=chunk_hours)
        mask = (df['timestamp'] >= current_start) & (df['timestamp'] < current_end)
        chunk_df = df[mask]
        if len(chunk_df) > 0:
            chunk_id = current_start.strftime('%Y%m%d_%H')
            chunks[chunk_id] = chunk_df
        current_start = current_end

    print(f"Created {len(chunks)} time chunks")
    return chunks


def save_partitioned_parquet(chunks: dict, output_dir: str) -> str:
    """
    Save partitioned data as Parquet files with manifest.

    Args:
        chunks: Dictionary of chunk_id -> DataFrame
        output_dir: Output directory path

    Returns:
        Path to manifest file
    """
    output_path = Path(output_dir)
    partitioned_dir = output_path / 'server_metrics_partitioned'
    partitioned_dir.mkdir(parents=True, exist_ok=True)

    manifest = {
        'version': '2.0.0',
        'created': datetime.now().isoformat(),
        'chunk_hours': 2,
        'chunks': []
    }

    total_rows = 0
    servers = set()

    for chunk_id, chunk_df in sorted(chunks.items()):
        # Save chunk
        chunk_path = partitioned_dir / f'chunk_{chunk_id}.parquet'
        chunk_df.to_parquet(chunk_path, index=False, compression='snappy')

        # Update manifest
        manifest['chunks'].append({
            'id': chunk_id,
            'file': f'chunk_{chunk_id}.parquet',
            'rows': len(chunk_df),
            'servers': chunk_df['server_name'].nunique(),
            'start': chunk_df['timestamp'].min().isoformat(),
            'end': chunk_df['timestamp'].max().isoformat()
        })
        total_rows += len(chunk_df)
        servers.update(chunk_df['server_name'].unique())
        print(f"  Saved {chunk_id}: {len(chunk_df):,} rows")

    # Add summary to manifest
    manifest['total_rows'] = total_rows
    manifest['total_servers'] = len(servers)
    manifest['servers'] = sorted(servers)

    # Save manifest
    manifest_path = partitioned_dir / 'chunk_manifest.json'
    with open(manifest_path, 'w') as f:
        json.dump(manifest, f, indent=2)

    print(f"\nSaved {total_rows:,} rows across {len(chunks)} chunks")
    print(f"Manifest: {manifest_path}")
    return str(manifest_path)

def save_metadata(df: pd.DataFrame, output_dir: str) -> str:
    """Save dataset metadata."""
    output_path = Path(output_dir)

    metadata = {
        'version': '2.0.0',
        'created': datetime.now().isoformat(),
        'total_rows': len(df),
        'total_servers': df['server_name'].nunique(),
        'servers': sorted(df['server_name'].unique().tolist()),
        'time_range': {
            'start': df['timestamp'].min().isoformat(),
            'end': df['timestamp'].max().isoformat(),
            'duration_hours': (df['timestamp'].max() - df['timestamp'].min()).total_seconds() / 3600
        },
        'columns': list(df.columns),
        'status_distribution': df['status'].value_counts().to_dict()
    }

    metadata_path = output_path / 'metrics_metadata.json'
    with open(metadata_path, 'w') as f:
        json.dump(metadata, f, indent=2)

    print(f"Metadata: {metadata_path}")
    return str(metadata_path)

# =============================================================================
# VALIDATION
# =============================================================================

def validate_dataset(df: pd.DataFrame) -> bool:
    """Validate dataset meets requirements."""
    print("\n" + "=" * 60)
    print("DATASET VALIDATION")
    print("=" * 60)

    errors = []
    warnings = []

    # Check required columns
    required = ['timestamp', 'server_name', 'status', 'cpu_user_pct', 'cpu_sys_pct',
                'cpu_iowait_pct', 'cpu_idle_pct', 'java_cpu_pct', 'mem_used_pct',
                'swap_used_pct', 'disk_usage_pct', 'net_in_mb_s', 'net_out_mb_s',
                'back_close_wait', 'front_close_wait', 'load_average', 'uptime_days']
    missing = set(required) - set(df.columns)
    if missing:
        errors.append(f"Missing required columns: {missing}")

    # Check for nulls in critical columns
    for col in ['timestamp', 'server_name']:
        if col in df.columns and df[col].isna().any():
            errors.append(f"NULL values in {col}")

    # Check data volume
    if len(df) < 1000:
        warnings.append(f"Small dataset ({len(df)} rows). Recommend 10,000+ for good training.")
    if df['server_name'].nunique() < 5:
        warnings.append(f"Few servers ({df['server_name'].nunique()}). Recommend 10+ for good generalization.")

    # Check time coverage
    time_span_hours = (df['timestamp'].max() - df['timestamp'].min()).total_seconds() / 3600
    if time_span_hours < 24:
        warnings.append(f"Short time span ({time_span_hours:.1f} hours). Recommend 7+ days.")

    # Check status distribution
    status_counts = df['status'].value_counts()
    if 'healthy' not in status_counts or status_counts['healthy'] / len(df) < 0.5:
        warnings.append("Low percentage of 'healthy' status. May affect model calibration.")

    # Report results
    if errors:
        print("\n❌ ERRORS (must fix):")
        for e in errors:
            print(f"  - {e}")
    if warnings:
        print("\n⚠️  WARNINGS (consider):")
        for w in warnings:
            print(f"  - {w}")

    if not errors and not warnings:
        print("\n✅ Dataset validation PASSED")
    elif not errors:
        print("\n✅ Dataset validation PASSED with warnings")
    else:
        print("\n❌ Dataset validation FAILED")

    # Print summary stats
    print("\nDataset Summary:")
    print(f"  Rows: {len(df):,}")
    print(f"  Servers: {df['server_name'].nunique()}")
    print(f"  Time span: {time_span_hours:.1f} hours ({time_span_hours/24:.1f} days)")
    print(f"  Timestamps per server: {len(df) / df['server_name'].nunique():.0f} avg")

    return len(errors) == 0

# =============================================================================
# MAIN PIPELINE
# =============================================================================

def prepare_dataset(source_type: str, output_dir: str, **kwargs) -> str:
    """
    Main pipeline to prepare dataset for training.

    Args:
        source_type: 'database', 'csv', 'prometheus', 'elasticsearch'
        output_dir: Output directory for Parquet files
        **kwargs: Source-specific arguments

    Returns:
        Path to manifest file
    """
    print("=" * 60)
    print("TACHYON ARGUS DATA PREPARATION")
    print("=" * 60)

    # Step 1: Extract
    print("\n[1/6] EXTRACTING DATA...")
    if source_type == 'database':
        df = extract_from_database(kwargs['connection_string'], kwargs.get('query'))
    elif source_type == 'csv':
        df = extract_from_csv(kwargs['input_file'])
    elif source_type == 'prometheus':
        df = extract_from_prometheus(
            kwargs['prometheus_url'],
            kwargs['servers'],
            kwargs['start_time'],
            kwargs['end_time']
        )
    elif source_type == 'elasticsearch':
        df = extract_from_elasticsearch(
            kwargs['es_url'],
            kwargs['index'],
            kwargs['start_time'],
            kwargs['end_time']
        )
    else:
        raise ValueError(f"Unknown source type: {source_type}")

    # Step 2: Map columns
    print("\n[2/6] MAPPING COLUMNS...")
    df = apply_column_mapping(df, COLUMN_MAPPING)
    df = apply_status_mapping(df, STATUS_MAPPING)

    # Step 3: Transform
    print("\n[3/6] TRANSFORMING DATA...")
    df = convert_units(df)
    df = derive_missing_metrics(df)
    df = add_missing_columns(df)
    df = fix_data_types(df)
    df = validate_ranges(df)

    # Step 4: Validate
    print("\n[4/6] VALIDATING...")
    if not validate_dataset(df):
        raise ValueError("Dataset validation failed")

    # Step 5: Partition
    print("\n[5/6] PARTITIONING...")
    chunks = partition_by_time(df, chunk_hours=2)

    # Step 6: Save
    print("\n[6/6] SAVING...")
    manifest_path = save_partitioned_parquet(chunks, output_dir)
    save_metadata(df, output_dir)

    print("\n" + "=" * 60)
    print("DATA PREPARATION COMPLETE")
    print("=" * 60)
    print(f"\nOutput directory: {output_dir}")
    print("Ready for training with: python src/training/main.py train --streaming")

    return manifest_path

# =============================================================================
# CLI
# =============================================================================

def main():
    parser = argparse.ArgumentParser(
        description='Prepare real-world metrics for Tachyon Argus training',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # From CSV file
  python data_prep.py --source csv --input metrics.csv --output ./training/

  # From PostgreSQL
  python data_prep.py --source database \\
      --connection "postgresql://user:pass@host/db" \\
      --output ./training/

  # From Prometheus
  python data_prep.py --source prometheus \\
      --prometheus-url http://prometheus:9090 \\
      --start 2025-01-01 --end 2025-01-15 \\
      --output ./training/
"""
    )
    parser.add_argument('--source', type=str, required=True,
                        choices=['database', 'csv', 'prometheus', 'elasticsearch'],
                        help='Data source type')
    parser.add_argument('--output', type=str, default='./training/',
                        help='Output directory (default: ./training/)')

    # CSV options
    parser.add_argument('--input', type=str,
                        help='Input CSV file path')

    # Database options
    parser.add_argument('--connection', type=str,
                        help='Database connection string')
    parser.add_argument('--query', type=str,
                        help='Custom SQL query')

    # Prometheus options
    parser.add_argument('--prometheus-url', type=str,
                        help='Prometheus server URL')
    parser.add_argument('--start', type=str,
                        help='Start date (YYYY-MM-DD)')
    parser.add_argument('--end', type=str,
                        help='End date (YYYY-MM-DD)')

    # Elasticsearch options
    parser.add_argument('--es-url', type=str,
                        help='Elasticsearch URL')
    parser.add_argument('--index', type=str,
                        help='Elasticsearch index name')

    args = parser.parse_args()

    # Build kwargs based on source
    kwargs = {}
    if args.source == 'csv':
        if not args.input:
            parser.error("--input required for csv source")
        kwargs['input_file'] = args.input
    elif args.source == 'database':
        if not args.connection:
            parser.error("--connection required for database source")
        kwargs['connection_string'] = args.connection
        kwargs['query'] = args.query
    elif args.source == 'prometheus':
        if not args.prometheus_url:
            parser.error("--prometheus-url required for prometheus source")
        kwargs['prometheus_url'] = args.prometheus_url
        kwargs['start_time'] = datetime.fromisoformat(args.start) if args.start else datetime.now() - timedelta(days=30)
        kwargs['end_time'] = datetime.fromisoformat(args.end) if args.end else datetime.now()
        kwargs['servers'] = []  # Will discover from metrics
    elif args.source == 'elasticsearch':
        if not args.es_url or not args.index:
            parser.error("--es-url and --index required for elasticsearch source")
        kwargs['es_url'] = args.es_url
        kwargs['index'] = args.index
        kwargs['start_time'] = datetime.fromisoformat(args.start) if args.start else datetime.now() - timedelta(days=30)
        kwargs['end_time'] = datetime.fromisoformat(args.end) if args.end else datetime.now()

    prepare_dataset(args.source, args.output, **kwargs)

if __name__ == '__main__':
    main()
```

From a CSV file (if your CSV has different column names, edit `COLUMN_MAPPING` in the script first):

```bash
python data_prep.py --source csv --input /path/to/metrics.csv --output ./training/
```

From PostgreSQL:

```bash
python data_prep.py --source database \
    --connection "postgresql://user:password@localhost:5432/monitoring" \
    --output ./training/
```

From MySQL:

```bash
python data_prep.py --source database \
    --connection "mysql+pymysql://user:password@localhost:3306/monitoring" \
    --output ./training/
```

From Prometheus:

```bash
python data_prep.py --source prometheus \
    --prometheus-url http://prometheus:9090 \
    --start 2025-01-01 \
    --end 2025-01-15 \
    --output ./training/
```

Edit the COLUMN_MAPPING dictionary in the script to match your data:
```python
COLUMN_MAPPING = {
    # Your column name -> Expected column name

    # Example: Your DB has 'collected_at' instead of 'timestamp'
    'collected_at': 'timestamp',

    # Example: Your DB has 'host_id' instead of 'server_name'
    'host_id': 'server_name',

    # Example: Your DB has 'cpu_percent' as total CPU
    'cpu_percent': 'cpu_user_pct',  # Map to user, derive others

    # ... add all your mappings
}
```

If your monitoring system doesn't collect all 17 columns, the script will:

- Use defaults for non-critical metrics (e.g., `java_cpu_pct = 0`)
- Derive metrics when possible (e.g., calculate `cpu_idle_pct` from others)
- Fail only if critical columns are missing (`timestamp`, `server_name`)
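You can dry-run the mapping and defaulting steps on a small sample before committing to a full run (a sketch, assuming the script above is saved as `data_prep.py` and your export is `metrics.csv`):

```python
import pandas as pd
from data_prep import COLUMN_MAPPING, apply_column_mapping, add_missing_columns

sample = pd.read_csv('metrics.csv', nrows=100)        # a small sample is enough
sample = apply_column_mapping(sample, COLUMN_MAPPING)
sample = add_missing_columns(sample)                   # fills defaults, raises on critical gaps
print(sorted(sample.columns))                          # should list all 17 expected columns
```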
No Java metrics:

```python
# Script will default java_cpu_pct to 0
# No action needed
```

Only total CPU, no breakdown:

```python
# Add to your data before mapping:
df['cpu_user_pct'] = df['cpu_total'] * 0.7
df['cpu_sys_pct'] = df['cpu_total'] * 0.2
df['cpu_iowait_pct'] = df['cpu_total'] * 0.1
df['cpu_idle_pct'] = 100 - df['cpu_total']
```

No connection metrics:

```python
# Script will default back_close_wait and front_close_wait to 0
# No action needed
```

The script auto-detects and converts common unit mismatches:
| Metric | Expected | Auto-converts from |
|---|---|---|
| Network | MB/s | bytes/s, KB/s |
| Uptime | days | seconds, hours |
| Percentages | 0-100 | 0-1 (multiplies by 100) |
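To see the heuristics in action, you can run `convert_units` from the script on a small synthetic frame (a sketch; the sample values are illustrative and deliberately in the "wrong" units):

```python
import pandas as pd
from data_prep import convert_units  # from the script above

sample = pd.DataFrame({
    'net_in_mb_s': [2_097_152.0, 4_194_304.0],   # actually bytes/s
    'net_out_mb_s': [1_048_576.0, 3_145_728.0],  # actually bytes/s
    'uptime_days': [864000, 1728000],            # actually seconds (10 and 20 days)
    'mem_used_pct': [0.42, 0.87],                # actually a 0-1 fraction
})
print(convert_units(sample))
# Network becomes 2/4 and 1/3 MB/s, uptime becomes 10/20 days, memory becomes 42/87
```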
After preparation, the script validates:
- All 17 required columns present
- No NULL values in critical columns
- Values within expected ranges
- Sufficient data volume (warns if < 1000 rows)
- Sufficient server count (warns if < 5)
- Sufficient time coverage (warns if < 24 hours)
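The validation function can also be re-run on its own against an already-prepared dataset (a sketch, assuming the chunks were written to `./training/` as shown below):

```python
from pathlib import Path
import pandas as pd
from data_prep import validate_dataset

# Reload every chunk and validate the combined dataset
chunk_dir = Path('./training/server_metrics_partitioned')
df = pd.concat(
    (pd.read_parquet(p) for p in sorted(chunk_dir.glob('chunk_*.parquet'))),
    ignore_index=True,
)
ok = validate_dataset(df)
print('ready for training' if ok else 'fix the errors above first')
```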
The prepared output has this layout:

```
training/
├── server_metrics_partitioned/
│   ├── chunk_20250101_00.parquet
│   ├── chunk_20250101_02.parquet
│   ├── chunk_20250101_04.parquet
│   ├── ...
│   └── chunk_manifest.json
└── metrics_metadata.json
```
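Each chunk is indexed in `chunk_manifest.json`, so a quick sanity check of what was written looks like this (a minimal sketch):

```python
import json
from pathlib import Path
import pandas as pd

partitioned = Path('./training/server_metrics_partitioned')
manifest = json.loads((partitioned / 'chunk_manifest.json').read_text())
print(manifest['total_rows'], 'rows across', len(manifest['chunks']), 'chunks')

# Spot-check the first chunk listed in the manifest
first = manifest['chunks'][0]
df = pd.read_parquet(partitioned / first['file'])
assert len(df) == first['rows']
print(df[['timestamp', 'server_name', 'status']].head())
```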
The output is ready for streaming training:

```bash
cd Argus
python src/training/main.py train --streaming
```

If something goes wrong:

- Columns not mapped: check your `COLUMN_MAPPING` and make sure your column names are mapped; column names are case-sensitive.
- Timestamp errors: ensure your source data has valid timestamps and check the date parsing format.
- Validation failures: review the specific errors printed; the most common causes are missing columns or wrong data types.
- Memory pressure: the script processes data in chunks for partitioning, but for very large datasets (>100M rows), consider extracting in batches (see the sketch below).
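For the batched case, one workable pattern is to pull one day at a time through the script's database extractor and concatenate (a sketch, assuming a PostgreSQL source; the table and date range are illustrative):

```python
from datetime import date, timedelta
import pandas as pd
from data_prep import extract_from_database

frames = []
day = date(2025, 1, 1)
while day < date(2025, 1, 15):
    # One day per query keeps each extraction small
    query = f"""
        SELECT * FROM server_metrics
        WHERE collection_time >= '{day}'
          AND collection_time <  '{day + timedelta(days=1)}'
    """
    frames.append(extract_from_database('postgresql://user:pass@host/db', query))
    day += timedelta(days=1)

df = pd.concat(frames, ignore_index=True)
```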
After data preparation:

- Train the model:

  ```bash
  cd Argus
  python src/training/main.py train --streaming
  ```

- Validate predictions: test on held-out data
- Deploy: see DEPLOYMENT_GUIDE.md