Skip to content

Commit b788cf2

Browse files
committed
Extract generic update pipeline base.
1 parent 1d4cfdc commit b788cf2

4 files changed

Lines changed: 159 additions & 136 deletions

File tree

src/reader/_parser/__init__.py

Lines changed: 1 addition & 4 deletions
Original file line number · Diff line number · Diff line change
@@ -425,10 +425,7 @@ class ParsedFeedBase(NamedTuple, Generic[FD, ED]):
425425
caching_info: JSONType | None = None
426426

427427

428-
class EntryPairBase(NamedTuple, Generic[ED]):
429-
new: ED
430-
old: EntryForUpdate | None
431-
428+
EntryPairBase = tuple[ED, EntryForUpdate | None]
432429

433430
ParseResult = ParseResultBase[FeedForUpdate, FeedData, EntryData, ParseError]
434431
ParsedFeed = ParsedFeedBase[FeedData, EntryData]

src/reader/_types.py

Lines changed: 0 additions & 4 deletions
Original file line number · Diff line number · Diff line change
@@ -411,10 +411,6 @@ class EntryUpdateIntent(NamedTuple):
411411
# (e.g. repeated updates on platforms with low-precision time,
412412
# like update_feeds_iter() tests on Windows on GitHub Actions)
413413

414-
#: Whether the entry is new.
415-
#: Used for hooks and UpdatedFeed counts, should not be used by storage.
416-
new: bool = True
417-
418414

419415
#: Like the ``tags`` argument of :meth:`.Reader.get_feeds`, except:
420416
#:

src/reader/_update.py

Lines changed: 158 additions & 113 deletions
Original file line number · Diff line number · Diff line change
@@ -2,28 +2,34 @@
22

33
import logging
44
import random
5+
from abc import ABC
6+
from abc import abstractmethod
57
from collections.abc import Iterable
8+
from collections.abc import Iterator
9+
from contextlib import contextmanager
610
from dataclasses import dataclass
711
from datetime import datetime
812
from datetime import timedelta
913
from datetime import timezone
10-
from functools import partial
1114
from itertools import chain
12-
from itertools import tee
1315
from typing import Any
16+
from typing import Generic
1417
from typing import NamedTuple
1518
from typing import TYPE_CHECKING
19+
from typing import TypeVar
1620

1721
from ._parser import EntryPair
22+
from ._parser import EntryPairBase
23+
from ._parser import ParsedFeedBase
1824
from ._parser import ParseResult
25+
from ._parser import ParseResultBase
1926
from ._types import EntryData
2027
from ._types import EntryForUpdate
2128
from ._types import EntryUpdateIntent
2229
from ._types import FeedData
2330
from ._types import FeedForUpdate
2431
from ._types import FeedToUpdate
2532
from ._types import FeedUpdateIntent
26-
from ._utils import count_consumed
2733
from ._utils import PrefixLogger
2834
from .exceptions import FeedNotFoundError
2935
from .exceptions import ParseError
@@ -41,6 +47,16 @@
4147
from .core import Reader
4248

4349

50+
FD = TypeVar('FD')
51+
ED = TypeVar('ED')
52+
FI = TypeVar('FI')
53+
EI = TypeVar('EI')
54+
55+
Result = ParseResultBase[FeedForUpdate, FD, ED, ParseError]
56+
Feed = ParsedFeedBase[FD, ED]
57+
Pair = EntryPairBase[ED]
58+
59+
4460
log = logging.getLogger("reader")
4561

4662
HASH_CHANGED_LIMIT = 24
@@ -88,8 +104,9 @@ def make_intents(
88104
config: UpdateConfig,
89105
result: ParseResult,
90106
entry_pairs: Iterable[EntryPair],
91-
) -> tuple[FeedUpdateIntent, Iterable[EntryUpdateIntent]]:
107+
) -> tuple[FeedUpdateIntent, Iterable[Pair[EntryUpdateIntent]]]:
92108
decider = cls(
109+
# TODO: old_feed is already present in result
93110
old_feed,
94111
now,
95112
global_now,
@@ -193,7 +210,7 @@ def debug(msg: str, *args: Any) -> None:
193210

194211
def get_entries_to_update(
195212
self, pairs: Iterable[EntryPair]
196-
) -> Iterable[EntryUpdateIntent]:
213+
) -> Iterable[Pair[EntryUpdateIntent]]:
197214
for feed_order, (new, old) in reversed(list(enumerate(pairs))):
198215
# This may fail if we ever implement changing the feed URL
199216
# in response to a permanent redirect.
@@ -220,26 +237,21 @@ def get_entries_to_update(
220237
recent_sort,
221238
feed_order,
222239
should_update.hash_changed,
223-
new=not old,
224-
)
240+
), old
225241

