Skip to content

Commit c9e903c

Browse files
committed
Fixes for incoming/outgoing calls
1 parent 9a07ee2 commit c9e903c

2 files changed

Lines changed: 240 additions & 28 deletions

File tree

calling/call.go

Lines changed: 116 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -528,11 +528,27 @@ func (c *Call) HandleMobiusEvent(event *MobiusCallEvent) {
528528
c.Emitter.Emit(string(CallEventDisconnect), c.callID)
529529

530530
case MobiusEventCallSetup:
531-
// Incoming call setup
531+
// Skip redundant setup events if the call is already connected.
532+
// BroadWorks can re-send mobius.call after the call is established.
533+
c.mu.RLock()
534+
alreadyConnected := c.connected
535+
c.mu.RUnlock()
536+
if alreadyConnected {
537+
log.Printf("[INCOMING] Ignoring redundant call setup for already-connected callId=%s", c.callID)
538+
return
539+
}
540+
541+
// Incoming call setup — send sig_alerting to Mobius immediately
532542
c.mu.Lock()
533543
c.callID = data.CallID
534544
c.state = CallStateAlerting
535545
c.mu.Unlock()
546+
547+
log.Printf("[INCOMING] Call setup received, PATCHing sig_alerting for callId=%s", c.callID)
548+
if err := c.patchCallState("sig_alerting"); err != nil {
549+
log.Printf("[INCOMING] Failed to PATCH sig_alerting: %v", err)
550+
}
551+
536552
c.Emitter.Emit(string(CallEventAlerting), c.callID)
537553
}
538554

@@ -575,19 +591,74 @@ func (c *Call) handleRoapMessage(msg *RoapMessage) {
575591
}
576592

577593
case RoapMessageOffer:
578-
// Remote sent a new offer (renegotiation)
594+
log.Printf("[INCOMING] ROAP OFFER received (seq=%d, sdp length=%d) for callId=%s direction=%s", msg.Seq, len(msg.SDP), c.callID, c.direction)
595+
596+
// For incoming calls that haven't been connected yet, we must:
597+
// 1. Add audio track
598+
// 2. PATCH call state to "connected" on Mobius (accept the call)
599+
// 3. Then send the ROAP answer with media
600+
// This matches the JS SDK's answer() flow.
601+
isInitialInbound := c.direction == CallDirectionInbound && !c.connected
602+
603+
// Ensure audio track exists before setting the remote offer
604+
if c.media.GetLocalTrack() == nil {
605+
log.Printf("[INCOMING] Adding audio track")
606+
if _, err := c.media.AddAudioTrack(); err != nil {
607+
log.Printf("Failed to add audio track for incoming offer: %v", err)
608+
return
609+
}
610+
log.Printf("[INCOMING] Audio track added successfully")
611+
}
612+
613+
// Register remote track handler BEFORE SetRemoteOffer — OnTrack can
614+
// fire as soon as DTLS completes, which may happen before we finish
615+
// sending the ROAP answer.
616+
if isInitialInbound {
617+
c.media.OnRemoteTrack(func(track *webrtc.TrackRemote) {
618+
c.Emitter.Emit(string(CallEventRemoteMedia), track)
619+
})
620+
}
621+
622+
// PATCH sig_connected BEFORE sending ROAP answer — Mobius rejects media
623+
// when the call state is still ALERT/PROGRESS (error 400).
624+
if isInitialInbound {
625+
log.Printf("[INCOMING] PATCHing call to connected state")
626+
if err := c.patchCallState("sig_connected"); err != nil {
627+
log.Printf("[INCOMING] Failed to PATCH call state to connected: %v", err)
628+
}
629+
}
630+
579631
if err := c.media.SetRemoteOffer(msg.SDP); err != nil {
580632
log.Printf("Failed to set remote offer: %v", err)
581633
return
582634
}
635+
583636
sdp, err := c.media.CreateAnswer()
584637
if err != nil {
585638
log.Printf("Failed to create answer: %v", err)
586639
return
587640
}
641+
642+
sdp = ModifySdpForMobius(sdp)
643+
588644
answerMsg := SDPToRoapAnswer(sdp, msg.Seq)
645+
log.Printf("[INCOMING] Sending ROAP answer with seq=%d to callId=%s", msg.Seq, c.callID)
589646
if err := c.postMedia(answerMsg); err != nil {
590647
log.Printf("Failed to send ROAP answer: %v", err)
648+
return
649+
}
650+
log.Printf("[INCOMING] ROAP answer sent successfully")
651+
652+
// Transition local state for initial inbound calls
653+
if isInitialInbound {
654+
c.mu.Lock()
655+
c.state = CallStateConnected
656+
c.connected = true
657+
c.seq++
658+
c.mu.Unlock()
659+
660+
c.Emitter.Emit(string(CallEventConnect), c.callID)
661+
c.Emitter.Emit(string(CallEventEstablished), c.callID)
591662
}
592663

