-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathutilities.py
More file actions
205 lines (167 loc) · 7.31 KB
/
utilities.py
File metadata and controls
205 lines (167 loc) · 7.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
import json
import random
import sys
import time
from os.path import exists
import yaml
from yaml import SafeLoader
from cache import sync_channels_cache
def graceful_exit(message=""):
    """Exit program gracefully with a pause for user to read the message.

    :param str message: Text printed before the pause; nothing is printed
        when it is empty.
    :raises SystemExit: Always, via ``sys.exit()``.
    """
    if message:
        print(message)
    try:
        input("Press Enter to exit...")
    except EOFError:
        # stdin is closed or exhausted (service, container, piped input), so
        # nobody can press Enter; exit cleanly instead of with a traceback.
        pass
    sys.exit()
def config_file_generator():
    """Write the config.yml template to the working directory, then exit.

    The file is overwritten if it already exists. After writing, control is
    handed to ``graceful_exit`` with a message asking the user to fill in the
    file and restart, so this function does not return.
    """
    # Kept at column 0 on purpose: the literal is written to disk verbatim.
    template = """# ++--------------------------------++
# | LineBackupToDiscord (MIT LICENSE)|
# | Made by LD v0.1.0 |
# ++--------------------------------++
# Line Channel Access Token & Secret
# You can get it from https://developers.line.biz/console/
line_channel_access_token: ''
line_channel_secret: ''
# Discord Bot token
# You can get it from https://discord.com/developers/applications
discord_bot_token: ''
# Backend server configuration, aka the webhook server port for LINE.
# If you change port, make sure to change the port in your reverse proxy(ngrok etc..) as well.
webhook_port: 5000
# (Optional settings)
# You can fill in your own bot invite link and hosted by information
# This will be shown when someone types /about
# Noted that if you share your bot invite link, anyone can invite your bot to their server
bot_hosted_by: 'PlayfunI Network'
line_bot_invite_link: ''
discord_bot_invite_link: ''
"""
    with open('config.yml', 'w', encoding="utf8") as config_file:
        config_file.write(template)
    graceful_exit("Config file created successfully.\n"
                  "Please fill in config.yml then restart the program!")
def read_config():
    """Read config.yml and return its settings as a dict.

    When config.yml does not exist, ``config_file_generator`` is called to
    write a template; that call ends the program so the user can fill it in.

    The four required settings (``line_channel_access_token``,
    ``line_channel_secret``, ``discord_bot_token``, ``webhook_port``) must be
    present and non-empty. The optional settings fall back to defaults when
    absent: ``bot_hosted_by`` to ``'PlayfunI Network'`` and both invite links
    to an empty string.

    Any problem (unparsable YAML, wrong top-level type, missing or empty
    required setting) is reported through ``graceful_exit``.

    :rtype: dict
    """
    if not exists('./config.yml'):
        # The generator opens and writes the file itself and then exits.
        config_file_generator()

    try:
        with open('config.yml', encoding="utf8") as file:
            data = yaml.load(file, Loader=SafeLoader)
        config = {
            'line_channel_access_token': data['line_channel_access_token'],
            'line_channel_secret': data['line_channel_secret'],
            'discord_bot_token': data['discord_bot_token'],
            'webhook_port': data['webhook_port'],
            # Optional settings: absence must not be treated as an error.
            'bot_hosted_by': data.get('bot_hosted_by', 'PlayfunI Network'),
            'line_bot_invite_link': data.get('line_bot_invite_link', ''),
            'discord_bot_invite_link': data.get('discord_bot_invite_link', ''),
        }
    except (KeyError, TypeError, yaml.YAMLError):
        # KeyError: a required key is missing. TypeError: the document is not
        # a mapping (e.g. an empty file loads as None). YAMLError: bad syntax.
        graceful_exit(
            "An error occurred while reading config.yml, please check if the file is corrected filled.\n"
            "If the problem can't be solved, consider delete config.yml and restart the program.\n")

    required_fields = ['line_channel_access_token', 'line_channel_secret',
                       'discord_bot_token', 'webhook_port']
    for field in required_fields:
        # The template ships these as empty strings; reject unfilled values.
        if not config[field]:
            graceful_exit(f"Missing required field: {field} in config.yml")
    return config
def read_sync_channels():
    """Read sync_channels.json file.

    When the file does not exist in the working directory, it is first
    created containing an empty JSON list.

    :return: The parsed content of sync_channels.json.
    """
    if not exists('./sync_channels.json'):
        print("sync_channels.json not found, create one by default.")
        with open('sync_channels.json', 'w', encoding="utf8") as file:
            json.dump([], file, indent=4)
    # A context manager closes the handle deterministically instead of
    # leaving it to the garbage collector.
    with open('sync_channels.json', 'r', encoding="utf8") as file:
        return json.load(file)
