Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
## 0.0.45

* **Cooperative SIGTERM shutdown for plugin webservers**: plugin uvicorn servers now run under
a `GracefulServer` that signals a process-global cancellation event when SIGTERM/SIGINT is
received. Run-functions that declare a `cancellation_token` parameter receive a read-only token
they can poll between units of work; raising `PluginShutdown` (for example via the token's
`raise_if_cancelled()`) aborts the run promptly and is reported as an HTTP 503 shutdown-abort
response. A finite default `timeout_graceful_shutdown` (30s, overridable
via `UVICORN_TIMEOUT_GRACEFUL_SHUTDOWN`) bounds uvicorn's drain window — previously unbounded.
Plugins that do not declare `cancellation_token` are unaffected.
* **Remove the inert SIGTERM-ignore signal patch**: 0.0.44 set `uvicorn.Server.install_signal_handlers`
to a SIGINT-only handler, but the pinned uvicorn (0.37) removed that method in favor of
`capture_signals()`, so the patch never took effect and plugins already handled SIGTERM normally.
It is removed to avoid a misleading mental model; prompt shutdown now comes from the finite
graceful-shutdown timeout plus the `GracefulServer` cancellation hook above.

## 0.0.44

* **Ignore SIGTERM in plugin uvicorn Servers**: plugin webservers now keep
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ lint = [
test = [
"pytest",
"httpx",
"pytest-asyncio",
"pytest-cov"
]

Expand Down
90 changes: 90 additions & 0 deletions test/api/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
SourceIdentifiers,
)

from test.assets.cancellation_token_async import CancelAwareAsync
from test.assets.cancellation_token_asyncgen import CancelAwareAsyncGen
from test.assets.cancellation_token_sync import CancelAwareSync
from unstructured_platform_plugins.etl_uvicorn import shutdown
from unstructured_platform_plugins.etl_uvicorn.api_generator import (
EtlApiException,
UsageData,
Expand Down Expand Up @@ -470,3 +474,89 @@ def test_streaming_unstructured_ingest_error_with_none_status_code():
"Async gen test UnstructuredIngestError with None status_code"
in invoke_response.status_code_text
)


# ── CancellationToken / PluginShutdown tests ──────────────────────────────────


@pytest.fixture(autouse=True)
def _reset_shutdown():
    """Call ``shutdown.reset_for_tests()`` before and after every test (autouse)."""
    reset = shutdown.reset_for_tests
    reset()
    yield
    reset()


@pytest.mark.parametrize("job_cls", [CancelAwareSync, CancelAwareAsync])
def test_cancellation_token_not_in_invoke_schema(job_cls):
    """``cancellation_token`` must not show up in /schema inputs or the OpenAPI /invoke entry."""
    plugin = job_cls()
    http = TestClient(wrap_in_fastapi(func=plugin.run, plugin_id=plugin.id()))

    declared_inputs = http.get("/schema").json()["inputs"]
    assert "cancellation_token" not in declared_inputs

    # The OpenAPI entry is nested, so search its string form for the parameter name.
    invoke_spec = http.get("/openapi.json").json()["paths"]["/invoke"]
    assert "cancellation_token" not in str(invoke_spec)


@pytest.mark.parametrize("job_cls", [CancelAwareSync, CancelAwareAsync])
def test_invoke_succeeds_when_not_shutting_down(job_cls):
    """Without a shutdown request, /invoke reports 200 and the incremented value."""
    plugin = job_cls()
    http = TestClient(wrap_in_fastapi(func=plugin.run, plugin_id=plugin.id()))

    payload = http.post("/invoke", json={"value": 41}).json()

    assert payload["status_code"] == 200
    assert payload["output"]["value"] == 42


@pytest.mark.parametrize("job_cls", [CancelAwareSync, CancelAwareAsync])
def test_invoke_returns_503_shutdown_abort_when_cancelled(job_cls):
    """After ``request_shutdown()``, /invoke reports 503 with a shutdown message."""
    plugin = job_cls()
    http = TestClient(wrap_in_fastapi(func=plugin.run, plugin_id=plugin.id()))

    # Request shutdown before the call so the run-function's token check raises.
    shutdown.request_shutdown()
    payload = http.post("/invoke", json={"value": 41}).json()

    assert payload["status_code"] == 503
    message = payload["status_code_text"] or ""
    assert "shutdown" in message.lower()


