Skip to content

Commit a1840d8

Browse files
author
Patrick Gilhooley
committed
feat(fusion): use hand-neck anchors as fret priors
1 parent 66b72ee commit a1840d8

12 files changed

Lines changed: 550 additions & 12 deletions

File tree

docs/DECISIONS.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,3 +357,26 @@ per-cell posterior over (string, fret). Net new code is the projection
357357
- Fretting-hand identification — start with v0's handedness logic;
358358
switch to wrist-near-nut (now possible because we have the
359359
homography) only if the eval shows misidentification.
360+
361+
---
362+
363+
## 2026-05-07 — Phase 5 uses hand-neck anchors as first-class fusion priors
364+
365+
**Phase:** 5 (vision-fusion integration)
366+
**Decision tree:** Phase 4/5 fusion contract — whether exact per-finger
367+
posteriors are the primary video signal, or whether coarser neck-region
368+
evidence should guide candidate selection first.
369+
**Branch taken:** **Use coarse hand-neck anchors as first-class Phase 5
370+
priors.** MediaPipe + fretboard homography estimate the fretting hand's
371+
center fret/span; the pipeline converts each timed anchor into an
372+
`AudioEvent.fret_prior` before calling Viterbi/chord fusion.
373+
**Evidence:** Phase 4 manual fingering labels proved too expensive for the
374+
near-term path, while the detector/fretboard stack is already good at
375+
identifying the neck coordinate system. Phase 5 fusion already accepts
376+
`AudioEvent.fret_prior` as emission evidence, so the anchor signal can be
377+
integrated without changing §8 public function signatures.
378+
**Reasoning:** Exact fingertip-to-string/fret labels are brittle and costly
379+
to validate; "the hand is around frets 3-6" is a stronger, more stable
380+
visual prior for resolving audio's same-pitch string/fret ambiguity. Keeping
381+
the signal as a prior lets audio and playability override it when the visual
382+
evidence is weak or wrong.

