-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathrelay_bot.py
More file actions
3703 lines (3301 loc) · 126 KB
/
relay_bot.py
File metadata and controls
3703 lines (3301 loc) · 126 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
relay_bot.py -- Telegram <-> HPC AI Agent relay
v12: Config loaded from .env file. See .env.example for all required settings.
"""
import os
import shlex
import re
import json
import time
import asyncio
import subprocess
import html
import traceback
import threading
import signal
import schedule
from datetime import datetime, timedelta
from typing import Optional, Tuple, List, Dict
from pathlib import Path
from faster_whisper import WhisperModel
from dotenv import load_dotenv
from telegram import (
Update,
InlineKeyboardButton,
InlineKeyboardMarkup,
BotCommand,
)
from telegram.ext import (
Application,
MessageHandler,
ContextTypes,
CommandHandler,
CallbackQueryHandler,
filters,
)
from telegram.constants import ParseMode, ChatAction
# Load .env from the same directory as this script
SCRIPT_DIR = Path(__file__).resolve().parent
load_dotenv(SCRIPT_DIR / ".env")
# ================================================================
# EMOJIS (Unicode Escapes -- ASCII-safe)
# ================================================================
class E:
    """Emoji constants used in bot messages.

    Written as ``\\U`` escapes so the source file stays ASCII-safe
    (see the section banner above).
    """
    BOT = "\U0001f916"    # robot face
    SESS = "\U0001f4cb"   # clipboard
    WAIT = "\U000023f3"   # hourglass
    TOOL = "\U0001f527"   # wrench
    WARN = "\U000026a0"   # warning sign
    SHELL = "\U0001f5a5"  # desktop computer
    OK = "\U00002705"     # check mark
    ERR = "\U0000274c"    # cross mark
    TIME = "\U000023f1"   # stopwatch
    RETRY = "\U0001f504"  # counterclockwise arrows
    OUT = "\U0001f4e4"    # outbox tray
# ================================================================
# MARKDOWN -> TELEGRAM HTML CONVERTER
# ================================================================
# Fenced code block: ```lang\n...``` (language tag optional).
_FENCE_RE = re.compile(r"```(\w*)\n(.*?)```", re.DOTALL)
# Inline code span: `...` with no backticks or newlines inside.
_INLINE_CODE_RE = re.compile(r"`([^`\n]+)`")
# **bold**
_BOLD_RE = re.compile(r"\*\*(.+?)\*\*")
# *italic* — lookarounds keep it from eating the edges of **bold**.
_ITALIC_RE = re.compile(r"(?<!\*)\*([^*\n]+?)\*(?!\*)")
# ~~strikethrough~~
_STRIKE_RE = re.compile(r"~~(.+?)~~")
# [text](url)
_LINK_RE = re.compile(r"\[([^\]]+)\]\(([^)]+)\)")
# Markdown headers (# .. ######) anchored at line start.
_HEADER_RE = re.compile(r"^(#{1,6})\s+(.+)$", re.MULTILINE)
# "> quoted" lines.
_BQUOTE_RE = re.compile(r"^>\s?(.*)$", re.MULTILINE)
# Indent char for sub-items (thin space + bullet)
INDENT = "\u2003"  # em-space
BULLET = "\u2022"  # bullet
SUB_BULLET = "\u25e6"  # white bullet (for nested)
def md_to_tg_html(text: str) -> str:
    """Render Markdown as Telegram-compatible HTML; guaranteed not to raise."""
    if not text:
        return ""
    try:
        converted = _md_to_tg_html_inner(text)
    except Exception as exc:
        # Any converter failure degrades to plain escaped text so the
        # message is still delivered.
        print(f" [md_to_tg_html error:{exc}]")
        return html.escape(text)
    return converted
def _md_to_tg_html_inner(text: str) -> str:
    """Inner converter — may raise; wrapped by md_to_tg_html.

    Pass order matters: code spans are pulled out first (protected by
    \\x00 placeholders that cannot occur in user text), the remainder is
    HTML-escaped, styled, and the code spans are restored last.
    """
    # 1. Extract fenced code blocks (protect from further processing)
    code_blocks = []

    def _save_code(m):
        lang = m.group(1)
        code = html.escape(m.group(2).rstrip())
        idx = len(code_blocks)
        if lang:
            code_blocks.append(
                f'<pre><code class="language-{html.escape(lang)}">{code}'
                "</code></pre>"
            )
        else:
            code_blocks.append(f"<pre>{code}</pre>")
        return f"\x00CB{idx}\x00"
    text = _FENCE_RE.sub(_save_code, text)
    # 2. Extract inline code
    inline_codes = []

    def _save_inline(m):
        idx = len(inline_codes)
        inline_codes.append(f"<code>{html.escape(m.group(1))}</code>")
        return f"\x00IC{idx}\x00"
    text = _INLINE_CODE_RE.sub(_save_inline, text)
    # 3. Escape remaining HTML entities
    text = html.escape(text)

    # 4. Headers -> bold (with blank line after for spacing)
    def _fmt_header(m):
        level = len(m.group(1))
        content = m.group(2).strip()
        if level <= 2:
            # H1/H2 rendered with heavy box-drawing rules around the title.
            return f"\n<b>\u2501\u2501 {content} \u2501\u2501</b>"
        elif level == 3:
            return f"\n<b>\u25b8 {content}</b>"
        else:
            return f"<b>{content}</b>"
    text = _HEADER_RE.sub(_fmt_header, text)
    # 5. Bold / italic / strike / links
    text = _BOLD_RE.sub(r"<b>\1</b>", text)
    text = _ITALIC_RE.sub(r"<i>\1</i>", text)
    text = _STRIKE_RE.sub(r"<s>\1</s>", text)
    text = _LINK_RE.sub(r'<a href="\2">\1</a>', text)
    # 6. Blockquotes -> merged <blockquote>
    text = _BQUOTE_RE.sub(lambda m: "\x00BQ" + m.group(1) + "\x00BQ", text)

    # Merge consecutive blockquote lines into a single <blockquote> element.
    def _merge_bq(t):
        lines = t.split("\n")
        result = []
        in_bq = False
        for line in lines:
            if line.startswith("\x00BQ") and line.endswith("\x00BQ"):
                content = line[3:-3]  # strip the 3-char \x00BQ markers
                if not in_bq:
                    result.append("<blockquote>")
                    in_bq = True
                result.append(content)
            else:
                if in_bq:
                    result.append("</blockquote>")
                    in_bq = False
                result.append(line)
        if in_bq:
            result.append("</blockquote>")
        return "\n".join(result)
    text = _merge_bq(text)

    # 7. Lists: process line by line for proper nesting
    def _fmt_lists(t):
        lines = t.split("\n")
        out = []
        for line in lines:
            # Sub-bullet (indented - or *)
            m_sub = re.match(r"^\s{2,}[-*]\s+(.*)", line)
            if m_sub:
                out.append(f"{INDENT}{SUB_BULLET} {m_sub.group(1)}")
                continue
            # Top-level bullet (- or *)
            m_ul = re.match(r"^[-*]\s+(.*)", line)
            if m_ul:
                out.append(f"{BULLET} {m_ul.group(1)}")
                continue
            # Numbered list with period: 1. text
            m_ol = re.match(r"^(\d+)\.\s+(.*)", line)
            if m_ol:
                out.append(f"<b>{m_ol.group(1)}.</b> {m_ol.group(2)}")
                continue
            # Numbered list with paren: 1) text
            m_olp = re.match(r"^(\d+)\)\s+(.*)", line)
            if m_olp:
                out.append(f"\n<b>{m_olp.group(1)})</b> {m_olp.group(2)}")
                continue
            # Not a list item: pass through unchanged.
            out.append(line)
        return "\n".join(out)
    text = _fmt_lists(text)
    # 8. Bold key-value patterns: "**Key:** value" or "Key: value" at line
    # start after bullet
    text = re.sub(
        r"("
        + re.escape(BULLET)
        + r"|"
        + re.escape(SUB_BULLET)
        + r")\s*([A-Z][\w\s/]+):\s",
        lambda m: f"{m.group(1)} <b>{m.group(2)}:</b> ",
        text,
    )
    # 9. Horizontal rules
    text = re.sub(r"^-{3,}$", "\u2500" * 20, text, flags=re.MULTILINE)
    text = re.sub(r"^\*{3,}$", "\u2500" * 20, text, flags=re.MULTILINE)
    # 10. Clean up excessive blank lines
    text = re.sub(r"\n{3,}", "\n\n", text)
    # 11. Restore code blocks and inline code
    for i, block in enumerate(code_blocks):
        text = text.replace(f"\x00CB{i}\x00", block)
    for i, code in enumerate(inline_codes):
        text = text.replace(f"\x00IC{i}\x00", code)
    return text.strip()
# ================================================================
# CONFIG (loaded from .env -- copy .env.example to .env and fill in
# your values)
# ================================================================
# Bot token is mandatory: fail fast at startup with a KeyError if missing.
TOKEN = os.environ["TELEGRAM_BOT_TOKEN"]
# ALLOWED_CHAT_ID accepts a comma-separated list of chat IDs.
ALLOWED_IDS = set()
_raw_ids = os.environ.get("ALLOWED_CHAT_ID", "")
for _id in _raw_ids.split(","):
    if _id.strip():
        try:
            ALLOWED_IDS.add(int(_id.strip()))
        except ValueError:
            # Skip non-numeric entries. (Was `except BaseException`, which
            # also swallowed KeyboardInterrupt/SystemExit; int() can only
            # raise ValueError here.)
            pass
# Backwards compatibility if needed (optional)
ALLOWED_CHAT_ID = next(iter(ALLOWED_IDS)) if ALLOWED_IDS else 0
# Remote connection and agent invocation settings.
SSH_HOST = os.environ.get("SSH_HOST", "hpc")
CONNECTION_MODE = os.environ.get("CONNECTION_MODE", "ssh").lower()
OPENCODE = os.environ.get("OPENCODE_PATH", "opencode")
DEFAULT_MODEL = os.environ.get("DEFAULT_MODEL", "opencode/minimax-m2.5-free")
WORKDIR = os.environ.get("WORKDIR", os.environ.get("HPC_WORKDIR", "~"))
# Matches ANSI CSI escape sequences (colors, cursor movement) for stripping.
ANSI_RE = re.compile(r"\x1b\[[0-9;]*[A-Za-z]")
RETRY_ON_TIMEOUT = True
# Environment setup commands run before the AI agent on the remote shell.
# Set SETUP_CMD in .env to whatever loads your dependencies.
# Examples:
#   Generic Lmod cluster: module load nodejs
#   Conda environment:
#     source ~/miniconda3/etc/profile.d/conda.sh && conda activate myenv
#   No modules needed: (leave blank or omit)
_setup_raw = os.environ.get(
    "SETUP_CMD", os.environ.get("HPC_SETUP_CMD", "")
)
SETUP_CMD = _setup_raw.strip() if _setup_raw.strip() else None
# rclone destination for upload command (remote:path format).
# Examples: gdrive:HPC-Results s3:mybucket/results onedrive:Projects
RCLONE_DEST = os.environ.get("RCLONE_DEST", "gdrive:HPC-Results")
# Default (global) persistence files; workspaces override these per chat.
TASKS_FILE = os.environ.get(
    "TASKS_FILE", str(SCRIPT_DIR / "hpc_relay_scheduled_tasks.json")
)
SESSIONS_FILE = os.environ.get(
    "SESSIONS_FILE",
    str(SCRIPT_DIR / f"hpc_relay_sessions_{CONNECTION_MODE}.json"),
)
TG_CHUNK = 3800  # conservative; Telegram max is 4096
SHOW_META_HEADER = False  # default: chat like a human; use /status for details
STREAM_EDIT_INTERVAL = 1.5  # min seconds between streaming message edits
STREAM_MIN_DELTA = 80  # chars of new output that force an earlier edit
STALL_WARN_SEC = 45
PARTIAL_SEND_SEC = 90
STALE_RUNNING_SEC = int(os.environ.get("STALE_RUNNING_SEC", "900"))
TIMEOUT_WARN_SEC = 420  # 7 min: first warning
REMOTE_TIMEOUT_SEC = 1800  # 30 min max
LOCAL_TIMEOUT_SEC = REMOTE_TIMEOUT_SEC + 30
OC_DB_PATH = os.environ.get(
    "OC_DB_PATH", "~/.local/share/opencode/opencode.db"
)  # on HPC
# Instruction block appended to prompts sent to the agent; defines the
# @@SEND_FILE@@ and @@SCHEDULE@@ directive formats parsed elsewhere.
SYSTEM_SUFFIX = (
    " STYLE: Default to natural, human conversational replies in plain text."
    " Only use heavy structure (headers/lists/code blocks) when it"
    " genuinely improves clarity for technical or multi-step content."
    " For simple questions, answer as a short normal paragraph."
    " MEMORY BOOTSTRAP: At the beginning of each new task, read local"
    " files in current workdir in this order: `./MEMORY.md` then"
    " `./AGENTS.md` (if present). If missing, create MEMORY.md in"
    " current workdir and continue. If reading fails or permissions"
    " reject, continue answering normally without stopping."
    " IMPORTANT: Headless / non-interactive mode (`opencode run`)."
    " Do NOT invoke ask_questions tool"
    " If any tool call is denied/rejected, continue and provide the"
    " best possible direct answer instead of stopping. Avoid reading"
    " external directories unless the user explicitly asks. Prefer"
    " local workspace files first. Unless explicitly told not to by"
    " the user, automatically output exactly `@@SEND_FILE: <filepath>@@`"
    " whenever you create or reference a small output file (like a png,"
    " pdf, or short dataset)."
    " If detect user intention to schedule a recurring or future task or"
    " ask something to be performed later, output exactly"
    " `@@SCHEDULE: <time_format> | <task_prompt>@@` anywhere in your"
    " response. For <time_format>, use `every X minutes/hours/days`,"
    " `at HH:MM`, or `after X minutes/hours`.")
# NOTE(review): presumably marks messages to run as raw shell commands;
# the handler is outside this chunk — confirm.
SHELL_PREFIX = "!"
# ================================================================
# CHANNEL -> WORKSPACE ROUTING
# ================================================================
_CW_RAW = os.environ.get("CHANNEL_WORKSPACES", "").strip()
# JSON map of str(chat_id) -> workspace config; _resolve_workspace reads the
# keys "name", "workdir" and "allowed_users" from each entry.
CHANNEL_WORKSPACES: dict = {}
if _CW_RAW:
    try:
        CHANNEL_WORKSPACES = json.loads(_CW_RAW)
    except json.JSONDecodeError as _e:
        # Bad config is non-fatal: warn and fall back to no routing.
        print(f" [WARN] CHANNEL_WORKSPACES is invalid JSON: {_e}")
AUTO_WORKSPACE_PER_CHAT = (
    os.environ.get("AUTO_WORKSPACE_PER_CHAT", "0")
    .strip().lower() in {"1", "true", "yes", "on"}
)
AUTO_WORKSPACE_PREFIX = os.environ.get("AUTO_WORKSPACE_PREFIX", "chat")
WHISPER_MODEL_SIZE = os.environ.get("WHISPER_MODEL_SIZE", "small")
WHISPER_DEVICE = os.environ.get("WHISPER_DEVICE", "auto")
# Cache slot for the WhisperModel; not loaded here — presumably filled
# lazily on first voice message (loader is outside this chunk).
_WHISPER_MODEL = None
def _resolve_workspace(chat_id: int, user_id: Optional[int] = None) -> dict:
    """Return the workspace configuration for a given chat.

    Resolution order: an explicit CHANNEL_WORKSPACES entry wins; otherwise an
    auto-named per-chat workspace when AUTO_WORKSPACE_PER_CHAT is enabled;
    otherwise the global defaults.

    Args:
        chat_id: Telegram chat ID used as the routing key.
        user_id: accepted for interface compatibility; currently unused.
            (Annotation fixed to Optional[int] — implicit Optional from an
            ``int = None`` default is disallowed by PEP 484.)

    Returns:
        dict with keys: workdir, name, tasks_file, sessions_file,
        system_suffix, allowed_users (list of ints, or None = unrestricted).
    """
    ws = CHANNEL_WORKSPACES.get(str(chat_id))
    if ws:
        name = ws.get("name", f"ws_{chat_id}")
        wdir = ws.get("workdir", WORKDIR)
        allowed = ws.get("allowed_users")
        if allowed is not None:
            allowed = [int(u) for u in allowed]
    else:
        if AUTO_WORKSPACE_PER_CHAT and chat_id is not None:
            name = f"{AUTO_WORKSPACE_PREFIX}_{chat_id}"
            wdir = WORKDIR
        else:
            name = None
            wdir = WORKDIR
        allowed = None
    # Build workspace-specific file paths; unnamed workspaces share the
    # global task/session files.
    if name:
        tfile = str(SCRIPT_DIR / f"hpc_relay_tasks_{name}.json")
        sfile = str(SCRIPT_DIR / f"hpc_relay_sessions_{name}.json")
    else:
        tfile = TASKS_FILE
        sfile = SESSIONS_FILE
    # Named workspaces get an extra memory-directory hint in the prompt.
    if name:
        suffix = (
            SYSTEM_SUFFIX
            + f" Your workspace / long-term memory directory is '{wdir}'."
            " Read and write your persistent notes and schedule data from"
            " files in this directory."
        )
    else:
        suffix = SYSTEM_SUFFIX
    return {
        "workdir": wdir,
        "name": name,
        "tasks_file": tfile,
        "sessions_file": sfile,
        "system_suffix": suffix,
        "allowed_users": allowed,
    }
# Local working folders next to the script (created on import).
INBOX_DIR = SCRIPT_DIR / "inbox"
INBOX_DIR.mkdir(exist_ok=True)
RUN_TRACE_DIR = SCRIPT_DIR / "run_traces"
RUN_TRACE_DIR.mkdir(exist_ok=True)
# NOTE(review): module-level globals mirroring the defaults;
# _run_scheduled_task saves newly spawned tasks to this global ws_tf rather
# than the workspace-resolved tasks file — confirm that is intended.
ws_sf = SESSIONS_FILE
ws_tf = TASKS_FILE
# Lock around opencode invocations — presumably serializes remote runs;
# acquisition sites are outside this chunk.
OPENCODE_RUN_LOCK = asyncio.Lock()
# ================================================================
# SCHEDULED TASK MANAGER
# ================================================================
# In-process scheduler state.
_scheduled_tasks = {}
_scheduler_running = False  # polled each second by _scheduler_worker
_scheduler_thread = None  # the daemon thread started by _start_scheduler
_scheduler_lock = threading.Lock()
def _load_scheduled_tasks(tasks_file=None) -> dict:
tf = tasks_file or TASKS_FILE
try:
with open(tf) as f:
return json.load(f)
except Exception:
return {}
def _save_scheduled_tasks(tasks: dict, tasks_file=None):
tf = tasks_file or TASKS_FILE
tmp = f"{tf}.{os.getpid()}.tmp"
with open(tmp, "w") as f:
json.dump(tasks, f, indent=2)
os.replace(tmp, tf)
def _run_scheduled_task(task_id: str, task_info: dict):
    """Execute one scheduled task by submitting it to the bot's event loop.

    Called from scheduler threads (schedule jobs / threading.Timer), so the
    actual async work is wrapped in _execute() and handed to the bot loop via
    asyncio.run_coroutine_threadsafe at the bottom. Streams progress into a
    status message, sends the final answer, and parses @@SCHEDULE@@ and
    @@SEND_FILE@@ directives out of the agent's reply.
    """
    task = task_info.get("task", "")
    chat_id = task_info.get("chat_id")
    model = task_info.get("model", DEFAULT_MODEL)
    session_id = task_info.get("session_id")
    print(f" [SCHEDULED TASK {task_id}] Running:{task[:50]}...")
    # Route to the chat's workspace for sessions/workdir/prompt suffix.
    ws = _resolve_workspace(chat_id) if chat_id else {}
    ws_sf = ws.get("sessions_file", SESSIONS_FILE)
    ws_wd = ws.get("workdir", WORKDIR)
    ws_suffix = ws.get("system_suffix", SYSTEM_SUFFIX)
    prompt = task + ws_suffix

    async def _execute():
        status_msg = None
        if chat_id and _app:
            try:
                import html
                status_msg = await _app.bot.send_message(
                    chat_id=chat_id,
                    text=(
                        f"{E.WAIT} <b>[Scheduled Task Started]</b>\n"
                        "Connecting to HPC...\n"
                        f"<i>Task: {html.escape(task[:50])}...</i>"
                    ),
                    parse_mode="HTML",
                )
            except Exception:
                # Best effort: the task still runs without a status message.
                pass
        text_buf = []
        last_edit_t = 0.0
        last_output_t = time.time()
        start_t = time.time()
        done_event = asyncio.Event()

        async def on_progress(s):
            # Rate-limited (1s) status-line edits.
            nonlocal last_edit_t, last_output_t
            last_output_t = time.time()
            now = time.time()
            if now - last_edit_t < 1.0:
                return
            last_edit_t = now
            if status_msg:
                await _safe_edit(status_msg, f"{s} [{int(now - start_t)}s]")

        async def on_text_chunk(delta):
            # Stream agent output into the status message, throttled by both
            # time (STREAM_EDIT_INTERVAL) and size (STREAM_MIN_DELTA).
            nonlocal last_edit_t, last_output_t
            last_output_t = time.time()
            text_buf.append(delta)
            now = time.time()
            if (now - last_edit_t) < STREAM_EDIT_INTERVAL and len(
                delta
            ) < STREAM_MIN_DELTA:
                return
            last_edit_t = now
            joined = "".join(text_buf)
            mx = 3500
            tail = joined[-mx:] if len(joined) > mx else joined
            if status_msg:
                await _safe_edit(
                    status_msg,
                    f"{E.WAIT} [Scheduled] Streaming... "
                    f"[{int(now - start_t)}s]\n\n{tail}",
                )

        try:
            final, new_sid = await run_opencode(
                prompt,
                chat_id,
                model,
                session_id,
                on_progress,
                on_text_chunk,
                workdir=ws_wd,
                sessions_file=ws_sf,
            )
            # If scheduled task points to a stale session
            # (from another host/bot), auto-retry once with a fresh
            # session instead of surfacing legacy guidance.
            if (isinstance(final, str)
                    and "Session does not exist on this machine" in final):
                await on_progress(
                    f"{E.RETRY} Scheduled task session invalid; "
                    "retrying with a new session..."
                )
                final, new_sid = await run_opencode(
                    prompt,
                    chat_id,
                    model,
                    None,
                    on_progress,
                    on_text_chunk,
                    workdir=ws_wd,
                    sessions_file=ws_sf,
                )
        except Exception as e:
            # Surface the failure plus whatever streamed before it died.
            final = f"{E.ERR} Task failed: {e}\n\n" + ("".join(text_buf))
            new_sid = session_id
        finally:
            done_event.set()
        if chat_id and _app:
            try:
                final_msg = md_to_tg_html(final or "No output.")
                # Extract schedules within scheduled task
                scheduled_tasks_to_add = re.findall(
                    r"@@SCHEDULE:\s*(.+?)@@", final, re.IGNORECASE
                )
                scheduled_hint = ""
                for sched_str in scheduled_tasks_to_add:
                    parts = sched_str.split("|", 1)
                    if len(parts) == 2:
                        time_expr = parts[0].strip().lower()
                        task_prompt = parts[1].strip()
                        new_info = None
                        if time_expr.startswith("every "):
                            m_int = re.match(
                                r"every\s+(\d+)\s+(minute|hour|day)", time_expr
                            )
                            if m_int:
                                new_info = {
                                    "type": "interval",
                                    "interval": int(m_int.group(1)),
                                    "unit": m_int.group(2) + "s",
                                    "task": task_prompt,
                                }
                        elif time_expr.startswith("at "):
                            m_at = re.match(
                                r"at\s+(\d{1,2}):(\d{2})", time_expr
                            )
                            if m_at:
                                new_info = {
                                    "type": "daily",
                                    "hour": int(m_at.group(1)),
                                    "minute": int(m_at.group(2)),
                                    "task": task_prompt,
                                }
                        elif time_expr.startswith(
                            "after "
                        ) or time_expr.startswith("in "):
                            m_aft = re.match(
                                r"(?:after|in)\s+(\d+)\s+(minute|hour)",
                                time_expr,
                            )
                            if m_aft:
                                val = int(m_aft.group(1))
                                unit = m_aft.group(2)
                                now = datetime.now()
                                target = now + (
                                    timedelta(minutes=val)
                                    if unit == "minute"
                                    else timedelta(hours=val)
                                )
                                new_info = {
                                    "type": "once",
                                    "hour": target.hour,
                                    "minute": target.minute,
                                    "task": task_prompt,
                                }
                        if new_info:
                            new_task_id = (
                                f"task_{int(time.time())}_"
                                f"{len(scheduled_hint)}"
                            )
                            # NOTE(review): ws_tf here is the module-level
                            # global (= TASKS_FILE), not this workspace's
                            # tasks file — confirm intended.
                            tasks = _load_scheduled_tasks(ws_tf)
                            tasks[new_task_id] = {
                                "schedule": new_info,
                                "task": new_info["task"],
                                "chat_id": chat_id,
                                "model": model,
                                "session_id": new_sid,
                                "created_at": datetime.now().isoformat(),
                                "tasks_file": ws_tf,
                            }
                            _save_scheduled_tasks(tasks, ws_tf)
                            import html
                            _schedule_job(new_task_id, tasks[new_task_id])
                            if new_info["type"] == "interval":
                                desc = (
                                    f"every {new_info['interval']} "
                                    f"{new_info['unit']}"
                                )
                            elif new_info["type"] == "daily":
                                desc = (
                                    f"daily at {new_info['hour']:02d}:"
                                    f"{new_info['minute']:02d}"
                                )
                            else:
                                desc = (
                                    f"once at {new_info['hour']:02d}:"
                                    f"{new_info['minute']:02d}"
                                )
                            scheduled_hint += (
                                f"\n\n{E.OK} <b>Scheduled Task Added!</b>\n"
                                f"{E.TIME} {desc}\n{E.TOOL} Task: "
                                f"<code>{html.escape(task_prompt[:50])}</code>"
                            )
                if status_msg is not None:
                    await _delete_or_check(status_msg)
                await _app.bot.send_message(
                    chat_id=chat_id,
                    text=final_msg + scheduled_hint,
                    parse_mode="HTML",
                )
                # Send files parsing
                unique_files = _extract_send_file_directives(final)
                if unique_files:
                    # Minimal stand-ins for telegram Update/Message so
                    # _process_file_request can be reused outside a handler.
                    class DummyMessage:
                        def __init__(self, cid):
                            self.chat_id = cid

                        async def reply_text(self, text, parse_mode=None):
                            await _app.bot.send_message(
                                chat_id=self.chat_id,
                                text=text,
                                parse_mode=parse_mode,
                            )

                        async def reply_document(
                            self, document, filename=None, caption=None
                        ):
                            await _app.bot.send_document(
                                chat_id=self.chat_id,
                                document=document,
                                filename=filename,
                                caption=caption,
                            )

                        async def reply_photo(self, photo, caption=None):
                            await _app.bot.send_photo(
                                chat_id=self.chat_id,
                                photo=photo,
                                caption=caption,
                            )

                    class DummyUpdate:
                        def __init__(self, cid):
                            self.message = DummyMessage(cid)

                    for f in unique_files[:10]:
                        try:
                            await _process_file_request(
                                DummyUpdate(chat_id), "send", f
                            )
                        except Exception as file_exp:
                            msg = (
                                f" [SCHEDULED TASK {task_id}] "
                                f"Error sending file: {file_exp}"
                            )
                            print(msg)
            except Exception as e:
                import traceback
                print(
                    f" [SCHEDULED TASK {task_id}] Error sending message: {e}"
                )
                traceback.print_exc()
    # Hand the coroutine to the bot's event loop; no-op if not yet set up.
    if _loop:
        asyncio.run_coroutine_threadsafe(_execute(), _loop)
def _all_task_files() -> list:
    """Every task-file path the bot may own, sorted: the global file,
    one per named workspace, and any auto per-chat files on disk."""
    paths = {str(TASKS_FILE)}
    for _cid, cfg in CHANNEL_WORKSPACES.items():
        ws_name = cfg.get("name")
        if ws_name:
            paths.add(str(SCRIPT_DIR / f"hpc_relay_tasks_{ws_name}.json"))
    if AUTO_WORKSPACE_PER_CHAT:
        pattern = f"hpc_relay_tasks_{AUTO_WORKSPACE_PREFIX}_*.json"
        paths.update(str(match) for match in SCRIPT_DIR.glob(pattern))
    return sorted(paths)
def _find_task_file(task_id: str, preferred_tf: str):
    """Locate *task_id*, checking *preferred_tf* first, then all others.

    Returns (file_path, tasks_dict). When the id is found nowhere, falls
    back to the preferred file and its (id-less) contents.
    """
    candidates = [preferred_tf]
    candidates.extend(p for p in _all_task_files() if p != preferred_tf)
    for candidate in candidates:
        loaded = _load_scheduled_tasks(candidate)
        if task_id in loaded:
            return candidate, loaded
    return preferred_tf, _load_scheduled_tasks(preferred_tf)
def _collect_chat_tasks(chat_id: int, preferred_tf: str) -> dict:
    """Gather every task belonging to *chat_id* across all task files.

    *preferred_tf* is scanned first and the first occurrence of a task id
    wins. Returns {task_id: (task_info, source_file)}.
    """
    collected = {}
    search_order = [preferred_tf]
    search_order.extend(p for p in _all_task_files() if p != preferred_tf)
    for tf in search_order:
        for tid, info in _load_scheduled_tasks(tf).items():
            if info.get("chat_id") == chat_id and tid not in collected:
                collected[tid] = (info, tf)
    return collected
def _schedule_job(task_id: str, task_info: dict):
    """Register a single task with the scheduler.

    "daily" and "interval" tasks go to the `schedule` library and re-fire
    indefinitely; "once" tasks run on a threading.Timer and delete
    themselves from their task file after firing.
    """
    info = task_info.get("schedule", {})
    task_type = info.get("type")
    if task_type in ("once", "daily"):
        hour = info.get("hour")
        minute = info.get("minute", 0)
        if task_type == "once":
            now = datetime.now()
            target = now.replace(
                hour=hour, minute=minute, second=0, microsecond=0
            )
            if target <= now:
                # If target is past in today, push to tomorrow (this happens
                # naturally for 'at' but not 'after', though 'after' generates
                # future time)
                target += timedelta(days=1)
            delay = (target - now).total_seconds()
            print(
                f" [SCHEDULED TASK] Will run ONCE in {delay/60:.1f} "
                f"minutes at {hour:02d}:{minute:02d}"
            )

            def _delayed_run():
                _run_scheduled_task(task_id, task_info)
                # Remove self after run
                tf_local = task_info.get("tasks_file", TASKS_FILE)
                tasks = _load_scheduled_tasks(tf_local)
                if task_id in tasks:
                    del tasks[task_id]
                    _save_scheduled_tasks(tasks, tf_local)
            # Start timer using the actual delay
            threading.Timer(delay, _delayed_run).start()
        else:
            time_str = f"{hour:02d}:{minute:02d}"
            print(f" [SCHEDULED TASK] Will run DAILY at {time_str}")
            schedule.every().day.at(time_str).do(
                _run_scheduled_task, task_id, task_info
            )
    elif task_type == "interval":
        interval = info.get("interval", 1)
        unit = info.get("unit", "minutes")
        if unit == "minutes":
            schedule.every(interval).minutes.do(
                _run_scheduled_task, task_id, task_info
            )
        elif unit == "hours":
            schedule.every(interval).hours.do(
                _run_scheduled_task, task_id, task_info
            )
        elif unit == "days":
            schedule.every(interval).days.do(
                _run_scheduled_task, task_id, task_info
            )
def _scheduler_worker():
    """Thread body: pump `schedule` once per second until the
    _scheduler_running flag is cleared by _stop_scheduler()."""
    while _scheduler_running:
        schedule.run_pending()
        time.sleep(1)
def _start_scheduler():
    """Start the background scheduler thread (idempotent) and load tasks.

    Tasks are loaded from every known task file — global, per-workspace,
    and auto per-chat — and each is registered via _schedule_job().
    """
    global _scheduler_running, _scheduler_thread
    if _scheduler_running:
        return
    _scheduler_running = True
    _scheduler_thread = threading.Thread(target=_scheduler_worker, daemon=True)
    _scheduler_thread.start()
    # Reuse the canonical task-file discovery instead of duplicating the
    # workspace/glob enumeration here (it previously mirrored
    # _all_task_files() line for line).
    total = 0
    for tf in _all_task_files():
        tasks = _load_scheduled_tasks(tf)
        for task_id, task_info in tasks.items():
            # Remember the source file so per-task updates go back to it.
            task_info.setdefault("tasks_file", tf)
            _schedule_job(task_id, task_info)
        total += len(tasks)
    print(f" [SCHEDULER] Started, loaded {total} tasks")
def _stop_scheduler():
    """Ask the worker thread to exit; it notices within ~1s of its poll
    loop (and is a daemon thread, so it never blocks interpreter exit)."""
    global _scheduler_running
    _scheduler_running = False
# Telegram Application and its asyncio loop, injected via set_app_context()
# so scheduler threads can post coroutines back into the bot's event loop
# (see asyncio.run_coroutine_threadsafe in _run_scheduled_task).
_app = None
_loop = None
def set_app_context(app, loop):
    """Store the bot Application and its event loop for scheduler threads."""
    global _app, _loop
    _app = app
    _loop = loop
# ================================================================
# PERSISTENCE (with session history tracking)
# ================================================================
def _load_store(sessions_file=None) -> dict:
sf = sessions_file or SESSIONS_FILE
try:
with open(sf) as f:
d = json.load(f)
return d if isinstance(d, dict) else {}
except Exception:
return {}
def _save_store(d: dict, sessions_file=None):
sf = sessions_file or SESSIONS_FILE
tmp = f"{sf}.{os.getpid()}.tmp"
with open(tmp, "w") as f:
json.dump(d, f, indent=2)
os.replace(tmp, sf)
def get_chat(cid: int, sessions_file=None) -> dict:
    """Return this chat's record, upgrading the legacy bare-string format
    (a plain session id) to a dict and filling in default keys."""
    record = _load_store(sessions_file).get(str(cid), {})
    if isinstance(record, str):
        record = {"session_id": record}
    record.setdefault("model", DEFAULT_MODEL)
    record.setdefault("session_id", None)
    return record
def _clear_stale_running_sessions(
    sessions_file=None, max_age_sec: int = STALE_RUNNING_SEC
):
    """Reset 'running' flags older than *max_age_sec* (or missing a start
    timestamp) so an orphaned run cannot block a chat indefinitely."""
    store = _load_store(sessions_file)
    now = int(time.time())
    dirty = False
    for key, entry in list(store.items()):
        # Skip bookkeeping keys (e.g. __known_sessions__) and legacy entries.
        if key.startswith("__") or not isinstance(entry, dict):
            continue
        if not entry.get("running"):
            continue
        started_at = int(entry.get("running_started") or 0)
        if started_at <= 0 or now - started_at >= max_age_sec:
            entry["running"] = False
            entry["running_started"] = 0
            entry["running_prompt"] = ""
            entry["kill_requested"] = "no"
            store[key] = entry
            dirty = True
    if dirty:
        _save_store(store, sessions_file)
def update_chat(cid: int, sessions_file=None, **kw):
    """Merge the non-None keyword values into the chat's record and persist."""
    store = _load_store(sessions_file)
    key = str(cid)
    record = store.get(key, {})
    if isinstance(record, str):
        # Legacy format: a bare session-id string.
        record = {"session_id": record, "model": DEFAULT_MODEL}
    for field, value in kw.items():
        if value is not None:
            record[field] = value
    store[key] = record
    _save_store(store, sessions_file)
