Skip to content

Commit 258e7c8

Browse files
Managed execution frame with async call foundations (#1316)
* Managed execution frame with async call support * Pare the ExecutionFrame down to just capturing current functionality * Update the lifecycle expectations for AsyncOp
1 parent 14f6746 commit 258e7c8

4 files changed

Lines changed: 639 additions & 6 deletions

File tree

common/functions/functions.go

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,11 @@
1515
// Package functions defines the standard builtin functions supported by the interpreter
1616
package functions
1717

18-
import "github.com/google/cel-go/common/types/ref"
18+
import (
19+
"context"
20+
21+
"github.com/google/cel-go/common/types/ref"
22+
)
1923

2024
// Overload defines a named overload of a function, indicating an operand trait
2125
// which must be present on the first argument to the overload as well as one
@@ -41,21 +45,37 @@ type Overload struct {
4145
// Binary defines the overload with a BinaryOp implementation. May be nil.
4246
Binary BinaryOp
4347

44-
// Function defines the overload with a FunctionOp implementation. May be
45-
// nil.
48+
// Function defines the overload with a FunctionOp implementation. May be nil.
4649
Function FunctionOp
4750

51+
// Async defines the overload with an AsyncOp implementation. May be nil.
52+
Async AsyncOp
53+
4854
// NonStrict specifies whether the Overload will tolerate arguments that
4955
// are types.Err or types.Unknown.
5056
NonStrict bool
5157
}
5258

5359
// UnaryOp is a function that takes a single value and produces an output.
54-
type UnaryOp func(value ref.Val) ref.Val
60+
type UnaryOp func(ref.Val) ref.Val
5561

5662
// BinaryOp is a function that takes two values and produces an output.
57-
type BinaryOp func(lhs ref.Val, rhs ref.Val) ref.Val
63+
type BinaryOp func(ref.Val, ref.Val) ref.Val
5864

5965
// FunctionOp is a function with accepts zero or more arguments and produces
6066
// a value or error as a result.
61-
type FunctionOp func(values ...ref.Val) ref.Val
67+
type FunctionOp func(...ref.Val) ref.Val
68+
69+
// AsyncOp is a function that accepts zero or more arguments and produces
70+
// a value or error asynchronously via a channel.
71+
//
72+
// AsyncOp is an internal interface intended for use by CEL to manage goroutines and
73+
// channels associated with async calls. For public API usage, use BlockingAsyncOp.
74+
// Implementers should listen for context cancellation on the provided context for
75+
// resource cleanup.
76+
type AsyncOp func(context.Context, ...ref.Val) <-chan ref.Val
77+
78+
// BlockingAsyncOp is a function that accepts zero or more arguments and blocks until
79+
// the result is available. When used with AsyncBinding, the framework runs the function
80+
// in its own goroutine and manages channel lifecycle internally.
81+
type BlockingAsyncOp func(context.Context, ...ref.Val) ref.Val

common/types/unknown.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,20 @@ func (u *Unknown) GetAttributeTrails(id int64) ([]*AttributeTrail, bool) {
181181
return trails, found
182182
}
183183

184+
// HasUnknownFunction returns whether any of the attribute trails contained within the unknown
185+
// are unspecified. Unspecified attributes typically indicate an unresolved function call
186+
// or operation, rather than a missing variable.
187+
func (u *Unknown) HasUnknownFunction() bool {
188+
for _, trails := range u.attributeTrails {
189+
for _, t := range trails {
190+
if t.variable == "" {
191+
return true
192+
}
193+
}
194+
}
195+
return false
196+
}
197+
184198
// Contains returns true if the input unknown is a subset of the current unknown.
185199
func (u *Unknown) Contains(other *Unknown) bool {
186200
for id, otherTrails := range other.attributeTrails {

interpreter/frame.go

Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
// Copyright 2026 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package interpreter
16+
17+
import (
18+
"context"
19+
"sync"
20+
"sync/atomic"
21+
)
22+
23+
// evalContext contains the stateful information needed for a single evaluation.
24+
//
25+
// This state is shared across all frames within a single evaluation, including
26+
// child frames created for comprehension blocks.
27+
type evalContext struct {
28+
// interrupt exposes a callback channel for cancellation.
29+
interrupt <-chan struct{}
30+
31+
// interruptCheckCount is the number of times the interrupt channel has been checked.
32+
interruptCheckCount atomic.Uint64
33+
34+
// interruptCheckFrequency is the frequency at which the interrupt channel is checked.
35+
interruptCheckFrequency uint
36+
37+
// interrupted indicates whether the evaluation has been interrupted.
38+
interrupted atomic.Bool
39+
40+
// state provides the context for tracking the evaluation state.
41+
state EvalState
42+
43+
// costs provides the context for tracking the evaluation costs.
44+
costs *CostTracker
45+
46+
// ctx is the context for async call implementations to use.
47+
ctx context.Context
48+
49+
// cancel cancels the context when the evaluation is finished.
50+
cancel context.CancelFunc
51+
}
52+
53+
// ExecutionFrame provides the context for a single evaluation of an expression.
54+
//
55+
// The execution frame must not be stored in any fashion as its lifecycle is completely
56+
// controlled by the CEL evaluation process.
57+
type ExecutionFrame struct {
58+
// Activation provides the context for resolving variables by name.
59+
Activation
60+
61+
// parent provides the context for parent scopes (used for comprehension iterators).
62+
parent *ExecutionFrame
63+
64+
// ctx provides the shared evaluation state across frames.
65+
ctx *evalContext
66+
}
67+
68+
// NewExecutionFrame creates a new execution frame from the pool.
69+
func NewExecutionFrame(vars Activation) *ExecutionFrame {
70+
f := frameStack.Get().(*ExecutionFrame)
71+
f.Activation = vars
72+
return f
73+
}
74+
75+
// SetContext sets the context for the execution frame.
76+
func (f *ExecutionFrame) SetContext(ctx context.Context, interruptCheckFrequency uint) {
77+
if f.ctx == nil {
78+
f.ctx = evalContextPool.Get().(*evalContext)
79+
}
80+
f.ctx.ctx, f.ctx.cancel = context.WithCancel(ctx)
81+
f.ctx.interrupt = ctx.Done()
82+
f.ctx.interruptCheckFrequency = interruptCheckFrequency
83+
f.ctx.interruptCheckCount.Store(0)
84+
f.ctx.interrupted.Store(false)
85+
}
86+
87+
// Close releases the resources held by the execution frame and returns it to the pool.
88+
func (f *ExecutionFrame) Close() {
89+
if f.ctx != nil {
90+
if f.ctx.cancel != nil {
91+
f.ctx.cancel()
92+
f.ctx.cancel = nil
93+
}
94+
f.ctx.ctx = nil
95+
f.ctx.interrupt = nil
96+
f.ctx.state = nil
97+
f.ctx.costs = nil
98+
f.ctx.interrupted.Store(false)
99+
f.ctx.interruptCheckCount.Store(0)
100+
f.ctx.interruptCheckFrequency = 0
101+
evalContextPool.Put(f.ctx)
102+
f.ctx = nil
103+
}
104+
f.parent = nil
105+
activationStack.release(f.Activation)
106+
f.Activation = nil
107+
frameStack.Put(f)
108+
}
109+
110+
// push pushes the given activation onto the activation stack and returns the new frame.
111+
//
112+
// This operation is internal to the interpreter and is used to handle comprehension
113+
// scoping. The child frame inherits the shared evalContext from the parent.
114+
func (f *ExecutionFrame) push(activation Activation) *ExecutionFrame {
115+
child := frameStack.Get().(*ExecutionFrame)
116+
child.parent = f
117+
child.ctx = f.ctx
118+
child.Activation = activationStack.create(f.Activation, activation)
119+
return child
120+
}
121+
122+
// pop returns the parent frame, releasing the current frame back to the pool.
123+
func (f *ExecutionFrame) pop() *ExecutionFrame {
124+
parent := f.parent
125+
activationStack.release(f.Activation)
126+
f.Activation = nil
127+
f.parent = nil
128+
f.ctx = nil
129+
frameStack.Put(f)
130+
return parent
131+
}
132+
133+
// ResolveName implements the Activation interface by proxying to the internal activation.
134+
func (f *ExecutionFrame) ResolveName(name string) (any, bool) {
135+
return f.Activation.ResolveName(name)
136+
}
137+
138+
// Parent implements the Activation interface by proxying to the internal activation.
139+
func (f *ExecutionFrame) Parent() Activation {
140+
return f.Activation.Parent()
141+
}
142+
143+
// AsPartialActivation implements the PartialActivation interface by proxying to the internal activation.
144+
func (f *ExecutionFrame) AsPartialActivation() (PartialActivation, bool) {
145+
return AsPartialActivation(f.Activation)
146+
}
147+
148+
// Unwrap returns the internal activation.
149+
func (f *ExecutionFrame) Unwrap() Activation {
150+
return f.Activation
151+
}
152+
153+
// CheckInterrupt returns whether the evaluation has been interrupted.
154+
func (f *ExecutionFrame) CheckInterrupt() bool {
155+
if f.ctx == nil {
156+
return false
157+
}
158+
if f.ctx.interrupted.Load() {
159+
return true
160+
}
161+
count := f.ctx.interruptCheckCount.Add(1)
162+
if f.ctx.interruptCheckFrequency > 0 && count%uint64(f.ctx.interruptCheckFrequency) == 0 {
163+
select {
164+
case <-f.ctx.interrupt:
165+
f.ctx.interrupted.Store(true)
166+
return true
167+
default:
168+
return false
169+
}
170+
}
171+
return false
172+
}
173+
174+
// frameStack provides a synchronized pool of ExecutionFrames.
175+
var frameStack = &sync.Pool{
176+
New: func() any {
177+
return &ExecutionFrame{}
178+
},
179+
}
180+
181+
// evalContextPool provides a synchronized pool of evalContexts.
182+
var evalContextPool = &sync.Pool{
183+
New: func() any {
184+
return &evalContext{}
185+
},
186+
}
187+
188+
type activationStackPool struct {
189+
sync.Pool
190+
}
191+
192+
func (pool *activationStackPool) create(parent, child Activation) Activation {
193+
h := pool.Get().(*hierarchicalActivation)
194+
h.child = child
195+
h.parent = parent
196+
return h
197+
}
198+
199+
func (pool *activationStackPool) release(activation Activation) {
200+
h, ok := activation.(*hierarchicalActivation)
201+
if !ok {
202+
return
203+
}
204+
h.parent = nil
205+
h.child = nil
206+
pool.Pool.Put(h)
207+
}
208+
209+
func newActivationPool() *activationStackPool {
210+
return &activationStackPool{
211+
Pool: sync.Pool{
212+
New: func() any {
213+
return &hierarchicalActivation{}
214+
},
215+
},
216+
}
217+
}
218+
219+
var (
220+
activationStack = newActivationPool()
221+
)

0 commit comments

Comments
 (0)