-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathxml_db.py
More file actions
116 lines (95 loc) · 3.21 KB
/
xml_db.py
File metadata and controls
116 lines (95 loc) · 3.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
from pathlib import Path
from threading import Lock
from xml.etree import ElementTree as ET
# Module-wide lock used to serialize access to the XML file: add_note() and
# get_topic() acquire it before parsing (and, for add_note, writing).
# NOTE(review): init_db() reads/writes the file without taking this lock.
DB_LOCK = Lock()
class XMLDatabaseError(Exception):
    """Raised when the XML database cannot be read, validated, or written."""
def _safe_parse(db_path):
    """Load the XML database at *db_path* and check its root element.

    Args:
        db_path: Location of the XML file (anything ``ET.parse`` accepts).

    Returns:
        The parsed ``ElementTree``; its root tag is guaranteed to be ``data``.

    Raises:
        XMLDatabaseError: If the file is missing, cannot be read, is not
            well-formed XML, or its root element is not ``<data>``.
    """
    try:
        document = ET.parse(db_path)
    except FileNotFoundError as exc:
        # Handled before OSError because FileNotFoundError is a subclass of it.
        raise XMLDatabaseError(f"XML file not found: {db_path}") from exc
    except ET.ParseError as exc:
        raise XMLDatabaseError(f"XML file is malformed: {exc}") from exc
    except OSError as exc:
        raise XMLDatabaseError(f"Cannot read XML file: {exc}") from exc

    if document.getroot().tag == "data":
        return document
    raise XMLDatabaseError("Root should be <data>")
def _safe_write(tree, db_path):
    """Atomically write *tree* to *db_path* as UTF-8 XML.

    The document is serialized to a sibling ``<name>.tmp`` file, parsed back
    to confirm it is well-formed, and only then moved over the destination.
    A failed or invalid write therefore never replaces the existing file.

    Args:
        tree: The ``ElementTree`` to persist. It is re-indented in place when
            ``ET.indent`` is available.
        db_path: Destination file path.

    Raises:
        XMLDatabaseError: If the file cannot be written, or if the serialized
            document would not parse again.
    """
    path = Path(db_path)
    temp_path = path.with_suffix(path.suffix + ".tmp")
    try:
        # ET.indent is missing on older Python versions; skip pretty-printing
        # there instead of failing.
        if hasattr(ET, "indent"):
            ET.indent(tree, space=" ")
        tree.write(temp_path, encoding="utf-8", xml_declaration=True)

        # ElementTree emits characters that XML 1.0 forbids (e.g. "\x00",
        # "\x0b") verbatim, which yields a file that can never be parsed
        # again. Validate the temp file before it replaces the real database.
        try:
            ET.parse(temp_path)
        except ET.ParseError as exc:
            raise XMLDatabaseError(
                f"Cannot write XML file: content is not valid XML: {exc}"
            ) from exc

        temp_path.replace(path)
    except OSError as exc:
        raise XMLDatabaseError(f"Cannot write XML file: {exc}") from exc
    finally:
        # After a successful replace the temp file no longer exists; this is
        # best-effort cleanup after a failure, so unlink errors are ignored.
        if temp_path.exists():
            try:
                temp_path.unlink()
            except OSError:
                pass
def init_db(db_path):
    """Make sure a valid XML database exists at *db_path*.

    An existing file is only validated (parsed and root-checked). Otherwise
    the parent directories are created and an empty ``<data>`` document is
    written.

    Args:
        db_path: Location of the XML database file.

    Raises:
        XMLDatabaseError: If an existing file fails validation, or the new
            file or its directories cannot be created.
    """
    target = Path(db_path)
    try:
        if not target.exists():
            target.parent.mkdir(parents=True, exist_ok=True)
            empty_document = ET.ElementTree(ET.Element("data"))
            _safe_write(empty_document, target)
        else:
            _safe_parse(target)
    except OSError as exc:
        # XMLDatabaseError raised by the helpers is not an OSError, so it
        # passes through this handler untouched.
        raise XMLDatabaseError(f"Cannot initialize XML database: {exc}") from exc
def _find_topic(root, topic_name):
    """Return the first direct ``<topic>`` child of *root* named *topic_name*.

    Returns ``None`` when no ``<topic>`` child has a matching ``name``
    attribute.
    """
    matching = (
        candidate
        for candidate in root.iterfind("topic")
        if candidate.get("name") == topic_name
    )
    return next(matching, None)
def add_note(db_path, topic_name, note_name, text, timestamp):
    """Append a note to a topic, creating the topic when it does not exist.

    The parse / modify / write cycle runs while holding ``DB_LOCK``.

    Args:
        db_path: Location of the XML database file.
        topic_name: Value of the ``name`` attribute of the target ``<topic>``.
        note_name: Value of the ``name`` attribute of the new ``<note>``.
        text: Content stored in the note's ``<text>`` child.
        timestamp: Content stored in the note's ``<timestamp>`` child.

    Returns:
        A dict with the topic name, a ``created_topic`` flag telling whether
        this call created the topic, and the note's name, text and timestamp
        exactly as passed in.

    Raises:
        XMLDatabaseError: Propagated from ``_safe_parse`` or ``_safe_write``.
    """
    with DB_LOCK:
        document = _safe_parse(db_path)
        data_root = document.getroot()

        topic_el = _find_topic(data_root, topic_name)
        is_new_topic = topic_el is None
        if is_new_topic:
            topic_el = ET.SubElement(data_root, "topic", {"name": topic_name})

        note_el = ET.SubElement(topic_el, "note", {"name": note_name})
        ET.SubElement(note_el, "text").text = text
        ET.SubElement(note_el, "timestamp").text = timestamp

        _safe_write(document, db_path)

    note_summary = {"name": note_name, "text": text, "timestamp": timestamp}
    return {
        "topic": topic_name,
        "created_topic": is_new_topic,
        "note": note_summary,
    }
def get_topic(db_path, topic_name):
    """Read a topic and all of its notes from the XML database.

    Args:
        db_path: Location of the XML database file.
        topic_name: Value of the ``name`` attribute of the wanted ``<topic>``.

    Returns:
        ``None`` when the topic does not exist; otherwise a dict with the
        topic name and a ``notes`` list. Each note carries its ``name``
        attribute (``""`` if absent) and the whitespace-stripped contents of
        its ``<text>`` and ``<timestamp>`` children (``""`` if missing).

    Raises:
        XMLDatabaseError: Propagated from ``_safe_parse``.
    """
    with DB_LOCK:
        topic_el = _find_topic(_safe_parse(db_path).getroot(), topic_name)
        if topic_el is None:
            return None
        return {
            "topic": topic_name,
            "notes": [
                {
                    "name": note_el.get("name", ""),
                    "text": (note_el.findtext("text") or "").strip(),
                    "timestamp": (note_el.findtext("timestamp") or "").strip(),
                }
                for note_el in topic_el.iterfind("note")
            ],
        }