Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,10 @@ type Filter interface {
ApplyFilter(filterStr string, data map[string]any) ([]byte, error)
FilterInfo() string
}

// CompiledFilter is a pre-compiled filter ready to be applied to data.
// Compile the filter expression once and reuse it across many Apply calls
// to avoid repeated parse/compile overhead.
type CompiledFilter interface {
Apply(data map[string]any) ([]byte, error)
}
94 changes: 74 additions & 20 deletions pkg/filter/jq/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import (
"github.com/flant/shell-operator/pkg/filter"
)

var _ filter.Filter = (*Filter)(nil)
var (
_ filter.Filter = (*Filter)(nil)
_ filter.CompiledFilter = (*CompiledJqFilter)(nil)
)

func NewFilter() *Filter {
return &Filter{}
Expand All @@ -35,6 +38,76 @@ func (f *Filter) ApplyFilter(jqFilter string, data map[string]any) ([]byte, erro
}

iter := query.Run(workData)
return collectResults(iter)
}

func (f *Filter) FilterInfo() string {
return "jqFilter implementation: using itchyny/gojq"
}

// CompiledJqFilter holds a pre-compiled gojq program. Compile once and reuse
// across many Apply calls to eliminate repeated parse+compile overhead.
type CompiledJqFilter struct {
code *gojq.Code
originalStr string
}

// Compile parses and compiles jqFilter once. The returned *CompiledJqFilter is
// safe for concurrent use and can be reused for every event that carries the
// same filter expression.
func Compile(jqFilter string) (*CompiledJqFilter, error) {
query, err := gojq.Parse(jqFilter)
if err != nil {
return nil, err
}

code, err := gojq.Compile(query)
if err != nil {
return nil, err
}

return &CompiledJqFilter{code: code, originalStr: jqFilter}, nil
}

// Apply executes the pre-compiled jq program against data.
func (c *CompiledJqFilter) Apply(data map[string]any) ([]byte, error) {
var workData any
var err error
if data == nil {
workData = nil
} else {
workData, err = deepCopyAny(data)
if err != nil {
return nil, err
}
}

iter := c.code.Run(workData)
return collectResults(iter)
}

// String returns the original jq filter expression for diagnostics.
func (c *CompiledJqFilter) String() string {
return c.originalStr
}

func deepCopyAny(input any) (any, error) {
if input == nil {
return nil, nil
}
data, err := json.Marshal(input)
if err != nil {
return nil, err
}
var output any
if err := json.Unmarshal(data, &output); err != nil {
return nil, err
}
return output, nil
}

