Skip to content

Commit 8b0a153

Browse files
committed
fix more aws logs
1 parent 0e9c3cf commit 8b0a153

1 file changed

Lines changed: 93 additions & 1 deletion

File tree

  • internal/core/serverless_runtime

internal/core/serverless_runtime/io.go

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,65 @@ import (
1515
"github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
1616
routinepkg "github.com/langgenius/dify-plugin-daemon/pkg/routine"
1717
"github.com/langgenius/dify-plugin-daemon/pkg/utils/http_requests"
18+
"github.com/langgenius/dify-plugin-daemon/pkg/utils/log"
1819
"github.com/langgenius/dify-plugin-daemon/pkg/utils/parser"
1920
"github.com/langgenius/dify-plugin-daemon/pkg/utils/routine"
2021
)
2122

23+
const serverlessErrorResponsePreviewLimit = 4 * 1024
24+
25+
type limitedBuffer struct {
26+
buf bytes.Buffer
27+
limit int
28+
}
29+
30+
func (b *limitedBuffer) Write(p []byte) (int, error) {
31+
originalLen := len(p)
32+
if b.limit <= 0 || b.buf.Len() >= b.limit {
33+
return originalLen, nil
34+
}
35+
36+
remaining := b.limit - b.buf.Len()
37+
if len(p) > remaining {
38+
p = p[:remaining]
39+
}
40+
41+
_, err := b.buf.Write(p)
42+
return originalLen, err
43+
}
44+
45+
func (b *limitedBuffer) String() string {
46+
return b.buf.String()
47+
}
48+
49+
func serverlessResponseHeaders(response *http.Response) map[string]string {
50+
headers := map[string]string{}
51+
for _, key := range []string{
52+
"x-amzn-RequestId",
53+
"x-amzn-ErrorType",
54+
"x-amz-function-error",
55+
"content-type",
56+
"content-length",
57+
} {
58+
if value := response.Header.Get(key); value != "" {
59+
headers[key] = value
60+
}
61+
}
62+
return headers
63+
}
64+
65+
func readServerlessErrorPreview(response *http.Response) (string, error) {
66+
if response.Body == nil {
67+
return "", nil
68+
}
69+
70+
preview, err := io.ReadAll(io.LimitReader(response.Body, serverlessErrorResponsePreviewLimit))
71+
if err != nil {
72+
return "", err
73+
}
74+
return string(preview), nil
75+
}
76+
2277
func (r *ServerlessPluginRuntime) Listen(sessionId string) (
2378
*entities.Broadcast[plugin_entities.SessionMessage],
2479
error,
@@ -55,6 +110,8 @@ func (r *ServerlessPluginRuntime) invokeServerlessWithRetry(
55110
maxRetries = 1
56111
}
57112

113+
payloadSizeBytes := len(data)
114+
58115
for attempt := 0; attempt < maxRetries; attempt++ {
59116
// Apply exponential backoff for retry attempts (500ms, 1000ms, 2000ms, ...)
60117
// Capped at 30 seconds to prevent unreasonable wait times
@@ -79,6 +136,14 @@ func (r *ServerlessPluginRuntime) invokeServerlessWithRetry(
79136
)
80137

81138
if err != nil {
139+
log.Warn("serverless request failed",
140+
"session_id", sessionId,
141+
"attempt", attempt+1,
142+
"max_retries", maxRetries,
143+
"url", url,
144+
"payload_size_bytes", payloadSizeBytes,
145+
"error", err,
146+
)
82147
lastErr = fmt.Errorf("attempt %d/%d failed: %w", attempt+1, maxRetries, err)
83148
continue
84149
}
@@ -91,6 +156,19 @@ func (r *ServerlessPluginRuntime) invokeServerlessWithRetry(
91156

92157
// Check if status code should trigger a retry (502 Bad Gateway only)
93158
if shouldRetryStatusCode(statusCode) {
159+
responsePreview, readErr := readServerlessErrorPreview(response)
160+
log.Warn("serverless request returned retryable status",
161+
"session_id", sessionId,
162+
"attempt", attempt+1,
163+
"max_retries", maxRetries,
164+
"url", url,
165+
"payload_size_bytes", payloadSizeBytes,
166+
"status", response.Status,
167+
"status_code", statusCode,
168+
"headers", serverlessResponseHeaders(response),
169+
"response_preview", responsePreview,
170+
"response_preview_read_error", readErr,
171+
)
94172
if response.Body != nil {
95173
response.Body.Close()
96174
}
@@ -151,7 +229,8 @@ func (r *ServerlessPluginRuntime) Write(
151229
return
152230
}
153231

154-
scanner := bufio.NewScanner(response.Body)
232+
responsePreview := &limitedBuffer{limit: serverlessErrorResponsePreviewLimit}
233+
scanner := bufio.NewScanner(io.TeeReader(response.Body, responsePreview))
155234
defer response.Body.Close()
156235

157236
scanner.Buffer(make([]byte, r.RuntimeBufferSize), r.RuntimeMaxBufferSize)
@@ -204,6 +283,19 @@ func (r *ServerlessPluginRuntime) Write(
204283
}),
205284
})
206285
}
286+
287+
if response.StatusCode < 200 || response.StatusCode >= 300 {
288+
log.Warn("serverless request returned non-success status",
289+
"session_id", sessionId,
290+
"action", action,
291+
"lambda_url", r.LambdaURL,
292+
"payload_size_bytes", len(data),
293+
"status", response.Status,
294+
"status_code", response.StatusCode,
295+
"headers", serverlessResponseHeaders(response),
296+
"response_preview", responsePreview.String(),
297+
)
298+
}
207299
})
208300

209301
return nil

0 commit comments

Comments
 (0)