593664
case RoapMessageOK:
@@ -742,10 +813,52 @@ func (c *Call) postToMobius(url string, payload interface{}) error {
742813
}
743814
defer func() { _ = resp.Body.Close() }()
744815

816+
body, _ := io.ReadAll(resp.Body)
745817
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
746-
body, _ := io.ReadAll(resp.Body)
747818
return fmt.Errorf("request failed with status %d: %s", resp.StatusCode, string(body))
748819
}
820+
return nil
821+
}
822+
823+
// patchCallState sends a PATCH to Mobius to transition the call state.
824+
// This is required for incoming calls: Mobius must be told the call is "connected"
825+
// before media answers can be sent (mirrors the JS SDK's answer flow).
826+
func (c *Call) patchCallState(state string) error {
827+
payload := map[string]interface{}{
828+
"device": map[string]string{
829+
"deviceId": c.deviceID,
830+
"correlationId": c.correlationID,
831+
},
832+
"callId": c.callID,
833+
"callState": state,
834+
"inbandMedia": false,
835+
}
836+
837+
payloadBytes, err := json.Marshal(payload)
838+
if err != nil {
839+
return fmt.Errorf("error marshaling PATCH payload: %w", err)
840+
}
841+
842+
url := fmt.Sprintf("%sdevices/%s/calls/%s", c.mobiusURL, c.deviceID, c.callID)
843+
log.Printf("PATCH callState=%s for callId=%s", state, c.callID)
844+
845+
req, err := http.NewRequest(http.MethodPatch, url, bytes.NewBuffer(payloadBytes))
846+
if err != nil {
847+
return fmt.Errorf("error creating PATCH request: %w", err)
848+
}
849+
850+
c.setMobiusHeaders(req)
851+
852+
resp, err := c.core.GetHTTPClient().Do(req)
853+
if err != nil {
854+
return fmt.Errorf("error making PATCH request: %w", err)
855+
}
856+
defer func() { _ = resp.Body.Close() }()
857+
858+
body, _ := io.ReadAll(resp.Body)
859+
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
860+
return fmt.Errorf("PATCH failed with status %d: %s", resp.StatusCode, string(body))
861+
}
749862

750863
return nil
751864
}

calling/media.go

Lines changed: 124 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -61,27 +61,28 @@ func NewMediaEngine(config *MediaConfig) (*MediaEngine, error) {
6161
config = DefaultMediaConfig()
6262
}
6363

64-
// Register only PCMU and PCMA — BroadWorks/Mobius consistently selects PCMU.
65-
// Avoid RegisterDefaultCodecs which adds Opus/G722/video codecs that
66-
// BroadWorks doesn't support and can cause negotiation issues.
64+
// Register audio codecs for BroadWorks/Mobius compatibility.
6765
m := &webrtc.MediaEngine{}
68-
if err := m.RegisterCodec(webrtc.RTPCodecParameters{
69-
RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypePCMU, ClockRate: 8000},
70-
PayloadType: 0,
71-
}, webrtc.RTPCodecTypeAudio); err != nil {
72-
return nil, fmt.Errorf("failed to register PCMU: %w", err)
73-
}
74-
if err := m.RegisterCodec(webrtc.RTPCodecParameters{
75-
RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypePCMA, ClockRate: 8000},
76-
PayloadType: 8,
77-
}, webrtc.RTPCodecTypeAudio); err != nil {
78-
return nil, fmt.Errorf("failed to register PCMA: %w", err)
66+
for _, codec := range []webrtc.RTPCodecParameters{
67+
{RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypePCMU, ClockRate: 8000}, PayloadType: 0},
68+
{RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypePCMA, ClockRate: 8000}, PayloadType: 8},
69+
{RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: "audio/telephone-event", ClockRate: 8000, SDPFmtpLine: "0-15"}, PayloadType: 101},
70+
{RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: "audio/CN", ClockRate: 8000}, PayloadType: 19},
71+
} {
72+
if err := m.RegisterCodec(codec, webrtc.RTPCodecTypeAudio); err != nil {
73+
return nil, fmt.Errorf("failed to register codec %s: %w", codec.MimeType, err)
74+
}
7975
}
8076