tabvision/tabvision/fusion/__init__.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,16 @@
66
(string, fret) sequence respecting playability constraints.
77
"""
88

9+
from tabvision.fusion.neck_prior import (
10+
TimedNeckAnchor,
11+
anchor_position_prior,
12+
apply_neck_anchor_priors,
13+
)
914
from tabvision.fusion.viterbi import fuse
1015

11-
__all__ = ["fuse"]
16+
__all__ = [
17+
"TimedNeckAnchor",
18+
"anchor_position_prior",
19+
"apply_neck_anchor_priors",
20+
"fuse",
21+
]
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
"""Attach coarse hand-neck anchors to audio events as fret priors."""
2+
3+
from __future__ import annotations
4+
5+
from collections.abc import Sequence
6+
from dataclasses import replace
7+
from typing import Protocol
8+
9+
import numpy as np
10+
11+
from tabvision.types import AudioEvent, GuitarConfig
12+
13+
14+
class NeckAnchorLike(Protocol):
    """Structural type for a coarse hand-on-neck estimate.

    Any object exposing these four attributes can be used as an anchor;
    ``anchor_position_prior`` reads them to build a per-fret prior.
    """

    # Fret coordinate the prior is centred on.
    center_fret: float
    # Lower / upper fret bounds; half their difference sets the prior's width.
    min_fret: float
    max_fret: float
    # Detection confidence: values <= 0 give a uniform prior, values above 1
    # are clamped to 1.
    confidence: float
19+
20+
21+
# ``(timestamp_s, anchor)`` pair; the timestamp is matched against
# ``AudioEvent.onset_s`` when attaching priors.
TimedNeckAnchor = tuple[float, NeckAnchorLike]
22+
23+
24+
def apply_neck_anchor_priors(
    events: Sequence[AudioEvent],
    anchors: Sequence[TimedNeckAnchor],
    cfg: GuitarConfig,
    *,
    max_time_distance_s: float = 0.15,
) -> list[AudioEvent]:
    """Return events enriched with nearest video-anchor position priors.

    For every event the anchor closest in time to ``ev.onset_s`` is looked
    up. The event is passed through unchanged when that anchor is further
    than ``max_time_distance_s`` away or has non-positive confidence.
    Otherwise the anchor is converted into a position prior and, if the
    event already carries a ``fret_prior``, multiplied into it.

    The resulting ``AudioEvent.fret_prior`` has shape
    ``(cfg.n_strings, cfg.max_fret + 1)`` so Phase 5 playability emission can
    consume it as a per-position prior.

    Args:
        events: Audio events to enrich; the input objects are never mutated.
        anchors: ``(timestamp_s, anchor)`` pairs in any order.
        cfg: Guitar geometry used to size the prior.
        max_time_distance_s: Largest allowed gap between an event onset and
            its nearest anchor.

    Returns:
        A new list with one entry per input event, in the same order.
    """
    if not anchors:
        return list(events)

    # Sort once so each event needs a binary search instead of a scan over
    # every anchor (video produces one anchor per sampled frame).
    ordered = sorted(anchors, key=lambda item: item[0])
    times = np.asarray([item[0] for item in ordered], dtype=np.float64)
    last = len(ordered) - 1

    out: list[AudioEvent] = []
    for ev in events:
        # The nearest anchor is one of the two neighbours of the insertion
        # point; on an exact tie the earlier anchor wins.
        right = min(int(np.searchsorted(times, ev.onset_s)), last)
        left = max(right - 1, 0)
        if abs(ordered[left][0] - ev.onset_s) <= abs(ordered[right][0] - ev.onset_s):
            nearest = ordered[left]
        else:
            nearest = ordered[right]

        dt = abs(nearest[0] - ev.onset_s)
        if dt > max_time_distance_s or nearest[1].confidence <= 0.0:
            out.append(ev)
            continue
        prior = anchor_position_prior(nearest[1], cfg)
        if ev.fret_prior is not None:
            prior = _combine_priors(ev.fret_prior, prior, cfg)
        out.append(replace(ev, fret_prior=prior))
    return out
52+
53+
54+
def anchor_position_prior(anchor: NeckAnchorLike, cfg: GuitarConfig) -> np.ndarray:
    """Return a normalized ``(string, fret)`` prior from a hand-neck anchor.

    The per-fret distribution is a Gaussian centred on ``anchor.center_fret``
    whose sigma is half the anchor's fret span (at least 1.0), blended with a
    uniform distribution according to ``anchor.confidence`` clamped to
    ``(0, 1]``. The same fret distribution is repeated for every string and
    the whole grid is normalized to sum to 1.

    A non-positive or non-finite confidence, or a non-finite centre, yields
    the uniform prior: a NaN anchor must never turn into a NaN prior, because
    that would make the downstream emission cost NaN.

    Args:
        anchor: Coarse hand position estimate.
        cfg: Guitar geometry; ``n_strings`` and ``max_fret`` size the grid.

    Returns:
        Array of shape ``(cfg.n_strings, cfg.max_fret + 1)`` summing to 1.
    """
    n_frets = cfg.max_fret + 1
    uniform_fret = np.full(n_frets, 1.0 / n_frets, dtype=np.float64)
    fret_probs = uniform_fret

    confidence = float(anchor.confidence)
    if np.isfinite(confidence) and confidence > 0.0:
        center = float(anchor.center_fret)
        if np.isfinite(center):
            half_span = (float(anchor.max_fret) - float(anchor.min_fret)) / 2.0
            # A NaN span compares False and so falls back to the minimum width.
            sigma = half_span if half_span > 1.0 else 1.0
            frets = np.arange(n_frets, dtype=np.float64)
            logits = -0.5 * ((frets - center) / sigma) ** 2
            # Subtract the max before exp so far-away centres cannot underflow
            # every entry to zero.
            gaussian = np.exp(logits - float(logits.max()))
            gaussian /= float(gaussian.sum())
            weight = min(confidence, 1.0)
            fret_probs = weight * gaussian + (1.0 - weight) * uniform_fret
            fret_probs /= float(fret_probs.sum())

    prior = np.tile(fret_probs[None, :], (cfg.n_strings, 1))
    prior /= float(prior.sum())
    return prior
72+
73+
74+
def _combine_priors(
    existing: np.ndarray, anchor_prior: np.ndarray, cfg: GuitarConfig
) -> np.ndarray:
    """Fuse an event's existing prior with the anchor prior.

    ``existing`` is first coerced to a normalized ``(string, fret)`` grid,
    then multiplied element-wise with ``anchor_prior`` and renormalized.
    When the product sums to zero or less, ``anchor_prior`` is returned
    unchanged.
    """
    product = np.multiply(_as_position_prior(existing, cfg), anchor_prior)
    total = float(product.sum())
    if total <= 0.0:
        # The two priors share no usable mass; keep the video evidence alone.
        return anchor_prior
    return np.asarray(product / total, dtype=np.float64)
83+
84+
85+
def _as_position_prior(prior: np.ndarray, cfg: GuitarConfig) -> np.ndarray:
    """Coerce ``prior`` to a normalized ``(n_strings, max_fret + 1)`` grid.

    A grid of the expected 2D shape is used as-is; a 1D per-fret vector of
    length ``max_fret + 1`` is repeated for every string. Any other shape,
    or a grid whose sum is not positive, is replaced by the uniform prior
    obtained from a zero-confidence anchor.
    """
    values = np.asarray(prior, dtype=np.float64)
    n_frets = cfg.max_fret + 1

    grid = None
    if values.shape == (cfg.n_strings, n_frets):
        grid = values
    elif values.shape == (n_frets,):
        grid = np.tile(values[None, :], (cfg.n_strings, 1))

    if grid is not None:
        total = float(grid.sum())
        if total > 0.0:
            return np.asarray(grid / total, dtype=np.float64)

    # Unknown shape or no positive mass: fall back to "no information".
    return anchor_position_prior(_ZeroAnchor(), cfg)
99+
100+
101+
class _ZeroAnchor:
    """Zero-confidence placeholder anchor.

    ``_as_position_prior`` passes an instance to ``anchor_position_prior``
    to obtain the uniform fallback prior.
    """

    center_fret = 0.0
    min_fret = 0.0
    max_fret = 0.0
    # confidence <= 0 selects the uniform branch of anchor_position_prior.
    confidence = 0.0
106+
107+
108+
__all__ = [
109+
"NeckAnchorLike",
110+
"TimedNeckAnchor",
111+
"anchor_position_prior",
112+
"apply_neck_anchor_priors",
113+
]

tabvision/tabvision/fusion/playability.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,9 @@ def emission_cost(
101101
102102
- ``-log(event.confidence)`` — per-event constant (does not affect
103103
ranking within a single event but matters across events).
104-
- ``-log(event.fret_prior[s, f])`` — only when the audio backend
105-
provides a per-position prior (e.g. Phase 2 ``tabcnn``).
104+
- ``-log(event.fret_prior[s, f])`` — only when the audio backend or
105+
video neck-anchor path provides a prior. A one-dimensional fret-only
106+
prior is also accepted and read as ``event.fret_prior[f]``.
106107
- ``lambda_vision * -log(P_vision[s, f])`` — vision marginal at
107108
``event.onset_s``. Skipped when ``fingering is None``.
108109
- ``LOW_FRET_BIAS * fret`` — gentle low-fret preference.
@@ -111,7 +112,7 @@ def emission_cost(
111112
cost = -math.log(max(event.confidence, EPS))
112113

113114
if event.fret_prior is not None:
114-
prior = float(event.fret_prior[candidate.string_idx, candidate.fret])
115+
prior = _candidate_prior(event.fret_prior, candidate)
115116
cost += -math.log(max(prior, EPS))
116117

117118
if fingering is not None:
@@ -126,6 +127,20 @@ def emission_cost(
126127
return cost
127128

128129

130+
def _candidate_prior(prior: object, candidate: Candidate) -> float:
    """Read a candidate prior from either a 2D position prior or 1D fret prior.

    A 2D prior is indexed ``[string_idx, fret]`` and a 1D prior ``[fret]``.
    Anything that cannot be read (no ``shape``, another rank, an
    out-of-range index) returns ``0.0``. A non-finite cell is also treated as
    unreadable: returning NaN or inf would make the caller's ``-log(prior)``
    term NaN or -inf and corrupt the whole path cost.

    Args:
        prior: Array-like exposing ``shape`` and ``__getitem__``.
        candidate: Position whose prior is requested.

    Returns:
        The prior value as a finite float, or ``0.0`` when unavailable.
    """
    try:
        ndim = len(getattr(prior, "shape", ()))
        if ndim == 2:
            value = float(prior[candidate.string_idx, candidate.fret])  # type: ignore[index]
        elif ndim == 1:
            value = float(prior[candidate.fret])  # type: ignore[index]
        else:
            return 0.0
    except (IndexError, TypeError, ValueError):
        return 0.0
    return value if math.isfinite(value) else 0.0
142+
143+
129144
def transition_cost(prev: Candidate, curr: Candidate, cfg: GuitarConfig) -> float:
130145
"""Transition cost from ``prev`` to ``curr``.
131146

