diff --git a/internal/core/control_panel/launcher_local.go b/internal/core/control_panel/launcher_local.go index 128357545..60204b415 100644 --- a/internal/core/control_panel/launcher_local.go +++ b/internal/core/control_panel/launcher_local.go @@ -149,17 +149,17 @@ func (c *ControlPanel) LaunchLocalPlugin( }) }, OnRuntimeCloseImpl: func() { + // delete the runtime from the map + // this must happen AFTER all instances are confirmed shutdown + // to prevent process leaks where the runtime disappears from the map + // but subprocesses are still alive and no safety net can find them + c.localPluginRuntimes.Delete(pluginUniqueIdentifier) // notify the plugin totally stopped c.WalkNotifiers(func(notifier ControlPanelNotifier) { notifier.OnLocalRuntimeStopped(runtime) }) }, OnRuntimeStopScheduleImpl: func() { - // delete the runtime from the map - // Even if the runtime is not ready, deleting it still makes sense - // once a plugin is stopping schedule, all new requests to it need to be rejected - // so just remove it from map - c.localPluginRuntimes.Delete(pluginUniqueIdentifier) // notify the plugin is stopping c.WalkNotifiers(func(notifier ControlPanelNotifier) { notifier.OnLocalRuntimeStop(runtime) diff --git a/internal/core/plugin_manager/manager.go b/internal/core/plugin_manager/manager.go index f7dfa4eb8..d0fcccb94 100644 --- a/internal/core/plugin_manager/manager.go +++ b/internal/core/plugin_manager/manager.go @@ -53,6 +53,12 @@ type PluginManager struct { controlPanel *controlpanel.ControlPanel } +type PluginShutdownManager interface { + RemoveLocalPlugin(pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier) error + ShutdownLocalPluginGracefully(pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier) (<-chan error, error) + ShutdownLocalPluginForcefully(pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier) (<-chan error, error) +} + var ( manager *PluginManager ) diff --git a/internal/core/plugin_manager/runtime.go b/internal/core/plugin_manager/runtime.go index 3177f5983..652592ee4 100644 --- a/internal/core/plugin_manager/runtime.go +++ b/internal/core/plugin_manager/runtime.go @@ -49,3 +49,9 @@ func (p *PluginManager) ShutdownLocalPluginGracefully( ) (<-chan error, error) { return p.controlPanel.ShutdownLocalPluginGracefully(pluginUniqueIdentifier) } + +func (p *PluginManager) ShutdownLocalPluginForcefully( + pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier, +) (<-chan error, error) { + return p.controlPanel.ShutdownLocalPluginForcefully(pluginUniqueIdentifier) +} diff --git a/internal/tasks/install_plugin_utils.go b/internal/tasks/install_plugin_utils.go index 14e13a38a..f99a797fe 100644 --- a/internal/tasks/install_plugin_utils.go +++ b/internal/tasks/install_plugin_utils.go @@ -155,23 +155,65 @@ func DeleteTask(taskId string) error { }) } +// gracefulShutdownTimeout controls how long RemovePluginIfNeeded waits for +// graceful and forceful shutdown to complete. Package-level variable for test override. +var gracefulShutdownTimeout = 30 * time.Second + func RemovePluginIfNeeded( - manager *plugin_manager.PluginManager, + manager plugin_manager.PluginShutdownManager, originalPluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier, response *curd.UpgradePluginResponse, ) error { shouldCleanup := response.IsOriginalPluginDeleted if shouldCleanup && response.DeletedPlugin != nil && response.DeletedPlugin.InstallType == plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL { - // uninstall plugin from local install bucket + // uninstall plugin from local install bucket first + // this must happen before shutdown so that the WatchDog safety net + // (removeUnusedLocalPlugins) can retry cleanup if shutdown fails if err := manager.RemoveLocalPlugin(originalPluginUniqueIdentifier); err != nil { return errors.Join(err, errors.New("failed to remove plugin from local install bucket")) } - // shutdown it gracefully - _, err := manager.ShutdownLocalPluginGracefully(originalPluginUniqueIdentifier) - if err != nil { - return errors.Join(err, errors.New("failed to shutdown plugin gracefully")) + shutdownGracefully := func() bool { + ch, err := manager.ShutdownLocalPluginGracefully(originalPluginUniqueIdentifier) + if err != nil { + // runtime not found in map, it may have been already cleaned up + return false + } + if ch == nil { + return true + } + + // wait for graceful shutdown with a timeout + select { + case <-ch: + return true + case <-time.After(gracefulShutdownTimeout): + log.Warn("graceful shutdown timed out, trying forceful shutdown", + "plugin", originalPluginUniqueIdentifier.String()) + return false + } + } + + if shutdownGracefully() { + return nil + } + + // graceful shutdown failed or timed out, try forceful as fallback + forceCh, forceErr := manager.ShutdownLocalPluginForcefully(originalPluginUniqueIdentifier) + if forceErr != nil { + return errors.Join( + forceErr, + errors.New("failed to shutdown plugin forcefully after graceful shutdown failed"), + ) + } + if forceCh != nil { + select { + case <-forceCh: + // forceful shutdown completed + case <-time.After(gracefulShutdownTimeout): + return errors.New("forceful shutdown timed out") + } } } return nil diff --git a/internal/tasks/remove_plugin_test.go b/internal/tasks/remove_plugin_test.go new file mode 100644 index 000000000..9fe1eb6b3 --- /dev/null +++ b/internal/tasks/remove_plugin_test.go @@ -0,0 +1,284 @@ +package tasks + +import ( + "errors" + "testing" + "time" + + "github.com/langgenius/dify-plugin-daemon/internal/types/models" + "github.com/langgenius/dify-plugin-daemon/internal/types/models/curd" + "github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities" +) + +var ErrTestRemoveLocalPlugin = errors.New("remove local plugin failed") +var ErrTestGracefulShutdown = errors.New("graceful shutdown failed") +var ErrTestForcefulShutdown = errors.New("forceful shutdown failed") + +type mockPluginShutdownManager struct { + removeLocalPluginFn func(plugin_entities.PluginUniqueIdentifier) error + shutdownLocalPluginGracefullyFn func(plugin_entities.PluginUniqueIdentifier) (<-chan error, error) + shutdownLocalPluginForcefullyFn func(plugin_entities.PluginUniqueIdentifier) (<-chan error, error) +} + +func (m *mockPluginShutdownManager) RemoveLocalPlugin(id plugin_entities.PluginUniqueIdentifier) error { + if m.removeLocalPluginFn != nil { + return m.removeLocalPluginFn(id) + } + return nil +} + +func (m *mockPluginShutdownManager) ShutdownLocalPluginGracefully(id plugin_entities.PluginUniqueIdentifier) (<-chan error, error) { + if m.shutdownLocalPluginGracefullyFn != nil { + return m.shutdownLocalPluginGracefullyFn(id) + } + ch := make(chan error) + close(ch) + return ch, nil +} + +func (m *mockPluginShutdownManager) ShutdownLocalPluginForcefully(id plugin_entities.PluginUniqueIdentifier) (<-chan error, error) { + if m.shutdownLocalPluginForcefullyFn != nil { + return m.shutdownLocalPluginForcefullyFn(id) + } + ch := make(chan error) + close(ch) + return ch, nil +} + +func localPluginResponse() *curd.UpgradePluginResponse { + return &curd.UpgradePluginResponse{ + IsOriginalPluginDeleted: true, + DeletedPlugin: &models.Plugin{ + InstallType: plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL, + }, + } +} + +func mustParsePluginUniqueIdentifier(t *testing.T, s string) plugin_entities.PluginUniqueIdentifier { + t.Helper() + id, err := plugin_entities.NewPluginUniqueIdentifier(s) + if err != nil { + t.Fatalf("failed to parse plugin unique identifier %q: %v", s, err) + } + return id +} + +// testPluginID is a valid plugin unique identifier for testing +const testPluginID = "langgenius/jina:0.0.4@a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4" + +func TestRemovePluginIfNeeded_IsOriginalPluginDeletedFalse(t *testing.T) { + manager := &mockPluginShutdownManager{} + response := &curd.UpgradePluginResponse{ + IsOriginalPluginDeleted: false, + } + pluginID := mustParsePluginUniqueIdentifier(t, testPluginID) + + err := RemovePluginIfNeeded(manager, pluginID, response) + if err != nil { + t.Errorf("expected nil error, got: %v", err) + } +} + +func TestRemovePluginIfNeeded_DeletedPluginNil(t *testing.T) { + manager := &mockPluginShutdownManager{} + response := &curd.UpgradePluginResponse{ + IsOriginalPluginDeleted: true, + DeletedPlugin: nil, + } + pluginID := mustParsePluginUniqueIdentifier(t, testPluginID) + + err := RemovePluginIfNeeded(manager, pluginID, response) + if err != nil { + t.Errorf("expected nil error, got: %v", err) + } +} + +func TestRemovePluginIfNeeded_NotLocalRuntime(t *testing.T) { + manager := &mockPluginShutdownManager{} + response := &curd.UpgradePluginResponse{ + IsOriginalPluginDeleted: true, + DeletedPlugin: &models.Plugin{ + InstallType: plugin_entities.PLUGIN_RUNTIME_TYPE_SERVERLESS, + }, + } + pluginID := mustParsePluginUniqueIdentifier(t, testPluginID) + + err := RemovePluginIfNeeded(manager, pluginID, response) + if err != nil { + t.Errorf("expected nil error, got: %v", err) + } +} + +func TestRemovePluginIfNeeded_RemoveLocalPluginFails(t *testing.T) { + manager := &mockPluginShutdownManager{ + removeLocalPluginFn: func(_ plugin_entities.PluginUniqueIdentifier) error { + return ErrTestRemoveLocalPlugin + }, + } + pluginID := mustParsePluginUniqueIdentifier(t, testPluginID) + + err := RemovePluginIfNeeded(manager, pluginID, localPluginResponse()) + if err == nil { + t.Fatal("expected error, got nil") + } + if !errors.Is(err, ErrTestRemoveLocalPlugin) { + t.Errorf("expected error to wrap ErrTestRemoveLocalPlugin, got: %v", err) + } +} + +func TestRemovePluginIfNeeded_GracefulShutdownSucceeds(t *testing.T) { + manager := &mockPluginShutdownManager{} + pluginID := mustParsePluginUniqueIdentifier(t, testPluginID) + + err := RemovePluginIfNeeded(manager, pluginID, localPluginResponse()) + if err != nil { + t.Errorf("expected nil error, got: %v", err) + } +} + +func TestRemovePluginIfNeeded_GracefulShutdownChannelCloses(t *testing.T) { + ch := make(chan error) + go func() { + time.Sleep(10 * time.Millisecond) + close(ch) + }() + + manager := &mockPluginShutdownManager{ + shutdownLocalPluginGracefullyFn: func(_ plugin_entities.PluginUniqueIdentifier) (<-chan error, error) { + return ch, nil + }, + } + pluginID := mustParsePluginUniqueIdentifier(t, testPluginID) + + err := RemovePluginIfNeeded(manager, pluginID, localPluginResponse()) + if err != nil { + t.Errorf("expected nil error, got: %v", err) + } +} + +func TestRemovePluginIfNeeded_GracefulShutdownReturnsNil(t *testing.T) { + manager := &mockPluginShutdownManager{ + shutdownLocalPluginGracefullyFn: func(_ plugin_entities.PluginUniqueIdentifier) (<-chan error, error) { + return nil, nil + }, + } + pluginID := mustParsePluginUniqueIdentifier(t, testPluginID) + + err := RemovePluginIfNeeded(manager, pluginID, localPluginResponse()) + if err != nil { + t.Errorf("expected nil error, got: %v", err) + } +} + +func TestRemovePluginIfNeeded_GracefulFails_ForcefulSucceeds(t *testing.T) { + manager := &mockPluginShutdownManager{ + shutdownLocalPluginGracefullyFn: func(_ plugin_entities.PluginUniqueIdentifier) (<-chan error, error) { + return nil, ErrTestGracefulShutdown + }, + } + pluginID := mustParsePluginUniqueIdentifier(t, testPluginID) + + err := RemovePluginIfNeeded(manager, pluginID, localPluginResponse()) + if err != nil { + t.Errorf("expected nil error, got: %v", err) + } +} + +func TestRemovePluginIfNeeded_GracefulFails_ForcefulAlsoFails(t *testing.T) { + manager := &mockPluginShutdownManager{ + shutdownLocalPluginGracefullyFn: func(_ plugin_entities.PluginUniqueIdentifier) (<-chan error, error) { + return nil, ErrTestGracefulShutdown + }, + shutdownLocalPluginForcefullyFn: func(_ plugin_entities.PluginUniqueIdentifier) (<-chan error, error) { + return nil, ErrTestForcefulShutdown + }, + } + pluginID := mustParsePluginUniqueIdentifier(t, testPluginID) + + err := RemovePluginIfNeeded(manager, pluginID, localPluginResponse()) + if err == nil { + t.Fatal("expected error, got nil") + } + if !errors.Is(err, ErrTestForcefulShutdown) { + t.Errorf("expected error to wrap ErrTestForcefulShutdown, got: %v", err) + } +} + +func TestRemovePluginIfNeeded_GracefulTimesOut_ForcefulSucceeds(t *testing.T) { + origTimeout := gracefulShutdownTimeout + gracefulShutdownTimeout = 50 * time.Millisecond + defer func() { gracefulShutdownTimeout = origTimeout }() + + gracefulCh := make(chan error) // never closes, will trigger timeout + + forceCh := make(chan error) + go func() { + time.Sleep(10 * time.Millisecond) + close(forceCh) + }() + + manager := &mockPluginShutdownManager{ + shutdownLocalPluginGracefullyFn: func(_ plugin_entities.PluginUniqueIdentifier) (<-chan error, error) { + return gracefulCh, nil + }, + shutdownLocalPluginForcefullyFn: func(_ plugin_entities.PluginUniqueIdentifier) (<-chan error, error) { + return forceCh, nil + }, + } + pluginID := mustParsePluginUniqueIdentifier(t, testPluginID) + + err := RemovePluginIfNeeded(manager, pluginID, localPluginResponse()) + if err != nil { + t.Errorf("expected nil error, got: %v", err) + } +} + +func TestRemovePluginIfNeeded_GracefulTimesOut_ForcefulTimesOut(t *testing.T) { + origTimeout := gracefulShutdownTimeout + gracefulShutdownTimeout = 50 * time.Millisecond + defer func() { gracefulShutdownTimeout = origTimeout }() + + gracefulCh := make(chan error) // never closes + forceCh := make(chan error) // never closes + + manager := &mockPluginShutdownManager{ + shutdownLocalPluginGracefullyFn: func(_ plugin_entities.PluginUniqueIdentifier) (<-chan error, error) { + return gracefulCh, nil + }, + shutdownLocalPluginForcefullyFn: func(_ plugin_entities.PluginUniqueIdentifier) (<-chan error, error) { + return forceCh, nil + }, + } + pluginID := mustParsePluginUniqueIdentifier(t, testPluginID) + + err := RemovePluginIfNeeded(manager, pluginID, localPluginResponse()) + if err == nil { + t.Fatal("expected error, got nil") + } + if err.Error() != "forceful shutdown timed out" { + t.Errorf("expected 'forceful shutdown timed out' error, got: %v", err) + } +} + +func TestRemovePluginIfNeeded_GracefulFails_ForcefulChannelCloses(t *testing.T) { + forceCh := make(chan error) + go func() { + time.Sleep(10 * time.Millisecond) + close(forceCh) + }() + + manager := &mockPluginShutdownManager{ + shutdownLocalPluginGracefullyFn: func(_ plugin_entities.PluginUniqueIdentifier) (<-chan error, error) { + return nil, ErrTestGracefulShutdown + }, + shutdownLocalPluginForcefullyFn: func(_ plugin_entities.PluginUniqueIdentifier) (<-chan error, error) { + return forceCh, nil + }, + } + pluginID := mustParsePluginUniqueIdentifier(t, testPluginID) + + err := RemovePluginIfNeeded(manager, pluginID, localPluginResponse()) + if err != nil { + t.Errorf("expected nil error, got: %v", err) + } +}