Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
from typing import Callable, Dict, List, Tuple, Union

import ray
from ray._common.usage.usage_lib import record_extra_usage_tag
from ray._common.constants import HEAD_NODE_RESOURCE_NAME
from ray._common.usage.usage_lib import TagKey, record_extra_usage_tag
from ray.llm._internal.batch.observability.logging import get_logger
from ray.llm._internal.common.base_pydantic import BaseModelExtended
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

LLM_BATCH_TELEMETRY_NAMESPACE = "llm_batch_telemetry"
LLM_BATCH_TELEMETRY_ACTOR_NAME = "llm_batch_telemetry"
Expand Down Expand Up @@ -89,8 +91,6 @@ def generate_report(self) -> Dict[str, str]:

def record(self, telemetry: BatchModelTelemetry) -> None:
"""Append and record telemetries."""
from ray._common.usage.usage_lib import TagKey

self._tracking_telemetries.append(telemetry)
telemetry_dict = self.generate_report()
for key, value in telemetry_dict.items():
Expand All @@ -107,9 +107,6 @@ def __init__(self):
LLM_BATCH_TELEMETRY_ACTOR_NAME, namespace=LLM_BATCH_TELEMETRY_NAMESPACE
)
except ValueError:
from ray._common.constants import HEAD_NODE_RESOURCE_NAME
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

self.remote_telemetry_agent = _TelemetryAgent.options(
# Ensure the actor is created on the head node.
resources={HEAD_NODE_RESOURCE_NAME: 0.001},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from pydantic import BaseModel, field_validator

from .base import CallbackBase
from ray.llm._internal.common.utils.cloud_utils import CloudFileSystem

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -74,8 +75,6 @@ def __init__(self, **kwargs: Any) -> None:

def on_before_download_model_files_distributed(self) -> None:
"""Download files from cloud storage to local paths before model files are downloaded."""
from ray.llm._internal.common.utils.cloud_utils import CloudFileSystem

paths = self.kwargs["paths"]
start_time = time.monotonic()
for cloud_uri, local_path in paths:
Expand Down
4 changes: 2 additions & 2 deletions python/ray/llm/_internal/common/utils/lora_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from functools import wraps
from typing import Any, Callable, List, Optional, TypeVar, Union

from filelock import FileLock

from ray.llm._internal.common.constants import (
CLOUD_OBJECT_EXISTS_EXPIRE_S,
CLOUD_OBJECT_MISSING_EXPIRE_S,
Expand Down Expand Up @@ -113,8 +115,6 @@ def sync_files_with_lock(
substrings_to_include: Optional[List[str]] = None,
) -> None:
"""Sync files from bucket_uri to local_path with file locking."""
from filelock import FileLock

logger.info("Downloading %s to %s", bucket_uri, local_path)

with FileLock(local_path + ".lock", timeout=timeout or -1):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,19 @@

import ray
from ray import serve
from ray._common.constants import HEAD_NODE_RESOURCE_NAME
from ray._common.usage.usage_lib import (
TagKey,
get_hardware_usages_to_report,
record_extra_usage_tag,
)
from ray.llm._internal.common.base_pydantic import BaseModelExtended
from ray.llm._internal.common.observability.telemetry_utils import DEFAULT_GPU_TYPE
from ray.llm._internal.common.utils.lora_utils import get_lora_model_ids
from ray.llm._internal.serve.core.configs.accelerators import AcceleratorType
from ray.llm._internal.serve.observability.logging import get_logger
from ray.serve.config import AutoscalingConfig
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

if TYPE_CHECKING:
from ray.llm._internal.serve.core.configs.llm_config import LLMConfig
Expand Down Expand Up @@ -172,8 +177,6 @@ def generate_report(self) -> Dict[str, str]:

def record(self, model: Optional[TelemetryModel] = None) -> None:
"""Record telemetry model."""
from ray._common.usage.usage_lib import TagKey

if model:
self.models.append(model)

Expand All @@ -192,9 +195,6 @@ def _get_or_create_telemetry_agent() -> TelemetryAgent:
LLM_SERVE_TELEMETRY_ACTOR_NAME, namespace=LLM_SERVE_TELEMETRY_NAMESPACE
)
except ValueError:
from ray._common.constants import HEAD_NODE_RESOURCE_NAME
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

telemetry_agent = TelemetryAgent.options(
# Ensure the actor is created on the head node.
resources={HEAD_NODE_RESOURCE_NAME: 0.001},
Expand Down Expand Up @@ -251,8 +251,6 @@ def infer_gpu_from_hardware(self) -> str:
"""Infer the GPU type from the hardware when the accelerator type on llm config is
not specified.
"""
from ray.llm._internal.serve.core.configs.accelerators import AcceleratorType

all_accelerator_types = [t.value for t in AcceleratorType]
gcs_client = ray.experimental.internal_kv.internal_kv_get_gcs_client()
hardwares = self._get_hardware_fn(gcs_client)
Expand Down Expand Up @@ -288,8 +286,6 @@ def push_telemetry_report_for_all_models(
use_autoscaling = model.deployment_config.get("autoscaling_config") is not None
num_replicas, min_replicas, max_replicas = 1, 1, 1
if use_autoscaling:
from ray.serve.config import AutoscalingConfig

autoscaling_config = AutoscalingConfig(
**model.deployment_config["autoscaling_config"]
)
Expand Down
Loading