Skip to content

Commit 7e3c9c8

Browse files
authored
Merge branch 'master' into update/LLAMA_VERSION
2 parents 7dc1fe4 + 059c493 commit 7e3c9c8

8 files changed

Lines changed: 350 additions & 5 deletions

File tree

.github/workflows/backend_build_darwin.yml

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ jobs:
9494
/opt/homebrew/Cellar/llvm
9595
/opt/homebrew/Cellar/ccache
9696
/opt/homebrew/Cellar/blake3
97+
/opt/homebrew/Cellar/fmt
9798
/opt/homebrew/Cellar/hiredis
9899
/opt/homebrew/Cellar/xxhash
99100
/opt/homebrew/Cellar/zstd
@@ -104,18 +105,23 @@ jobs:
104105
# ccache is always installed (used by the llama-cpp variant build) so
105106
# the brew cache content stays stable across every backend in the
106107
# matrix — they all share one cache key.
107-
# blake3, hiredis, xxhash, zstd are ccache's runtime dylib deps.
108+
# blake3, fmt, hiredis, xxhash, zstd are ccache's runtime dylib deps.
108109
# Without explicitly installing them, a brew cache-hit run restores
109110
# ccache's Cellar dir but skips installing those transitive deps,
110-
# and ccache fails at runtime with `dyld: Library not loaded:
111-
# libblake3.0.dylib`.
112-
brew install protobuf grpc make protoc-gen-go protoc-gen-go-grpc libomp llvm ccache blake3 hiredis xxhash zstd
111+
# and ccache fails at runtime with `dyld: Library not loaded`.
112+
brew install protobuf grpc make protoc-gen-go protoc-gen-go-grpc libomp llvm ccache blake3 fmt hiredis xxhash zstd
113+
# Force-reinstall ccache so brew re-validates its full runtime-dep
114+
# closure on every run. This is the durable fix: when the upstream
115+
# ccache formula gains a new transitive dep (as it has multiple times
116+
# already), we don't have to chase missing dylibs one at a time.
117+
# The downloads cache makes the reinstall fast (~5s on a hit).
118+
brew reinstall ccache
113119
# The brew cache restores the Cellar dirs but NOT the bin symlinks
114120
# at /opt/homebrew/bin/*. brew install above sees the Cellar present
115121
# and decides "already installed" without re-linking, so on a cache-
116122
# hit run the formulas aren't on PATH. Force-link them; --overwrite
117123
# tolerates pre-existing symlinks from earlier installs.
118-
brew link --overwrite protobuf grpc make protoc-gen-go protoc-gen-go-grpc libomp llvm ccache blake3 hiredis xxhash zstd 2>/dev/null || true
124+
brew link --overwrite protobuf grpc make protoc-gen-go protoc-gen-go-grpc libomp llvm ccache blake3 fmt hiredis xxhash zstd 2>/dev/null || true
119125
120126
- name: Save Homebrew cache
121127
if: github.event_name != 'pull_request' && steps.brew-cache.outputs.cache-hit != 'true'
@@ -131,6 +137,7 @@ jobs:
131137
/opt/homebrew/Cellar/llvm
132138
/opt/homebrew/Cellar/ccache
133139
/opt/homebrew/Cellar/blake3
140+
/opt/homebrew/Cellar/fmt
134141
/opt/homebrew/Cellar/hiredis
135142
/opt/homebrew/Cellar/xxhash
136143
/opt/homebrew/Cellar/zstd

