-
Notifications
You must be signed in to change notification settings - Fork 210
Expand file tree
/
Copy pathserver.py
More file actions
3019 lines (2722 loc) · 134 KB
/
server.py
File metadata and controls
3019 lines (2722 loc) · 134 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
OpenAI-compatible HTTP server on top of test_dflash.
pip install fastapi uvicorn transformers
python3 scripts/server.py # serves on :8000
curl http://localhost:8000/v1/chat/completions \
-H 'Content-Type: application/json' \
-d '{"model":"luce-dflash","messages":[{"role":"user","content":"hi"}],"stream":true}'
Drop-in for Open WebUI / LM Studio / Cline by setting
OPENAI_API_BASE=http://localhost:8000/v1 OPENAI_API_KEY=sk-any
Streams tokens as Server-Sent Events using the OpenAI delta format.
"""
import argparse
import json
import logging
import os
import re
import struct
import subprocess
import sys
import tempfile
import time
import uuid
from pathlib import Path
from typing import Any, AsyncIterator
log = logging.getLogger("dflash.server")
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware # FIX 1: add CORS
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel
from starlette.concurrency import iterate_in_threadpool
from transformers import AutoTokenizer
from _prefill_hook import (
PrefillConfig, add_cli_flags, config_from_args,
compress_text_via_daemon, _drain_until_sentinel,
)
from placement.server_resolver import resolve_server_placement
from prefix_cache import DaemonStdoutBus, PrefixCache
from tool_memory import ToolMemory
class OpenAICompatError(Exception):
def __init__(self, message: str, status_code: int = 400,
error_type: str = "invalid_request_error",
param: str | None = None, code: str | None = None):
super().__init__(message)
self.message = message
self.status_code = status_code
self.error_type = error_type
self.param = param
self.code = code
# Parent of the directory that contains this file.
ROOT = Path(__file__).resolve().parent.parent
# Default target model path; the DFLASH_TARGET environment variable overrides
# the GGUF bundled under <ROOT>/models.
DEFAULT_TARGET = Path(os.environ.get(
    "DFLASH_TARGET",
    str(ROOT / "models" / "Qwen3.6-27B-Q4_K_M.gguf"),
))
# Directory that holds the draft model files.
DEFAULT_DRAFT_ROOT = ROOT / "models" / "draft"
# Path of the test_dflash binary; gets an ".exe" suffix on Windows.
DEFAULT_BIN = ROOT / "build" / ("test_dflash" + (".exe" if sys.platform == "win32" else ""))
# Default budget value (presumably the --budget / --ddtree-budget default;
# the CLI parser is outside this chunk — confirm).
DEFAULT_BUDGET = 22
def _detect_hip_arch() -> str | None:
"""Return the first non-host HIP GPU arch string, e.g. 'gfx1151', or None."""
for tool in ("rocm_agent_enumerator", "rocminfo"):
try:
out = subprocess.check_output([tool], stderr=subprocess.DEVNULL, timeout=5)
for line in out.decode().splitlines():
line = line.strip()
if tool == "rocm_agent_enumerator":
if line and line != "gfx000":
return line
else:
if line.startswith("Name:") and "gfx" in line:
return line.split("gfx", 1)[1].split()[0].strip()
except (FileNotFoundError, subprocess.CalledProcessError,
subprocess.TimeoutExpired):
continue
return None
# Archs for which a specific budget is recommended:
# arch -> (recommended budget, note printed next to the recommendation).
_HIP_BUDGET_ADVISORY: dict[str, tuple[int, str]] = {
    "gfx1100": (8, "+53% decode tok/s vs default 22 on RDNA3"),
}
# Archs whose message is printed verbatim instead of a recommendation.
_HIP_ARCH_CONFIRMED: dict[str, str] = {
    "gfx1151": "RDNA3.5 (Strix Halo) — budget=22 optimal",
    "gfx1201": "RDNA4 — budget=22 optimal",
}
def _print_hip_budget_advisory(budget: int) -> None:
    """Print a one-line budget hint for the detected HIP GPU.

    Prints nothing when no HIP arch is detected. Lookups use the first seven
    characters of the arch string.
    """
    arch = _detect_hip_arch()
    if arch is None:
        return
    key = arch[:7]
    advisory = _HIP_BUDGET_ADVISORY.get(key)
    if advisory is None:
        if key in _HIP_ARCH_CONFIRMED:
            print(f" [hip] {arch}: {_HIP_ARCH_CONFIRMED[key]}", flush=True)
        else:
            print(f" [hip] {arch}: no advisory; using budget={budget}", flush=True)
        return
    rec, note = advisory
    if budget == rec:
        print(f" [hip] {arch}: budget={budget} optimal", flush=True)
    else:
        print(f" [hip] {arch}: consider --budget={rec} ({note})", flush=True)
def _parse_bool(value: str | bool | None) -> bool:
if isinstance(value, bool):
return value
if value is None:
return False
return str(value).strip().lower() in {"1", "true", "yes", "on"}
def _extra_daemon_has_target_sharding(extra: list[str] | None) -> bool:
"""True if we spawn test_dflash with multi-GPU target layer split."""
if not extra:
return False
return any(tok.startswith("--target-gpus") for tok in extra)
# Model id used as the default `model` field of the request schemas and
# reported by the model listing.
MODEL_NAME = "luce-dflash"
# Architecture strings stored in `general.architecture` of every GGUF this
# server can drive. test_dflash dispatches by GGUF arch internally:
# qwen35 / qwen36 -> existing DFlash + DDTree pipeline
# laguna -> dflash::common::run_laguna_daemon() (no spec-decode)
# server.py just needs to omit --draft + the DFlash/DDTree flags when the
# arch doesn't support speculative decoding yet.
_QWEN35_ARCHES = {"qwen35", "qwen36"}
_LAGUNA_ARCHES = {"laguna"}
# NOTE(review): presumably the allow-list of chat_template_kwargs keys that
# may be forwarded to the chat template; its use is outside this chunk.
_ALLOWED_TEMPLATE_KWARGS = frozenset({"enable_thinking", "tools", "add_generation_prompt"})
def resolve_draft(root: Path) -> Path:
    """Pick a draft model file from anywhere below ``root``.

    Patterns are tried in priority order (``dflash-draft-*.gguf``, any
    ``*.gguf``, then ``model.safetensors``); within a pattern the first path
    in sorted order wins.

    Raises:
        FileNotFoundError: when no pattern matches anything.
    """
    patterns = ("dflash-draft-*.gguf", "*.gguf", "model.safetensors")
    for pattern in patterns:
        candidates = sorted(root.rglob(pattern))
        if candidates:
            return candidates[0]
    raise FileNotFoundError(f"no DFlash draft GGUF or model.safetensors under {root}")
# Name fragment (matched case-insensitively against the GGUF's
# general.basename / general.name) -> tokenizer id for Qwen-family models.
_QWEN35_FAMILY_TOKENIZERS = {
    "Qwen3.5-27B": "Qwen/Qwen3.5-27B",
    "Qwen3.6-27B": "Qwen/Qwen3.6-27B",
}
# Tags that delimit the reasoning block in generated text.
THINK_OPEN_TAG = "<think>"
THINK_CLOSE_TAG = "</think>"
# Same mapping for Laguna GGUFs; the first value doubles as the fallback
# tokenizer id when the arch is laguna.
_LAGUNA_FAMILY_TOKENIZERS = {
    "Laguna-XS.2": "poolside/Laguna-XS.2",
    "Laguna-XS": "poolside/Laguna-XS.2",
    "laguna-xs2": "poolside/Laguna-XS.2",
}
def _read_gguf_str(reader, key: str) -> str | None:
f = reader.fields.get(key)
if f is None or not f.data:
return None
import numpy as np
p = f.parts[f.data[0]]
if not isinstance(p, np.ndarray):
return None
try:
return bytes(p).decode("utf-8", errors="replace")
except Exception:
return None
def _arch_from_gguf(gguf_path: Path) -> str:
    """Return the lower-cased ``general.architecture`` of a GGUF, or 'unknown'.

    The result selects which daemon flags the server passes. Any failure
    (``gguf`` package missing, unreadable file, absent or empty field) yields
    'unknown', which callers treat like the qwen35 path so existing setups
    keep working when the field is missing.
    """
    try:
        from gguf import GGUFReader  # type: ignore
        arch = _read_gguf_str(GGUFReader(str(gguf_path)), "general.architecture")
    except Exception:
        return "unknown"
    if not arch:
        return "unknown"
    return arch.lower()
def _tokenizer_id_from_gguf(gguf_path: Path) -> str:
    """Choose a tokenizer id from a GGUF's architecture and name metadata.

    The family table is picked by ``general.architecture``; then
    ``general.basename`` and ``general.name`` are searched (case-insensitive
    substring) for a known model name. With no match, or on any error while
    reading the file, the family default is returned: the first Laguna entry
    once a Laguna arch was seen, otherwise "Qwen/Qwen3.5-27B".
    """
    default = "Qwen/Qwen3.5-27B"
    try:
        from gguf import GGUFReader  # type: ignore
        reader = GGUFReader(str(gguf_path))
        arch = (_read_gguf_str(reader, "general.architecture") or "").lower()
        if arch in _LAGUNA_ARCHES:
            family = _LAGUNA_FAMILY_TOKENIZERS
            default = next(iter(_LAGUNA_FAMILY_TOKENIZERS.values()))
        else:
            family = _QWEN35_FAMILY_TOKENIZERS
        for key in ("general.basename", "general.name"):
            label = _read_gguf_str(reader, key)
            if label is None:
                continue
            lowered = label.lower()
            for known, repo in family.items():
                if known.lower() in lowered:
                    return repo
    except Exception:
        # Best effort: an unreadable file falls back to the default id.
        pass
    return default
# ─── tool-call & reasoning parsers ─────────────────────────────────
# Ported from vLLM (Apache-2.0):
# vllm/reasoning/qwen3_reasoning_parser.py
# vllm/tool_parsers/qwen3coder_tool_parser.py
# A closed <tool_call>...</tool_call> block; group 1 is the body.
TOOL_CALL_COMPLETE_RE = re.compile(r"<tool_call>(.*?)</tool_call>", re.DOTALL)
# "<function=NAME>PARAMS" either closed by </function> (group 1) or running
# to the end of the text when the closing tag is missing (group 2).
TOOL_CALL_FUNCTION_RE = re.compile(
    r"<function=(.*?)</function>|<function=(.*)$", re.DOTALL,
)
# "<parameter=KEY>VALUE", ended by </parameter>, the next <parameter=, the
# closing </function>, or the end of the text.
TOOL_CALL_PARAMETER_RE = re.compile(
    r"<parameter=(.*?)(?:</parameter>|(?=<parameter=)|(?=</function>)|$)",
    re.DOTALL,
)
# <function=NAME>...</function> without the opening <tool_call> wrapper; a
# trailing stray </tool_call> is consumed too.
BARE_FUNCTION_XML_RE = re.compile(
    r"<function=([A-Za-z_][\w.-]*)>(.*?)</function>(?:\s*</tool_call>)?",
    re.DOTALL,
)
# Call-signature form: <function=NAME(ARGS)</function>.
FUNCTION_SIGNATURE_RE = re.compile(
    r"<function=([A-Za-z_][\w.-]*)\((.*?)\)</function>", re.DOTALL)
# <tool_code>...</tool_code> wrapper; group 1 is the wrapped text.
TOOL_CODE_RE = re.compile(r"<tool_code>(.*?)</tool_code>", re.DOTALL)
TOOL_OPEN_TAG = "<tool_call>"
# Prefixes that can begin any of the supported tool-call shapes.
TOOL_START_TAGS = (TOOL_OPEN_TAG, "<function=", "<tool_code>")
# NOTE(review): THINK_OPEN_TAG / THINK_CLOSE_TAG are also assigned the same
# values earlier in this file; this second assignment is redundant.
THINK_OPEN_TAG = "<think>"
THINK_CLOSE_TAG = "</think>"
def normalize_stop(stop) -> list[str]:
    """Turn OpenAI's ``stop`` field (str | list[str] | None) into list[str].

    Falsy input gives an empty list; a single string is wrapped; list entries
    that are not non-empty strings are dropped.
    """
    if not stop:
        return []
    candidates = [stop] if isinstance(stop, str) else stop
    return [item for item in candidates if isinstance(item, str) and item]
def first_stop_match(text: str, stops: list[str]) -> int:
    """Return the smallest index at which any stop sequence occurs, or -1."""
    hits = [pos for pos in map(text.find, stops) if pos != -1]
    return min(hits) if hits else -1
def split_unclosed_thinking(text: str) -> tuple[str, str | None]:
"""Best-effort visible answer fallback when Qwen never emits </think>."""
value = text.strip()
if not value:
return "", None
marker_re = re.compile(
r"(?is)(?:^|\n)\s*(?:\d+\.\s*)?(?:\*+)?\s*"
r"(?:final answer|final output|answer|output|result)"
r"\s*(?:\*+)?\s*[::]\s*(?:\*+)?\s*(.+?)\s*$"
)
marker = None
for match in marker_re.finditer(value):
marker = match
if marker:
content = marker.group(1).strip()
chunks = [c.strip() for c in re.split(r"\n\s*\n+", content) if c.strip()]
if len(chunks) > 1:
content = chunks[-1]
idx = value.rfind(content)
reasoning = value[:idx].strip() if idx > 0 else None
return content, reasoning or None
chunks = [c.strip() for c in re.split(r"\n\s*\n+", value) if c.strip()]
if len(chunks) >= 2:
content = chunks[-1]
idx = value.rfind(content)
reasoning = value[:idx].strip() if idx > 0 else None
return content, reasoning or None
return value, None
def parse_reasoning(
    text: str,
    thinking_enabled: bool = True,
    started_in_thinking: bool = False,
) -> tuple[str, str | None]:
    """Separate the reasoning block from the visible answer.

    Covers three shapes of output: a full ``<think>...</think>`` pair, a body
    that starts mid-thought (the prompt already opened the block, signalled
    by ``started_in_thinking``) and ends with the closing tag, and output
    with no closing tag at all. In the last case the text is handed to
    ``split_unclosed_thinking`` only when thinking is enabled and a block was
    actually open; otherwise it is returned as plain content.

    Returns (cleaned_content, reasoning_content); reasoning is None when
    empty or absent.
    """
    leading_closers = re.compile(r"^(?:\s*</think>\s*)+")

    def _visible(value: str) -> str:
        # Drop stray closing tags left at the front of the answer.
        return leading_closers.sub("", value).strip()

    before, open_tag, after = text.partition(THINK_OPEN_TAG)
    saw_open_tag = open_tag != ""
    rest = after if saw_open_tag else before

    if THINK_CLOSE_TAG in rest:
        reasoning, _, content = rest.partition(THINK_CLOSE_TAG)
        return _visible(content), (reasoning.strip() or None)
    if thinking_enabled and (started_in_thinking or saw_open_tag):
        return split_unclosed_thinking(rest)
    return _visible(rest), None
def _thinking_enabled(template_kwargs: dict | None) -> bool:
"""Return whether Qwen think blocks are enabled for this rendered prompt."""
return bool((template_kwargs or {}).get("enable_thinking", False))
def prompt_starts_in_thinking(prompt: str) -> bool:
    """True when the prompt ends with an open ``<think>`` tag.

    Only whitespace may follow the tag.
    """
    return re.search(r"<think>\s*$", prompt) is not None
def strip_closed_think_prefill(prompt: str) -> str:
    """Cut a trailing empty ``<think></think>`` pair off the prompt.

    Only a pair at the very end of the prompt (whitespace allowed inside and
    after it) is removed.
    """
    closed_prefill = re.compile(r"<think>\s*</think>\s*$")
    return closed_prefill.sub("", prompt)
def _find_tool_properties(tools, function_name):
    """Return the JSON-schema ``properties`` dict of a tool, or ``{}``.

    Args:
        tools: Iterable of tool definitions — objects with a ``function``
            attribute or dicts with a ``"function"`` key — or None.
        function_name: Name of the function to look up.

    Returns:
        The ``parameters.properties`` mapping of the first tool named
        ``function_name`` whose ``parameters`` is a dict. Always a dict:
        malformed entries are skipped, and a missing or non-dict
        ``properties`` yields ``{}``.
    """
    for tool in tools or []:
        if hasattr(tool, "function"):
            fn = tool.function
        elif isinstance(tool, dict):
            fn = tool.get("function", {})
        else:
            # Neither an object nor a dict tool definition: nothing to read.
            continue
        if hasattr(fn, "model_dump"):
            fn = fn.model_dump()
        if not isinstance(fn, dict) or fn.get("name") != function_name:
            continue
        params = fn.get("parameters", {})
        if isinstance(params, dict):
            props = params.get("properties")
            # Callers do membership tests on the result, so never hand back
            # None (e.g. an explicit `"properties": null`).
            return props if isinstance(props, dict) else {}
    return {}
def _tool_allowed(tools, function_name: str) -> bool:
    """Check a function name against the advertised tool list.

    With no tools advertised every name is allowed; otherwise the name must
    equal the ``name`` of one of the tool functions.
    """
    if not tools:
        return True

    def _matches(tool) -> bool:
        fn = tool.function if hasattr(tool, "function") else tool.get("function", {})
        if hasattr(fn, "model_dump"):
            fn = fn.model_dump()
        return isinstance(fn, dict) and fn.get("name") == function_name

    return any(_matches(tool) for tool in tools)
def _convert_param_value(param_value: str, param_name: str, param_config: dict,
                         func_name: str):
    """Coerce a stringified XML parameter value to its JSON-schema type.

    Args:
        param_value: Raw text taken from the ``<parameter=...>`` body.
        param_name: Parameter name, used to look up its schema.
        param_config: The tool's ``properties`` mapping (name -> schema);
            an empty or missing mapping leaves the value untouched.
        func_name: Name of the function being called. Unused; kept so the
            signature stays unchanged for callers.

    Returns:
        ``None`` for the literal "null" (any case); otherwise the value
        converted per the declared type. Whenever a conversion fails the
        original string is returned rather than raising.
    """
    import ast

    if param_value.lower() == "null":
        return None
    if not param_config or param_name not in param_config:
        return param_value
    cfg = param_config[param_name]
    if isinstance(cfg, dict) and "type" in cfg:
        ptype = str(cfg["type"]).strip().lower()
    elif isinstance(cfg, dict) and "anyOf" in cfg:
        ptype = "object"
    else:
        ptype = "string"

    if ptype in ("string", "str", "text", "varchar", "char", "enum"):
        return param_value
    if ptype.startswith(("int", "uint", "long", "short", "unsigned")):
        try:
            return int(param_value)
        except (ValueError, TypeError):
            return param_value
    if ptype.startswith(("num", "float")):
        try:
            number = float(param_value)
            # int() raises ValueError for nan and OverflowError for +/-inf;
            # both fall back to the raw string so the arguments stay valid
            # JSON when serialized.
            return int(number) if number == int(number) else number
        except (ValueError, TypeError, OverflowError):
            return param_value
    if ptype in ("boolean", "bool", "binary"):
        return param_value.lower() == "true"
    if (ptype in ("object", "array", "arr")
            or ptype.startswith("dict") or ptype.startswith("list")):
        try:
            return json.loads(param_value)
        except (json.JSONDecodeError, TypeError, ValueError):
            pass
    # Last resort for containers that were not valid JSON and for unknown
    # types: accept a Python literal, else keep the raw string.
    try:
        return ast.literal_eval(param_value)
    except (ValueError, SyntaxError, TypeError):
        return param_value
def _parse_function_signature_args(arg_text: str) -> dict | None:
"""Parse `<function=name(k="v")</function>` arguments without guessing."""
import ast
try:
expr = ast.parse(f"_f({arg_text})", mode="eval").body
except SyntaxError:
return None
if not isinstance(expr, ast.Call) or expr.args:
return None
args: dict = {}
for kw in expr.keywords:
if kw.arg is None:
return None
try:
args[kw.arg] = ast.literal_eval(kw.value)
except (ValueError, SyntaxError, TypeError):
return None
return args
def _parse_json_tool_call(obj) -> tuple[str, dict] | None:
"""Parse OpenAI-ish JSON tool call objects."""
if not isinstance(obj, dict):
return None
name = obj.get("name")
args = obj.get("arguments")
if not isinstance(name, str) and isinstance(obj.get("function"), dict):
fn = obj["function"]
name = fn.get("name")
args = fn.get("arguments")
if isinstance(args, str):
try:
args = json.loads(args)
except json.JSONDecodeError:
return None
if isinstance(name, str) and isinstance(args, dict):
return name, args
return None
def parse_tool_calls(text: str, tools=None) -> tuple[str, list[dict]]:
    """Parse textual tool-call shapes into OpenAI tool_calls format.

    Supports Qwen XML, malformed function-call tags observed in model output,
    bare JSON objects, JSON payloads wrapped in ``<tool_call>`` tags, and
    `<tool_code>{...}</tool_code>` wrappers.

    Args:
        text: Raw generated text.
        tools: Advertised tool definitions. When given, calls to functions
            that are not advertised are ignored and left in the text.

    Returns:
        (cleaned_content, tool_calls_list): the text with every recognised
        call span removed and stripped, plus the calls in OpenAI format.
    """
    tool_calls: list[dict] = []
    # (start, end) spans of `text` consumed by accepted calls.
    removals: list[tuple[int, int]] = []

    def add_call(function_name: str, args: dict, start: int, end: int):
        if not _tool_allowed(tools, function_name):
            return
        tool_calls.append({
            "id": "call_" + uuid.uuid4().hex[:24],
            "type": "function",
            "function": {
                "name": function_name,
                "arguments": json.dumps(args, ensure_ascii=False),
            },
        })
        removals.append((start, end))

    def already_consumed(pos: int) -> bool:
        return any(lo <= pos < hi for lo, hi in removals)

    def parse_xml_function(function_name: str, params_region: str) -> dict:
        param_config = _find_tool_properties(tools, function_name)
        args: dict = {}
        for match_text in TOOL_CALL_PARAMETER_RE.findall(params_region):
            eq_idx = match_text.find(">")
            if eq_idx == -1:
                continue
            k = match_text[:eq_idx].strip()
            v = match_text[eq_idx + 1:]
            # Drop the single newline that conventionally pads the value.
            if v.startswith("\n"):
                v = v[1:]
            if v.endswith("\n"):
                v = v[:-1]
            args[k] = _convert_param_value(v, k, param_config, function_name)
        return args

    # 1. Well-formed <tool_call>...</tool_call> blocks.
    for m in TOOL_CALL_COMPLETE_RE.finditer(text):
        body = m.group(1)
        fn_match = TOOL_CALL_FUNCTION_RE.search(body)
        if not fn_match:
            # No <function=...> inside: the body may be a JSON call. Handle
            # it here so the whole tagged span is removed; the bare-JSON scan
            # below would strip only the JSON and leave empty tags behind.
            try:
                parsed = _parse_json_tool_call(json.loads(body.strip()))
            except json.JSONDecodeError:
                parsed = None
            if parsed is not None:
                add_call(parsed[0], parsed[1], m.start(), m.end())
            continue
        fn_text = fn_match.group(1) or fn_match.group(2) or ""
        end_idx = fn_text.find(">")
        if end_idx == -1:
            continue
        function_name = fn_text[:end_idx].strip()
        params_region = fn_text[end_idx + 1:]
        add_call(function_name, parse_xml_function(function_name, params_region),
                 m.start(), m.end())

    # 2. <function=...>...</function> without the <tool_call> wrapper.
    for m in BARE_FUNCTION_XML_RE.finditer(text):
        if already_consumed(m.start()):
            continue
        add_call(m.group(1), parse_xml_function(m.group(1), m.group(2)),
                 m.start(), m.end())

    # 3. <function=name(k="v")</function> signature form.
    for m in FUNCTION_SIGNATURE_RE.finditer(text):
        if already_consumed(m.start()):
            continue
        args = _parse_function_signature_args(m.group(2))
        if args is not None:
            add_call(m.group(1), args, m.start(), m.end())

    # 4. <tool_code>{json}</tool_code> wrappers.
    for m in TOOL_CODE_RE.finditer(text):
        try:
            obj = json.loads(m.group(1).strip())
        except json.JSONDecodeError:
            continue
        parsed = _parse_json_tool_call(obj)
        if parsed is not None:
            add_call(parsed[0], parsed[1], m.start(), m.end())

    # 5. Bare JSON objects anywhere else in the text.
    decoder = json.JSONDecoder()
    cursor = 0
    while cursor < len(text):
        start = text.find("{", cursor)
        if start == -1:
            break
        if already_consumed(start):
            cursor = start + 1
            continue
        try:
            # Decode in place from `start`; slicing the tail for every "{"
            # would copy the remaining text each time.
            obj, end = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            cursor = start + 1
            continue
        parsed = _parse_json_tool_call(obj)
        if parsed is not None:
            add_call(parsed[0], parsed[1], start, end)
        cursor = max(end, start + 1)

    if removals:
        parts: list[str] = []
        cursor = 0
        for start, end in sorted(set(removals)):
            if start < cursor:
                # Overlaps a span that was already cut out.
                continue
            parts.append(text[cursor:start])
            cursor = end
        parts.append(text[cursor:])
        text = "".join(parts)
    return text.strip(), tool_calls
# FIX 2: _content_to_str helper used for BOTH OpenAI and Anthropic message
# content fields (str | list[dict]). Previously OpenAI list[dict] content
# was passed raw to the tokenizer and caused a crash.
def _content_to_str(content: "str | list[dict] | None") -> str:
    """Flatten a message ``content`` field into plain text.

    Args:
        content: None, a string, or a list of content blocks (dicts).

    Returns:
        "" for None; the string itself for str; for a list, the
        concatenation of every ``text`` / ``input_text`` / ``output_text``
        block's text and every ``tool_result`` block's content (nested lists
        are flattened recursively). Other block types and non-dict entries
        are ignored. Missing or null text contributes nothing, and non-string
        values are stringified instead of breaking the join. A non-iterable
        ``content`` is stringified.
    """
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    try:
        blocks = iter(content)
    except TypeError:
        # The request schema types content as Any; tolerate scalars.
        return str(content)

    parts: list[str] = []
    for block in blocks:
        if not isinstance(block, dict):
            continue
        kind = block.get("type")
        if kind in ("text", "input_text", "output_text"):
            text = block.get("text")
            if text is not None:
                parts.append(text if isinstance(text, str) else str(text))
        elif kind == "tool_result":
            value = block.get("content")
            if isinstance(value, list):
                parts.append(_content_to_str(value))
            elif value is not None:
                # An explicit null must not be rendered as the text "None".
                parts.append(str(value))
    return "".join(parts)
def _normalize_anthropic_system(system_text: str | None) -> str | None:
if not system_text:
return None
if os.environ.get("DFLASH_ANTHROPIC_RAW_SYSTEM", "0") == "1":
return system_text
# Claude Code's default system prompt is written for Claude and can be tens
# of thousands of tokens once skills/reminders are expanded. Qwen handles
# the Anthropic Messages route much more reliably with a compact adapter
# prompt while still receiving the user's message and advertised tools.
if (
"x-anthropic-billing-header:" in system_text
or "Claude Agent SDK" in system_text
or "Claude Code" in system_text
):
return (
"You are a concise coding assistant running behind an Anthropic "
"Messages compatible client. Answer the user's request directly. "
"Use the provided tools only when they are needed."
)
return system_text
def _normalize_anthropic_user_text(text: str) -> str:
    """Remove skill-selection ``<system-reminder>`` blocks from user text.

    With DFLASH_ANTHROPIC_RAW_USER=1 the text is returned untouched. Only
    reminder blocks that contain "SKIP:" together with "- init:" or
    "- review:" are dropped; every other reminder (e.g. dates) is kept.
    """
    if os.environ.get("DFLASH_ANTHROPIC_RAW_USER", "0") == "1":
        return text

    def _keep_or_drop(match: re.Match[str]) -> str:
        reminder = match.group(0)
        # Claude Code may inject long skill-selection reminders meant for
        # Claude's own skill runtime; they are of no use to Qwen and can
        # dominate the prompt.
        is_skill_reminder = "SKIP:" in reminder and (
            "- init:" in reminder or "- review:" in reminder
        )
        return "" if is_skill_reminder else reminder

    reminder_re = re.compile(r"<system-reminder>.*?</system-reminder>", flags=re.DOTALL)
    return reminder_re.sub(_keep_or_drop, text)
def _json_args_obj(args: str) -> dict:
    """Decode a JSON arguments string into a dict, never raising.

    Empty input decodes as ``{}``. A decoded non-dict value is wrapped as
    ``{"value": ...}``; input that cannot be decoded is wrapped as
    ``{"_raw": args}``.
    """
    try:
        value = json.loads(args or "{}")
    except Exception:
        return {"_raw": args}
    if isinstance(value, dict):
        return value
    return {"value": value}
# ─── pydantic schemas ──────────────────────────────────────────────
class ToolCallFunction(BaseModel):
    """Function part of a tool call: the name plus JSON-encoded arguments."""

    name: str
    arguments: str  # JSON string per OpenAI spec


class ToolCall(BaseModel):
    """One tool call attached to an assistant message."""

    id: str | None = None
    type: str = "function"
    function: ToolCallFunction


class ChatMessage(BaseModel):
    """A single chat message of a chat-completions request."""

    role: str
    # FIX 2 cont: accept list[dict] in the model but always stringify it
    content: Any | None = None  # str, list, or null when tool_calls present
    name: str | None = None
    tool_call_id: str | None = None
    tool_calls: list[ToolCall] | None = None


class ToolDef(BaseModel):
    """A tool advertised by the client."""

    type: str = "function"
    function: dict  # {name, description, parameters: {...JSON schema...}}
def _anthropic_tools_to_openai(tools: list[dict] | None) -> list[ToolDef] | None:
    """Convert Anthropic-style tool dicts into ``ToolDef`` objects.

    Entries that are not dicts or have no name are skipped. The schema comes
    from ``input_schema``, else ``parameters``, else an empty object schema.
    Returns None when there is nothing to convert.
    """
    if not tools:
        return None
    converted = [
        ToolDef(type="function", function={
            "name": tool.get("name"),
            "description": tool.get("description", ""),
            "parameters": tool.get("input_schema") or tool.get("parameters") or {
                "type": "object",
                "properties": {},
            },
        })
        for tool in tools
        if isinstance(tool, dict) and tool.get("name")
    ]
    return converted or None
# Default generation cap used when the client omits ``max_tokens``. Read once
# at import time from the ``DFLASH_DEFAULT_MAX_TOKENS`` env var (4096 when
# unset). Set it to a value < ``max_ctx`` to avoid the unbounded-gen path;
# clients that send their own ``max_tokens`` are unaffected.
DEFAULT_MAX_TOKENS = int(os.environ.get("DFLASH_DEFAULT_MAX_TOKENS", 4096))
def _default_chat_enable_thinking() -> bool:
    """Read the DFLASH_ENABLE_THINKING env var as a boolean (unset is False)."""
    env_value = os.environ.get("DFLASH_ENABLE_THINKING")
    return _parse_bool(env_value)
class ChatRequest(BaseModel):
    """Request body of the chat-completions endpoint."""

    model: str = MODEL_NAME
    messages: list[ChatMessage]
    stream: bool = False
    # Falls back to the module-level default when the client omits it.
    max_tokens: int = DEFAULT_MAX_TOKENS
    max_completion_tokens: int | None = None
    temperature: float | None = None  # 0 = greedy, >0 = sample
    seed: int | None = None  # rng seed for sampling
    top_p: float | None = None  # nucleus, applied when temperature > 0
    top_k: int | None = None  # top-k, applied when temperature > 0
    frequency_penalty: float | None = None  # OAI -> rep_pen = 1 + freq_pen (sampling only)
    stop: list[str] | str | None = None  # FIX 3: accept stop field (Open WebUI sends it)
    tools: list[ToolDef] | None = None
    tool_choice: Any | None = None  # "auto" | "none" | {"function": {...}}
    chat_template_kwargs: dict | None = None
    stream_options: dict | None = None  # e.g. {"include_usage": true}
class AnthropicMessage(BaseModel):
    """One message of an Anthropic Messages request."""

    role: str
    content: str | list[dict]  # plain text or a list of content blocks


class AnthropicMessagesRequest(BaseModel):
    """Request body of the Anthropic Messages compatible route."""

    model: str = MODEL_NAME
    max_tokens: int  # required; no default on this route
    messages: list[AnthropicMessage]
    system: str | list[dict] | None = None
    tools: list[dict] | None = None
    tool_choice: Any | None = None
    stream: bool = False
    temperature: float | None = None
    top_p: float | None = None
    seed: int | None = None
    frequency_penalty: float | None = None
    stop_sequences: list[str] | None = None
    chat_template_kwargs: dict | None = None
# ─── Responses API schemas (Codex wire protocol) ──────────────────
class ResponseInputMessage(BaseModel):
    """Input item of type "message"."""

    type: str = "message"
    id: str | None = None
    role: str = "user"
    content: Any  # str or list[dict] content parts
    status: str | None = None


class ResponseFunctionCall(BaseModel):
    """Input item of type "function_call"."""

    type: str = "function_call"
    id: str | None = None
    call_id: str
    name: str
    arguments: str
    status: str | None = None


class ResponseFunctionCallOutput(BaseModel):
    """Input item of type "function_call_output"; ``call_id`` names the call."""

    type: str = "function_call_output"
    id: str | None = None
    call_id: str
    output: Any  # str or structured
    status: str | None = None


class ResponseToolFunction(BaseModel):
    """Function tool definition in the Responses API layout."""

    type: str = "function"
    name: str
    description: str | None = None
    parameters: dict | None = None
    strict: bool | None = None


class ResponseReasoningConfig(BaseModel):
    """The ``reasoning`` options of a Responses request."""

    effort: str | None = None  # "low" | "medium" | "high"
    summary: str | None = None  # "auto" | "concise" | "detailed" | "none"


class ResponsesCreateRequest(BaseModel):
    """Request body for creating a response (Responses API)."""

    model: str = MODEL_NAME
    input: Any  # str or list[InputItem dicts]
    instructions: str | None = None
    tools: list[dict] | None = None
    tool_choice: Any | None = "auto"
    parallel_tool_calls: bool | None = None
    stream: bool | None = None
    max_output_tokens: int | None = None
    temperature: float | None = None
    top_p: float | None = None
    reasoning: ResponseReasoningConfig | None = None
    store: bool | None = None
    include: list[str] | None = None
    text: dict | None = None
    metadata: dict | None = None
    previous_response_id: str | None = None
def _samp_suffix(req) -> str:
    """Build the `` samp=temp,top_p,top_k,rep_pen,seed`` command tail.

    Returns "" when the request's temperature is missing or <= 0, which keeps
    the daemon protocol greedy-compatible. Missing or falsy fields fall back
    to top_p=1.0, top_k=0, frequency_penalty=0.0 and seed=0; rep_pen is
    ``1 + frequency_penalty``.
    """
    def _field(name: str, cast, fallback):
        # Absent attribute, None and 0 all collapse to the fallback.
        return cast(getattr(req, name, fallback) or fallback)

    temperature = _field("temperature", float, 0.0)
    if temperature <= 0.0:
        return ""
    top_p = _field("top_p", float, 1.0)
    top_k = _field("top_k", int, 0)
    rep_pen = _field("frequency_penalty", float, 0.0) + 1.0
    seed = _field("seed", int, 0)
    return f" samp={temperature:.4f},{top_p:.4f},{top_k},{rep_pen:.4f},{seed}"
def build_app(target: Path, draft: Path | None, bin_path: Path, budget: int, max_ctx: int,
tokenizer: AutoTokenizer, stop_ids: set[int],
prefill_cfg: PrefillConfig | None = None,
drafter_tokenizer: AutoTokenizer | None = None,
prefix_cache_slots: int = 4,
prefill_cache_slots: int = 4,
prefill_cache_bytes: int = 0,
arch: str = "qwen35",
verify_mode: str = "ddtree",
extra_daemon_args: list[str] | None = None,
lazy_draft: bool = False,
verbose_daemon: bool = False,
force_no_thinking: bool = False,
mtp_gguf: Path | None = None,
mtp_gamma: int = 3,
mtp_draft_source: str = "chain",
mtp_draft_topk: int = 1) -> FastAPI:
import asyncio
if _extra_daemon_has_target_sharding(extra_daemon_args):
if prefix_cache_slots > 0 or prefill_cache_slots > 0:
print(
" [cfg] target-gpus sharding: disabling prefix/full cache "
"(daemon SNAPSHOT/RESTORE not implemented for this mode)",
flush=True,
)
prefix_cache_slots = 0
prefill_cache_slots = 0
app = FastAPI(title="Luce DFlash OpenAI server")
@app.exception_handler(OpenAICompatError)
async def _openai_compat_error_handler(_request: Request, exc: OpenAICompatError):
error = {"message": exc.message, "type": exc.error_type}
if exc.param is not None:
error["param"] = exc.param
if exc.code is not None:
error["code"] = exc.code
return JSONResponse({"error": error}, status_code=exc.status_code)
# FIX 1: CORS middleware so Open WebUI / browser frontends on other ports
# can reach this server without being blocked by the browser.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
daemon_lock = asyncio.Lock()
r_pipe, w_pipe = os.pipe()
if sys.platform == "win32":
import msvcrt
os.set_inheritable(w_pipe, True)
stream_fd_val = int(msvcrt.get_osfhandle(w_pipe))
else:
stream_fd_val = w_pipe
bin_abs = str(Path(bin_path).resolve())
dll_dir = str(Path(bin_abs).parent / "bin")
env = {**os.environ}
if sys.platform == "win32":
env["PATH"] = dll_dir + os.pathsep + str(Path(bin_abs).parent) + os.pathsep + env.get("PATH", "")
if arch in _LAGUNA_ARCHES:
# test_dflash detects arch=laguna from the GGUF and dispatches
# internally to dflash::common::run_laguna_daemon(). No --draft, no
# --fast-rollback, no --ddtree (no Laguna spec-decode draft yet).
# Tokens stream as int32 LE on stream_fd terminated by -1, byte-
# identical to the qwen35 path so SSE/stream consumers stay shared.
cmd = [bin_abs, str(target), "--daemon",
f"--max-ctx={max_ctx}",
f"--stream-fd={stream_fd_val}"]
elif mtp_gguf is not None:
# MTP mode: no --draft (MTP head lives inside target or mtp_gguf),
# no DFlash flags. Daemon dispatches to MTP code path via --mtp-gguf.
cmd = [bin_abs, str(target), "--daemon",
f"--max-ctx={max_ctx}",
f"--stream-fd={stream_fd_val}",
f"--mtp-gguf={mtp_gguf}",
f"--gamma={mtp_gamma}",
"--draft-source", mtp_draft_source]
if mtp_draft_source == "mtp_topk":
cmd.append(f"--draft-topk={mtp_draft_topk}")
if extra_daemon_args:
cmd.extend(extra_daemon_args)
else:
if draft is None:
raise SystemExit("qwen35 arch requires --draft <draft.gguf|model.safetensors>")
cmd = [bin_abs, str(target), str(draft), "--daemon",
f"--max-ctx={max_ctx}",
f"--stream-fd={stream_fd_val}"]
if verify_mode == "ddtree":
cmd.append("--fast-rollback")
cmd.extend(["--ddtree", f"--ddtree-budget={budget}"])
elif verify_mode == "fast":
cmd.append("--fast-rollback")
elif verify_mode == "seq":
cmd.append("--seq-verify")
elif verify_mode != "replay":
raise SystemExit(f"unknown verify_mode={verify_mode}")
if extra_daemon_args:
cmd.extend(extra_daemon_args)
if sys.platform == "win32":
daemon_proc = subprocess.Popen(cmd, close_fds=False, env=env,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE, bufsize=0)
else:
daemon_proc = subprocess.Popen(cmd, pass_fds=(w_pipe,), env=env,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE, bufsize=0)
os.close(w_pipe)
bus = DaemonStdoutBus(daemon_proc.stdout, verbose=verbose_daemon)
def _resolve_kv_k_type():
kv = "q4_0"
if os.environ.get("DFLASH27B_KV_F16", "0") != "0":
kv = "f16"
if os.environ.get("DFLASH27B_KV_Q4", "0") != "0":
kv = "q4_0"
if os.environ.get("DFLASH27B_KV_TQ3", "0") != "0":
kv = "tq3_0"
if os.environ.get("DFLASH27B_KV_K"):
kv = os.environ["DFLASH27B_KV_K"].lower()
return kv
_fa_window = int(os.environ.get("DFLASH27B_FA_WINDOW", 2048))
prefix_cache = PrefixCache(
daemon_stdin=daemon_proc.stdin,
await_reply=bus.await_reply,
daemon_lock=daemon_lock,
tokenizer=tokenizer,
kv_k_type=_resolve_kv_k_type(),
fa_window=_fa_window,
cap=prefix_cache_slots,
)
if prefill_cfg is not None and prefill_cache_slots > 0:
prefix_cache.init_full_cache(prefill_cache_slots)
# Issue #114: invalidate the Python LRU whenever the daemon frees every
# prefix snapshot slot during OOM recovery. Without this the next request
# would RESTORE a freed slot and stream nothing back.
bus.register_line_callback("[snap] all-cleared", prefix_cache.mark_all_cleared)
tool_memory = ToolMemory(
max_entries=int(os.environ.get("DFLASH_TOOL_MEMORY_MAX_ENTRIES", "50000")),
max_bytes=int(os.environ.get("DFLASH_TOOL_MEMORY_MAX_BYTES", str(64 * 1024 * 1024))),
)
def _remember_tool_call_text(raw_text: str, tool_calls: list[dict] | None) -> None:
if not raw_text or not tool_calls:
return
call_ids = [
tc.get("id")
for tc in tool_calls
if isinstance(tc, dict) and isinstance(tc.get("id"), str) and tc.get("id")
]
if call_ids:
tool_memory.remember(call_ids, raw_text)
@app.on_event("startup")
async def _startup():
bus.start(asyncio.get_running_loop())
await prefix_cache.startup_sync()
if not getattr(prefix_cache, "_full_disabled", True):
restored = await prefix_cache.rehydrate_full_cache(
_rehydrate_full_cache_entry)
if restored:
log.info("full-cache restored %d entries from disk", restored)
if lazy_draft:
log.info("lazy-draft: parking decode draft at startup to free ~3.3 GB")
daemon_proc.stdin.write(b"park draft\n")
daemon_proc.stdin.flush()
_drain_until_sentinel(r_pipe)
# FIX 4: /health endpoint — Open WebUI and many clients ping this before
# sending requests. Without it they show a permanent "disconnected" badge.
@app.get("/health")
def health():
alive = daemon_proc.poll() is None
if not alive:
return JSONResponse({"status": "error", "detail": "daemon exited"}, status_code=503)
return {"status": "ok"}
# FIX 5: richer /v1/models response — Open WebUI uses `context_length` and
# `created` to populate the model picker and context-bar correctly.
@app.get("/v1/models")
def list_models(request: Request):
# Codex sends ?client_version= — serve the Codex-specific schema
if "client_version" in request.query_params:
return {"models": [{
"slug": MODEL_NAME,
"display_name": MODEL_NAME,
"description": "Local DFlash speculative-decoding server",
"default_reasoning_level": "low",
"supported_reasoning_levels": [
{"effort": "low", "description": "No thinking"},
{"effort": "medium", "description": "Thinking enabled"},
],
"shell_type": "shell_command",
"visibility": "list",
"supported_in_api": True,
"priority": 1,
"context_window": max_ctx,
"supports_reasoning_summaries": False,
"supports_parallel_tool_calls": False,
}]}
return {
"object": "list",
"data": [{
"id": MODEL_NAME,
"object": "model",
"owned_by": "luce",
"created": 1700000000,
"context_length": max_ctx, # shown in Open WebUI header
"max_context_length": max_ctx,