def add_new_sync_channel(line_group_id: str, line_group_name: str, discord_channel_id: int,
                         discord_channel_name: str, discord_channel_webhook: str):
    """Add new sync channel.

    Appends a new entry to sync_channels.json and registers the same entry
    with ``sync_channels_cache``. The entry receives ``sub_num`` equal to the
    highest existing ``sub_num`` plus one (1 for an empty file) and a
    ``folder_name`` of ``"<line_group_name>_<discord_channel_name>"``.

    :param str line_group_id: Line group id.
    :param str line_group_name: Line group name.
    :param int discord_channel_id: Discord channel id.
    :param str discord_channel_name: Discord channel name.
    :param str discord_channel_webhook: Discord channel webhook.
    """
    with open('sync_channels.json', 'r', encoding="utf8") as file:
        data = json.load(file)

    # Entries without 'sub_num' count as 0; default=0 covers the empty list.
    sub_num = max((entry.get('sub_num', 0) for entry in data), default=0) + 1
    folder_name = f'{line_group_name}_{discord_channel_name}'
    data.append({
        'sub_num': sub_num,
        'folder_name': folder_name,
        'line_group_id': line_group_id,
        'line_group_name': line_group_name,
        'discord_channel_id': discord_channel_id,
        'discord_channel_name': discord_channel_name,
        'discord_channel_webhook': discord_channel_webhook
    })
    update_json('sync_channels.json', data)
    sync_channels_cache.add_sync_channel(sub_num, folder_name, line_group_id, line_group_name,
                                         discord_channel_id, discord_channel_name,
                                         discord_channel_webhook)
def remove_sync_channel(line_group_id: str | None = None,
                        discord_channel_id: int | None = None):
    """Remove sync channel.

    Drops every entry of sync_channels.json matching the given identifier,
    rewrites the file, and forwards both arguments to
    ``sync_channels_cache.remove_sync_channel``. When both identifiers are
    given, only ``line_group_id`` is used for filtering the file; when
    neither is given, the file content is rewritten unchanged.

    :param str line_group_id: Line group id.
    :param int discord_channel_id: Discord channel id.
    """
    with open('sync_channels.json', 'r', encoding="utf8") as file:
        data = json.load(file)

    if line_group_id:
        data = [x for x in data if x['line_group_id'] != line_group_id]
    elif discord_channel_id:
        data = [x for x in data if x['discord_channel_id'] != discord_channel_id]
    update_json('sync_channels.json', data)
    sync_channels_cache.remove_sync_channel(line_group_id, discord_channel_id)
def generate_binding_code(line_group_id: str, line_group_name: str) -> int:
    """Generate binding code.

    Draws a six-digit code (100000-999999) that is not already present in
    binding_codes.json, stores it there together with the Line group id and
    name and an ``expiration`` timestamp 300 seconds from now, and returns
    it. The file is created with an empty object when it does not exist.

    :param str line_group_id: Line group id.
    :param str line_group_name: Line group name.
    :return int: Binding code.
    """
    if not exists('./binding_codes.json'):
        with open('binding_codes.json', 'w', encoding="utf8") as file:
            json.dump({}, file, indent=4)
    with open('binding_codes.json', 'r', encoding="utf8") as file:
        data = json.load(file)

    # The code authorises linking a group, so draw it from the OS CSPRNG
    # rather than the predictable default Mersenne Twister generator.
    rng = random.SystemRandom()
    binding_code = rng.randint(100000, 999999)
    # JSON object keys are always strings once loaded, so compare and store
    # the code as a string; re-draw rather than overwrite an existing code.
    while str(binding_code) in data:
        binding_code = rng.randint(100000, 999999)

    data[str(binding_code)] = {'line_group_id': line_group_id,
                               'line_group_name': line_group_name,
                               'expiration': time.time() + 300}
    update_json('binding_codes.json', data)
    return binding_code
def remove_binding_code(binding_code: int):
    """Remove binding code from binding_codes.json.

    Does nothing when the file does not exist or the code is not stored.

    :param int binding_code: Binding code.
    """
    try:
        with open('binding_codes.json', 'r', encoding="utf8") as file:
            data = json.load(file)
    except FileNotFoundError:
        # No code has ever been stored, so there is nothing to remove.
        return

    # JSON object keys are loaded as strings; an int key would never match.
    key = str(binding_code)
    if key in data:
        del data[key]
        update_json('binding_codes.json', data)
def get_binding_code_info(binding_code: int) -> dict | None:
"""Get binding code info.
:param int binding_code: Binding code.
:return dict: Return dict contains line_group_id and expiration if it exists, else None
"""
data = json.load(open('binding_codes.json', 'r', encoding="utf8"))
if binding_code in data:
return data[binding_code]
return None
def update_json(file_name: str, data: dict):
"""Update a json file.
:param str file_name: The file to update.
:param dict data: The data to update.
"""
with open(file_name, 'w', encoding="utf8") as file:
json.dump(data, file, indent=4, ensure_ascii=False)
file.close()