@@ -90,7 +90,7 @@ func (f *FileEventReceiver) watchJobEvents() {
9090 defer f .wg .Done ()
9191 defer close (f .Events )
9292
93- // Watch the job_events directory, not individual files
93+ // Watch the job_events directory
9494 // Ansible-runner writes artifacts to {inputDir}/artifacts/{ident}/job_events
9595 jobEventsDir := filepath .Join (f .ArtifactPath , "artifacts" , f .ident , "job_events" )
9696
@@ -102,9 +102,6 @@ func (f *FileEventReceiver) watchJobEvents() {
102102
103103 f .logger .Info ("Starting file-based event receiver" , "jobEventsDir" , jobEventsDir )
104104
105- // Process any existing files first
106- // f.processExistingFiles(jobEventsDir)
107-
108105 // Watch for new files
109106 watcher , err := fsnotify .NewWatcher ()
110107 if err != nil {
@@ -118,10 +115,6 @@ func (f *FileEventReceiver) watchJobEvents() {
118115 return
119116 }
120117
121- // // Ticker to periodically check for new files (in case we miss file events)
122- // ticker := time.NewTicker(1 * time.Second)
123- // defer ticker.Stop()
124-
125118 for {
126119 select {
127120 case event , ok := <- watcher .Events :
@@ -140,63 +133,20 @@ func (f *FileEventReceiver) watchJobEvents() {
140133 return
141134 }
142135 f .errChan <- fmt .Errorf ("file watcher error: %v" , err )
143- // case <-ticker.C:
144- // // Periodically scan for any missed files
145- // f.processExistingFiles(jobEventsDir)
146136 case <- f .ctx .Done ():
147- // When context is cancelled, do a final scan for any remaining files
148- f .logger .V (1 ).Info ("Context cancelled, doing final scan for events" )
149- // f.processExistingFiles(jobEventsDir)
137+ f .logger .V (1 ).Info ("Context cancelled" )
150138 return
151139 }
152140 }
153141}
154142
155- // processExistingFiles processes any existing event files in the directory
156- // func (f *FileEventReceiver) processExistingFiles(jobEventsDir string) {
157- // files, err := os.ReadDir(jobEventsDir)
158- // if err != nil {
159- // f.logger.Info("Could not read job_events directory", "dir", jobEventsDir, "error", err)
160- // return
161- // }
162-
163- // f.logger.Info("Scanning for existing event files", "dir", jobEventsDir, "fileCount", len(files))
164-
165- // // Sort files by name to process in order (ansible-runner prefixes with sequence number)
166- // var jsonFiles []string
167- // for _, file := range files {
168- // if !file.IsDir() && filepath.Ext(file.Name()) == ".json" {
169- // fullPath := filepath.Join(jobEventsDir, file.Name())
170-
171- // f.processMutex.Lock()
172- // alreadyProcessed := f.processedFiles[fullPath]
173- // f.processMutex.Unlock()
174-
175- // if !alreadyProcessed {
176- // jsonFiles = append(jsonFiles, fullPath)
177- // }
178- // }
179- // }
180-
181- // f.logger.Info("Found event files to process", "count", len(jsonFiles))
182-
183- // // Sort files to process in order
184- // sort.Strings(jsonFiles)
185-
186- // // Process in order
187- // for _, filePath := range jsonFiles {
188- // f.logger.V(2).Info("Processing event file", "file", filePath)
189- // f.processEventFile(filePath)
190- // }
191- // }
192-
193143// processEventFile reads and parses a single event file
194144func (r * FileEventReceiver ) processEventFile (filename string ) bool {
195145 // Check if already processed
196146 r .processMutex .Lock ()
197147 if r .processedFiles [filename ] {
198148 r .processMutex .Unlock ()
199- r .logger .Info ("already processed file" , "file" , filename )
149+ r .logger .Info ("Already processed file" , "file" , filename )
200150 return true
201151 }
202152 r .processMutex .Unlock ()
@@ -248,7 +198,7 @@ func (r *FileEventReceiver) processEventFile(filename string) bool {
248198 return true
249199 case <- timeout .C :
250200 r .logger .Info ("Timed out writing event to channel" )
251- return true // Consider it processed to avoid retry loops
201+ return true
252202 case <- r .ctx .Done ():
253203 r .logger .V (1 ).Info ("Context cancelled while writing event" )
254204 return false