226242
def get_feed_to_update(
227-
self,
228-
parsed_feed: ParsedFeed,
229-
entries_to_update: bool,
243+
self, parsed_feed: ParsedFeed, entries_to_update: bool
230244
) -> FeedToUpdate | None:
231245
if self.should_update_feed(parsed_feed.feed, entries_to_update):
232246
return FeedToUpdate(parsed_feed.feed, self.now, parsed_feed.caching_info)
233247
return None
234248

235249
def update(
236-
self,
237-
result: ParseResult,
238-
entry_pairs: Iterable[EntryPair],
239-
) -> tuple[FeedUpdateIntent, Iterable[EntryUpdateIntent]]:
250+
self, result: ParseResult, entry_pairs: Iterable[EntryPair]
251+
) -> tuple[FeedUpdateIntent, Iterable[Pair[EntryUpdateIntent]]]:
240252

241253
# TODO: move entries_to_update in FeedToUpdate, maybe?
242-
entries_to_update: Iterable[EntryUpdateIntent] = ()
254+
entries_to_update: Iterable[Pair[EntryUpdateIntent]] = ()
243255
value: FeedToUpdate | None | ExceptionInfo
244256

245257
if not result.value:
@@ -336,8 +348,116 @@ def next_update_after(now: datetime, interval: int, jitter: float = 0) -> dateti
336348
return rv
337349

338350

