Skip to content

Commit 6a8dc6d

Browse files
committed
Refactor FileWatchSensor to use watchdog instead of our logshipper fork
1 parent 22b450e commit 6a8dc6d

1 file changed

Lines changed: 203 additions & 32 deletions

File tree

contrib/linux/sensors/file_watch_sensor.py

Lines changed: 203 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -14,38 +14,202 @@
1414
# limitations under the License.
1515

1616
import os
17+
import signal
18+
import time
19+
import sys
1720

18-
import eventlet
21+
from watchdog.observers import Observer
22+
from watchdog.events import FileSystemEventHandler
1923

20-
from logshipper.tail import Tail
24+
try:
25+
from st2reactor.sensor.base import Sensor
26+
except ImportError:
27+
Sensor = object
2128

22-
from st2reactor.sensor.base import Sensor
29+
30+
class FileEventHandler(FileSystemEventHandler):
31+
def __init__(self, *args, callbacks=None, **kwargs):
32+
self.callbacks = callbacks or {}
33+
34+
def dispatch(self, event):
35+
if not event.is_synthetic and not event.is_directory:
36+
super().dispatch(event)
37+
38+
def on_created(self, event):
39+
cb = self.callbacks.get('created')
40+
if cb:
41+
cb(event=event)
42+
43+
def on_modified(self, event):
44+
cb = self.callbacks.get('modified')
45+
if cb:
46+
cb(event=event)
47+
48+
def on_moved(self, event):
49+
cb = self.callbacks.get('moved')
50+
if cb:
51+
cb(event=event)
52+
53+
def on_deleted(self, event):
54+
cb = self.callbacks.get('deleted')
55+
if cb:
56+
cb(event=event)
57+
58+
59+
class SingleFileTail(object):
60+
def __init__(self, path, handler, read_all=False, observer=None):
61+
self.path = path
62+
self.handler = handler
63+
self.read_all = read_all
64+
self.buffer = ''
65+
self.observer = observer or Observer()
66+
67+
self.open()
68+
69+
def read(self, event=None):
70+
while True:
71+
# Buffer 1024 bytes at a time
72+
buff = os.read(self.fd, 1024)
73+
if not buff:
74+
return
75+
76+
# Possible bug? What if the 1024 cuts off in the middle of a utf8
77+
# code point?
78+
# We use errors='replace' to have Python replace the unreadable
79+
# character with an "official U+FFFD REPLACEMENT CHARACTER"
80+
# This isn't great, but it's better than the previous behavior,
81+
# which blew up on any issues.
82+
buff = buff.decode(encoding='utf8', errors='replace')
83+
84+
# An alternative is to try to read additional bytes one at a time
85+
# until we can decode the string properly
86+
# while True:
87+
# try:
88+
# buff = buff.decode(encoding='utf8')
89+
# except UnicodeDecodeError:
90+
# # Try to read another byte (this may not read anything)
91+
# b = os.read(self.fd, 1)
92+
# # If we read something
93+
# if b:
94+
# buff += b
95+
# else:
96+
# buff = buff.decode(encoding='utf8', errors='ignore')
97+
# else:
98+
# # If we could decode to UTF-8, then continue
99+
# break
100+
101+
# Append to previous buffer
102+
if self.buffer:
103+
buff = self.buffer + buff
104+
self.buffer = ''
105+
106+
lines = buff.splitlines(True)
107+
# If the last character of the last line is not a newline
108+
if lines[-1][-1] != '\n': # Incomplete line in the buffer
109+
self.buffer = lines[-1] # Save the last line fragment
110+
lines = lines[:-1]
111+
112+
for line in lines:
113+
self.handler(self.path, line[:-1])
114+
115+
def reopen(self, event=None, skip_to_end=False):
116+
# stat the file on disk
117+
file_stat = os.stat(self.path)
118+
119+
# stat the file from the existing file descriptor
120+
fd_stat = os.fstat(self.fd)
121+
# Seek right back where we thought we were
122+
pos = os.lseek(self.fd, 0, os.SEEK_CUR)
123+
124+
# If the file now on disk is larger than where we were currently reading
125+
if fd_stat.st_size > pos:
126+
# More data to read - read as normal
127+
self.read()
128+
# If the file now on disk is smaller (eg: if the file is a freshly
129+
# rotated log), or if its inode has changed
130+
if self.stat.st_size > file_stat.st_size or \
131+
self.stat.st_ino != file_stat:
132+
self.close()
133+
# Since we already read the entirety of the previous file, we don't
134+
# want to skip any of the new file's contents, so don't seek to the
135+
# end, and try to read from it immediately
136+
self.open(seek_to_end=False)
137+
self.read()
138+
139+
def open(self, seek_to_end=False):
140+
self.stat = os.stat(self.path)
141+
self.fd = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK)
142+
143+
if not self.read_all or seek_to_end:
144+
os.lseek(self.fd, 0, os.SEEK_END)
145+
146+
file_event_handler = FileEventHandler(callbacks={
147+
'created': None,
148+
'modified': self.read,
149+
'moved': self.reopen,
150+
'deleted': self.reopen,
151+
})
152+
self.watch = self.observer.schedule(file_event_handler, self.path)
153+
154+
def close(self):
155+
os.close(self.fd)
156+
self.observer.unschedule(self.watch)
157+
if self.buffer:
158+
self.handler(self.path, self.buffer)
159+
160+
161+
class TailManager(object):
162+
def __init__(self, *args, **kwargs):
163+
self.observer = Observer()
164+
self.tails = {}
165+
166+
def tail_file(self, path, handler, read_all=False):
167+
if handler not in self.tails.setdefault(path, {}):
168+
sft = SingleFileTail(path, handler,
169+
read_all=read_all, observer=self.observer)
170+
self.tails[path][handler] = sft
171+
172+
def stop_tailing_file(self, path, handler):
173+
tailed_file = self.tails.get(path, {}).pop(handler)
174+
tailed_file.close()
175+
# Amortize some cleanup while we're at it
176+
if not self.tails.get(path):
177+
self.tails.pop(path)
178+
179+
def run(self):
180+
self.start()
181+
while True:
182+
time.sleep(1)
183+
184+
def start(self):
185+
self.observer.start()
186+
187+
def stop(self):
188+
for handlers in self.tails.values():
189+
for tailed_file in handlers.values():
190+
tailed_file.close()
191+
self.observer.stop()
192+
self.observer.join()
23193