def test_streaming_invoke_succeeds_when_not_shutting_down():
    """Happy-path: async-gen run func streams a 200 row when shutdown is not requested."""
    import json

    plugin = CancelAwareAsyncGen()
    http = TestClient(wrap_in_fastapi(func=plugin.run, plugin_id=plugin.id()))

    resp = http.post("/invoke", json={"value": 41})

    assert resp.status_code == 200
    assert resp.headers["content-type"] == "application/x-ndjson"

    # The body is newline-delimited JSON; exactly one row is expected.
    body = resp.content.decode().strip()
    lines = body.split("\n")
    assert len(lines) == 1

    parsed = json.loads(lines[0])
    row = InvokeResponse.model_validate(parsed)
    assert row.status_code == 200
    assert row.output == {"value": 42}


def test_streaming_invoke_returns_503_shutdown_abort_when_cancelled():
    """Streaming PluginShutdown handler: yields a 503 row and stops when shutdown is requested."""
    import json

    plugin = CancelAwareAsyncGen()
    http = TestClient(wrap_in_fastapi(func=plugin.run, plugin_id=plugin.id()))

    shutdown.request_shutdown()
    resp = http.post("/invoke", json={"value": 41})

    # The HTTP response itself is still 200; the abort is carried in the streamed row.
    assert resp.status_code == 200
    assert resp.headers["content-type"] == "application/x-ndjson"

    body = resp.content.decode().strip()
    lines = body.split("\n")
    assert len(lines) == 1

    parsed = json.loads(lines[0])
    row = InvokeResponse.model_validate(parsed)
    assert row.status_code == 503
    message = row.status_code_text or ""
    assert "shutdown" in message.lower()
18 changes: 18 additions & 0 deletions test/assets/cancellation_token_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from typing import Optional

from pydantic import BaseModel

from unstructured_platform_plugins.etl_uvicorn.shutdown import CancellationToken


class Result(BaseModel):
    """Output model for the async cancellation-aware test plugin."""

    # Integer payload; ``CancelAwareAsync.run`` fills it with ``value + 1``.
    value: int


class CancelAwareAsync:
    """Test plugin with a coroutine ``run`` that checks its cancellation token first."""

    def id(self) -> str:
        """Return this plugin's identifier string."""
        return "cancel_aware_async"

    async def run(self, value: int, cancellation_token: CancellationToken) -> Optional[Result]:
        """Return ``Result(value + 1)`` after letting the token raise if cancelled."""
        cancellation_token.raise_if_cancelled()
        incremented = value + 1
        return Result(value=incremented)
16 changes: 16 additions & 0 deletions test/assets/cancellation_token_asyncgen.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from pydantic import BaseModel

from unstructured_platform_plugins.etl_uvicorn.shutdown import CancellationToken


class Result(BaseModel):
    """Output model for the async-generator cancellation-aware test plugin."""

    # Integer payload; ``CancelAwareAsyncGen.run`` yields it as ``value + 1``.
    value: int


class CancelAwareAsyncGen:
    """Test plugin whose ``run`` is an async generator that checks its token before yielding."""

    def id(self) -> str:
        """Return this plugin's identifier string."""
        return "cancel_aware_asyncgen"

    # NOTE(review): ``run`` contains ``yield`` and is therefore an async generator, but it is
    # annotated ``-> Result``. Presumably the framework reads this annotation as the per-row
    # output type; confirm before switching it to an ``AsyncGenerator[...]`` annotation.
    async def run(self, value: int, cancellation_token: CancellationToken) -> Result:
        """Yield a single ``Result(value + 1)`` after letting the token raise if cancelled."""
        cancellation_token.raise_if_cancelled()
        incremented = value + 1
        yield Result(value=incremented)
18 changes: 18 additions & 0 deletions test/assets/cancellation_token_sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from typing import Optional

from pydantic import BaseModel

from unstructured_platform_plugins.etl_uvicorn.shutdown import CancellationToken


