Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1 +1,11 @@
log_format dstack_stat '$time_iso8601 $host $status $request_time';
log_format dstack_stat '$time_iso8601 $host $status $request_time $dstack_replica_hit';


# A hack to avoid this Nginx reload error when no services are registered:
# nginx: [emerg] unknown "dstack_replica_hit" variable
server {
listen unix:/tmp/dstack-dummy-nginx.sock;
server_name placeholder.local;
deny all;
set $dstack_replica_hit 0;
}
18 changes: 12 additions & 6 deletions src/dstack/_internal/proxy/gateway/resources/nginx/service.jinja2
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ upstream {{ domain }}.upstream {
server {
server_name {{ domain }};
limit_req_status 429;
set $dstack_replica_hit 0;
access_log {{ access_log_path }} dstack_stat;
client_max_body_size {{ client_max_body_size }};

Expand All @@ -23,40 +24,45 @@ server {
auth_request /_dstack_auth;
{% endif %}

{% if replicas %}
try_files /nonexistent @$http_upgrade;
{% else %}
return 503;
{% endif %}

{% if location.limit_req %}
limit_req zone={{ location.limit_req.zone }}{% if location.limit_req.burst %} burst={{ location.limit_req.burst }} nodelay{% endif %};
{% endif %}
}
{% endfor %}

{% if replicas %}
location @websocket {
set $dstack_replica_hit 1;
{% if replicas %}
proxy_pass http://{{ domain }}.upstream;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header Host $host;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";
proxy_read_timeout 300s;
{% else %}
return 503;
{% endif %}
}
location @ {
set $dstack_replica_hit 1;
{% if replicas %}
proxy_pass http://{{ domain }}.upstream;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header Host $host;
proxy_read_timeout 300s;
{% else %}
return 503;
{% endif %}
}
{% endif %}

{% if auth %}
location = /_dstack_auth {
internal;
if ($remote_addr = 127.0.0.1) {
# for requests from the gateway app, e.g. from the OpenAI-compatible API
return 200;
}
proxy_pass http://localhost:{{ proxy_port }}/api/auth/{{ project_name }};
Expand Down
20 changes: 17 additions & 3 deletions src/dstack/_internal/proxy/gateway/services/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@

from dstack._internal.proxy.gateway.repo.repo import GatewayProxyRepo
from dstack._internal.proxy.gateway.schemas.stats import PerWindowStats, ServiceStats, Stat
from dstack._internal.proxy.lib.errors import UnexpectedProxyError
from dstack._internal.utils.common import run_async

logger = logging.getLogger(__name__)
IGNORE_STATUSES = {403, 404}
WINDOWS = (30, 60, 300)
TTL = WINDOWS[-1]
EMPTY_STATS = {window: Stat(requests=0, request_time=0.0) for window in WINDOWS}
Expand All @@ -35,6 +35,7 @@ class LogEntry(BaseModel):
host: str
status: int
request_time: float
is_replica_hit: bool


class StatsCollector:
Expand Down Expand Up @@ -87,7 +88,8 @@ def _collect(self) -> None:
now = datetime.datetime.now(tz=datetime.timezone.utc)

for entry in self._read_access_log(now - datetime.timedelta(seconds=TTL)):
if entry.status in IGNORE_STATUSES:
# only include requests that hit or should hit a service replica
if not entry.is_replica_hit:
continue

frame_timestamp = int(entry.timestamp.timestamp())
Expand Down Expand Up @@ -119,7 +121,10 @@ def _read_access_log(self, after: datetime.datetime) -> Iterable[LogEntry]:
line = self._file.readline()
if not line:
break
timestamp_str, host, status, request_time = line.split()
cells = line.split()
if len(cells) == 4: # compatibility with pre-0.19.11 logs
cells.append("0" if cells[2] in ["403", "404"] else "1")
timestamp_str, host, status, request_time, dstack_replica_hit = cells
timestamp = datetime.datetime.fromisoformat(timestamp_str)
if timestamp < after:
continue
Expand All @@ -128,6 +133,7 @@ def _read_access_log(self, after: datetime.datetime) -> Iterable[LogEntry]:
host=host,
status=int(status),
request_time=float(request_time),
is_replica_hit=_parse_nginx_bool(dstack_replica_hit),
)
if os.fstat(self._file.fileno()).st_ino != st_ino:
# file was rotated
Expand All @@ -154,3 +160,11 @@ async def get_service_stats(
)
for service in services
]


def _parse_nginx_bool(v: str) -> bool:
if v == "0":
return False
if v == "1":
return True
raise UnexpectedProxyError(f"Cannot parse boolean value: expected '0' or '1', got {v!r}")
47 changes: 32 additions & 15 deletions src/tests/_internal/proxy/gateway/services/test_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
pytest.param(
dedent(
"""
2024-12-06T12:08:00+00:00 srv-0.gtw.test 200 0.100
2024-12-06T12:08:00+00:00 srv-1.gtw.test 200 1.100
2024-12-06T12:09:15+00:00 srv-0.gtw.test 200 0.200
2024-12-06T12:09:15+00:00 srv-1.gtw.test 200 1.200
2024-12-06T12:09:45+00:00 srv-0.gtw.test 200 0.300
2024-12-06T12:08:00+00:00 srv-0.gtw.test 200 0.100 1
2024-12-06T12:08:00+00:00 srv-1.gtw.test 200 1.100 1
2024-12-06T12:09:15+00:00 srv-0.gtw.test 200 0.200 1
2024-12-06T12:09:15+00:00 srv-1.gtw.test 200 1.200 1
2024-12-06T12:09:45+00:00 srv-0.gtw.test 200 0.300 1
"""
),
{
Expand All @@ -41,11 +41,11 @@
pytest.param(
dedent(
"""
2024-12-06T12:08:00+00:00 srv.gtw.test 200 0.100
2024-12-06T12:08:00+00:00 srv.gtw.test 200 0.200
2024-12-06T12:08:00+00:00 srv.gtw.test 200 0.300
2024-12-06T12:08:01+00:00 srv.gtw.test 200 0.400
2024-12-06T12:08:01+00:00 srv.gtw.test 200 0.500
2024-12-06T12:08:00+00:00 srv.gtw.test 200 0.100 1
2024-12-06T12:08:00+00:00 srv.gtw.test 200 0.200 1
2024-12-06T12:08:00+00:00 srv.gtw.test 200 0.300 1
2024-12-06T12:08:01+00:00 srv.gtw.test 200 0.400 1
2024-12-06T12:08:01+00:00 srv.gtw.test 200 0.500 1
"""
),
{
Expand All @@ -60,10 +60,10 @@
pytest.param(
dedent(
"""
2024-12-06T12:04:50+00:00 srv.gtw.test 200 0.400
2024-12-06T12:08:00+00:00 srv.gtw.test 200 0.300
2024-12-06T12:09:15+00:00 srv.gtw.test 200 0.200
2024-12-06T12:09:45+00:00 srv.gtw.test 200 0.100
2024-12-06T12:04:50+00:00 srv.gtw.test 200 0.400 1
2024-12-06T12:08:00+00:00 srv.gtw.test 200 0.300 1
2024-12-06T12:09:15+00:00 srv.gtw.test 200 0.200 1
2024-12-06T12:09:45+00:00 srv.gtw.test 200 0.100 1
"""
),
{
Expand All @@ -75,6 +75,23 @@
},
id="ignores-out-of-window",
),
pytest.param(
dedent(
"""
2024-12-06T12:08:01+00:00 srv.gtw.test 200 0.100 1
2024-12-06T12:08:02+00:00 srv.gtw.test 200 0.200 0
2024-12-06T12:08:03+00:00 srv.gtw.test 200 0.300 1
"""
),
{
"srv.gtw.test": {
30: Stat(requests=0, request_time=0.0),
60: Stat(requests=0, request_time=0.0),
300: Stat(requests=2, request_time=0.2),
},
},
id="ignores-replica-not-hit",
),
pytest.param(
dedent(
"""
Expand All @@ -93,7 +110,7 @@
300: Stat(requests=4, request_time=0.25),
},
},
id="ignores-irrelevant-statuses",
id="ignores-irrelevant-statuses-in-legacy-pre-0.19.11-log",
),
pytest.param(
"",
Expand Down
Loading