-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcontainers.py
More file actions
75 lines (68 loc) · 1.96 KB
/
containers.py
File metadata and controls
75 lines (68 loc) · 1.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import logging.config
import os
from celery import Celery
from dependency_injector import containers, providers
from plex_api_client import PlexAPI
from telegram import Bot
from watchdog.observers.polling import PollingObserver
from app.services.bot import TelegramBot
from app.services.detect_event_file import DetectEventFile
from app.services.plex_sdk import PlexSDK
from app.services.send_message import SendMessage
class Container(containers.DeclarativeContainer):
    """Declarative dependency-injection container for the application.

    Declares the configuration, the logging resource, and the providers for
    the Celery app, the Telegram bot, the Plex SDK wrapper, the file-event
    handler and the filesystem observer.

    Providers are wired to each other by reference (not by calling them in
    the class body), so values are resolved when a provider is invoked and
    overrides applied to a container instance are honoured.
    """

    # Settings are declared to come from ./config.json. load() is invoked in
    # the class body, i.e. when this module is imported.
    config = providers.Configuration(json_files=["./config.json"])
    config.load()

    # Resources
    # NOTE: this attribute shadows the ``logging`` module for the remainder
    # of the class body. The right-hand side is evaluated before the name is
    # rebound, so it still refers to the module here.
    logging = providers.Resource(
        logging.config.fileConfig,
        fname="logging.ini",
    )

    # Services
    # Home directory of the current user, expanded at call time.
    base_path = providers.Callable(os.path.expanduser, "~")
    directory_name = providers.Object("Videos")
    # Built from the sibling providers rather than from ``Container.<name>()``
    # so that overriding base_path/directory_name also changes this path.
    path = providers.Callable(os.path.join, base_path, directory_name)

    celery_service = providers.Singleton(
        Celery,
        "tasks",
        broker_url="redis://localhost:6379/0",
        include=["app.tasks"],
    )

    # providers.List builds a fresh list on every call, so callers may
    # mutate the result without affecting each other.
    file_types = providers.List(".mp4", ".mkv", ".avi")

    # Configuration options are injected as providers; they are read when
    # the factory is called instead of being frozen at import time.
    bot_service = providers.Factory(
        Bot,
        token=config.bot.token,
    )
    # as_(str) keeps the str() coercion the values previously went through.
    sdk_service = providers.Factory(
        PlexAPI,
        access_token=config.plex.access_token.as_(str),
        server_url=config.plex.server_url.as_(str),
    )
    sdk_plex_service = providers.Factory(
        PlexSDK,
        s=sdk_service,
    )
    telegram_service = providers.Factory(
        TelegramBot,
        bot=bot_service,
    )
    send_message_service = providers.Factory(SendMessage)
    event_handler_service = providers.Factory(
        DetectEventFile,
        file_types=file_types,
        plex_service=sdk_plex_service,
        send_message=send_message_service,
    )
    observer_service = providers.Singleton(PollingObserver)