.github/workflows/test-extra.yml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ jobs:
4343
insightface: ${{ steps.detect.outputs.insightface }}
4444
speaker-recognition: ${{ steps.detect.outputs.speaker-recognition }}
4545
sherpa-onnx: ${{ steps.detect.outputs.sherpa-onnx }}
46+
whisper: ${{ steps.detect.outputs.whisper }}
4647
steps:
4748
- name: Checkout repository
4849
uses: actions/checkout@v6
@@ -583,6 +584,27 @@ jobs:
583584
- name: Build sherpa-onnx backend image and run streaming ASR gRPC e2e tests
584585
run: |
585586
make test-extra-backend-sherpa-onnx-transcription
587+
# End-to-end transcription via the e2e-backends gRPC harness against
588+
# the whisper.cpp backend. Drives AudioTranscription (offline) and
589+
# AudioTranscriptionStream (real, segment-callback-driven deltas) on
590+
# ggml-base.en + the JFK 11s clip.
591+
tests-whisper-grpc-transcription:
592+
needs: detect-changes
593+
if: needs.detect-changes.outputs.whisper == 'true' || needs.detect-changes.outputs.run-all == 'true'
594+
runs-on: ubuntu-latest
595+
timeout-minutes: 90
596+
steps:
597+
- name: Clone
598+
uses: actions/checkout@v6
599+
with:
600+
submodules: true
601+
- name: Setup Go
602+
uses: actions/setup-go@v5
603+
with:
604+
go-version: '1.25.4'
605+
- name: Build whisper backend image and run transcription gRPC e2e tests
606+
run: |
607+
make test-extra-backend-whisper-transcription
586608
# VITS TTS via the sherpa-onnx backend. Drives both TTS (file write) and
587609
# TTSStream (PCM chunks) on the e2e-backends harness.
588610
tests-sherpa-onnx-grpc-tts:

Makefile

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -897,6 +897,18 @@ test-extra-backend-vibevoice-cpp-transcription: docker-build-vibevoice-cpp
897897
BACKEND_TEST_CAPS=health,load,transcription \
898898
$(MAKE) test-extra-backend
899899

900+
## Audio transcription wrapper for the whisper.cpp backend.
901+
## Drives the AudioTranscription / AudioTranscriptionStream RPCs against
902+
## ggml-base.en (~145 MB) using the JFK 11s clip. The streaming spec
903+
## asserts len(deltas) >= 1 and concat(deltas) == final.Text - whisper-
904+
## specific multi-segment assertions live in backend/go/whisper/gowhisper_test.go.
905+
test-extra-backend-whisper-transcription: docker-build-whisper
906+
BACKEND_IMAGE=local-ai-backend:whisper \
907+
BACKEND_TEST_MODEL_URL=https://huggingface.co/ggerganov/whisper.cpp/resolve/main/ggml-base.en.bin \
908+
BACKEND_TEST_AUDIO_URL=https://github.com/ggml-org/whisper.cpp/raw/master/samples/jfk.wav \
909+
BACKEND_TEST_CAPS=health,load,transcription \
910+
$(MAKE) test-extra-backend
911+
900912
## LocalVQE audio transform (joint AEC + noise suppression + dereverb).
901913
## Exercises the audio_transform capability end-to-end: batch transform
902914
## of a real WAV fixture and bidi streaming of synthetic silent frames.

backend/go/whisper/cpp/gowhisper.cpp

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,38 @@ static std::vector<float> flat_segs;
1010

1111
static std::atomic<int> g_abort{0};
1212

13+
static std::atomic<uintptr_t> g_go_new_segment_cb{0};
14+
static std::atomic<uintptr_t> g_go_new_segment_user_data{0};
15+
1316
static bool abort_cb(void * /*user_data*/) {
1417
return g_abort.load(std::memory_order_relaxed) != 0;
1518
}
1619

20+
static void new_segment_cb(struct whisper_context *cb_ctx,
21+
struct whisper_state * /*state*/, int n_new,
22+
void * /*user_data*/) {
23+
uintptr_t go_cb = g_go_new_segment_cb.load(std::memory_order_relaxed);
24+
if (go_cb == 0) {
25+
return;
26+
}
27+
int total = whisper_full_n_segments(cb_ctx);
28+
int idx_first = total - n_new;
29+
if (idx_first < 0) {
30+
idx_first = 0;
31+
}
32+
uintptr_t ud = g_go_new_segment_user_data.load(std::memory_order_relaxed);
33+
reinterpret_cast<go_new_segment_cb>(go_cb)(idx_first, n_new, ud);
34+
}
35+
1736
extern "C" void set_abort(int v) {
1837
g_abort.store(v, std::memory_order_relaxed);
1938
}
2039

