Skip to content

Commit d8b5d7c

Browse files
committed
commit this current w.i.p bagofwords helper
- I've written these a couple of times; last time there was already something clever about it, I think, but did I look into my own code? No, I didn't. It's completely from scratch oOo \|/ oOo
1 parent 701cff2 commit d8b5d7c

4 files changed

Lines changed: 430 additions & 3 deletions

File tree

src/bagofwords.py

Lines changed: 346 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,346 @@
1+
import re
2+
import json
3+
import io
4+
from pathlib import Path
5+
from typing import List, Union, Optional, Dict, Tuple, Generator
6+
7+
# A word count or weight; becomes a float after division or normalization.
Number = Union[int, float]
# Plain mapping of word -> count.
WordDict = Dict[str, Number]
# Everything the BagOfWords operations accept as "other": raw text (tokenized
# on the fly), a list of tokens, a word -> count dict, or another BagOfWords.
WordBagArgument = Union[str, List[str], WordDict, "BagOfWords"]
10+
11+
12+
# Characters at which tokenize() ends the current token; they never become
# part of a token themselves.
TOKEN_SEPARATORS = set(" _.!?,;/\\\"`+-*=:()[]{}\r\n\t<>@~")
# Characters stripped from both ends of every token by tokenize().
# NOTE(review): tokenize() discards characters above U+00FF before stripping,
# so the curly quotes listed here can never actually reach the strip step.
TOKEN_STRIP = "'“”&=#"
14+
15+
16+
def tokenize(text: str) -> List[str]:
    """
    Split text into a list of word tokens.

    The text is cut at every character in TOKEN_SEPARATORS. Characters with a
    code point above 0xFF are silently dropped (they do not end a token).
    Each resulting piece has the TOKEN_STRIP characters removed from both
    ends; pieces that end up empty or contain any numeric character are
    discarded.

    :param text: the raw text
    :return: tokens in order of appearance (duplicates are kept)
    """
    words = []
    pending = []

    def flush():
        # Finish the token collected so far and start a new one.
        word = "".join(pending).strip(TOKEN_STRIP)
        pending.clear()
        if not word:
            return
        if any(ch.isnumeric() for ch in word):
            return
        words.append(word)

    for ch in text:
        if ch in TOKEN_SEPARATORS:
            if pending:
                flush()
            continue
        if ord(ch) <= 0xFF:
            pending.append(ch)

    # The text may end without a trailing separator.
    if pending:
        flush()

    return words
