-
Notifications
You must be signed in to change notification settings - Fork 59
Expand file tree
/
Copy pathapi_runtime_console_ops.py
More file actions
2004 lines (1841 loc) · 78.9 KB
/
api_runtime_console_ops.py
File metadata and controls
2004 lines (1841 loc) · 78.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""Console config, gateway, and custom MCP operations."""
from __future__ import annotations
from collections.abc import Mapping
from datetime import UTC, datetime
from pathlib import Path
import asyncio
import json
import os
import re
import subprocess
import sys
import tempfile
import time
from typing import Any
from packages.tools import sync_custom_mcp_tools
from packages.runtime_config import (
global_config_path_for_state_dir,
global_config_schema,
load_global_config,
load_extensions_from_config,
parse_global_config_text,
read_global_config_text,
write_global_config,
)
def _write_manifest_to_config(state_dir: Path, manifest: Mapping[str, Any]) -> Path:
    """Write manifest data (gateway, provider, extensions) to config.yaml.

    Treats the fields *present* in ``manifest`` as authoritative:

    - If ``manifest["gateway"]`` is a non-empty Mapping, the persisted
      gateway section is *replaced* by it (not merged). This ensures keys
      removed from the manifest (e.g. dropping the last account) are
      actually dropped on disk.
    - If ``manifest["gateway"]`` is explicitly ``None`` or an empty
      Mapping, the gateway section is deleted from config. Callers
      signal "drop this section" by setting ``manifest["gateway"] =
      None``.
    - If ``manifest`` omits ``gateway`` entirely, the persisted
      gateway section is left untouched. This preserves compatibility
      with callers (e.g. operator settings patcher) that only care
      about other sections.
    - If ``manifest["provider_profile"]`` is a Mapping it is stored as
      ``models.provider`` and ``models.default_provider_source`` is set
      to ``"config"``.
    - Each extension key present in ``manifest`` overwrites the same key
      under ``extensions``; other extension keys are kept.

    Args:
        state_dir: State directory whose global config file is updated.
        manifest: Sections to persist.

    Returns:
        Path of the config file that was written.
    """
    config_path = global_config_path_for_state_dir(state_dir)
    config = load_global_config(config_path, state_dir=state_dir)

    # Gateway section
    if "gateway" in manifest:
        gateway_payload = manifest.get("gateway")
        if isinstance(gateway_payload, Mapping) and gateway_payload:
            # Replace rather than merge so removed keys are honoured.
            config["gateway"] = dict(gateway_payload)
        else:
            # Explicit None / empty mapping -> delete the section.
            config.pop("gateway", None)
    # (else: caller omitted gateway entirely -> leave persisted value alone.)

    # Provider section
    provider_profile = manifest.get("provider_profile")
    if isinstance(provider_profile, Mapping):
        models = config.get("models")
        if not isinstance(models, dict):
            # A key that is present but null / not a dict (e.g. a bare
            # ``models:`` line) must not break the item assignments below.
            models = dict(models) if isinstance(models, Mapping) else {}
        models["provider"] = dict(provider_profile)
        models["default_provider_source"] = "config"
        config["models"] = models

    # Extension keys
    extension_keys = (
        "tool_manifests",
        "skill_manifests",
        "skill_overrides",
        "tool_overrides",
        "skill_packages",
    )
    extensions = config.get("extensions")
    if not isinstance(extensions, dict):
        # Same guard as for ``models``: tolerate a null / non-dict section.
        extensions = dict(extensions) if isinstance(extensions, Mapping) else {}
    for key in extension_keys:
        if key in manifest:
            extensions[key] = manifest[key]
    if extensions:
        config["extensions"] = extensions

    write_global_config(config_path, config)
    return config_path
def _read_json_file(path: Path) -> Any:
    """Return the JSON document stored at ``path`` (read as UTF-8).

    Returns ``None`` instead of raising when the file cannot be read or
    does not contain valid JSON.
    """
    try:
        raw_text = path.read_text(encoding="utf-8")
        parsed = json.loads(raw_text)
    except (OSError, ValueError):
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses, so this covers undecodable and malformed files alike.
        return None
    return parsed
def _load_manifest_from_config(state_dir: Path) -> dict[str, Any]:
    """Build the manifest dict (extensions + gateway) from config.yaml.

    The result holds whatever ``load_extensions_from_config`` returns for
    the loaded config, plus a ``"gateway"`` copy when the config has a
    mapping under that key. An ``OSError`` or ``ValueError`` raised while
    loading yields an empty dict.
    """
    config_path = global_config_path_for_state_dir(state_dir)
    try:
        config = load_global_config(config_path, state_dir=state_dir)
        manifest: dict[str, Any] = {}
        extension_data = load_extensions_from_config(config)
        if extension_data:
            manifest.update(extension_data)
        gateway_section = config.get("gateway")
        if isinstance(gateway_section, Mapping):
            manifest["gateway"] = dict(gateway_section)
    except (OSError, ValueError):
        return {}
    return manifest
def _read_text_file(path: Path, *, max_chars: int = 20_000) -> str | None:
try:
text = path.read_text(encoding="utf-8", errors="replace")
except OSError:
return None
if len(text) <= max_chars:
return text
return text[-max_chars:]
# File name (inside the gateway directory) of the JSON file holding locally
# stored gateway secrets, keyed by environment-variable name.
GATEWAY_LOCAL_SECRET_ENV_FILE = "gateway-local-secrets.json"
# Account id assumed whenever an account record does not carry one.
DEFAULT_GATEWAY_ACCOUNT_ID = "default"
# Static description of every gateway service the console knows about.
# Per entry:
#   service          - key used under ``gateway.adapters`` in the manifest
#   label / summary  - display text
#   adapterId        - used as ``provider_id`` in generated secret references
#   surface          - NOTE(review): not read in the code visible here; confirm consumer
#   defaultTransport - transport used when none is configured
#   transports       - accepted transport values
#   eventPath        - default event path (only some services define one)
#   secretFields     - secrets the service needs: ``key``, ``label`` and the
#                      env-var name used for the default account
#   supportsDirectConfig / setupNote - passed through to the service rows as-is
_GATEWAY_SERVICE_SPECS: tuple[dict[str, Any], ...] = (
    {
        "service": "weixin",
        "label": "WeChat",
        "adapterId": "messaging.weixin",
        "surface": "weixin-ilink",
        "defaultTransport": "ilink",
        "transports": ("ilink",),
        "summary": "WeChat iLink bridge with the same scan-to-login QR flow as `elephant gateway setup`.",
        "eventPath": "/weixin/events",
        "secretFields": (),
        "supportsDirectConfig": True,
        "setupNote": "Click Connect & start WeChat, scan the QR with WeChat, then Dashboard automatically detects confirmation and starts the bridge.",
    },
    {
        "service": "feishu",
        "label": "Feishu",
        "adapterId": "messaging.feishu",
        "surface": "feishu-messaging",
        "defaultTransport": "long-connection",
        "transports": ("long-connection",),
        "summary": "Feishu bot long-connection bridge for p2p and group chat messages.",
        "eventPath": "/feishu/events",
        "secretFields": (
            {
                "key": "app_id",
                "label": "App ID",
                "defaultEnvVar": "ELEPHANT_FEISHU_APP_ID",
            },
            {
                "key": "app_secret",
                "label": "App Secret",
                "defaultEnvVar": "ELEPHANT_FEISHU_APP_SECRET",
            },
        ),
        "supportsDirectConfig": True,
    },
    {
        "service": "discord",
        "label": "Discord",
        "adapterId": "messaging.discord",
        "surface": "discord-gateway",
        "defaultTransport": "gateway",
        "transports": ("gateway",),
        "summary": "Discord bot gateway bridge for DMs, channels, and threads.",
        "secretFields": (
            {
                "key": "bot_token",
                "label": "Bot token",
                "defaultEnvVar": "ELEPHANT_DISCORD_BOT_TOKEN",
            },
        ),
        "supportsDirectConfig": True,
    },
    {
        "service": "dingding",
        "label": "DingDing",
        "adapterId": "messaging.dingding",
        "surface": "dingding-stream",
        "defaultTransport": "stream",
        "transports": ("stream",),
        "summary": "DingDing stream bridge for chatbot messages.",
        "secretFields": (
            {
                "key": "client_id",
                "label": "Client ID",
                "defaultEnvVar": "ELEPHANT_DINGDING_CLIENT_ID",
            },
            {
                "key": "client_secret",
                "label": "Client Secret",
                "defaultEnvVar": "ELEPHANT_DINGDING_CLIENT_SECRET",
            },
            {
                "key": "robot_code",
                "label": "Robot Code",
                "defaultEnvVar": "ELEPHANT_DINGDING_ROBOT_CODE",
            },
        ),
        "supportsDirectConfig": True,
    },
    {
        "service": "wecom",
        "label": "WeCom",
        "adapterId": "messaging.wecom",
        "surface": "wecom-websocket",
        "defaultTransport": "websocket",
        "transports": ("websocket",),
        "summary": "WeCom AI Bot WebSocket bridge for chats and groups.",
        "secretFields": (
            {
                "key": "bot_id",
                "label": "Bot ID",
                "defaultEnvVar": "ELEPHANT_WECOM_BOT_ID",
            },
            {
                "key": "secret",
                "label": "Secret",
                "defaultEnvVar": "ELEPHANT_WECOM_SECRET",
            },
        ),
        "supportsDirectConfig": True,
    },
)
# Lookup of the specs above by their ``service`` key.
_GATEWAY_SERVICE_BY_KEY = {str(spec["service"]): spec for spec in _GATEWAY_SERVICE_SPECS}
def _tail_lines(path: Path, *, max_lines: int = 160) -> tuple[str, ...]:
    """Return up to the last ``max_lines`` lines of ``path``.

    Only the trailing 80,000 characters of the file are considered, so
    the first returned line may be a partial line for large files.

    Args:
        path: File to read.
        max_lines: Upper bound on the number of lines. Zero or a negative
            value yields an empty tuple.

    Returns:
        The trailing lines without line terminators; empty when the file
        is missing, unreadable or empty.
    """
    if max_lines <= 0:
        # ``lines[-0:]`` would return *every* line rather than none.
        return ()
    text = _read_text_file(path, max_chars=80_000)
    if not text:
        return ()
    return tuple(text.splitlines()[-max_lines:])
def _logs(state_dir: Path) -> list[dict[str, Any]]:
candidates = [
*state_dir.glob("*.log"),
]
seen: set[Path] = set()
rows: list[dict[str, Any]] = []
for path in sorted(candidates):
resolved = path.resolve()
if resolved in seen or not path.is_file():
continue
seen.add(resolved)
rows.append(
{
"name": path.name,
"path": str(path),
"size": path.stat().st_size,
"updatedAt": datetime.fromtimestamp(path.stat().st_mtime, UTC).isoformat(),
"tail": _tail_lines(path),
}
)
return rows
def _gateway_local_secret_env_path(gateway_dir: Path) -> Path:
    """Return the location of the local gateway secrets file in ``gateway_dir``."""
    return gateway_dir.joinpath(GATEWAY_LOCAL_SECRET_ENV_FILE)
def _load_gateway_local_secret_env(gateway_dir: Path) -> dict[str, str]:
    """Load the locally stored gateway secrets as ``{env_var: value}``.

    Args:
        gateway_dir: Directory containing the local secrets file.

    Returns:
        String keys and values for every entry with a non-blank value.
        A missing, unreadable or non-object file yields an empty dict.
    """
    payload = _read_json_file(_gateway_local_secret_env_path(gateway_dir))
    if not isinstance(payload, Mapping):
        return {}
    return {
        str(key): str(value)
        for key, value in payload.items()
        # JSON ``null`` would stringify to the truthy text "None" and be
        # reported as a stored secret, so drop it explicitly.
        if value is not None and str(value).strip()
    }
def _persist_gateway_local_secret_env(gateway_dir: Path, updates: Mapping[str, str]) -> Path | None:
    """Merge ``updates`` into the local gateway secrets file.

    Blank values in ``updates`` are ignored; existing entries not named in
    ``updates`` are kept.

    Args:
        gateway_dir: Directory holding the secrets file (created if needed).
        updates: ``{env_var: secret}`` pairs to store.

    Returns:
        Path of the written file, or ``None`` when there was nothing to
        store.
    """
    filtered = {str(key): str(value).strip() for key, value in updates.items() if str(value).strip()}
    if not filtered:
        return None
    gateway_dir.mkdir(parents=True, exist_ok=True)
    path = _gateway_local_secret_env_path(gateway_dir)
    payload = _load_gateway_local_secret_env(gateway_dir)
    payload.update(filtered)
    serialized = json.dumps(payload, ensure_ascii=False, indent=2)
    # Create the file owner-only from the start: writing first and chmod-ing
    # afterwards leaves a window where the secrets are readable under the
    # process umask.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        handle.write(serialized)
    try:
        # The creation mode does not apply to a file that already existed,
        # so tighten it explicitly as well (best effort).
        os.chmod(path, 0o600)
    except OSError:
        pass
    return path
def _delete_gateway_local_secret_env(gateway_dir: Path, keys: tuple[str, ...]) -> Path | None:
    """Remove the given entries from the local gateway secrets file.

    Args:
        gateway_dir: Directory holding the secrets file.
        keys: Environment-variable names to delete.

    Returns:
        Path of the secrets file when at least one entry was removed
        (the file itself is deleted once it becomes empty), otherwise
        ``None``.
    """
    if not keys:
        return None
    path = _gateway_local_secret_env_path(gateway_dir)
    payload = _load_gateway_local_secret_env(gateway_dir)
    present = [key for key in keys if key in payload]
    if not present:
        return None
    for key in present:
        payload.pop(key, None)
    if payload:
        serialized = json.dumps(payload, ensure_ascii=False, indent=2)
        # Open owner-only so a re-created file is never briefly readable
        # under the process umask before the chmod below runs.
        fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            handle.write(serialized)
        try:
            os.chmod(path, 0o600)
        except OSError:
            pass
    else:
        # Nothing left to store: drop the file instead of keeping "{}".
        try:
            path.unlink()
        except FileNotFoundError:
            pass
    return path
def _gateway_account_suffix(account_id: str) -> str:
    """Turn an account id into an upper-case env-var name fragment.

    Runs of non-alphanumeric characters collapse to a single underscore,
    leading/trailing underscores are removed, and an empty result becomes
    ``"DEFAULT"``.
    """
    collapsed = re.sub(r"[^A-Za-z0-9]+", "_", account_id.strip())
    fragment = collapsed.strip("_").upper()
    if fragment:
        return fragment
    return "DEFAULT"
def _default_gateway_secret_env_var(service: str, account_id: str, secret_key: str, default_env_var: str) -> str:
    """Return the conventional env-var name for one secret of one account.

    The default account uses ``default_env_var`` unchanged; any other
    account gets ``ELEPHANT_<SERVICE>_<ACCOUNT>_<SECRET_KEY>``.
    """
    if account_id != DEFAULT_GATEWAY_ACCOUNT_ID:
        account_part = _gateway_account_suffix(account_id)
        return f"ELEPHANT_{service.upper()}_{account_part}_{secret_key.upper()}"
    return default_env_var
def _gateway_account_secret_env_var(
    *,
    service: str,
    account: Mapping[str, Any],
    account_id: str,
    secret_key: str,
    default_env_var: str,
) -> str:
    """Resolve which env-var name holds ``secret_key`` for ``account``.

    Lookup order:
      1. ``account["env"][secret_key]``
      2. ``metadata.env_var`` of the first matching entry in
         ``account["secret_references"]`` that declares one
      3. the conventional default name for the service/account.
    """
    env_overrides = account.get("env")
    if isinstance(env_overrides, Mapping):
        override = str(env_overrides.get(secret_key) or "").strip()
        if override:
            return override

    references = account.get("secret_references")
    if isinstance(references, (list, tuple)):
        for reference in references:
            if not isinstance(reference, Mapping):
                continue
            if str(reference.get("secret_key") or "") != secret_key:
                continue
            meta = reference.get("metadata")
            if not isinstance(meta, Mapping):
                continue
            declared = str(meta.get("env_var") or "").strip()
            if declared:
                return declared

    return _default_gateway_secret_env_var(service, account_id, secret_key, default_env_var)
def _gateway_runtime_service_key(row: Mapping[str, Any]) -> str:
    """Return the service a runtime-file row belongs to.

    When the row's ``content`` is a mapping, its ``service_key`` is used
    (empty string if absent). Otherwise the service is taken from the part
    of ``name`` before the first ``-``, or is empty when there is no dash.
    """
    payload = row.get("content")
    if isinstance(payload, Mapping):
        return str(payload.get("service_key") or "")
    file_name = str(row.get("name") or "")
    if "-" not in file_name:
        return ""
    prefix, _, _ = file_name.partition("-")
    return prefix
def _pid_is_alive(pid: Any) -> bool | None:
"""Return True if pid is a live process, False if dead, None if no pid recorded."""
if pid is None:
return None
try:
pid_int = int(pid)
except (ValueError, TypeError):
return False
if pid_int <= 0:
return False
try:
os.kill(pid_int, 0)
except OSError:
return False
return True
def _gateway_runtime_status(row: Mapping[str, Any]) -> str:
    """Classify a runtime row as 'running', 'starting', 'failed' or 'stopped'.

    The recorded status is reconciled with the liveness of the recorded
    pid: a 'running' or 'starting' record whose process is known to be
    dead is reported as 'stopped'. Rows without mapping content, and any
    unrecognised status, are 'stopped'.
    """
    payload = row.get("content")
    if not isinstance(payload, Mapping):
        return "stopped"

    recorded_status = str(payload.get("status") or "").lower()
    # Only a definite ``False`` counts as dead; ``None`` (no pid recorded)
    # leaves the recorded status standing.
    process_dead = _pid_is_alive(payload.get("pid")) is False

    if recorded_status in ("running", "starting"):
        return "stopped" if process_dead else recorded_status
    return "failed" if recorded_status == "failed" else "stopped"
def _gateway_runtime_is_running(row: Mapping[str, Any]) -> bool:
    """Return True when the row's reconciled runtime status is 'running'."""
    status = _gateway_runtime_status(row)
    return status == "running"
