Skip to content

Commit 23a4662

Browse files
authored
feat: add retry mechanism for serverless invocation on 502 errors (#569)
* feat: add retry mechanism for serverless invocation on 502 errors - Add MAX_SERVERLESS_RETRY_TIMES config (default: 3) for configurable retry attempts - Implement exponential backoff retry logic (500ms, 1s, 2s, ...) for 502 Bad Gateway errors - Only retry on 502 status code as it indicates transient AWS Lambda gateway issues - Add comprehensive test suite with 11 test cases covering all retry scenarios - Ensure proper HTTP response body cleanup on retries to prevent resource leaks * refactor: simplify retry logic and error handling in serverless invocation * fix: max time * fix: test
1 parent e8f8f17 commit 23a4662

4 files changed

Lines changed: 521 additions & 13 deletions

File tree

internal/core/serverless_runtime/io.go

Lines changed: 83 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ import (
66
"errors"
77
"fmt"
88
"io"
9+
"net/http"
910
"net/url"
11+
"time"
1012

1113
"github.com/langgenius/dify-plugin-daemon/internal/core/io_tunnel/access_types"
1214
"github.com/langgenius/dify-plugin-daemon/pkg/entities"
@@ -27,6 +29,86 @@ func (r *ServerlessPluginRuntime) Listen(sessionId string) (
2729
return l, nil
2830
}
2931

32+
// shouldRetryStatusCode checks if the HTTP status code warrants a retry
33+
// Only 502 (Bad Gateway) errors are retried as they typically indicate temporary gateway issues
34+
//
35+
// To some AWS Lambda gateway errors, 502 randomly happens, and it's usually transient.
36+
// Thus we implement a retry mechanism for 502 errors.
37+
func shouldRetryStatusCode(statusCode int) bool {
38+
return statusCode == 502
39+
}
40+
41+
// invokeServerlessWithRetry invokes the serverless endpoint with retry logic
42+
// It will retry up to MaxRetryTimes attempts on 502 errors with exponential backoff
43+
// Backoff duration is capped at 30 seconds to prevent unreasonable wait times
44+
func (r *ServerlessPluginRuntime) invokeServerlessWithRetry(
45+
url string,
46+
sessionId string,
47+
data []byte,
48+
) (*http.Response, error) {
49+
const maxBackoffDuration = 30 * time.Second
50+
51+
var lastErr error
52+
53+
maxRetries := r.MaxRetryTimes
54+
if maxRetries <= 0 {
55+
maxRetries = 1
56+
}
57+
58+
for attempt := 0; attempt < maxRetries; attempt++ {
59+
// Apply exponential backoff for retry attempts (500ms, 1000ms, 2000ms, ...)
60+
// Capped at 30 seconds to prevent unreasonable wait times
61+
if attempt > 0 {
62+
backoffDuration := time.Duration(500*(1<<uint(attempt-1))) * time.Millisecond
63+
if backoffDuration > maxBackoffDuration {
64+
backoffDuration = maxBackoffDuration
65+
}
66+
time.Sleep(backoffDuration)
67+
}
68+
69+
// Make HTTP request to serverless endpoint
70+
response, err := http_requests.Request(
71+
r.Client, url, "POST",
72+
http_requests.HttpHeader(map[string]string{
73+
"Content-Type": "application/json",
74+
"Accept": "text/event-stream",
75+
"Dify-Plugin-Session-ID": sessionId,
76+
}),
77+
http_requests.HttpPayloadReader(io.NopCloser(bytes.NewReader(data))),
78+
http_requests.HttpReadTimeout(int64(r.PluginMaxExecutionTimeout*1000)),
79+
)
80+
81+
if err != nil {
82+
lastErr = fmt.Errorf("attempt %d/%d failed: %w", attempt+1, maxRetries, err)
83+
continue
84+
}
85+
86+
statusCode := response.StatusCode
87+
// Success - return immediately
88+
if statusCode >= 200 && statusCode < 300 {
89+
return response, nil
90+
}
91+
92+
// Check if status code should trigger a retry (502 Bad Gateway only)
93+
if shouldRetryStatusCode(statusCode) {
94+
if response.Body != nil {
95+
response.Body.Close()
96+
}
97+
lastErr = fmt.Errorf("attempt %d/%d failed with status code: %d", attempt+1, maxRetries, statusCode)
98+
continue
99+
}
100+
101+
// Non-retryable error - return immediately
102+
return response, nil
103+
}
104+
105+
if lastErr != nil {
106+
return nil, fmt.Errorf("all %d attempts failed, last error: %w", maxRetries, lastErr)
107+
}
108+
109+
return nil, fmt.Errorf("all %d attempts failed with unknown error", maxRetries)
110+
}
111+
30112
// For Serverless, write is equivalent to http request, it's not a normal stream like stdio and tcp
31113
func (r *ServerlessPluginRuntime) Write(
32114
sessionId string,
@@ -49,26 +131,15 @@ func (r *ServerlessPluginRuntime) Write(
49131
routinepkg.RoutineLabelKeySessionID: sessionId,
50132
routinepkg.RoutineLabelKeyLambdaURL: r.LambdaURL,
51133
}, func() {
52-
// remove the session from listeners
53134
defer r.listeners.Delete(sessionId)
54135
defer l.Close()
55136
defer l.Send(plugin_entities.SessionMessage{
56137
Type: plugin_entities.SESSION_MESSAGE_TYPE_END,
57138
Data: []byte(""),
58139
})
59140

60-
// create a new http request to serverless runtimes
61141
url += "?action=" + string(action)
62-
response, err := http_requests.Request(
63-
r.Client, url, "POST",
64-
http_requests.HttpHeader(map[string]string{
65-
"Content-Type": "application/json",
66-
"Accept": "text/event-stream",
67-
"Dify-Plugin-Session-ID": sessionId,
68-
}),
69-
http_requests.HttpPayloadReader(io.NopCloser(bytes.NewReader(data))),
70-
http_requests.HttpReadTimeout(int64(r.PluginMaxExecutionTimeout*1000)),
71-
)
142+
response, err := r.invokeServerlessWithRetry(url, sessionId, data)
72143
if err != nil {
73144
l.Send(plugin_entities.SessionMessage{
74145
Type: plugin_entities.SESSION_MESSAGE_TYPE_ERROR,
@@ -80,7 +151,6 @@ func (r *ServerlessPluginRuntime) Write(
80151
return
81152
}
82153

83-
// write to data stream
84154
scanner := bufio.NewScanner(response.Body)
85155
defer response.Body.Close()
86156

0 commit comments

Comments
 (0)