Skip to content

Commit b9394b1

Browse files
joe4dev and claude committed
feat(init): add runtime exit error reporting via supervisor and events API
Ports the supervisor and events API from PR #41 to enable proper error reporting when a Lambda runtime process exits unexpectedly (e.g. sys.exit() or a missing wrapper script), instead of LocalStack timing out with a generic error.

- Add LocalStackSupervisor: wraps ProcessSupervisor, detects unexpected runtime-* process exits and emits SendFault(RuntimeExit) events
- Add LocalStackEventsAPI: wraps StandaloneEventsAPI, overrides SendFault to forward errors to LocalStack via SendStatus(error, ...)
- Wire both into SandboxBuilder via SetEventsAPI / SetSupervisor
- Refactor NewCustomInteropServer to accept a pre-created *LocalStackAdapter shared with the events API
- Improve SendInitErrorResponse: properly deserialises the payload, includes RequestId, and sends asynchronously (non-blocking)

Enables test_lambda_runtime_exit and test_lambda_runtime_wrapper_not_found.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 73001c5 commit b9394b1

4 files changed

Lines changed: 239 additions & 13 deletions

File tree

cmd/localstack/custom_interop.go

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -97,15 +97,12 @@ type ErrorResponse struct {
9797
StackTrace []string `json:"stackTrace,omitempty"`
9898
}
9999