class Result(BaseModel):
    """Output model for the synchronous cancellation-aware test plugin."""

    # Integer payload; ``CancelAwareSync.run`` fills it with ``value + 1``.
    value: int


class CancelAwareSync:
    """Test plugin with a plain (blocking) ``run`` that checks its cancellation token first."""

    def id(self) -> str:
        """Return this plugin's identifier string."""
        return "cancel_aware_sync"

    def run(self, value: int, cancellation_token: CancellationToken) -> Optional[Result]:
        """Return ``Result(value + 1)`` after letting the token raise if cancelled."""
        cancellation_token.raise_if_cancelled()
        incremented = value + 1
        return Result(value=incremented)
34 changes: 34 additions & 0 deletions test/test_main_cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from unittest.mock import patch

from click.testing import CliRunner

from unstructured_platform_plugins.etl_uvicorn.main import get_command
from unstructured_platform_plugins.etl_uvicorn.serve import DEFAULT_TIMEOUT_GRACEFUL_SHUTDOWN


def test_cli_uses_graceful_server_with_finite_timeout():
    """The CLI must run ``GracefulServer`` with the default graceful-shutdown timeout."""
    seen = {}

    class _RecordingServer:
        """Stand-in for ``GracefulServer`` that records its config and whether it ran."""

        def __init__(self, config):
            seen["config"] = config

        def run(self):
            seen["ran"] = True

    main_module = "unstructured_platform_plugins.etl_uvicorn.main"
    # Replace app generation with a no-op callable and swap in the recording server.
    app_patch = patch(
        f"{main_module}.generate_fast_api",
        return_value=lambda *a, **k: None,
    )
    server_patch = patch(f"{main_module}.GracefulServer", _RecordingServer)

    with app_patch, server_patch:
        cli_args = ["test.assets.hash_function:get_hash", "--host", "0.0.0.0", "--port", "8000"]
        result = CliRunner().invoke(get_command(), cli_args)

    assert result.exit_code == 0, result.output
    assert seen.get("ran") is True
    assert seen["config"].timeout_graceful_shutdown == DEFAULT_TIMEOUT_GRACEFUL_SHUTDOWN
57 changes: 57 additions & 0 deletions test/test_serve.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import signal

import pytest
from uvicorn.config import Config

from unstructured_platform_plugins.etl_uvicorn import shutdown
from unstructured_platform_plugins.etl_uvicorn.serve import (
DEFAULT_TIMEOUT_GRACEFUL_SHUTDOWN,
GracefulServer,
)


@pytest.fixture(autouse=True)
def _reset():
    """Call ``shutdown.reset_for_tests()`` before and after every test (autouse)."""
    reset = shutdown.reset_for_tests
    reset()
    yield
    reset()


def _dummy_app(scope, receive, send): # minimal ASGI app
raise NotImplementedError


def test_handle_exit_sets_cancellation_event_and_delegates():
    """``handle_exit`` must flag shutdown and still set the server's ``should_exit``."""
    graceful = GracefulServer(Config(_dummy_app))
    assert shutdown.is_shutting_down() is False

    graceful.handle_exit(signal.SIGTERM, None)

    assert shutdown.is_shutting_down() is True
    # super().handle_exit must still run uvicorn's own shutdown bookkeeping
    assert graceful.should_exit is True


def test_default_timeout_is_finite():
    """The default graceful-shutdown timeout must be a positive integer."""
    timeout = DEFAULT_TIMEOUT_GRACEFUL_SHUTDOWN
    assert isinstance(timeout, int)
    assert timeout > 0


def test_serve_constructs_graceful_server_with_passed_timeout():
    """serve() must build a GracefulServer whose Config carries the caller's timeout."""
    from unittest.mock import patch

    from unstructured_platform_plugins.etl_uvicorn.serve import serve

    seen = {}

    class _RecordingServer:
        """Stand-in for ``GracefulServer`` that records its config and whether it ran."""

        def __init__(self, config):
            seen["config"] = config

        def run(self):
            seen["ran"] = True

    server_patch = patch(
        "unstructured_platform_plugins.etl_uvicorn.serve.GracefulServer", _RecordingServer
    )
    with server_patch:
        serve(_dummy_app, port=9999, timeout_graceful_shutdown=7)

    assert seen.get("ran") is True
    assert seen["config"].timeout_graceful_shutdown == 7