8177
// BroadWorks (ice-lite) sends RTP before Pion finishes processing the SDP answer.
8278
// Enable undeclared SSRC handling so OnTrack fires for early media.
8379
settings := webrtc.SettingEngine{}
8480
settings.SetHandleUndeclaredSSRCWithoutAnswer(true)
81+
// BroadWorks is ice-lite and offers a=setup:actpass. It will NOT initiate
82+
// DTLS (it only acts as server when we're active, or client when we're passive).
83+
// With ice-lite, we must be the DTLS client (active). Pion defaults to passive
84+
// when answering, which causes a DTLS deadlock — nobody initiates the handshake.
85+
settings.SetAnsweringDTLSRole(webrtc.DTLSRoleClient)
8586

8687
// Register default interceptors (RTCP reports, NACK, TWCC) — required when
8788
// using a custom MediaEngine/SettingEngine, otherwise Pion won't process
@@ -158,11 +159,19 @@ func NewMediaEngine(config *MediaConfig) (*MediaEngine, error) {
158159
return engine, nil
159160
}
160161

161-
// OnRemoteTrack sets the callback for when a remote audio track is received
162+
// OnRemoteTrack sets the callback for when a remote audio track is received.
163+
// If a track was already received before the handler was set, the handler is
164+
// called immediately with that track.
162165
func (me *MediaEngine) OnRemoteTrack(handler func(track *webrtc.TrackRemote)) {
163166
me.mu.Lock()
164-
defer me.mu.Unlock()
165167
me.onRemoteTrack = handler
168+
existingTrack := me.remoteTrack
169+
me.mu.Unlock()
170+
171+
if existingTrack != nil && handler != nil {
172+
log.Printf("OnRemoteTrack: track already available, calling handler immediately")
173+
handler(existingTrack)
174+
}
166175
}
167176

168177
// OnICECandidate sets the callback for when an ICE candidate is gathered
@@ -278,7 +287,6 @@ func (me *MediaEngine) CreateAnswer() (string, error) {
278287
if localDesc == nil {
279288
return "", fmt.Errorf("local description is nil after gathering")
280289
}
281-
282290
return localDesc.SDP, nil
283291
}
284292

@@ -287,9 +295,10 @@ func (me *MediaEngine) SetRemoteOffer(sdp string) error {
287295
me.mu.Lock()
288296
defer me.mu.Unlock()
289297

298+
fixed := fixIncomingSdp(sdp)
290299
return me.peerConnection.SetRemoteDescription(webrtc.SessionDescription{
291300
Type: webrtc.SDPTypeOffer,
292-
SDP: fixIncomingSdp(sdp),
301+
SDP: fixed,
293302
})
294303
}
295304

