-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathcodec_ava_client.py
More file actions
245 lines (202 loc) · 7.98 KB
/
codec_ava_client.py
File metadata and controls
245 lines (202 loc) · 7.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
"""AVA proxy client — opt-in cloud LLM path for CODEC.
This module is ADDITIVE. It does not modify any existing CODEC paths.
The default local Qwen3.6 pipeline keeps working exactly as before. Callers
that explicitly want to route a query through AVA's hosted cloud proxy
(Gemini, OpenAI, Claude) use `ava_chat()` from this module.
Config lives under `ava:` in ~/.codec/config.json:
"ava": {
"enabled": true,
"proxy_url": "https://ava-proxy.lucyvpa.com",
"license_key": "eyJhbGci...",
"default_cloud_model": "gemini-2.5-flash-lite",
"available_cloud_models": [ ... ]
}
When you're ready to migrate dashboard / skills to support a model picker,
import `ava_chat` from here and branch on user selection. Nothing in this
file auto-wires anything.
"""
from __future__ import annotations
import json
import logging
import os
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Iterator, Optional
import requests
log = logging.getLogger("codec.ava")
CONFIG_PATH = Path(os.path.expanduser("~/.codec/config.json"))
# ── Config helpers ──
@dataclass
class AvaConfig:
enabled: bool
proxy_url: str
license_key: str
default_cloud_model: str
available_cloud_models: list[dict[str, str]]
@property
def is_ready(self) -> bool:
return bool(self.enabled and self.proxy_url and self.license_key)
def load_config() -> AvaConfig | None:
"""Load AVA config from ~/.codec/config.json.
Returns None if the file is missing or `ava` block isn't present.
"""
try:
data = json.loads(CONFIG_PATH.read_text())
except (FileNotFoundError, json.JSONDecodeError):
return None
ava = data.get("ava")
if not ava or not isinstance(ava, dict):
return None
return AvaConfig(
enabled=bool(ava.get("enabled", False)),
proxy_url=(ava.get("proxy_url") or "").rstrip("/"),
license_key=ava.get("license_key", ""),
default_cloud_model=ava.get("default_cloud_model", "gemini-2.5-flash-lite"),
available_cloud_models=list(ava.get("available_cloud_models", [])),
)
# ── License status check (called at startup) ──
def verify_license(cfg: Optional[AvaConfig] = None, timeout: float = 5.0) -> dict[str, Any] | None:
"""Hit /api/v1/status on the AVA license server. Returns status dict or None on failure.
Doesn't raise — CODEC should keep working if proxy is unreachable (local Qwen still fine).
"""
cfg = cfg or load_config()
if not cfg or not cfg.is_ready:
return None
# License server sits on a SIBLING subdomain (same Cloudflare tunnel)
# ava-proxy.lucyvpa.com → ava-license.lucyvpa.com
license_url = cfg.proxy_url.replace("ava-proxy", "ava-license")
try:
r = requests.get(
f"{license_url}/api/v1/status",
params={"license_jwt": cfg.license_key},
timeout=timeout,
)
if r.ok:
return r.json()
log.warning("ava license check %s: %s", r.status_code, r.text[:200])
return {"status": "error", "http": r.status_code, "detail": r.text[:200]}
except requests.RequestException as e:
log.warning("ava license unreachable: %s", e)
return None
# ── Cloud chat (OpenAI-compatible shape) ──
class AvaProxyError(Exception):
pass
def ava_chat(
messages: list[dict],
model: str | None = None,
stream: bool = False,
max_tokens: int | None = None,
temperature: float = 0.7,
timeout: float = 60.0,
cfg: Optional[AvaConfig] = None,
**extra,
) -> dict | Iterator[dict]:
"""Send a chat-completion request through the AVA proxy.
Returns the parsed JSON dict when `stream=False`.
Returns an iterator over parsed SSE delta dicts when `stream=True`.
Raises `AvaProxyError` on config / transport / auth problems so the caller
can fall back to local Qwen if desired.
"""
cfg = cfg or load_config()
if not cfg:
raise AvaProxyError("AVA config missing in ~/.codec/config.json")
if not cfg.is_ready:
raise AvaProxyError("AVA config present but incomplete "
"(enabled/proxy_url/license_key)")
model = model or cfg.default_cloud_model
payload: dict[str, Any] = {
"model": model,
"messages": messages,
"stream": stream,
"temperature": temperature,
**extra,
}
if max_tokens is not None:
payload["max_tokens"] = max_tokens
headers = {
"Authorization": f"Bearer {cfg.license_key}",
"Content-Type": "application/json",
}
url = f"{cfg.proxy_url}/v1/chat/completions"
t0 = time.monotonic()
if not stream:
r = requests.post(url, json=payload, headers=headers, timeout=timeout)
if not r.ok:
raise AvaProxyError(f"ava proxy {r.status_code}: {r.text[:500]}")
data = r.json()
log.info("ava non-stream %s %dms tokens=%s",
model, int((time.monotonic() - t0) * 1000),
data.get("usage", {}).get("total_tokens"))
return data
# Streaming — yield parsed deltas
def _stream():
with requests.post(url, json=payload, headers=headers, stream=True, timeout=timeout) as r:
if not r.ok:
raise AvaProxyError(f"ava proxy {r.status_code}: {r.text[:500]}")
for line in r.iter_lines(decode_unicode=True):
if not line or not line.startswith("data: "):
continue
data = line[6:]
if data == "[DONE]":
break
try:
yield json.loads(data)
except json.JSONDecodeError:
continue
return _stream()
def ava_chat_simple(prompt: str, system: str | None = None, **kwargs) -> str:
"""Convenience wrapper: take a plain string prompt, return a plain string answer."""
messages = []
if system:
messages.append({"role": "system", "content": system})
messages.append({"role": "user", "content": prompt})
data = ava_chat(messages, stream=False, **kwargs)
return data["choices"][0]["message"]["content"]
# ── Model picker helpers for future dashboard UI ──
def list_cloud_models() -> list[dict[str, str]]:
cfg = load_config()
return cfg.available_cloud_models if cfg else []
def choose_model(user_preference: str | None) -> str:
"""Resolve user preference to a proxy model id. Falls back to default."""
cfg = load_config()
if not cfg:
return "gemini-2.5-flash-lite"
if user_preference:
# Allow shorthand like "fast" / "pro" / "balanced"
shortcuts = {
"fast": "gemini-2.5-flash-lite",
"cheap": "gemini-2.5-flash-lite",
"balanced": "gemini-2.5-flash",
"pro": "gemini-2.5-pro",
"quality": "gemini-2.5-pro",
}
if user_preference.lower() in shortcuts:
return shortcuts[user_preference.lower()]
# exact model id
valid = {m["id"] for m in cfg.available_cloud_models}
if user_preference in valid:
return user_preference
return cfg.default_cloud_model
if __name__ == "__main__":
# `python codec_ava_client.py` → quick smoke test
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(name)s %(message)s")
print("Loading AVA config…")
cfg = load_config()
if not cfg:
print("❌ no ava block in ~/.codec/config.json"); raise SystemExit(1)
print(f" proxy: {cfg.proxy_url}")
print(f" license: {cfg.license_key[:30]}…")
print(f" default model: {cfg.default_cloud_model}")
print("\nChecking license status…")
status = verify_license(cfg)
print(f" → {status}")
print("\nAsking Gemini a quick question…")
try:
answer = ava_chat_simple(
"Hello CODEC. Confirm you're alive and name yourself in under 10 words.",
max_tokens=40,
)
print(f"\n🟢 Gemini says:\n {answer}")
except AvaProxyError as e:
print(f"❌ {e}")