Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/integ-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ on:
branches:
- develop
- main

merge_group:

permissions:
contents: read

Expand Down
20 changes: 20 additions & 0 deletions .github/workflows/validate-branch-into-main.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
name: Validate PR Branch into Main

on:
pull_request:
branches:
- main

jobs:
validate-pr-branch:
runs-on: ubuntu-latest
steps:
- name: Check source branch
run: |
SOURCE_BRANCH="${{ github.head_ref }}"
if [[ "$SOURCE_BRANCH" != "develop" ]]; then
echo "Error: Only pull requests from develop branch are allowed into main"
echo "Current source branch ($SOURCE_BRANCH)."
exit 1
fi
echo "Source branch is develop - merge allowed"

Check warning

Code scanning / CodeQL

Workflow does not contain permissions Medium

Actions job or workflow does not limit the permissions of the GITHUB_TOKEN. Consider setting an explicit permissions block, using the following as a minimal starting point: {}
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,4 +193,4 @@ See [CONTRIBUTING](CONTRIBUTING.md#security-issue-notifications) for more inform
## License
This project is licensed under the Apache-2.0 License.
This project is licensed under the Apache-2.0 License.
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ func GetInitRequestMessage(fileUtil utils.FileUtil, args []string) (intmodel.Ini
XrayTracingMode: intmodel.XRayTracingModePassThrough,
CurrentWorkingDir: cwd,
RuntimeBinaryCommand: cmd,
AvailabilityZoneId: "",
AmiId: "",

AvailabilityZoneId: "use1-az1",
AmiId: "",
}, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func Test_getInitRequestMessage(t *testing.T) {
XrayTracingMode: intmodel.XRayTracingModePassThrough,
CurrentWorkingDir: "REPLACE",
RuntimeBinaryCommand: []string{"/path/to/bootstrap"},
AvailabilityZoneId: "",
AvailabilityZoneId: "use1-az1",
AmiId: "",
},
},
Expand Down Expand Up @@ -116,7 +116,7 @@ func Test_getInitRequestMessage(t *testing.T) {
XrayTracingMode: intmodel.XRayTracingModePassThrough,
CurrentWorkingDir: "/var/task",
RuntimeBinaryCommand: []string{"/custom/bootstrap", "custom_handler"},
AvailabilityZoneId: "",
AvailabilityZoneId: "use1-az1",
AmiId: "",
},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (
"os"
"time"

"github.com/google/uuid"
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lmds"

rieinvoke "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/aws-lambda-rie/internal/invoke"
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/aws-lambda-rie/internal/telemetry"
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/interop"
Expand Down Expand Up @@ -47,8 +50,9 @@ func Run(supv supvmodel.ProcessSupervisor, args []string, fileUtil utils.FileUti
responderFactoryFunc := func(_ context.Context, invokeReq interop.InvokeRequest) invoke.InvokeResponseSender {
return rieinvoke.NewResponder(invokeReq)
}
invokeRouter := invoke.NewInvokeRouter(rapid.MaxIdleRuntimesQueueSize, eventsAPI, responderFactoryFunc, timeout.NewRecentCache())
invokeRouter := invoke.NewInvokeRouter(rapid.RuntimePoolSize, eventsAPI, responderFactoryFunc, timeout.NewRecentCache())

metadataToken := uuid.NewString()
deps := rapid.Dependencies{
EventsAPI: eventsAPI,
LogsEgressAPI: telemetry.NewLogsEgress(telemetryAPIRelay, os.Stdout),
Expand All @@ -57,9 +61,10 @@ func Run(supv supvmodel.ProcessSupervisor, args []string, fileUtil utils.FileUti
RuntimeAPIAddrPort: runtimeAPIAddr,
FileUtils: fileUtil,
InvokeRouter: invokeRouter,
MetadataService: lmds.NewService(metadataToken),
}

raptorApp, err := raptor.StartApp(deps, "", noOpLogger{})
raptorApp, err := raptor.StartApp(deps, "", metadataToken, noOpLogger{})
if err != nil {
return nil, nil, nil, fmt.Errorf("could not start runtime api server: %w", err)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# RIE Telemetry Package

The RIE (Runtime Interface Emulator) telemetry package provides Telemetry API.

## Architecture Overview

```
┌─────────────────┐ ┌─────────────────┐ ┌──────────────────┐
│ EventsAPI │ │ LogsEgress │ │ SubscriptionAPI │
│ │ │ │ │ │
│ • Platform │ │ • Runtime logs │ │ • Subscription │
│ events │ │ • Extension │ │ management │
│ • Lifecycle │ │ logs │ │ • Schema │
│ events │ │ • Log capture │ │ validation │
└─────────┬───────┘ └─────────┬───────┘ └──────────┬───────┘
│ │ │
└──────────────┬───────────────────────────────┘
┌────▼────┐
│ Relay │
│ │
│ Event │
│ Broker │
└────┬────┘
┌──────────────┼──────────────┐
│ │ │
┌─────▼─────┐ ┌─────▼─────┐ ┌─────▼─────┐
│Subscriber │ │Subscriber │ │Subscriber │
│ A │ │ B │ │ C │
└─────┬─────┘ └─────┬─────┘ └─────┬─────┘
│ │ │
┌─────▼─────┐ ┌─────▼─────┐ ┌─────▼─────┐
│TCP Client │ │HTTP Client│ │TCP Client │
└───────────┘ └───────────┘ └───────────┘
```

## Core Components

### 1. EventsAPI (`events_api.go`)
**Responsibility**: Platform event generation and distribution

The EventsAPI serves as the primary interface for generating and broadcasting AWS Lambda platform events. It implements the `EventsAPI` interface and handles various lifecycle events including initialization, invocation, and error reporting.

### 2. LogsEgress (`logs_egress.go`)
**Responsibility**: Log capture and forwarding

The LogsEgress component implements the `StdLogsEgressAPI` interface to capture stdout/stderr from both runtime and extensions, forwarding them to telemetry subscribers while maintaining original console output.

### 3. Relay (`relay.go`)
**Responsibility**: Event broadcasting and subscriber management

The Relay acts as a central event broker, managing subscribers and broadcasting events to all registered telemetry consumers.

### 4. SubscriptionAPI (`subscription_api.go`)
**Responsibility**: Subscription management and validation

The SubscriptionAPI handles telemetry subscription requests, validates them against JSON schemas, and manages the subscription lifecycle.

## Internal Components

### 1. Subscriber (`internal/subscriber.go`)
**Responsibility**: Event batching and delivery

Each subscriber represents a telemetry consumer and manages efficient event delivery through batching and asynchronous processing.

### 2. Client (`internal/client.go`)
**Responsibility**: Protocol-specific event delivery

The client abstraction provides protocol-specific implementations for delivering events to telemetry consumers.

### 3. Batch (`internal/batch.go`)
**Responsibility**: Event collection and timing

The batch component manages collections of events with size and time-based flushing logic.

### 4. Types (`internal/types.go`)
**Responsibility**: Type definitions and constants

Centralized type definitions for protocols, event categories, and configuration structures.

## Event Flow

### 1. Subscription Flow
```
Extension/Agent → SubscriptionAPI → Schema Validation → Subscriber Creation → Relay Registration
```

### 2. Event Flow
```
Event Source → EventsAPI → Relay → Subscribers → Batching → Client
```

### 3. Log Flow
```
Runtime/Extension → LogsEgress → Console Output + Relay → Subscribers → Batching → Client
```
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestSubscriber(t *testing.T) {

agentName := fmt.Sprintf("test-name-%d", rand.Uint32())
sub := NewSubscriber(agentName, map[EventCategory]struct{}{CategoryPlatform: {}}, BufferingConfig{MaxItems: 2, MaxBytes: math.MaxInt, Timeout: math.MaxInt}, client, logsDroppedEventAPI)
time.Sleep(100 * time.Millisecond)
time.Sleep(200 * time.Millisecond)
assert.Equal(t, agentName, sub.AgentName())

sub.Flush(context.Background())
Expand All @@ -43,19 +43,23 @@ func TestSubscriber(t *testing.T) {
client.On("send", mock.Anything, mock.Anything).Return(nil)
sub.SendAsync(event, CategoryPlatform)

time.Sleep(100 * time.Millisecond)

require.Eventually(t, func() bool {
return client.AssertNumberOfCalls(t, "send", 1)
}, time.Second, 10*time.Millisecond)
}, 2*time.Second, 10*time.Millisecond)

sub.SendAsync(event, CategoryPlatform)

time.Sleep(100 * time.Millisecond)
assert.Eventually(
t,
func() bool {

sub.Flush(context.Background())
return client.AssertNumberOfCalls(t, "send", 2)
},
time.Second,
2*time.Second,
10*time.Millisecond,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/aws-lambda-rie/internal/telemetry/internal"
Expand Down Expand Up @@ -53,17 +54,31 @@ func TestLogsEgress(t *testing.T) {
require.NotNil(t, stderr)

line := []byte("test\n")
relay.On("broadcast", "test", tt.expectedCategory, tt.expectedCategory).Twice()

done := make(chan struct{}, 2)

relay.
On("broadcast", "test", tt.expectedCategory, tt.expectedCategory).
Twice().
Run(func(args mock.Arguments) {
done <- struct{}{}
})

n, err := stdout.Write(line)
assert.NoError(t, err)
assert.Len(t, line, n)
n, err = stderr.Write(line)
assert.NoError(t, err)
assert.Len(t, line, n)

assert.Eventually(t, func() bool {
return relay.AssertNumberOfCalls(t, "broadcast", 2)
}, 1*time.Second, 10*time.Millisecond)
for i := 0; i < 2; i++ {
select {
case <-done:

case <-time.After(2 * time.Second):
t.Fatal("timeout waiting for broadcast calls")
}
}
})
}
}
12 changes: 12 additions & 0 deletions internal/lambda-managed-instances/aws-lambda-rie/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package main

import (
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/aws-lambda-rie/run"
)

func main() {
run.Run()
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ func (_m *MockInvokeMetrics) SendMetrics(_a0 model.AppError) error {
return r0
}

func (_m *MockInvokeMetrics) SetReservationUsed(wasReserved bool) {
_m.Called(wasReserved)
}

func (_m *MockInvokeMetrics) TriggerGetRequest() {
_m.Called()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package interop

import mock "github.com/stretchr/testify/mock"

type MockReserveIdleRuntimeRequest struct {
mock.Mock
}

func (_m *MockReserveIdleRuntimeRequest) InvokeID() string {
ret := _m.Called()

if len(ret) == 0 {
panic("no return value specified for InvokeID")
}

var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}

return r0
}

func NewMockReserveIdleRuntimeRequest(t interface {
mock.TestingT
Cleanup(func())
}) *MockReserveIdleRuntimeRequest {
mock := &MockReserveIdleRuntimeRequest{}
mock.Mock.Test(t)

t.Cleanup(func() { mock.AssertExpectations(t) })

return mock
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package interop

import mock "github.com/stretchr/testify/mock"

type MockReserveIdleRuntimeResponse struct {
mock.Mock
}

func (_m *MockReserveIdleRuntimeResponse) reserveIdleRuntimeResponse() {
_m.Called()
}

func NewMockReserveIdleRuntimeResponse(t interface {
mock.TestingT
Cleanup(func())
}) *MockReserveIdleRuntimeResponse {
mock := &MockReserveIdleRuntimeResponse{}
mock.Mock.Test(t)

t.Cleanup(func() { mock.AssertExpectations(t) })

return mock
}
Loading
Loading