diff --git a/doc/source/images/trackio_logo_full.png b/doc/source/images/trackio_logo_full.png new file mode 100644 index 000000000000..9a7a75b5bd14 Binary files /dev/null and b/doc/source/images/trackio_logo_full.png differ diff --git a/doc/source/tune/api/logging.rst b/doc/source/tune/api/logging.rst index 2ef841929056..3c9b0018ae38 100644 --- a/doc/source/tune/api/logging.rst +++ b/doc/source/tune/api/logging.rst @@ -92,6 +92,19 @@ See the :doc:`tutorial here `. ~air.integrations.comet.CometLoggerCallback +Trackio Integration +------------------------------ + +Tune also provides a logger for `Trackio `_. +You can install Trackio via ``pip install trackio``. +See the :doc:`tutorial here `. + +.. autosummary:: + :nosignatures: + :toctree: doc/ + + ~air.integrations.trackio.TrackioLoggerCallback + Aim Integration --------------- diff --git a/doc/source/tune/examples/index.rst b/doc/source/tune/examples/index.rst index 075e5fe20ac8..7e6a0892e012 100644 --- a/doc/source/tune/examples/index.rst +++ b/doc/source/tune/examples/index.rst @@ -59,6 +59,7 @@ Experiment tracking tools MLflow Example Aim Example Comet Example + Trackio Example Ray Tune integrates with some popular Experiment tracking and management tools, such as CometML, or Weights & Biases. For how diff --git a/doc/source/tune/examples/tune-trackio.ipynb b/doc/source/tune/examples/tune-trackio.ipynb new file mode 100644 index 000000000000..3434c88522e6 --- /dev/null +++ b/doc/source/tune/examples/tune-trackio.ipynb @@ -0,0 +1,498 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Using Trackio with Tune\n", + "\n", + "\n", + " \"try-anyscale-quickstart\"\n", + "\n", + "

