diff --git a/docs/source/en/serve-cli/serving_optims.md b/docs/source/en/serve-cli/serving_optims.md index 9a927dbf9e25..d7d4572e51fa 100644 --- a/docs/source/en/serve-cli/serving_optims.md +++ b/docs/source/en/serve-cli/serving_optims.md @@ -29,14 +29,6 @@ transformers serve \ --attn-implementation "sdpa" ``` -Monitor continuous batching performance with [OpenTelemetry](https://opentelemetry.io). It collects traces and metrics, but you'll need a backend to visualize them. - -Install the OpenTelemetry dependency. - -```py -pip install transformers[open-telemetry] -``` - ## Quantization [Quantization](../quantization/overview) reduces memory usage by mapping weights to a lower precision. `transformers serve` is compatible with all quantization methods in Transformers. It supports pre-quantized models and runtime quantization. @@ -90,4 +82,4 @@ The `"bfloat16"` or `"float16"` [data types](../models#model-data-type) save mem transformers serve \ --continuous-batching \ --dtype "bfloat16" -``` \ No newline at end of file +``` diff --git a/examples/metrics-monitoring/README.md b/examples/metrics-monitoring/README.md deleted file mode 100644 index 83cc0fd854c9..000000000000 --- a/examples/metrics-monitoring/README.md +++ /dev/null @@ -1,41 +0,0 @@ -# Metrics Monitoring - -## Continuous Batching Metrics in Transformers - -To setup metric monitoring with continuous batching, you will want to have tempo and prometheus running. - -For this, we provide a docker compose image in `examples/metrics-monitoring`. - -To run it: - -```sh -cd examples/metrics-monitoring -docker compose up -``` - -Then, in your script running CB, you will need to create a MeterProvider and TracerProvider as follows: - -```py -from opentelemetry import metrics, trace -from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter -from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter -from opentelemetry.sdk.metrics import MeterProvider -from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader -from opentelemetry.sdk.resources import Resource -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor - -resource = Resource.create({"service.name": "transformers"}) - -metrics_exporter = PeriodicExportingMetricReader( - OTLPMetricExporter(endpoint="http://localhost:9090/api/v1/otlp/v1/metrics"), # Uses OTEL_EXPORTER_OTLP_METRICS_ENDPOINT env var - export_interval_millis=1000 -) -meter_provider = MeterProvider(resource=resource, metric_readers=[metrics_exporter]) -metrics.set_meter_provider(meter_provider) - -trace_exporter = OTLPSpanExporter(endpoint="http://localhost:4318/v1/traces") # Uses OTEL_EXPORTER_OTLP_TRACES_ENDPOINT env var -tracer_provider = TracerProvider(resource=resource) -tracer_provider.add_span_processor(BatchSpanProcessor(trace_exporter)) -trace.set_tracer_provider(tracer_provider) -``` diff --git a/examples/metrics-monitoring/continuous-batching-dashboard.json b/examples/metrics-monitoring/continuous-batching-dashboard.json deleted file mode 100644 index e0a293d06295..000000000000 --- a/examples/metrics-monitoring/continuous-batching-dashboard.json +++ /dev/null @@ -1,974 +0,0 @@ -{ - "annotations": { - "list": [ - { - "builtIn": 1, - "datasource": { - "type": "grafana", - "uid": "-- Grafana --" - }, - "enable": true, - "hide": true, - "iconColor": "rgba(0, 211, 255, 1)", - "name": "Annotations & Alerts", - "target": { - "limit": 100, - "matchAny": false, - "tags": [], - "type": "dashboard" - }, - "type": "dashboard" - } - ] - }, - "editable": true, - "fiscalYearStartMonth": 0, - "graphTooltip": 0, - "id": 2, - "links": [], - "panels": [ - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "description": "Memory usage of the PagedAttentionCache", - "fieldConfig": { - "defaults": { - "color": { - "mode": "thresholds" - }, - "mappings": [], - "max": 10737418240, - "min": 0, - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green" - }, - { - "color": "yellow", - "value": 5368709120 - }, - { - "color": "red", - "value": 8589934592 - } - ] - }, - "unit": "bytes" - }, - "overrides": [] - }, - "gridPos": { - "h": 8, - "w": 6, - "x": 0, - "y": 0 - }, - "id": 2, - "options": { - "minVizHeight": 75, - "minVizWidth": 75, - "orientation": "auto", - "reduceOptions": { - "calcs": [ - "lastNotNull" - ], - "fields": "", - "values": false - }, - "showThresholdLabels": false, - "showThresholdMarkers": true, - "sizing": "auto" - }, - "pluginVersion": "12.0.0", - "targets": [ - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "disableTextWrap": false, - "editorMode": "builder", - "expr": "kv_cache_memory_bytes", - "fullMetaSearch": false, - "includeNullMetadata": true, - "legendFormat": "__auto", - "range": true, - "refId": "A", - "useBackend": false - } - ], - "title": "KV Cache Memory Usage", - "transparent": true, - "type": "gauge" - }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "thresholds" - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "dark-blue" - } - ] - } - }, - "overrides": [] - }, - "gridPos": { - "h": 8, - "w": 6, - "x": 6, - "y": 0 - }, - "id": 13, - "options": { - "colorMode": "value", - "graphMode": "area", - "justifyMode": "auto", - "orientation": "auto", - "percentChangeColorMode": "standard", - "reduceOptions": { - "calcs": [ - "lastNotNull" - ], - "fields": "", - "values": false - }, - "showPercentChange": false, - "textMode": "auto", - "wideLayout": true - }, - "pluginVersion": "12.0.0", - "targets": [ - { - "disableTextWrap": false, - "editorMode": "builder", - "expr": "active_requests_count", - "fullMetaSearch": false, - "includeNullMetadata": true, - "legendFormat": "__auto", - "range": true, - "refId": "A", - "useBackend": false - } - ], - "title": "Active Requests", - "transparent": true, - "type": "stat" - }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "thresholds" - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "dark-orange" - } - ] - } - }, - "overrides": [] - }, - "gridPos": { - "h": 8, - "w": 6, - "x": 12, - "y": 0 - }, - "id": 14, - "options": { - "colorMode": "value", - "graphMode": "area", - "justifyMode": "auto", - "orientation": "auto", - "percentChangeColorMode": "standard", - "reduceOptions": { - "calcs": [ - "lastNotNull" - ], - "fields": "", - "values": false - }, - "showPercentChange": false, - "textMode": "auto", - "wideLayout": true - }, - "pluginVersion": "12.0.0", - "targets": [ - { - "disableTextWrap": false, - "editorMode": "builder", - "expr": "waiting_requests_count", - "fullMetaSearch": false, - "includeNullMetadata": true, - "legendFormat": "__auto", - "range": true, - "refId": "A", - "useBackend": false - } - ], - "title": "Waiting Requests", - "transparent": true, - "type": "stat" - }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "description": "Ratio of decode tokens to prefill tokens in a batch", - "fieldConfig": { - "defaults": { - "color": { - "mode": "thresholds" - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "blue" - } - ] - } - }, - "overrides": [] - }, - "gridPos": { - "h": 8, - "w": 6, - "x": 18, - "y": 0 - }, - "id": 6, - "options": { - "colorMode": "value", - "graphMode": "none", - "justifyMode": "auto", - "orientation": "auto", - "percentChangeColorMode": "standard", - "reduceOptions": { - "calcs": [ - "lastNotNull" - ], - "fields": "", - "values": false - }, - "showPercentChange": false, - "textMode": "auto", - "wideLayout": true - }, - "pluginVersion": "12.0.0", - "targets": [ - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "disableTextWrap": false, - "editorMode": "builder", - "expr": "decode_prefill_ratio", - "fullMetaSearch": false, - "includeNullMetadata": true, - "legendFormat": "__auto", - "range": true, - "refId": "A", - "useBackend": false - } - ], - "title": "Decode/Prefill Ratio", - "transparent": true, - "type": "stat" - }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisBorderShow": false, - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "barWidthFactor": 0.6, - "drawStyle": "line", - "fillOpacity": 0, - "gradientMode": "none", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "insertNulls": false, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green" - }, - { - "color": "red", - "value": 80 - } - ] - } - }, - "overrides": [] - }, - "gridPos": { - "h": 8, - "w": 12, - "x": 0, - "y": 8 - }, - "id": 10, - "options": { - "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom", - "showLegend": true - }, - "tooltip": { - "hideZeros": false, - "mode": "single", - "sort": "none" - } - }, - "pluginVersion": "12.0.0", - "targets": [ - { - "editorMode": "code", - "expr": "rate(decode_tokens_processed_total[$__rate_interval])", - "legendFormat": "__auto", - "range": true, - "refId": "A" - } - ], - "title": "Decode tokens throupught tok/s", - "type": "timeseries" - }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisBorderShow": false, - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "barWidthFactor": 0.6, - "drawStyle": "line", - "fillOpacity": 0, - "gradientMode": "none", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "insertNulls": false, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green" - }, - { - "color": "red", - "value": 80 - } - ] - } - }, - "overrides": [] - }, - "gridPos": { - "h": 8, - "w": 12, - "x": 12, - "y": 8 - }, - "id": 11, - "options": { - "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom", - "showLegend": true - }, - "tooltip": { - "hideZeros": false, - "mode": "single", - "sort": "none" - } - }, - "pluginVersion": "12.0.0", - "targets": [ - { - "editorMode": "code", - "expr": "rate(prefill_tokens_processed_total[$__rate_interval])", - "legendFormat": "__auto", - "range": true, - "refId": "A" - } - ], - "title": "Prefill rate tok/s", - "type": "timeseries" - }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisBorderShow": false, - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "barWidthFactor": 0.6, - "drawStyle": "line", - "fillOpacity": 0, - "gradientMode": "none", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "insertNulls": false, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green" - }, - { - "color": "red", - "value": 80 - } - ] - } - }, - "overrides": [] - }, - "gridPos": { - "h": 8, - "w": 12, - "x": 0, - "y": 16 - }, - "id": 9, - "options": { - "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom", - "showLegend": true - }, - "tooltip": { - "hideZeros": false, - "mode": "single", - "sort": "none" - } - }, - "pluginVersion": "12.0.0", - "targets": [ - { - "editorMode": "code", - "expr": "histogram_quantile(0.95, sum by(le) (rate(batch_fill_percentage_percent_bucket[$__rate_interval])))", - "legendFormat": "p95", - "range": true, - "refId": "A" - }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "editorMode": "code", - "expr": "histogram_quantile(0.99, sum by(le) (rate(batch_fill_percentage_percent_bucket[$__rate_interval])))", - "hide": false, - "instant": false, - "legendFormat": "p99", - "range": true, - "refId": "B" - }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "editorMode": "code", - "expr": "histogram_quantile(0.5, sum by(le) (rate(batch_fill_percentage_percent_bucket[$__rate_interval])))", - "hide": false, - "instant": false, - "legendFormat": "p50", - "range": true, - "refId": "C" - } - ], - "title": "Batch fill percentage percentiles", - "type": "timeseries" - }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "description": "KV Cache Memory Usage Over Time", - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisBorderShow": false, - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "barWidthFactor": 0.6, - "drawStyle": "line", - "fillOpacity": 20, - "gradientMode": "none", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "insertNulls": false, - "lineInterpolation": "linear", - "lineWidth": 2, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green" - }, - { - "color": "red", - "value": 80 - } - ] - }, - "unit": "bytes" - }, - "overrides": [] - }, - "gridPos": { - "h": 8, - "w": 12, - "x": 12, - "y": 16 - }, - "id": 4, - "options": { - "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom", - "showLegend": true - }, - "tooltip": { - "hideZeros": false, - "mode": "single", - "sort": "none" - } - }, - "pluginVersion": "12.0.0", - "targets": [ - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "disableTextWrap": false, - "editorMode": "builder", - "expr": "kv_cache_memory_bytes", - "fullMetaSearch": false, - "includeNullMetadata": true, - "legendFormat": "Used memory", - "range": true, - "refId": "A", - "useBackend": false - }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "disableTextWrap": false, - "editorMode": "builder", - "expr": "kv_cache_free_memory_bytes", - "fullMetaSearch": false, - "hide": false, - "includeNullMetadata": true, - "instant": false, - "legendFormat": "free memory", - "range": true, - "refId": "B", - "useBackend": false - } - ], - "title": "KV Cache Memory Usage Over Time", - "type": "timeseries" - }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "thresholds" - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green" - }, - { - "color": "red", - "value": 80 - } - ] - }, - "unit": "ms" - }, - "overrides": [] - }, - "gridPos": { - "h": 8, - "w": 12, - "x": 0, - "y": 24 - }, - "id": 8, - "options": { - "displayMode": "gradient", - "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom", - "showLegend": false - }, - "maxVizHeight": 300, - "minVizHeight": 10, - "minVizWidth": 0, - "namePlacement": "auto", - "orientation": "auto", - "reduceOptions": { - "calcs": [ - "lastNotNull" - ], - "fields": "", - "values": false - }, - "showUnfilled": true, - "sizing": "auto", - "valueMode": "color" - }, - "pluginVersion": "12.0.0", - "targets": [ - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "disableTextWrap": false, - "editorMode": "builder", - "expr": "histogram_quantile(0.95, sum by(le) (rate(ttft_milliseconds_bucket[$__rate_interval])))", - "fullMetaSearch": false, - "includeNullMetadata": true, - "legendFormat": "p95", - "range": true, - "refId": "A", - "useBackend": false - }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "disableTextWrap": false, - "editorMode": "builder", - "expr": "histogram_quantile(0.5, sum by(le) (rate(ttft_milliseconds_bucket[$__rate_interval])))", - "fullMetaSearch": false, - "hide": false, - "includeNullMetadata": true, - "legendFormat": "p50", - "range": true, - "refId": "B", - "useBackend": false - }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "disableTextWrap": false, - "editorMode": "builder", - "expr": "histogram_quantile(0.99, sum by(le) (rate(ttft_milliseconds_bucket[$__rate_interval])))", - "fullMetaSearch": false, - "hide": false, - "includeNullMetadata": false, - "instant": false, - "legendFormat": "p99", - "range": true, - "refId": "C", - "useBackend": false - } - ], - "title": "Time to First Token (TTFT)", - "type": "bargauge" - }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisBorderShow": false, - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "barWidthFactor": 0.6, - "drawStyle": "line", - "fillOpacity": 0, - "gradientMode": "none", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "insertNulls": false, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green" - }, - { - "color": "red", - "value": 80 - } - ] - }, - "unit": "ms" - }, - "overrides": [] - }, - "gridPos": { - "h": 8, - "w": 12, - "x": 12, - "y": 24 - }, - "id": 12, - "options": { - "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom", - "showLegend": true - }, - "tooltip": { - "hideZeros": false, - "mode": "single", - "sort": "none" - } - }, - "pluginVersion": "12.0.0", - "targets": [ - { - "editorMode": "code", - "expr": "histogram_quantile(0.5, sum by(le) (rate(request_latency_milliseconds_bucket[$__rate_interval])))", - "legendFormat": "p50", - "range": true, - "refId": "A" - }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "editorMode": "code", - "expr": "histogram_quantile(0.95, sum by(le) (rate(request_latency_milliseconds_bucket[$__rate_interval])))", - "hide": false, - "instant": false, - "legendFormat": "p95", - "range": true, - "refId": "B" - }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "editorMode": "code", - "expr": "histogram_quantile(0.99, sum by(le) (rate(request_latency_milliseconds_bucket[$__rate_interval])))", - "hide": false, - "instant": false, - "legendFormat": "p99", - "range": true, - "refId": "C" - } - ], - "title": "Request latency percentiles", - "type": "timeseries" - } - ], - "preload": false, - "refresh": "5s", - "schemaVersion": 41, - "tags": [], - "templating": { - "list": [] - }, - "time": { - "from": "now-15m", - "to": "now" - }, - "timepicker": {}, - "timezone": "", - "title": "Transformers Continuous Batching Metrics", - "uid": "Lw6CTvVSz", - "version": 5 -} \ No newline at end of file diff --git a/examples/metrics-monitoring/docker-compose.yml b/examples/metrics-monitoring/docker-compose.yml deleted file mode 100644 index 936f4a894ced..000000000000 --- a/examples/metrics-monitoring/docker-compose.yml +++ /dev/null @@ -1,55 +0,0 @@ -services: - memcached: - image: memcached:1.6.29 - container_name: memcached - ports: - - "11211:11211" - environment: - - MEMCACHED_MAX_MEMORY=64m # Set the maximum memory usage - - MEMCACHED_THREADS=4 # Number of threads to use - - prometheus: - image: prom/prometheus:latest - command: - - "--config.file=/etc/prometheus/prometheus.yml" - - --web.enable-otlp-receiver # Enable OTLP receiver - - --web.enable-remote-write-receiver - - --enable-feature=exemplar-storage - - --enable-feature=native-histograms - volumes: - - ./prometheus.yml:/etc/prometheus/prometheus.yml - ports: - - "9090:9090" - - tempo: - image: grafana/tempo:latest - command: [ "-config.file=/etc/tempo.yaml" ] - volumes: - - ./tempo.yaml:/etc/tempo.yaml - ports: - - "14268:14268" # jaeger ingest - - "3200:3200" # tempo - - "9095:9095" # tempo grpc - - "4317:4317" # otlp grpc - - "4318:4318" # otlp http - - "9411:9411" # zipkin - depends_on: - - memcached - - grafana: - image: grafana/grafana:latest - volumes: - - ./continuous-batching-dashboard.json:/etc/grafana/provisioning/dashboards/continuous-batching-dashboard.json - - ./grafana-dashboard.yaml:/etc/grafana/provisioning/dashboards/grafana-dashboard.yaml - - ./grafana-datasources.yaml:/etc/grafana/provisioning/datasources/datasources.yaml - environment: - - GF_AUTH_ANONYMOUS_ENABLED=true - - GF_AUTH_ANONYMOUS_ORG_ROLE=Admin - - GF_AUTH_DISABLE_LOGIN_FORM=true - - GF_FEATURE_TOGGLES_ENABLE=traceqlEditor metricsSummary - - GF_INSTALL_PLUGINS=https://storage.googleapis.com/integration-artifacts/grafana-exploretraces-app/grafana-exploretraces-app-latest.zip;grafana-traces-app - ports: - - "3000:3000" - depends_on: - - prometheus - - tempo diff --git a/examples/metrics-monitoring/grafana-dashboard.yaml b/examples/metrics-monitoring/grafana-dashboard.yaml deleted file mode 100644 index 6dd396d00e16..000000000000 --- a/examples/metrics-monitoring/grafana-dashboard.yaml +++ /dev/null @@ -1,11 +0,0 @@ -apiVersion: 1 - -providers: - - name: 'Transformers Dashboards' - orgId: 1 - folder: 'Transformers' - type: file - disableDeletion: false - editable: true - options: - path: /etc/grafana/provisioning/dashboards diff --git a/examples/metrics-monitoring/grafana-datasources.yaml b/examples/metrics-monitoring/grafana-datasources.yaml deleted file mode 100644 index e3f2e78becea..000000000000 --- a/examples/metrics-monitoring/grafana-datasources.yaml +++ /dev/null @@ -1,14 +0,0 @@ -apiVersion: 1 - -datasources: - - name: Prometheus - type: prometheus - access: proxy - url: http://prometheus:9090 - isDefault: true - - - name: Tempo - type: tempo - access: proxy - url: http://tempo:3200 - uid: tempo diff --git a/examples/metrics-monitoring/metrics_example.py b/examples/metrics-monitoring/metrics_example.py deleted file mode 100644 index df3551b68d48..000000000000 --- a/examples/metrics-monitoring/metrics_example.py +++ /dev/null @@ -1,48 +0,0 @@ -# Example usage of the trace and attach_tracer decorators - -from transformers.utils.metrics import attach_tracer, traced - - -@attach_tracer() -class ExampleClass: - def __init__(self, name): - # The attach_tracer decorator has already created self.tracer for us - self.name = name - - @traced # This method will use the tracer from the class instance - def process_data(self, data): - # This method is traced and can use self.tracer - return f"Processed {data} with {self.name}" - - @traced(span_name="custom_operation") # With custom span name - def special_operation(self, value): - # Also traced, with a custom span name - return value * 2 - - @traced( - additional_attributes=[ - ("name", "object.name", lambda x: x.upper()), # Using a transform function - ("name", "object.fixed_value", "static_value"), # Using a fixed value - ] - ) - def operation_with_attributes(self): - # This will add the specified attributes to the span - return "Operation completed" - - -# For functions without a class, the traced decorator still works -@traced -def standalone_function(arg1, arg2): - # For functions, a tracer is created based on the module name - return arg1 + arg2 - - -# Usage: -if __name__ == "__main__": - # With OpenTelemetry configured, these will produce traces - example = ExampleClass("test_object") - example.process_data("sample") - example.special_operation(42) - example.operation_with_attributes() - - result = standalone_function(1, 2) diff --git a/examples/metrics-monitoring/prometheus.yml b/examples/metrics-monitoring/prometheus.yml deleted file mode 100644 index 6c578ad89f51..000000000000 --- a/examples/metrics-monitoring/prometheus.yml +++ /dev/null @@ -1,3 +0,0 @@ -global: - scrape_interval: 15s - diff --git a/examples/metrics-monitoring/tempo.yaml b/examples/metrics-monitoring/tempo.yaml deleted file mode 100644 index 353b83e1cccf..000000000000 --- a/examples/metrics-monitoring/tempo.yaml +++ /dev/null @@ -1,90 +0,0 @@ -stream_over_http_enabled: true -server: - http_listen_port: 3200 - log_level: info - - -cache: - background: - writeback_goroutines: 5 - caches: - - roles: - - frontend-search - memcached: - addresses: dns+memcached:11211 - -query_frontend: - search: - duration_slo: 5s - throughput_bytes_slo: 1.073741824e+09 - metadata_slo: - duration_slo: 5s - throughput_bytes_slo: 1.073741824e+09 - trace_by_id: - duration_slo: 100ms - metrics: - max_duration: 200h # maximum duration of a metrics query, increase for local setups - query_backend_after: 5m - duration_slo: 5s - throughput_bytes_slo: 1.073741824e+09 - -distributor: - receivers: # this configuration will listen on all ports and protocols that tempo is capable of. - jaeger: # the receives all come from the OpenTelemetry collector. more configuration information can - protocols: # be found there: https://github.com/open-telemetry/opentelemetry-collector/tree/main/receiver - thrift_http: # - endpoint: "tempo:14268" # for a production deployment you should only enable the receivers you need! - grpc: - endpoint: "tempo:14250" - thrift_binary: - endpoint: "tempo:6832" - thrift_compact: - endpoint: "tempo:6831" - zipkin: - endpoint: "tempo:9411" - otlp: - protocols: - grpc: - endpoint: "tempo:4317" - http: - endpoint: "tempo:4318" - opencensus: - endpoint: "tempo:55678" - -ingester: - max_block_duration: 5m # cut the headblock when this much time passes. this is being set for demo purposes and should probably be left alone normally - -compactor: - compaction: - block_retention: 720h # overall Tempo trace retention. set for demo purposes - -metrics_generator: - registry: - external_labels: - source: tempo - cluster: docker-compose - storage: - path: /var/tempo/generator/wal - remote_write: - - url: http://prometheus:9090/api/v1/write - send_exemplars: true - traces_storage: - path: /var/tempo/generator/traces - processor: - local_blocks: - filter_server_spans: false - flush_to_storage: true - -storage: - trace: - backend: local # backend configuration to use - wal: - path: /var/tempo/wal # where to store the wal locally - local: - path: /var/tempo/blocks - -overrides: - defaults: - metrics_generator: - processors: [service-graphs, span-metrics, local-blocks] # enables metrics generator - generate_native_histograms: both diff --git a/examples/pytorch/continuous_batching.py b/examples/pytorch/continuous_batching.py index 1147d18e83dc..33682f8ec3dc 100644 --- a/examples/pytorch/continuous_batching.py +++ b/examples/pytorch/continuous_batching.py @@ -51,38 +51,6 @@ def generate_without_cb( return decoded_outputs -def maybe_setup_metrics(use_metrics: bool) -> None: - if not use_metrics: - return - try: - from opentelemetry import metrics, trace - from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter - from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter - from opentelemetry.sdk.metrics import MeterProvider - from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader - from opentelemetry.sdk.resources import Resource - from opentelemetry.sdk.trace import TracerProvider - from opentelemetry.sdk.trace.export import BatchSpanProcessor - - resource = Resource.create({"service.name": "transformers"}) - metrics_exporter = PeriodicExportingMetricReader( - OTLPMetricExporter( - endpoint="http://localhost:9090/api/v1/otlp/v1/metrics" - ), # Uses OTEL_EXPORTER_OTLP_METRICS_ENDPOINT env var - export_interval_millis=1000, - ) - meter_provider = MeterProvider(resource=resource, metric_readers=[metrics_exporter]) - metrics.set_meter_provider(meter_provider) - trace_exporter = OTLPSpanExporter( - endpoint="http://localhost:4318/v1/traces" - ) # Uses OTEL_EXPORTER_OTLP_TRACES_ENDPOINT env var - tracer_provider = TracerProvider(resource=resource) - tracer_provider.add_span_processor(BatchSpanProcessor(trace_exporter)) - trace.set_tracer_provider(tracer_provider) - except Exception as e: - print(f"Error setting up metrics: {e}") - - def batch_generate( model: AutoModelForCausalLM, simple_batch_inputs: list, @@ -202,7 +170,6 @@ def batch_generate( parser.add_argument("--add-prefix", action="store_true", help="Add a prefix to the samples") parser.add_argument("--compare", action="store_true", help="Compare CB generation with classic generate") parser.add_argument("--profile", type=str, default=None) - parser.add_argument("--metrics", action="store_true") parser.add_argument("--seed", type=int, default=None, help="Random seed") # Display parameters @@ -233,7 +200,6 @@ def batch_generate( # Set up diagnostics logger.setLevel(args.log_level.upper()) - maybe_setup_metrics(args.metrics) # Set up performance if args.matmul_precision != "none": diff --git a/setup.py b/setup.py index ac6fe2f46876..cd2bed904f5b 100644 --- a/setup.py +++ b/setup.py @@ -162,9 +162,6 @@ "libcst", "rich", "ray[tune]>=2.7.0", - "opentelemetry-api", - "opentelemetry-exporter-otlp", - "opentelemetry-sdk", ] # This is a lookup table with items like: {"tokenizers": "tokenizers==0.9.4", "packaging": "packaging"}, i.e. @@ -211,8 +208,6 @@ def deps_list(*pkgs): extras["ja"] = deps_list("fugashi", "ipadic", "unidic_lite", "unidic", "rhoknp") if PYTHON_MINOR_VERSION < 14: extras["ja"] += deps_list("sudachipy", "sudachidict_core") -# OpenTelemetry dependencies for metrics collection in continuous batching -extras["open-telemetry"] = deps_list("opentelemetry-api", "opentelemetry-exporter-otlp", "opentelemetry-sdk") extras["testing"] = ( deps_list( diff --git a/src/transformers/dependency_versions_table.py b/src/transformers/dependency_versions_table.py index 1a721ca2a82a..39d7900c5f2e 100644 --- a/src/transformers/dependency_versions_table.py +++ b/src/transformers/dependency_versions_table.py @@ -89,7 +89,4 @@ "libcst": "libcst", "rich": "rich", "ray[tune]": "ray[tune]>=2.7.0", - "opentelemetry-api": "opentelemetry-api", - "opentelemetry-exporter-otlp": "opentelemetry-exporter-otlp", - "opentelemetry-sdk": "opentelemetry-sdk", } diff --git a/src/transformers/generation/continuous_batching/cache.py b/src/transformers/generation/continuous_batching/cache.py index 32f4cc7caafb..9d509099e859 100644 --- a/src/transformers/generation/continuous_batching/cache.py +++ b/src/transformers/generation/continuous_batching/cache.py @@ -20,7 +20,6 @@ from ...configuration_utils import PreTrainedConfig from ...generation.configuration_utils import ContinuousBatchingConfig from ...utils.generic import is_flash_attention_requested -from ...utils.metrics import attach_tracer, traced from .cache_manager import BlockManager, CacheAllocator, FullAttentionCacheAllocator, SlidingAttentionCacheAllocator from .distributed import DistributedHelper from .initialization import resolve_max_memory_percent @@ -60,7 +59,6 @@ def group_layers_by_attn_type(config: PreTrainedConfig) -> tuple[list[list[int]] return layer_groups, group_types -@attach_tracer() class PagedAttentionCache: """ Manages the cache for a paged attention mechanism, inspired by VLLM's hybrid allocator. The cache relies on making @@ -312,7 +310,6 @@ def will_allocation_be_successful(self, num_requested_blocks: int, allocated_blo needed_blocks += min(blocks_left, num_requested_blocks) * self.num_sliding_attention_groups return needed_blocks <= self.get_num_free_blocks() - @traced def allocate_blocks(self, n_blocks: int, request_id: str, allocated_blocks: int) -> int | None: """Allocate cache blocks across all layer groups for a given request. Actual allocation is done by the cache managers, and this method only returns the maximum number of blocks actually allocated across all managers.""" @@ -328,7 +325,6 @@ def allocate_blocks(self, n_blocks: int, request_id: str, allocated_blocks: int) max_allocated = max(max_allocated, num_allocated_blocks) return max_allocated - @traced def free_blocks(self, request_id: str) -> None: """Free all allocated cache blocks for a given request across all layer groups. Actual deallocation is done by the cache managers.""" @@ -339,7 +335,6 @@ def get_num_free_blocks(self) -> int: """Get the current number of unallocated blocks available for new requests.""" return self._block_manager.num_free_blocks - @traced def extend_read_and_write_indices( self, request_id: str, @@ -366,7 +361,6 @@ def fill_block_table( for i, cm in enumerate(self.group_cache_managers): cm.fill_block_table(request_id, past_length, query_length, block_table[i]) - @traced def get_seqlens_k(self, past_length: int, query_length: int) -> dict[str, int]: """Retrieve the key sequence length for the given request_id across all layer types. Returns a dictionary of layer types to their corresponding key sequence lengths.""" @@ -378,7 +372,6 @@ def get_seqlens_k(self, past_length: int, query_length: int) -> dict[str, int]: # NOTE: when we add more attention types / different sliding windows, we can go back to looping over CMs return seqlens_k - @traced def update( self, key_states: torch.Tensor, # shape [1, num_kv_heads, seqlen_kv, head_dim] diff --git a/src/transformers/generation/continuous_batching/continuous_api.py b/src/transformers/generation/continuous_batching/continuous_api.py index e7f276bb56fb..703d95b4f459 100644 --- a/src/transformers/generation/continuous_batching/continuous_api.py +++ b/src/transformers/generation/continuous_batching/continuous_api.py @@ -31,7 +31,6 @@ from ...configuration_utils import PretrainedConfig from ...generation.configuration_utils import ContinuousBatchingConfig, GenerationConfig from ...utils.logging import logging -from ...utils.metrics import ContinuousBatchProcessorMetrics, attach_tracer, traced from ..logits_process import LogitsProcessorList from .cache import PagedAttentionCache from .cb_logits_processors import ContinuousBatchingLogitsProcessorList @@ -130,7 +129,6 @@ def _run_batch(batch=callbacks): # Continuous Batch Processor (Internal Logic) -@attach_tracer() class ContinuousBatchProcessor: inputs_and_outputs: ContinuousBatchingIOs | ContinuousBatchingAsyncIOs scheduler: Scheduler @@ -192,9 +190,7 @@ def __init__( # Retrieve the size of the sliding window if there is one self.sliding_window = 1 if getattr(config, "sliding_window", None) is None else config.sliding_window - # Set up metrics collector self.max_batch_tokens = cache.max_batch_tokens - self.metrics = ContinuousBatchProcessorMetrics(cache.max_batch_tokens) # Setup inputs and outputs use_cuda_graph_varlen, _ = self.cb_config.cuda_graph_booleans @@ -255,10 +251,8 @@ def reset(self) -> None: self.scheduler.reset() self.inputs_and_outputs.reset() self.cache.free_all_requests() - self.metrics = ContinuousBatchProcessorMetrics(self.cache.max_batch_tokens) self.driver_stopped = False - @traced def _get_new_requests(self) -> bool: """Pull new requests and cancellations from the queues and apply them to the scheduler. In the context of TP, only the TP driver of the TP group does this, and broadcasts the new_states / cancellations to other TP ranks. @@ -296,7 +290,6 @@ def _get_new_requests(self) -> bool: self.scheduler.set_request_cancellation(request_id) return False - @traced def _handle_request_error(self, error: Exception, state: RequestState) -> None: """Handle general request processing error.""" state.status = RequestStatus.FAILED @@ -308,10 +301,8 @@ def _handle_request_error(self, error: Exception, state: RequestState) -> None: else: state.generated_tokens = [] - self.metrics.record_request_completion(state.created_time, state.request_id) self.output_router.deliver(state.to_generation_output()) - @traced def prepare_next_batch(self) -> bool: """Prepare tensors and metadata for the next model forward pass. Returns True if there are requests to process, False otherwise.""" @@ -326,7 +317,6 @@ def prepare_next_batch(self) -> bool: self.offloading_manager.free_request_cpu_cache(state) if not self.scheduler.has_pending_requests(): return False - self.metrics.record_queue_metrics(len(self.scheduler.active_requests), len(self.scheduler.waiting_requests)) # Schedule the next batch of requests requests_in_batch, use_decode_fast_path, num_q_tokens, max_kv_read = self.scheduler.schedule_batch( @@ -347,7 +337,6 @@ def prepare_next_batch(self) -> bool: self.offloading_manager.restore_scheduled_requests(requests_in_batch) # Otherwise, we can continue with the non-empty batch and log in the dimensions before padding - self.metrics.record_batch_metrics(requests_in_batch) logger.debug( f"Scheduled: {len(requests_in_batch)}, Waiting: {len(self.scheduler.waiting_requests)}, " f"Active: {len(self.scheduler.active_requests)}. cum Q: {num_q_tokens}. " @@ -360,10 +349,8 @@ def prepare_next_batch(self) -> bool: self.inputs_and_outputs.prepare_batch_tensors( requests_in_batch, self.logit_processor, use_decode_fast_path, num_q_tokens, max_kv_read ) - self.metrics.record_kv_cache_memory_metrics(self.cache) return True - @traced def update_batch(self) -> None: """Update request states based on generated tokens.""" requests_in_batch, new_tokens, logprobs = self.inputs_and_outputs.prepare_batch_update() @@ -383,7 +370,6 @@ def update_batch(self) -> None: if future_state.has_new_token: # If there is just one temporary token, it means prefill just ended if state.generated_len() == 0: - self.metrics.record_ttft_metric(state.created_time, state.request_id) state.status = RequestStatus.DECODING token = new_tokens[current_logits_index] @@ -395,7 +381,6 @@ def update_batch(self) -> None: # We mark the completed blocks as such self.cache.mark_shareable_blocks_as_complete(state, future_state.complete_blocks) if is_finished: - self.metrics.record_request_completion(state.created_time, state.request_id) self.scheduler.finish_request(state.request_id) self.scheduler.block_new_requests = False if state.streaming or state.status == RequestStatus.FINISHED: @@ -433,12 +418,10 @@ def update_batch(self) -> None: with maybe_stream: self.cache.copy_cache(copy_source, copy_destination) - @traced def has_pending_requests(self) -> bool: """Check if there are any active or waiting requests.""" return self.scheduler.has_pending_requests() - @traced def handle_batch_error(self, error): """Handle errors during batch processing.""" failed_future_states = self.inputs_and_outputs.prepare_batch_update()[0] @@ -446,7 +429,6 @@ def handle_batch_error(self, error): self._handle_request_error(error, future_state.state) self.scheduler.finish_request(future_state.state.request_id) - @traced def fail_all_requests(self, error: Exception) -> None: """Fail all active requests with the given error.""" @@ -464,7 +446,6 @@ def fail_all_requests(self, error: Exception) -> None: # Clear the ordering queue self.scheduler.waiting_requests_order.clear() - @traced @torch.no_grad() def _generation_step(self, model: nn.Module) -> None: """Perform a single generation step.""" @@ -489,7 +470,6 @@ def warmup(self, model: nn.Module) -> None: # Manager Class (User Interface) -@attach_tracer() class ContinuousBatchingManager: """Manager for handling continuous batching of generation requests. It provides a user interface for submitting generation requests, retrieving results, and managing the background generation thread. This class should not be @@ -567,7 +547,6 @@ def switch_to_paged_attn(self, model: ProtoPretrainedModel) -> None: self._original_attn_impl = model.config._attn_implementation model.set_attn_implementation(f"paged|{model.config._attn_implementation}") - @traced def start(self) -> None: """Start the background generation thread.""" if self.is_running(): @@ -810,7 +789,6 @@ def _auto_cleanup(result): with self.output_router._lock: self.output_router.result_handlers[request_id] = (_auto_cleanup, loop) - @traced def _generation_step(self) -> None: """Perform a single generation step. This is mostly cuda graphed""" if self.batch_processor is None: @@ -899,7 +877,6 @@ def _run_generation_loop(self) -> None: finally: logger.info("Generation loop finished.") - @traced(span_name="generation_loop") def _inner_generation_loop(self, batch_processor: ContinuousBatchProcessor) -> None: # Loop body ends if there is no requests in the batch if not batch_processor.prepare_next_batch(): @@ -910,7 +887,6 @@ def _inner_generation_loop(self, batch_processor: ContinuousBatchProcessor) -> N self._generation_step() batch_processor.update_batch() - @traced def _handle_critical_error(self, error: Exception, batch_processor: ContinuousBatchProcessor | None) -> None: """Handle critical errors that terminate the generation loop.""" # Record so callers (e.g. the serving layer) can fail fast on subsequent requests @@ -1045,7 +1021,6 @@ def continuous_batching_context_manager( manager.stop(block=block, timeout=timeout, keep_for_next_session=persistent_manager) # TODO: support streaming - @traced @torch.no_grad() def generate_batch( self, diff --git a/src/transformers/generation/continuous_batching/input_outputs.py b/src/transformers/generation/continuous_batching/input_outputs.py index a180eed31658..9a6615ce8e25 100644 --- a/src/transformers/generation/continuous_batching/input_outputs.py +++ b/src/transformers/generation/continuous_batching/input_outputs.py @@ -21,7 +21,6 @@ from transformers.configuration_utils import PretrainedConfig from ...utils import get_available_devices -from ...utils.metrics import traced from .cache import PagedAttentionCache from .cb_logits_processors import ContinuousBatchingLogitsProcessorList from .requests import TMP_TOKEN_ID, FutureRequestState, logger @@ -140,7 +139,6 @@ def __init__( self._reset_static_tensors(full_reset=True) self.compute_stream = torch.cuda.Stream(device=self.device) if device.type == "cuda" else None - @traced(standalone=True) def _setup_static_tensors(self, logit_processor: ContinuousBatchingLogitsProcessorList) -> None: """Allocates static tensors for generation inputs and outputs. This is called only once at init time, to avoid repeated allocations and enable CUDA graphs. All tensors are allocated with maximum possible sizes. @@ -261,7 +259,6 @@ def _transfer_inputs( for layer_type in self.attention_mask.keys(): other.attention_mask[layer_type].copy_(self.attention_mask[layer_type], non_blocking=non_blocking) - @traced @torch.no_grad() def _reset_static_tensors(self, full_reset: bool = False) -> None: """Reset static tensors for the next batch. For efficiency, this only resets the portions of tensors that were @@ -337,7 +334,6 @@ def prepare_batch_update(self) -> tuple[list[FutureRequestState], list[int], lis logprobs = None return requests_in_batch, new_tokens, logprobs - @traced def prepare_batch_tensors( self, requests_in_batch: list[FutureRequestState], diff --git a/src/transformers/generation/continuous_batching/requests.py b/src/transformers/generation/continuous_batching/requests.py index 816c5e0f14d3..baab3345881a 100644 --- a/src/transformers/generation/continuous_batching/requests.py +++ b/src/transformers/generation/continuous_batching/requests.py @@ -20,7 +20,6 @@ from ...utils import is_psutil_available, is_torch_xpu_available from ...utils.logging import logging -from ...utils.metrics import traced if is_psutil_available(): @@ -235,7 +234,6 @@ def generated_len(self) -> int: return len(self.generated_tokens) # TODO: this logic seems one token off, check it out - @traced def update_and_check_completion(self, token_id: int, logprob: float | None) -> bool: """Update the request with a newly generated token (and optional log probability of the token) and check for completion. Returns True if the request is now complete, False otherwise.""" diff --git a/src/transformers/generation/continuous_batching/scheduler.py b/src/transformers/generation/continuous_batching/scheduler.py index fb3d46665feb..f2339a0951c5 100644 --- a/src/transformers/generation/continuous_batching/scheduler.py +++ b/src/transformers/generation/continuous_batching/scheduler.py @@ -15,7 +15,6 @@ from abc import ABC, abstractmethod from collections import deque -from ...utils.metrics import attach_tracer, traced from .cache import PagedAttentionCache from .requests import FutureRequestState, RequestState, RequestStatus, logger @@ -45,7 +44,6 @@ def reset(self) -> None: self._requests_to_fork: list[RequestState] = [] self.block_new_requests = False - @traced def add_waiting_request(self, state: RequestState): """Adds a request to the waiting list.""" self.waiting_requests[state.request_id] = state @@ -62,12 +60,10 @@ def schedule_batch( Returns the list of scheduled requests in their "FutureRequestState" form, a boolean indicating if the decode fast path can be used, the total number of query tokens and the maximum number of kv tokens read.""" - @traced def has_pending_requests(self) -> bool: """Checks if there are requests ready to be processed.""" return bool(len(self.active_requests) or len(self.waiting_requests)) - @traced def finish_request(self, request_id: str) -> None: """Completes processing of a request and frees its allocated cache blocks. This method is called when a request has finished generation or encountered an error. @@ -83,20 +79,17 @@ def pop_request_to_evict(self) -> tuple[str, RequestState]: request_id = next(iter(self.active_requests)) return request_id, self.active_requests.pop(request_id) - @traced def get_active_request_static_outputs(self, request_id: str) -> list[int]: """Gets generated tokens for an active request.""" if request_id in self.active_requests: return self.active_requests[request_id].generated_tokens return [] - @traced def set_request_cancellation(self, request_id: str): """Marks a request for cancellation.""" with self._cancellation_lock: self._requests_to_cancel.add(request_id) - @traced def clear_cancelled_requests(self) -> list[RequestState]: """Remove all cancelled requests from active and waiting queues.""" cancelled_states = [] @@ -114,14 +107,12 @@ def clear_cancelled_requests(self) -> list[RequestState]: self._requests_to_cancel = set() return cancelled_states - @traced def request_is_cancelled(self, request_id: str) -> bool: """Checks if a request has been cancelled or removed.""" return request_id in self._requests_to_cancel or ( request_id not in self.active_requests and request_id not in self.waiting_requests ) - @traced def _allocate_blocks_if_needed(self, state: RequestState, len_next_tokens: int) -> bool: """Allocate additional cache blocks for a request if the currently allocated blocks are insufficient to accommodate the next tokens. It calculates how many blocks are needed based on the request's current @@ -324,7 +315,6 @@ def _cleanup_waiting_queue(self, request_ids_to_remove_from_waiting: set[str]) - # TODO: further common-ize the two classes -@attach_tracer() class FIFOScheduler(Scheduler): """This scheduler processes requests in the order they arrive, meaning decoding requests has priority over prefilling requests. Additionally, it includes a safety margin mechanism to prevent cache exhaustion. By default, @@ -338,7 +328,6 @@ def __init__(self, cache: PagedAttentionCache, safety_margin: float = 0.2): super().__init__(cache) self.safety_margin = safety_margin - @traced def schedule_batch( self, token_budget: int, cache_budget: int ) -> tuple[list[FutureRequestState] | None, bool, int, int]: @@ -379,13 +368,11 @@ def schedule_batch( # FIXME: prioritize adding from waiting reqs before scheduling `RequestStatus.DECODING` when cache space allows it # TODO: further consolidate the code by making more of it common. The reference Scheduler is FIFO, not this one. -@attach_tracer() class PrefillFirstScheduler(Scheduler): """Scheduler that prioritizes split prefill requests over decoding requests. This scheduler ensures that split prefill requests (which are continuations of partially processed prompts) are completed before processing new decoding requests.""" - @traced def schedule_batch( self, token_budget: int, cache_budget: int ) -> tuple[list[FutureRequestState] | None, bool, int, int]: diff --git a/src/transformers/utils/import_utils.py b/src/transformers/utils/import_utils.py index 858e60232787..bfd6f13bb12f 100644 --- a/src/transformers/utils/import_utils.py +++ b/src/transformers/utils/import_utils.py @@ -1374,16 +1374,6 @@ def is_mistral_common_available() -> bool: return is_vision_available() and _is_package_available("mistral_common")[0] -@lru_cache -def is_opentelemetry_available() -> bool: - try: - return _is_package_available("opentelemetry")[0] and version.parse( - importlib.metadata.version("opentelemetry-api") - ) >= version.parse("1.30.0") - except Exception as _: - return False - - @lru_cache def is_pynvml_available() -> bool: return _is_package_available("pynvml")[0] diff --git a/src/transformers/utils/metrics.py b/src/transformers/utils/metrics.py deleted file mode 100644 index e62fa846ae02..000000000000 --- a/src/transformers/utils/metrics.py +++ /dev/null @@ -1,405 +0,0 @@ -import functools -import logging -import time -from collections.abc import Callable -from enum import Enum -from typing import Any - -from .import_utils import is_opentelemetry_available - - -class RequestStatus(Enum): - """Status of a generation request through its lifecycle.""" - - PENDING = "pending" - PREFILLING = "prefilling" - PREFILLING_SPLIT = "prefilling_split" - SPLIT_PENDING_REMAINDER = "split_pending_remainder" - DECODING = "decoding" - FINISHED = "finished" - FAILED = "failed" - - -if is_opentelemetry_available(): - from opentelemetry import metrics - from opentelemetry.trace import Status, StatusCode, get_tracer - - _has_opentelemetry = True -else: - _has_opentelemetry = False - - -def attach_tracer(tracer_name_template=None): - """ - Decorator that attaches a tracer to a class. - - This decorator should be applied to classes that need OpenTelemetry tracing. - It adds a tracer attribute to the class instance that can be used by the traced decorator. - - Args: - tracer_name_template: Optional template string for the tracer name. - If provided, it should contain {module} which will be replaced with the class's full module path - and {class_name} for the class name. - If None, a default naming scheme will be used where: - - If the module already starts with "transformers.", it will use that directly - - Otherwise, it will prepend "transformers." to the module name - - Returns: - Class decorator function - """ - if not _has_opentelemetry: - return lambda cls: cls - - def decorator(cls): - original_init = cls.__init__ - - @functools.wraps(original_init) - def init_with_tracer(self, *args, **kwargs): - original_init(self, *args, **kwargs) - - module_name = cls.__module__ - class_name = cls.__qualname__ - - if tracer_name_template is None: - if module_name.startswith("transformers."): - tracer_name = f"{module_name}.{class_name}" - else: - tracer_name = f"transformers.{module_name}.{class_name}" - else: - tracer_name = tracer_name_template.format(module=module_name, class_name=class_name) - - self.tracer = get_tracer(tracer_name) - - cls.__init__ = init_with_tracer - return cls - - return decorator - - -def traced( - func=None, - *, - span_name=None, - standalone=False, - additional_attributes: list[tuple[str, str, Any | Callable[[Any], Any]]] | None = None, -): - """ - Decorator to trace function calls with OpenTelemetry. - - Can be used as @traced or @traced(span_name="custom_name") - - Args: - func: The function to trace - span_name: Optional custom name for the span (defaults to function name) - standalone: If True, creates a parentless span - additional_attributes: Optional list of additional attributes to set on the span. - Each item is a tuple of (instance_attribute_name, span_attribute_key, value_or_transform_function) - where: - - instance_attribute_name: Name of the attribute to get from the class instance - - span_attribute_key: Key to use when setting the attribute on the span - - value_or_transform_function: Either a raw value to use directly, or a function to transform - the attribute value before setting it on the span - - Returns: - Decorated function with tracing - """ - - def decorator(func): - if not _has_opentelemetry: - return func - - @functools.wraps(func) - def wrapper(*args, **kwargs): - instance = args[0] if args and (hasattr(func, "__self__") and func.__self__ is not None) else None - is_method = instance is not None - - if is_method and hasattr(instance, "tracer"): - tracer = instance.tracer - else: - tracer = get_tracer(f"transformers.{func.__module__}.{func.__name__}") - - name = span_name or func.__name__ - span_fn = tracer.start_span if standalone else tracer.start_as_current_span - with span_fn(name) as span: - span.set_attribute("function.name", func.__name__) - span.set_attribute("function.module", func.__module__) - span.set_attribute("function.is_method", is_method) - - if args: - for i, arg in enumerate(args): - if isinstance(arg, (str, int, float, bool)) or arg is None: - span.set_attribute(f"args.{i}", str(arg)) - else: - span.set_attribute(f"args.{i}", str(type(arg))) - if kwargs: - for key, value in kwargs.items(): - if isinstance(value, (str, int, float, bool)) or value is None: - span.set_attribute(f"kwargs.{key}", str(value)) - else: - span.set_attribute(f"kwargs.{key}", str(type(value))) - - if additional_attributes and is_method: - for attr_config in additional_attributes: - instance_attribute_name, span_attribute_key, value_or_transform_function = attr_config - if hasattr(instance, instance_attribute_name): - attribute_value = getattr(instance, instance_attribute_name) - if callable(value_or_transform_function): - transformed_value = value_or_transform_function(attribute_value) - else: - transformed_value = value_or_transform_function - span.set_attribute(span_attribute_key, transformed_value) - - try: - result = func(*args, **kwargs) - return result - except Exception as e: - span.set_status(Status(StatusCode.ERROR)) - span.record_exception(e) - raise - - return wrapper - - if func is None: - return decorator - return decorator(func) - - -logger = logging.getLogger(__name__) - - -@attach_tracer() -class ContinuousBatchProcessorMetrics: - """Metrics collection for ContinuousBatchProcessor.""" - - def __init__(self, max_batch_tokens: int): - """Initialize metrics for continuous batch processor. - - Args: - max_batch_tokens: Maximum number of tokens in a batch - """ - self.max_batch_tokens = max_batch_tokens - - self._setup_metrics() - - def _setup_metrics(self): - """Initialize OpenTelemetry metrics and tracing if the library is available.""" - - if not _has_opentelemetry: - logger.info( - "OpenTelemetry is not installed. Metrics and tracing will not be recorded." - "You can install it with `pip install opentelemetry-api>=1.30.0`" - ) - return - - self.meter = metrics.get_meter("transformers.generation.continuous_batch_processor") - - # Define appropriate buckets for TTFT (typically ranges from ~50ms to several seconds) - ttft_buckets = [10, 25, 50, 75, 100, 150, 200, 300, 500, 750, 1000, 2000, 5000, 10000] - - self.ttft_histogram = self.meter.create_histogram( - name="ttft_milliseconds", - description="Time to first token in milliseconds", - unit="ms", - explicit_bucket_boundaries_advisory=ttft_buckets, - ) - - self.active_requests_gauge = self.meter.create_gauge( - name="active_requests_count", - description="Number of active requests currently being processed", - unit="requests", - ) - - self.waiting_requests_gauge = self.meter.create_gauge( - name="waiting_requests_count", - description="Number of requests waiting to be processed", - unit="requests", - ) - - # Define appropriate buckets for request latency (similar to TTFT but with higher upper bounds) - latency_buckets = [50, 100, 250, 500, 1000, 2000, 5000, 10000, 20000, 30000, 60000] - - self.request_latency_histogram = self.meter.create_histogram( - name="request_latency_milliseconds", - description="End-to-end latency for completed requests in milliseconds", - unit="ms", - explicit_bucket_boundaries_advisory=latency_buckets, - ) - - self.decode_prefill_ratio_gauge = self.meter.create_gauge( - name="decode_prefill_ratio", - description="Ratio of decode tokens to prefill tokens in a batch", - unit="ratio", - ) - - self.prefill_tokens_counter = self.meter.create_counter( - name="prefill_tokens_processed", - description="Number of prefill tokens processed", - unit="tokens", - ) - - self.decode_tokens_counter = self.meter.create_counter( - name="decode_tokens_processed", - description="Number of decode tokens processed", - unit="tokens", - ) - - # Define appropriate buckets for batch fill percentage (0-100%) - batch_fill_buckets = [5, 10, 20, 30, 40, 50, 60, 70, 80, 90, 95, 98, 100] - - self.batch_fill_percentage_histogram = self.meter.create_histogram( - name="batch_fill_percentage", - description="Percentage of max_batch_tokens utilized in each batch", - unit="percent", - explicit_bucket_boundaries_advisory=batch_fill_buckets, - ) - - self.kv_cache_free_memory_gauge = self.meter.create_gauge( - name="kv_cache_free_memory_bytes", - description="Free memory of the PagedAttentionCache in bytes", - unit="bytes", - ) - - self.kv_cache_memory_gauge = self.meter.create_gauge( - name="kv_cache_memory_bytes", - description="Memory usage of the PagedAttentionCache in bytes", - unit="bytes", - ) - - @traced - def record_ttft_metric(self, created_time: float, request_id: str) -> None: - """Record Time to First Token (TTFT). - - Args: - created_time: The time the request was created - request_id: The ID of the request - """ - if not _has_opentelemetry: - return - - ttft_ms = (time.time() - created_time) * 1000.0 - - try: - self.ttft_histogram.record(ttft_ms) - logger.debug(f"Recorded TTFT for request {request_id}: {ttft_ms:.2f}ms") - except Exception as e: - logger.warning(f"Failed to record TTFT metric: {e}") - - @traced - def record_batch_metrics(self, future_states: list) -> None: - """Record metrics about the batch composition including decode/prefill ratio and batch fill percentage. - - Args: - requests_in_batch: List of request states in the current batch - """ - if not _has_opentelemetry or not future_states: - return - - decode_tokens = 0 - prefill_tokens = 0 - - for future_state in future_states: - state = future_state.state - if state.status == RequestStatus.DECODING: - decode_tokens += 1 - elif state.status in [RequestStatus.PREFILLING, RequestStatus.PREFILLING_SPLIT]: - prefill_tokens += len(state.prompt_ids) - - total_batch_tokens = decode_tokens + prefill_tokens - - try: - if prefill_tokens > 0: - self.prefill_tokens_counter.add(prefill_tokens) - - if decode_tokens > 0: - self.decode_tokens_counter.add(decode_tokens) - - if prefill_tokens > 0: - ratio = decode_tokens / prefill_tokens - self.decode_prefill_ratio_gauge.set(ratio) - - fill_percentage = (total_batch_tokens / self.max_batch_tokens) * 100.0 - self.batch_fill_percentage_histogram.record(fill_percentage) - logger.debug( - f"Batch metrics: {decode_tokens} decode tokens, {prefill_tokens} prefill tokens, " - f"batch fill: {fill_percentage:.2f}% ({total_batch_tokens}/{self.max_batch_tokens})" - ) - except Exception as e: - logger.warning(f"Failed to record batch metrics: {e}") - - @traced - def record_kv_cache_memory_metrics(self, cache) -> None: - """Record memory usage of the PagedAttentionCache without GPU synchronization. - - This calculates the theoretical memory usage based on cache configuration - and the number of blocks currently in use. - - Args: - cache: The PagedAttentionCache object to measure - """ - if not _has_opentelemetry: - return - - try: - # Retrieve the memory footprint of the cache - page_size = cache.head_dim * cache.num_key_value_heads - page_mem_in_bytes = page_size * cache.dtype.itemsize - # When a block is allocated, it is for both K and V, so we multiply by 2 - # It's also allocated across all cache tensors, so we multiply by the nb of tensors: len(cache.key_cache) - block_mem_in_bytes = 2 * len(cache.key_cache) * cache.block_size * page_mem_in_bytes - - # Retrieve the number of used and free blocks - free_blocks = cache.get_num_free_blocks() - used_blocks = cache.num_blocks - free_blocks - - # Convert that into used and free memory in bytes - used_memory_bytes = used_blocks * block_mem_in_bytes - free_memory_bytes = free_blocks * block_mem_in_bytes - - # Update the telemetry gauges and add a message in the logs - self.kv_cache_memory_gauge.set(used_memory_bytes) - self.kv_cache_free_memory_gauge.set(free_memory_bytes) - logger.debug( - f"KV Cache memory: {used_memory_bytes / (1024 * 1024):.2f}MB, " - f"Used blocks: {used_blocks}/{cache.num_blocks} " - f"({used_blocks / cache.num_blocks * 100:.1f}%)" - ) - except Exception as e: - logger.warning(f"Failed to record KV cache memory metrics: {e}") - - @traced - def record_queue_metrics(self, active_requests: int, waiting_requests: int) -> None: - """Record metrics about active and waiting requests. - - Args: - active_requests: Number of active requests - waiting_requests: Number of waiting requests - """ - if not _has_opentelemetry: - return - - try: - self.active_requests_gauge.set(active_requests) - self.waiting_requests_gauge.set(waiting_requests) - logger.debug(f"Queue metrics: {active_requests} active requests, {waiting_requests} waiting requests") - except Exception as e: - logger.warning(f"Failed to record queue metrics: {e}") - - @traced - def record_request_completion(self, created_time: float, request_id: str) -> None: - """Record metrics about a completed request. - - Args: - created_time: The time the request was created - request_id: The ID of the request - """ - if not _has_opentelemetry: - return - - latency_ms = (time.time() - created_time) * 1000.0 - - try: - self.request_latency_histogram.record(latency_ms) - - logger.debug(f"Recorded request completion for {request_id}: {latency_ms:.2f}ms") - except Exception as e: - logger.warning(f"Failed to record request completion metric: {e}")