-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatabase.py
More file actions
201 lines (182 loc) · 7.8 KB
/
database.py
File metadata and controls
201 lines (182 loc) · 7.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
import aiosqlite
import os
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DB_PATH = os.path.join(BASE_DIR, "data", "levels.db")
async def init_db():
os.makedirs(os.path.join(BASE_DIR, "data"), exist_ok=True)
async with aiosqlite.connect(DB_PATH) as db:
# Enable Write-Ahead Logging for better concurrency and performance
await db.execute("PRAGMA journal_mode=WAL;")
await db.execute("""
CREATE TABLE IF NOT EXISTS users (
guild_id TEXT NOT NULL,
user_id TEXT NOT NULL,
xp INTEGER DEFAULT 0,
level INTEGER DEFAULT 0,
messages INTEGER DEFAULT 0,
voice_time INTEGER DEFAULT 0,
PRIMARY KEY (guild_id, user_id)
)
""")
# Create an index to massively speed up leaderboard and get_rank queries
await db.execute("CREATE INDEX IF NOT EXISTS idx_users_guild_xp ON users(guild_id, xp DESC);")
await db.execute("""
CREATE TABLE IF NOT EXISTS guild_settings (
guild_id TEXT PRIMARY KEY,
xp_per_message INTEGER DEFAULT 15,
xp_per_voice_min INTEGER DEFAULT 10,
xp_cooldown INTEGER DEFAULT 60,
level_up_channel TEXT DEFAULT NULL,
level_up_message TEXT DEFAULT NULL,
xp_multiplier REAL DEFAULT 1.0,
no_xp_roles TEXT DEFAULT '',
no_xp_channels TEXT DEFAULT ''
)
""")
await db.execute("""
CREATE TABLE IF NOT EXISTS level_roles (
guild_id TEXT NOT NULL,
level INTEGER NOT NULL,
role_id TEXT NOT NULL,
PRIMARY KEY (guild_id, level)
)
""")
await db.execute("""
CREATE TABLE IF NOT EXISTS admins (
user_id TEXT PRIMARY KEY
)
""")
await db.commit()
async def get_user(guild_id: int, user_id: int) -> dict:
async with aiosqlite.connect(DB_PATH) as db:
db.row_factory = aiosqlite.Row
async with db.execute(
"SELECT * FROM users WHERE guild_id=? AND user_id=?",
(str(guild_id), str(user_id))
) as cursor:
row = await cursor.fetchone()
if row:
return dict(row)
# Auto-create
await db.execute(
"INSERT OR IGNORE INTO users (guild_id, user_id) VALUES (?,?)",
(str(guild_id), str(user_id))
)
await db.commit()
return {"guild_id": str(guild_id), "user_id": str(user_id),
"xp": 0, "level": 0, "messages": 0, "voice_time": 0}
async def update_user(guild_id: int, user_id: int, **kwargs):
sets = ", ".join(f"{k}=?" for k in kwargs)
vals = list(kwargs.values()) + [str(guild_id), str(user_id)]
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
f"UPDATE users SET {sets} WHERE guild_id=? AND user_id=?", vals
)
await db.commit()
async def add_xp(guild_id: int, user_id: int, amount: int) -> dict:
"""Add XP and return updated user dict."""
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"INSERT OR IGNORE INTO users (guild_id, user_id) VALUES (?,?)",
(str(guild_id), str(user_id))
)
await db.execute(
"UPDATE users SET xp=xp+? WHERE guild_id=? AND user_id=?",
(amount, str(guild_id), str(user_id))
)
await db.commit()
db.row_factory = aiosqlite.Row
async with db.execute(
"SELECT * FROM users WHERE guild_id=? AND user_id=?",
(str(guild_id), str(user_id))
) as cur:
return dict(await cur.fetchone())
async def get_leaderboard(guild_id: int, limit: int = 10, offset: int = 0) -> list:
async with aiosqlite.connect(DB_PATH) as db:
db.row_factory = aiosqlite.Row
async with db.execute(
"SELECT * FROM users WHERE guild_id=? ORDER BY xp DESC LIMIT ? OFFSET ?",
(str(guild_id), limit, offset)
) as cur:
return [dict(r) for r in await cur.fetchall()]
async def get_user_count(guild_id: int) -> int:
async with aiosqlite.connect(DB_PATH) as db:
async with db.execute(
"SELECT COUNT(*) FROM users WHERE guild_id=?", (str(guild_id),)
) as cur:
row = await cur.fetchone()
return row[0] if row else 0
async def get_rank(guild_id: int, user_id: int) -> int:
async with aiosqlite.connect(DB_PATH) as db:
async with db.execute(
"SELECT COUNT(*)+1 FROM users WHERE guild_id=? AND xp > (SELECT xp FROM users WHERE guild_id=? AND user_id=?)",
(str(guild_id), str(guild_id), str(user_id))
) as cur:
row = await cur.fetchone()
return row[0] if row else 1
async def get_settings(guild_id: int) -> dict:
async with aiosqlite.connect(DB_PATH) as db:
db.row_factory = aiosqlite.Row
async with db.execute(
"SELECT * FROM guild_settings WHERE guild_id=?", (str(guild_id),)
) as cur:
row = await cur.fetchone()
if row:
return dict(row)
await db.execute(
"INSERT OR IGNORE INTO guild_settings (guild_id) VALUES (?)", (str(guild_id),)
)
await db.commit()
return {"guild_id": str(guild_id), "xp_per_message": 15,
"xp_per_voice_min": 10, "xp_cooldown": 60,
"level_up_channel": None, "level_up_message": None,
"xp_multiplier": 1.0, "no_xp_roles": "", "no_xp_channels": "1475908072453964010"}
async def set_setting(guild_id: int, key: str, value):
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"INSERT OR IGNORE INTO guild_settings (guild_id) VALUES (?)", (str(guild_id),)
)
await db.execute(
f"UPDATE guild_settings SET {key}=? WHERE guild_id=?", (value, str(guild_id))
)
await db.commit()
async def get_level_roles(guild_id: int) -> list:
async with aiosqlite.connect(DB_PATH) as db:
db.row_factory = aiosqlite.Row
async with db.execute(
"SELECT * FROM level_roles WHERE guild_id=? ORDER BY level",
(str(guild_id),)
) as cur:
return [dict(r) for r in await cur.fetchall()]
async def set_level_role(guild_id: int, level: int, role_id: int):
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"INSERT OR REPLACE INTO level_roles (guild_id, level, role_id) VALUES (?,?,?)",
(str(guild_id), level, str(role_id))
)
await db.commit()
async def remove_level_role(guild_id: int, level: int):
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"DELETE FROM level_roles WHERE guild_id=? AND level=?",
(str(guild_id), level)
)
await db.commit()
async def reset_all_users():
"""Delete all user leveling data, leaving settings and roles intact."""
async with aiosqlite.connect(DB_PATH) as db:
await db.execute("DELETE FROM users")
await db.commit()
async def is_db_admin(user_id: int) -> bool:
async with aiosqlite.connect(DB_PATH) as db:
async with db.execute(
"SELECT 1 FROM admins WHERE user_id=?", (str(user_id),)
) as cur:
row = await cur.fetchone()
return row is not None
async def add_admin(user_id: int):
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"INSERT OR IGNORE INTO admins (user_id) VALUES (?)", (str(user_id),)
)
await db.commit()