// collectResults drains a gojq iterator and serialises the results to JSON.
func collectResults(iter gojq.Iter) ([]byte, error) {
result := make([]any, 0)
for {
v, ok := iter.Next()
Expand All @@ -60,22 +133,3 @@ func (f *Filter) ApplyFilter(jqFilter string, data map[string]any) ([]byte, erro
return json.Marshal(result)
}
}

func (f *Filter) FilterInfo() string {
return "jqFilter implementation: using itchyny/gojq"
}

func deepCopyAny(input any) (any, error) {
if input == nil {
return nil, nil
}
data, err := json.Marshal(input)
if err != nil {
return nil, err
}
var output any
if err := json.Unmarshal(data, &output); err != nil {
return nil, err
}
return output, nil
}
177 changes: 177 additions & 0 deletions pkg/filter/jq/apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,180 @@ func Test_deepCopyAny(t *testing.T) {
g.Expect(err).ShouldNot(BeNil())
g.Expect(copyInvalid).Should(BeNil())
}

// ---- Compile / CompiledJqFilter tests ----

func Test_Compile_ValidExpression(t *testing.T) {
g := NewWithT(t)

cf, err := Compile(`.metadata.name`)
g.Expect(err).Should(BeNil())
g.Expect(cf).ShouldNot(BeNil())
g.Expect(cf.String()).Should(Equal(`.metadata.name`))
}

func Test_Compile_InvalidExpression(t *testing.T) {
g := NewWithT(t)

cf, err := Compile(`this is not jq`)
g.Expect(err).ShouldNot(BeNil())
g.Expect(cf).Should(BeNil())
}

func Test_Compile_EmptyExpression(t *testing.T) {
g := NewWithT(t)

// Empty string is not valid jq.
cf, err := Compile(``)
g.Expect(err).ShouldNot(BeNil())
g.Expect(cf).Should(BeNil())
}

func Test_CompiledJqFilter_Apply_SingleDocumentModification(t *testing.T) {
g := NewWithT(t)

cf, err := Compile(`. + {"status": "active"}`)
g.Expect(err).Should(BeNil())

result, err := cf.Apply(map[string]any{"name": "Alice", "age": 25})
g.Expect(err).Should(BeNil())

var got any
g.Expect(json.Unmarshal(result, &got)).Should(BeNil())
g.Expect(got).Should(Equal(map[string]any{"name": "Alice", "age": float64(25), "status": "active"}))
}

func Test_CompiledJqFilter_Apply_ExtractField(t *testing.T) {
g := NewWithT(t)

cf, err := Compile(`.metadata.labels`)
g.Expect(err).Should(BeNil())

input := map[string]any{
"metadata": map[string]any{
"labels": map[string]any{"app": "foo", "env": "prod"},
},
}
result, err := cf.Apply(input)
g.Expect(err).Should(BeNil())

var got any
g.Expect(json.Unmarshal(result, &got)).Should(BeNil())
g.Expect(got).Should(Equal(map[string]any{"app": "foo", "env": "prod"}))
}

func Test_CompiledJqFilter_Apply_MultipleResults(t *testing.T) {
g := NewWithT(t)

cf, err := Compile(`.users[] | .name`)
g.Expect(err).Should(BeNil())

input := map[string]any{
"users": []any{
map[string]any{"name": "Alice"},
map[string]any{"name": "Bob"},
},
}
result, err := cf.Apply(input)
g.Expect(err).Should(BeNil())

var got []any
g.Expect(json.Unmarshal(result, &got)).Should(BeNil())
g.Expect(got).Should(ConsistOf("Alice", "Bob"))
}

func Test_CompiledJqFilter_Apply_NullResult(t *testing.T) {
g := NewWithT(t)

cf, err := Compile(`.nonexistent`)
g.Expect(err).Should(BeNil())

result, err := cf.Apply(map[string]any{"name": "Alice"})
g.Expect(err).Should(BeNil())
g.Expect(result).Should(Equal([]byte("null")))
}

func Test_CompiledJqFilter_Apply_NilInput(t *testing.T) {
g := NewWithT(t)

cf, err := Compile(`.`)
g.Expect(err).Should(BeNil())

result, err := cf.Apply(nil)
g.Expect(err).Should(BeNil())
g.Expect(result).ShouldNot(BeNil())
}

func Test_CompiledJqFilter_Apply_RuntimeError(t *testing.T) {
g := NewWithT(t)

// .foo on a non-object (null) causes a runtime jq error.
cf, err := Compile(`.foo`)
g.Expect(err).Should(BeNil())

// Passing nil as input means workData == nil; trying .foo on null returns null.
result, err := cf.Apply(nil)
g.Expect(err).Should(BeNil())
g.Expect(result).Should(Equal([]byte("null")))
}

func Test_CompiledJqFilter_Apply_UnmarshalableInput(t *testing.T) {
g := NewWithT(t)

cf, err := Compile(`.`)
g.Expect(err).Should(BeNil())

_, err = cf.Apply(map[string]any{"ch": make(chan int)})
g.Expect(err).ShouldNot(BeNil())
}

// Test_CompiledJqFilter_Reuse verifies that the same compiled filter can be
// applied to different inputs and produces correct independent results.
func Test_CompiledJqFilter_Reuse(t *testing.T) {
g := NewWithT(t)

cf, err := Compile(`.spec.replicas`)
g.Expect(err).Should(BeNil())

inputs := []map[string]any{
{"spec": map[string]any{"replicas": float64(1)}},
{"spec": map[string]any{"replicas": float64(3)}},
{"spec": map[string]any{"replicas": float64(5)}},
}
expected := []float64{1, 3, 5}

for i, input := range inputs {
result, err := cf.Apply(input)
g.Expect(err).Should(BeNil(), "input index %d", i)

var got float64
g.Expect(json.Unmarshal(result, &got)).Should(BeNil(), "input index %d", i)
g.Expect(got).Should(Equal(expected[i]), "input index %d", i)
}
}

// Test_Compile_ProducesIdenticalResultsToApplyFilter verifies that the
// compiled path and the interpreted path yield identical output.
func Test_Compile_ProducesIdenticalResultsToApplyFilter(t *testing.T) {
g := NewWithT(t)

filterStr := `.metadata | {name, namespace}`
input := map[string]any{
"metadata": map[string]any{
"name": "my-pod",
"namespace": "default",
"labels": map[string]any{"app": "foo"},
},
}

interpreted := NewFilter()
resultInterpreted, err := interpreted.ApplyFilter(filterStr, input)
g.Expect(err).Should(BeNil())

cf, err := Compile(filterStr)
g.Expect(err).Should(BeNil())
resultCompiled, err := cf.Apply(input)
g.Expect(err).Should(BeNil())

g.Expect(resultCompiled).Should(Equal(resultInterpreted))
}
4 changes: 3 additions & 1 deletion pkg/hook/config/config_v0.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ func (cv0 *HookConfigV0) ConvertAndCheck(c *HookConfig) error {
})
}
monitor.WithLabelSelector(kubeCfg.Selector)
monitor.JqFilter = kubeCfg.JqFilter
if err := monitor.WithJqFilter(kubeCfg.JqFilter); err != nil {
return fmt.Errorf("kubernetes config [%s] jqFilter: %w", kubeCfg.Name, err)
}

