Skip to content

Commit 178b928

Browse files
authored
Merge pull request #40 from PickwickSoft/feature/add-error-handling
Feature/add error handling
2 parents f6078ba + d68feea commit 178b928

19 files changed

Lines changed: 699 additions & 100 deletions

README.md

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ Now you might be wondering why another library when there are already a few impl
2525
* It boasts high speed and efficiency.
2626
* The implementation achieves 100% test coverage.
2727
* It follows Pythonic principles, resulting in clean and readable code.
28-
* It adds some cool innovative features like conditions and an even more declarative look
28+
* It adds some cool innovative features such as conditions or error handling and an even more declarative look.
2929

3030
Let's take a look at a small example:
3131

@@ -102,6 +102,31 @@ Considering the above characteristics, a stream can be defined as follows:
102102

103103
Conditions provide a convenient means for performing logical operations within your Stream, such as using `filter()`, `take_while()`, `drop_while()`, and more. With PyStreamAPI, you have access to a staggering 111 diverse conditions that enable you to process various data types including strings, types, numbers, and dates. Additionally, PyStreamAPI offers a powerful combiner that allows you to effortlessly combine multiple conditions, facilitating the implementation of highly intricate pipelines.
104104

105+
## Error handling: Work with data that you don't know
106+
PyStreamAPI offers a powerful error handling mechanism that allows you to handle errors in a declarative manner. This is especially useful when working with data whose structure or quality you don't know in advance.
107+
108+
PyStreamAPI offers three different error levels:
109+
- `ErrorLevel.RAISE`: This is the default error level. It will raise an exception if an error occurs.
110+
- `ErrorLevel.IGNORE`: This error level will ignore any errors that occur and won't inform you.
111+
- `ErrorLevel.WARN`: This error level will warn you about any errors that occur and log them as warnings using the default logger.
112+
113+
114+
This is how you can use them:
115+
116+
```python
117+
from pystreamapi import Stream, ErrorLevel
118+
119+
Stream.of([" ", '3', None, "2", 1, ""]) \
    .error_level(ErrorLevel.IGNORE) \
    .map_to_int() \
    .sorted() \
    .for_each(print) # Output: 1 2 3
124+
```
125+
126+
The code above will ignore all errors that occur while mapping the elements to int and will simply skip the affected elements.
127+
128+
For more details on how to use error handling, please refer to the documentation.
129+
105130
## Get started: Installation
106131

107132
To start using PyStreamAPI just install the module with this command:

pystreamapi/__init__.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from pystreamapi.__stream import Stream
2+
from pystreamapi._streams.error.__levels import ErrorLevel
23

3-
__version__ = "0.2"
4-
__all__ = ["Stream"]
4+
__version__ = "0.3"
5+
__all__ = ["Stream", "ErrorLevel"]

pystreamapi/_itertools/__init__.py

Whitespace-only changes.

pystreamapi/_itertools/tools.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
# pylint: disable=protected-access
2+
from pystreamapi._streams.error.__error import ErrorHandler, _sentinel
3+
4+
5+
def dropwhile(predicate, iterable, handler: ErrorHandler=None):
    """
    Skip leading elements of *iterable* for which predicate(item) is true,
    then yield every remaining element unchanged.

    :param predicate: Condition deciding which leading items are dropped
    :param iterable: The source iterable
    :param handler: Optional ErrorHandler used to evaluate the predicate;
        a failed evaluation produces the internal sentinel, which keeps
        that element dropped
    """
    iterator = iter(iterable)
    for item in iterator:
        if handler is None:
            outcome = predicate(item)
        else:
            outcome = handler._one(mapper=predicate, item=item)
        # Sentinel means "evaluation failed": treat it as still dropping.
        if outcome is not _sentinel and not outcome:
            yield item
            break
    yield from iterator
20+
21+
22+
_initial_missing = object()


def reduce(function, sequence, initial=_initial_missing, handler: ErrorHandler=None):
    """
    Cumulatively combine the items of *sequence* from left to right with
    *function*, collapsing the iterable to a single value. For example,
    reduce(lambda x, y: x+y, [1, 2, 3, 4, 5]) calculates ((((1+2)+3)+4)+5).
    When *initial* is given, it seeds the accumulation and serves as the
    result for an empty iterable.

    :param function: Binary callable combining the accumulator with an item
    :param sequence: The iterable being reduced
    :param initial: Optional seed value for the accumulator
    :param handler: Optional ErrorHandler; a failed combination step is
        skipped and the previous accumulator value is kept
    :raises TypeError: If the iterable is empty and no initial value is given
    """
    iterator = iter(sequence)

    if initial is not _initial_missing:
        accumulator = initial
    else:
        try:
            accumulator = next(iterator)
        except StopIteration:
            raise TypeError(
                "reduce() of empty iterable with no initial value") from None

    for item in iterator:
        if handler is None:
            accumulator = function(accumulator, item)
        else:
            # Bind the current accumulator as a default so the handler's
            # mapper sees a stable snapshot, not a late-bound closure.
            candidate = handler._one(
                mapper=lambda x, acc=accumulator: function(acc, x), item=item)
            if candidate is not _sentinel:
                accumulator = candidate

    return accumulator

