Skip to content

Commit 90abf86

Browse files
authored
fix: fix some panic and fix test failed (#715)
1 parent e4513fb commit 90abf86

9 files changed

Lines changed: 320 additions & 110 deletions

File tree

.github/workflows/benchmark.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ jobs:
3434
env:
3535
UV_PATH: ${{ env.UV_PATH }}
3636
run: |
37-
go test -run=^$ -bench=. -benchmem -benchtime=30s ./... > result.txt
37+
go test -run=^$ -bench=. -benchmem -benchtime=30s ./...
3838
3939
- name: Remove License
4040
run: git stash

internal/core/control_panel/server_debugger.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ func (c *ControlPanel) onDebuggingRuntimeConnected(
5050
func (c *ControlPanel) onDebuggingRuntimeDisconnected(
5151
rpr *debugging_runtime.RemotePluginRuntime,
5252
) {
53+
if !rpr.Initialized() {
54+
return
55+
}
56+
5357
// handle plugin disconnecting
5458
pluginIdentifier, err := rpr.Identity()
5559
if err != nil {
Lines changed: 88 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -1,90 +1,113 @@
11
package controlpanel
22

33
import (
4-
"reflect"
5-
"testing"
6-
"unsafe"
7-
8-
"github.com/google/uuid"
9-
"github.com/stretchr/testify/assert"
10-
11-
"github.com/langgenius/dify-plugin-daemon/internal/core/debugging_runtime"
12-
"github.com/langgenius/dify-plugin-daemon/internal/core/local_runtime"
13-
"github.com/langgenius/dify-plugin-daemon/internal/types/app"
14-
"github.com/langgenius/dify-plugin-daemon/pkg/entities/manifest_entities"
15-
"github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
4+
"reflect"
5+
"testing"
6+
"unsafe"
7+
8+
"github.com/google/uuid"
9+
"github.com/stretchr/testify/assert"
10+
11+
"github.com/langgenius/dify-plugin-daemon/internal/core/debugging_runtime"
12+
"github.com/langgenius/dify-plugin-daemon/internal/core/local_runtime"
13+
"github.com/langgenius/dify-plugin-daemon/internal/types/app"
14+
"github.com/langgenius/dify-plugin-daemon/pkg/entities/manifest_entities"
15+
"github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
1616
)
1717

1818
type mockNotifier struct {
19-
connected bool
20-
disconnected bool
19+
connected bool
20+
disconnected bool
2121
}
2222

23-
func (m *mockNotifier) OnLocalRuntimeStarting(plugin_entities.PluginUniqueIdentifier) {}
24-
func (m *mockNotifier) OnLocalRuntimeReady(*local_runtime.LocalPluginRuntime) {}
23+
func (m *mockNotifier) OnLocalRuntimeStarting(plugin_entities.PluginUniqueIdentifier) {}
24+
func (m *mockNotifier) OnLocalRuntimeReady(*local_runtime.LocalPluginRuntime) {}
2525
func (m *mockNotifier) OnLocalRuntimeStartFailed(plugin_entities.PluginUniqueIdentifier, error) {}
26-
func (m *mockNotifier) OnLocalRuntimeStop(*local_runtime.LocalPluginRuntime) {}
27-
func (m *mockNotifier) OnLocalRuntimeStopped(*local_runtime.LocalPluginRuntime) {}
28-
func (m *mockNotifier) OnLocalRuntimeScaleUp(*local_runtime.LocalPluginRuntime, int32) {}
29-
func (m *mockNotifier) OnLocalRuntimeScaleDown(*local_runtime.LocalPluginRuntime, int32) {}
30-
func (m *mockNotifier) OnLocalRuntimeInstanceLog(*local_runtime.LocalPluginRuntime, *local_runtime.PluginInstance, plugin_entities.PluginLogEvent) {}
31-
func (m *mockNotifier) OnDebuggingRuntimeConnected(r *debugging_runtime.RemotePluginRuntime) { m.connected = true }
32-
func (m *mockNotifier) OnDebuggingRuntimeDisconnected(r *debugging_runtime.RemotePluginRuntime) { m.disconnected = true }
26+
func (m *mockNotifier) OnLocalRuntimeStop(*local_runtime.LocalPluginRuntime) {}
27+
func (m *mockNotifier) OnLocalRuntimeStopped(*local_runtime.LocalPluginRuntime) {}
28+
func (m *mockNotifier) OnLocalRuntimeScaleUp(*local_runtime.LocalPluginRuntime, int32) {}
29+
func (m *mockNotifier) OnLocalRuntimeScaleDown(*local_runtime.LocalPluginRuntime, int32) {}
30+
func (m *mockNotifier) OnLocalRuntimeInstanceLog(*local_runtime.LocalPluginRuntime, *local_runtime.PluginInstance, plugin_entities.PluginLogEvent) {
31+
}
32+
func (m *mockNotifier) OnDebuggingRuntimeConnected(r *debugging_runtime.RemotePluginRuntime) {
33+
m.connected = true
34+
}
35+
func (m *mockNotifier) OnDebuggingRuntimeDisconnected(r *debugging_runtime.RemotePluginRuntime) {
36+
m.disconnected = true
37+
}
3338

3439
// setPrivateString sets an unexported string field on a struct value via unsafe reflection.
3540
func setPrivateString(target any, field string, value string) {
36-
rv := reflect.ValueOf(target).Elem()
37-
f := rv.FieldByName(field)
38-
reflect.NewAt(f.Type(), unsafe.Pointer(f.UnsafeAddr())).Elem().SetString(value)
41+
rv := reflect.ValueOf(target).Elem()
42+
f := rv.FieldByName(field)
43+
reflect.NewAt(f.Type(), unsafe.Pointer(f.UnsafeAddr())).Elem().SetString(value)
3944
}
4045

4146
func newFakeRemoteRuntime(t *testing.T, name, version string) *debugging_runtime.RemotePluginRuntime {
42-
t.Helper()
43-
44-
r := &debugging_runtime.RemotePluginRuntime{
45-
PluginRuntime: plugin_entities.PluginRuntime{
46-
Config: plugin_entities.PluginDeclaration{
47-
PluginDeclarationWithoutAdvancedFields: plugin_entities.PluginDeclarationWithoutAdvancedFields{
48-
// Author is overwritten with tenantId inside Identity()
49-
Name: name,
50-
Version: manifest_entities.Version(version),
51-
},
52-
},
53-
},
54-
}
55-
56-
// Provide required tenantId and checksum so Identity() succeeds
57-
setPrivateString(r, "tenantId", uuid.New().String())
58-
setPrivateString(r, "checksum", "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")
59-
60-
return r
47+
t.Helper()
48+
49+
r := &debugging_runtime.RemotePluginRuntime{
50+
PluginRuntime: plugin_entities.PluginRuntime{
51+
Config: plugin_entities.PluginDeclaration{
52+
PluginDeclarationWithoutAdvancedFields: plugin_entities.PluginDeclarationWithoutAdvancedFields{
53+
// Author is overwritten with tenantId inside Identity()
54+
Name: name,
55+
Version: manifest_entities.Version(version),
56+
},
57+
},
58+
},
59+
}
60+
61+
// Provide required tenantId and checksum so Identity() succeeds
62+
setPrivateString(r, "tenantId", uuid.New().String())
63+
setPrivateString(r, "checksum", "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")
64+
reflect.NewAt(
65+
reflect.ValueOf(r).Elem().FieldByName("initialized").Type(),
66+
unsafe.Pointer(reflect.ValueOf(r).Elem().FieldByName("initialized").UnsafeAddr()),
67+
).Elem().SetBool(true)
68+
69+
return r
6170
}
6271

6372
func TestOnDebuggingRuntimeConnectedAndDisconnected(t *testing.T) {
64-
cp := NewControlPanel(&app.Config{PluginLocalLaunchingConcurrent: 1}, nil, nil, nil, nil)
73+
cp := NewControlPanel(&app.Config{PluginLocalLaunchingConcurrent: 1}, nil, nil, nil, nil)
6574

66-
// Attach mock notifier to observe callbacks
67-
mn := &mockNotifier{}
68-
cp.AddNotifier(mn)
75+
// Attach mock notifier to observe callbacks
76+
mn := &mockNotifier{}
77+
cp.AddNotifier(mn)
6978

70-
r := newFakeRemoteRuntime(t, "conn_test", "1.2.3")
79+
r := newFakeRemoteRuntime(t, "conn_test", "1.2.3")
7180

72-
// Connected path
73-
err := cp.onDebuggingRuntimeConnected(r)
74-
assert.NoError(t, err)
81+
// Connected path
82+
err := cp.onDebuggingRuntimeConnected(r)
83+
assert.NoError(t, err)
7584

76-
id, err := r.Identity()
77-
assert.NoError(t, err)
85+
id, err := r.Identity()
86+
assert.NoError(t, err)
7887

79-
// Stored in runtime map and notifier called
80-
_, ok := cp.debuggingPluginRuntime.Load(id)
81-
assert.True(t, ok, "runtime should be stored on connect")
82-
assert.True(t, mn.connected, "connected notifier should be triggered")
88+
// Stored in runtime map and notifier called
89+
_, ok := cp.debuggingPluginRuntime.Load(id)
90+
assert.True(t, ok, "runtime should be stored on connect")
91+
assert.True(t, mn.connected, "connected notifier should be triggered")
8392

84-
// Disconnected path
85-
cp.onDebuggingRuntimeDisconnected(r)
93+
// Disconnected path
94+
cp.onDebuggingRuntimeDisconnected(r)
95+
96+
_, ok = cp.debuggingPluginRuntime.Load(id)
97+
assert.False(t, ok, "runtime should be removed on disconnect")
98+
assert.True(t, mn.disconnected, "disconnected notifier should be triggered")
99+
}
86100

87-
_, ok = cp.debuggingPluginRuntime.Load(id)
88-
assert.False(t, ok, "runtime should be removed on disconnect")
89-
assert.True(t, mn.disconnected, "disconnected notifier should be triggered")
90-
}
101+
func TestOnDebuggingRuntimeDisconnected_IgnoresUninitializedRuntime(t *testing.T) {
102+
cp := NewControlPanel(&app.Config{PluginLocalLaunchingConcurrent: 1}, nil, nil, nil, nil)
103+
104+
mn := &mockNotifier{}
105+
cp.AddNotifier(mn)
106+
107+
r := &debugging_runtime.RemotePluginRuntime{}
108+
109+
assert.NotPanics(t, func() {
110+
cp.onDebuggingRuntimeDisconnected(r)
111+
})
112+
assert.False(t, mn.disconnected, "disconnected notifier should not be triggered for uninitialized runtime")
113+
}

internal/core/debugging_runtime/type.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,10 @@ func (r *RemotePluginRuntime) SetInstallationId(installationId string) {
156156
r.installationId = installationId
157157
}
158158

159+
func (r *RemotePluginRuntime) Initialized() bool {
160+
return r.initialized
161+
}
162+
159163
func (r *RemotePluginRuntime) Identity() (plugin_entities.PluginUniqueIdentifier, error) {
160164
// FIXME: it's a little bit tricky that replace author with current tenant_id
161165
// just as a flag to identify debugging plugin

internal/core/io_tunnel/backwards_invocation/transaction/serverless_handler.go

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package transaction
22

33
import (
4+
"fmt"
45
"io"
56
"net/http"
7+
"sync"
68
"sync/atomic"
79
"time"
810

@@ -28,17 +30,46 @@ func NewServerlessTransactionHandler(maxTimeout time.Duration) *ServerlessTransa
2830
type serverlessTransactionWriteCloser struct {
2931
done chan bool
3032
closed int32
33+
mu sync.Mutex
3134

3235
writer func([]byte) (int, error)
3336
flush func()
3437
}
3538

36-
func (a *serverlessTransactionWriteCloser) Write(data []byte) (int, error) {
37-
return a.writer(data)
39+
func (w *serverlessTransactionWriteCloser) Write(data []byte) (n int, err error) {
40+
w.mu.Lock()
41+
defer w.mu.Unlock()
42+
43+
if atomic.LoadInt32(&w.closed) != 0 {
44+
return 0, io.ErrClosedPipe
45+
}
46+
47+
defer func() {
48+
if recovered := recover(); recovered != nil {
49+
n = 0
50+
err = fmt.Errorf("serverless transaction write panic: %v", recovered)
51+
_ = w.Close()
52+
}
53+
}()
54+
55+
return w.writer(data)
3856
}
3957

40-
func (a *serverlessTransactionWriteCloser) Flush() {
41-
a.flush()
58+
func (w *serverlessTransactionWriteCloser) Flush() {
59+
w.mu.Lock()
60+
defer w.mu.Unlock()
61+
62+
if atomic.LoadInt32(&w.closed) != 0 {
63+
return
64+
}
65+
66+
defer func() {
67+
if recover() != nil {
68+
_ = w.Close()
69+
}
70+
}()
71+
72+
w.flush()
4273
}
4374

4475
func (w *serverlessTransactionWriteCloser) Close() error {
@@ -64,9 +95,6 @@ func (h *ServerlessTransactionHandler) Handle(ctx *gin.Context, sessionId string
6495
return
6596
}
6697

67-
ctx.Writer.WriteHeader(http.StatusOK)
68-
ctx.Writer.Header().Set("Content-Type", "text/event-stream")
69-
7098
plugin_entities.ParsePluginUniversalEvent(
7199
bytes,
72100
"",
@@ -121,15 +149,17 @@ func (h *ServerlessTransactionHandler) Handle(ctx *gin.Context, sessionId string
121149
return
122150
}
123151

152+
ctx.Writer.Header().Set("Content-Type", "text/event-stream")
153+
124154
// replace trace context, propagate it to gin
125155
ctxRequestContext := ctx.Request.Context()
126156
ctxRequestContext = log.WithTrace(ctxRequestContext, session.TraceContext)
127157
ctxRequestContext = log.WithIdentity(ctxRequestContext, session.IdentityContext)
128158
ctx.Request = ctx.Request.WithContext(ctxRequestContext)
129159

130160
// bind the backwards invocation
131-
plugin_manager := plugin_manager.Manager()
132-
session.BindBackwardsInvocation(plugin_manager.BackwardsInvocation())
161+
pluginManager := plugin_manager.Manager()
162+
session.BindBackwardsInvocation(pluginManager.BackwardsInvocation())
133163

134164
serverlessResponseWriter := NewServerlessTransactionWriter(session, writer)
135165

@@ -140,8 +170,10 @@ func (h *ServerlessTransactionHandler) Handle(ctx *gin.Context, sessionId string
140170
serverlessResponseWriter,
141171
sessionMessage.Data,
142172
); err != nil {
143-
ctx.Writer.WriteHeader(http.StatusInternalServerError)
144-
ctx.Writer.Write([]byte("failed to parse request"))
173+
if !ctx.Writer.Written() {
174+
ctx.Writer.WriteHeader(http.StatusInternalServerError)
175+
ctx.Writer.Write([]byte("failed to parse request"))
176+
}
145177
writer.Close()
146178
}
147179
},

0 commit comments

Comments
 (0)