Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions internal/core/control_panel/launcher_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,17 +149,17 @@ func (c *ControlPanel) LaunchLocalPlugin(
})
},
OnRuntimeCloseImpl: func() {
// delete the runtime from the map
// this must happen AFTER all instances are confirmed shutdown
// to prevent process leaks where the runtime disappears from the map
// but subprocesses are still alive and no safety net can find them
c.localPluginRuntimes.Delete(pluginUniqueIdentifier)
// notify the plugin totally stopped
c.WalkNotifiers(func(notifier ControlPanelNotifier) {
notifier.OnLocalRuntimeStopped(runtime)
})
},
OnRuntimeStopScheduleImpl: func() {
// delete the runtime from the map
// Even if the runtime is not ready, deleting it still makes sense
// once a plugin is stopping schedule, all new requests to it need to be rejected
// so just remove it from map
c.localPluginRuntimes.Delete(pluginUniqueIdentifier)
// notify the plugin is stopping
c.WalkNotifiers(func(notifier ControlPanelNotifier) {
notifier.OnLocalRuntimeStop(runtime)
Expand Down
6 changes: 6 additions & 0 deletions internal/core/plugin_manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ type PluginManager struct {
controlPanel *controlpanel.ControlPanel
}

type PluginShutdownManager interface {
RemoveLocalPlugin(pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier) error
ShutdownLocalPluginGracefully(pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier) (<-chan error, error)
ShutdownLocalPluginForcefully(pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier) (<-chan error, error)
}

var (
manager *PluginManager
)
Expand Down
6 changes: 6 additions & 0 deletions internal/core/plugin_manager/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,9 @@ func (p *PluginManager) ShutdownLocalPluginGracefully(
) (<-chan error, error) {
return p.controlPanel.ShutdownLocalPluginGracefully(pluginUniqueIdentifier)
}

func (p *PluginManager) ShutdownLocalPluginForcefully(
pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier,
) (<-chan error, error) {
return p.controlPanel.ShutdownLocalPluginForcefully(pluginUniqueIdentifier)
}
54 changes: 48 additions & 6 deletions internal/tasks/install_plugin_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,23 +155,65 @@ func DeleteTask(taskId string) error {
})
}

// gracefulShutdownTimeout controls how long RemovePluginIfNeeded waits for
// graceful and forceful shutdown to complete. Package-level variable for test override.
var gracefulShutdownTimeout = 30 * time.Second

func RemovePluginIfNeeded(
manager *plugin_manager.PluginManager,
manager plugin_manager.PluginShutdownManager,
originalPluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier,
response *curd.UpgradePluginResponse,
) error {
shouldCleanup := response.IsOriginalPluginDeleted

if shouldCleanup && response.DeletedPlugin != nil && response.DeletedPlugin.InstallType == plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL {
// uninstall plugin from local install bucket
// uninstall plugin from local install bucket first
// this must happen before shutdown so that the WatchDog safety net
// (removeUnusedLocalPlugins) can retry cleanup if shutdown fails
if err := manager.RemoveLocalPlugin(originalPluginUniqueIdentifier); err != nil {
return errors.Join(err, errors.New("failed to remove plugin from local install bucket"))
}

// shutdown it gracefully
_, err := manager.ShutdownLocalPluginGracefully(originalPluginUniqueIdentifier)
if err != nil {
return errors.Join(err, errors.New("failed to shutdown plugin gracefully"))
shutdownGracefully := func() bool {
ch, err := manager.ShutdownLocalPluginGracefully(originalPluginUniqueIdentifier)
if err != nil {
// runtime not found in map, it may have been already cleaned up
return false
}
Comment thread
fatelei marked this conversation as resolved.
if ch == nil {
return true
}

// wait for graceful shutdown with a timeout
select {
case <-ch:
return true
case <-time.After(gracefulShutdownTimeout):
log.Warn("graceful shutdown timed out, trying forceful shutdown",
"plugin", originalPluginUniqueIdentifier.String())
return false
}
}

if shutdownGracefully() {
return nil
}

// graceful shutdown failed or timed out, try forceful as fallback
forceCh, forceErr := manager.ShutdownLocalPluginForcefully(originalPluginUniqueIdentifier)
if forceErr != nil {
return errors.Join(
forceErr,
errors.New("failed to shutdown plugin forcefully after graceful shutdown failed"),
)
}
Comment thread
fatelei marked this conversation as resolved.
if forceCh != nil {
select {
case <-forceCh:
// forceful shutdown completed
case <-time.After(gracefulShutdownTimeout):
return errors.New("forceful shutdown timed out")
}
}
}
return nil
Expand Down
Loading
Loading