-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatabase.py
More file actions
109 lines (99 loc) · 3.54 KB
/
database.py
File metadata and controls
109 lines (99 loc) · 3.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import sqlite3
import os
# Location of the SQLite database file: db/os_analyzer.db under the directory
# that contains this module.
DB_PATH = os.path.join(os.path.dirname(__file__), 'db', 'os_analyzer.db')
def get_connection():
    """Open a new SQLite connection to the database at ``DB_PATH``.

    SQLite creates the database file on demand but not its parent
    directory, so the directory is created first when it is missing
    (otherwise ``sqlite3.connect`` fails with "unable to open database
    file").

    Returns:
        A ``sqlite3.Connection`` whose ``row_factory`` is ``sqlite3.Row``,
        so result columns can be read by index or by name. The caller is
        responsible for closing it.
    """
    db_dir = os.path.dirname(DB_PATH)
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn
def init_db():
    """Create the ``OStargets`` and ``AIcache`` tables if they do not exist.

    Safe to call repeatedly: both statements use ``CREATE TABLE IF NOT
    EXISTS``. The connection is always closed before returning.
    """
    conn = get_connection()
    try:
        # ``with conn`` commits on success and rolls back on error, but it
        # does NOT close the connection -- hence the surrounding try/finally.
        with conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS OStargets (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    domain TEXT NOT NULL,
                    subdomain TEXT NOT NULL,
                    appname TEXT NOT NULL,
                    accesskey TEXT UNIQUE NOT NULL,
                    createdate TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    error TEXT,
                    status INT
                )
            ''')
            conn.execute('''
                CREATE TABLE IF NOT EXISTS AIcache (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    problem_key TEXT UNIQUE NOT NULL,
                    response TEXT NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
    finally:
        conn.close()
def get_scanhistory_items():
    """Return every scan-history row, newest first.

    Returns:
        A list of rows (as produced by the connection's row factory) with
        the columns ``domain``, ``AppName``, ``Accesskey`` and
        ``createDate``, ordered by ``createdate`` descending. The list is
        empty when the table has no rows.
    """
    conn = get_connection()
    try:
        cursor = conn.execute(
            "SELECT domain, AppName, Accesskey, createDate FROM OSTargets ORDER BY createdate DESC"
        )
        # Rows are fully fetched before the connection is closed.
        return cursor.fetchall()
    finally:
        # The sqlite3 context manager never closes the connection, so close
        # it explicitly to avoid leaking one handle per call.
        conn.close()
def delete_scanhistory_item(accesskey: str) -> bool:
    """Delete the scan-history row(s) whose ``Accesskey`` equals *accesskey*.

    Args:
        accesskey: Access key of the entry to remove.

    Returns:
        ``True`` when the DELETE ran without error (even if no row
        matched); ``False`` when any exception occurred, in which case the
        error is printed.
    """
    try:
        conn = get_connection()
        try:
            # ``with conn`` commits on success / rolls back on error; it
            # does not close the connection, so that is done in ``finally``.
            with conn:
                conn.execute("DELETE FROM OSTargets WHERE Accesskey = ?", (accesskey,))
        finally:
            conn.close()
        return True
    except Exception as e:
        print(f"Erro ao deletar item: {e}")
        return False
def clear_all_scanhistory() -> bool:
    """Delete every row from the scan-history table.

    Returns:
        ``True`` when the DELETE ran without error; ``False`` when any
        exception occurred, in which case the error is printed.
    """
    try:
        conn = get_connection()
        try:
            # Transaction handled by ``with conn``; closing is our job.
            with conn:
                conn.execute("DELETE FROM OSTargets")
        finally:
            conn.close()
        return True
    except Exception as e:
        print(f"Erro ao limpar histórico: {e}")
        return False
def ai_get_cached_response(problem_key: str):
    """Look up the cached response stored under *problem_key*.

    Args:
        problem_key: Key identifying the cached entry.

    Returns:
        The stored ``response`` value, or ``None`` when no entry exists for
        the key.
    """
    conn = get_connection()
    try:
        row = conn.execute(
            "SELECT response FROM AIcache WHERE problem_key = ?", (problem_key,)
        ).fetchone()
        return row[0] if row else None
    finally:
        # The sqlite3 context manager never closes the connection, so close
        # it explicitly to avoid leaking one handle per lookup.
        conn.close()
def ai_save_cache(problem_key: str, response: str) -> bool:
    """Store *response* in the cache under *problem_key*.

    ``problem_key`` is declared UNIQUE, so a plain INSERT would fail for a
    key that is already cached and the stale entry could never be
    refreshed. ``INSERT OR REPLACE`` overwrites the existing entry instead.

    Args:
        problem_key: Key identifying the cached entry.
        response: Response text to store.

    Returns:
        ``True`` when the row was written; ``False`` when a
        ``sqlite3.Error`` occurred, in which case the error is printed.
    """
    try:
        conn = get_connection()
        try:
            # ``with conn`` commits on success / rolls back on error; it
            # does not close the connection, so that is done in ``finally``.
            with conn:
                conn.execute(
                    "INSERT OR REPLACE INTO AIcache (problem_key, response) VALUES (?, ?)",
                    (problem_key, response),
                )
        finally:
            conn.close()
        return True
    except sqlite3.Error as e:
        print(f"Erro ao salvar cache: {e}")
        return False
def ai_clear_cache() -> bool:
    """Delete every row from the ``AIcache`` table.

    Returns:
        ``True`` when the DELETE ran without error; ``False`` when any
        exception occurred, in which case the error is printed.
    """
    try:
        conn = get_connection()
        try:
            # Transaction handled by ``with conn``; closing is our job.
            with conn:
                conn.execute("DELETE FROM AIcache")
        finally:
            conn.close()
        return True
    except Exception as e:
        print(f"Erro ao limpar cache: {e}")
        return False
def db_insert_targetinformations(domain: str, subdomain: str, app_name: str, access_key: str, status: int = 1) -> bool:
    """Insert one target row into the ``OStargets`` table.

    Args:
        domain: Value for the ``domain`` column.
        subdomain: Value for the ``subdomain`` column.
        app_name: Value for the ``AppName`` column.
        access_key: Value for the ``accesskey`` column.
        status: Value for the ``status`` column (defaults to 1).

    Returns:
        ``True`` when the row was inserted; ``False`` when any exception
        occurred (for example a constraint violation), in which case the
        error is printed.
    """
    try:
        conn = get_connection()
        try:
            # ``with conn`` commits on success / rolls back on error; it
            # does not close the connection, so that is done in ``finally``.
            with conn:
                conn.execute('''
                    INSERT INTO OStargets
                    (domain, subdomain, AppName, accesskey, status)
                    VALUES (?, ?, ?, ?, ?)
                ''', (domain, subdomain, app_name, access_key, status))
        finally:
            conn.close()
        return True
    except Exception as e:
        print(f"Erro ao inserir alvo no banco: {e}")
        return False