Skip to content

Commit f791b52

Browse files
authored
Caching pre-dialog assigned IDs (#590)
1 parent 3034bb7 commit f791b52

3 files changed

Lines changed: 192 additions & 16 deletions

File tree

pkg/sip/inbound.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -437,6 +437,10 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE
437437
}
438438
s.getCallInfo(cc.SIPCallID()).countInvite(log, req)
439439
if !s.handleInviteAuth(tid, log, req, tx, from.User, r.Username, r.Password) {
440+
// Store (call-ID + from tag) to (to tag) mapping
441+
s.cmu.Lock()
442+
s.provisionalInvites.Add([2]string{cc.SIPCallID(), string(cc.Tag())}, cc.ID())
443+
s.cmu.Unlock()
440444
cmon.InviteErrorShort("unauthorized")
441445
// handleInviteAuth will generate the SIP Response as needed
442446
return psrpc.NewErrorf(psrpc.PermissionDenied, "invalid credentials were provided")
@@ -1447,8 +1451,17 @@ func (s *Server) newInbound(invite *sip.Request, inviteTx sip.ServerTransaction,
14471451
}
14481452
toTag, ok := toHdr.Params.Get("tag")
14491453
if !ok || toTag == "" {
1450-
// No to-tag means a new dialog is being created. Generate a local tag
1451-
toTag = lksip.NewCallID()
1454+
// Check if the call-ID + from tag is in the provisional invites cache
1455+
s.cmu.Lock()
1456+
cachedTag, ok := s.provisionalInvites.Get([2]string{sipCallID, fromTag})
1457+
s.cmu.Unlock()
1458+
if ok && cachedTag != "" {
1459+
// Use the cached tag to reuse the originally assigned SCL ID
1460+
toTag = string(cachedTag)
1461+
} else {
1462+
// New dialog is being created. Generate a local tag
1463+
toTag = lksip.NewCallID()
1464+
}
14521465
toHdr.Params.Add("tag", toTag)
14531466
}
14541467

pkg/sip/server.go

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -142,11 +142,12 @@ type Server struct {
142142
imu sync.Mutex
143143
inProgressInvites []*inProgressInvite
144144

145-
closing core.Fuse
146-
cmu sync.RWMutex
147-
byRemoteTag map[RemoteTag]*inboundCall
148-
byLocalTag map[LocalTag]*inboundCall
149-
byCallID map[string]*inboundCall
145+
closing core.Fuse
146+
cmu sync.RWMutex
147+
byRemoteTag map[RemoteTag]*inboundCall
148+
byLocalTag map[LocalTag]*inboundCall
149+
byCallID map[string]*inboundCall
150+
provisionalInvites *expirable.LRU[[2]string, LocalTag]
150151

151152
infos struct {
152153
sync.Mutex
@@ -180,15 +181,16 @@ func NewServer(region string, conf *config.Config, log logger.Logger, mon *stats
180181
log = logger.GetLogger()
181182
}
182183
s := &Server{
183-
log: log,
184-
conf: conf,
185-
region: region,
186-
mon: mon,
187-
getIOClient: getIOClient,
188-
getRoom: DefaultGetRoomFunc,
189-
byRemoteTag: make(map[RemoteTag]*inboundCall),
190-
byLocalTag: make(map[LocalTag]*inboundCall),
191-
byCallID: make(map[string]*inboundCall),
184+
log: log,
185+
conf: conf,
186+
region: region,
187+
mon: mon,
188+
getIOClient: getIOClient,
189+
getRoom: DefaultGetRoomFunc,
190+
byRemoteTag: make(map[RemoteTag]*inboundCall),
191+
byLocalTag: make(map[LocalTag]*inboundCall),
192+
byCallID: make(map[string]*inboundCall),
193+
provisionalInvites: expirable.NewLRU[[2]string, LocalTag](maxCallCache, nil, callCacheTTL),
192194
}
193195
for _, option := range options {
194196
option(s)

pkg/sip/service_test.go

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -788,3 +788,164 @@ func TestCANCELSendsBothResponses(t *testing.T) {
788788
// Verify we received the critical 487 response
789789
require.True(t, invite487Received, "Should have received 487 Request Terminated response to INVITE when CANCEL is sent")
790790
}
791+
792+
// TestSameCallIDForAuthFlow verifies that the same LiveKit call ID is assigned to both
793+
// the initial INVITE (without auth) and the subsequent INVITE (with auth)
794+
func TestSameCallIDForAuthFlow(t *testing.T) {
795+
const (
796+
fromUser = "test@example.com"
797+
toUser = "agent@example.com"
798+
username = "testuser"
799+
password = "testpass"
800+
callID = "same-call-id@test.com"
801+
fromTag = "fixed-from-tag-12345"
802+
)
803+
804+
var capturedCallIDs []string
805+
var mu sync.Mutex
806+
807+
log := logger.NewTestLoggerLevel(t, 1)
808+
809+
h := &TestHandler{
810+
GetAuthCredentialsFunc: func(ctx context.Context, call *rpc.SIPCall) (AuthInfo, error) {
811+
// Capture the LiveKit call ID from the first request
812+
mu.Lock()
813+
capturedCallIDs = append(capturedCallIDs, call.LkCallId)
814+
mu.Unlock()
815+
816+
log.Infow("GetAuthCredentials called", "callID", call.LkCallId)
817+
818+
return AuthInfo{
819+
Result: AuthPassword,
820+
Username: username,
821+
Password: password,
822+
}, nil
823+
},
824+
DispatchCallFunc: func(ctx context.Context, info *CallInfo) CallDispatch {
825+
return CallDispatch{
826+
Result: DispatchNoRuleReject,
827+
// No room config needed for reject
828+
}
829+
},
830+
OnSessionEndFunc: func(ctx context.Context, callIdentifier *CallIdentifier, callInfo *livekit.SIPCallInfo, reason string) {
831+
// No-op for tests to avoid async logging issues
832+
},
833+
}
834+
835+
// Create service with authentication enabled
836+
sipPort := rand.Intn(testPortSIPMax-testPortSIPMin) + testPortSIPMin
837+
localIP, err := config.GetLocalIP()
838+
require.NoError(t, err)
839+
840+
sipServerAddress := fmt.Sprintf("%s:%d", localIP, sipPort)
841+
842+
mon, err := stats.NewMonitor(&config.Config{MaxCpuUtilization: 0.9})
843+
require.NoError(t, err)
844+
845+
s, err := NewService("", &config.Config{
846+
HideInboundPort: false, // Enable authentication
847+
SIPPort: sipPort,
848+
SIPPortListen: sipPort,
849+
RTPPort: rtcconfig.PortRange{Start: testPortRTPMin, End: testPortRTPMax},
850+
}, mon, log, func(projectID string) rpc.IOInfoClient { return nil })
851+
require.NoError(t, err)
852+
require.NotNil(t, s)
853+
854+
s.SetHandler(h)
855+
require.NoError(t, s.Start())
856+
t.Cleanup(s.Stop)
857+
858+
sipUserAgent, err := sipgo.NewUA(
859+
sipgo.WithUserAgent(fromUser),
860+
sipgo.WithUserAgentLogger(slog.New(logger.ToSlogHandler(s.log))),
861+
)
862+
require.NoError(t, err)
863+
864+
sipClient, err := sipgo.NewClient(sipUserAgent)
865+
require.NoError(t, err)
866+
867+
offer, err := sdp.NewOffer(localIP, 0xB0B, sdp.EncryptionNone)
868+
require.NoError(t, err)
869+
offerData, err := offer.SDP.Marshal()
870+
require.NoError(t, err)
871+
872+
inviteFromHeader := sip.FromHeader{
873+
DisplayName: fromUser,
874+
Address: sip.Uri{User: fromUser, Host: sipServerAddress},
875+
Params: sip.NewParams().Add("tag", fromTag), // Key bit here
876+
}
877+
878+
// Create first INVITE request (without auth)
879+
inviteRecipient := sip.Uri{User: toUser, Host: sipServerAddress}
880+
inviteRequest1 := sip.NewRequest(sip.INVITE, inviteRecipient)
881+
inviteRequest1.SetDestination(sipServerAddress)
882+
inviteRequest1.SetBody(offerData)
883+
inviteRequest1.AppendHeader(sip.NewHeader("Content-Type", "application/sdp"))
884+
inviteRequest1.AppendHeader(sip.NewHeader("Call-ID", callID))
885+
inviteRequest1.AppendHeader(&inviteFromHeader)
886+
887+
tx1, err := sipClient.TransactionRequest(inviteRequest1)
888+
require.NoError(t, err)
889+
t.Cleanup(tx1.Terminate)
890+
891+
// Should receive 100 Trying first, then 407 Unauthorized
892+
res1 := getResponseOrFail(t, tx1)
893+
require.Equal(t, sip.StatusCode(100), res1.StatusCode, "First request should receive 100 Trying")
894+
res1 = getResponseOrFail(t, tx1)
895+
require.Equal(t, sip.StatusCode(407), res1.StatusCode, "First request should receive 407 Unauthorized")
896+
897+
// Get the To tag from the 407 response
898+
toHeader := res1.To()
899+
require.NotNil(t, toHeader, "407 response should have To header")
900+
_, ok := toHeader.Params.Get("tag")
901+
require.True(t, ok, "407 response To header should have tag parameter")
902+
903+
// Get the challenge from first response
904+
authHeader1 := res1.GetHeader("Proxy-Authenticate")
905+
require.NotNil(t, authHeader1, "First response should have Proxy-Authenticate header")
906+
challenge1 := authHeader1.Value()
907+
908+
// Parse the challenge to extract nonce and realm
909+
challenge, err := digest.ParseChallenge(challenge1)
910+
require.NoError(t, err, "Should be able to parse challenge")
911+
912+
// Compute the digest response using the challenge and credentials
913+
cred, err := digest.Digest(challenge, digest.Options{
914+
Method: "INVITE",
915+
URI: inviteRecipient.String(),
916+
Username: username,
917+
Password: password,
918+
})
919+
require.NoError(t, err, "Should be able to compute digest response")
920+
921+
// Create second INVITE request (with auth) using the SAME Call-ID, From tag, and To tag
922+
inviteRequest2 := sip.NewRequest(sip.INVITE, inviteRecipient)
923+
inviteRequest2.SetDestination(sipServerAddress)
924+
inviteRequest2.SetBody(offerData)
925+
inviteRequest2.AppendHeader(sip.NewHeader("Content-Type", "application/sdp"))
926+
inviteRequest2.AppendHeader(sip.NewHeader("Call-ID", callID))
927+
inviteRequest2.AppendHeader(sip.NewHeader("Proxy-Authorization", cred.String()))
928+
inviteRequest2.AppendHeader(&inviteFromHeader)
929+
930+
tx2, err := sipClient.TransactionRequest(inviteRequest2)
931+
require.NoError(t, err)
932+
t.Cleanup(tx2.Terminate)
933+
934+
// Should receive 100 Trying first, then proceed with authentication
935+
res2 := getResponseOrFail(t, tx2)
936+
require.Equal(t, sip.StatusCode(100), res2.StatusCode, "Second request should receive 100 Trying")
937+
938+
// Wait a bit for the handler to be called
939+
time.Sleep(100 * time.Millisecond)
940+
941+
// Verify we captured exactly 2 call IDs
942+
mu.Lock()
943+
require.Len(t, capturedCallIDs, 2, "Should have captured 2 call IDs")
944+
require.Equal(t, capturedCallIDs[0], capturedCallIDs[1], "Both requests should have the same LiveKit call ID")
945+
require.NotEmpty(t, capturedCallIDs[0], "Call ID should not be empty")
946+
require.Contains(t, capturedCallIDs[0], "SCL_", "Call ID should have SCL_ prefix")
947+
mu.Unlock()
948+
949+
t.Logf("First call ID: %s", capturedCallIDs[0])
950+
t.Logf("Second call ID: %s", capturedCallIDs[1])
951+
}

0 commit comments

Comments
 (0)