tabvision/tabvision/pipeline.py

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,15 @@
2323

2424
import logging
2525
from collections.abc import Iterable, Iterator
26-
from dataclasses import replace
26+
from dataclasses import dataclass, replace
2727
from pathlib import Path
28+
from typing import cast
2829

2930
import numpy as np
3031

3132
from tabvision.demux import demux
32-
from tabvision.fusion import fuse
33+
from tabvision.fusion import TimedNeckAnchor, apply_neck_anchor_priors, fuse
34+
from tabvision.fusion.neck_prior import NeckAnchorLike
3335
from tabvision.types import (
3436
AudioBackend,
3537
AudioEvent,
@@ -38,6 +40,7 @@
3840
GuitarBackend,
3941
GuitarConfig,
4042
HandBackend,
43+
Homography,
4144
SessionConfig,
4245
TabEvent,
4346
)
@@ -49,6 +52,12 @@ class _VideoImportError(RuntimeError):
4952
"""Internal signal: a soft-optional video dep failed to import."""
5053

5154

55+
@dataclass(frozen=True)
class _VideoStackResult:
    """Outputs collected by one ``_run_video_stack`` pass."""

    # Per-frame fingerings, stamped with the frame timestamp.
    fingerings: list[FrameFingering]
    # ``(timestamp, anchor)`` pairs; only anchors with confidence > 0 are kept.
    neck_anchors: list[TimedNeckAnchor]