45 changes: 45 additions & 0 deletions test/test_shutdown.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import threading

import pytest

from unstructured_platform_plugins.etl_uvicorn import shutdown
from unstructured_platform_plugins.etl_uvicorn.shutdown import (
CancellationToken,
PluginShutdown,
)


@pytest.fixture(autouse=True)
def _reset():
    """Call ``shutdown.reset_for_tests()`` before and after every test (autouse)."""
    reset = shutdown.reset_for_tests
    reset()
    yield
    reset()


def test_token_not_cancelled_by_default():
    """A fresh token reports ``cancelled is False`` and ``raise_if_cancelled`` is a no-op."""
    fresh = shutdown.get_cancellation_token()
    assert fresh.cancelled is False
    # Must return normally; an exception here fails the test.
    fresh.raise_if_cancelled()


def test_request_shutdown_flips_token_and_global():
    """``request_shutdown()`` flips both ``is_shutting_down()`` and a token obtained earlier."""
    # Obtain the token before the shutdown request to show it is not a snapshot.
    earlier_token = shutdown.get_cancellation_token()
    assert shutdown.is_shutting_down() is False

    shutdown.request_shutdown()

    assert shutdown.is_shutting_down() is True
    assert earlier_token.cancelled is True
    with pytest.raises(PluginShutdown):
        earlier_token.raise_if_cancelled()


def test_token_has_no_set_method():
    """The token must not expose a ``set`` attribute (it is meant to be read-only)."""
    read_only = shutdown.get_cancellation_token()
    assert hasattr(read_only, "set") is False


def test_token_reads_live_event():
    """A token built on an event reflects that event being set after construction."""
    backing = threading.Event()
    view = CancellationToken(backing)
    assert view.cancelled is False

    backing.set()

    assert view.cancelled is True
39 changes: 39 additions & 0 deletions test/test_shutdown_cooperative.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import asyncio
import threading

import pytest

from unstructured_platform_plugins.etl_uvicorn import shutdown
from unstructured_platform_plugins.etl_uvicorn.api_generator import invoke_func
from unstructured_platform_plugins.etl_uvicorn.shutdown import (
CancellationToken,
PluginShutdown,
)


@pytest.fixture(autouse=True)
def _reset():
shutdown.reset_for_tests()
yield
shutdown.reset_for_tests()


@pytest.mark.asyncio
async def test_sync_runfunc_bails_promptly_when_cancelled():
    """A sync run-function that polls its token aborts soon after shutdown is requested.

    The run-function loops for up to ~10 seconds of simulated work, checking the
    token between units. Once ``shutdown.request_shutdown()`` is called, awaiting
    ``invoke_func`` must raise ``PluginShutdown`` within two seconds.

    The explicit ``asyncio`` marker makes the coroutine test run under
    pytest-asyncio's strict mode as well; it is harmless in auto mode.
    """
    started = threading.Event()
    pause = threading.Event()  # never set; waiting on it is just a short sleep

    def blocking_run(cancellation_token: CancellationToken) -> str:
        started.set()
        # simulate a unit-of-work loop that polls between units
        for _ in range(1000):
            cancellation_token.raise_if_cancelled()
            pause.wait(0.01)
        return "completed"

    token = shutdown.get_cancellation_token()
    task = asyncio.ensure_future(
        invoke_func(func=blocking_run, kwargs={"cancellation_token": token})
    )

    # We are inside a coroutine, so use the running loop rather than get_event_loop().
    loop = asyncio.get_running_loop()
    # Wait off-loop so the event loop stays free while the run-function starts.
    started_in_time = await loop.run_in_executor(None, started.wait, 2.0)

    # Request shutdown before asserting so the polling loop is released either way.
    shutdown.request_shutdown()
    assert started_in_time, "blocking_run never started; cancellation was not exercised"

    with pytest.raises(PluginShutdown):
        await asyncio.wait_for(task, timeout=2.0)
35 changes: 0 additions & 35 deletions test/test_signal_handlers.py

This file was deleted.

Loading
Loading