Skip to content

Commit fbc6c17

Browse files
committed
simplify code, revert to older model, better performance
1 parent d0a1f8b commit fbc6c17

File tree

1 file changed

+109
-174
lines changed

1 file changed

+109
-174
lines changed

text_preprocessing/preprocessor.py

Lines changed: 109 additions & 174 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
11
#!/usr/bin/env python3
22
"""Text Preprocessor"""
33

4-
import gc
5-
import multiprocessing as mp
64
import os
75
import sqlite3
86
import sys
9-
import warnings
107
from collections import defaultdict, deque
118
from dataclasses import dataclass
129
from itertools import combinations
@@ -15,19 +12,13 @@
1512
import lz4.frame
1613
import orjson
1714
import regex as re
18-
import spacy
19-
import torch
2015
from multiprocess.pool import Pool
2116
from spacy.language import Language
2217
from spacy.tokens import Doc, Token
2318

2419
from .modernizer import Modernizer
2520
from .spacy_helpers import PreprocessorToken, Tokens, load_language_model
2621

27-
# Suppress all UserWarning messages
28-
warnings.filterwarnings("ignore", category=UserWarning)
29-
mp.set_start_method("spawn", force=True)
30-
3122
Doc.set_extension("metadata", default={})
3223
Doc.set_extension("char_num", default=0)
3324
Token.set_extension("ext", default={})
@@ -47,86 +38,6 @@
4738
PHILO_OBJECT_LEVEL: dict[int, str] = {1: "doc", 2: "div1", 3: "div2", 4: "div3", 5: "para", 6: "sent", 7: "word"}
4839

4940

