Skip to content

Commit 8660a27

Browse files
authored
Merge pull request #89 from NuclearMissile/parallel_bug
Parallel bug - the behavior of parallel() and Stream.parallel_of() not same.
2 parents 30dc953 + 30d8a50 commit 8660a27

3 files changed

Lines changed: 16 additions & 0 deletions

File tree

pystreamapi/__stream_converter.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
# pylint: disable=protected-access
12
from pystreamapi._streams.__base_stream import BaseStream
23
from pystreamapi._streams.__parallel_stream import ParallelStream
34
from pystreamapi._streams.__sequential_stream import SequentialStream
@@ -16,15 +17,18 @@ def to_numeric_stream(stream: BaseStream) -> NumericBaseStream:
1617
stream.__class__ = SequentialNumericStream
1718
if isinstance(stream, ParallelStream):
1819
stream.__class__ = ParallelNumericStream
20+
stream._init_parallelizer()
1921
return stream
2022

2123
@staticmethod
2224
def to_parallel_stream(stream: BaseStream) -> ParallelStream:
2325
"""Converts a stream to a parallel stream."""
2426
if isinstance(stream, SequentialNumericStream):
2527
stream.__class__ = ParallelNumericStream
28+
stream._init_parallelizer()
2629
elif isinstance(stream, SequentialStream):
2730
stream.__class__ = ParallelStream
31+
stream._init_parallelizer()
2832
return stream
2933

3034
@staticmethod

pystreamapi/_streams/__parallel_stream.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ def __init__(self, source: Iterable[stream.K]):
2020
super().__init__(source)
2121
self._parallelizer = Parallelizer()
2222

23+
def _init_parallelizer(self):
24+
self._parallelizer = Parallelizer()
25+
2326
@terminal
2427
def all_match(self, predicate: Callable[[Any], bool]):
2528
return all(Parallel(n_jobs=-1, prefer="threads", handler=self)

tests/_streams/test_stream_converter.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from unittest import TestCase
22

3+
from parameterized import parameterized
4+
35
from pystreamapi._streams.__parallel_stream import ParallelStream
46
from pystreamapi._streams.__sequential_stream import SequentialStream
57
from pystreamapi._streams.numeric.__parallel_numeric_stream import ParallelNumericStream
@@ -51,3 +53,10 @@ def test_convert_to_sequential_stream_parallel(self):
5153
def test_convert_to_sequential_stream_parallel_numeric(self):
5254
stream = ParallelNumericStream(["1", "2", "3"]).sequential()
5355
self.assertIsInstance(stream, SequentialNumericStream)
56+
57+
@parameterized.expand([("sequential stream", SequentialStream),
58+
("sequential numeric stream", SequentialNumericStream)])
59+
def test_convert_sequential_to_parallel_parallelizer_working(self, _, stream):
60+
res = []
61+
stream([1, 2, 3]).parallel().filter(lambda x: x > 1).for_each(res.append)
62+
self.assertEqual(res, [2, 3])

0 commit comments

Comments
 (0)