Skip to content

Commit 8a71aa1

Browse files
joe4devclaude
andcommitted
fix(init): own the init-timeout retry and report init failures via rapidcore
Move the Lambda init-phase timeout retry into the RIE and replace the custom supervisor with rapidcore's existing init-failure machinery. - rapidcore: add AwaitInitializedWithDetails (structured init outcome) and AwaitInitializedWithTimeout (timer-aware; does NOT consume the init-failures channel on timeout, so the invoke path's Reserve() can still drive suppressed init). Refactor awaitInitialized into interpretInitFailure. - main.go: on init-phase timeout (LOCALSTACK_INIT_PHASE_TIMEOUT, default 10s), emit INIT_REPORT, signal ready, and reset the in-progress init so the first invoke re-runs it (suppressed init) under the function timeout. Genuine init failures are reported via SendInitError. - custom_interop.go: SendInitError crash-path fallback with an initErrorForwarded dedup guard, formatted as AWS's "RequestId: <id> Error: <msg>"; ReportInitTimeout + initTimedOut-driven Init Duration suppression. - Remove the custom LocalStackSupervisor/LocalStackEventsAPI; drop unused IsInitRetry from lsapi. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent fa40f4b commit 8a71aa1

6 files changed

Lines changed: 178 additions & 220 deletions

File tree

cmd/localstack/custom_interop.go

Lines changed: 77 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,17 @@ import (
1212
"net/http"
1313
"strconv"
1414
"strings"
15+
"sync/atomic"
1516
"time"
1617

1718
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/core/statejson"
19+
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/fatalerror"
1820
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/interop"
1921
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/rapidcore"
2022
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/rapidcore/standalone"
2123
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lsapi"
2224
"github.com/go-chi/chi/v5"
25+
"github.com/google/uuid"
2326
log "github.com/sirupsen/logrus"
2427
)
2528

@@ -28,11 +31,23 @@ type CustomInteropServer struct {
2831
localStackAdapter *LocalStackAdapter
2932
port string
3033
upstreamEndpoint string
34+
// logCollector accumulates the runtime's stdout/stderr plus the synthetic START/REPORT/
35+
// INIT_REPORT lines that are flushed to LocalStack with each invocation's logs.
36+
logCollector *LogCollector
3137
// initStart is set once in Init() and warmStart is flipped on the first invoke.
3238
// Both are accessed only from the single sequential init -> invoke flow (the RIE
3339
// processes one invocation at a time), so they need no additional synchronization.
3440
initStart time.Time
3541
warmStart bool
42+
// initTimedOut is set by ReportInitTimeout when the init phase exceeds its timeout. It is
43+
// written from the init-await flow and read from the invoke flow, so it uses atomic access.
44+
// When set, the first invocation's REPORT omits Init Duration (init was already reported as
45+
// timed out and is re-run as a suppressed init during that invocation).
46+
initTimedOut atomic.Bool
47+
// initErrorForwarded is set once the runtime's own /init/error has been forwarded to
48+
// LocalStack via SendInitErrorResponse, so the crash-path fallback (SendInitError) does
49+
// not send a duplicate error status for the same failed initialization.
50+
initErrorForwarded atomic.Bool
3651
}
3752