50-
def check_gpu_ram():
51-
"""Free cached GPU memory when usage on the current CUDA device is high.

Computes the share of the device's total memory currently allocated through
torch and, when it exceeds 20%, empties the CUDA cache and synchronizes.
The percentage is only used internally: nothing is returned.
"""
52-
device = torch.cuda.current_device()
53-
allocated = torch.cuda.memory_allocated(device)
54-
total = torch.cuda.get_device_properties(device).total_memory
55-
allocated_percent = (allocated / total) * 100
56-
57-
if allocated_percent > 20: # This is only a subset of GPU RAM usage, but indicative of high usage
58-
torch.cuda.empty_cache()
59-
torch.cuda.synchronize()
60-
# NOTE(review): indentation is lost in this diff view; confirm whether the
# garbage collection below runs on every call or only in the high-usage branch.
61-
gc.collect()
62-
63-
64-
def process_batch_texts(
65-
queue, text_fetcher_args, batch_texts, language_model, normalize_options, do_nlp, keep_all, progress_info
66-
):
"""Process one batch of texts and stream the results through `queue`.

Loads the language model, builds a TextFetcher from `text_fetcher_args`,
converts every fetched PreparedDoc or Doc into a Tokens object and puts it
on `queue`. A final `None` is put on the queue as the end-of-batch signal.
`progress_info` is a dict with the keys "count", "doc_count", "progress"
and "progress_prefix"; it is updated and printed from here.

NOTE(review): this function is used as the target of an mp.Process, so
updates to `progress_info` made here presumably stay in the child process
and are not seen by the caller -- confirm.
"""
67-
nlp, using_gpu = load_language_model(language_model, normalize_options)
68-
text_fetcher = TextFetcher(nlp, **text_fetcher_args) # Initialize text_fetcher with required params
69-
previous_philo_id = None
70-
for tokens, _ in text_fetcher(batch_texts, do_nlp=do_nlp, keep_all=keep_all, progress=False):
71-
if isinstance(tokens, PreparedDoc):
72-
spacy_doc = make_spacy_doc(nlp, tokens)
73-
# On GPU, docs longer than 10000 characters are split, piped in batches and re-assembled
if spacy_doc._.char_num > 10000 and using_gpu is True:
74-
split_doc = split_spacy_docs(nlp, spacy_doc)
75-
doc = Doc.from_docs(list(nlp.pipe(split_doc, batch_size=64)))
76-
# Carry the metadata of the original doc over to the re-assembled one
doc._.metadata = spacy_doc._.metadata
77-
tokens = Tokens(doc, keep_all=keep_all)
78-
else:
79-
tokens = Tokens(nlp(spacy_doc), keep_all=keep_all)
80-
elif isinstance(tokens, Doc):
81-
tokens = Tokens(tokens, keep_all=keep_all)
82-
if using_gpu:
83-
check_gpu_ram()
84-
if text_fetcher_args["is_philo_db"] is True:
85-
# The first whitespace-separated field of philo_id is used as the document id
current_doc_id = tokens.metadata.get("philo_id").split()[0]
86-
if previous_philo_id != current_doc_id:
87-
progress_info["doc_count"] += 1
88-
if progress_info["progress"] is True:
89-
progress_info["count"] += 1
90-
if text_fetcher_args["text_object_type"] == "doc":
91-
print(
92-
f"\r{progress_info['progress_prefix']} {progress_info['count']} texts processed...",
93-
end="",
94-
flush=True,
95-
)
96-
else:
97-
print(
98-
f"\r{progress_info['progress_prefix']} {progress_info['count']} text chunks of {progress_info['doc_count']} documents processed...",
99-
end="",
100-
flush=True,
101-
)
102-
# NOTE(review): current_doc_id is only assigned in the is_philo_db branch; nesting of
# this assignment is not recoverable from the diff view -- confirm it cannot run unbound.
previous_philo_id = current_doc_id
103-
queue.put(tokens)
104-
# End-of-batch sentinel for the consumer reading the queue
queue.put(None)
105-
106-
107-
def split_spacy_docs(nlp, doc: Doc) -> list[Doc]:
    """Split a spaCy doc into smaller docs of at most 10 sentences each.

    Every sentence of `doc` is rebuilt as its own Doc (same words, same
    sentence-start flags, same `_.ext` token extension) and sentences are
    grouped ten at a time with `Doc.from_docs`.

    Args:
        nlp: loaded spaCy pipeline; only its vocab is used here.
        doc: the doc to split; it must have sentence boundaries set.

    Returns:
        The list of grouped docs, in document order. Empty if `doc` has no
        sentences.
    """
    sentence_group: list[Doc] = []
    docs: list[Doc] = []
    for sent in doc.sents:
        # Flush a full group first, then fall through so the current sentence
        # is still added. The previous if/else skipped the sentence seen on the
        # flush iteration, silently dropping every 11th sentence.
        if len(sentence_group) == 10:
            docs.append(Doc.from_docs(sentence_group))
            sentence_group = []
        words = [token.text for token in sent]
        sent_starts = [token.is_sent_start for token in sent]
        sent_doc = Doc(nlp.vocab, words, sent_starts=sent_starts)
        # Custom extensions are not copied by the Doc constructor
        for pos, token in enumerate(sent):
            sent_doc[pos]._.ext = token._.ext
        sentence_group.append(sent_doc)
    if sentence_group:
        docs.append(Doc.from_docs(sentence_group))
    return docs