24194

25195
class FileWatchSensor(Sensor):
26-
def __init__(self, sensor_service, config=None):
27-
super(FileWatchSensor, self).__init__(sensor_service=sensor_service,
28-
config=config)
29-
self._trigger = None
30-
self._logger = self._sensor_service.get_logger(__name__)
31-
self._tail = None
196+
def __init__(self, *args, **kwargs):
197+
super().__init__(*args, **kwargs)
198+
self._stop = False
199+
self.trigger = None
200+
self.logger = self.sensor_service.get_logger(__name__)
32201

33202
def setup(self):
34-
self._tail = Tail(filenames=[])
35-
self._tail.handler = self._handle_line
36-
self._tail.should_run = True
203+
self.tail_manager = TailManager()
37204

38205
def run(self):
39-
self._tail.run()
206+
self.tail_manager.run()
207+
while not self._stop:
208+
eventlet.sleep(60)
40209

41210
def cleanup(self):
42-
if self._tail:
43-
self._tail.should_run = False
44-
45-
try:
46-
self._tail.notifier.stop()
47-
except Exception:
48-
pass
211+
self._stop = True
212+
self.tail_manager.stop()
49213

50214
def add_trigger(self, trigger):
51215
file_path = trigger['parameters'].get('file_path', None)
@@ -54,16 +218,13 @@ def add_trigger(self, trigger):
54218
self._logger.error('Received trigger type without "file_path" field.')
55219
return
56220

57-
self._trigger = trigger.get('ref', None)
221+
self.trigger = trigger.get('ref', None)
58222

59223
if not self._trigger:
60224
raise Exception('Trigger %s did not contain a ref.' % trigger)
61225

62-
# Wait a bit to avoid initialization race in logshipper library
63-
eventlet.sleep(1.0)
64-
65-
self._tail.add_file(filename=file_path)
66-
self._logger.info('Added file "%s"' % (file_path))
226+
self.tail_manager.tail_file(file_path, self._handle_line)
227+
self.logger.info('Added file "%s"' % (file_path))
67228

68229
def update_trigger(self, trigger):
69230
pass
@@ -72,21 +233,31 @@ def remove_trigger(self, trigger):
72233
file_path = trigger['parameters'].get('file_path', None)
73234

74235
if not file_path:
75-
self._logger.error('Received trigger type without "file_path" field.')
236+
self.logger.error('Received trigger type without "file_path" field.')
76237
return
77238

78-
self._tail.remove_file(filename=file_path)
79-
self._trigger = None
239+
self.tail_manager.stop_tailing_file(file_path, self._handle_line)
240+
self.trigger = None
80241

81-
self._logger.info('Removed file "%s"' % (file_path))
242+
self.logger.info('Removed file "%s"' % (file_path))
82243

83244
def _handle_line(self, file_path, line):
84-
trigger = self._trigger
85245
payload = {
86246
'file_path': file_path,
87247
'file_name': os.path.basename(file_path),
88248
'line': line
89249
}
90250
self._logger.debug('Sending payload %s for trigger %s to sensor_service.',
91-
payload, trigger)
251+
payload, self.trigger)
92252
self.sensor_service.dispatch(trigger=trigger, payload=payload)
253+
254+
255+
if __name__ == '__main__':
256+
tm = TailManager()
257+
tm.tail_file('test.py', handler=print)
258+
tm.run()
259+
260+
def halt(sig, frame):
261+
tm.stop()
262+
sys.exit(0)
263+
signal.signal(signal.SIGINT, halt)

0 commit comments

Comments
 (0)