-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathindexes.py
More file actions
277 lines (200 loc) · 9.17 KB
/
indexes.py
File metadata and controls
277 lines (200 loc) · 9.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
from collections import defaultdict
from sortedcontainers import SortedDict
from typing import Any, List
from sqlalchemy.sql import operators
from ..helpers.ordered_set import OrderedSet
class IndexManager:
__slots__ = ('hash_index', 'range_index', 'table_indexes', 'columns_mapping', )
def __init__(self):
self.hash_index = HashIndex()
self.range_index = RangeIndex()
self.table_indexes = {}
self.columns_mapping = {}
def get_indexes(self, obj):
"""
Retrieve index from object's table as dict: indexname => list of column name
"""
tablename = obj.__tablename__
if tablename not in self.table_indexes:
self.table_indexes[tablename] = {}
pk_col_name = obj.__table__.primary_key.columns[0].name
for index in obj.__table__.indexes:
if len(index.expressions) > 1:
# Ignoring compound indexes for now ...
continue
if index.name == pk_col_name:
pk_col_name = None
self.table_indexes[tablename][index.name] = [
col.name
for col in index.expressions
]
if pk_col_name:
self.table_indexes[tablename][pk_col_name] = [pk_col_name]
return self.table_indexes[tablename]
def _column_to_index(self, tablename, colname):
"""
Get index name from tablename & column name
"""
if tablename not in self.columns_mapping:
self.columns_mapping[tablename] = {}
if colname not in self.columns_mapping[tablename]:
for indexname, indexcols in self.table_indexes.get(tablename, {}).items():
if colname in indexcols:
self.columns_mapping[tablename][colname] = indexname
return indexname
self.columns_mapping[tablename][colname] = None
return self.columns_mapping[tablename][colname]
def _get_index_key(self, obj, columns):
if len(columns) == 1:
return getattr(obj, columns[0])
return tuple(getattr(obj, c) for c in columns)
def on_insert(self, obj):
tablename = obj.__tablename__
indexes = self.get_indexes(obj)
for indexname, columns in indexes.items():
value = self._get_index_key(obj, columns)
self.hash_index.add(tablename, indexname, value, obj)
self.range_index.add(tablename, indexname, value, obj)
def on_delete(self, obj):
tablename = obj.__tablename__
indexes = self.get_indexes(obj)
for indexname, columns in indexes.items():
value = self._get_index_key(obj, columns)
self.hash_index.remove(tablename, indexname, value, obj)
self.range_index.remove(tablename, indexname, value, obj)
def on_update(self, obj, updates):
tablename = obj.__tablename__
indexes = self.get_indexes(obj)
for indexname, columns in indexes.items():
if columns[0] not in updates:
continue
old_value = updates[columns[0]]["old"]
new_value = updates[columns[0]]["new"]
self.hash_index.remove(tablename, indexname, old_value, obj)
self.range_index.remove(tablename, indexname, old_value, obj)
self.hash_index.add(tablename, indexname, new_value, obj)
self.range_index.add(tablename, indexname, new_value, obj)
def query(self, collection, tablename, colname, operator, value):
indexname = self._column_to_index(tablename, colname)
if not indexname:
return None
# Use hash index for = / != / IN / NOT IN operators
if operator == operators.eq:
result = self.hash_index.query(tablename, indexname, value)
return list(set(result) & set(collection))
elif operator == operators.ne:
# All values except the given one
excluded = self.hash_index.query(tablename, indexname, value)
return list(set(collection) - set(excluded))
elif operator == operators.in_op:
result = []
for v in value:
result.extend(self.hash_index.query(tablename, indexname, v))
return list(set(result) & set(collection))
elif operator == operators.notin_op:
excluded = []
for v in value:
excluded.extend(self.hash_index.query(tablename, indexname, v))
return list(set(collection) - set(excluded))
# Use range index
if operator == operators.gt:
result = self.range_index.query(tablename, indexname, gt=value)
return list(set(result) & set(collection))
elif operator == operators.ge:
result = self.range_index.query(tablename, indexname, gte=value)
return list(set(result) & set(collection))
elif operator == operators.lt:
result = self.range_index.query(tablename, indexname, lt=value)
return list(set(result) & set(collection))
elif operator == operators.le:
result = self.range_index.query(tablename, indexname, lte=value)
return list(set(result) & set(collection))
elif operator == operators.between_op and isinstance(value, (tuple, list)) and len(value) == 2:
result = self.range_index.query(tablename, indexname, gte=value[0], lte=value[1])
return list(set(result) & set(collection))
elif operator == operators.not_between_op and isinstance(value, (tuple, list)) and len(value) == 2:
in_range = self.range_index.query(tablename, indexname, gte=value[0], lte=value[1])
return list(set(collection) - set(in_range))
def get_selectivity(self, tablename, colname, operator, value, total_count):
"""
Estimate selectivity: higher means worst filtering.
"""
indexname = self._column_to_index(tablename, colname)
if not indexname:
# Column isn't indexed
return total_count
if indexname in self.hash_index.index[tablename]:
index = self.hash_index.index[tablename][indexname]
num_keys = len(index)
if operator == operators.eq:
return len(index.get(value, []))
elif operator == operators.ne:
matched = len(index.get(value, []))
return total_count - matched
elif operator == operators.in_op:
return sum(len(index.get(v, [])) for v in value)
elif operator == operators.notin_op:
matched = sum(len(index.get(v, [])) for v in value)
return total_count - matched
return total_count / num_keys
return total_count
class HashIndex:
"""
A hash-based index structure for fast exact-match lookups on table columns.
Structure:
index[tablename][indexname][value] = [obj1, obj2, ...]
Maintains insertion order of objects.
"""
__slots__ = ('index',)
def __init__(self):
self.index = defaultdict(lambda: defaultdict(lambda: defaultdict(OrderedSet)))
def add(self, tablename: str, indexname: str, value: Any, obj: Any):
self.index[tablename][indexname][value].add(obj)
def remove(self, tablename: str, indexname: str, value: Any, obj: Any):
s = self.index[tablename][indexname][value]
s.discard(obj)
if not s:
del self.index[tablename][indexname][value]
def query(self, tablename: str, indexname: str, value: Any) -> List[Any]:
return list(self.index[tablename][indexname].get(value, []))
class RangeIndex:
"""
A range-based index for fast lookups using comparison operators.
Internally uses SortedDict to allow efficient bisecting and slicing.
Structure:
index[tablename][indexname] = SortedDict { value: [obj1, obj2, ...] }
"""
__slots__ = ('index',)
def __init__(self):
self.index = defaultdict(lambda: defaultdict(SortedDict))
def add(self, tablename: str, indexname: str, value: Any, obj: Any):
index = self.index[tablename][indexname]
if value in index:
index[value].append(obj)
else:
index[value] = [obj]
def remove(self, tablename: str, indexname: str, value: Any, obj: Any):
col = self.index[tablename][indexname]
if value in col:
try:
col[value].remove(obj)
if not col[value]:
del col[value]
except ValueError:
pass
def query(self, tablename: str, indexname: str, gt=None, gte=None, lt=None, lte=None) -> List[Any]:
sd = self.index[tablename][indexname]
# Define range bounds
min_key = gte if gte is not None else gt
max_key = lte if lte is not None else lt
inclusive_min = gte is not None
inclusive_max = lte is not None
irange = sd.irange(
minimum=min_key,
maximum=max_key,
inclusive=(inclusive_min, inclusive_max)
)
result = []
for key in irange:
result.extend(sd[key])
return result