Skip to content

Commit 48a0028

Browse files
fix release plan collaboration cleanup and diff order
1 parent 21b8f89 commit 48a0028

2 files changed

Lines changed: 805 additions & 55 deletions

File tree

pkg/microservice/aslan/core/release_plan/service/collaboration.go

Lines changed: 88 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"time"
3030

3131
"github.com/gin-gonic/gin"
32+
"github.com/google/uuid"
3233
"github.com/gorilla/websocket"
3334
"github.com/pkg/errors"
3435

@@ -60,6 +61,7 @@ var upgrader = websocket.Upgrader{
6061
type ReleasePlanEditingSession struct {
6162
PlanID string `json:"plan_id"`
6263
SessionID string `json:"session_id"`
64+
ConnectionID string `json:"connection_id,omitempty"`
6365
UserID string `json:"user_id"`
6466
UserName string `json:"user_name"`
6567
Account string `json:"account"`
@@ -103,8 +105,12 @@ type releasePlanCollabWSOutbound struct {
103105

104106
type collaborationClient struct {
105107
planID string
108+
id string
106109
conn *websocket.Conn
107110
send chan []byte
111+
112+
sessionMu sync.Mutex
113+
sessionIDs map[string]struct{}
108114
}
109115

110116
var collaborationHub = struct {
@@ -233,6 +239,75 @@ func unregisterCollaborationClient(planID string, client *collaborationClient) {
233239
}
234240
}
235241

242+
func rememberCollaborationClientSession(client *collaborationClient, sessionID string) {
243+
if client == nil || sessionID == "" {
244+
return
245+
}
246+
247+
client.sessionMu.Lock()
248+
defer client.sessionMu.Unlock()
249+
250+
if client.sessionIDs == nil {
251+
client.sessionIDs = make(map[string]struct{})
252+
}
253+
client.sessionIDs[sessionID] = struct{}{}
254+
}
255+
256+
func forgetCollaborationClientSession(client *collaborationClient, sessionID string) {
257+
if client == nil || sessionID == "" {
258+
return
259+
}
260+
261+
client.sessionMu.Lock()
262+
defer client.sessionMu.Unlock()
263+
264+
delete(client.sessionIDs, sessionID)
265+
}
266+
267+
func listCollaborationClientSessionIDs(client *collaborationClient) []string {
268+
if client == nil {
269+
return nil
270+
}
271+
272+
client.sessionMu.Lock()
273+
defer client.sessionMu.Unlock()
274+
275+
resp := make([]string, 0, len(client.sessionIDs))
276+
for sessionID := range client.sessionIDs {
277+
resp = append(resp, sessionID)
278+
}
279+
sort.Strings(resp)
280+
return resp
281+
}
282+
283+
func shouldCleanupReleasePlanEditingSession(session *ReleasePlanEditingSession, connectionID string) bool {
284+
if session == nil || connectionID == "" {
285+
return false
286+
}
287+
return session.ConnectionID == connectionID
288+
}
289+
290+
func cleanupReleasePlanEditingSessionsForClient(client *collaborationClient) {
291+
if client == nil || client.planID == "" {
292+
return
293+
}
294+
295+
for _, sessionID := range listCollaborationClientSessionIDs(client) {
296+
session, err := getReleasePlanEditingSession(client.planID, sessionID)
297+
if err != nil {
298+
continue
299+
}
300+
if !shouldCleanupReleasePlanEditingSession(session, client.id) {
301+
continue
302+
}
303+
if err := removeReleasePlanEditingSession(client.planID, sessionID); err != nil {
304+
log.Errorf("remove release plan editing session on disconnect error: %v", err)
305+
continue
306+
}
307+
forgetCollaborationClientSession(client, sessionID)
308+
}
309+
}
310+
236311
func sendSnapshotToLocalClients(planID string, snapshot *ReleasePlanCollaborationSnapshot) {
237312
if snapshot == nil {
238313
return
@@ -316,7 +391,9 @@ func GetReleasePlanCollaborationSnapshot(planID string) (*ReleasePlanCollaborati
316391
groupMap[key] = group
317392
groupOrder = append(groupOrder, key)
318393
}
319-
group.Editors = append(group.Editors, session)
394+
displaySession := *session
395+
displaySession.ConnectionID = ""
396+
group.Editors = append(group.Editors, &displaySession)
320397
}
321398

322399
sort.Strings(groupOrder)
@@ -473,11 +550,14 @@ func openReleasePlanCollaborationWS(gCtx *gin.Context, ctx *handler.Context, pla
473550
ensureReleasePlanCollaborationLoop()
474551

475552
client := &collaborationClient{
476-
planID: planID,
477-
conn: ws,
478-
send: make(chan []byte, 16),
553+
planID: planID,
554+
id: uuid.NewString(),
555+
conn: ws,
556+
send: make(chan []byte, 16),
557+
sessionIDs: map[string]struct{}{},
479558
}
480559
registerCollaborationClient(planID, client)
560+
defer cleanupReleasePlanEditingSessionsForClient(client)
481561
defer unregisterCollaborationClient(planID, client)
482562

483563
done := make(chan struct{})
@@ -517,6 +597,7 @@ func openReleasePlanCollaborationWS(gCtx *gin.Context, ctx *handler.Context, pla
517597
session := &ReleasePlanEditingSession{
518598
PlanID: planID,
519599
SessionID: msg.SessionID,
600+
ConnectionID: client.id,
520601
UserID: ctx.UserID,
521602
UserName: ctx.UserName,
522603
Account: ctx.Account,
@@ -544,6 +625,7 @@ func openReleasePlanCollaborationWS(gCtx *gin.Context, ctx *handler.Context, pla
544625
queueCollaborationClientMessage(client, &releasePlanCollabWSOutbound{Type: "error", Error: err.Error()})
545626
continue
546627
}
628+
rememberCollaborationClientSession(client, msg.SessionID)
547629
snapshot, err := GetReleasePlanCollaborationSnapshot(planID)
548630
if err == nil {
549631
queueCollaborationClientMessage(client, &releasePlanCollabWSOutbound{Type: "snapshot", Snapshot: snapshot})
@@ -560,7 +642,9 @@ func openReleasePlanCollaborationWS(gCtx *gin.Context, ctx *handler.Context, pla
560642
}
561643
if err := removeReleasePlanEditingSession(planID, msg.SessionID); err != nil {
562644
queueCollaborationClientMessage(client, &releasePlanCollabWSOutbound{Type: "error", Error: err.Error()})
645+
continue
563646
}
647+
forgetCollaborationClientSession(client, msg.SessionID)
564648
}
565649
}
566650
})

0 commit comments

Comments
 (0)