Python Pipes
I’ve always wanted a way to build data processing pipelines in Python
using pipes, like range(10) | F(is_odd) | P(lambda x: x * 2),
instead of functions, generators, maps, and loops.
So I’ve tried …
The idea is pretty simple: create a class that implements the __or__ and __ror__
operators, which act as the pipes.
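def __or__(self, other):
    # stage | other: the right-hand stage pulls its values from this one
    other.source = self
    return other

def __ror__(self, other):
    # iterable | stage: wrap the left-hand iterable in an iterator
    self.source = (
        iter(other)
        if not isinstance(other, (str, bytes)) and hasattr(other, "__iter__")
        else other
    )
    return self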
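Why both operators are needed can be seen in a tiny experiment. This is only a sketch: the class name Tap is made up for the illustration and is not part of pipe.py. When the left operand (a list, a range, a generator) has no usable __or__, Python falls back to the right operand's __ror__.

```python
class Tap:
    """Hypothetical class that only reports which operator Python called."""

    def __or__(self, other):
        # Called for: Tap() | something
        return ("or", other)

    def __ror__(self, other):
        # Called for: something | Tap(), when `something` cannot handle `|`
        return ("ror", other)


left_is_list = [1, 2] | Tap()   # list has no __or__, so Tap.__ror__ runs
left_is_tap = Tap() | 5         # Tap.__or__ runs first

print(left_is_list)  # ('ror', [1, 2])
print(left_is_tap)   # ('or', 5)
```

So __ror__ handles the first link of the chain (plain iterable on the left), and __or__ handles every following link (pipe object on the left).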
The tricky part was the implementation of __next__, since I wanted it to be a
lazy operation. After some trial and error I ended up with a pretty simple
approach: the wrapping class implementing the pipe calls next on its source
(set by __or__ or __ror__), applies a transformation, and returns the result
of that transformation.
def __next__(self):
    if self.source is None:
        raise StopIteration
    # Pull exactly one value from upstream, transform it, hand it on
    value = next(self.source)
    result = self.operator(value)
    return result
It did the trick for standard transformations but not for filters like is_odd,
since those skip some of the data rather than return every value the way a transformation does.
To make filters easy to add, I implemented a wrapper that works similarly to the pipe class but applies a filter to the data going through it.
def __next__(self):
    while True:
        if self.source is None:
            raise StopIteration
        # Keep pulling until a value passes the predicate
        value = next(self.source)
        if self.predicate(value):
            return value
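Put together, the fragments above might look roughly like the following. This is a minimal sketch, not the actual pipe.py: the class names P and F come from the examples in this post, but the shared base class _Stage, the constructors, the __iter__ method, and the definition of is_odd are my assumptions about how the pieces fit.

```python
from itertools import count, islice


class _Stage:
    """Assumed shared plumbing: a stage pulls values from `source` on demand."""

    def __init__(self):
        self.source = None

    def __or__(self, other):
        other.source = self
        return other

    def __ror__(self, other):
        self.source = (
            iter(other)
            if not isinstance(other, (str, bytes)) and hasattr(other, "__iter__")
            else other
        )
        return self

    def __iter__(self):
        # Needed so list(pipe) and for-loops can consume the pipe
        return self


class P(_Stage):
    """Transformation stage: applies `operator` to each value."""

    def __init__(self, operator):
        super().__init__()
        self.operator = operator

    def __next__(self):
        if self.source is None:
            raise StopIteration
        return self.operator(next(self.source))


class F(_Stage):
    """Filter stage: passes on only values for which `predicate` is true."""

    def __init__(self, predicate):
        super().__init__()
        self.predicate = predicate

    def __next__(self):
        if self.source is None:
            raise StopIteration
        while True:
            value = next(self.source)
            if self.predicate(value):
                return value


def is_odd(x):  # assumed definition
    return x % 2 == 1


result = list(range(10) | F(is_odd) | P(lambda x: x * 2))
print(result)  # [2, 6, 10, 14, 18]

# Laziness check: the source is infinite, yet only three results are computed
first_three = list(islice(count() | F(is_odd) | P(lambda x: x * 2), 3))
print(first_three)  # [2, 6, 10]
```

The second example only terminates because each stage asks its source for one value at a time; an eager implementation would never finish consuming count().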
The resulting code allowed me to do things like this:
# Example usage with filtering
pipe = range(10) | P(lambda x: x + 3)
print("range(10) | P(lambda x: x + 3):", list(pipe))
pipe = range(10) | P(lambda x: x + 3) | P(lambda x: x * 2)
print("range(10) | P(lambda x: x + 3) | P(lambda x: x * 2):", list(pipe))
pipe = range(10) | F(is_odd)
print("range(10) | F(is_odd):", list(pipe))
pipe = range(10) | F(is_odd) | P(lambda x: x * 2)
print("range(10) | F(is_odd) | P(lambda x: x * 2):", list(pipe))
pipe = list(range(10)) | F(is_odd) | P(lambda x: x * 2)
print("list(range(10)) | F(is_odd) | P(lambda x: x * 2):", list(pipe))
def gen_fn():
    for i in range(10):
        yield i

pipe = gen_fn() | F(is_odd) | P(lambda x: x * 2)
print("gen_fn() | F(is_odd) | P(lambda x: x * 2):", list(pipe))
Results:
range(10) | P(lambda x: x + 3): [3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
range(10) | P(lambda x: x + 3) | P(lambda x: x * 2): [6, 8, 10, 12, 14, 16, 18, 20, 22, 24]
range(10) | F(is_odd): [1, 3, 5, 7, 9]
range(10) | F(is_odd) | P(lambda x: x * 2): [2, 6, 10, 14, 18]
list(range(10)) | F(is_odd) | P(lambda x: x * 2): [2, 6, 10, 14, 18]
gen_fn() | F(is_odd) | P(lambda x: x * 2): [2, 6, 10, 14, 18]
Moving from this:
result = [x * 2 for x in range(10) if is_odd(x)]
to this:
result = list(range(10) | F(is_odd) | P(lambda x: x * 2))
This may not look like a big deal, but if you try to implement something like
range(10) | F(f1) | P(op1) | P(op2) | P(op3) | F(f2)
and make it evaluate lazily, you will discover rather quickly that
it is neither easy nor intuitive. There are other ways, such as Queue, but they require
a lot of boilerplate code.
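For comparison, here is roughly what the same chain looks like with the built-in map and filter. It is a sketch with placeholder functions: the names f1, op1, op2, op3 and f2 come from the line above, but their bodies are invented for the example. The chain is lazy too, but it has to be read inside-out, which is exactly the reading order the pipe syntax avoids.

```python
from itertools import count, islice


# Placeholder steps, invented for this example
def f1(x):
    return x % 2 == 1


def op1(x):
    return x + 3


def op2(x):
    return x * 2


def op3(x):
    return x - 1


def f2(x):
    return x > 10


# Equivalent of: range(10) | F(f1) | P(op1) | P(op2) | P(op3) | F(f2)
nested = filter(f2, map(op3, map(op2, map(op1, filter(f1, range(10))))))
result = list(nested)
print(result)  # [11, 15, 19, 23]

# Still lazy: works on an infinite source as long as we stop asking
lazy = filter(f2, map(op3, map(op2, map(op1, filter(f1, count())))))
first_three = list(islice(lazy, 3))
print(first_three)  # [11, 15, 19]
```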
The complete code can be found here: pipe.py