Commit 0e0a3b3

Standardize pipeline factory constructor + related changes.

* Run feeds hooks in PipelineBase.
* Make Pipeline entirely responsible for its use of parallel map().
1 parent e9a1fe0 commit 0e0a3b3

5 files changed: 77 additions & 65 deletions

src/reader/_update/__init__.py

Lines changed: 21 additions & 28 deletions
@@ -10,7 +10,6 @@
 from itertools import chain
 from typing import Any
 from typing import NamedTuple
-from typing import TYPE_CHECKING

 from .._parser import EntryPair
 from .._parser import EntryPairBase
@@ -23,16 +22,13 @@
 from .._types import FeedForUpdate
 from .._types import FeedToUpdate
 from .._types import FeedUpdateIntent
-from .._utils import MapFunction
+from .._utils import make_pool_map
 from .._utils import PrefixLogger
 from ..exceptions import ParseError
 from ..types import ExceptionInfo
 from ..types import UpdateConfig
 from .base import PipelineBase

-if TYPE_CHECKING:  # pragma: no cover
-    from ..core import Reader
-
 log = logging.getLogger("reader")

 HASH_CHANGED_LIMIT = 24
@@ -53,6 +49,19 @@ class Decider:

     old_feed: FeedForUpdate
     now: datetime
+
+    # global now, is used as first_updated_epoch for all new entries,
+    # so that the subset of new entries from an update appears before
+    # all others and the entries in it are sorted by published/updated;
+    # if we used last_updated (now) for this, they would be sorted
+    # by feed order first (due to now increasing for each feed).
+    #
+    # A side effect of relying first_updated_epoch for ordering is that
+    # for the second of two new feeds updated in the same update_feeds()
+    # call, first_updated_epoch != last_updated.
+    #
+    # However, added == last_updated for the first update.
+    #
     global_now: datetime
     config: UpdateConfig
     log: Any = log
@@ -325,7 +334,7 @@ def next_update_after(now: datetime, interval: int, jitter: float = 0) -> datetime:
     return rv


-@dataclass(frozen=True)
+@dataclass
 class Pipeline(PipelineBase[FeedData, EntryData, FeedUpdateIntent, EntryUpdateIntent]):
     """Update multiple feeds.

@@ -349,23 +358,6 @@ class Pipeline(PipelineBase[FeedData, EntryData, FeedUpdateIntent, EntryUpdateIntent]):

     """

-    reader: Reader
-
-    # global now, is used as first_updated_epoch for all new entries,
-    # so that the subset of new entries from an update appears before
-    # all others and the entries in it are sorted by published/updated;
-    # if we used last_updated (now) for this, they would be sorted
-    # by feed order first (due to now increasing for each feed).
-    #
-    # A side effect of relying first_updated_epoch for ordering is that
-    # for the second of two new feeds updated in the same update_feeds()
-    # call, first_updated_epoch != last_updated.
-    #
-    # However, added == last_updated for the first update.
-    #
-    global_now: datetime
-
-    map: MapFunction[Any, Any]
     decider = Decider

     def parse_feeds(self, feeds: Iterable[FeedForUpdate]) -> Iterable[ParseResult]:
@@ -388,10 +380,11 @@ def parser_process_feeds_for_update(
                 except ParseError as e:
                     parse_errors.append(ParseResult(feed, e))

-        feeds = parser_process_feeds_for_update(feeds)
-        feeds = map(self.decider.process_feed_for_update, feeds)
-        parse_results = self.reader._parser.parallel(feeds, self.map)
-        return chain(parse_results, parse_errors)
+        with make_pool_map(self.workers) as parallel_map:
+            feeds = parser_process_feeds_for_update(feeds)
+            feeds = map(self.decider.process_feed_for_update, feeds)
+            parse_results = self.reader._parser.parallel(feeds, parallel_map)
+            yield from chain(parse_results, parse_errors)

     def make_intents(
         self, result: ParseResult, entries: Iterable[EntryPair]
@@ -411,7 +404,7 @@ def make_intents(

         return self.decider.make_intents(
             self.reader._now(),
-            self.global_now,
+            self.now,
             config,
             result,
             entries,
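
Note on the parse_feeds() change above: replacing `return chain(...)` with `yield from chain(...)` makes the method a generator, so the make_pool_map() context stays entered while the caller consumes results, and the pool is only torn down once iteration finishes. A minimal sketch of that behavior (the pool() helper is illustrative, not reader API):

from contextlib import contextmanager

@contextmanager
def pool():
    # stand-in for make_pool_map(): yields a map-like callable
    print("pool opened")
    yield map
    print("pool closed")

def results():
    # like parse_feeds(): the pool stays open while we yield
    with pool() as parallel_map:
        yield from parallel_map(str.upper, ["a", "b"])

for item in results():
    print(item)
# "pool closed" prints only after the loop exhausts the generator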

src/reader/_update/base.py

Lines changed: 27 additions & 2 deletions
@@ -3,9 +3,13 @@
 import logging
 from abc import ABC
 from abc import abstractmethod
+from collections.abc import Callable
 from collections.abc import Iterable
 from collections.abc import Iterator
 from contextlib import contextmanager
+from dataclasses import dataclass
+from datetime import datetime
+from typing import Any  # noqa: F401
 from typing import Generic
 from typing import TYPE_CHECKING
 from typing import TypeVar
@@ -28,6 +32,11 @@
 log = logging.getLogger("reader")


+PipelineFactory = Callable[
+    ['Reader', datetime, int, bool], 'PipelineBase[Any, Any, Any, Any]'
+]
+
+
 FD = TypeVar('FD')
 ED = TypeVar('ED')
 FI = TypeVar('FI')
@@ -37,6 +46,7 @@
 EntryPair = EntryPairBase[ED]


+@dataclass
 class PipelineBase(Generic[FD, ED, FI, EI], ABC):
     """Run through high level update phases and call update hooks.

@@ -46,6 +56,9 @@ class PipelineBase(Generic[FD, ED, FI, EI], ABC):
     """

     reader: Reader
+    now: datetime
+    workers: int
+    call_feeds_hooks: bool

     @abstractmethod
     def parse_feeds(
@@ -78,6 +91,10 @@ def get_entry_data(self, intent: EI) -> EntryData:
         """Transform an entry update intent into entry data (for plugins)."""

     def update(self, filter: FeedFilter) -> Iterable[UpdateResult]:
+
+        if self.call_feeds_hooks:
+            self.reader._update_hooks.run('before_feeds_update', None)
+
         feeds = self.reader._storage.get_feeds_for_update(filter)
         parse_results = self.parse_feeds(feeds)
         update_results = map(self.process_result, parse_results)
@@ -93,6 +110,12 @@ def update(self, filter: FeedFilter) -> Iterable[UpdateResult]:

             yield UpdateResult(url, value)

+        if self.call_feeds_hooks:
+            with self.reader._update_hooks.group(
+                "got unexpected after-update hook errors"
+            ) as hook_errors:
+                hook_errors.run('after_feeds_update', None)
+
     def process_result(
         self, result: ParseResult[FD, ED]
     ) -> tuple[str, UpdatedFeed | None | Exception]:
@@ -103,7 +126,7 @@ def process_result(
             feed_intent, entry_intents = self.make_intents(result, entry_pairs)
             entry_intents = list(entry_intents)

-            with self.run_hooks(result.feed.url, entry_intents):
+            with self.run_feed_hooks(result.feed.url, entry_intents):
                 self.store_feed(feed_intent, [new for new, _ in entry_intents])

         except Exception as e:
@@ -129,7 +152,9 @@ def get_entry_pairs(self, result: ParseResult[FD, ED]) -> Iterable[EntryPair[ED]]:
         return zip(result.value.entries, entries_for_update, strict=True)

     @contextmanager
-    def run_hooks(self, feed: str, entries: Iterable[EntryPair[EI]]) -> Iterator[None]:
+    def run_feed_hooks(
+        self, feed: str, entries: Iterable[EntryPair[EI]]
+    ) -> Iterator[None]:
         hooks = self.reader._update_hooks

         hooks.run('before_feed_update', (feed,), feed)
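
With PipelineBase a dataclass over (reader, now, workers, call_feeds_hooks), any callable matching PipelineFactory can stand in for Pipeline. A hypothetical sketch of swapping one in via the new Reader._make_pipeline attribute (the LoggingPipeline subclass is illustrative, not part of this commit):

from reader import make_reader
from reader._update import Pipeline

class LoggingPipeline(Pipeline):
    # inherits the dataclass constructor: (reader, now, workers, call_feeds_hooks)
    def store_feed(self, intent, entries):
        print(f"storing feed update with {len(entries)} entries")
        return super().store_feed(intent, entries)

reader = make_reader(":memory:")
reader._make_pipeline = LoggingPipeline  # conforms to PipelineFactory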

src/reader/_utils.py

Lines changed: 17 additions & 5 deletions
@@ -11,8 +11,9 @@
 from collections.abc import Iterable
 from collections.abc import Iterator
 from collections.abc import Sequence
-from contextlib import AbstractContextManager
+from contextlib import AbstractContextManager as CM
 from contextlib import contextmanager
+from contextlib import nullcontext
 from functools import wraps
 from typing import Any
 from typing import cast
@@ -79,14 +80,25 @@ def eager_iterable(it: Iterable[_T]) -> Iterable[_T]:
     return it


+# if we substitute MapFunction below, mypy complains
+# https://github.com/python/mypy/issues/17551
 MapFunction = Callable[[Callable[[_T], _U], Iterable[_T]], Iterator[_U]]
-MapContextManager = AbstractContextManager[MapFunction[_T, _U]]

-# type ignore because of https://github.com/python/mypy/issues/17551

+def make_pool_map(
+    workers: int,
+) -> CM[Callable[[Callable[[_T], _U], Iterable[_T]], Iterator[_U]]]:
+    if workers < 1:
+        raise ValueError("workers must be a positive integer")
+    if workers == 1:
+        return nullcontext(map)
+    return _make_pool_map(workers)

-@contextmanager  # type: ignore[arg-type]
-def make_pool_map(workers: int) -> Iterator[MapFunction[_T, _U]]:
+
+@contextmanager
+def _make_pool_map(
+    workers: int,
+) -> Iterator[Callable[[Callable[[_T], _U], Iterable[_T]], Iterator[_U]]]:
     # We are using concurrent.futures instead of multiprocessing.dummy
     # because the latter doesn't work on some environments (e.g. AWS Lambda).
     # We are not using executor.map() because it consumes the entire iterable.
src/reader/core.py

Lines changed: 6 additions & 24 deletions
@@ -1,14 +1,12 @@
 from __future__ import annotations

-import builtins
 import logging
 import numbers
 import warnings
 from collections.abc import Callable
 from collections.abc import Iterable
 from collections.abc import Mapping
 from collections.abc import MutableSequence
-from contextlib import nullcontext
 from datetime import datetime
 from datetime import timezone
 from types import MappingProxyType
@@ -37,8 +35,6 @@
 from ._types import UpdateHooks
 from ._update import Pipeline
 from ._utils import eager_iterable
-from ._utils import make_pool_map
-from ._utils import MapContextManager
 from ._utils import zero_or_one
 from .exceptions import EntryNotFoundError
 from .exceptions import FeedError
@@ -82,6 +78,7 @@

 if TYPE_CHECKING:  # pragma: no cover
     from ._parser import Parser
+    from ._update.base import PipelineFactory


 log = logging.getLogger('reader')
@@ -388,6 +385,7 @@ def __init__(

         self._reserved_name_scheme = _reserved_name_scheme
         self._enable_search = _enable_search
+        self._make_pipeline: PipelineFactory = Pipeline
         self._update_hooks = UpdateHooks(self)

         #: Override update_feeds(scheduled=...).
@@ -930,7 +928,7 @@ def update_feeds_iter(
         new: bool | None = None,
         scheduled: bool = True,
         workers: int = 1,
-        _call_feeds_update_hooks: bool = True,
+        _call_feeds_hooks: bool = True,
     ) -> Iterable[UpdateResult]:
         r"""Update all or some of the feeds.

@@ -1035,24 +1033,8 @@ def update_feeds_iter(
             now, feed, tags, broken, updates_enabled, new, scheduled
         )

-        if workers < 1:
-            raise ValueError("workers must be a positive integer")
-
-        make_map: MapContextManager[Any, Any] = (
-            nullcontext(builtins.map) if workers == 1 else make_pool_map(workers)
-        )
-
-        if _call_feeds_update_hooks:
-            self._update_hooks.run('before_feeds_update', None)
-
-        with make_map as map:
-            yield from Pipeline(self, now, map).update(filter)
-
-        if _call_feeds_update_hooks:
-            with self._update_hooks.group(
-                "got unexpected after-update hook errors"
-            ) as hook_errors:
-                hook_errors.run('after_feeds_update', None)
+        pipeline = self._make_pipeline(self, now, workers, _call_feeds_hooks)
+        yield from pipeline.update(filter)

     def update_feed(self, feed: FeedInput, /) -> UpdatedFeed | None:
         r"""Update a single feed.
@@ -1102,7 +1084,7 @@ def update_feed(self, feed: FeedInput, /) -> UpdatedFeed | None:
                 feed=feed,
                 updates_enabled=None,
                 scheduled=False,
-                _call_feeds_update_hooks=False,
+                _call_feeds_hooks=False,
             ),
             lambda: FeedNotFoundError(_feed_argument(feed)),
         )
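
The net effect in core.py: update_feeds_iter() shrinks to building a filter and delegating to the pipeline, with pool management and the feeds-level hooks now behind self._make_pipeline. The public API is unchanged; a sketch (feed URL and database path illustrative):

from reader import make_reader

reader = make_reader("db.sqlite")
reader.add_feed("https://example.com/feed.xml")

# internally this is now just:
#   pipeline = self._make_pipeline(self, now, workers, _call_feeds_hooks)
#   yield from pipeline.update(filter)
for url, value in reader.update_feeds_iter(workers=2):
    print(url, value)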

tests/test_reader_update.py

Lines changed: 6 additions & 6 deletions
@@ -696,14 +696,14 @@ def target():

     from reader._update import Pipeline

-    def update_feed(*args, **kwargs):
+    def store_feed(*args, **kwargs):
         monkeypatch.undo()
         block()
         assert reader.get_entry(entry).title == 'two'
-        return Pipeline.update_feed(*args, **kwargs)
+        return Pipeline.store_feed(*args, **kwargs)

     # TODO: this would have been easier if Pipeline were a reader attribute
-    monkeypatch.setattr(Pipeline, 'update_feed', update_feed)
+    monkeypatch.setattr(Pipeline, 'store_feed', store_feed)

     t = threading.Thread(target=target)
     t.start()
@@ -746,13 +746,13 @@ def target():

     from reader._update import Pipeline

-    def update_feed(*args, **kwargs):
+    def store_feed(*args, **kwargs):
         monkeypatch.undo()
         block()
-        return Pipeline.update_feed(*args, **kwargs)
+        return Pipeline.store_feed(*args, **kwargs)

     # TODO: this would have been easier if Pipeline were a reader attribute
-    monkeypatch.setattr(Pipeline, 'update_feed', update_feed)
+    monkeypatch.setattr(Pipeline, 'store_feed', store_feed)

     before_entry = reader.get_entry(entry)
     (before_efu,) = reader._storage.get_entries_for_update([entry.resource_id])
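
The tests follow the rename by intercepting store_feed (the storage step) instead of the old update_feed. The interception pattern in isolation, as a generic sketch (not reader code):

import threading

class Pipeline:
    def store_feed(self, feed):
        return f"stored {feed}"

resume = threading.Event()
original = Pipeline.store_feed

def store_feed(*args, **kwargs):
    resume.wait()  # park the updating thread here
    return original(*args, **kwargs)

Pipeline.store_feed = store_feed  # what monkeypatch.setattr does, minus auto-undo

t = threading.Thread(target=lambda: print(Pipeline().store_feed("feed")))
t.start()
# ... the test can observe or modify state while the update is blocked ...
resume.set()
t.join()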