def _queue_message(
    chat_id: int,
    sessions_file,
    raw: str,
    attached_files=None,
    preempt: bool = False,
):
    """Enqueue a message for later processing and persist the queue.

    preempt=True puts the message at the front of the queue instead of the
    back. Returns the queue length after insertion.
    """
    store = _load_store(sessions_file)
    key = str(chat_id)
    record = store.get(key, {})
    if isinstance(record, str):
        record = {"session_id": record, "model": DEFAULT_MODEL}
    queue = record.get("pending_messages", [])
    if not isinstance(queue, list):
        queue = []
    entry = {
        "id": f"q_{int(time.time() * 1000)}_{len(queue)}",
        "raw": raw,
        "attached_files": attached_files or [],
        "preempt": bool(preempt),
    }
    preview = repr(raw[:120])
    if preempt:
        queue.insert(0, entry)
        print(f" [QUEUE PREEMPT] Added message for chat={chat_id}: {preview}")
    else:
        queue.append(entry)
        print(f" [QUEUE ADD] Added message for chat={chat_id}: {preview}")
    record["pending_messages"] = queue
    store[key] = record
    _save_store(store, sessions_file)
    return len(queue)
def _pop_next_message(chat_id: int, sessions_file):
    """Remove and return the first queued message for a chat, or None."""
    store = _load_store(sessions_file)
    key = str(chat_id)
    record = store.get(key, {})
    if isinstance(record, str):
        # Legacy string records carry no queue.
        return None
    queue = record.get("pending_messages", [])
    if not queue:
        return None
    head = queue.pop(0)
    record["pending_messages"] = queue
    store[key] = record
    _save_store(store, sessions_file)
    return head
def _pending_count(chat_id: int, sessions_file) -> int:
    """Number of queued messages for a chat (0 for legacy/missing records)."""
    record = _load_store(sessions_file).get(str(chat_id), {})
    if isinstance(record, str):
        return 0
    queue = record.get("pending_messages", [])
    if not isinstance(queue, list):
        return 0
    return len(queue)
def _has_preempt_message(chat_id: int, sessions_file) -> bool:
    """True when any queued message for this chat carries the preempt flag."""
    record = _load_store(sessions_file).get(str(chat_id), {})
    if isinstance(record, str):
        return False
    queue = record.get("pending_messages", [])
    if not isinstance(queue, list):
        return False
    for item in queue:
        if isinstance(item, dict) and item.get("preempt"):
            return True
    return False
def _record_session(sid: str, sessions_file=None):
"""Add a session ID to the known_sessions history."""
if not sid or not sid.startswith("ses"):
return
store = _load_store(sessions_file)
history = store.get("__known_sessions__", [])
if sid not in history:
history.append(sid)