-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatabase.py
More file actions
186 lines (169 loc) · 6.93 KB
/
database.py
File metadata and controls
186 lines (169 loc) · 6.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# -*- coding: utf-8 -*-
import sqlite3
from language import Language
class DataBase():
tableFields = {
"CS": ["ID", "DATE", "RATING", "RATING0", "RATING1", "RATING2", "RATING3", "COMMENDATION", "SUGGESTION", "SERVICE"],
"SETTING": ["KEY", "VALUE"]
}
password = ""
status = ""
def __init__(self, db):
# tries to establish connection, creates on fail.
try:
self.connection = sqlite3.connect(db)
c = self.connection.cursor()
c.execute(f'''
CREATE TABLE IF NOT EXISTS CS
({self.tableFields["CS"][0]} INTEGER PRIMARY KEY AUTOINCREMENT,
{self.tableFields["CS"][1]} TINYTEXT,
{self.tableFields["CS"][2]} INTEGER,
{self.tableFields["CS"][3]} INTEGER,
{self.tableFields["CS"][4]} INTEGER,
{self.tableFields["CS"][5]} INTEGER,
{self.tableFields["CS"][6]} INTEGER,
{self.tableFields["CS"][7]} TEXT,
{self.tableFields["CS"][8]} TEXT,
{self.tableFields["CS"][9]} TINYTEXT);''')
c.execute(f'''
CREATE TABLE IF NOT EXISTS SETTING
({self.tableFields["SETTING"][0]} TINYTEXT UNIQUE,
{self.tableFields["SETTING"][1]} TINYTEXT);''')
self.connection.commit()
getpwd = self.read(["VALUE"], "SETTING", {"KEY":"password"})
self.password = getpwd[0][0] if getpwd else ''
except Exception as error:
self.status +=f" {error} {db}"
def __del__(self):
self.connection.close()
def write(self, table="", key_value={}, where={}):
condition, values, updates = [], [], []
for key in where:
condition.append(f"{self.sanitize(key, False)}={self.sanitize(where[key])}")
for column in self.tableFields[table]:
if column in key_value:
values.append(f"{self.sanitize(key_value[column])}")
updates.append(f"{column}={self.sanitize(key_value[column])}")
else:
values.append("NULL")
# try to update exsting
if len(condition):
self.connection.execute(f"UPDATE {table} SET {', '.join(updates)} WHERE {' AND '.join(condition)};")
# otherwise insert
self.connection.execute(f"INSERT OR IGNORE INTO {table} ({', '.join(self.tableFields[table])}) VALUES ({', '.join(values)});")
self.connection.commit()
if table=="CS":
# keep track of latest insert_id to have rows updated instead of appended if submitted
cursor = self.connection.cursor()
cursor.execute(f"SELECT MAX(ID) FROM CS;")
result = cursor.fetchone()
if result is not None:
return result[0]
return True
def read(self, fields = [], table = "", where = {}):
condition, fields = [], [self.sanitize(field, False) for field in fields] if fields else "*"
for key in where:
condition.append(f"{self.sanitize(key, False)}={self.sanitize(where[key])}")
cursor = self.connection.cursor()
cursor.execute(f"SELECT {' AND '.join(fields)} FROM {table} WHERE {' AND '.join(condition)};")
result = cursor.fetchall()
if result is not None:
return result
return False
def delete(self, table = "", where = {}):
condition= []
for key in where:
condition.append(f"{self.sanitize(key, False)}={self.sanitize(where[key])}")
if len(condition):
self.connection.execute(f"DELETE FROM {table} WHERE {' AND '.join(condition)};")
self.connection.commit()
return True
def has_content(self,table):
cursor = self.connection.cursor()
cursor.execute(f"SELECT COUNT(*) FROM {table} WHERE 1;")
result = cursor.fetchall()
if result is not None and result[0][0]:
return True
return False
def sanitize(self, value = "", quotes = True):
# sanitary strings to concatenate to sql queries.
# if not quotes it probbly is a column key
if type(value)==str and value != "NULL":
if value.strip()=="":
return "NULL"
return value.replace('\'','\'\'') if not quotes else "'" + value.replace('\'','\'\'') + "'"
return value
def clear(self, tables=[]):
# reset database
for table in tables:
self.connection.executescript(f"DELETE FROM {table}; VACUUM;")
self.connection.commit()
return True
def csv(self):
quote = '"'
delimiter = ";"
output = ""
for column in self.tableFields['CS']:
value = str(column).replace('"','""') if quote else str(column)
output += f'{quote}{value}{quote}' if quote else value
output += delimiter
output = output[:-1] + "\n"
cursor = self.connection.cursor()
cursor.execute("SELECT * FROM CS;")
result = cursor.fetchall()
if result is not None:
for row in result:
for column in row:
value = str(column).replace('"','""') if quote else str(column)
output += f'{quote}{value}{quote}' if quote else value
output += delimiter
output = output[:-1] + "\n"
return output
def rtf(self, language):
# create a report
text = Language(language, language)
rtfHead = text.admin("rtfHead")
rtfTotal = text.admin("rtfTotal")
rtfDetail = text.admin("rtfDetail")
rtfTextInput = text.admin("rtfTextInput")
rtfCommendation = text.survey("commendationLabel")
rtfSuggestion = text.survey("suggestionLabel")
output = "{\\rtf1 \\ansi\\ansicpg1252\\deff0\\nouicompat "
# total and main review
cursor = self.connection.cursor()
cursor.execute("SELECT MIN(DATE) AS zero, MAX(DATE) AS one, COUNT(ID) AS two, AVG(RATING) AS three FROM CS;")
result = cursor.fetchall()
if result is None:
return False
result = result[0]
output += f"{{\\b {rtfHead[0]} {result[0]} {rtfHead[1]} {result[1]}}} \par "
output += f"{{\\b {rtfTotal[0]}:}} "
output += f"\line {rtfTotal[1]} {result[0]} {rtfTotal[2]} {result[1]} {rtfTotal[3]} {result[2]} {rtfTotal[4]} {round(result[3]*50, 2)} % \par "
# topic related statistics
details = [text.survey("detailratingAvailability"), text.survey("detailratingProcessing"), text.survey("detailratingExpertise"), text.survey("detailratingKindness")]
for i, detail in enumerate(details):
cursor.execute(f"SELECT MIN(DATE) AS zero, MAX(DATE) AS one, COUNT(ID) AS two, AVG(RATING{i}) AS three FROM CS WHERE RATING{i} IS NOT NULL;")
result = cursor.fetchall()
if result is None:
output += f"\par {{\i {detail}}} {rtfDetail[5]} \line "
continue
result = result[0]
output += f"\par {rtfDetail[0]} {{\i {detail}}} "
output += f"\line {rtfDetail[1]} {result[0]} {rtfDetail[2]} {result[1]} {rtfDetail[3]} {result[2]} {rtfDetail[4]} {round(result[3]*50, 2)if result[3] else None} %"
# customer text inputs
rating = [text.survey("detailratingBad"), text.survey("detailratingMeh"), text.survey("detailratingGood")]
output += f"\par \line {{\\b {rtfTextInput[0]}}} "
cursor.execute("SELECT * FROM CS WHERE COMMENDATION IS NOT NULL OR SUGGESTION IS NOT NULL OR SERVICE IS NOT NULL;")
result = cursor.fetchall()
if result is None:
output += f"{rtfTextInput[2]}"
else:
for r in result:
output += f"\par \line {{\\b {r[1]}}} {r[9] if r[9] else ''} \line "
output += f"{{\i {rtfCommendation}:}} {r[7]} \line " if r[7] != None else ""
output += f"{{\i {rtfSuggestion}:}} {r[8]} \line " if r[8] != None else ""
for i, detail in enumerate(details):
output += f"{detail}: {rating[r[3+i]]} / " if r[3+i] != None else ""
output += f"{rtfTextInput[1]}: {rating[r[2]]} "
output +="}"
return output