README.md: 81 additions & 0 deletions
@@ -304,6 +304,26 @@ Options:

[env: DISABLE_SPANS=]

--log-sample-rate <LOG_SAMPLE_RATE>
Maximum number of request success logs per second.

Set to 0 to disable request success logging entirely.
Set to a positive number to rate-limit request logs.
Default is 10 logs/second. Use --log-aggregate-interval to enable aggregate logging instead.

[env: LOG_SAMPLE_RATE=]
[default: 10]

--log-aggregate-interval <LOG_AGGREGATE_INTERVAL>
Interval in seconds for logging aggregate statistics.

Set to 0 to disable aggregate logging (default).
When set to a positive value, aggregate statistics are logged at this interval instead of per-request logs.
Each aggregate log line includes the request count, error count, characters processed, and tokens processed for the interval.

[env: LOG_AGGREGATE_INTERVAL=]
[default: 0]

--otlp-endpoint <OTLP_ENDPOINT>
The gRPC endpoint for OpenTelemetry. Telemetry is sent to this endpoint as OTLP over gRPC, e.g. `http://localhost:4317`

@@ -475,6 +495,67 @@ curl 127.0.0.1:8080/embed_sparse \
`text-embeddings-inference` is instrumented with distributed tracing using OpenTelemetry. You can use this feature
by setting the address to an OTLP collector with the `--otlp-endpoint` argument.

### Logging Configuration

At high load, logging every request can create excessive log volume, especially when using log aggregation solutions
like Loki. TEI provides flexible logging controls to address this.

#### Rate-limited request logging

By default, TEI logs at most 10 request successes per second. You can adjust this limit:

```shell
# Log at most 5 request successes per second
docker run --gpus all -p 8080:80 ghcr.io/huggingface/text-embeddings-inference:cuda-1.9 \
--model-id Qwen/Qwen3-Embedding-0.6B \
--log-sample-rate 5
```

To disable per-request success logging entirely:

```shell
# Disable individual request logging
docker run --gpus all -p 8080:80 ghcr.io/huggingface/text-embeddings-inference:cuda-1.9 \
--model-id Qwen/Qwen3-Embedding-0.6B \
--log-sample-rate 0
```

#### Aggregate logging

Instead of logging each individual request, you can opt for periodic aggregate statistics. This is recommended for
high-load production deployments:

```shell
# Log aggregate statistics every 60 seconds
docker run --gpus all -p 8080:80 ghcr.io/huggingface/text-embeddings-inference:cuda-1.9 \
--model-id Qwen/Qwen3-Embedding-0.6B \
--log-sample-rate 0 \
--log-aggregate-interval 60
```

This produces log lines like:
```
Request aggregate: 5234 requests (87.2/s), 12 errors | 10456789 chars (174280/s) | 2345678 tokens (39094/s)
```
The per-second rates are simply the interval totals divided by the interval length (here, 5234 requests / 60 s ≈ 87.2/s).

#### Hybrid mode

You can combine both approaches for balanced observability:

```shell
# Log up to 2 individual requests per second + aggregate stats every 30 seconds
docker run --gpus all -p 8080:80 ghcr.io/huggingface/text-embeddings-inference:cuda-1.9 \
--model-id Qwen/Qwen3-Embedding-0.6B \
--log-sample-rate 2 \
--log-aggregate-interval 30
```

#### Environment variables

Both options can also be configured via environment variables:
- `LOG_SAMPLE_RATE` - Maximum request logs per second (default: 10)
- `LOG_AGGREGATE_INTERVAL` - Aggregate logging interval in seconds (default: 0 = disabled)
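
For example, the hybrid setup above can be configured through the environment instead of CLI flags (same image and model as in the earlier examples):

```shell
# Equivalent to --log-sample-rate 2 --log-aggregate-interval 30
docker run --gpus all -p 8080:80 \
-e LOG_SAMPLE_RATE=2 \
-e LOG_AGGREGATE_INTERVAL=30 \
ghcr.io/huggingface/text-embeddings-inference:cuda-1.9 \
--model-id Qwen/Qwen3-Embedding-0.6B
```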

### gRPC

`text-embeddings-inference` offers a gRPC API as an alternative to the default HTTP API for high performance
router/src/http/server.rs: 91 additions & 6 deletions
@@ -105,6 +105,8 @@ async fn predict(
infer: Extension<Infer>,
info: Extension<Info>,
Extension(context): Extension<Option<opentelemetry::Context>>,
Extension(rate_limited_logger): Extension<Option<crate::RateLimitedLogger>>,
Extension(aggregate_logger): Extension<Option<crate::AggregateLogger>>,
Json(req): Json<PredictRequest>,
) -> Result<(HeaderMap, Json<PredictResponse>), (StatusCode, Json<ErrorResponse>)> {
let span = tracing::Span::current();
@@ -274,7 +276,17 @@ async fn predict(

// Capture the compute counts before `metadata` is moved into the HeaderMap
let compute_chars = metadata.compute_chars as u64;
let compute_tokens = metadata.compute_tokens as u64;
let headers = HeaderMap::from(metadata);

// Use rate-limited logger if available, otherwise use regular logging
if let Some(logger) = rate_limited_logger {
logger.try_log_success();
} else {
tracing::info!("Success");
}

// Record success in aggregate logger if available
if let Some(agg_logger) = aggregate_logger {
agg_logger.record_success(compute_chars, compute_tokens);
}

Ok((headers, Json(response)))
}
@@ -308,6 +320,8 @@ async fn rerank(
infer: Extension<Infer>,
info: Extension<Info>,
Extension(context): Extension<Option<opentelemetry::Context>>,
Extension(rate_limited_logger): Extension<Option<crate::RateLimitedLogger>>,
Extension(aggregate_logger): Extension<Option<crate::AggregateLogger>>,
Json(req): Json<RerankRequest>,
) -> Result<(HeaderMap, Json<RerankResponse>), (StatusCode, Json<ErrorResponse>)> {
let span = tracing::Span::current();
@@ -468,7 +482,17 @@ async fn rerank(

let headers = HeaderMap::from(metadata);

tracing::info!("Success");
// Use rate-limited logger if available, otherwise use regular logging
if let Some(logger) = rate_limited_logger {
logger.try_log_success();
} else {
tracing::info!("Success");
}

// Record success in aggregate logger if available
if let Some(agg_logger) = aggregate_logger {
agg_logger.record_success(compute_chars as u64, total_compute_tokens as u64);
}

Ok((headers, Json(response)))
}
@@ -587,6 +611,8 @@ async fn embed(
infer: Extension<Infer>,
info: Extension<Info>,
Extension(context): Extension<Option<opentelemetry::Context>>,
Extension(rate_limited_logger): Extension<Option<crate::RateLimitedLogger>>,
Extension(aggregate_logger): Extension<Option<crate::AggregateLogger>>,
Json(req): Json<EmbedRequest>,
) -> Result<(HeaderMap, Json<EmbedResponse>), (StatusCode, Json<ErrorResponse>)> {
let span = tracing::Span::current();
@@ -734,7 +760,17 @@ async fn embed(

// Capture the compute counts before `metadata` is moved into the HeaderMap
let compute_chars = metadata.compute_chars as u64;
let compute_tokens = metadata.compute_tokens as u64;
let headers = HeaderMap::from(metadata);

// Use rate-limited logger if available, otherwise use regular logging
if let Some(logger) = rate_limited_logger {
logger.try_log_success();
} else {
tracing::info!("Success");
}

// Record success in aggregate logger if available
if let Some(agg_logger) = aggregate_logger {
agg_logger.record_success(compute_chars, compute_tokens);
}

Ok((headers, Json(response)))
}
@@ -767,6 +803,8 @@ async fn embed_sparse(
infer: Extension<Infer>,
info: Extension<Info>,
Extension(context): Extension<Option<opentelemetry::Context>>,
Extension(rate_limited_logger): Extension<Option<crate::RateLimitedLogger>>,
Extension(aggregate_logger): Extension<Option<crate::AggregateLogger>>,
Json(req): Json<EmbedSparseRequest>,
) -> Result<(HeaderMap, Json<EmbedSparseResponse>), (StatusCode, Json<ErrorResponse>)> {
let span = tracing::Span::current();
@@ -920,7 +958,17 @@ async fn embed_sparse(

// Capture the compute counts before `metadata` is moved into the HeaderMap
let compute_chars = metadata.compute_chars as u64;
let compute_tokens = metadata.compute_tokens as u64;
let headers = HeaderMap::from(metadata);

// Use rate-limited logger if available, otherwise use regular logging
if let Some(logger) = rate_limited_logger {
logger.try_log_success();
} else {
tracing::info!("Success");
}

// Record success in aggregate logger if available
if let Some(agg_logger) = aggregate_logger {
agg_logger.record_success(compute_chars, compute_tokens);
}

Ok((headers, Json(response)))
}
@@ -954,6 +1002,8 @@ async fn embed_all(
infer: Extension<Infer>,
info: Extension<Info>,
Extension(context): Extension<Option<opentelemetry::Context>>,
Extension(rate_limited_logger): Extension<Option<crate::RateLimitedLogger>>,
Extension(aggregate_logger): Extension<Option<crate::AggregateLogger>>,
Json(req): Json<EmbedAllRequest>,
) -> Result<(HeaderMap, Json<EmbedAllResponse>), (StatusCode, Json<ErrorResponse>)> {
let span = tracing::Span::current();
@@ -1097,7 +1147,17 @@ async fn embed_all(

let headers = HeaderMap::from(metadata);

tracing::info!("Success");
// Use rate-limited logger if available, otherwise use regular logging
if let Some(logger) = rate_limited_logger {
logger.try_log_success();
} else {
tracing::info!("Success");
}

// Record success in aggregate logger if available
if let Some(agg_logger) = aggregate_logger {
agg_logger.record_success(compute_chars as u64, total_compute_tokens as u64);
}

Ok((headers, Json(response)))
}
@@ -1130,6 +1190,8 @@ async fn openai_embed(
infer: Extension<Infer>,
info: Extension<Info>,
Extension(context): Extension<Option<opentelemetry::Context>>,
Extension(rate_limited_logger): Extension<Option<crate::RateLimitedLogger>>,
Extension(aggregate_logger): Extension<Option<crate::AggregateLogger>>,
Json(req): Json<OpenAICompatRequest>,
) -> Result<(HeaderMap, Json<OpenAICompatResponse>), (StatusCode, Json<OpenAICompatErrorResponse>)>
{
@@ -1315,7 +1377,17 @@ async fn openai_embed(
// Capture the compute counts before `metadata` is moved into the HeaderMap
let compute_chars = metadata.compute_chars;
let compute_tokens = metadata.compute_tokens;
let headers = HeaderMap::from(metadata);

// Use rate-limited logger if available, otherwise use regular logging
if let Some(logger) = rate_limited_logger {
logger.try_log_success();
} else {
tracing::info!("Success");
}

// Record success in aggregate logger if available
if let Some(agg_logger) = aggregate_logger {
agg_logger.record_success(compute_chars as u64, compute_tokens as u64);
}

let response = OpenAICompatResponse {
object: "list",
@@ -1632,6 +1704,9 @@ pub async fn run(
payload_limit: usize,
api_key: Option<String>,
cors_allow_origin: Option<Vec<String>>,
rate_limited_logger: Option<crate::RateLimitedLogger>,
aggregate_logger: Option<crate::AggregateLogger>,
log_aggregate_interval: u64,
) -> Result<(), anyhow::Error> {
// OpenAPI documentation
#[derive(OpenApi)]
@@ -1865,13 +1940,23 @@
.layer(Extension(infer))
.layer(Extension(info))
.layer(Extension(prom_handle.clone()))
.layer(Extension(rate_limited_logger))
.layer(Extension(aggregate_logger.clone()))
.layer(OtelAxumLayer::default())
.layer(axum::middleware::from_fn(
logging::http::trace_context_middleware,
))
.layer(DefaultBodyLimit::max(payload_limit))
.layer(cors_layer);

// Spawn aggregate logger task
// `aggregate_logger` was cloned into the extension layer above, so it can be consumed here
if let Some(agg_logger) = aggregate_logger {
if log_aggregate_interval > 0 {
crate::spawn_aggregate_logger_task(agg_logger, log_aggregate_interval);
tracing::info!("Aggregate logger task spawned with interval: {} seconds", log_aggregate_interval);
}
}

// Run server
let listener = tokio::net::TcpListener::bind(&addr)
.await
router/src/lib.rs: 7 additions & 0 deletions
@@ -38,6 +38,7 @@ use tokenizers::{PostProcessorWrapper, Tokenizer};
use tracing::Span;

pub use logging::init_logging;
pub use logging::{RateLimitedLogger, AggregateLogger, spawn_aggregate_logger_task};

/// Create entrypoint
#[allow(clippy::too_many_arguments)]
@@ -67,6 +68,9 @@ pub async fn run(
otlp_service_name: String,
prometheus_port: u16,
cors_allow_origin: Option<Vec<String>>,
rate_limited_logger: Option<RateLimitedLogger>,
aggregate_logger: Option<AggregateLogger>,
log_aggregate_interval: u64,
) -> Result<()> {
let model_id_path = Path::new(&model_id);
let (model_root, api_repo) = if model_id_path.exists() && model_id_path.is_dir() {
@@ -377,6 +381,9 @@ pub async fn run(
payload_limit,
api_key,
cors_allow_origin,
rate_limited_logger,
aggregate_logger,
log_aggregate_interval,
)
.await
}
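
**Note on the logger internals.** `RateLimitedLogger`, `AggregateLogger`, and `spawn_aggregate_logger_task` are re-exported from the `logging` module, which this diff does not show. As a reading aid only, here is a minimal sketch of shapes that would satisfy the call sites above, assuming a fixed-window counter for the rate limiter and relaxed atomics for the aggregate counters; the type and method names come from the diff, everything inside them is an assumption rather than the PR's actual code:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

// Fixed-window limiter (sketch): at most `max_per_sec` "Success" lines per second.
#[derive(Clone)]
pub struct RateLimitedLogger {
    max_per_sec: u64,
    window: Arc<Mutex<(Instant, u64)>>, // (window start, logs emitted in this window)
}

impl RateLimitedLogger {
    pub fn new(max_per_sec: u64) -> Self {
        Self {
            max_per_sec,
            window: Arc::new(Mutex::new((Instant::now(), 0))),
        }
    }

    // Log "Success" unless this second's budget is already spent.
    pub fn try_log_success(&self) {
        let mut w = self.window.lock().unwrap();
        if w.0.elapsed() >= Duration::from_secs(1) {
            *w = (Instant::now(), 0); // new one-second window
        }
        if w.1 < self.max_per_sec {
            w.1 += 1;
            tracing::info!("Success");
        }
    }
}

// Shared counters drained by the periodic task below. Error counting is
// omitted here because the diff only shows success recording.
#[derive(Clone, Default)]
pub struct AggregateLogger {
    requests: Arc<AtomicU64>,
    chars: Arc<AtomicU64>,
    tokens: Arc<AtomicU64>,
}

impl AggregateLogger {
    pub fn record_success(&self, chars: u64, tokens: u64) {
        self.requests.fetch_add(1, Ordering::Relaxed);
        self.chars.fetch_add(chars, Ordering::Relaxed);
        self.tokens.fetch_add(tokens, Ordering::Relaxed);
    }
}

// Every `interval_secs` (guaranteed > 0 by the caller), reset the counters
// and emit one aggregate line.
pub fn spawn_aggregate_logger_task(logger: AggregateLogger, interval_secs: u64) {
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(Duration::from_secs(interval_secs));
        ticker.tick().await; // the first tick completes immediately
        loop {
            ticker.tick().await;
            let n = logger.requests.swap(0, Ordering::Relaxed);
            let c = logger.chars.swap(0, Ordering::Relaxed);
            let t = logger.tokens.swap(0, Ordering::Relaxed);
            tracing::info!(
                "Request aggregate: {n} requests ({:.1}/s) | {c} chars ({}/s) | {t} tokens ({}/s)",
                n as f64 / interval_secs as f64,
                c / interval_secs,
                t / interval_secs,
            );
        }
    });
}
```

Under this sketch the per-request hot path is three relaxed atomic increments, which is why aggregate logging stays cheap at high request rates, and `swap(0, ...)` reads and resets each counter in one step so no request is counted twice across intervals.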