40+
extern "C" void set_new_segment_callback(uintptr_t cb_ptr, uintptr_t user_data) {
41+
g_go_new_segment_cb.store(cb_ptr, std::memory_order_relaxed);
42+
g_go_new_segment_user_data.store(user_data, std::memory_order_relaxed);
43+
}
44+
2145
static void ggml_log_cb(enum ggml_log_level level, const char *log,
2246
void *data) {
2347
const char *level_str;
@@ -139,6 +163,14 @@ int transcribe(uint32_t threads, char *lang, bool translate, bool tdrz,
139163
// ggml abort hook so a subsequent set_abort(1) from Go aborts the next
140164
// compute graph step.
141165
g_abort.store(0, std::memory_order_relaxed);
166+
// Only install the new-segment callback when streaming is requested
167+
// (Go side calls set_new_segment_callback before transcribe()). Leaving
168+
// it always-on is harmless but adds a function-pointer dispatch per
169+
// segment for the offline path.
170+
if (g_go_new_segment_cb.load(std::memory_order_relaxed) != 0) {
171+
wparams.new_segment_callback = new_segment_cb;
172+
wparams.new_segment_callback_user_data = nullptr;
173+
}
142174
wparams.abort_callback = abort_cb;
143175
wparams.abort_callback_user_data = nullptr;
144176

backend/go/whisper/cpp/gowhisper.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,15 @@ int n_tokens(int i);
1616
int32_t get_token_id(int i, int j);
1717
bool get_segment_speaker_turn_next(int i);
1818
void set_abort(int v);
19+
20+
// Function pointer from Go (returned by purego.NewCallback). Invoked once
21+
// per new-segment event during whisper_full(). The callback runs on the
22+
// decode thread - if Go blocks (slow gRPC consumer), the decode blocks
23+
// too. That is the intended backpressure path.
24+
typedef void (*go_new_segment_cb)(int idx_first, int n_new, uintptr_t user_data);
25+
26+
// Install the callback used by the next transcribe() call. Pass cb=0 to
27+
// clear. user_data is opaque to C; the Go side uses it to look up
28+
// per-call state.
29+
void set_new_segment_callback(uintptr_t cb_ptr, uintptr_t user_data);
1930
}

backend/go/whisper/gowhisper.go

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"path/filepath"
88
"strings"
99
"sync"
10+
"sync/atomic"
1011
"unsafe"
1112

1213
"github.com/go-audio/wav"
@@ -29,8 +30,83 @@ var (
2930
CppGetTokenID func(i int, j int) int
3031
CppGetSegmentSpeakerTurnNext func(i int) bool
3132
CppSetAbort func(v int)
33+
// Set by main.go via purego.RegisterLibFunc. Installs (or clears with cb=0)
34+
// the C-side trampoline that whisper.cpp invokes per new segment.
35+
CppSetNewSegmentCallback func(cbPtr uintptr, userData uintptr)
3236
)
3337

38+
// streamCallStates maps per-AudioTranscriptionStream call IDs to the
39+
// state the Go callback needs to emit deltas. Only one entry is ever
40+
// live today (base.SingleThread), but the map shape mirrors
41+
// sherpa-onnx's TTS callback registry and survives a future SingleThread
42+
// removal without a contract change.
43+
var (
44+
streamCallStates sync.Map // uint64 -> *streamCallState
45+
streamCallSeq atomic.Uint64
46+
goNewSegmentCb uintptr // purego.NewCallback(onNewSegment) result; set in main.go at boot
47+
)
48+
49+
type streamCallState struct {
50+
results chan *pb.TranscriptStreamResponse
51+
diarize bool
52+
// nextIdx tracks how many segments we've already emitted. The C
53+
// trampoline passes idx_first = total - n_new, but we walk from
54+
// nextIdx to (idx_first + n_new) defensively in case whisper.cpp ever
55+
// coalesces multiple commits into a single callback invocation.
56+
nextIdx int
57+
// assembled mirrors the literal concat of every Delta sent on results.
58+
// We reuse it as the final TranscriptResult.Text so the e2e
59+
// invariant `final.Text == concat(deltas)` holds exactly. Written from
60+
// the cgo decode thread inside onNewSegment and read by the streaming
61+
// method after CppTranscribe returns; the cgo boundary provides the
62+
// happens-before edge.
63+
assembled strings.Builder
64+
}
65+
66+
// onNewSegment is the Go side of the C trampoline declared in
67+
// gowhisper.cpp:new_segment_cb. Whisper.cpp invokes it once per
68+
// new-segment event during whisper_full(). Reads segment text via the
69+
// existing CppGetSegment* getters (safe to call against the singleton
70+
// ctx; whisper.cpp is the only writer and it has already published the
71+
// segments by the time this fires).
72+
//
73+
// Sends deltas synchronously: if the channel is full, this blocks the
74+
// whisper decode thread. That's the intended backpressure path -
75+
// dropping deltas would break the concat(deltas) == final.Text invariant
76+
// the e2e suite asserts.
77+
func onNewSegment(idxFirst int32, nNew int32, userData uintptr) {
78+
v, ok := streamCallStates.Load(uint64(userData))
79+
if !ok {
80+
return // call already torn down (race with cancel + cb fire)
81+
}
82+
state := v.(*streamCallState)
83+
end := int(idxFirst) + int(nNew)
84+
for i := state.nextIdx; i < end; i++ {
85+
txt := strings.ToValidUTF8(strings.Clone(CppGetSegmentText(i)), "�")
86+
txt = strings.TrimSpace(txt)
87+
if state.diarize && CppGetSegmentSpeakerTurnNext(i) {
88+
txt += " [SPEAKER_TURN]"
89+
}
90+
if txt == "" {
91+
state.nextIdx = i + 1
92+
continue
93+
}
94+
// Prefix subsequent deltas with a single space so the assembled
95+
// stream reads as one space-joined transcript. The first delta has
96+
// no leading space, otherwise concat(deltas) would not match
97+
// final.Text and the e2e invariant would break.
98+
var delta string
99+
if state.assembled.Len() == 0 {
100+
delta = txt
101+
} else {
102+
delta = " " + txt
103+
}
104+
state.results <- &pb.TranscriptStreamResponse{Delta: delta}
105+
state.assembled.WriteString(delta)
106+
state.nextIdx = i + 1
107+
}
108+
}
109+
34110
type Whisper struct {
35111
base.SingleThread
36112
}
@@ -200,3 +276,120 @@ func (w *Whisper) AudioTranscription(ctx context.Context, opts *pb.TranscriptReq
200276
Duration: duration,
201277
}, nil
202278
}
279+
280+
// AudioTranscriptionStream runs whisper_full() and emits deltas via
281+
// whisper.cpp's new_segment_callback as segments are decoded, then a
282+
// final TranscriptResult. The offline AudioTranscription is unchanged;
283+
// both paths share whisper's single-instance ctx and the SingleThread
284+
// concurrency model.
285+
func (w *Whisper) AudioTranscriptionStream(ctx context.Context, opts *pb.TranscriptRequest, results chan *pb.TranscriptStreamResponse) error {
286+
defer close(results)
287+
288+
if err := ctx.Err(); err != nil {
289+
return status.Error(codes.Canceled, "transcription cancelled")
290+
}
291+
292+
dir, err := os.MkdirTemp("", "whisper")
293+
if err != nil {
294+
return err
295+
}
296+
defer func() { _ = os.RemoveAll(dir) }()
297+
298+
convertedPath := filepath.Join(dir, "converted.wav")
299+
if err := utils.AudioToWav(opts.Dst, convertedPath); err != nil {
300+
return err
301+
}
302+
303+
fh, err := os.Open(convertedPath)
304+
if err != nil {
305+
return err
306+
}
307+
defer func() { _ = fh.Close() }()
308+
309+
d := wav.NewDecoder(fh)
310+
buf, err := d.FullPCMBuffer()
311+
if err != nil {
312+
return err
313+
}
314+
data := buf.AsFloat32Buffer().Data
315+
var duration float32
316+
if buf.Format != nil && buf.Format.SampleRate > 0 {
317+
duration = float32(len(data)) / float32(buf.Format.SampleRate)
318+
}
319+
320+
// Register per-call state and install the C-side callback. defer
321+
// teardown so even a panic clears the C pointer (otherwise a stale
322+
// callback fires on the next AudioTranscription call).
323+
callID := streamCallSeq.Add(1)
324+
state := &streamCallState{
325+
results: results,
326+
diarize: opts.Diarize,
327+
}
328+
streamCallStates.Store(callID, state)
329+
CppSetNewSegmentCallback(goNewSegmentCb, uintptr(callID))
330+
defer func() {
331+
CppSetNewSegmentCallback(0, 0)
332+
streamCallStates.Delete(callID)
333+
}()
334+
335+
// Same abort-watcher pattern as AudioTranscription. Joined synchronously
336+
// so a late CppSetAbort(1) cannot fire after this function returns.
337+
done := make(chan struct{})
338+
var wg sync.WaitGroup
339+
wg.Add(1)
340+
go func() {
341+
defer wg.Done()
342+
select {
343+
case <-ctx.Done():
344+
CppSetAbort(1)
345+
case <-done:
346+
}
347+
}()
348+
defer func() {
349+
close(done)
350+
wg.Wait()
351+
}()
352+
353+
segsLen := uintptr(0xdeadbeef)
354+
segsLenPtr := unsafe.Pointer(&segsLen)
355+
ret := CppTranscribe(opts.Threads, opts.Language, opts.Translate, opts.Diarize, data, uintptr(len(data)), segsLenPtr, opts.Prompt)
356+
if ret == 2 {
357+
return status.Error(codes.Canceled, "transcription cancelled")
358+
}
359+
if ret != 0 {
360+
return fmt.Errorf("Failed Transcribe")
361+
}
362+
363+
// Build the final TranscriptResult. Segments[] mirrors the offline
364+
// path so the SSE done event carries the same per-segment shape.
365+
// final.Text reuses the assembled stream so concat(deltas) == final.Text
366+
// holds exactly, matching the e2e contract.
367+
segments := []*pb.TranscriptSegment{}
368+
for i := range int(segsLen) {
369+
s := CppGetSegmentStart(i) * 10000000
370+
t := CppGetSegmentEnd(i) * 10000000
371+
txt := strings.ToValidUTF8(strings.Clone(CppGetSegmentText(i)), "�")
372+
tokens := make([]int32, CppNTokens(i))
373+
if opts.Diarize && CppGetSegmentSpeakerTurnNext(i) {
374+
txt += " [SPEAKER_TURN]"
375+
}
376+
for j := range tokens {
377+
tokens[j] = int32(CppGetTokenID(i, j))
378+
}
379+
segments = append(segments, &pb.TranscriptSegment{
380+
Id: int32(i),
381+
Text: txt,
382+
Start: s, End: t,
383+
Tokens: tokens,
384+
})
385+
}
386+
387+
final := &pb.TranscriptResult{
388+
Segments: segments,
389+
Text: state.assembled.String(),
390+
Language: opts.Language,
391+
Duration: duration,
392+
}
393+
results <- &pb.TranscriptStreamResponse{FinalResult: final}
394+
return nil
395+
}

0 commit comments

Comments
 (0)