Skip to content

Commit f255c14

Browse files
committed
Use file based watch for ansible-runner-http alternative
Signed-off-by: chiragkyal <ckyal@redhat.com>
1 parent fd1eecd commit f255c14

5 files changed

Lines changed: 343 additions & 10 deletions

File tree

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/operator-framework/ansible-operator-plugins
33
go 1.23.6
44

55
require (
6+
github.com/fsnotify/fsnotify v1.8.0
67
github.com/go-logr/logr v1.4.2
78
github.com/kr/text v0.2.0
89
github.com/maxbrunsfeld/counterfeiter/v6 v6.11.2
@@ -42,7 +43,6 @@ require (
4243
github.com/emicklei/go-restful/v3 v3.11.2 // indirect
4344
github.com/evanphx/json-patch/v5 v5.9.11 // indirect
4445
github.com/felixge/httpsnoop v1.0.4 // indirect
45-
github.com/fsnotify/fsnotify v1.8.0 // indirect
4646
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
4747
github.com/go-logr/stdr v1.2.2 // indirect
4848
github.com/go-logr/zapr v1.3.0 // indirect
Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
// Copyright 2018 The Operator-SDK Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package eventapi
16+
17+
import (
18+
"context"
19+
"encoding/json"
20+
"fmt"
21+
"os"
22+
"path/filepath"
23+
"sync"
24+
"time"
25+
26+
"github.com/fsnotify/fsnotify"
27+
"github.com/go-logr/logr"
28+
logf "sigs.k8s.io/controller-runtime/pkg/log"
29+
)
30+
31+
// FileEventReceiver watches ansible-runner artifact files for events
32+
type FileEventReceiver struct {
33+
// Events is the channel used to send JobEvents back to the runner
34+
Events chan JobEvent
35+
36+
// ArtifactPath is the path where ansible-runner writes artifact files
37+
ArtifactPath string
38+
39+
// stopped indicates if this receiver has permanently stopped receiving events
40+
stopped bool
41+
42+
// mutex controls access to the "stopped" bool
43+
mutex sync.RWMutex
44+
45+
// ident is the unique identifier for a particular run of ansible-runner
46+
ident string
47+
48+
// logger holds a logger that has some fields already set
49+
logger logr.Logger
50+
51+
// errChan is a channel for errors
52+
errChan chan<- error
53+
54+
// ctx is the context for cancellation
55+
ctx context.Context
56+
cancelFunc context.CancelFunc
57+
58+
// processedFiles keeps track of which files have been processed
59+
processedFiles map[string]bool
60+
processMutex sync.Mutex
61+
62+
// wg tracks goroutines for clean shutdown
63+
wg sync.WaitGroup
64+
}
65+
66+
// NewFileEventReceiver creates a new file-based event receiver
67+
func NewFileEventReceiver(ident string, artifactPath string, errChan chan<- error) (*FileEventReceiver, error) {
68+
ctx, cancel := context.WithCancel(context.Background())
69+
70+
receiver := &FileEventReceiver{
71+
Events: make(chan JobEvent, 1000),
72+
ArtifactPath: artifactPath,
73+
ident: ident,
74+
logger: logf.Log.WithName("fileapi").WithValues("job", ident),
75+
errChan: errChan,
76+
ctx: ctx,
77+
cancelFunc: cancel,
78+
processedFiles: make(map[string]bool),
79+
}
80+
81+
// Start watching for file changes
82+
receiver.wg.Add(1)
83+
go receiver.watchJobEvents()
84+
85+
return receiver, nil
86+
}
87+
88+
// watchJobEvents monitors the job_events directory for new files
89+
func (f *FileEventReceiver) watchJobEvents() {
90+
defer f.wg.Done()
91+
defer close(f.Events)
92+
93+
// Watch the job_events directory
94+
// Ansible-runner writes artifacts to {inputDir}/artifacts/{ident}/job_events
95+
jobEventsDir := filepath.Join(f.ArtifactPath, "artifacts", f.ident, "job_events")
96+
97+
// Ensure directory exists before watching
98+
if err := os.MkdirAll(jobEventsDir, 0755); err != nil {
99+
f.errChan <- fmt.Errorf("failed to create job_events directory: %v", err)
100+
return
101+
}
102+
103+
f.logger.Info("Starting file-based event receiver", "jobEventsDir", jobEventsDir)
104+
105+
// Watch for new files
106+
watcher, err := fsnotify.NewWatcher()
107+
if err != nil {
108+
f.errChan <- fmt.Errorf("failed to create file watcher: %v", err)
109+
return
110+
}
111+
defer watcher.Close()
112+
113+
if err := watcher.Add(jobEventsDir); err != nil {
114+
f.errChan <- fmt.Errorf("failed to watch job_events directory: %v", err)
115+
return
116+
}
117+
118+
for {
119+
select {
120+
case event, ok := <-watcher.Events:
121+
if !ok {
122+
return
123+
}
124+
// Process CREATE and WRITE events for JSON files
125+
if (event.Op&fsnotify.Create == fsnotify.Create ||
126+
event.Op&fsnotify.Write == fsnotify.Write) &&
127+
filepath.Ext(event.Name) == ".json" {
128+
time.Sleep(100 * time.Millisecond) // Brief delay to ensure file is fully written
129+
f.processEventFile(event.Name)
130+
}
131+
case err, ok := <-watcher.Errors:
132+
if !ok {
133+
return
134+
}
135+
f.errChan <- fmt.Errorf("file watcher error: %v", err)
136+
case <-f.ctx.Done():
137+
f.logger.V(1).Info("Context cancelled")
138+
return
139+
}
140+
}
141+
}
142+
143+
// processEventFile reads and parses a single event file
144+
func (r *FileEventReceiver) processEventFile(filename string) bool {
145+
// Check if already processed
146+
r.processMutex.Lock()
147+
if r.processedFiles[filename] {
148+
r.processMutex.Unlock()
149+
r.logger.Info("Already processed file", "file", filename)
150+
return true
151+
}
152+
r.processMutex.Unlock()
153+
154+
// Add small delay to ensure file is fully written
155+
time.Sleep(50 * time.Millisecond)
156+
157+
file, err := os.Open(filename)
158+
if err != nil {
159+
r.logger.V(2).Info("Could not open event file (may not be ready)", "file", filename)
160+
return false
161+
}
162+
defer file.Close()
163+
164+
// Parse JSON event from file
165+
var event JobEvent
166+
decoder := json.NewDecoder(file)
167+
if err := decoder.Decode(&event); err != nil {
168+
// Skip files that aren't valid JSON (might be partial writes)
169+
r.logger.V(2).Info("Could not parse event file (may be incomplete)", "file", filename, "error", err)
170+
return false
171+
}
172+
173+
// Mark as processed
174+
r.processMutex.Lock()
175+
r.processedFiles[filename] = true
176+
r.processMutex.Unlock()
177+
178+
// Check if receiver is stopped
179+
r.mutex.RLock()
180+
stopped := r.stopped
181+
r.mutex.RUnlock()
182+
183+
if stopped {
184+
r.logger.V(1).Info("Receiver stopped, dropping event", "event", event.Event)
185+
return false
186+
}
187+
188+
// Send event to channel with timeout
189+
timeout := time.NewTimer(10 * time.Second)
190+
defer timeout.Stop()
191+
192+
select {
193+
case r.Events <- event:
194+
r.logger.V(2).Info("Processed event", "event", event.Event, "uuid", event.UUID)
195+
if event.Event == EventPlaybookOnStats {
196+
r.logger.Info("Successfully processed playbook_on_stats event")
197+
}
198+
return true
199+
case <-timeout.C:
200+
r.logger.Info("Timed out writing event to channel")
201+
return true
202+
case <-r.ctx.Done():
203+
r.logger.V(1).Info("Context cancelled while writing event")
204+
return false
205+
}
206+
}
207+
208+
// Close ensures that appropriate resources are cleaned up
209+
func (r *FileEventReceiver) Close() {
210+
r.mutex.Lock()
211+
r.stopped = true
212+
r.mutex.Unlock()
213+
214+
// Cancel context to signal goroutines to stop
215+
if r.cancelFunc != nil {
216+
r.cancelFunc()
217+
}
218+
219+
// Wait for all goroutines to finish
220+
r.wg.Wait()
221+
222+
r.logger.V(1).Info("File Event API stopped")
223+
}

0 commit comments

Comments
 (0)