|
38 | 38 |
|
39 | 39 | from job_manager import WorkspaceManager |
40 | 40 | from key_store import KeyStore |
41 | | -from session_manager import run_claude |
| 41 | +from session_manager import run_claude_streaming |
42 | 42 | from user_store import UserStore |
43 | 43 |
|
44 | 44 | logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") |
|
70 | 70 | onboarding_state: dict[str, str] = {} |
71 | 71 | cluster_setup_state: dict[str, dict] = {} |
72 | 72 |
|
| 73 | +# Store last full response per user for /modelopt logs |
| 74 | +_last_response: dict[str, str] = {} |
| 75 | + |
73 | 76 | # ─── Helpers ───────────────────────────────────────────────────────── |
74 | 77 |
|
75 | 78 |
|
@@ -125,9 +128,13 @@ def is_dm(event: dict) -> bool: |
125 | 128 | • `/modelopt setup` — onboard (auth + cluster config) |
126 | 129 | • `/modelopt add-cluster` — configure a remote cluster |
127 | 130 | • `/modelopt clusters` — list your configured clusters |
| 131 | +• `/modelopt set-env KEY=VALUE` — set personal env var (DM only, e.g. `HF_TOKEN`, `NGC_API_KEY`) |
| 132 | +• `/modelopt env` — list your env vars |
| 133 | +• `/modelopt unset-env KEY` — remove an env var |
128 | 134 |
|
129 | | -*Workspaces:* |
| 135 | +*Workspaces & Logs:* |
130 | 136 | • `/modelopt workspaces` — list your workspaces |
| 137 | +• `/modelopt logs` — upload full output of last task as a file |
131 | 138 | • `/modelopt cleanup` — remove old workspaces |
132 | 139 | • `/modelopt status` — show your current status |
133 | 140 |
|
@@ -361,6 +368,34 @@ async def handle_slash_command(ack, command, say, respond): |
361 | 368 | else: |
362 | 369 | await respond(text="No clusters configured. Use `/modelopt add-cluster` to set one up.") |
363 | 370 |
|
| 371 | + elif subcmd == "set-env": |
| 372 | + if command.get("channel_name") != "directmessage": |
| 373 | + await respond(text=":warning: Use this command in a DM with me (contains secrets).") |
| 374 | + return |
| 375 | + if not args or "=" not in args: |
| 376 | + await respond(text="Usage: `/modelopt set-env HF_TOKEN=hf_abc123...`\n\nCommon variables: `HF_TOKEN`, `NGC_API_KEY`, `DOCKER_TOKEN`") |
| 377 | + return |
| 378 | + key, _, value = args.partition("=") |
| 379 | + user_store.set_env_var(user_id, key.strip(), value.strip()) |
| 380 | + await respond(text=f"`{key.strip()}` saved.") |
| 381 | + |
| 382 | + elif subcmd == "env": |
| 383 | + env_vars = user_store.get_env_vars(user_id) |
| 384 | + if env_vars: |
| 385 | + lines = [f"• `{k}` = `{v}`" for k, v in env_vars.items()] |
| 386 | + await respond(text="*Your env vars* (values masked):\n" + "\n".join(lines) + "\n\nUse `/modelopt set-env KEY=VALUE` to add/update, `/modelopt unset-env KEY` to remove.") |
| 387 | + else: |
| 388 | + await respond(text="No personal env vars set.\n\nUse `/modelopt set-env HF_TOKEN=hf_abc...` to add one.") |
| 389 | + |
| 390 | + elif subcmd == "unset-env": |
| 391 | + if not args: |
| 392 | + await respond(text="Usage: `/modelopt unset-env HF_TOKEN`") |
| 393 | + return |
| 394 | + if user_store.remove_env_var(user_id, args.strip()): |
| 395 | + await respond(text=f"`{args.strip()}` removed.") |
| 396 | + else: |
| 397 | + await respond(text=f"`{args.strip()}` not found.") |
| 398 | + |
364 | 399 | elif subcmd == "workspaces": |
365 | 400 | if not user_store.is_registered(user_id): |
366 | 401 | await respond(text="Not registered yet. Use `/modelopt setup` first.") |
@@ -396,6 +431,18 @@ async def handle_slash_command(ack, command, say, respond): |
396 | 431 | elif subcmd in ("help", ""): |
397 | 432 | await respond(text=HELP_MSG) |
398 | 433 |
|
| 434 | + elif subcmd == "logs": |
| 435 | + last = _last_response.get(user_id) |
| 436 | + if not last: |
| 437 | + await respond(text="No recent task output. Run a task first.") |
| 438 | + return |
| 439 | + await app.client.files_upload_v2( |
| 440 | + channel=channel, |
| 441 | + content=last, |
| 442 | + filename="modelopt_task_log.md", |
| 443 | + title="Last Task Output", |
| 444 | + ) |
| 445 | + |
399 | 446 | else: |
400 | 447 | # Treat as a prompt |
401 | 448 | await respond(text="Processing...") |
@@ -485,31 +532,95 @@ async def _run_job(user_id: str, prompt: str, say_func, channel: str, thread_ts: |
485 | 532 | f"Workspace root: {ws_root} (contains per-model workspaces). " |
486 | 533 | f"Upstream repo: {workspace_mgr.repo_dir} (read-only, use for fresh copies). " |
487 | 534 | f"Read skills/common/workspace-management.md before creating workspaces. " |
488 | | - f"Check existing workspaces with: ls $MODELOPT_WORKSPACE_ROOT/" |
| 535 | + f"Check existing workspaces with: ls $MODELOPT_WORKSPACE_ROOT/ " |
| 536 | + f"SAFETY: You are running unattended — no human can approve actions. " |
| 537 | + f"NEVER run destructive commands (rm -rf /, kill -9, fdisk, mkfs, etc.). " |
| 538 | + f"NEVER modify files outside your workspace ({ws_root}) or the user's remote home directory. " |
| 539 | + f"Do NOT modify the upstream repo ({workspace_mgr.repo_dir}). " |
| 540 | + f"Do NOT modify system files, global configs, or other users' data. " |
| 541 | + f"If a task seems risky or ambiguous, output a warning instead of proceeding." |
489 | 542 | ) |
490 | 543 |
|
491 | | - session_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, f"modelopt-slack-{user_id}")) |
| 544 | + # Session per Slack thread: messages in the same thread share context, |
| 545 | + # new top-level messages start fresh sessions. |
| 546 | + # thread_ts is the parent message ts (or the message's own ts if it IS the parent). |
| 547 | + session_key = f"modelopt-slack-{user_id}-{thread_ts or 'ephemeral'}" |
| 548 | + session_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, session_key)) |
492 | 549 |
|
493 | 550 | if thread_ts: |
494 | | - await say_func(text=":rocket: Working on it...", thread_ts=thread_ts) |
| 551 | + await say_func( |
| 552 | + text=":rocket: Working on it — this may take a while. I'll let you know when it's done.", |
| 553 | + thread_ts=thread_ts, |
| 554 | + ) |
495 | 555 |
|
| 556 | + # Stream internally to keep idle detection alive. Only send final result to Slack. |
| 557 | + full_response = "" |
496 | 558 | try: |
497 | | - response = await run_claude( |
| 559 | + async for chunk in run_claude_streaming( |
498 | 560 | prompt=prompt, |
499 | 561 | cwd=workspace, |
500 | 562 | env=env, |
501 | 563 | session_id=session_id, |
502 | 564 | system_prompt_extra=bot_context, |
503 | | - ) |
| 565 | + ): |
| 566 | + full_response += chunk.text |
| 567 | + if chunk.is_final: |
| 568 | + break |
| 569 | + |
504 | 570 | except Exception as e: |
505 | | - response = f":x: Failed: {e}" |
| 571 | + full_response += f"\n\n:x: Failed: {e}" |
506 | 572 | logger.error("Request failed for user %s: %s", user_id, e) |
507 | 573 |
|
| 574 | + # Send final response |
| 575 | + if not full_response.strip(): |
| 576 | + full_response = "No response from Claude." |
| 577 | + |
| 578 | + # Save for /modelopt logs |
| 579 | + _last_response[user_id] = full_response |
| 580 | + |
508 | 581 | kwargs = {"thread_ts": thread_ts} if thread_ts else {} |
509 | | - if channel and len(response) > MAX_SLACK_LENGTH: |
510 | | - await send_long_response(say_func, response, thread_ts, channel) |
| 582 | + if channel and len(full_response) > MAX_SLACK_LENGTH: |
| 583 | + await send_long_response(say_func, full_response, thread_ts, channel) |
511 | 584 | else: |
512 | | - await say_func(text=truncate(response), **kwargs) |
| 585 | + await say_func(text=truncate(full_response), **kwargs) |
| 586 | + |
| 587 | + |
| 588 | +# ─── Auto Cleanup ──────────────────────────────────────────────────── |
| 589 | + |
| 590 | +SESSION_MAX_AGE_DAYS = int(os.environ.get("SESSION_MAX_AGE_DAYS", "30")) |
| 591 | +CLEANUP_INTERVAL_HOURS = int(os.environ.get("CLEANUP_INTERVAL_HOURS", "6")) |
| 592 | + |
| 593 | + |
| 594 | +async def _auto_cleanup_loop(): |
| 595 | + """Periodically clean up old sessions and workspaces.""" |
| 596 | + while True: |
| 597 | + await asyncio.sleep(CLEANUP_INTERVAL_HOURS * 3600) |
| 598 | + try: |
| 599 | + import time |
| 600 | + |
| 601 | + cutoff = time.time() - SESSION_MAX_AGE_DAYS * 86400 |
| 602 | + total_removed = 0 |
| 603 | + |
| 604 | + for uid in user_store.list_users(): |
| 605 | + # Clean old Claude sessions |
| 606 | + config_dir = Path(user_store.get_claude_config_dir(uid)) |
| 607 | + sessions_dir = config_dir / "projects" |
| 608 | + if sessions_dir.exists(): |
| 609 | + for entry in sessions_dir.iterdir(): |
| 610 | + if entry.is_dir() and entry.stat().st_mtime < cutoff: |
| 611 | + import shutil |
| 612 | + shutil.rmtree(entry, ignore_errors=True) |
| 613 | + total_removed += 1 |
| 614 | + |
| 615 | + # Clean old workspaces (older than 7 days, not the default) |
| 616 | + ws_root = user_store.jobs_dir(uid) |
| 617 | + removed = await workspace_mgr.cleanup_old(ws_root, max_age_days=SESSION_MAX_AGE_DAYS) |
| 618 | + total_removed += removed |
| 619 | + |
| 620 | + if total_removed: |
| 621 | + logger.info("Auto-cleanup: removed %d old sessions/workspaces", total_removed) |
| 622 | + except Exception as e: |
| 623 | + logger.error("Auto-cleanup error: %s", e) |
513 | 624 |
|
514 | 625 |
|
515 | 626 | # ─── Main ──────────────────────────────────────────────────────────── |
@@ -538,6 +649,10 @@ async def main(): |
538 | 649 | logger.error("Claude CLI not found in PATH — bot will not work") |
539 | 650 |
|
540 | 651 | logger.info("Registered users: %d", len(user_store.list_users())) |
| 652 | + logger.info("Auto-cleanup: every %dh, sessions older than %dd", CLEANUP_INTERVAL_HOURS, SESSION_MAX_AGE_DAYS) |
| 653 | + |
| 654 | + # Start background cleanup task |
| 655 | + asyncio.create_task(_auto_cleanup_loop()) |
541 | 656 |
|
542 | 657 | handler = AsyncSocketModeHandler(app, SLACK_APP_TOKEN) |
543 | 658 | await handler.start_async() |
|
0 commit comments