100-
func NewCustomInteropServer(lsOpts *LsOpts, delegate interop.Server, logCollector *LogCollector) (server *CustomInteropServer) {
100+
func NewCustomInteropServer(lsOpts *LsOpts, adapter *LocalStackAdapter, delegate interop.Server, logCollector *LogCollector) (server *CustomInteropServer) {
101101
server = &CustomInteropServer{
102-
delegate: delegate.(*rapidcore.Server),
103-
port: lsOpts.InteropPort,
104-
upstreamEndpoint: lsOpts.RuntimeEndpoint,
105-
localStackAdapter: &LocalStackAdapter{
106-
UpstreamEndpoint: lsOpts.RuntimeEndpoint,
107-
RuntimeId: lsOpts.RuntimeId,
108-
},
102+
delegate: delegate.(*rapidcore.Server),
103+
port: lsOpts.InteropPort,
104+
upstreamEndpoint: lsOpts.RuntimeEndpoint,
105+
localStackAdapter: adapter,
109106
}
110107

111108
// TODO: extract this
@@ -219,12 +216,44 @@ func (c *CustomInteropServer) SendErrorResponse(invokeID string, resp *interop.E
219216
return c.delegate.SendErrorResponse(invokeID, resp)
220217
}
221218

222-
// SendInitErrorResponse writes error response during init to a shared memory and sends GIRD FAULT.
219+
// SendInitErrorResponse forwards the init error to LocalStack and then propagates it to the delegate.
223220
func (c *CustomInteropServer) SendInitErrorResponse(resp *interop.ErrorInvokeResponse) error {
224221
log.Traceln("SendInitErrorResponse called")
225-
if err := c.localStackAdapter.SendStatus(Error, resp.Payload); err != nil {
226-
log.Fatalln("Failed to send init error to LocalStack " + err.Error() + ". Exiting.")
222+
223+
// Deserialize the raw payload so we can include the requestId and structured fields.
224+
var parsed struct {
225+
ErrorMessage string `json:"errorMessage"`
226+
ErrorType string `json:"errorType"`
227+
StackTrace []string `json:"stackTrace,omitempty"`
228+
}
229+
if err := json.Unmarshal(resp.Payload, &parsed); err != nil {
230+
log.WithError(err).Warn("Failed to parse init error payload; forwarding raw payload")
231+
if err := c.localStackAdapter.SendStatus(Error, resp.Payload); err != nil {
232+
log.WithError(err).WithField("runtime-id", c.localStackAdapter.RuntimeId).
233+
Error("Failed to send init error to LocalStack")
234+
}
235+
return c.delegate.SendInitErrorResponse(resp)
236+
}
237+
238+
adaptedResp := ErrorResponse{
239+
ErrorMessage: parsed.ErrorMessage,
240+
ErrorType: parsed.ErrorType,
241+
RequestId: c.delegate.GetCurrentInvokeID(),
242+
StackTrace: parsed.StackTrace,
243+
}
244+
body, err := json.Marshal(adaptedResp)
245+
if err != nil {
246+
log.WithError(err).Error("Failed to marshal adapted init error response")
247+
body = resp.Payload
227248
}
249+
250+
go func() {
251+
if err := c.localStackAdapter.SendStatus(Error, body); err != nil {
252+
log.WithError(err).WithField("runtime-id", c.localStackAdapter.RuntimeId).
253+
Error("Failed to send init error to LocalStack")
254+
}
255+
}()
256+
228257
return c.delegate.SendInitErrorResponse(resp)
229258
}
230259

cmd/localstack/events.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package main
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"sync"
7+
8+
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/interop"
9+
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/rapidcore/standalone/telemetry"
10+
)
11+
12+
// LocalStackEventsAPI intercepts fault events and forwards them to LocalStack as error status callbacks.
13+
type LocalStackEventsAPI struct {
14+
*telemetry.StandaloneEventsAPI
15+
adapter *LocalStackAdapter
16+
requestID string
17+
mu sync.RWMutex
18+
}
19+
20+
func NewLocalStackEventsAPI(adapter *LocalStackAdapter) *LocalStackEventsAPI {
21+
return &LocalStackEventsAPI{
22+
adapter: adapter,
23+
StandaloneEventsAPI: new(telemetry.StandaloneEventsAPI),
24+
}
25+
}
26+
27+
func (ev *LocalStackEventsAPI) SendFault(data interop.FaultData) error {
28+
_ = ev.StandaloneEventsAPI.SendFault(data)
29+
30+
requestID := string(data.RequestID)
31+
if data.RequestID == "" {
32+
ev.mu.RLock()
33+
requestID = ev.requestID
34+
ev.mu.RUnlock()
35+
}
36+
37+
resp := ErrorResponse{
38+
ErrorMessage: fmt.Sprintf("RequestId: %s Error: %s", requestID, data.ErrorMessage),
39+
ErrorType: string(data.ErrorType),
40+
}
41+
42+
payload, err := json.Marshal(resp)
43+
if err != nil {
44+
return err
45+
}
46+
47+
return ev.adapter.SendStatus(Error, payload)
48+
}
49+
50+
func (ev *LocalStackEventsAPI) SetCurrentRequestID(id interop.RequestID) {
51+
ev.mu.Lock()
52+
defer ev.mu.Unlock()
53+
ev.requestID = string(id)
54+
ev.StandaloneEventsAPI.SetCurrentRequestID(id)
55+
}

cmd/localstack/main.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,18 +179,36 @@ func main() {
179179
localStackLogsEgressApi := NewLocalStackLogsEgressAPI(logCollector)
180180
tracer := NewLocalStackTracer()
181181

182+
// Create LocalStack adapter upfront so it can be shared with the events API and interop server
183+
lsAdapter := &LocalStackAdapter{
184+
UpstreamEndpoint: lsOpts.RuntimeEndpoint,
185+
RuntimeId: lsOpts.RuntimeId,
186+
}
187+
188+
// Events API forwards runtime fault events (unexpected exits) to LocalStack as error callbacks
189+
lsEventsAPI := NewLocalStackEventsAPI(lsAdapter)
190+
191+
// Supervisor intercepts runtime process terminations and emits fault events via the events API
192+
supervisorCtx, cancelSupervisor := context.WithCancel(context.Background())
193+
194+
localStackSupv := NewLocalStackSupervisor(supervisorCtx, lsEventsAPI)
195+
182196
// build sandbox
183197
sandbox := rapidcore.
184198
NewSandboxBuilder().
185199
//SetTracer(tracer).
186200
AddShutdownFunc(func() {
187201
log.Debugln("Stopping file watcher")
188202
cancelFileWatcher()
203+
log.Debugln("Stopping supervisor")
204+
cancelSupervisor()
189205
}).
190206
SetExtensionsFlag(true).
191207
SetInitCachingFlag(true).
192208
SetLogsEgressAPI(localStackLogsEgressApi).
193-
SetTracer(tracer)
209+
SetTracer(tracer).
210+
SetEventsAPI(lsEventsAPI).
211+
SetSupervisor(localStackSupv)
194212

195213
// Corresponds to the 'AWS_LAMBDA_RUNTIME_API' environment variable.
196214
// We need to ensure the runtime server is up before the INIT phase,
@@ -211,7 +229,7 @@ func main() {
211229
runDaemon(d) // async
212230

213231
defaultInterop := sandbox.DefaultInteropServer()
214-
interopServer := NewCustomInteropServer(lsOpts, defaultInterop, logCollector)
232+
interopServer := NewCustomInteropServer(lsOpts, lsAdapter, defaultInterop, logCollector)
215233
sandbox.SetInteropServer(interopServer)
216234
if len(handler) > 0 {
217235
sandbox.SetHandler(handler)

cmd/localstack/supervisor.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strings"
7+
"sync/atomic"
8+
9+
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/fatalerror"
10+
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/interop"
11+
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/supervisor"
12+
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/supervisor/model"
13+
"github.com/google/uuid"
14+
log "github.com/sirupsen/logrus"
15+
)
16+
17+
// LocalStackSupervisor wraps a ProcessSupervisor and intercepts runtime process termination events.
18+
// When a runtime process exits unexpectedly it sends a fault event via the EventsAPI so LocalStack
19+
// receives a proper error instead of timing out.
20+
type LocalStackSupervisor struct {
21+
model.ProcessSupervisor
22+
eventsChan chan model.Event
23+
eventsAPI interop.EventsAPI
24+
25+
isShuttingDown *atomic.Bool
26+
}
27+
28+
func NewLocalStackSupervisor(ctx context.Context, evs interop.EventsAPI) *LocalStackSupervisor {
29+
var isShuttingDown atomic.Bool
30+
ls := &LocalStackSupervisor{
31+
ProcessSupervisor: supervisor.NewLocalSupervisor(),
32+
eventsAPI: evs,
33+
eventsChan: make(chan model.Event),
34+
isShuttingDown: &isShuttingDown,
35+
}
36+
37+
go ls.loop(ctx)
38+
39+
return ls
40+
}
41+
42+
func (ls *LocalStackSupervisor) loop(ctx context.Context) {
43+
inCh, err := ls.ProcessSupervisor.Events(ctx, nil)
44+
if err != nil {
45+
panic(err)
46+
}
47+
defer close(ls.eventsChan)
48+
for {
49+
select {
50+
case event, ok := <-inCh:
51+
if !ok {
52+
return
53+
}
54+
55+
select {
56+
case ls.eventsChan <- event:
57+
case <-ctx.Done():
58+
return
59+
}
60+
61+
if ls.isShuttingDown.Load() {
62+
continue
63+
}
64+
65+
termination := event.Event.ProcessTerminated()
66+
if termination == nil {
67+
continue
68+
}
69+
70+
if !strings.Contains(*termination.Name, "runtime-") {
71+
log.Debugf("Ignoring non-runtime process termination: %s", *termination.Name)
72+
continue
73+
}
74+
75+
if termination.Signaled() != nil {
76+
log.Debugf("Runtime process signalled: %d", *termination.Signo)
77+
}
78+
79+
faultData := interop.FaultData{
80+
RequestID: interop.RequestID(uuid.NewString()),
81+
ErrorMessage: fmt.Errorf("Runtime exited without providing a reason"),
82+
ErrorType: fatalerror.RuntimeExit,
83+
}
84+
if !termination.Success() {
85+
faultData.ErrorMessage = fmt.Errorf("Runtime exited with error: %s", termination.String())
86+
}
87+
88+
if err := ls.eventsAPI.SendFault(faultData); err != nil {
89+
log.WithError(err).Error("Failed to send runtime fault event")
90+
}
91+
case <-ctx.Done():
92+
return
93+
}
94+
}
95+
}
96+
97+
func (ls *LocalStackSupervisor) Exec(ctx context.Context, request *model.ExecRequest) error {
98+
if request.Domain == "runtime" {
99+
ls.isShuttingDown.Store(false)
100+
}
101+
return ls.ProcessSupervisor.Exec(ctx, request)
102+
}
103+
104+
func (ls *LocalStackSupervisor) Terminate(ctx context.Context, request *model.TerminateRequest) error {
105+
defer func() {
106+
if request.Domain == "runtime" && strings.HasPrefix(request.Name, "runtime-") {
107+
ls.isShuttingDown.Store(true)
108+
}
109+
}()
110+
return ls.ProcessSupervisor.Terminate(ctx, request)
111+
}
112+
113+
func (ls *LocalStackSupervisor) Kill(ctx context.Context, request *model.KillRequest) error {
114+
defer func() {
115+
if request.Domain == "runtime" && strings.HasPrefix(request.Name, "runtime-") {
116+
ls.isShuttingDown.Store(true)
117+
}
118+
}()
119+
return ls.ProcessSupervisor.Kill(ctx, request)
120+
}
121+
122+
func (ls *LocalStackSupervisor) Events(ctx context.Context, _ *model.EventsRequest) (<-chan model.Event, error) {
123+
return ls.eventsChan, nil
124+
}

0 commit comments

Comments
 (0)