39+
40+
41+
class BagOfWords:
    """
    A mapping of words to counts with arithmetic and set-like helpers.

    The counts live in the plain dict ``self.bag``. ``self.is_normalized`` is
    True only for bags produced by ``normalized()`` that have not been
    modified since; every mutating operation resets it.

    Wherever an "other" argument is accepted it may be raw text (tokenized
    with ``tokenize``), a list of tokens, a word -> count dict or another
    BagOfWords.
    """

    def __init__(self, data: Optional[WordBagArgument] = None):
        """
        :param data: optional initial content, added like ``bag += data``
        """
        self.bag = dict()
        self.is_normalized = False
        if data:
            self += data

    def __copy__(self) -> "BagOfWords":
        """Return a new bag with a shallow copy of the counts and the normalized flag."""
        bag = BagOfWords()
        bag.bag = self.bag.copy()
        bag.is_normalized = self.is_normalized
        return bag

    def __bool__(self) -> bool:
        """A bag is truthy when it holds at least one word."""
        return bool(self.bag)

    def __len__(self) -> int:
        """Number of distinct words, same as ``size()``."""
        return len(self.bag)

    def __contains__(self, item) -> bool:
        """
        True if the word has an entry in the bag.

        Defined explicitly because ``__getitem__`` never raises: without it
        the ``in`` operator would fall back to probing integer indices forever.
        """
        return item in self.bag

    def __iter__(self) -> Generator[str, None, None]:
        """
        Iterate over the words (keys) of the bag.

        Defined explicitly for the same reason as ``__contains__``: the legacy
        ``__getitem__``-based iteration would never terminate.
        """
        yield from self.bag

    def __iadd__(self, other: WordBagArgument) -> "BagOfWords":
        """Add the counts of other to this bag in place."""
        self.is_normalized = False
        for key, value in self._iter_items(other):
            self.bag[key] = self.bag.get(key, 0) + value
        return self

    def __add__(self, other: WordBagArgument) -> "BagOfWords":
        """Return a new bag holding the summed counts of self and other."""
        new_bag = self.__copy__()
        new_bag += other
        return new_bag

    def __isub__(self, bag: "BagOfWords") -> "BagOfWords":
        """In-place ``subtract(bag)``."""
        return self.subtract(bag)

    def __sub__(self, bag: "BagOfWords") -> "BagOfWords":
        """Return ``subtracted(bag)``, leaving self untouched."""
        return self.subtracted(bag)

    def __imul__(self, value: Number) -> "BagOfWords":
        """Multiply every count by value in place."""
        self.is_normalized = False
        for key in self.bag:
            self.bag[key] *= value
        return self

    def __mul__(self, other: Number) -> "BagOfWords":
        """Return a new bag with every count multiplied by other."""
        new_bag = self.__copy__()
        new_bag *= other
        return new_bag

    def __itruediv__(self, value: Number) -> "BagOfWords":
        """Divide every count by value in place."""
        self.is_normalized = False
        for key in self.bag:
            self.bag[key] /= value
        return self

    def __truediv__(self, other: Number) -> "BagOfWords":
        """Return a new bag with every count divided by other."""
        new_bag = self.__copy__()
        new_bag /= other
        return new_bag

    def __str__(self):
        """The ``dump()`` output of the 20 most frequent words."""
        file = io.StringIO()
        self.dump(top=20, file=file)
        return file.getvalue()

    def __getitem__(self, item) -> Number:
        """Count of the word, 0 if the word is not in the bag (never raises)."""
        return self.bag.get(item, 0)

    def __setitem__(self, key: str, value: Number):
        """Set the count of a word."""
        # An arbitrary new value breaks the sum-to-one property.
        self.is_normalized = False
        self.bag[key] = value

    def copy(self) -> "BagOfWords":
        """Return a copy, see ``__copy__``."""
        return self.__copy__()

    def size(self) -> int:
        """Number of distinct words."""
        return len(self.bag)

    def count(self) -> Number:
        """Sum of all counts, 0 for an empty bag."""
        return sum(self.bag.values())

    def max(self) -> Number:
        """Largest single count, 0 for an empty bag."""
        return max(self.bag.values()) if self.bag else 0

    def items(self) -> Generator[Tuple[str, Number], None, None]:
        """Iterable of (word, count) pairs; this is the live items view of the underlying dict."""
        return self.bag.items()

    def normalized(self, copy: bool = False) -> "BagOfWords":
        """
        Return a bag whose counts are divided by the total count.

        :param copy: only relevant if this bag is already normalized:
            True returns a copy, False returns self
        :return: a new bag, or self as described above
        """
        if self.is_normalized:
            return self.__copy__() if copy else self

        bag = BagOfWords()
        # 'or 1' avoids a division by zero when the counts sum to 0
        count = self.count() or 1
        bag.bag = {
            key: value / count
            for key, value in self.bag.items()
        }
        bag.is_normalized = True
        return bag

    def n(self, copy: bool = False) -> "BagOfWords":
        """Shortcut for ``normalized()``."""
        return self.normalized(copy=copy)

    def adjust_count(self, count: Number) -> "BagOfWords":
        """
        Return a new bag rescaled so that the total count equals 'count'.

        :param count: the desired sum of all counts
        :return: new bag; empty if the current total count is 0
        """
        bag = BagOfWords()
        cur_count = self.count()
        if not cur_count:
            return bag
        factor = 1. / cur_count * count
        bag.bag = {
            key: value * factor
            for key, value in self.bag.items()
        }
        return bag

    def limited(self, min_count: Optional[int] = None, max_count: Optional[int] = None) -> "BagOfWords":
        """
        Return a new bag with only the words whose count is within the limits.

        :param min_count: inclusive lower bound, None for no lower bound
        :param max_count: inclusive upper bound, None for no upper bound
        """
        bag = BagOfWords()
        for key, value in self.items():
            if min_count is not None and value < min_count:
                continue
            if max_count is not None and value > max_count:
                continue
            bag.bag[key] = value
        return bag

    def sort(self):
        """Reorder the bag in place: highest count first, ties in alphabetical order."""
        self.bag = {
            key: self.bag[key]
            # the inner sort gives the alphabetical tie-break, the outer sort is stable
            for key in sorted(sorted(self.bag), key=lambda k: -self.bag[k])
        }

    def add_word(self, word: str, count: int = 1):
        """Increase the count of a single word."""
        self.is_normalized = False
        self.bag[word] = self.bag.get(word, 0) + count

    def subtract(self, other: WordBagArgument, amount: Optional[Union[str, int, float]] = None) -> "BagOfWords":
        """
        Subtract value of other

        Words whose count drops to zero or below are removed from the bag.
        Words that do not appear in 'other' are left untouched.

        :param other: text, tokens, dict or BagOfWords
        :param amount: None to leave values untouched, number to multiply,
            "all" to remove all keys that are in 'other'
        :return: self
        """
        self.is_normalized = False
        other_dict = self._as_dict(other)

        # Walk the smaller of the two mappings. The shared keys are collected
        # up front so that entries can be deleted while processing them
        # (and so that 'bag -= bag' is safe).
        if self.size() > len(other_dict):
            shared_keys = [key for key in other_dict if key in self.bag]
        else:
            shared_keys = [key for key in self.bag if key in other_dict]

        for key in shared_keys:
            if amount == "all":
                value = -1
            elif amount is None:
                value = self.bag[key] - other_dict[key]
            else:
                value = self.bag[key] - amount * other_dict[key]

            if value <= 0:
                del self.bag[key]
            else:
                self.bag[key] = value

        return self

    def subtracted(
            self,
            other: WordBagArgument,
            amount: Optional[Union[str, int, float]] = None,
    ) -> "BagOfWords":
        """Like ``subtract()`` but returns a new bag and leaves self untouched."""
        new_bag = self.__copy__()
        new_bag.subtract(other, amount=amount)
        return new_bag

    def union(self, other: WordBagArgument):
        """
        Return a new bag with all words of self plus the words only found in other.

        Counts of words that are already in self are kept as they are.
        """
        bag = self.__copy__()
        for key, value in self._iter_items(other):
            if key not in self.bag:
                bag.bag[key] = value
        bag.is_normalized = False
        return bag

    def intersection(self, other: WordBagArgument):
        """Return a new bag with the words found in both; counts are taken from self."""
        bag = BagOfWords()
        bag.bag = {
            key: self.bag[key]
            for key, _ in self._iter_items(other)
            if key in self.bag
        }
        return bag

    def difference(self, other: WordBagArgument):
        """Return a new bag with the words of self that do not appear in other."""
        other_dict = self._as_dict(other)
        bag = BagOfWords()
        bag.bag = {
            key: value
            for key, value in self.items()
            if key not in other_dict
        }
        return bag

    def get_subset(
            self,
            big_bag: "BagOfWords",
            max_freq: float = 1.,
            min_freq_mult: float = 2.,
    ) -> Dict[str, Dict[str, Number]]:
        """
        Find words that are relatively more frequent in self than in big_bag.

        Both bags are compared by their normalized frequencies.

        :param big_bag: the bag to compare against
        :param max_freq: only consider words whose frequency in self is at most this
        :param min_freq_mult: frequency in self must be at least this many
            times the frequency in big_bag
        :return: dict of word -> {"freq", "big_freq", "ratio"}; for words
            missing from big_bag the ratio is the word's own frequency
        """
        self_norm = self.normalized()
        other_norm = big_bag.normalized()
        result = dict()
        for key, self_value in self_norm.items():
            if self_value > max_freq:
                continue
            other_value = other_norm.bag.get(key, 0)
            if self_value >= min_freq_mult * other_value:
                result[key] = {
                    "freq": self_value, "big_freq": other_value,
                    "ratio": self_value / (other_value or 1)
                }

        return result

    def get_subset_bag(
            self,
            big_bag: "BagOfWords",
            max_freq: float = 1.,
            min_freq_mult: float = 2.,
    ):
        """Same selection as ``get_subset()``, returned as a bag of word -> frequency in self."""
        subset = self.get_subset(big_bag, max_freq=max_freq, min_freq_mult=min_freq_mult)
        bag = BagOfWords()
        bag.bag = {
            key: value["freq"]
            for key, value in subset.items()
        }
        return bag

    def get_subset_df(
            self,
            big_bag: "BagOfWords",
            max_freq: float = 1.,
            min_freq_mult: float = 2.,
    ):
        """
        Same selection as ``get_subset()``, returned as a pandas DataFrame
        with one row per word, sorted by descending "ratio".
        """
        # imported here so that pandas is only required when this method is used
        import pandas as pd

        subset = self.get_subset(big_bag, max_freq=max_freq, min_freq_mult=min_freq_mult)
        df = pd.DataFrame(subset).T
        if df.shape[0]:
            df.sort_values("ratio", ascending=False, inplace=True)
        return df

    def save_json(self, filename: Union[str, Path], sort: bool = True, indent: Optional[int] = 1):
        """
        Write the word -> count dict to a json file.

        :param filename: target file
        :param sort: True to ``sort()`` the bag before writing;
            note that this reorders this bag itself
        :param indent: passed on to ``json.dumps``
        """
        if sort:
            self.sort()
        Path(filename).write_text(
            json.dumps(self.bag, indent=indent)
        )

    @classmethod
    def load_json(cls, filename: Union[str, Path]) -> "BagOfWords":
        """Create a bag from a json file holding a word -> count dict."""
        return cls(json.loads(Path(filename).read_text()))

    def dump(self, top: Optional[int] = None, file=None):
        """
        Print one line per word: the word, its count and its share of the total count.

        Words are listed by descending count. Nothing is printed for an empty bag.

        :param top: maximum number of words to print, None for all
        :param file: passed on to ``print``, None for stdout
        """
        if not self:
            return
        keys = sorted(self.bag, key=lambda k: self.bag[k], reverse=True)
        if top is not None:
            keys = keys[:top]

        max_len = max(len(k) for k in keys)
        # 'or 1' avoids a division by zero when the counts sum to 0
        count = self.count() or 1
        for key in keys:
            print(f"{key:{max_len}}: {self.bag[key]:11,} {self.bag[key] / count:.5}", file=file)

    @classmethod
    def _iter_items(cls, param: WordBagArgument) -> Generator[Tuple[str, Number], None, None]:
        """
        Yield (word, count) pairs for any supported argument type.

        Text and token lists yield (token, 1) once per occurrence, so
        repeated tokens show up as repeated pairs.

        :raises TypeError: for unsupported argument types
        """
        if isinstance(param, (str, list)):
            if isinstance(param, str):
                param = tokenize(param)
            for token in param:
                yield token, 1
        elif isinstance(param, dict):
            yield from param.items()
        elif isinstance(param, BagOfWords):
            yield from param.bag.items()
        else:
            raise TypeError(f"Unexpected type '{type(param).__name__}'")

    @classmethod
    def _as_dict(cls, param: WordBagArgument) -> Dict[str, Number]:
        """
        Return a word -> count dict for any supported argument type.

        Dicts and bags are returned as they are (no copy), so the result
        must be treated as read-only. Text and token lists are counted.

        :raises TypeError: for unsupported argument types
        """
        if isinstance(param, (str, list)):
            # sum up repeated tokens instead of letting the last (token, 1) pair win
            counts = dict()
            for key, value in cls._iter_items(param):
                counts[key] = counts.get(key, 0) + value
            return counts
        elif isinstance(param, dict):
            return param
        elif isinstance(param, BagOfWords):
            return param.bag
        else:
            raise TypeError(f"Unexpected type '{type(param).__name__}'")

src/tests/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)