@@ -4,13 +4,14 @@ import (
44 "context"
55 "encoding/json"
66 "io"
7+ "kubecloud/internal/auth"
78 "kubecloud/internal/core/models"
9+ "kubecloud/internal/infrastructure/logger"
810 "net/http"
11+ "strings"
912 "sync"
1013 "time"
1114
12- "kubecloud/internal/infrastructure/logger"
13-
1415 "github.com/gin-gonic/gin"
1516)
1617
@@ -23,10 +24,11 @@ const (
2324
2425// SSEManager handles Server-Sent Events for real-time notifications
2526type SSEManager struct {
26- clients map [int ][]chan SSEMessage // userID -> client channels
27- mu sync.RWMutex
28- ctx context.Context
29- cancel context.CancelFunc
27+ clients map [int ][]chan SSEMessage // userID -> client channels
28+ mu sync.RWMutex
29+ ctx context.Context
30+ cancel context.CancelFunc
31+ tokenManager auth.TokenManager
3032}
3133
3234// SSEMessage represents a server-sent event message
@@ -40,12 +42,13 @@ type SSEMessage struct {
4042}
4143
4244// NewSSEManager creates a new SSE manager
43- func NewSSEManager () * SSEManager {
45+ func NewSSEManager (tokenManager auth. TokenManager ) * SSEManager {
4446 ctx , cancel := context .WithCancel (context .Background ())
4547 manager := & SSEManager {
46- clients : make (map [int ][]chan SSEMessage ),
47- ctx : ctx ,
48- cancel : cancel ,
48+ clients : make (map [int ][]chan SSEMessage ),
49+ ctx : ctx ,
50+ cancel : cancel ,
51+ tokenManager : tokenManager ,
4952 }
5053
5154 return manager
@@ -69,7 +72,7 @@ func (s *SSEManager) Stop() {
6972}
7073
7174// AddClient adds a new client channel for a user
72- func (s * SSEManager ) AddClient (userID int ) chan SSEMessage {
75+ func (s * SSEManager ) addClient (userID int ) chan SSEMessage {
7376 s .mu .Lock ()
7477 defer s .mu .Unlock ()
7578
@@ -80,7 +83,7 @@ func (s *SSEManager) AddClient(userID int) chan SSEMessage {
8083}
8184
8285// RemoveClient removes a client channel for a user
83- func (s * SSEManager ) RemoveClient (userID int , clientChan chan SSEMessage ) {
86+ func (s * SSEManager ) removeClient (userID int , clientChan chan SSEMessage ) {
8487 s .mu .Lock ()
8588 defer s .mu .Unlock ()
8689
@@ -124,14 +127,23 @@ func (s *SSEManager) Notify(userID int, msgType string, severity models.Notifica
124127 // Message sent successfully
125128 case <- time .After (2 * time .Second ):
126129 // Client not responding, remove it
127- go s .RemoveClient (userID , ch )
130+ go s .removeClient (userID , ch )
128131 case <- s .ctx .Done ():
129132 return
130133 }
131134 }
132135
133136}
134137
138+ // setupExpiryTimer creates a timer that fires when the token expires
139+ func (s * SSEManager ) setupExpiryTimer (claims * auth.TokenClaims ) * time.Timer {
140+ if claims == nil || claims .ExpiresAt == nil {
141+ return nil
142+ }
143+
144+ return time .NewTimer (time .Until (claims .ExpiresAt .Time ))
145+ }
146+
135147// HandleSSE handles SSE HTTP connections
136148func (s * SSEManager ) HandleSSE (c * gin.Context ) {
137149 userID := c .GetInt ("user_id" )
@@ -140,20 +152,48 @@ func (s *SSEManager) HandleSSE(c *gin.Context) {
140152 return
141153 }
142154
155+ // Extract token from query or Authorization header
156+ tokenStr := c .Query ("token" )
157+ if tokenStr == "" {
158+ authHeader := c .GetHeader ("Authorization" )
159+ if authHeader == "" || ! strings .HasPrefix (authHeader , "Bearer " ) {
160+ c .JSON (http .StatusUnauthorized , gin.H {"error" : "unauthorized" })
161+ return
162+ }
163+ tokenStr = strings .TrimPrefix (authHeader , "Bearer " )
164+ }
165+
166+ claims , err := s .tokenManager .VerifyToken (tokenStr )
167+ if err != nil {
168+ c .JSON (http .StatusUnauthorized , gin.H {"error" : "invalid token" })
169+ return
170+ }
171+
172+ // Setup token expiry enforcement timer using claims
173+ expiryTimer := s .setupExpiryTimer (claims )
174+ if expiryTimer != nil {
175+ defer expiryTimer .Stop ()
176+ }
177+
143178 // Set SSE headers
144179 c .Header ("Content-Type" , "text/event-stream" )
145180 c .Header ("Cache-Control" , "no-cache" )
146181 c .Header ("Connection" , "keep-alive" )
147182
148183 // Add client and get channel
149- clientChan := s .AddClient (userID )
150- defer s .RemoveClient (userID , clientChan )
184+ clientChan := s .addClient (userID )
185+ defer s .removeClient (userID , clientChan )
151186
152187 log := logger .ForOperation ("sse" , "handle_connection" ).With ().Int ("user_id" , userID ).Logger ()
153188
154189 // Send initial connection message
155190 s .Notify (userID , "connected" , models .NotificationSeverityInfo , map [string ]string {"status" : "connected" }, "" )
156191
192+ var expiryC <- chan time.Time
193+ if expiryTimer != nil {
194+ expiryC = expiryTimer .C
195+ }
196+
157197 // Stream messages to client
158198 c .Stream (func (w io.Writer ) bool {
159199 select {
@@ -171,6 +211,10 @@ func (s *SSEManager) HandleSSE(c *gin.Context) {
171211 c .SSEvent ("message" , string (data ))
172212 return true
173213
214+ case <- expiryC :
215+ // Token expired (nil channel never fires)
216+ return false
217+
174218 case <- c .Request .Context ().Done ():
175219 log .Debug ().Msg ("Client disconnected" )
176220 return false
0 commit comments