@@ -409,11 +418,16 @@ func NewRoapOK(seq int) *RoapMessage {
409418
// fixIncomingSdp patches incoming BroadWorks SDP for Pion v4 compatibility:
410419
// - Injects a=mid:0 after the first m= line if missing (Pion v4 requires mid)
411420
// - Adds a=group:BUNDLE 0 at session level if missing
421+
// - Normalises line endings to \r\n (SDP spec)
412422
func fixIncomingSdp(sdp string) string {
413-
lines := strings.Split(sdp, "\r\n")
414-
result := make([]string, 0, len(lines)+2)
423+
// Normalize line endings: BroadWorks may send \n only
424+
sdp = strings.ReplaceAll(sdp, "\r\n", "\n")
425+
lines := strings.Split(sdp, "\n")
426+
427+
result := make([]string, 0, len(lines)+4)
415428
hasMid := false
416429
hasBundle := false
430+
hasDirection := false
417431
inMedia := false
418432

419433
// First pass: check what's present
@@ -424,6 +438,10 @@ func fixIncomingSdp(sdp string) string {
424438
if strings.HasPrefix(line, "a=group:BUNDLE") {
425439
hasBundle = true
426440
}
441+
if strings.HasPrefix(line, "a=sendrecv") || strings.HasPrefix(line, "a=sendonly") ||
442+
strings.HasPrefix(line, "a=recvonly") || strings.HasPrefix(line, "a=inactive") {
443+
hasDirection = true
444+
}
427445
}
428446

429447
// Second pass: inject missing attributes
@@ -435,10 +453,15 @@ func fixIncomingSdp(sdp string) string {
435453
}
436454
inMedia = true
437455
result = append(result, line)
438-
// After m= line, inject mid if missing
456+
// After m= line, inject mid and direction if missing
439457
if !hasMid {
440458
result = append(result, "a=mid:0")
441459
}
460+
// Pion v4 requires an explicit direction attribute; BroadWorks omits it
461+
// (SDP spec says default is sendrecv, but Pion doesn't apply the default)
462+
if !hasDirection {
463+
result = append(result, "a=sendrecv")
464+
}
442465
continue
443466
}
444467
result = append(result, line)
@@ -447,16 +470,84 @@ func fixIncomingSdp(sdp string) string {
447470
return strings.Join(result, "\r\n")
448471
}
449472

450-
// ModifySdpForMobius cleans up the SDP offer for BroadWorks/Mobius compatibility:
473+
// ModifySdpForMobius cleans up the SDP offer/answer for BroadWorks/Mobius compatibility:
451474
// - Removes IPv6 candidates (BroadWorks only supports IPv4)
452-
// - Converts port 9 to 0 in m= line (JS SDK: convertPort9to0)
453475
// - Removes rtcp-fb lines (BroadWorks doesn't support transport-cc)
454476
// - Removes extmap lines (BroadWorks doesn't support RTP header extensions)
455477
// - Removes extmap-allow-mixed
478+
// - Copies c= line from media level to session level (copyClineToSessionLevel)
456479
func ModifySdpForMobius(sdp string) string {
457480
lines := strings.Split(sdp, "\r\n")
458481
filtered := make([]string, 0, len(lines))
482+
483+
// First pass: collect info we need
484+
// - Find first IPv4 srflx candidate (preferred) or host candidate for c= line
485+
// - Find media-level c= line for copyClineToSessionLevel
486+
var bestCandidateIP, bestCandidatePort string
487+
var mediaCline string
488+
hasSessionCline := false
489+
inMedia := false
459490
for _, line := range lines {
491+
if strings.HasPrefix(line, "m=") {
492+
inMedia = true
493+
}
494+
if strings.HasPrefix(line, "c=") {
495+
if inMedia {
496+
mediaCline = line
497+
} else {
498+
hasSessionCline = true
499+
}
500+
}
501+
if strings.HasPrefix(line, "a=candidate:") {
502+
parts := strings.Fields(line)
503+
if len(parts) >= 8 {
504+
addr := parts[4]
505+
port := parts[5]
506+
// Skip IPv6
507+
if strings.Contains(addr, ":") {
508+
continue
509+
}
510+
candidateType := ""
511+
if len(parts) >= 8 {
512+
candidateType = parts[7] // "host" or "srflx"
513+
}
514+
// Prefer srflx (public IP), fall back to host
515+
if candidateType == "srflx" || bestCandidateIP == "" {
516+
bestCandidateIP = addr
517+
bestCandidatePort = port
518+
}
519+
}
520+
}
521+
}
522+
523+
inMedia = false
524+
for _, line := range lines {
525+
if strings.HasPrefix(line, "m=") {
526+
inMedia = true
527+
}
528+
529+
// Copy c= line to session level (before first m= line)
530+
// Use candidate IP if the c= line has 0.0.0.0
531+
if strings.HasPrefix(line, "m=") && !hasSessionCline {
532+
if bestCandidateIP != "" {
533+
filtered = append(filtered, fmt.Sprintf("c=IN IP4 %s", bestCandidateIP))
534+
} else if mediaCline != "" {
535+
filtered = append(filtered, mediaCline)
536+
}
537+
hasSessionCline = true
538+
}
539+
540+
// Replace c=IN IP4 0.0.0.0 with real candidate IP
541+
if strings.HasPrefix(line, "c=") && strings.Contains(line, "0.0.0.0") && bestCandidateIP != "" {
542+
filtered = append(filtered, fmt.Sprintf("c=IN IP4 %s", bestCandidateIP))
543+
continue
544+
}
545+
546+
// Replace port 9 in m= line with real candidate port
547+
if strings.HasPrefix(line, "m=audio 9 ") && bestCandidatePort != "" {
548+
line = strings.Replace(line, "m=audio 9 ", "m=audio "+bestCandidatePort+" ", 1)
549+
}
550+
460551
// Skip IPv6 candidates (address contains ":")
461552
if strings.HasPrefix(line, "a=candidate:") {
462553
parts := strings.Fields(line)
@@ -479,8 +570,16 @@ func ModifySdpForMobius(sdp string) string {
479570
if strings.HasPrefix(line, "a=extmap-allow-mixed") {
480571
continue
481572
}
482-
// Note: port 9 in SDP is a placeholder (Pion default). Do NOT convert to 0
483-
// as port 0 means "reject this media stream" in SDP.
573+
// Skip rtcp-rsize (not supported by BroadWorks)
574+
if strings.HasPrefix(line, "a=rtcp-rsize") {
575+
continue
576+
}
577+
// BroadWorks ice-lite + actpass offer: we must be DTLS client (active).
578+
// Pion defaults to passive when answering, but ice-lite won't initiate DTLS.
579+
if line == "a=setup:passive" {
580+
filtered = append(filtered, "a=setup:active")
581+
continue
582+
}
484583
filtered = append(filtered, line)
485584
}
486585
return strings.Join(filtered, "\r\n")

0 commit comments

Comments
 (0)