3853
type LocalStackAdapter struct {
@@ -96,13 +111,13 @@ func (l *LocalStackAdapter) SendResult(invokeId string, body []byte, isError boo
96111
return nil
97112
}
98113

99-
100114
func NewCustomInteropServer(lsOpts *LsOpts, adapter *LocalStackAdapter, delegate interop.Server, logCollector *LogCollector) (server *CustomInteropServer) {
101115
server = &CustomInteropServer{
102116
delegate: delegate.(*rapidcore.Server),
103117
port: lsOpts.InteropPort,
104118
upstreamEndpoint: lsOpts.RuntimeEndpoint,
105119
localStackAdapter: adapter,
120+
logCollector: logCollector,
106121
}
107122

108123
// TODO: extract this
@@ -126,7 +141,7 @@ func NewCustomInteropServer(lsOpts *LsOpts, adapter *LocalStackAdapter, delegate
126141
_, _ = fmt.Fprintf(logCollector, "START RequestId: %s Version: %s\n", invokeR.InvokeId, functionVersion)
127142

128143
initDuration := ""
129-
if !server.warmStart && !invokeR.IsInitRetry {
144+
if !server.warmStart && !server.initTimedOut.Load() {
130145
initTimeMS := float64(time.Since(server.initStart).Nanoseconds()) / float64(time.Millisecond)
131146
initDuration = fmt.Sprintf("Init Duration: %.2f ms\t", initTimeMS)
132147
}
@@ -225,9 +240,15 @@ func (c *CustomInteropServer) SendErrorResponse(invokeID string, resp *interop.E
225240
return c.delegate.SendErrorResponse(invokeID, resp)
226241
}
227242

228-
// SendInitErrorResponse forwards the init error to LocalStack and then propagates it to the delegate.
243+
// SendInitErrorResponse forwards the init error reported by the runtime (via /init/error) to
244+
// LocalStack and then propagates it to the delegate. It marks initErrorForwarded so the
245+
// crash-path fallback in main.go (SendInitError) does not send a duplicate error status for
246+
// the same failed initialization.
229247
func (c *CustomInteropServer) SendInitErrorResponse(resp *interop.ErrorInvokeResponse) error {
230248
log.Traceln("SendInitErrorResponse called")
249+
// Mark synchronously, before sending: this runs in the init flow before
250+
// AwaitInitializedWithDetails unblocks in main.go, so the fallback observes the flag.
251+
c.initErrorForwarded.Store(true)
231252

232253
// Deserialize the raw payload so we can include the requestId and structured fields.
233254
var parsed struct {
@@ -267,6 +288,49 @@ func (c *CustomInteropServer) SendInitErrorResponse(resp *interop.ErrorInvokeRes
267288
return c.delegate.SendInitErrorResponse(resp)
268289
}
269290

291+
// SendInitError reports a structured init failure to LocalStack when the runtime failed to
292+
// initialize WITHOUT calling /init/error itself (e.g. it crashed, called sys.exit, or had an
293+
// invalid entrypoint). The init failure is detected by the existing rapidcore machinery
294+
// (watchEvents -> InitFailure -> AwaitInitializedWithDetails) and surfaced to main.go.
295+
// It is a no-op if SendInitErrorResponse already forwarded the runtime's own structured error.
296+
func (c *CustomInteropServer) SendInitError(errType fatalerror.ErrorType, errMsg error) {
297+
if c.initErrorForwarded.Load() {
298+
log.Debug("Init error already forwarded to LocalStack; skipping duplicate")
299+
return
300+
}
301+
302+
if errType == "" {
303+
errType = fatalerror.RuntimeExit
304+
}
305+
306+
message := "Runtime exited during initialization"
307+
if errMsg != nil {
308+
message = errMsg.Error()
309+
}
310+
311+
// Match AWS's fault message format "RequestId: <id> Error: <msg>". No invocation is active
312+
// during the init phase (LocalStack only dispatches an invoke after the runtime reports
313+
// ready), so synthesize a request ID, preferring the current invoke ID if one exists.
314+
requestID := c.delegate.GetCurrentInvokeID()
315+
if requestID == "" {
316+
requestID = uuid.NewString()
317+
}
318+
319+
payload, err := json.Marshal(lsapi.ErrorResponse{
320+
ErrorType: string(errType),
321+
ErrorMessage: fmt.Sprintf("RequestId: %s Error: %s", requestID, message),
322+
})
323+
if err != nil {
324+
log.WithError(err).Error("Failed to marshal init error response")
325+
return
326+
}
327+
328+
if err := c.localStackAdapter.SendStatus(Error, payload); err != nil {
329+
log.WithError(err).WithField("runtime-id", c.localStackAdapter.RuntimeId).
330+
Error("Failed to send init error to LocalStack")
331+
}
332+
}
333+
270334
func (c *CustomInteropServer) GetCurrentInvokeID() string {
271335
log.Traceln("GetCurrentInvokeID called")
272336
return c.delegate.GetCurrentInvokeID()
@@ -283,6 +347,16 @@ func (c *CustomInteropServer) Init(i *interop.Init, invokeTimeoutMs int64) error
283347
return c.delegate.Init(i, invokeTimeoutMs)
284348
}
285349

350+
// ReportInitTimeout emits an AWS-style INIT_REPORT timeout line into the log collector and
351+
// marks the init as timed out. The init is then re-run as a suppressed init during the first
352+
// invocation (under the function timeout), and that invocation's REPORT omits Init Duration.
353+
func (c *CustomInteropServer) ReportInitTimeout() {
354+
c.initTimedOut.Store(true)
355+
initTimeMS := float64(time.Since(c.initStart).Nanoseconds()) / float64(time.Millisecond)
356+
_, _ = fmt.Fprintf(c.logCollector,
357+
"INIT_REPORT Init Duration: %.2f ms\tPhase: init\tStatus: timeout\n", initTimeMS)
358+
}
359+
286360
func (c *CustomInteropServer) Invoke(responseWriter http.ResponseWriter, invoke *interop.Invoke) error {
287361
log.Traceln("Invoke called")
288362
return c.delegate.Invoke(responseWriter, invoke)

cmd/localstack/events.go

Lines changed: 0 additions & 63 deletions
This file was deleted.

cmd/localstack/main.go

Lines changed: 49 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ package main
44

55
import (
66
"context"
7+
"errors"
78
"os"
89
"runtime/debug"
910
"strconv"
@@ -15,6 +16,16 @@ import (
1516
log "github.com/sirupsen/logrus"
1617
)
1718

19+
const (
20+
// defaultInitPhaseTimeoutSeconds matches AWS's 10s init-phase limit. When init exceeds
21+
// this, the init is retried at the time of the first invocation under the function
22+
// timeout ("suppressed init"). Override via LOCALSTACK_INIT_PHASE_TIMEOUT.
23+
defaultInitPhaseTimeoutSeconds = 10
24+
// initResetTimeoutMs bounds the reset that aborts a timed-out init so rapidcore re-runs
25+
// it on the first invocation.
26+
initResetTimeoutMs = 2000
27+
)
28+
1829
type LsOpts struct {
1930
InteropPort string
2031
RuntimeEndpoint string
@@ -179,36 +190,24 @@ func main() {
179190
localStackLogsEgressApi := NewLocalStackLogsEgressAPI(logCollector)
180191
tracer := NewLocalStackTracer()
181192

182-
// Create LocalStack adapter upfront so it can be shared with the events API and interop server
193+
// Create LocalStack adapter upfront so it can be shared with the interop server
183194
lsAdapter := &LocalStackAdapter{
184195
UpstreamEndpoint: lsOpts.RuntimeEndpoint,
185196
RuntimeId: lsOpts.RuntimeId,
186197
}
187198

188-
// Events API forwards runtime fault events (unexpected exits) to LocalStack as error callbacks
189-
lsEventsAPI := NewLocalStackEventsAPI(lsAdapter)
190-
191-
// Supervisor intercepts runtime process terminations and emits fault events via the events API
192-
supervisorCtx, cancelSupervisor := context.WithCancel(context.Background())
193-
194-
localStackSupv := NewLocalStackSupervisor(supervisorCtx, lsEventsAPI)
195-
196199
// build sandbox
197200
sandbox := rapidcore.
198201
NewSandboxBuilder().
199202
//SetTracer(tracer).
200203
AddShutdownFunc(func() {
201204
log.Debugln("Stopping file watcher")
202205
cancelFileWatcher()
203-
log.Debugln("Stopping supervisor")
204-
cancelSupervisor()
205206
}).
206207
SetExtensionsFlag(true).
207208
SetInitCachingFlag(true).
208209
SetLogsEgressAPI(localStackLogsEgressApi).
209-
SetTracer(tracer).
210-
SetEventsAPI(lsEventsAPI).
211-
SetSupervisor(localStackSupv)
210+
SetTracer(tracer)
212211

213212
// Corresponds to the 'AWS_LAMBDA_RUNTIME_API' environment variable.
214213
// We need to ensure the runtime server is up before the INIT phase,
@@ -266,12 +265,44 @@ func main() {
266265
log.Debugln("Starting runtime init.")
267266
InitHandler(sandbox.LambdaInvokeAPI(), GetEnvOrDie("AWS_LAMBDA_FUNCTION_VERSION"), int64(invokeTimeoutSeconds), bootstrap, lsOpts.AccountId) // TODO: replace this with a custom init
268267

268+
initPhaseTimeoutSeconds := defaultInitPhaseTimeoutSeconds
269+
if v := os.Getenv("LOCALSTACK_INIT_PHASE_TIMEOUT"); v != "" {
270+
if parsed, perr := strconv.Atoi(v); perr == nil {
271+
initPhaseTimeoutSeconds = parsed
272+
} else {
273+
log.Warnln("Invalid LOCALSTACK_INIT_PHASE_TIMEOUT, using default:", perr)
274+
}
275+
}
276+
269277
log.Debugln("Awaiting initialization of runtime init.")
270-
if err := interopServer.delegate.AwaitInitialized(); err != nil {
271-
// Error cases: ErrInitDoneFailed or ErrInitResetReceived
278+
initResp, timedOut, err := interopServer.delegate.AwaitInitializedWithTimeout(
279+
time.Duration(initPhaseTimeoutSeconds) * time.Second,
280+
)
281+
switch {
282+
case timedOut:
283+
// AWS limits the init phase to 10s. When exceeded, init is retried at the time of the
284+
// first invocation under the function timeout ("suppressed init"). We report the init
285+
// timeout and signal ready so LocalStack dispatches the first invoke, then reset the
286+
// in-progress init so rapidcore re-runs a fresh Init phase when that invoke arrives.
287+
// The reset failure is intentionally left unconsumed here so the invoke path's
288+
// Reserve()/awaitInitialized() picks it up and triggers the suppressed init.
289+
log.Debugln("Init phase timed out; deferring to suppressed init on first invocation.")
290+
interopServer.ReportInitTimeout()
291+
go func() {
292+
if _, resetErr := interopServer.delegate.Reset("initTimeout", initResetTimeoutMs); resetErr != nil {
293+
log.Debugf("Reset after init timeout returned: %s", resetErr)
294+
}
295+
}()
296+
case err != nil:
297+
// Error cases: ErrInitDoneFailed (runtime crashed/exited or called /init/error) or
298+
// ErrInitResetReceived (init-phase reset). When the runtime reported its own error via
299+
// /init/error, SendInitErrorResponse already forwarded it and SendInitError is a no-op.
300+
// When the runtime instead crashed/exited without reporting, this is the only callback
301+
// that notifies LocalStack (otherwise it waits until the environment timeout).
272302
log.Errorln("Runtime init failed to initialize: " + err.Error() + ". Exiting.")
273-
// NOTE: Sending the error status to LocalStack is handled beforehand in the custom_interop.go through the
274-
// callback SendInitErrorResponse because it contains the correct error response payload.
303+
if !errors.Is(err, rapidcore.ErrInitResetReceived) {
304+
interopServer.SendInitError(initResp.InitErrorType, initResp.InitErrorMessage)
305+
}
275306
return
276307
}
277308

0 commit comments

Comments
 (0)