def _gateway_runtime_is_starting(row: Mapping[str, Any]) -> bool:
    """Return True when the row's reconciled runtime status is 'starting'."""
    status = _gateway_runtime_status(row)
    return status == "starting"
def _gateway_services(
    *,
    gateway_dir: Path,
    state_dir: Path | None,
    runtime_files: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Build one dashboard row per known gateway service.

    Each row starts from the static service spec (minus ``secretFields``)
    and adds the configured state from the manifest, which secrets have a
    locally stored value, and the runtime state derived from
    ``runtime_files``.

    NOTE(review): account records are returned verbatim under
    ``accounts``, including any ``token`` field stored on them — confirm
    that is acceptable for the consumer of these rows.
    """
    adapters_by_service: Mapping[str, Any] = {}
    if state_dir is not None:
        loaded = _load_manifest_from_config(state_dir)
        gateway_section = loaded.get("gateway") if isinstance(loaded, Mapping) else None
        adapters_section = gateway_section.get("adapters") if isinstance(gateway_section, Mapping) else None
        if isinstance(adapters_section, Mapping):
            adapters_by_service = adapters_section

    stored_secrets = _load_gateway_local_secret_env(gateway_dir)
    services: list[dict[str, Any]] = []

    for spec in _GATEWAY_SERVICE_SPECS:
        service = str(spec["service"])

        adapter_cfg = adapters_by_service.get(service)
        if not isinstance(adapter_cfg, Mapping):
            adapter_cfg = {}

        raw_accounts = adapter_cfg.get("accounts")
        if isinstance(raw_accounts, (list, tuple)):
            accounts = [dict(entry) for entry in raw_accounts if isinstance(entry, Mapping)]
        else:
            accounts = []
        # The first configured account drives the per-service summary.
        primary = accounts[0] if accounts else {}
        primary_id = str(primary.get("account_id") or DEFAULT_GATEWAY_ACCOUNT_ID)

        secret_rows = []
        for field in spec.get("secretFields", ()):
            if not isinstance(field, Mapping):
                continue
            key = str(field.get("key") or "").strip()
            env_name = _gateway_account_secret_env_var(
                service=service,
                account=primary,
                account_id=primary_id,
                secret_key=key,
                default_env_var=str(field.get("defaultEnvVar") or "").strip(),
            )
            secret_rows.append(
                {
                    "key": key,
                    "label": str(field.get("label") or key),
                    # Only reports presence in the local secrets file;
                    # the value itself is never included.
                    "hasValue": bool(stored_secrets.get(env_name)),
                }
            )

        own_runtime = [row for row in runtime_files if _gateway_runtime_service_key(row) == service]

        control_cfg = adapter_cfg.get("control")
        if not isinstance(control_cfg, Mapping):
            control_cfg = {}

        transport = str(primary.get("surface") or adapter_cfg.get("surface") or spec.get("defaultTransport") or "")
        enabled = adapter_cfg.get("enabled") is True

        states = [_gateway_runtime_status(row) for row in own_runtime]
        running = "running" in states
        # 'starting' is only reported while nothing is running yet.
        starting = (not running) and ("starting" in states)

        # First non-empty last_error among this service's runtime files.
        error_texts = (
            str(row["content"].get("last_error") or "").strip()
            for row in own_runtime
            if isinstance(row, Mapping) and isinstance(row.get("content"), Mapping)
        )
        last_error = next((text for text in error_texts if text), "")

        entry: dict[str, Any] = {name: value for name, value in spec.items() if name != "secretFields"}
        entry.update(
            {
                "enabled": enabled,
                "configured": bool(accounts),
                "configuredTransport": transport,
                "accountCount": len(accounts),
                "accounts": tuple(accounts),
                "primaryAccountId": primary_id,
                "eventPath": str(
                    primary.get("event_path") or adapter_cfg.get("event_path") or spec.get("eventPath") or ""
                ),
                "allowGroupChats": bool(control_cfg.get("allow_group_chats") is True),
                "secretFields": tuple(secret_rows),
                "runtimeFiles": tuple(own_runtime),
                "running": running,
                "starting": starting,
                "lastError": last_error,
            }
        )
        services.append(entry)

    return services
def _gateway(state_dir: Path) -> dict[str, Any]:
    """Assemble the gateway overview for the console.

    Collects the ``*.runtime.json`` and ``*.pid`` files found directly in
    the state directory, the per-service rows, the log listing and a few
    aggregate counts.
    """
    # The gateway shares the CLI's state dir: runtime status files sit
    # directly in it (no legacy `<state_dir>/gateway` subdir).
    gateway_dir = state_dir

    status_paths = sorted([*gateway_dir.glob("*.runtime.json"), *gateway_dir.glob("*.pid")])
    runtime_files = []
    for status_path in status_paths:
        if not status_path.is_file():
            continue
        updated_at = datetime.fromtimestamp(status_path.stat().st_mtime, UTC).isoformat()
        # JSON status files are parsed; pid files are kept as raw text.
        if status_path.suffix == ".json":
            content = _read_json_file(status_path)
        else:
            content = _read_text_file(status_path, max_chars=4_000)
        runtime_files.append(
            {
                "name": status_path.name,
                "path": str(status_path),
                "updatedAt": updated_at,
                "content": content,
            }
        )

    services = _gateway_services(gateway_dir=gateway_dir, state_dir=state_dir, runtime_files=runtime_files)

    overview: dict[str, Any] = {
        "gatewayDir": str(gateway_dir),
        "exists": gateway_dir.exists(),
        "runtimeFiles": runtime_files,
    }
    overview["logs"] = _logs(gateway_dir) if gateway_dir.exists() else []
    overview["services"] = services
    overview["configuredServiceCount"] = len([entry for entry in services if entry["configured"]])
    overview["runningServiceCount"] = len([entry for entry in services if entry["running"]])
    overview["startingServiceCount"] = len([entry for entry in services if entry.get("starting")])
    return overview
def _gateway_manifest(state_dir: Path) -> dict[str, Any]:
    """Return a fresh dict copy of the manifest loaded from config.

    Anything that is not a mapping is replaced by an empty dict.
    """
    loaded = _load_manifest_from_config(state_dir)
    if not isinstance(loaded, Mapping):
        return {}
    return dict(loaded)
def _gateway_adapter_payload(
    manifest: Mapping[str, Any], service: str
) -> tuple[dict[str, Any], dict[str, Any], dict[str, Any]]:
    """Extract the gateway, adapters and per-service adapter sections.

    Walks ``manifest["gateway"]["adapters"][service]``. Every level that is
    missing or not a mapping is treated as empty. The three sections are
    returned as shallow ``dict`` copies.
    """

    def _section(container: Mapping[str, Any], key: str) -> Mapping[str, Any]:
        candidate = container.get(key)
        return candidate if isinstance(candidate, Mapping) else {}

    gateway_section = _section(manifest, "gateway")
    adapters_section = _section(gateway_section, "adapters")
    adapter_section = _section(adapters_section, service)
    return dict(gateway_section), dict(adapters_section), dict(adapter_section)
def _gateway_accounts(adapter_payload: Mapping[str, Any]) -> list[dict[str, Any]]:
    """Return copies of the mapping entries in ``adapter_payload["accounts"]``.

    Non-mapping entries are dropped; a missing or non-sequence
    ``accounts`` value yields an empty list.
    """
    raw_accounts = adapter_payload.get("accounts")
    if isinstance(raw_accounts, (list, tuple)):
        return [dict(entry) for entry in raw_accounts if isinstance(entry, Mapping)]
    return []
def _gateway_upsert_account(accounts: list[dict[str, Any]], account: Mapping[str, Any]) -> list[dict[str, Any]]:
    """Return ``accounts`` with ``account`` replaced in place or appended.

    Entries are matched by ``account_id`` (a missing id counts as the
    default account). Every matching entry is replaced by a copy of
    ``account``; if none matches, the copy is appended. The input list is
    not modified; untouched entries are reused as-is.
    """

    def _identity(entry: Mapping[str, Any]) -> str:
        return str(entry.get("account_id") or DEFAULT_GATEWAY_ACCOUNT_ID)

    target_id = _identity(account)
    merged: list[dict[str, Any]] = []
    replaced = False
    for current in accounts:
        if _identity(current) == target_id:
            merged.append(dict(account))
            replaced = True
            continue
        merged.append(current)
    if replaced:
        return merged
    merged.append(dict(account))
    return merged
def _gateway_secret_reference(*, service: str, account_id: str, secret_key: str, env_var: str) -> dict[str, Any]:
    """Build the secret-reference record for one secret of one account.

    The reference id is ``secret-<service>[-<account_id>]-<secret-key>``,
    where the account part is omitted for the default account and
    underscores in the key become dashes. ``provider_id`` is the adapter
    id of the service's spec, and the env-var name is recorded under
    ``metadata``.

    Raises:
        KeyError: If ``service`` is not a known gateway service.
    """
    if account_id == DEFAULT_GATEWAY_ACCOUNT_ID:
        scope = service
    else:
        scope = f"{service}-{account_id}"
    dashed_key = secret_key.replace("_", "-")
    reference: dict[str, Any] = {"reference_id": f"secret-{scope}-{dashed_key}"}
    reference["provider_id"] = _GATEWAY_SERVICE_BY_KEY[service]["adapterId"]
    reference["secret_name"] = secret_key
    reference["secret_key"] = secret_key
    reference["metadata"] = {"env_var": env_var}
    return reference
def _gateway_qr_matrix(scan_data: str) -> tuple[tuple[int, ...], ...]:
    """Encode ``scan_data`` as a QR code and return its module matrix.

    Each row is a tuple of 1 (dark) / 0 (light) cells, produced with a
    2-module border. When the optional ``qrcode`` package cannot be
    imported an empty tuple is returned instead.
    """
    try:
        import qrcode
    except Exception:
        # Best effort: the console works without a rendered matrix.
        return ()

    encoder = qrcode.QRCode(border=2)
    encoder.add_data(scan_data)
    encoder.make(fit=True)
    matrix = []
    for module_row in encoder.get_matrix():
        matrix.append(tuple(int(bool(cell)) for cell in module_row))
    return tuple(matrix)
def _gateway_weixin_session_store(self) -> dict[str, dict[str, Any]]:
    """Return the per-instance dict of pending WeChat QR sessions.

    The dict lives on ``self._gateway_weixin_qr_sessions`` and is created
    (or re-created, if the attribute holds something that is not a dict)
    on first use.
    """
    existing = getattr(self, "_gateway_weixin_qr_sessions", None)
    if isinstance(existing, dict):
        return existing
    fresh: dict[str, dict[str, Any]] = {}
    self._gateway_weixin_qr_sessions = fresh
    return fresh
def _gateway_weixin_config_from_payload(payload: Mapping[str, Any]) -> dict[str, Any]:
    """Return a dict copy of the config carried by ``payload``.

    A mapping under ``payload["config"]`` is used when present; otherwise
    the payload itself is treated as the config.
    """
    nested = payload.get("config")
    if isinstance(nested, Mapping):
        return dict(nested)
    return dict(payload)
def _gateway_weixin_qr_payload(
    session_id: str, session_state: Mapping[str, Any], *, status: str = "wait"
) -> dict[str, Any]:
    """Build the console response describing a WeChat QR login session.

    The QR matrix is only rendered when the session has scan data;
    otherwise ``qrMatrix`` is an empty tuple.
    """
    scan_data = str(session_state.get("qrScanData") or "")
    matrix = _gateway_qr_matrix(scan_data) if scan_data else ()
    response: dict[str, Any] = {
        "status": status,
        "service": "weixin",
        "action": "qr",
        "sessionId": session_id,
    }
    for field in ("qrcode", "qrcodeUrl"):
        response[field] = session_state.get(field)
    response["qrScanData"] = scan_data
    response["qrMatrix"] = matrix
    response["expiresAt"] = session_state.get("expiresAt")
    return response
async def _fetch_weixin_qr(*, bot_type: str) -> dict[str, Any]:
    """Request a new WeChat login QR code from the iLink API.

    Args:
        bot_type: Bot type forwarded as the ``bot_type`` query parameter.

    Returns:
        The response returned by ``weixin_support._api_get``.

    Raises:
        RuntimeError: If the WeChat gateway dependencies are not installed.
    """
    from urllib.parse import quote

    from apps.gateway import weixin_support as wx

    if not wx.check_weixin_requirements():
        raise RuntimeError(
            "WeChat QR login requires aiohttp and cryptography. Install gateway WeChat dependencies first."
        )
    # bot_type originates from the caller's payload; percent-encode it so it
    # cannot inject extra query parameters or break the URL.
    encoded_bot_type = quote(str(bot_type), safe="")
    async with wx.aiohttp.ClientSession(trust_env=True, connector=wx._make_ssl_connector()) as session:
        return await wx._api_get(
            session,
            base_url=wx.ILINK_BASE_URL,
            endpoint=f"{wx.EP_GET_BOT_QR}?bot_type={encoded_bot_type}",
            timeout_ms=wx.QR_TIMEOUT_MS,
        )
async def _poll_weixin_qr(*, qrcode: str, base_url: str) -> dict[str, Any]:
    """Ask the iLink API for the scan status of a login QR code.

    Args:
        qrcode: QR code value obtained when the login was started.
        base_url: API host to poll (may change after a redirect response).

    Returns:
        The response returned by ``weixin_support._api_get``.

    Raises:
        RuntimeError: If the WeChat gateway dependencies are not installed.
    """
    from urllib.parse import quote

    from apps.gateway import weixin_support as wx

    if not wx.check_weixin_requirements():
        raise RuntimeError(
            "WeChat QR login requires aiohttp and cryptography. Install gateway WeChat dependencies first."
        )
    # Percent-encode the value so characters such as '&', '+', '=' or '#'
    # in the code cannot corrupt the query string.
    encoded_qrcode = quote(str(qrcode), safe="")
    async with wx.aiohttp.ClientSession(trust_env=True, connector=wx._make_ssl_connector()) as session:
        return await wx._api_get(
            session,
            base_url=base_url,
            endpoint=f"{wx.EP_GET_QR_STATUS}?qrcode={encoded_qrcode}",
            timeout_ms=wx.QR_TIMEOUT_MS,
        )
def _gateway_weixin_qr_start(self, payload: Mapping[str, Any]) -> dict[str, Any]:
    """Start a WeChat QR login and remember it as a pending session.

    Fetches a QR code (bot type from ``payload["botType"]``, default
    ``"3"``), stores the session state on ``self`` with an expiry 480
    seconds from now, and returns the console payload with status
    ``"wait"``.

    Raises:
        ValueError: If the API response carries no ``qrcode`` value.
    """
    bot_type = str(payload.get("botType") or "3")
    response = asyncio.run(_fetch_weixin_qr(bot_type=bot_type))

    qr_token = str(response.get("qrcode") or "")
    qr_image = str(response.get("qrcode_img_content") or "")
    if not qr_token:
        raise ValueError("WeChat QR response did not include qrcode")

    session_id = f"weixin-qr-{int(time.time() * 1000)}"
    expiry = datetime.fromtimestamp(time.time() + 480, UTC)
    session_state = {
        "qrcode": qr_token,
        "qrcodeUrl": qr_image,
        # Prefer the image content for scanning; fall back to the raw code.
        "qrScanData": qr_image or qr_token,
        "baseUrl": "https://ilinkai.weixin.qq.com",
        "expiresAt": expiry.isoformat(),
        "config": _gateway_weixin_config_from_payload(payload),
    }
    sessions = _gateway_weixin_session_store(self)
    sessions[session_id] = session_state
    return _gateway_weixin_qr_payload(session_id, session_state, status="wait")
def _gateway_persist_weixin_credentials(
    self, credentials: Mapping[str, Any], config: Mapping[str, Any]
) -> dict[str, Any]:
    """Store confirmed WeChat credentials and register the account.

    Saves the account through ``weixin_support.save_weixin_account`` and
    upserts it into the ``weixin`` adapter section of the manifest, which
    is then written back to config.

    Args:
        credentials: Confirmed login data (``account_id``/``ilink_bot_id``,
            ``token``/``bot_token``, optional base URL and user id).
        config: Console options captured when the QR flow was started.

    Returns:
        ``profileManifestPath`` and the refreshed ``gateway`` overview.

    Raises:
        ValueError: If the credentials lack an account id or token.
    """
    from apps.gateway import weixin_support as wx

    state_dir = self.repository.database_path.parent
    manifest = _gateway_manifest(state_dir)
    gateway_payload, adapters_payload, adapter_payload = _gateway_adapter_payload(manifest, "weixin")
    accounts = _gateway_accounts(adapter_payload)

    account_id = str(credentials.get("account_id") or credentials.get("ilink_bot_id") or "").strip()
    token = str(credentials.get("token") or credentials.get("bot_token") or "").strip()
    if not (account_id and token):
        raise ValueError("WeChat QR confirmation did not include account_id and token")

    base_url = str(credentials.get("base_url") or credentials.get("baseurl") or wx.ILINK_BASE_URL)
    user_id = str(credentials.get("user_id") or credentials.get("ilink_user_id") or "")
    wx.save_weixin_account(
        str(state_dir),
        account_id=account_id,
        token=token,
        base_url=base_url,
        user_id=user_id,
    )

    existing_control = adapter_payload.get("control")
    control_payload = dict(existing_control) if isinstance(existing_control, Mapping) else {}

    # Explicit booleans from the console win; otherwise keep stored values
    # (group chats) or default to enabled.
    requested_group_chats = config.get("allowGroupChats")
    if isinstance(requested_group_chats, bool):
        allow_group_chats = requested_group_chats
    else:
        allow_group_chats = control_payload.get("allow_group_chats") is True
    requested_account_enabled = config.get("accountEnabled")
    requested_enabled = config.get("enabled")

    account_payload: dict[str, Any] = {
        "account_id": account_id,
        "token": token,
        "base_url": base_url,
        "user_id": user_id,
        "surface": "ilink",
        "enabled": requested_account_enabled if isinstance(requested_account_enabled, bool) else True,
    }
    event_path = str(
        config.get("eventPath") or config.get("event_path") or adapter_payload.get("event_path") or "/weixin/events"
    ).strip()
    if event_path:
        account_payload["event_path"] = event_path

    adapter_payload["accounts"] = _gateway_upsert_account(accounts, account_payload)
    adapter_payload["surface"] = "ilink"
    adapter_payload["enabled"] = requested_enabled if isinstance(requested_enabled, bool) else True
    adapter_payload["event_path"] = event_path

    # These control keys are always removed from the persisted section.
    for dropped_key in ("default_elephant_id", "default_session_id", "auto_create_elephant"):
        control_payload.pop(dropped_key, None)
    if allow_group_chats:
        control_payload["allow_group_chats"] = True
    else:
        control_payload.pop("allow_group_chats", None)
    if control_payload:
        adapter_payload["control"] = control_payload
    else:
        adapter_payload.pop("control", None)

    adapters_payload["weixin"] = adapter_payload
    gateway_payload["adapters"] = adapters_payload
    manifest["gateway"] = gateway_payload
    manifest_path = _write_manifest_to_config(state_dir, manifest)
    return {
        "profileManifestPath": str(manifest_path),
        "gateway": _gateway(state_dir),
    }
def _gateway_weixin_qr_poll(self, payload: Mapping[str, Any]) -> dict[str, Any]:
    """Poll a pending WeChat QR login session once.

    Returns the QR payload for the session with an updated ``status`` and
    a human-readable ``message``. On confirmation the credentials are
    persisted, the session is dropped, and the persisted manifest path and
    gateway overview are merged into the response.

    Raises:
        ValueError: If the session id is missing or unknown.
    """
    session_id = str(payload.get("sessionId") or payload.get("session_id") or "").strip()
    sessions = _gateway_weixin_session_store(self)
    state = sessions.get(session_id)
    if not session_id or state is None:
        raise ValueError("WeChat QR session is missing or expired; start QR setup again")

    def _reply(status: str, message: str) -> dict[str, Any]:
        # Common response shape: QR payload followed by the message.
        return {
            **_gateway_weixin_qr_payload(session_id, state, status=status),
            "message": message,
        }

    expires_epoch = datetime.fromisoformat(str(state["expiresAt"])).timestamp()
    if time.time() > expires_epoch:
        sessions.pop(session_id, None)
        return _reply("expired", "QR session expired; start again.")

    status_resp = asyncio.run(
        _poll_weixin_qr(
            qrcode=str(state["qrcode"]),
            base_url=str(state.get("baseUrl") or "https://ilinkai.weixin.qq.com"),
        )
    )
    status = str(status_resp.get("status") or "wait")

    if status == "scaned_but_redirect":
        # Subsequent polls must go to the host named by the API.
        redirect_host = str(status_resp.get("redirect_host") or "").strip()
        if redirect_host:
            state["baseUrl"] = f"https://{redirect_host}"
        return _reply(status, "Redirected QR polling host.")

    if status == "confirmed":
        credentials = {
            "account_id": str(status_resp.get("ilink_bot_id") or ""),
            "token": str(status_resp.get("bot_token") or ""),
            "base_url": str(status_resp.get("baseurl") or "https://ilinkai.weixin.qq.com"),
            "user_id": str(status_resp.get("ilink_user_id") or ""),
        }
        persisted = _gateway_persist_weixin_credentials(self, credentials, dict(state.get("config") or {}))
        sessions.pop(session_id, None)
        response = _reply("confirmed", f"WeChat connected as {credentials['account_id']}")
        # The token is deliberately left out of the echoed credentials.
        response["credentials"] = {
            "account_id": credentials["account_id"],
            "base_url": credentials["base_url"],
            "user_id": credentials["user_id"],
        }
        response.update(persisted)
        return response

    if status == "need_verifycode":
        return _reply(status, "Scanned. Please confirm the verification code on your phone to continue.")

    return _reply(status, "Scan the QR with WeChat and confirm login.")
def _gateway_configure_service(self, payload: Mapping[str, Any], *, service: str) -> dict[str, Any]:
    """Create or update one account of a gateway service from console input.

    Most settings resolve through a fallback chain (request value, then
    the existing account, then the adapter section, then the service
    spec). The resulting account is upserted into the manifest, the
    manifest is written to config, and any secrets supplied in the request
    are stored in the local secrets file.

    Args:
        payload: Request body; its ``config`` mapping is used when present,
            otherwise the payload itself is read as the config.
        service: Key of the service to configure.

    Returns:
        A status dict with the manifest path, the secrets file path (or
        ``None`` when no secret was stored) and the refreshed gateway
        overview.

    Raises:
        KeyError: If ``service`` is not a known gateway service.
        ValueError: If the resolved transport is not allowed for the
            service.
    """
    spec = _GATEWAY_SERVICE_BY_KEY[service]
    config = payload.get("config") if isinstance(payload.get("config"), Mapping) else payload
    database_path = self.repository.database_path
    state_dir = database_path.parent
    # Gateway files live directly in the state dir.
    gateway_dir = state_dir
    manifest = _gateway_manifest(state_dir)
    gateway_payload, adapters_payload, adapter_payload = _gateway_adapter_payload(manifest, service)
    accounts = _gateway_accounts(adapter_payload)
    account_id = (
        str(config.get("accountId") or config.get("account_id") or DEFAULT_GATEWAY_ACCOUNT_ID).strip()
        or DEFAULT_GATEWAY_ACCOUNT_ID
    )
    # Previously stored record for this account id ({} when it is new).
    existing_account = next(
        (account for account in accounts if str(account.get("account_id") or DEFAULT_GATEWAY_ACCOUNT_ID) == account_id),
        {},
    )
    transport = str(
        config.get("transport")
        or existing_account.get("surface")
        or adapter_payload.get("surface")
        or spec.get("defaultTransport")
        or ""
    ).strip()
    if transport not in tuple(spec.get("transports", ())):
        raise ValueError(f"gateway {service} transport must be one of {', '.join(spec.get('transports', ()))}")
    # Only an explicit boolean in the request overrides the stored flag;
    # an unset stored flag counts as enabled.
    enabled = (
        bool(config.get("enabled"))
        if isinstance(config.get("enabled"), bool)
        else bool(adapter_payload.get("enabled") is not False)
    )
    account_enabled = (
        bool(config.get("accountEnabled"))
        if isinstance(config.get("accountEnabled"), bool)
        else bool(existing_account.get("enabled") is not False)
    )
    event_path = str(
        config.get("eventPath")
        or config.get("event_path")
        or existing_account.get("event_path")
        or adapter_payload.get("event_path")
        or spec.get("eventPath")
        or ""
    ).strip()
    # Unlike the enabled flags, group chats default to off unless stored as True.
    allow_group_chats = (
        bool(config.get("allowGroupChats"))
        if isinstance(config.get("allowGroupChats"), bool)
        else bool(
            (adapter_payload.get("control") if isinstance(adapter_payload.get("control"), Mapping) else {}).get(
                "allow_group_chats"
            )
            is True
        )
    )
    secrets = config.get("secrets") if isinstance(config.get("secrets"), Mapping) else {}
    secret_fields = tuple(field for field in spec.get("secretFields", ()) if isinstance(field, Mapping))
    # secret key -> env-var name recorded on the account
    env_payload: dict[str, str] = {}
    # env-var name -> secret value to store locally
    secret_updates: dict[str, str] = {}
    for field in secret_fields:
        secret_key = str(field.get("key") or "").strip()
        default_env_var = str(field.get("defaultEnvVar") or "").strip()
        # NOTE(review): an empty account is passed, so env-var names always
        # resolve to the conventional defaults even when the existing
        # account declares its own `env` / `secret_references` names
        # (which `_gateway_services` does honour) — confirm this reset is
        # intended.
        env_var = _gateway_account_secret_env_var(
            service=service,
            account={},
            account_id=account_id,
            secret_key=secret_key,
            default_env_var=default_env_var,
        )
        env_payload[secret_key] = env_var
        raw_secret = str(secrets.get(secret_key) or "").strip()
        # Blank secrets are skipped, leaving any stored value in place.
        if raw_secret:
            secret_updates[env_var] = raw_secret
    account_payload: dict[str, Any] = {
        "account_id": account_id,
        "surface": transport,
        "enabled": account_enabled,
    }
    if event_path:
        account_payload["event_path"] = event_path
    # Feishu accounts record their env-var names as secret references;
    # every other service uses a plain `env` mapping.
    if service == "feishu":
        account_payload["secret_references"] = tuple(
            _gateway_secret_reference(
                service=service,
                account_id=account_id,
                secret_key=secret_key,
                env_var=env_var,
            )
            for secret_key, env_var in env_payload.items()
        )
    elif env_payload:
        account_payload["env"] = env_payload
    # Carry over fields this form does not manage so an update does not
    # drop them from the stored account.
    for preserved_key in ("runtime", "token", "base_url", "user_id"):
        if preserved_key in existing_account and preserved_key not in account_payload:
            account_payload[preserved_key] = existing_account[preserved_key]
    # Allow-lists are only written when the request supplies a list;
    # blank entries are dropped.
    allow_guild_ids = config.get("allowGuildIds")
    if isinstance(allow_guild_ids, list):
        account_payload["allow_guild_ids"] = [str(item).strip() for item in allow_guild_ids if str(item).strip()]
    allow_channel_ids = config.get("allowChannelIds")
    if isinstance(allow_channel_ids, list):
        account_payload["allow_channel_ids"] = [str(item).strip() for item in allow_channel_ids if str(item).strip()]
    adapter_payload["accounts"] = _gateway_upsert_account(accounts, account_payload)
    adapter_payload["surface"] = transport
    adapter_payload["enabled"] = enabled
    if event_path:
        adapter_payload["event_path"] = event_path
    control_payload = (
        dict(adapter_payload.get("control")) if isinstance(adapter_payload.get("control"), Mapping) else {}
    )
    # These control keys are always removed from the persisted section.
    control_payload.pop("default_elephant_id", None)
    control_payload.pop("default_session_id", None)
    control_payload.pop("auto_create_elephant", None)
    if allow_group_chats:
        control_payload["allow_group_chats"] = True
    else:
        control_payload.pop("allow_group_chats", None)
    # Drop the control section entirely once it is empty.
    if control_payload:
        adapter_payload["control"] = control_payload
    else:
        adapter_payload.pop("control", None)
    adapters_payload[service] = adapter_payload
    gateway_payload["adapters"] = adapters_payload
    manifest["gateway"] = gateway_payload
    # The manifest is written first; secrets are stored afterwards.
    manifest_path = _write_manifest_to_config(state_dir, manifest)
    secret_path = _persist_gateway_local_secret_env(gateway_dir, secret_updates)
    return {
        "status": "ok",
        "service": service,
        "action": "configured",
        "profileManifestPath": str(manifest_path),
        "secretPath": str(secret_path) if secret_path is not None else None,
        "gateway": _gateway(state_dir),
    }
def _gateway_remove_account_credentials(gateway_dir: Path, *, service: str, account_id: str) -> None:
    """Delete the on-disk credential files kept for one service account.

    Two files under ``<gateway_dir>/<service>/accounts/`` are targeted, in
    this order: ``<account_id>.json`` (the credentials) and
    ``<account_id>.sync.json`` (the sync buffer).  A target that is not an
    existing regular file is skipped, and an ``OSError`` raised while
    deleting is swallowed, so the call is best-effort and never raises for
    filesystem reasons.
    """
    # WeChat stores credentials in gateway_dir/weixin/accounts/{account_id}.json;
    # other services may adopt the same layout later.
    # NOTE(review): `service` and `account_id` are joined into the path
    # unvalidated -- confirm callers never pass values containing separators.
    accounts_dir = gateway_dir / service / "accounts"
    for suffix in (".json", ".sync.json"):
        target = accounts_dir / f"{account_id}{suffix}"
        if not target.is_file():
            continue
        try:
            target.unlink()
        except OSError:
            # Best-effort removal: a file we cannot delete is left in place.
            pass
def _gateway_cleanup_stale_runtime_files(gateway_dir: Path, *, service: str) -> None:
"""Update runtime.json files to 'stopped' when the recorded PID is no longer alive.
Applies to both 'running' and 'starting' records β a process that never
reached the running state should not linger as 'starting' forever.
"""
for path in gateway_dir.glob(f"{service}*.runtime.json"):
if not path.is_file():
continue
try:
content = json.loads(path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
continue
if not isinstance(content, dict):
continue
recorded = str(content.get("status") or "").lower()
if recorded not in ("running", "starting"):
continue
pid = content.get("pid")
if pid is None:
# No pid on a 'starting' record is ambiguous; leave it alone so a
# freshly-launched process has a moment to write its pid.
continue
try:
pid_int = int(pid)
if pid_int > 0:
os.kill(pid_int, 0)
# PID is alive β leave it alone
continue
except (ValueError, TypeError):
pass
except OSError:
pass
# PID is not alive β mark as stopped
content["status"] = "stopped"
content["stopped_at"] = datetime.now(UTC).isoformat()
content["last_error"] = f"process exited unexpectedly (was {recorded})"
try:
path.write_text(json.dumps(content, ensure_ascii=False, indent=2), encoding="utf-8")
except OSError:
pass
# Also clean up stale .pid files
for pid_path in gateway_dir.glob(f"{service}*.pid"):
if not pid_path.is_file():
continue
try:
pid_int = int(pid_path.read_text(encoding="utf-8").strip())
if pid_int > 0:
os.kill(pid_int, 0)
continue # still alive
except (OSError, ValueError):
pass
try:
pid_path.unlink()
except OSError:
pass
def _gateway_remove_service_account(self, payload: Mapping[str, Any], *, service: str) -> dict[str, Any]: