Python Pipes

I’ve always wanted to have a way to build data processing pipelines in Python using pipes, like this range(10) | F(is_odd) | P(lambda x: x * 2), instead of functions and generators and maps and loops. So I’ve tried …

The idea is pretty simple: let’s create a class with implemented OR and ROR operators, the pipes.

    def __or__(self, other):
        other.source = self
        return other

    def __ror__(self, other):
        self.source = (
            iter(other)
            if not isinstance(other, (str, bytes)) and hasattr(other, "__iter__")
            else other
        )
        return self

The tricky part was implementation of __next__ since I wanted it to be a lazy operation. After a few trials and errors I’ve ended up with a pretty simple approach where the wrapping class implementing the pipe will call next to its source, added by OR or ROR, apply a transformation and then return the result of the transformation.

    def __next__(self):
        if self.source is None:
            raise StopIteration
        value = next(self.source)
        result = self.operator(value)
        return result

It did the trick for standard transformations but not for the filters like is_odd since those skips some data rather than return it like a transformation. To filters easily addable I’ve implemented a wrapper that works similar to the pipe class but applies a filter to the data going through it.

    def __next__(self):
        while True:
            if self.source is None:
                raise StopIteration
            value = next(self.source)
            if self.predicate(value):
                return value

The resulting code allowed me to do things like this:

# Example usage with filtering
pipe = range(10) | P(lambda x: x + 3)
print("range(10) | P(lambda x: x + 3):", list(pipe))

pipe = range(10) | P(lambda x: x + 3) | P(lambda x: x * 2)
print("range(10) | P(lambda x: x + 3) | P(lambda x: x * 2):", list(pipe))

pipe = range(10) | F(is_odd)
print("range(10) | F(is_odd):", list(pipe))

pipe = range(10) | F(is_odd) | P(lambda x: x * 2)
print("range(10) | F(is_odd) | P(lambda x: x * 2):", list(pipe))

pipe = list(range(10)) | F(is_odd) | P(lambda x: x * 2)
print("list(range(10)) | F(is_odd) | P(lambda x: x * 2):", list(pipe))

def gen_fn():
    for i in range(10):
        yield i

pipe = gen_fn() | F(is_odd) | P(lambda x: x * 2)
print("gen_fn() | F(is_odd) | P(lambda x: x * 2):", list(pipe))

Results:

range(10) | P(lambda x: x + 3): [3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
range(10) | P(lambda x: x + 3) | P(lambda x: x * 2): [6, 8, 10, 12, 14, 16, 18, 20, 22, 24]
range(10) | F(is_odd): [1, 3, 5, 7, 9]
range(10) | F(is_odd) | P(lambda x: x * 2): [2, 6, 10, 14, 18]
list(range(10)) | F(is_odd) | P(lambda x: x * 2): [2, 6, 10, 14, 18]
gen_fn() | F(is_odd) | P(lambda x: x * 2): [2, 6, 10, 14, 18]

Moving from this:

result = [x * 2 for x in range(10) if is_odd(x)]

to this:

result = list(range(10)) | F(is_odd) | P(lambda x: x * 2))

This may not be seen as a big deal but if you try to implement something like

range(10) | F(f1) | P(op1) | P(op2) | P(op3) | F(f2)

and make it work in a lazy evaluation fashion, you will discovered rather quickly that it is not that easy at all or intuitive. There are ways like Queue but they require a lot of boilerplate code.

The complete code can be found here: pipe.py