1414# limitations under the License.
1515
1616import os
17-
1817import eventlet
1918
2019from logshipper .tail import Tail
2423
2524class FileWatchSensor (Sensor ):
2625 def __init__ (self , sensor_service , config = None ):
27- super (FileWatchSensor , self ).__init__ (
28- sensor_service = sensor_service , config = config
29- )
30- self ._trigger = None
31- self ._logger = self ._sensor_service .get_logger (__name__ )
32- self ._tail = None
26+ super (FileWatchSensor , self ).__init__ (sensor_service = sensor_service , config = config )
27+ self .log = self ._sensor_service .get_logger (__name__ )
28+ self .tail = None
29+ self .file_ref = {}
3330
3431 def setup (self ):
35- self ._tail = Tail (filenames = [])
36- self ._tail .handler = self ._handle_line
37- self ._tail .should_run = True
32+ self .tail = Tail (filenames = [])
33+ self .tail .handler = self ._handle_line
34+ self .tail .should_run = True
3835
3936 def run (self ):
40- self ._tail .run ()
37+ self .tail .run ()
4138
4239 def cleanup (self ):
43- if self ._tail :
44- self ._tail .should_run = False
40+ if self .tail :
41+ self .tail .should_run = False
4542
4643 try :
47- self ._tail .notifier .stop ()
44+ self .tail .notifier .stop ()
4845 except Exception :
49- self ._logger .exception ("Unable to stop the tail notifier" )
46+ self .log .exception ("Unable to stop the tail notifier" )
5047
5148 def add_trigger (self , trigger ):
5249 file_path = trigger ["parameters" ].get ("file_path" , None )
5350
5451 if not file_path :
55- self ._logger .error ('Received trigger type without "file_path" field.' )
52+ self .log .error ('Received trigger type without "file_path" field.' )
5653 return
5754
58- self . _trigger = trigger .get ("ref" , None )
55+ trigger = trigger .get ("ref" , None )
5956
60- if not self . _trigger :
61- raise Exception ("Trigger %s did not contain a ref." % trigger )
57+ if not trigger :
58+ raise Exception (f "Trigger { trigger } did not contain a ref." )
6259
6360 # Wait a bit to avoid initialization race in logshipper library
6461 eventlet .sleep (1.0 )
6562
66- self ._tail .add_file (filename = file_path )
67- self ._logger .info ('Added file "%s"' % (file_path ))
63+ self .tail .add_file (filename = file_path )
64+ self .file_ref [file_path ] = trigger
65+
66+ self .log .info (f"Added file '{ file_path } ' ({ trigger } ) to watch list." )
6867
6968 def update_trigger (self , trigger ):
7069 pass
@@ -73,22 +72,24 @@ def remove_trigger(self, trigger):
7372 file_path = trigger ["parameters" ].get ("file_path" , None )
7473
7574 if not file_path :
76- self ._logger .error (' Received trigger type without " file_path" field.' )
75+ self .log .error (" Received trigger type without ' file_path' field." )
7776 return
7877
79- self ._tail .remove_file (filename = file_path )
80- self ._trigger = None
78+ self .tail .remove_file (filename = file_path )
79+ self .file_ref . pop ( file_path )
8180
82- self ._logger .info (' Removed file "%s"' % ( file_path ) )
81+ self .log .info (f" Removed file ' { file_path } ' ( { trigger } ) from watch list." )
8382
8483 def _handle_line (self , file_path , line ):
85- trigger = self ._trigger
84+ if file_path not in self .file_ref :
85+ self .log .error (f"No reference found for { file_path } , unable to emit trigger!" )
86+ return
87+
88+ trigger = self .file_ref [file_path ]
8689 payload = {
8790 "file_path" : file_path ,
8891 "file_name" : os .path .basename (file_path ),
8992 "line" : line ,
9093 }
91- self ._logger .debug (
92- "Sending payload %s for trigger %s to sensor_service." , payload , trigger
93- )
94+ self .log .debug (f"Sending payload { payload } for trigger { trigger } to sensor_service." )
9495 self .sensor_service .dispatch (trigger = trigger , payload = payload )
0 commit comments