-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
42 lines (31 loc) · 1.06 KB
/
main.py
File metadata and controls
42 lines (31 loc) · 1.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import logging
from dependency_injector.wiring import inject, Provide
from watchdog.observers import Observer
from app.services.detect_event_file import DetectEventFile
from app.tasks import TaskSend
from containers import Container
@inject
def main(
    path: str = Provide[Container.path],
    file_types: list = Provide[Container.file_types],
    event_handler: DetectEventFile = Provide[Container.event_handler_service],
    observer: Observer = Provide[Container.observer_service],
) -> None:
    """Watch ``path`` recursively and block until the observer stops.

    Every parameter defaults to a ``Provide[...]`` marker, so the function is
    meant to be called without arguments once the container has been wired.

    Args:
        path: Directory passed to ``observer.schedule``.
        file_types: Only used in the start-up log message in this function.
        event_handler: Handler scheduled on the observer for ``path``.
        observer: Observer that is scheduled, started, and finally joined.
    """
    observer.schedule(event_handler, path, recursive=True)
    # Start outside the try block: if start() itself fails there is no running
    # thread to stop or join.
    observer.start()
    logging.info(
        "Monitoring the directory: %s for file types %s", path, file_types
    )
    try:
        # The timed join returns every second so the loop re-checks whether
        # the observer thread is still alive.
        while observer.is_alive():
            observer.join(1)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to end the watcher; do not propagate it.
        logging.info("Interrupted, stopping the observer")
    finally:
        # Always shut the observer down, not only on Ctrl-C, so an unexpected
        # error in the wait loop cannot leave the watcher running.
        observer.stop()
        observer.join()
# Script entry: build the dependency-injection container, register the Celery
# task, then start the directory watcher.
# NOTE(review): there is no `if __name__ == "__main__":` guard, so all of this
# (including the main() call, which loops while the observer is alive) also
# runs when the module is imported; confirm that is intended.
container = Container()
container.init_resources()
# Wire this module so the Provide[...] defaults declared on main() are
# resolved from the container when main() is called without arguments.
container.wire(modules=[__name__])
celery = container.celery_service()
# Register tasks in Celery
send_notification = TaskSend()
celery.register_task(send_notification)
# Called with no arguments: every parameter of main() has a Provide[...]
# default.
main()