kubeConfig := htypes.OnKubernetesEventConfig{}
kubeConfig.Monitor = monitor
Expand Down
4 changes: 3 additions & 1 deletion pkg/hook/config/config_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ func (cv1 *HookConfigV1) ConvertAndCheck(c *HookConfig) error {
monitor.WithFieldSelector((*kemtypes.FieldSelector)(kubeCfg.FieldSelector))
monitor.WithNamespaceSelector((*kemtypes.NamespaceSelector)(kubeCfg.Namespace))
monitor.WithLabelSelector(kubeCfg.LabelSelector)
monitor.JqFilter = kubeCfg.JqFilter
if err := monitor.WithJqFilter(kubeCfg.JqFilter); err != nil {
return fmt.Errorf("invalid kubernetes config [%d] jqFilter: %w", i, err)
}
// executeHookOnEvent is a priority
if kubeCfg.ExecuteHookOnEvents != nil {
monitor.WithEventTypes(kubeCfg.ExecuteHookOnEvents)
Expand Down
18 changes: 9 additions & 9 deletions pkg/kube_events_manager/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,18 @@ import (
utils_checksum "github.com/flant/shell-operator/pkg/utils/checksum"
)

// applyFilter filters object json representation with jq expression, calculate checksum
// over result and return ObjectAndFilterResult. If jqFilter is empty, no filter
// is required and checksum is calculated over full json representation of the object.
func applyFilter(jqFilter string, fl filter.Filter, filterFn func(obj *unstructured.Unstructured) (result interface{}, err error), obj *unstructured.Unstructured) (*kemtypes.ObjectAndFilterResult, error) {
// applyFilter filters object json representation with a pre-compiled jq expression,
// calculates checksum over the result and returns ObjectAndFilterResult.
// If compiledFilter is nil, no jq filtering is applied and checksum is calculated
// over full json representation of the object.
// jqFilterStr is stored in result metadata for informational purposes only.
func applyFilter(compiledFilter filter.CompiledFilter, jqFilterStr string, filterFn func(obj *unstructured.Unstructured) (result interface{}, err error), obj *unstructured.Unstructured) (*kemtypes.ObjectAndFilterResult, error) {
defer trace.StartRegion(context.Background(), "ApplyJqFilter").End()

res := &kemtypes.ObjectAndFilterResult{
Object: obj,
}
res.Metadata.JqFilter = jqFilter
res.Metadata.JqFilter = jqFilterStr
res.Metadata.ResourceId = resourceId(obj)

// If filterFn is passed, run it and return result.
Expand All @@ -46,16 +48,14 @@ func applyFilter(jqFilter string, fl filter.Filter, filterFn func(obj *unstructu
}

// Render obj to JSON text to apply jq filter.
if jqFilter == "" {
if compiledFilter == nil {
data, err := json.Marshal(obj)
if err != nil {
return nil, err
}
res.Metadata.Checksum = utils_checksum.CalculateChecksum(string(data))
} else {
var err error
var filtered []byte
filtered, err = fl.ApplyFilter(jqFilter, obj.UnstructuredContent())
filtered, err := compiledFilter.Apply(obj.UnstructuredContent())
if err != nil {
return nil, fmt.Errorf("jqFilter: %v", err)
}
Expand Down
Loading
Loading