Skip to content

Commit bf10f5d

Browse files
authored
Merge pull request #28 from AstraBert/feat/long-polling-and-docs
feat: wait-task command and docs
2 parents 85c2d4f + 26f2666 commit bf10f5d

7 files changed

Lines changed: 358 additions & 16 deletions

File tree

packages/lobsterx/README.md

Lines changed: 89 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ You then need to set three required env variables:
4141
- `TELEGRAM_BOT_TOKEN`: token for the Telegram bot
4242
- `LLAMA_CLOUD_API_KEY`: API key for LlamaCloud
4343

44+
If you wish to setup LobsterX as an API server, you will need to set an API key that only you can use to interact with it, set in the environment as `LOBSTERX_SERVER_KEY`. The key has to be at least 32 charachters long and contain only lowercase and uppercase alphanumeric characters, `-` and `_`.
45+
4446
You can use the setup wizard to configure LobsterX interactively on the terminal:
4547

4648
```bash
@@ -54,7 +56,8 @@ lobsterx setup --provider google \
5456
--model gemini-3-flash-preview \
5557
--api-key $GOOGLE_API_KEY \
5658
--llama-cloud-key $LLAMA_CLOUD_API_KEY \
57-
--telegram-token $TELEGRAM_BOT_TOKEN
59+
--telegram-token $TELEGRAM_BOT_TOKEN \
60+
--server-key $SERVER_KEY
5861
```
5962

6063
This will create a `.env` file with the necessary variables, which will be loaded by LobsterX at runtime (make sure not to share it with anyone).
@@ -63,7 +66,9 @@ If you wish to further customize the instructions that LobsterX has access to, y
6366

6467
## Run
6568

66-
Run LobsterX as a CLI app:
69+
### As a Telegram Bot
70+
71+
Run LobsterX as a Telegram Bot:
6772

6873
```bash
6974
lobsterx run
@@ -88,15 +93,85 @@ docker run ghcr.io/astrabert/lobsterx:main \
8893
--env="TELEGRAM_BOT_TOKEN=tok-xxx"
8994
```
9095

91-
## Use as a Telegram Bot
92-
9396
When on Telegram, you can perform two actions:
9497

9598
- Sending PDF files, which will be downloaded by the bot
9699
- Sending text messages, which will work as prompts for the bot to start a new task
97100

98101
> _With `/start` command, you will have a welcome message explaining how to use the bot_
99102
103+
### As an API server
104+
105+
To run as an API server, you need to specify a series of options that are necessary for authentication, rate limiting and CORS.
106+
107+
- For **authentication**, you need to set the `LOBSTERX_SERVER_KEY` within the environment or in a `.env` file in the same working directory as the agent
108+
- For **CORS**, you can set a list of allowed origins
109+
- For **rate limiting**, you can set the maximum limits of file uploads, task creations, task polling and task deletion per minute
110+
111+
In addition to these, you will also need to provide the host (`0.0.0.0` e.g.), port (`8000` e.g.) and protocol (`http` or `https`) on which the server will run.
112+
113+
You can provide all of these details directly from the CLI:
114+
115+
```bash
116+
lobsterx serve \
117+
--file-downloads-per-minute 300 \
118+
--create-tasks-per-minute 60 \
119+
--delete-tasks-per-minute 60 \
120+
--poll-tasks-per-minute 300 \
121+
--bind 0.0.0.0 \
122+
--port 8000 \
123+
--protocol http \
124+
--allow https://example.com \
125+
--allow https://anotherexample.com
126+
```
127+
128+
> All of these options have sensible defaults, but personalization is always recommended
129+
130+
Or create a JSON configuration ([as in thie example](config.api.json)) following this specification:
131+
132+
```json
133+
{
134+
"allow_origins": [],
135+
"file_downloads_per_minute": 300,
136+
"create_tasks_per_minute": 60,
137+
"delete_tasks_per_minute": 60,
138+
"poll_tasks_per_minute": 300,
139+
"host": "0.0.0.0",
140+
"port": 8000,
141+
"protocol": "http"
142+
}
143+
```
144+
145+
And provide it to the CLI:
146+
147+
```bash
148+
lobsterx serve --config config.api.json
149+
```
150+
151+
> The configuration approach is recommended, as it can be re-use through different API-related commands.
152+
153+
Once you are serving your API through `lobsterx serve`, you can:
154+
155+
- Upload files, by sending a POST request to `/files`
156+
- Create tasks, by sending a POST request to `/task`
157+
- Get the status of a task, by sending a GET request to `/task/{task_id}`
158+
- Cancel a task, by sending a DELETE request to `/task/{task_id}`
159+
160+
You don't have to do this through raw API calls, the LobsterX CLI provides several commands to perform these operations on your behalf:
161+
162+
```bash
163+
# upload a file
164+
lobsterx upload-file path/to/file.pdf --config config.api.json # pass the server configuration
165+
# start a task
166+
lobsterx create-task "Your prompt" --config config.api.json # this will return a task ID
167+
# check the status of a task
168+
lobsterx get-task some-task-id --config config.api.json
169+
# cancel a task
170+
lobsterx cancel-task some-task-id --config config.api.json
171+
# wait until a task is complete
172+
lobsterx wait-task some-task-id --config config.api.json --polling-interval 2.0 --max-attempts 900 --verbose
173+
```
174+
100175
## How LobsterX Works
101176