351+
class PipelineBase(Generic[FD, ED, FI, EI], ABC):
352+
353+
reader: Reader
354+
355+
@abstractmethod
356+
def parse(
357+
self, feeds_for_update: Iterable[FeedForUpdate]
358+
) -> Iterable[Result[FD, ED]]:
359+
"""Retrieve and parse an iterable of feeds, possibly in parallel."""
360+
361+
@abstractmethod
362+
def make_intent(
363+
self, result: Result[FD, ED], entry_pairs: Iterable[Pair[ED]]
364+
) -> tuple[FI, Iterable[Pair[EI]]]:
365+
"""Transform a parse result into a feed update intent."""
366+
367+
@abstractmethod
368+
def update_feed(self, feed: FI, entries: Iterable[EI]) -> None:
369+
"""Save the update intents to storage."""
370+
371+
@abstractmethod
372+
def get_entry_id(self, entry: ED) -> tuple[str, str]:
373+
"""Return the entry id of an entry data."""
374+
375+
@abstractmethod
376+
def get_entry_data(self, intent: EI) -> EntryData:
377+
"""Transform an entry update intent into entry data (for plugins)."""
378+
379+
def update(self, filter: FeedFilter) -> Iterable[UpdateResult]:
380+
feeds_for_update = self.reader._storage.get_feeds_for_update(filter)
381+
parse_results = self.parse(feeds_for_update)
382+
update_results = map(self.process_parse_result, parse_results)
383+
384+
for url, value in update_results:
385+
if isinstance(value, FeedNotFoundError):
386+
log.info("update feed %r: feed removed during update", url)
387+
continue
388+
389+
if isinstance(value, Exception):
390+
if not isinstance(value, UpdateError):
391+
raise value
392+
393+
yield UpdateResult(url, value)
394+
395+
def process_parse_result(
396+
self, result: Result[FD, ED]
397+
) -> tuple[str, UpdatedFeed | None | Exception]:
398+
feed, value, *_ = result
399+
400+
try:
401+
entry_pairs = self.get_entry_pairs(result)
402+
feed_intent, entry_intents = self.make_intent(result, entry_pairs)
403+
entry_intents = list(entry_intents)
404+
405+
with self.run_hooks(result.feed.url, entry_intents):
406+
self.update_feed(feed_intent, [new for new, _ in entry_intents])
407+
408+
except Exception as e:
409+
return feed.url, e
410+
411+
if not value or isinstance(value, Exception):
412+
return feed.url, value
413+
414+
new = sum(1 for _, old in entry_intents if not old)
415+
416+
return feed.url, UpdatedFeed(
417+
feed.url,
418+
new=new,
419+
modified=len(entry_intents) - new,
420+
unmodified=len(value.entries) - len(entry_intents),
421+
)
422+
423+
def get_entry_pairs(self, result: Result[FD, ED]) -> Iterable[Pair[ED]]:
424+
if not result.value or isinstance(result.value, Exception):
425+
return []
426+
427+
ids = map(self.get_entry_id, result.value.entries)
428+
entries_for_update = self.reader._storage.get_entries_for_update(ids)
429+
return zip(result.value.entries, entries_for_update, strict=True)
430+
431+
@contextmanager
432+
def run_hooks(self, feed: str, entries: Iterable[Pair[EI]]) -> Iterator[None]:
433+
hooks = self.reader._update_hooks
434+
435+
hooks.run('before_feed_update', (feed,), feed)
436+
437+
yield
438+
439+
with hooks.group("got unexpected after-update hook errors") as hook_errors:
440+
for new, old in entries:
441+
if not old:
442+
entry_status = EntryUpdateStatus.NEW
443+
else:
444+
entry_status = EntryUpdateStatus.MODIFIED
445+
446+
entry = self.get_entry_data(new)
447+
448+
hook_errors.run(
449+
'after_entry_update',
450+
entry.resource_id,
451+
entry,
452+
entry_status,
453+
limit=5,
454+
)
455+
456+
hook_errors.run('after_feed_update', (feed,), feed)
457+
458+
339459
@dataclass(frozen=True)
340-
class Pipeline:
460+
class Pipeline(PipelineBase[FeedData, EntryData, FeedUpdateIntent, EntryUpdateIntent]):
341461
"""Update multiple feeds.
342462
343463
Calls dependencies and hooks in the right order, possibly in parallel.
@@ -379,12 +499,7 @@ class Pipeline:
379499
map: MapFunction[Any, Any]
380500
decider = Decider
381501

382-
def update(self, filter: FeedFilter) -> Iterable[UpdateResult]:
383-
config_key = self.reader.make_reader_reserved_name(CONFIG_KEY)
384-
config = flatten_config(self.reader.get_tag((), config_key, {}), DEFAULT_CONFIG)
385-
386-
process_parse_result = partial(self.process_parse_result, config)
387-
502+
def parse(self, feeds_for_update: Iterable[FeedForUpdate]) -> Iterable[ParseResult]:
388503
# ಠ_ಠ
389504
# The pipeline is not equipped to handle ParseErrors
390505
# as early as parser.process_feed_for_update().
@@ -404,115 +519,45 @@ def parser_process_feeds_for_update(
404519
except ParseError as e:
405520
parser_process_feeds_for_update_errors.append(ParseResult(feed, e))
406521

407-
# assemble pipeline
408-
feeds_for_update = self.reader._storage.get_feeds_for_update(filter)
409-
# feeds_for_update = map(self.parser.process_feed_for_update, feeds_for_update)
410522
feeds_for_update = parser_process_feeds_for_update(feeds_for_update)
411523
feeds_for_update = map(self.decider.process_feed_for_update, feeds_for_update)
412524
parse_results = self.reader._parser.parallel(feeds_for_update, self.map)
413-
parse_results = chain(parse_results, parser_process_feeds_for_update_errors)
414-
update_results = map(process_parse_result, parse_results)
415-
416-
for url, value in update_results:
417-
if isinstance(value, FeedNotFoundError):
418-
log.info("update feed %r: feed removed during update", url)
419-
continue
525+
return chain(parse_results, parser_process_feeds_for_update_errors)
420526

421-
if isinstance(value, Exception):
422-
if not isinstance(value, UpdateError):
423-
raise value
424-
425-
yield UpdateResult(url, value)
527+
def make_intent(
528+
self, result: ParseResult, entry_pairs: Iterable[EntryPair]
529+
) -> tuple[FeedUpdateIntent, Iterable[Pair[EntryUpdateIntent]]]:
530+
feed, value, *_ = result
426531

427-
def process_parse_result(
428-
self,
429-
config: UpdateConfig,
430-
result: ParseResult,
431-
) -> tuple[str, UpdatedFeed | None | Exception]:
432-
feed, value, _ = result
433-
434-
# TODO: don't duplicate code from update()
435-
# TODO: the feed tag value should come from get_feeds_for_update()
436532
config_key = self.reader.make_reader_reserved_name(CONFIG_KEY)
533+
# TODO: get global config only once
534+
config = flatten_config(self.reader.get_tag((), config_key, {}), DEFAULT_CONFIG)
535+
# TODO: the feed tag value should come from get_feeds_for_update()
437536
config = flatten_config(self.reader.get_tag(feed, config_key, {}), config)
438537

439-
make_intents = partial(
440-
self.decider.make_intents,
538+
if value and not isinstance(value, Exception):
539+
entry_pairs = self.reader._parser.process_entry_pairs(
540+
feed.url, value.mime_type, entry_pairs
541+
)
542+
543+
return self.decider.make_intents(
441544
feed,
442545
self.reader._now(),
443546
self.global_now,
444547
config,
445548
result,
549+
entry_pairs,
446550
)
447551

448-
try:
449-
# assemble pipeline
450-
if value and not isinstance(value, Exception):
451-
entry_pairs = self.get_entry_pairs(value)
452-
entry_pairs = self.reader._parser.process_entry_pairs(
453-
feed.url, value.mime_type, entry_pairs
454-
)
455-
entry_pairs, get_total_count = count_consumed(entry_pairs)
456-
else:
457-
entry_pairs = ()
458-
get_total_count = lambda: 0 # noqa: E731
459-
460-
intents = make_intents(entry_pairs)
461-
counts = self.update_feed(*intents)
462-
total = get_total_count()
463-
464-
except Exception as e:
465-
return feed.url, e
466-
467-
if not value or isinstance(value, Exception):
468-
return feed.url, value
469-
470-
return feed.url, UpdatedFeed(feed.url, *counts, total - sum(counts))
471-
472-
def get_entry_pairs(self, result: ParsedFeed) -> Iterable[EntryPair]:
473-
# give storage a chance to consume entries in a streaming fashion
474-
entries1, entries2 = tee(result.entries)
475-
entries_for_update = self.reader._storage.get_entries_for_update(
476-
(e.feed_url, e.id) for e in entries1
477-
)
478-
return map(EntryPair._make, zip(entries2, entries_for_update, strict=True))
479-
480552
def update_feed(
481-
self,
482-
feed: FeedUpdateIntent,
483-
entries: Iterable[EntryUpdateIntent],
484-
) -> tuple[int, int]:
485-
url = feed.url
486-
hooks = self.reader._update_hooks
487-
488-
hooks.run('before_feed_update', (url,), url)
489-
553+
self, feed: FeedUpdateIntent, entries: Iterable[EntryUpdateIntent]
554+
) -> None:
490555
if entries:
491556
self.reader._storage.add_or_update_entries(entries)
492557
self.reader._storage.update_feed(feed)
493558

494-
# if feed_for_update.url != parsed_feed.feed.url, the feed was redirected.
495-
# TODO: Maybe handle redirects somehow else (e.g. change URL if permanent).
496-
497-
with hooks.group("got unexpected after-update hook errors") as hook_errors:
498-
new_count = 0
499-
updated_count = 0
500-
for entry in entries:
501-
if entry.new:
502-
new_count += 1
503-
entry_status = EntryUpdateStatus.NEW
504-
else:
505-
updated_count += 1
506-
entry_status = EntryUpdateStatus.MODIFIED
507-
508-
hook_errors.run(
509-
'after_entry_update',
510-
entry.entry.resource_id,
511-
entry.entry,
512-
entry_status,
513-
limit=5,
514-
)
515-
516-
hook_errors.run('after_feed_update', (url,), url)
559+
def get_entry_id(self, entry: EntryData) -> tuple[str, str]:
560+
return entry.resource_id
517561

518-
return new_count, updated_count
562+
def get_entry_data(self, intent: EntryUpdateIntent) -> EntryData:
563+
return intent.entry

0 commit comments

Comments (0)