\n", + "\n", + "\n", + "[Trackio](https://huggingface.co/docs/trackio/index) is an open-source, lightweight, free experiment tracking \n", + "Python library built on top of Hugging Face Datasets and Spaces 🤗. It has a local-first design, and experiments can be viewed with a Gradio dashboard locally or on the HF Hub.\n", + "\n", + "```{image} /images/trackio_logo_full.png\n", + ":align: center\n", + ":alt: Trackio\n", + ":height: 80px\n", + ":target: https://huggingface.co/docs/trackio/index/\n", + "```\n", + "\n", + "Ray Tune currently offers two lightweight integrations for Trackio.\n", + "One is the `TrackioLoggerCallback`, which automatically logs\n", + "metrics reported to Tune to Trackio.\n", + "\n", + "The other one is the `setup_trackio()` function, which can be\n", + "used with the function API. It automatically\n", + "initializes a Trackio run with Tune's training information. You can then use the\n", + "returned run like you normally would, e.g. calling `run.log()` to log your training\n", + "process.\n", + "\n", + "```{contents}\n", + ":backlinks: none\n", + ":local: true\n", + "```\n", + "\n", + "## Running A Trackio Example\n", + "\n", + "In the following example we're going to use both of the above methods, namely the `TrackioLoggerCallback` and\n", + "the `setup_trackio` function to log metrics.\n", + "\n", + "As the very first step, make sure you're logged in to the HF Hub on all machines you're running your training on:\n", + "\n", + " hf auth login\n", + "\n", + "We can then start with a few crucial imports:" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "import random\n", + "import time\n", + "\n", + "import numpy as np\n", + "import trackio\n", + "\n", + "import ray\n", + "from ray import tune\n", + "from ray.air.integrations.trackio import (\n", + " TrackioLoggerCallback,\n", + " setup_trackio,\n", + ")\n", + "from ray.train import RunConfig, ScalingConfig\n", +
"from ray.train.torch import TorchTrainer\n", + "\n", + "PROJECT_NAME = \"trackio-ray-example\"\n", + "\n", + "HF_DATASET_ID = \"AINovice2005/ray-trackio-experiments\"\n", + "HF_SPACE_ID = \"AINovice2005/ray-trackio-dashboard\"\n", + "\n", + "NUM_STEPS = 15" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Tracking Ray Tune Experiments with Trackio\n", + "\n", + "This example demonstrates how to integrate Trackio with Ray Tune using `TrackioLoggerCallback`. The callback automatically captures:\n", + "\n", + "- Metrics reported via `tune.report`\n", + "- GPU utilization and system telemetry\n", + "- Trial configuration and metadata\n", + "\n", + "All trials are logged under a single project, enabling structured comparison across hyperparameter configurations. Results can be persisted to a dataset or space for analysis and sharing with the community." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "def tune_trainable(config):\n", + "\n", + " for step in range(NUM_STEPS):\n", + "\n", + " loss = (config[\"lr\"] * 10) / (step + 1) + random.random()\n", + " accuracy = 1 / (loss + 1e-3)\n", + "\n", + " # Example artifact\n", + " image = np.random.rand(64, 64, 3)\n", + "\n", + " tune.report(\n", + " {\n", + " \"loss\": loss,\n", + " \"accuracy\": accuracy,\n", + " \"image_mean\": float(image.mean()),\n", + " \"step\": step,\n", + " }\n", + " )\n", + "\n", + " time.sleep(0.2)" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "def run_tune_example():\n", + "\n", + " tuner = tune.Tuner(\n", + " tune_trainable,\n", + " param_space={\n", + " \"lr\": tune.grid_search([0.001, 0.01, 0.1]),\n", + " },\n", + " run_config=tune.RunConfig(\n", + " name=\"trackio-ray-tune-demo\",\n", + " callbacks=[\n", + " TrackioLoggerCallback(\n", + " project=PROJECT_NAME,\n", + " auto_log_gpu=True,\n", + " gpu_log_interval=5,\n", + " 
dataset_id=HF_DATASET_ID,\n", + " space_id=HF_SPACE_ID,\n", + " )\n", + " ],\n", + " ),\n", + " )\n", + "\n", + " results = tuner.fit()\n", + "\n", + " print(\"\\nTune finished\\n\")\n", + " print(results)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Applying Ray Train with Trackio\n", + "\n", + "This example demonstrates how to integrate Trackio with Ray Train using `setup_trackio`, enabling manual logging within a training loop.\n", + "\n", + "Here, we simulate a training loop that logs scalar metrics such as loss and throughput,\n", + "along with step-wise progress for time-series tracking.\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "def train_loop(config):\n", + "\n", + " run = setup_trackio(\n", + " config=config,\n", + " project=PROJECT_NAME,\n", + " auto_log_gpu=True,\n", + " gpu_log_interval=5,\n", + " dataset_id=HF_DATASET_ID,\n", + " space_id=HF_SPACE_ID,\n", + " )\n", + "\n", + " for step in range(NUM_STEPS):\n", + "\n", + " loss = 5 / (step + 1) + random.random()\n", + " throughput = random.uniform(50, 150)\n", + "\n", + " if run:\n", + " run.log(\n", + " {\n", + " \"loss\": loss,\n", + " \"throughput\": throughput,\n", + " \"step\": step,\n", + " },\n", + " step=step,\n", + " )\n", + "\n", + " sample_image = np.random.rand(64, 64, 3)\n", + "\n", + " if run:\n", + " run.log(\n", + " {\n", + " \"image_mean\": float(sample_image.mean()),\n", + " \"image_std\": float(sample_image.std()),\n", + " }\n", + " )\n", + "\n", + " time.sleep(0.2)\n", + "\n", + " if run:\n", + " run.finish()" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "def run_train_example():\n", + "\n", + " trainer = TorchTrainer(\n", + " train_loop_per_worker=train_loop,\n", + " train_loop_config={\"lr\": 0.01},\n", + " scaling_config=ScalingConfig(num_workers=1),\n", + "
run_config=RunConfig(name=\"trackio-ray-train-demo\"),\n", + " )\n", + "\n", + " trainer.fit()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Execute the example\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "
\n", + "
\n", + "

Tune Status

\n", + " \n", + "\n", + "\n", + "\n", + "\n", + "\n", + "
Current time: 2026-03-28 08:14:11
Running for: 00:00:07.66
Memory: 7.2/15.6 GiB
\n", + "
\n", + "
\n", + "
\n", + "

System Info

\n", + " Using FIFO scheduling algorithm.
Logical resource usage: 3.0/4 CPUs, 0/0 GPUs\n", + "
\n", + " \n", + "
\n", + "
\n", + "
\n", + "

Trial Status

\n", + " \n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "
Trial name                  status      loc                lr     iter  total time (s)  loss      accuracy  image_mean
tune_trainable_15a2f_00000  TERMINATED  10.128.0.96:83622  0.001  15    2.81557         0.160352  6.19765   0.49986
tune_trainable_15a2f_00001  TERMINATED  10.128.0.96:83621  0.01   15    2.81553         0.564934  1.76699   0.502778
tune_trainable_15a2f_00002  TERMINATED  10.128.0.96:83620  0.1    15    2.82082         0.923933  1.08116   0.50249
\n", + "
\n", + "
\n", + "\n" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "* Trackio project initialized: trackio-ray-example\n", + "* Trackio metrics will be synced to Hugging Face Dataset: AINovice2005/ray-trackio-experiments\n", + "* Found existing space: https://huggingface.co/spaces/AINovice2005/ray-trackio-dashboard\n", + "* View dashboard by going to: https://AINovice2005-ray-trackio-dashboard.hf.space/\n" + ] + }, + { + "data": { + "text/html": [ + "
" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2026-03-28 08:14:07,714\tWARNING trackio.py:286 -- trackio: Dropping unsupported metric 'checkpoint_dir_name' (type=NoneType). Only int/float supported.\n", + "2026-03-28 08:14:07,715\tWARNING trackio.py:286 -- trackio: Dropping unsupported metric 'trial_id' (type=str). Only int/float supported.\n", + "2026-03-28 08:14:07,716\tWARNING trackio.py:286 -- trackio: Dropping unsupported metric 'date' (type=str). Only int/float supported.\n", + "2026-03-28 08:14:07,718\tWARNING trackio.py:286 -- trackio: Dropping unsupported metric 'hostname' (type=str). Only int/float supported.\n", + "2026-03-28 08:14:07,722\tWARNING trackio.py:286 -- trackio: Dropping unsupported metric 'node_ip' (type=str). Only int/float supported.\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "* Created new run: tune_trainable_15a2f_00002\n", + "* Created new run: tune_trainable_15a2f_00000\n", + "* Created new run: tune_trainable_15a2f_00001\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/teamspace/studios/this_studio/ray/.venv/lib/python3.12/site-packages/trackio/run.py:608: UserWarning: Reserved keys renamed: ['step', 'timestamp'] → '__{key}'\n", + " warnings.warn(f\"Reserved keys renamed: {renamed_keys} → '__{{key}}'\")\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "* Run finished. Uploading logs to Trackio Space (please wait...)\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2026-03-28 08:14:11,946\tINFO tune.py:1001 -- Wrote the latest version of all result files and experiment state to '/teamspace/studios/this_studio/ray_results/trackio-ray-tune-demo' in 0.0084s.\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "* Run finished. 
Uploading logs to Trackio Space (please wait...)\n", + "* Run finished. Uploading logs to Trackio Space (please wait...)\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2026-03-28 08:14:11,964\tINFO tune.py:1033 -- Total run time: 7.73 seconds (7.65 seconds for the tuning loop).\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "* Run finished. Uploading logs to Trackio Space (please wait...)\n", + "\n", + "Tune finished\n", + "\n", + "ResultGrid<[\n", + " Result(\n", + " metrics={'loss': 0.16035154682514471, 'accuracy': 6.197647433053068, 'image_mean': 0.4998603843744434, 'step': 14},\n", + " path='/teamspace/studios/this_studio/ray_results/trackio-ray-tune-demo/tune_trainable_15a2f_00000_0_lr=0.0010_2026-03-28_08-14-04',\n", + " filesystem='local',\n", + " checkpoint=None\n", + " ),\n", + " Result(\n", + " metrics={'loss': 0.5649336419903179, 'accuracy': 1.7669916149234828, 'image_mean': 0.5027780010222311, 'step': 14},\n", + " path='/teamspace/studios/this_studio/ray_results/trackio-ray-tune-demo/tune_trainable_15a2f_00001_1_lr=0.0100_2026-03-28_08-14-04',\n", + " filesystem='local',\n", + " checkpoint=None\n", + " ),\n", + " Result(\n", + " metrics={'loss': 0.9239329737032552, 'accuracy': 1.081159422824111, 'image_mean': 0.5024897968228581, 'step': 14},\n", + " path='/teamspace/studios/this_studio/ray_results/trackio-ray-tune-demo/tune_trainable_15a2f_00002_2_lr=0.1000_2026-03-28_08-14-04',\n", + " filesystem='local',\n", + " checkpoint=None\n", + " )\n", + "]>\n", + "\n", + "Running Ray Train experiment\n", + "\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "\u001b[36m(TrainController pid=84397)\u001b[0m Requesting resources: {'CPU': 1} * 1\n", + "\u001b[36m(TrainController pid=84397)\u001b[0m Attempting to start training worker group of size 1 with the following resources: [{'CPU': 1}] * 1\n", + "\u001b[36m(RayTrainWorker pid=85069)\u001b[0m Setting up process group for: 
env:// [rank=0, world_size=1]\n", + "\u001b[36m(TrainController pid=84397)\u001b[0m Started training worker group of size 1: \n", + "\u001b[36m(TrainController pid=84397)\u001b[0m - (ip=10.128.0.96, pid=85069) world_rank=0, local_rank=0, node_rank=0\n", + "\u001b[36m(RayTrainWorker pid=85069)\u001b[0m * Trackio project initialized: trackio-ray-example\n", + "\u001b[36m(RayTrainWorker pid=85069)\u001b[0m * Trackio metrics will be synced to Hugging Face Dataset: AINovice2005/ray-trackio-experiments\n", + "\u001b[36m(RayTrainWorker pid=85069)\u001b[0m * Found existing space: https://huggingface.co/spaces/AINovice2005/ray-trackio-dashboard\n", + "\u001b[36m(RayTrainWorker pid=85069)\u001b[0m * View dashboard by going to: https://AINovice2005-ray-trackio-dashboard.hf.space/\n", + "\u001b[36m(RayTrainWorker pid=85069)\u001b[0m * Created new run: AINovice2005-1774685667\n", + "\u001b[36m(RayTrainWorker pid=85069)\u001b[0m /teamspace/studios/this_studio/ray/.venv/lib/python3.12/site-packages/trackio/run.py:608: UserWarning: Reserved keys renamed: ['step'] → '__{key}'\n", + "\u001b[36m(RayTrainWorker pid=85069)\u001b[0m warnings.warn(f\"Reserved keys renamed: {renamed_keys} → '__{{key}}'\")\n", + "\u001b[36m(RayTrainWorker pid=85069)\u001b[0m * Run finished. Uploading logs to Trackio Space (please wait...)\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "Opening dashboard\n", + "\n", + "Run manually if needed:\n", + "trackio show --project \"trackio-ray-demo\"\n", + "* Run finished. 
Uploading logs to Trackio Space (please wait...)\n", + "\n", + "Execution completed\n", + "\n" + ] + } + ], + "source": [ + "ray.init(ignore_reinit_error=True)\n", + "\n", + "print(\"\\nRunning Ray Tune experiment\\n\")\n", + "run_tune_example()\n", + "\n", + "print(\"\\nRunning Ray Train experiment\\n\")\n", + "run_train_example()\n", + "\n", + "print(\"\\nOpening dashboard\\n\")\n", + "print(\"Run manually if needed:\")\n", + "print('trackio show --project \"trackio-ray-example\"')\n", + "\n", + "trackio.finish()\n", + "ray.shutdown()\n", + "\n", + "print(\"\\nExecution completed\\n\")" + ] + } + ], + "metadata": { + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/python/ray/air/BUILD.bazel b/python/ray/air/BUILD.bazel index 0c119c16cf2b..e5205af54534 100644 --- a/python/ray/air/BUILD.bazel +++ b/python/ray/air/BUILD.bazel @@ -120,6 +120,19 @@ py_test( deps = [":ml_lib"], ) +py_test( + name = "test_integration_trackio", + size = "small", + srcs = ["tests/test_integration_trackio.py"], + # NOTE: This tests the Tune Trackio callback.
+ env = {"RAY_TRAIN_V2_ENABLED": "1"}, + tags = [ + "exclusive", + "team:ml", + ], + deps = [":ml_lib"], +) + py_test( name = "test_keras_callback", size = "medium", diff --git a/python/ray/air/integrations/trackio.py b/python/ray/air/integrations/trackio.py new file mode 100644 index 000000000000..8bd8c96ad6b3 --- /dev/null +++ b/python/ray/air/integrations/trackio.py @@ -0,0 +1,339 @@ +import logging +from typing import Dict, List, Optional + +import numpy as np + +try: + import trackio +except ImportError: + trackio = None + +from ray.air.constants import TRAINING_ITERATION +from ray.train._internal.session import get_session +from ray.tune.experiment import Trial +from ray.tune.logger import LoggerCallback +from ray.util import PublicAPI + +logger = logging.getLogger(__name__) + + +def _flatten_dict(d: Dict, parent_key: str = "", sep: str = "/") -> Dict: + """Flatten a nested dictionary.""" + + items = [] + + for k, v in d.items(): + new_key = f"{parent_key}{sep}{k}" if parent_key else k + + if isinstance(v, dict): + items.extend(_flatten_dict(v, new_key, sep=sep).items()) + else: + items.append((new_key, v)) + + return dict(items) + + +@PublicAPI(stability="alpha") +def setup_trackio( + config: Optional[Dict] = None, + project: Optional[str] = None, + name: Optional[str] = None, + group: Optional[str] = None, + auto_log_gpu: bool = False, + gpu_log_interval: float = 10.0, + dataset_id: Optional[str] = None, + space_id: Optional[str] = None, + rank_zero_only: bool = True, +): + """Set up a Trackio experiment run. + + This function initializes a Trackio run for experiment tracking within + Ray Train training loops. Trackio is a lightweight experiment tracking + system that logs metrics, configuration parameters, and system statistics + during model training. + + By default, the run name is derived from the Ray trial name and the run + group corresponds to the Ray experiment name. These values can be overridden + by explicitly passing ``name`` and ``group``. 
+ + In distributed training with Ray Train, only the worker with rank 0 will + initialize the Trackio run by default. This prevents duplicate logging from + multiple workers. If ``rank_zero_only=False`` is specified, every worker + will initialize its own Trackio run. + + Trackio supports additional features such as GPU utilization logging, + remote experiment logging via Hugging Face datasets, and remote dashboards + hosted on Hugging Face Spaces. + + Args: + config: Configuration dictionary to log as part of the experiment + metadata. This typically contains hyperparameters or training + configuration values. + project: Name of the Trackio project under which runs are grouped. + Defaults to ``"ray-train"`` if not provided. + name: Optional name of the Trackio run. Defaults to the Ray trial name. + group: Optional grouping identifier for runs. Defaults to the Ray + experiment name. + auto_log_gpu: If True, Trackio automatically records GPU utilization + metrics during training. + gpu_log_interval: Interval (in seconds) between GPU metric samples. + dataset_id: Optional Hugging Face dataset ID where experiment logs + should be uploaded. + space_id: Optional Hugging Face Space ID used for hosting the Trackio + dashboard remotely. + rank_zero_only: If True, only the rank 0 worker in distributed training + will initialize Trackio. If False, all workers will create runs. + + Returns: + The run from ``trackio.init()``, or ``None`` on workers skipped by ``rank_zero_only``. + + Example: + + .. code-block:: python + + from ray.air.integrations.trackio import setup_trackio + + def training_loop(config): + run = setup_trackio(config=config, project="my-project") + + for step in range(10): + loss = train_step() + if run: + run.log({"loss": loss}, step=step) + + if run: + run.finish() + """ + if trackio is None: + raise RuntimeError("Trackio was not found.
Install with `pip install trackio`.") + + trial_name = None + experiment_name = None + + session = get_session() + if session: + if ( + rank_zero_only + and session.world_rank is not None + and session.world_rank != 0 + ): + return None + + trial_name = session.trial_name + experiment_name = session.experiment_name + + name = name or trial_name + group = group or experiment_name + + run = trackio.init( + project=project or "ray-train", + name=name, + group=group, + config=config, + auto_log_gpu=auto_log_gpu, + gpu_log_interval=gpu_log_interval, + dataset_id=dataset_id, + space_id=space_id, + ) + return run + + +@PublicAPI(stability="alpha") +class TrackioLoggerCallback(LoggerCallback): + """Logger callback that logs Ray Tune experiment results to Trackio. + + This callback integrates Trackio experiment tracking with Ray Tune. + Each Ray Tune trial corresponds to a separate Trackio run. Metrics + reported by the training function are logged to the corresponding run. + + Trackio supports additional capabilities such as GPU telemetry logging, + remote experiment logging through Hugging Face datasets, and remote + dashboards hosted on Hugging Face Spaces. + + Example: + + .. code-block:: python + + from ray import tune + from ray.air.integrations.trackio import TrackioLoggerCallback + + def train_fn(config): + for step in range(10): + loss = 1 / (step + 1) + tune.report({"loss": loss}) + + tuner = tune.Tuner( + train_fn, + param_space={"lr": tune.grid_search([0.001, 0.01])}, + run_config=tune.RunConfig( + callbacks=[ + TrackioLoggerCallback( + project="ray-experiments", + auto_log_gpu=True + ) + ] + ) + ) + + tuner.fit() + + Args: + project: Name of the Trackio project. + group: Optional grouping identifier for runs. + log_config: If True, Ray trial configuration parameters will be logged. + auto_log_gpu: If True, GPU utilization metrics will be logged. + gpu_log_interval: Interval (seconds) between GPU metric samples. 
+ dataset_id: Optional Hugging Face dataset ID used for remote logging. + space_id: Optional Hugging Face Space used to host the Trackio dashboard. + excludes: List of metric keys that should not be logged. + """ + + _exclude_results = ["done", "should_checkpoint", TRAINING_ITERATION] + + def __init__( + self, + project: str, + group: Optional[str] = None, + log_config: bool = True, + auto_log_gpu: bool = False, + gpu_log_interval: float = 10.0, + dataset_id: Optional[str] = None, + space_id: Optional[str] = None, + excludes: Optional[List[str]] = None, + ): + if trackio is None: + raise RuntimeError( + "Trackio was not found. Install with `pip install trackio`." + ) + + self.project = project + self.group = group + self.log_config = log_config + + self.auto_log_gpu = auto_log_gpu + self.gpu_log_interval = gpu_log_interval + self.dataset_id = dataset_id + self.space_id = space_id + + self._warned_unsupported_keys = set() + self.excludes = excludes or [] + self._effective_excludes = set(self._exclude_results) + + if excludes: + self._effective_excludes.update(excludes) + + if not self.log_config: + self._effective_excludes.add("config") + + self._trial_runs: Dict[Trial, trackio.Run] = {} + + def log_trial_start(self, trial: Trial): + """Initialize a Trackio run when a Ray Tune trial starts.""" + # Prevent duplicate runs during trial recovery + if trial in self._trial_runs: + return + + config = trial.config.copy() + if not self.log_config: + config = {} + else: + config.pop("callbacks", None) + + self._trial_runs[trial] = trackio.init( + project=self.project, + name=str(trial), + group=self.group or trial.experiment_dir_name, + config=config, + auto_log_gpu=self.auto_log_gpu, + gpu_log_interval=self.gpu_log_interval, + dataset_id=self.dataset_id, + space_id=self.space_id, + ) + + def log_trial_result( + self, + iteration: int, + trial: Trial, + result: Dict, + ): + """Log metrics from a Ray Tune training iteration to Trackio.""" + run = self._trial_runs.get(trial) + 
+ # Lazy initialization after experiment restore + if run is None: + self.log_trial_start(trial) + run = self._trial_runs.get(trial) + + flat = _flatten_dict(result) + metrics = {} + for key, value in flat.items(): + if any( + key == ex or key.startswith(ex + "/") for ex in self._effective_excludes + ): + continue + + # Convert numpy arrays and scalar types + if isinstance(value, np.ndarray): + value = value.tolist() + if isinstance(value, np.generic): + value = value.item() + # Only log supported metric types + if isinstance(value, (int, float)): + metrics[key] = value + else: + # Warn once per key + if key not in self._warned_unsupported_keys: + logger.warning( + f"trackio: Dropping unsupported metric '{key}' " + f"(type={type(value).__name__}). Only int/float supported." + ) + self._warned_unsupported_keys.add(key) + + if metrics and run: + training_step = result.get(TRAINING_ITERATION, iteration) + try: + run.log(metrics, step=training_step) + except Exception as e: + logger.warning(f"trackio: Failed to log metrics: {e}") + + def log_trial_save(self, trial: Trial): + """Log checkpoint metadata when a Ray Tune trial checkpoint is saved.""" + checkpoint = trial.checkpoint + if not checkpoint: + return + + run = self._trial_runs.get(trial) + if run is None: + self.log_trial_start(trial) + run = self._trial_runs.get(trial) + + if run: + try: + run.log({"checkpoint_saved": 1}) + except Exception as e: + logger.warning(f"trackio: Failed to log checkpoint: {e}") + + def log_trial_end(self, trial: Trial, failed: bool = False): + """Finalize the Trackio run when a Ray Tune trial finishes.""" + run = self._trial_runs.get(trial) + if run: + run.finish() + + self._trial_runs.pop(trial, None) + + def on_experiment_end(self, trials, **info): + """Finalize any remaining Trackio runs after the Ray Tune experiment ends. + + ``log_trial_end`` handles normal trial cleanup. This method catches + runs that were never properly ended due to interruption or failure. 
+ """ + for trial, run in self._trial_runs.items(): + if run: + run.finish() + + self._trial_runs.clear() + + try: + trackio.finish() + except Exception as e: + logger.warning(f"trackio: `trackio.finish()` failed: {e}") diff --git a/python/ray/air/tests/test_integration_trackio.py b/python/ray/air/tests/test_integration_trackio.py new file mode 100644 index 000000000000..0d7330b43b16 --- /dev/null +++ b/python/ray/air/tests/test_integration_trackio.py @@ -0,0 +1,285 @@ +from unittest.mock import MagicMock + +import numpy as np +import pytest + +from ray.air.integrations.trackio import ( + TrackioLoggerCallback, + setup_trackio, +) + +# ------------------------- +# Mock Objects +# ------------------------- + + +class MockTrial: + def __init__(self): + self.config = {"lr": 0.01, "callbacks": "remove_me"} + self.experiment_dir_name = "exp_group" + self.checkpoint = None + + def __str__(self): + return "mock_trial" + + +class MockRun: + def __init__(self): + self.logged = [] + self.finished = False + + def log(self, metrics, step=None): + self.logged.append((metrics, step)) + + def finish(self): + self.finished = True + + +# ------------------------- +# Fixtures +# ------------------------- + + +@pytest.fixture +def mock_trackio(mocker): + mock_run = MockRun() + + mock_init = mocker.patch( + "ray.air.integrations.trackio.trackio.init", + return_value=mock_run, + ) + + return mock_init, mock_run + + +# ------------------------- +# Tests: TrackioLoggerCallback +# ------------------------- + + +def test_log_trial_start_initializes_run(mock_trackio): + mock_init, _ = mock_trackio + + cb = TrackioLoggerCallback(project="test_project") + trial = MockTrial() + + cb.log_trial_start(trial) + + assert trial in cb._trial_runs + mock_init.assert_called_once() + + _, kwargs = mock_init.call_args + assert kwargs["project"] == "test_project" + assert kwargs["name"] == "mock_trial" + assert kwargs["group"] == "exp_group" + + # callbacks removed + assert "callbacks" not in 
kwargs["config"] + + +def test_log_trial_result_basic_metrics(mock_trackio): + _, mock_run = mock_trackio + + cb = TrackioLoggerCallback(project="test") + trial = MockTrial() + + cb.log_trial_start(trial) + + result = { + "loss": 0.5, + "accuracy": 0.9, + "training_iteration": 3, + } + + cb.log_trial_result(1, trial, result) + + assert len(mock_run.logged) == 1 + metrics, step = mock_run.logged[0] + + assert metrics == {"loss": 0.5, "accuracy": 0.9} + assert step == 3 + + +def test_flatten_dict_logging(mock_trackio): + _, mock_run = mock_trackio + + cb = TrackioLoggerCallback(project="test") + trial = MockTrial() + + cb.log_trial_start(trial) + + result = { + "metrics": { + "loss": 0.1, + "acc": 0.95, + } + } + + cb.log_trial_result(1, trial, result) + + metrics, _ = mock_run.logged[0] + + assert metrics["metrics/loss"] == 0.1 + assert metrics["metrics/acc"] == 0.95 + + +def test_numpy_conversion_and_filtering(mock_trackio): + _, mock_run = mock_trackio + + cb = TrackioLoggerCallback(project="test") + trial = MockTrial() + + cb.log_trial_start(trial) + + result = { + "loss": np.float32(0.25), + "arr": np.array([1, 2, 3]), # dropped + } + + cb.log_trial_result(1, trial, result) + + metrics, _ = mock_run.logged[0] + + assert metrics["loss"] == 0.25 + assert "arr" not in metrics + + +def test_warning_logged_once_per_key(mock_trackio, mocker): + mocker.patch("ray.air.integrations.trackio.logger.warning") + + cb = TrackioLoggerCallback(project="test") + trial = MockTrial() + + cb.log_trial_start(trial) + + result = {"bad_metric": "string"} + + cb.log_trial_result(1, trial, result) + cb.log_trial_result(2, trial, result) + + logger_mock = pytest.importorskip("ray.air.integrations.trackio").logger.warning + + assert logger_mock.call_count == 1 + + +def test_excludes_filtering(mock_trackio): + _, mock_run = mock_trackio + + cb = TrackioLoggerCallback( + project="test", + excludes=["loss"], + ) + + trial = MockTrial() + cb.log_trial_start(trial) + + result = { + "loss": 
0.5, + "accuracy": 0.8, + "done": True, + } + + cb.log_trial_result(1, trial, result) + + metrics, _ = mock_run.logged[0] + + assert "loss" not in metrics + assert "done" not in metrics + assert metrics["accuracy"] == 0.8 + + +def test_lazy_initialization_on_result(mock_trackio): + mock_init, _ = mock_trackio + + cb = TrackioLoggerCallback(project="test") + trial = MockTrial() + + cb.log_trial_result(1, trial, {"loss": 1.0}) + + assert mock_init.called + assert trial in cb._trial_runs + + +def test_log_trial_end_finishes_run(mock_trackio): + _, mock_run = mock_trackio + + cb = TrackioLoggerCallback(project="test") + trial = MockTrial() + + cb.log_trial_start(trial) + cb.log_trial_end(trial) + + assert mock_run.finished is True + assert trial not in cb._trial_runs + + +def test_on_experiment_end_cleans_all_runs(mock_trackio): + _, mock_run = mock_trackio + + cb = TrackioLoggerCallback(project="test") + trial = MockTrial() + + cb.log_trial_start(trial) + + cb.on_experiment_end([]) + + assert mock_run.finished is True + assert cb._trial_runs == {} + + +# ------------------------- +# Tests: setup_trackio +# ------------------------- + + +def test_setup_trackio_rank_zero_only_blocks_nonzero(mocker): + mocker.patch("ray.air.integrations.trackio.trackio.init") + + session = MagicMock() + session.world_rank = 1 + session.trial_name = "trial" + session.experiment_name = "exp" + + mocker.patch( + "ray.air.integrations.trackio.get_session", + return_value=session, + ) + + run = setup_trackio(rank_zero_only=True) + + assert run is None + + +def test_setup_trackio_initializes_on_rank_zero(mocker): + mock_run = MagicMock() + + mock_init = mocker.patch( + "ray.air.integrations.trackio.trackio.init", + return_value=mock_run, + ) + + session = MagicMock() + session.world_rank = 0 + session.trial_name = "trial_name" + session.experiment_name = "exp_name" + + mocker.patch( + "ray.air.integrations.trackio.get_session", + return_value=session, + ) + + run = setup_trackio(config={"lr": 
0.1}, project="proj") + + assert run == mock_run + + _, kwargs = mock_init.call_args + assert kwargs["project"] == "proj" + assert kwargs["name"] == "trial_name" + assert kwargs["group"] == "exp_name" + assert kwargs["config"]["lr"] == 0.1 + + +if __name__ == "__main__": + import sys + + sys.exit(pytest.main(["-v", __file__])) diff --git a/python/requirements/ml/core-requirements.txt b/python/requirements/ml/core-requirements.txt index 2b3a4b6688e3..55283f2a7ec0 100644 --- a/python/requirements/ml/core-requirements.txt +++ b/python/requirements/ml/core-requirements.txt @@ -2,6 +2,7 @@ comet-ml==3.44.1 mlflow>=2.22.0 wandb>=0.23.1 +trackio==0.25.1 # ML training frameworks xgboost==2.1.0