59+
60+
5261
def run_pipeline(
5362
video_path: str | Path,
5463
*,
@@ -80,22 +89,29 @@ def run_pipeline(
8089
logger.info("audio backend produced %d events", len(audio_events))
8190

8291
fingerings: list[FrameFingering] = []
92+
neck_anchors: list[TimedNeckAnchor] = []
8393
if video_enabled:
8494
try:
85-
fingerings = _run_video_stack(
95+
video_result = _run_video_stack(
8696
demuxed.frame_iterator,
8797
stride=video_stride,
8898
cfg=cfg,
8999
guitar_backend=guitar_backend,
90100
fretboard_backend=fretboard_backend,
91101
hand_backend=hand_backend,
92102
)
103+
fingerings = video_result.fingerings
104+
neck_anchors = video_result.neck_anchors
93105
except _VideoImportError as exc:
94106
logger.warning(
95107
"video stack unavailable, falling back to audio-only: %s",
96108
exc,
97109
)
98110

111+
if lambda_vision > 0.0 and neck_anchors:
112+
audio_events = apply_neck_anchor_priors(audio_events, neck_anchors, cfg)
113+
logger.info("attached %d hand-neck anchors as audio fret priors", len(neck_anchors))
114+
99115
logger.info(
100116
"running fuse() with %d audio events, %d fingerings, lambda_vision=%.2f",
101117
len(audio_events),
@@ -118,7 +134,7 @@ def _run_video_stack(
118134
guitar_backend: GuitarBackend | None,
119135
fretboard_backend: FretboardBackend | None,
120136
hand_backend: HandBackend | None,
121-
) -> list[FrameFingering]:
137+
) -> _VideoStackResult:
122138
"""Single-pass walk producing one ``FrameFingering`` per sampled frame.
123139
124140
Skipped-by-stride frames produce nothing; sampled frames produce
@@ -138,6 +154,7 @@ def _run_video_stack(
138154
hand_backend = _make_hand_backend()
139155

140156
fingerings: list[FrameFingering] = []
157+
neck_anchors: list[TimedNeckAnchor] = []
141158
n_fingers = 4 # fretting fingers; matches Phase 4 convention.
142159
empty_logits = np.zeros((n_fingers, cfg.n_strings, cfg.max_fret + 1), dtype=np.float64)
143160

@@ -167,8 +184,28 @@ def _run_video_stack(
167184
ff = hand_backend.detect(frame, H, cfg)
168185
# Backends produce a degenerate t=0.0; stamp the real timestamp here.
169186
fingerings.append(replace(ff, t=t))
187+
anchor = _detect_neck_anchor(hand_backend, frame, H, cfg)
188+
if anchor is not None and anchor.confidence > 0.0:
189+
neck_anchors.append((t, anchor))
190+
191+
return _VideoStackResult(fingerings=fingerings, neck_anchors=neck_anchors)
170192

171-
return fingerings
193+
194+
def _detect_neck_anchor(
    hand_backend: HandBackend,
    frame: np.ndarray,
    H: Homography,  # noqa: N803 — optional extension outside the §8 protocol
    cfg: GuitarConfig,
) -> NeckAnchorLike | None:
    """Ask the backend for a coarse neck anchor if it offers the hook.

    Backends without a ``detect_anchor`` attribute yield ``None``. Any
    exception raised by the hook is logged at debug level and also turned
    into ``None``, so the anchor stays optional evidence.
    """
    hook = getattr(hand_backend, "detect_anchor", None)
    if hook is not None:
        try:
            return cast(NeckAnchorLike | None, hook(frame, H, cfg))
        except Exception as exc:  # noqa: BLE001 — optional evidence must degrade softly
            logger.debug("hand-neck anchor unavailable on frame: %s", exc)
    return None
172209

173210

174211
# ---------------------------------------------------------------------------

tabvision/tabvision/types.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ def transcribe(
167167
class GuitarBackend(Protocol):
168168
name: str
169169

170-
def detect(self, frame: np.ndarray) -> GuitarBBox:
170+
def detect(self, frame: np.ndarray) -> GuitarBBox | None:
171171
...
172172

173173

tabvision/tabvision/video/hand/__init__.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
HandBackend,
2626
Homography,
2727
)
28+
from tabvision.video.hand.neck_anchor import HandNeckAnchor, NeckAnchorConfig, compute_neck_anchor
2829

2930

3031
def track_hand(
@@ -52,4 +53,9 @@ def track_hand(
5253
return out
5354

5455

55-
__all__ = ["track_hand"]
56+
__all__ = [
57+
"HandNeckAnchor",
58+
"NeckAnchorConfig",
59+
"compute_neck_anchor",
60+
"track_hand",
61+
]

tabvision/tabvision/video/hand/mediapipe_backend.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
PosteriorConfig,
3030
compute_fingering,
3131
)
32+
from tabvision.video.hand.neck_anchor import HandNeckAnchor, compute_neck_anchor
3233

3334
logger = logging.getLogger(__name__)
3435

@@ -106,6 +107,21 @@ def detect(
106107
self.config.posterior,
107108
)
108109

110+
def detect_anchor(
    self, frame: np.ndarray, H: Homography, cfg: GuitarConfig  # noqa: N803
) -> HandNeckAnchor:
    """Estimate the coarse neck region covered by the fretting hand.

    This hook is intentionally not part of the §8 ``HandBackend`` protocol:
    ``detect()`` keeps providing exact per-finger posteriors, and Phase 5
    fusion can additionally use this more robust hand-region prior.

    Raises:
        BackendError: If ``frame`` is not a 3-D array with 3 channels on
            its last axis.
    """
    is_three_channel = frame.ndim == 3 and frame.shape[-1] == 3
    if not is_three_channel:
        raise BackendError(f"expected BGR frame, got shape {frame.shape}")

    return compute_neck_anchor(self._extract_fretting_hand(frame), H, cfg)
124+
109125
def close(self) -> None:
110126
if self._landmarker is not None:
111127
self._landmarker.close()

0 commit comments

Comments
 (0)