Skip to content

Commit 8e5743a

Browse files
authored
feat: implement locking mechanism for concurrent plugin installation and upgrade (#526)
* add locking to prevent simultaneous installations of the same plugin * ensure proper unlocking of keys in case of errors during installation and upgrade * handle database not found error in DeletePluginInstallationItemFromTask
1 parent 0d282de commit 8e5743a

2 files changed

Lines changed: 50 additions & 0 deletions

File tree

internal/service/install_plugin.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package service
33
import (
44
"errors"
55
"fmt"
6+
"time"
67

78
"github.com/gin-gonic/gin"
89
controlpanel "github.com/langgenius/dify-plugin-daemon/internal/core/control_panel"
@@ -104,21 +105,49 @@ func InstallMultiplePluginsToTenant(
104105
})
105106
}
106107

108+
// acquire locks to prevent concurrent installation of the same plugin
109+
lockKeys := make(map[string]string, len(pluginUniqueIdentifiers))
110+
releaseLocks := func() {
111+
for _, key := range lockKeys {
112+
if err := cache.Unlock(key); err != nil {
113+
log.Error("failed to unlock key %s: %v", key, err)
114+
}
115+
}
116+
}
117+
for _, pluginUniqueIdentifier := range pluginUniqueIdentifiers {
118+
lockKey := fmt.Sprintf("plugin:install:%s:%s", tenantId, pluginUniqueIdentifier.PluginID())
119+
if err := cache.Lock(lockKey, time.Minute*10, time.Millisecond*100); err != nil {
120+
releaseLocks()
121+
if err == cache.ErrLockTimeout {
122+
return exception.BadRequestError(errors.New("plugin installation is already in progress")).ToResponse()
123+
}
124+
return exception.InternalServerError(err).ToResponse()
125+
}
126+
lockKeys[pluginUniqueIdentifier.PluginID()] = lockKey
127+
}
128+
107129
// create tasks for each plugin
108130
statuses := buildTaskStatuses(pluginUniqueIdentifiers, declarations)
109131
taskRegistry, err := createInstallTasks(tenants, statuses)
110132
if err != nil {
133+
releaseLocks()
111134
return exception.InternalServerError(err).ToResponse()
112135
}
113136
taskIDs := taskRegistry.IDs()
114137

115138
for _, job := range jobs {
116139
jobCopy := job
140+
lockKey := lockKeys[jobCopy.Identifier.PluginID()]
117141
// start a new goroutine to install the plugin
118142
routine.Submit(routinepkg.Labels{
119143
routinepkg.RoutineLabelKeyModule: "service",
120144
routinepkg.RoutineLabelKeyMethod: "InstallPlugin",
121145
}, func() {
146+
defer func() {
147+
if err := cache.Unlock(lockKey); err != nil {
148+
log.Error("failed to unlock key %s: %v", lockKey, err)
149+
}
150+
}()
122151
tasks.ProcessInstallJob(
123152
manager,
124153
tenants,
@@ -259,6 +288,15 @@ func UpgradePlugin(
259288
return exception.InternalServerError(err).ToResponse()
260289
}
261290

291+
// acquire lock to prevent concurrent upgrade of the same plugin
292+
lockKey := fmt.Sprintf("plugin:install:%s:%s", tenantId, newPluginUniqueIdentifier.PluginID())
293+
if err := cache.Lock(lockKey, time.Minute*10, time.Millisecond*100); err != nil {
294+
if err == cache.ErrLockTimeout {
295+
return exception.BadRequestError(errors.New("plugin installation is already in progress")).ToResponse()
296+
}
297+
return exception.InternalServerError(err).ToResponse()
298+
}
299+
262300
// construct tenant jobs
263301
tenants := []string{tenantId}
264302

@@ -277,6 +315,9 @@ func UpgradePlugin(
277315

278316
taskRegistry, err := createInstallTasks(tenants, statuses)
279317
if err != nil {
318+
if unlockErr := cache.Unlock(lockKey); unlockErr != nil {
319+
log.Error("failed to unlock key %s: %v", lockKey, unlockErr)
320+
}
280321
return exception.InternalServerError(err).ToResponse()
281322
}
282323

@@ -286,6 +327,11 @@ func UpgradePlugin(
286327
routinepkg.RoutineLabelKeyModule: "service",
287328
routinepkg.RoutineLabelKeyMethod: "UpgradePlugin",
288329
}, func() {
330+
defer func() {
331+
if err := cache.Unlock(lockKey); err != nil {
332+
log.Error("failed to unlock key %s: %v", lockKey, err)
333+
}
334+
}()
289335
tasks.ProcessUpgradeJob(
290336
manager,
291337
tenants,

internal/service/install_task_service.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,10 @@ func DeletePluginInstallationItemFromTask(
8989
db.WLock(),
9090
)
9191

92+
if err == db.ErrDatabaseNotFound {
93+
return nil
94+
}
95+
9296
if err != nil {
9397
return err
9498
}

0 commit comments

Comments
 (0)