Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion pkg/settings/cresettings/defaults.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
"WASMExecutionTimeout": "1m0s",
"WASMMemoryLimit": "100mb",
"WASMBinarySizeLimit": "30mb",
"WASMCompressedBinarySizeLimit": "20mb",
"WASMConfigSizeLimit": "30mb",
"WASMSecretsSizeLimit": "30mb",
"WASMResponseSizeLimit": "5mb",
"ConsensusObservationSizeLimit": "10kb",
"ConsensusCallsLimit": "2",
"LogLineLimit": "1kb",
Expand Down Expand Up @@ -65,7 +69,7 @@
"CallLimit": "2"
},
"HTTPAction": {
"CallLimit": "3",
"CallLimit": "5",
"ResponseSizeLimit": "10kb",
"ConnectionTimeout": "10s",
"RequestSizeLimit": "100kb",
Expand Down
6 changes: 5 additions & 1 deletion pkg/settings/cresettings/defaults.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ ExecutionResponseLimit = '100kb'
WASMExecutionTimeout = '1m0s'
WASMMemoryLimit = '100mb'
WASMBinarySizeLimit = '30mb'
WASMCompressedBinarySizeLimit = '20mb'
WASMConfigSizeLimit = '30mb'
WASMSecretsSizeLimit = '30mb'
WASMResponseSizeLimit = '5mb'
ConsensusObservationSizeLimit = '10kb'
ConsensusCallsLimit = '2'
LogLineLimit = '1kb'
Expand Down Expand Up @@ -66,7 +70,7 @@ ObservationSizeLimit = '10kb'
CallLimit = '2'

[PerWorkflow.HTTPAction]
CallLimit = '3'
CallLimit = '5'
ResponseSizeLimit = '10kb'
ConnectionTimeout = '10s'
RequestSizeLimit = '100kb'
Expand Down
16 changes: 12 additions & 4 deletions pkg/settings/cresettings/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ var Default = Schema{
WASMExecutionTimeout: Duration(60 * time.Second),
WASMMemoryLimit: Size(100 * config.MByte),
WASMBinarySizeLimit: Size(30 * config.MByte),
WASMCompressedBinarySizeLimit: Size(20 * config.MByte),
WASMConfigSizeLimit: Size(30 * config.MByte),
WASMSecretsSizeLimit: Size(30 * config.MByte),
WASMResponseSizeLimit: Size(5 * config.MByte),
ConsensusObservationSizeLimit: Size(10 * config.KByte),
ConsensusCallsLimit: Int(2),
LogLineLimit: Size(config.KByte),
Expand Down Expand Up @@ -113,7 +117,7 @@ var Default = Schema{
CallLimit: Int(2),
},
HTTPAction: httpAction{
CallLimit: Int(3),
CallLimit: Int(5),
ResponseSizeLimit: Size(10 * config.KByte),
ConnectionTimeout: Duration(10 * time.Second),
RequestSizeLimit: Size(100 * config.KByte),
Expand Down Expand Up @@ -163,9 +167,13 @@ type Workflows struct {
ExecutionTimeout Setting[time.Duration]
ExecutionResponseLimit Setting[config.Size]

WASMExecutionTimeout Setting[time.Duration]
WASMMemoryLimit Setting[config.Size]
WASMBinarySizeLimit Setting[config.Size]
WASMExecutionTimeout Setting[time.Duration]
WASMMemoryLimit Setting[config.Size]
WASMBinarySizeLimit Setting[config.Size]
WASMCompressedBinarySizeLimit Setting[config.Size]
WASMConfigSizeLimit Setting[config.Size]
WASMSecretsSizeLimit Setting[config.Size]
WASMResponseSizeLimit Setting[config.Size]

// Deprecated: use Consensus.ObservationSizeLimit
ConsensusObservationSizeLimit Setting[config.Size]
Expand Down
143 changes: 100 additions & 43 deletions pkg/workflows/wasm/host/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ import (
"github.com/bytecodealliance/wasmtime-go/v28"
"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/settings"
"github.com/smartcontractkit/chainlink-common/pkg/settings/limits"
dagsdk "github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm"
wasmdagpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb"
Expand Down Expand Up @@ -52,18 +55,22 @@ type DeterminismConfig struct {
Seed int64
}
type ModuleConfig struct {
TickInterval time.Duration
Timeout *time.Duration
MaxMemoryMBs uint64
MinMemoryMBs uint64
InitialFuel uint64
Logger logger.Logger
IsUncompressed bool
Fetch func(ctx context.Context, req *FetchRequest) (*FetchResponse, error)
MaxFetchRequests int
MaxCompressedBinarySize uint64
MaxDecompressedBinarySize uint64
MaxResponseSizeBytes uint64
TickInterval time.Duration
Timeout *time.Duration
MaxMemoryMBs uint64
MinMemoryMBs uint64
MemoryLimiter limits.BoundLimiter[config.Size] // supersedes Max/MinMemoryMBs if set
InitialFuel uint64
Logger logger.Logger
IsUncompressed bool
Fetch func(ctx context.Context, req *FetchRequest) (*FetchResponse, error)
MaxFetchRequests int
MaxCompressedBinarySize uint64
MaxCompressedBinaryLimiter limits.BoundLimiter[config.Size] // supersedes MaxCompressedBinarySize if set
MaxDecompressedBinarySize uint64
MaxDecompressedBinaryLimiter limits.BoundLimiter[config.Size] // supersedes MaxDecompressedBinarySize if set
MaxResponseSizeBytes uint64
MaxResponseSizeLimiter limits.BoundLimiter[config.Size] // supersedes MaxResponseSizeBytes if set

MaxLogLenBytes uint32
MaxLogCountDONMode uint32
Expand Down Expand Up @@ -143,7 +150,7 @@ func WithDeterminism() func(*ModuleConfig) {
}
}

func NewModule(modCfg *ModuleConfig, binary []byte, opts ...func(*ModuleConfig)) (*module, error) {
func NewModule(ctx context.Context, modCfg *ModuleConfig, binary []byte, opts ...func(*ModuleConfig)) (*module, error) {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to find a way to avoid breaking this signature, but there's not an elegant way around it without just introducing a new func to use instead. Happy to do that instead if folks prefer 🤷

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose another variation would be to extract the binary size checks to be pre-construction in order to avoid the issue and simplify this implementation. The fact that we already check the binary size elsewhere suggests that we could simplify a bit too.

// Apply options to the module config.
for _, opt := range opts {
opt(modCfg)
Expand Down Expand Up @@ -200,33 +207,59 @@ func NewModule(modCfg *ModuleConfig, binary []byte, opts ...func(*ModuleConfig))
modCfg.MaxLogCountNodeMode = uint32(defaultMaxLogCountNodeMode)
}

// Take the max of the min and the configured max memory mbs.
// We do this because Go requires a minimum of 16 megabytes to run,
// and local testing has shown that with less than the min, some
// binaries may error sporadically.
modCfg.MaxMemoryMBs = uint64(math.Max(float64(modCfg.MinMemoryMBs), float64(modCfg.MaxMemoryMBs)))

cfg := wasmtime.NewConfig()
cfg.SetEpochInterruption(true)
if modCfg.InitialFuel > 0 {
cfg.SetConsumeFuel(true)
lf := limits.Factory{Logger: modCfg.Logger}
if modCfg.MemoryLimiter == nil {
// Take the max of the min and the configured max memory mbs.
// We do this because Go requires a minimum of 16 megabytes to run,
// and local testing has shown that with less than the min, some
// binaries may error sporadically.
modCfg.MaxMemoryMBs = uint64(math.Max(float64(modCfg.MinMemoryMBs), float64(modCfg.MaxMemoryMBs)))
limit := settings.Size(config.Size(modCfg.MaxMemoryMBs) * config.MByte)
var err error
modCfg.MemoryLimiter, err = limits.MakeBoundLimiter(lf, limit)
if err != nil {
return nil, fmt.Errorf("failed to make memory limiter: %w", err)
}
}
if modCfg.MaxCompressedBinaryLimiter == nil {
limit := settings.Size(config.Size(modCfg.MaxCompressedBinarySize))
var err error
modCfg.MaxCompressedBinaryLimiter, err = limits.MakeBoundLimiter(lf, limit)
if err != nil {
return nil, fmt.Errorf("failed to make compressed binary size limiter: %w", err)
}
}
if modCfg.MaxDecompressedBinaryLimiter == nil {
limit := settings.Size(config.Size(modCfg.MaxDecompressedBinarySize))
var err error
modCfg.MaxDecompressedBinaryLimiter, err = limits.MakeBoundLimiter(lf, limit)
if err != nil {
return nil, fmt.Errorf("failed to make decompressed binary size limiter: %w", err)
}
}
if modCfg.MaxResponseSizeLimiter == nil {
limit := settings.Size(config.Size(modCfg.MaxResponseSizeBytes))
var err error
modCfg.MaxResponseSizeLimiter, err = limits.MakeBoundLimiter(lf, limit)
if err != nil {
return nil, fmt.Errorf("failed to make response size limiter: %w", err)
}
}

cfg.CacheConfigLoadDefault()
cfg.SetCraneliftOptLevel(wasmtime.OptLevelSpeedAndSize)

// Handled differenty based on host OS.
SetUnwinding(cfg)

engine := wasmtime.NewEngineWithConfig(cfg)
if !modCfg.IsUncompressed {
// validate the binary size before decompressing
// this is to prevent decompression bombs
if uint64(len(binary)) > modCfg.MaxCompressedBinarySize {
return nil, fmt.Errorf("compressed binary size exceeds the maximum allowed size of %d bytes", modCfg.MaxCompressedBinarySize)
if err := modCfg.MaxCompressedBinaryLimiter.Check(ctx, config.SizeOf(binary)); err != nil {
if errors.Is(err, limits.ErrorBoundLimited[config.Size]{}) {
return nil, fmt.Errorf("compressed binary size exceeds the maximum allowed size of %d bytes: %w", modCfg.MaxCompressedBinarySize, err)
}
return nil, fmt.Errorf("failed to check compressed binary size limit: %w", err)
}

rdr := io.LimitReader(brotli.NewReader(bytes.NewBuffer(binary)), int64(modCfg.MaxDecompressedBinarySize+1))
maxDecompressedBinarySize, err := modCfg.MaxDecompressedBinaryLimiter.Limit(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get decompressed binary size limit: %w", err)
}
rdr := io.LimitReader(brotli.NewReader(bytes.NewBuffer(binary)), int64(maxDecompressedBinarySize+1))
decompedBinary, err := io.ReadAll(rdr)
if err != nil {
return nil, fmt.Errorf("failed to decompress binary: %w", err)
Expand All @@ -238,9 +271,27 @@ func NewModule(modCfg *ModuleConfig, binary []byte, opts ...func(*ModuleConfig))
// Validate the decompressed binary size.
// io.LimitReader prevents decompression bombs by reading up to a set limit, but it will not return an error if the limit is reached.
// The Read() method will return io.EOF, and ReadAll will gracefully handle it and return nil.
if uint64(len(binary)) > modCfg.MaxDecompressedBinarySize {
return nil, fmt.Errorf("decompressed binary size reached the maximum allowed size of %d bytes", modCfg.MaxDecompressedBinarySize)
if err := modCfg.MaxDecompressedBinaryLimiter.Check(ctx, config.SizeOf(binary)); err != nil {
if errors.Is(err, limits.ErrorBoundLimited[config.Size]{}) {
return nil, fmt.Errorf("decompressed binary size reached the maximum allowed size of %d bytes: %w", modCfg.MaxDecompressedBinarySize, err)
}
return nil, fmt.Errorf("failed to check decompressed binary size limit: %w", err)
}

return newModule(modCfg, binary)
}

func newModule(modCfg *ModuleConfig, binary []byte) (*module, error) {
cfg := wasmtime.NewConfig()
cfg.SetEpochInterruption(true)
if modCfg.InitialFuel > 0 {
cfg.SetConsumeFuel(true)
}
cfg.CacheConfigLoadDefault()
cfg.SetCraneliftOptLevel(wasmtime.OptLevelSpeedAndSize)
SetUnwinding(cfg) // Handled differenty based on host OS.

engine := wasmtime.NewEngineWithConfig(cfg)

mod, err := wasmtime.NewModule(engine, binary)
if err != nil {
Expand All @@ -256,16 +307,14 @@ func NewModule(modCfg *ModuleConfig, binary []byte, opts ...func(*ModuleConfig))
}
}

m := &module{
return &module{
engine: engine,
module: mod,
wconfig: cfg,
cfg: modCfg,
stopCh: make(chan struct{}),
v2ImportName: v2ImportName,
}

return m, nil
}, nil
}

func linkNoDAG(m *module, store *wasmtime.Store, exec *execution[*sdkpb.ExecutionResult]) (*wasmtime.Instance, error) {
Expand Down Expand Up @@ -468,7 +517,7 @@ func (m *module) Run(ctx context.Context, request *wasmdagpb.Request) (*wasmdagp
computeRequest := r.GetComputeRequest()
if computeRequest != nil {
computeRequest.RuntimeConfig = &wasmdagpb.RuntimeConfig{
MaxResponseSizeBytes: int64(m.cfg.MaxResponseSizeBytes),
MaxResponseSizeBytes: int64(maxSize),
}
}
}
Expand All @@ -493,7 +542,11 @@ func runWasm[I, O proto.Message](

defer store.Close()

setMaxResponseSize(request, m.cfg.MaxResponseSizeBytes)
maxResponseSizeBytes, err := m.cfg.MaxResponseSizeLimiter.Limit(ctx)
if err != nil {
return o, fmt.Errorf("failed to get response size limit: %w", err)
}
setMaxResponseSize(request, uint64(maxResponseSizeBytes))
reqpb, err := proto.Marshal(request)
if err != nil {
return o, err
Expand All @@ -517,8 +570,12 @@ func runWasm[I, O proto.Message](
}

// Limit memory to max memory megabytes per instance.
maxMemoryBytes, err := m.cfg.MemoryLimiter.Limit(ctx)
if err != nil {
return o, fmt.Errorf("failed to get memory limit: %w", err)
}
store.Limiter(
int64(m.cfg.MaxMemoryMBs)*int64(math.Pow(10, 6)),
int64(maxMemoryBytes/config.MByte)*int64(math.Pow(10, 6)),
-1, // tableElements, -1 == default
1, // instances
1, // tables
Expand Down
2 changes: 1 addition & 1 deletion pkg/workflows/wasm/host/standard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ func makeTestModuleByName(t *testing.T, testName string, cfg *ModuleConfig) *mod
if cfg == nil {
cfg = defaultNoDAGModCfg(t)
}
mod, err := NewModule(cfg, binary)
mod, err := NewModule(t.Context(), cfg, binary)
require.NoError(t, err)
return mod
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/workflows/wasm/host/wasm.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

func GetWorkflowSpec(ctx context.Context, modCfg *ModuleConfig, binary []byte, config []byte) (*legacySdk.WorkflowSpec, error) {
m, err := NewModule(modCfg, binary, WithDeterminism())
m, err := NewModule(ctx, modCfg, binary, WithDeterminism())
if err != nil {
return nil, fmt.Errorf("could not instantiate module: %w", err)
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/workflows/wasm/host/wasm_nodag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-protos/cre/go/sdk"

mock "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

Expand All @@ -32,7 +32,7 @@ func Test_Sleep_Timeout(t *testing.T) {
mc := defaultNoDAGModCfg(t)
timeout := 1 * time.Second
mc.Timeout = &timeout
m, err := NewModule(mc, binary)
m, err := NewModule(t.Context(), mc, binary)
require.NoError(t, err)

m.v2ImportName = "test"
Expand Down Expand Up @@ -63,7 +63,7 @@ func Test_NoDag_Run(t *testing.T) {

t.Run("NOK fails with unset ExecutionHelper for trigger", func(t *testing.T) {
mc := defaultNoDAGModCfg(t)
m, err := NewModule(mc, binary)
m, err := NewModule(t.Context(), mc, binary)
require.NoError(t, err)

m.Start()
Expand All @@ -81,7 +81,7 @@ func Test_NoDag_Run(t *testing.T) {

t.Run("OK can subscribe without setting ExecutionHelper", func(t *testing.T) {
mc := defaultNoDAGModCfg(t)
m, err := NewModule(mc, binary)
m, err := NewModule(t.Context(), mc, binary)
require.NoError(t, err)

m.Start()
Expand Down Expand Up @@ -122,7 +122,7 @@ func Test_NoDAG_LoggingWithLimits(t *testing.T) {

binary := createTestBinary(loggingLimitsBinaryCmd, loggingLimitsBinaryLocation, true, t)

m, err := NewModule(cfg, binary)
m, err := NewModule(t.Context(), cfg, binary)
require.NoError(t, err)

_, err = m.Execute(t.Context(), executeRequest, mockExecutionHelper)
Expand Down
Loading
Loading