102177
LobsterX is a generalist AI agent based on three main principles:
@@ -111,6 +186,16 @@ Here is what happens when you send a prompt to LobsterX:
111186

112187
Along with the final response, the agent will also send you a report of everything it did during its session as a markdown file (namedd `session-<random-id>-report.md`).
113188

189+
### The API server
190+
191+
While sharing the core desing principles outlined above, the API server has some more features related to the data flow:
192+
193+
- When a POST request to the `/tasks` endpoint (task creation) is made, a new `asyncio.Task` is spawned and stored within a in-memory task manager, using a locked dictionary to associate a task ID with an async Task.
194+
- When a GET request is sent to `/task/{task_id}`, the task manager provides details on the status of the task (`success`, `failed`, `cancelled`, `pending`). If the task was succesfull, failed or was cancelled, it is removed from the dictionary.
195+
- When a DELETE request is sent to `/task/{task_id}`, the async Task is cancelled and removed from the dictionary
196+
197+
Besides the Task Manager, the API server uses an in-memory rate limiter ([`fastapi-throttle`](https://github.com/AliYmn/fastapi-throttle)) and Starlette CORS and Auth middleawares to provide authentication (through a `Bearer` token provided with an `Authorization` header) and CORS servicres.
198+
114199
## License
115200

116201
This package is provided under [MIT License](./LICENSE)

packages/lobsterx/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "uv_build"
44

55
[project]
66
name = "lobsterx"
7-
version = "0.2.0-beta"
7+
version = "0.2.1-beta"
88
description = "Background AI assistant working as a Telegram bot, built specifically for document-related use cases"
99
readme = "README.md"
1010
requires-python = ">=3.11"

packages/lobsterx/src/lobsterx/api/client.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
import asyncio
12
import os
3+
import sys
24
from mimetypes import guess_type
35
from typing import Literal
46

@@ -81,3 +83,38 @@ async def cancel_task(self, task_id: str) -> None:
8183
) as client:
8284
response = await client.delete(f"/tasks/{task_id}")
8385
response.raise_for_status()
86+
87+
async def poll_for_task(
88+
self,
89+
task_id: str,
90+
polling_interval: float = 2.0,
91+
max_attempts: int = 900, # 30 minutes
92+
verbose: bool = True,
93+
) -> GetTaskResponse | None:
94+
attempts = 0
95+
async with AsyncClient(
96+
base_url=self.base_url,
97+
headers={"Authorization": f"Bearer {self.api_key}"},
98+
timeout=600,
99+
) as client:
100+
while True:
101+
attempts += 1
102+
response = await client.get(f"/tasks/{task_id}")
103+
response.raise_for_status()
104+
json_response = response.json()
105+
validated = GetTaskResponse.model_validate(json_response)
106+
if validated.status.value == "pending" and attempts < max_attempts:
107+
if verbose:
108+
print(
109+
f"Attempt {attempts}: Task still pending...",
110+
file=sys.stderr,
111+
)
112+
await asyncio.sleep(polling_interval)
113+
elif validated.status.value == "pending" and attempts >= max_attempts:
114+
print(
115+
"Maximum number of attempts reached, exiting...",
116+
file=sys.stderr,
117+
)
118+
return
119+
else:
120+
return validated

packages/lobsterx/src/lobsterx/cli.py

Lines changed: 110 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from .api.shared import LobsterXApiConfig
1616
from .bot import run_bot
1717
from .constants import LOG_LEVELS
18+
from .utils import _setup_agentfs
1819

1920
app = Typer()
2021

@@ -161,7 +162,6 @@ def serve(
161162
int | None,
162163
Option(
163164
"--file-downloads-per-minute",
164-
"-a",
165165
help="Rate limit (per minute) on file downloads. Defaults to 300.",
166166
),
167167
] = None,
@@ -216,6 +216,7 @@ def serve(
216216
file_downloads_per_minute=file_downloads_per_minute,
217217
server_api_key=server_api_key,
218218
)
219+
asyncio.run(_setup_agentfs(with_print=True))
219220
uvicorn.run(app, host=host, port=port)
220221

221222

@@ -407,7 +408,114 @@ def get_task(
407408
)
408409
rprint(
409410
Markdown(
410-
f"## Final Outout\n\n{final_output}\n\n## Activity Report\n\n{report}"
411+
f"## Final Output\n\n{final_output}\n\n## Activity Report\n\n{report}"
412+
)
413+
)
414+
415+
416+
@app.command(
417+
name="wait-task",
418+
help="Poll for a task until it is completed.",
419+
)
420+
def wait_task(
421+
task_id: str,
422+
protocol: Annotated[
423+
Literal["http", "https"],
424+
Option(
425+
"--protocol",
426+
"-t",
427+
help="Protocol for the connection. Defaults to 'http'.",
428+
),
429+
] = "http",
430+
host: Annotated[
431+
str,
432+
Option(
433+
"--bind",
434+
"-b",
435+
help="Host to bind the server to. Defaults to 0.0.0.0",
436+
),
437+
] = "0.0.0.0",
438+
port: Annotated[
439+
int,
440+
Option(
441+
"--port",
442+
"-p",
443+
help="Port to bind the server to. Defaults to 8000",
444+
),
445+
] = 8000,
446+
polling_interval: Annotated[
447+
float,
448+
Option(
449+
"--polling-interval",
450+
"-i",
451+
help="Interval (in seconds) between a polling request and the following one. Defaults to 2 seconds.",
452+
),
453+
] = 2.0,
454+
max_attempts: Annotated[
455+
int,
456+
Option(
457+
"--max-attempts",
458+
"-m",
459+
help="Maximum number of polling attempts. Defaults to 900 (for a total of 30 minutes with the default polling interval).",
460+
),
461+
] = 900,
462+
server_api_key: Annotated[
463+
str | None,
464+
Option(
465+
"--server-key",
466+
help="API key to be used within the server to authorize requests. Reads from LOBSTERX_SERVER_KEY env variable if not provided.",
467+
),
468+
] = None,
469+
config_file: Annotated[
470+
str | None,
471+
Option(
472+
"--config",
473+
"-c",
474+
help="Config file from which to read the LobsterX server configuration. Configured options have precedence over CLI.",
475+
),
476+
] = None,
477+
verbose: Annotated[
478+
bool,
479+
Option(
480+
"--verbose/--no-verbose",
481+
help="Whether or not to enable verbose logging.",
482+
),
483+
] = True,
484+
) -> None:
485+
if config_file is not None:
486+
args = LobsterXApiConfig.load_from_config(config_file)
487+
port = args.port or port
488+
host = args.host or host
489+
protocol = args.protocol or protocol
490+
client = LobsterXClient(
491+
api_key=server_api_key, host=host, port=port, protocol=protocol
492+
)
493+
response = asyncio.run(
494+
client.poll_for_task(
495+
task_id,
496+
polling_interval=polling_interval,
497+
max_attempts=max_attempts,
498+
verbose=verbose,
499+
)
500+
)
501+
if response is None:
502+
return
503+
if response.status.value in ("cancelled", "failed"):
504+
rprint(f"[bold red]Task {task_id} was cancelled or produced an error[/]")
505+
if response.error is not None:
506+
rprint(f"[bold red]Error: {response.error}[/]")
507+
elif response.status.value == "pending":
508+
rprint(f"[bold cyan]Task {task_id} is still being executed[/]")
509+
else:
510+
final_output = (
511+
response.output[1] if response.output is not None else "No final output"
512+
)
513+
report = (
514+
response.output[0] if response.output is not None else "No activity report"
515+
)
516+
rprint(
517+
Markdown(
518+
f"## Final Output\n\n{final_output}\n\n## Activity Report\n\n{report}"
411519
)
412520
)
413521

packages/lobsterx/src/lobsterx/utils.py

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import logging
44
import mimetypes
55
import os
6+
import sys
67
from typing import cast
78

89
import aiofiles
@@ -168,17 +169,37 @@ async def _remove_temporary_report_file(path: str) -> None:
168169
pass
169170

170171

171-
async def _setup_agentfs() -> None:
172+
async def _setup_agentfs(with_print: bool = False) -> None:
172173
if not AGENTFS_FILE.exists():
173-
logging.info("Loading all files in the current working directory to AgentFS")
174+
if not with_print:
175+
logging.info(
176+
"Loading all files in the current working directory to AgentFS"
177+
)
178+
else:
179+
print(
180+
"Loading all files in the current working directory to AgentFS",
181+
file=sys.stderr,
182+
)
174183
await load_all_files(DEFAULT_TO_AVOID, DEFAULT_TO_AVOID_FILES, progress=True)
175-
logging.info(
176-
"Finished loading all files in the current working directory to AgentFS"
177-
)
184+
if not with_print:
185+
logging.info(
186+
"Finished loading all files in the current working directory to AgentFS"
187+
)
188+
else:
189+
print(
190+
"Finished loading all files in the current working directory to AgentFS",
191+
file=sys.stderr,
192+
)
178193
else:
179-
logging.info(
180-
f"Detected {str(AGENTFS_FILE)} in current working directory, will not load files."
181-
)
194+
if not with_print:
195+
logging.info(
196+
f"Detected {str(AGENTFS_FILE)} in current working directory, will not load files."
197+
)
198+
else:
199+
print(
200+
f"Detected {str(AGENTFS_FILE)} in current working directory, will not load files.",
201+
file=sys.stderr,
202+
)
182203

183204

184205
def _escape_markdow_for_tg(markdown: str) -> str:

0 commit comments

Comments
 (0)