pystreamapi/_parallel/fork_and_join.py

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1+
# pylint: disable=protected-access
12
import os
23

3-
from functools import reduce
4-
from typing import Callable, Any
4+
from typing import Callable, Any, Optional
55

6-
from joblib import Parallel, delayed
6+
from pystreamapi._parallel.parallelizer import Parallel, delayed
7+
from pystreamapi._streams.error.__error import ErrorHandler
8+
from pystreamapi._streams.error.__levels import ErrorLevel
9+
from pystreamapi._itertools.tools import reduce
710

811

912
class Parallelizer:
@@ -21,31 +24,44 @@ class Parallelizer:
2124

2225
def __init__(self):
    """Create a parallelizer with no source and no error handler attached yet."""
    self.__handler: Optional[ErrorHandler] = None
    self.__src = None
2428

25-
def set_source(self, src: list):
26-
"""Set the source list
29+
def set_source(self, src: list, handler: ErrorHandler=None):
30+
"""
31+
Set the source list
32+
:param handler: The error handler to be used
2733
:param src: The source list
2834
"""
2935
self.__src = src
36+
self.__handler = handler
3037

3138
def filter(self, function):
    """Parallel filter function"""
    parts = self.fork()
    # Pick the worker: the error-tolerant variant whenever a handler is
    # present and its level is anything other than RAISE.
    tolerant = (
        self.__handler is not None
        and self.__handler._get_error_level() != ErrorLevel.RAISE
    )
    worker = self._filter_ignore_errors if tolerant else self.__filter
    chunks = self.__run_job_in_parallel(parts, worker, function)
    # Join step: flatten the per-part result lists into a single list.
    return [element for chunk in chunks for element in chunk]
3646

47+
@staticmethod
def __filter(function, src):
    """Filter function used in the fork-and-join technology"""
    matches = []
    for candidate in src:
        if function(candidate):
            matches.append(candidate)
    return matches
51+
52+
def _filter_ignore_errors(self, function, src):
    """Filter function used in the fork-and-join technology using an error handler"""
    handler = self.__handler
    # Each element is evaluated through the handler; failures become
    # sentinels that are stripped later by the Parallel wrapper.
    return [handler._one(condition=function, item=candidate) for candidate in src]
55+
3756
def reduce(self, function: Callable[[Any, Any], Any]):
    """Parallel reduce function using functools.reduce behind"""
    # Fewer than two items: nothing to combine, hand the source back as-is.
    if len(self.__src) < 2:
        return self.__src
    parts = self.fork(min_nr_items=2)
    # Fork step: reduce every part independently, threading the handler
    # through so failing combinations can be skipped.
    partial_results = self.__run_job_in_parallel(
        parts, lambda x, y: reduce(function=x, sequence=y, handler=self.__handler), function
    )
    # Join step: reduce the per-part results down to a single value.
    return reduce(function, partial_results, handler=self.__handler)
4965

5066
def fork(self, min_nr_items=1):
5167
"""
@@ -77,8 +93,8 @@ def __calculate_number_of_parts(self, min_nr_items=1):
7793
return round(len(self.__src) / min_nr_items)
7894
return os.cpu_count() - 2 if os.cpu_count() > 2 else os.cpu_count()
7995

80-
@staticmethod
81-
def __run_job_in_parallel(src, operation, op_function):
96+
def __run_job_in_parallel(self, src, operation, op_function):
    """Run the operation in parallel"""
    tasks = (delayed(operation)(op_function, part) for part in src)
    return Parallel(n_jobs=-1, prefer="processes", handler=self.__handler)(tasks)
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
from joblib import Parallel as _JoblibParallel, delayed # pylint: disable=unused-import
2+
3+
from pystreamapi._streams.error.__error import ErrorHandler
4+
from pystreamapi._streams.error.__levels import ErrorLevel
5+
6+
7+
class Parallel:
    """Wrapper for joblib.Parallel supporting error handling"""

    def __init__(self, n_jobs=-1, prefer="processes", handler: ErrorHandler=None):
        """
        :param n_jobs: Number of jobs forwarded to joblib.Parallel
        :param prefer: Preferred joblib backend ("processes" by default)
        :param handler: Optional ErrorHandler controlling sentinel removal
        """
        self.n_jobs = n_jobs
        self.prefer = prefer
        self.handler = handler

    def __call__(self, iterable):
        """Execute the tasks via joblib.Parallel, then strip error sentinels."""
        results = _JoblibParallel(n_jobs=self.n_jobs, prefer=self.prefer)(iterable)
        # Sentinels only exist when a handler is active and errors are not
        # re-raised; in that case they must be filtered out of the results.
        strip_sentinels = (
            self.handler and self.handler._get_error_level() != ErrorLevel.RAISE
        )
        return ErrorHandler._remove_sentinel(results) if strip_sentinels else results

0 commit comments

Comments
 (0)