128-
129-
13041
@dataclass(slots=True)
13142
class PreparedDoc:
13243
"""Prepared doc for conversion to Spacy Doc object"""
@@ -184,7 +95,15 @@ def __init__(
18495
"ents_to_keep": ents_to_keep or [],
18596
}
18697
self.language = language
187-
self.language_model = language_model
98+
self.nlp, using_gpu = load_language_model(language_model, self.normalize_options)
99+
self.using_gpu = using_gpu
100+
if workers is None:
101+
cpu_count = os.cpu_count() or 2
102+
self.workers = cpu_count - 1
103+
else:
104+
self.workers = workers
105+
if self.using_gpu is True:
106+
self.workers = 1
188107
ngrams = ngrams or 0
189108
if ngrams:
190109
self.ngram_config = {
@@ -195,62 +114,22 @@ def __init__(
195114
else:
196115
self.ngram_config = None
197116
self.post_func = post_processing_function
198-
if workers is None:
199-
cpu_count = os.cpu_count() or 2
200-
self.workers = cpu_count - 1
201-
else:
202-
self.workers = workers
117+
self.text_fetcher = TextFetcher(
118+
self.nlp,
119+
word_regex=word_regex,
120+
sentence_boundaries=sentence_boundaries,
121+
language=language,
122+
modernize=modernize,
123+
strip_tags=strip_tags,
124+
is_philo_db=is_philo_db,
125+
text_object_type=text_object_type,
126+
ngram_config=self.ngram_config,
127+
workers=self.workers,
128+
)
203129
if self.normalize_options["pos_to_keep"] or self.normalize_options["ents_to_keep"] or lemmatizer == "spacy":
204130
self.do_nlp = True
205131
else:
206132
self.do_nlp = False
207-
if self.do_nlp is False:
208-
using_gpu = False
209-
else:
210-
using_gpu = spacy.prefer_gpu()
211-
if using_gpu is True:
212-
self.workers = 1
213-
self.text_fetcher_args = {
214-
"word_regex": word_regex,
215-
"sentence_boundaries": sentence_boundaries,
216-
"language": language,
217-
"modernize": modernize,
218-
"strip_tags": strip_tags,
219-
"is_philo_db": is_philo_db,
220-
"text_object_type": text_object_type,
221-
"workers": self.workers,
222-
"ngram_config": self.ngram_config,
223-
"is_string": False
224-
}
225-
226-
def __process_batch(self, batch, keep_all, progress_info):
"""Process `batch` in a separate process and yield the resulting tokens.

Starts an mp.Process running process_batch_texts, reads its results from a
queue until the `None` end signal arrives, applies n-gram generation and
the post-processing function when they are configured, and yields each
result. The worker process is joined once the end signal has been read.

NOTE(review): if the worker exits without sending `None`, the blocking
queue.get() below would presumably never return -- confirm.
"""
227-
queue = mp.Queue()
228-
process = mp.Process(
229-
target=process_batch_texts,
230-
args=(
231-
queue,
232-
self.text_fetcher_args,
233-
batch,
234-
self.language_model,
235-
self.normalize_options,
236-
self.do_nlp,
237-
keep_all,
238-
progress_info,
239-
),
240-
)
241-
process.start()
242-
243-
while True:
244-
tokens = queue.get() # This blocks until data is available
245-
if tokens is None: # End signal
246-
break
247-
if self.ngram_config is not None:
248-
tokens = generate_ngrams(**self.ngram_config, tokens=tokens)
249-
if self.post_func is not None:
250-
tokens = self.post_func(tokens)
251-
yield tokens
252-
253-
process.join()
254133

255134
def process_texts(
256135
self,
@@ -260,32 +139,82 @@ def process_texts(
260139
progress_prefix="Processing texts...",
261140
) -> Iterable[Tokens]:
262141
"""Process all documents. Returns an iterator of documents"""
263-
progress_info = {"count": 0, "doc_count": 0, "progress": progress, "progress_prefix": progress_prefix}
264-
current_batch = []
265-
if progress is True:
266-
if self.text_fetcher_args["text_object_type"] == "doc":
267-
print(f"\r{progress_prefix} 0 documents processed...", end="", flush=True)
142+
count = 0
143+
fetched_texts = self.text_fetcher(
144+
texts, do_nlp=self.do_nlp, keep_all=keep_all, progress=progress, post_func=self.post_func
145+
)
146+
if self.text_fetcher.text_object_type == "sent" and self.do_nlp is True:
147+
fetched_texts = self.nlp.pipe(
148+
((make_spacy_doc(self.nlp, tokens), c) for tokens, c in fetched_texts),
149+
as_tuples=True,
150+
batch_size=250,
151+
)
152+
for tokens, doc_count in fetched_texts:
153+
count += 1
154+
if progress is True:
155+
if doc_count is not None: # workaround for sent and para since nlp.pipe does not return context...
156+
print(
157+
f"\r{progress_prefix} {doc_count} done: {count} text objects extracted... ",
158+
end="",
159+
flush=True,
160+
)
161+
else:
162+
print(
163+
f"\r{progress_prefix} {count} text objects extracted... ",
164+
end="",
165+
flush=True,
166+
)
167+
if isinstance(tokens, PreparedDoc):
168+
spacy_doc = make_spacy_doc(self.nlp, tokens)
169+
if spacy_doc._.char_num > 100000 and self.using_gpu is True: # being conservative to preserve GPU RAM
170+
split_doc = self.__split_spacy_docs(spacy_doc)
171+
rebuilt_doc = Doc.from_docs(list(self.nlp.pipe(split_doc, batch_size=128)))
172+
rebuilt_doc._.metadata = spacy_doc._.metadata
173+
tokens = Tokens(rebuilt_doc, keep_all=keep_all)
174+
else:
175+
tokens = Tokens(self.nlp(spacy_doc), keep_all=keep_all)
176+
if self.ngram_config is not None:
177+
tokens = generate_ngrams(**self.ngram_config, tokens=tokens)
178+
if self.post_func is not None:
179+
tokens = self.post_func(tokens)
180+
yield tokens
181+
elif isinstance(tokens, Doc):
182+
tokens = Tokens(tokens, keep_all=keep_all)
183+
if self.ngram_config is not None:
184+
tokens = generate_ngrams(**self.ngram_config, tokens=tokens)
185+
if self.post_func is not None:
186+
tokens = self.post_func(tokens)
187+
yield tokens
268188
else:
269-
print(f"\r{progress_prefix} 0 text chunks of 0 documents processed...", end="", flush=True)
270-
for text in texts:
271-
current_batch.append(text)
272-
if len(current_batch) >= 100:
273-
yield from self.__process_batch(current_batch, keep_all, progress_info)
274-
progress_info["count"] += 1
275-
current_batch = []
276-
progress_info["doc_count"] += 100
277-
278-
# Process the remaining texts
279-
if current_batch:
280-
yield from self.__process_batch(current_batch, keep_all, progress_info)
189+
yield tokens
281190

282191
def process_string(self, text: str, keep_all: bool = True) -> Tokens:
283192
"""Preprocess a single string and return the result as a Tokens object"""
284-
progress_info = {"count": 0, "doc_count": 0, "progress": False, "progress_prefix": ""}
285-
self.text_fetcher_args["is_philo_db"] = False # Ensure string processing does not expect PhiloLogic format
286-
self.text_fetcher_args["is_string"] = True # Ensure string processing
287-
result = self.__process_batch([text], keep_all, progress_info)
288-
return next(result)
193+
# Build a Doc from the raw string, then run it through the loaded pipeline
doc = self.text_fetcher.process_string(text)
194+
processed_doc = self.nlp(doc)
195+
return Tokens(processed_doc, keep_all=keep_all)
196+
197+
def __split_spacy_docs(self, doc: Doc) -> list[Doc]:
    """Split a spaCy doc into smaller docs of at most 10 sentences each.

    Every sentence of `doc` is rebuilt as its own Doc (same words, same
    sentence-start flags, same `_.ext` token extension) on `self.nlp.vocab`,
    and sentences are grouped ten at a time with `Doc.from_docs`.

    Args:
        doc: the doc to split; it must have sentence boundaries set.

    Returns:
        The list of grouped docs, in document order. Empty if `doc` has no
        sentences.
    """
    sentence_group: list[Doc] = []
    docs: list[Doc] = []
    for sent in doc.sents:
        # Flush a full group first, then fall through so the current sentence
        # is still added. The previous if/else skipped the sentence seen on the
        # flush iteration, silently dropping every 11th sentence.
        if len(sentence_group) == 10:
            docs.append(Doc.from_docs(sentence_group))
            sentence_group = []
        words = [token.text for token in sent]
        sent_starts = [token.is_sent_start for token in sent]
        sent_doc = Doc(self.nlp.vocab, words, sent_starts=sent_starts)
        # Custom extensions are not copied by the Doc constructor
        for pos, token in enumerate(sent):
            sent_doc[pos]._.ext = token._.ext
        sentence_group.append(sent_doc)
    if sentence_group:
        docs.append(Doc.from_docs(sentence_group))
    return docs
289218

290219

291220
class TextFetcher:
@@ -315,7 +244,6 @@ def __init__(
315244
text_object_type="doc",
316245
ngram_config=None,
317246
workers=None,
318-
is_string=False,
319247
**_, # this is meant to make the constructor accept invalid keywords
320248
):
321249
cls.language = language
@@ -335,7 +263,6 @@ def __init__(
335263
else:
336264
cls.workers = workers
337265
cls.ngram_config = ngram_config
338-
cls.is_string = is_string
339266

340267
@classmethod
341268
def __call__(
@@ -351,21 +278,27 @@ def __call__(
351278
if progress is True:
352279
print("Extractings texts...", end="", flush=True)
353280

354-
with Pool(cls.workers) as pool:
355-
for processed_docs in pool.imap_unordered(
281+
if cls.workers == 1:
282+
for processed_docs in map(
356283
cls.__local_process, ((text, do_nlp, keep_all, post_func) for text in texts)
357284
):
358285
doc_count += 1
359286
for doc in processed_docs:
360287
yield doc, doc_count
288+
else:
289+
with Pool(cls.workers) as pool:
290+
for processed_docs in pool.imap_unordered(
291+
cls.__local_process, ((text, do_nlp, keep_all, post_func) for text in texts)
292+
):
293+
doc_count += 1
294+
for doc in processed_docs:
295+
yield doc, doc_count
361296

362297
@classmethod
363298
def __local_process(cls, args) -> Iterable[PreparedDoc | Tokens]:
364299
text, do_nlp, keep_all, post_func = args
365300
if cls.is_philo_db is True:
366301
text_objects, sent_starts_list, metadata = cls.process_philo_text(text)
367-
elif cls.is_string is True:
368-
text_objects, sent_starts_list, metadata = cls.process_string(text)
369302
else:
370303
text_objects, sent_starts_list, metadata = cls.process_text(text)
371304
docs = cls.__prepare_docs(text_objects, sent_starts_list, metadata)
@@ -404,14 +337,16 @@ def process_text(cls, text: str):
404337
doc: str = input_text.read()
405338
tokens, sent_starts = cls.tokenize_text(doc)
406339
metadata: dict[str, Any] = {"filename": text}
407-
return [tokens], [sent_starts], [metadata]
340+
return tokens, sent_starts, metadata
408341

409342
@classmethod
410-
def process_string(cls, text: str):
343+
def process_string(cls, text: str) -> Doc:
411344
"""Process one string. Return the transformed document."""
412345
tokens, sent_starts = cls.tokenize_text(text)
413-
metadata: dict[str, Any] = {"filename": text}
414-
return [tokens], [sent_starts], [metadata]
346+
# tokens is a list of (word, ext) pairs: the words build the Doc, ext is attached per token below
doc = Doc(cls.model.vocab, [word for word, _ in tokens], sent_starts=sent_starts) # type: ignore
347+
for pos, (_, ext) in enumerate(tokens):
348+
doc[pos]._.ext = ext
349+
return doc
415350

416351
@classmethod
417352
def tokenize_text(cls, doc: str) -> tuple[list[tuple[str, dict[str, str]]], list[bool]]:

0 commit comments

Comments
 (0)