Skip to content
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.DS_Store
.idea/
__pycache__/
1 change: 1 addition & 0 deletions python_streams/partials.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
W = TypeVar('W')
X = TypeVar('X')


def add(n: T) -> NumberToNumber:
    """Build a one-argument function that adds ``n`` to whatever it receives."""
    def add_n(x):
        # The argument stays on the left-hand side of ``+``.
        return x + n

    return add_n

Expand Down
138 changes: 125 additions & 13 deletions python_streams/streams.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from inspect import signature
from functools import lru_cache, reduce
from itertools import islice, chain
from functools import reduce
from inspect import signature
from itertools import islice, chain, count
from types import BuiltinFunctionType
from typing import Iterable, Iterator, TypeVar, Callable, Tuple, Optional, Generic, List, Any, Union
from typing import Iterable, Iterator, TypeVar, Callable, Tuple, Optional, Generic, List, Union

from python_streams.partials import compose

T = TypeVar('T')
V = TypeVar('V')
Expand All @@ -19,13 +22,118 @@ def expanded_func(item: ...) -> V:
else func)


def Not(value: bool):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this could be moved to commodities.py (which we can rename sugar.py)

return not value


class Stream(Generic[T], Iterable):
def __init__(self, items: Iterable[T] = ()):
def __init__(self, items: Iterable[T] = (), with_cache=True):
    """Wrap ``items`` in an iterator and reset the cache bookkeeping.

    :param items: source iterable; it is passed to ``iter()`` immediately.
    :param with_cache: stored unchanged on the instance as ``with_cache``.
    """
    # Nothing has been cached or consumed yet.
    self.cache, self.is_consumed = None, False
    self.with_cache = with_cache
    self.items = iter(items)

def __iter__(self) -> 'Iterator[T]':
    """Yield the stream's items.

    Once ``self.cache`` holds a list, that list is replayed: the underlying
    iterator was drained to build it, so reading ``self.items`` again would
    yield nothing. Without a cache the underlying iterator is used directly.
    """
    if self.cache is not None:
        yield from self.cache
        return
    yield from self.items

def __len__(self) -> int:
    """Return how many items ``to_list`` produces for this stream."""
    materialised = self.to_list()
    return len(materialised)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably don't want to do a to_list unless the user wants to cache


# start kotlin functions

def __eq__(self, other):
    """Compare this stream with another iterable, item by item.

    Two operands are equal only when they produce the same items in the same
    order AND have the same length. Both operands are consumed by the
    comparison.

    :param other: any iterable; for anything else ``NotImplemented`` is
        returned so Python can fall back to its default handling.
    :return: ``True`` or ``False`` for iterables, ``NotImplemented`` otherwise.
    """
    from itertools import zip_longest

    try:
        other_items = iter(other)
    except TypeError:
        return NotImplemented

    # A private sentinel marks the point where one side ran out first;
    # plain ``zip`` would silently stop there and report equality.
    missing = object()
    for self_item, other_item in zip_longest(self, other_items, fillvalue=missing):
        if self_item is missing or other_item is missing:
            return False
        if self_item != other_item:
            return False
    return True

def __contains__(self, item_or_iterable: Union[T, Iterable[T]]) -> bool:
    """Support ``x in stream`` for single items and for iterables of items.

    Iterable arguments are delegated to ``contains_all``; everything else is
    delegated to ``contains``.
    """
    if isinstance(item_or_iterable, Iterable):
        return self.contains_all(item_or_iterable)
    return self.contains(item_or_iterable)

def contains(self, item: T) -> bool:
    """Report whether ``item`` is found by a membership test on ``self.items``."""
    found = item in self.items
    return found

def contains_all(self, iterator: Iterable[T]) -> bool:
    """Report whether every item of ``iterator`` also occurs in this stream.

    Both sides are collected into sets, so items must be hashable and
    duplicates are irrelevant.
    """
    wanted = set(iterator)
    available = set(self)
    return wanted <= available

def __getitem__(self, index: int) -> T:
    """Support ``stream[index]`` by delegating to ``get``."""
    item = self.get(index)
    return item

def get(self, index: int) -> T:
    """Return the item ``index`` positions ahead in ``self.items``.

    Raises ``StopIteration`` when the iterator runs out before reaching
    that position.
    """
    # A one-item window starting at ``index``.
    window = islice(self.items, index, index + 1)
    return next(window)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: Benchmark vs for loop


def index_of(self, item: 'T') -> int:
    """Return the position of the first element equal to ``item``, or -1.

    Iteration stops at the first match, so later elements stay unconsumed.

    :param item: value to look for.
    :return: zero-based position of the first match, ``-1`` when none matches.
    """
    # Review notes applied here: ``enumerate`` is the idiomatic form, and the
    # earlier ``zip(self.items, count())`` had its arguments in the wrong
    # order, which compared positions against ``item`` and returned the
    # element instead of its index.
    for index, self_item in enumerate(self.items):
        if self_item == item:
            return index
    return -1

def is_empty(self) -> bool:
    """Report whether the stream has no remaining items, without losing any.

    The check has to pull one item from the underlying iterator; when an item
    is found it is put back in front of ``self.items`` so later operations
    still see it.

    :return: ``True`` when nothing is left, ``False`` otherwise.
    """
    # A private sentinel keeps a legitimate ``None`` item distinguishable
    # from exhaustion.
    exhausted = object()
    first = next(self.items, exhausted)
    if first is exhausted:
        return True
    self.items = chain((first,), self.items)
    return False

def last_index_of(self, item: 'T') -> int:
    """Return the position of the last element equal to ``item``, or -1.

    Every remaining element is visited, so ``self.items`` is fully consumed.

    :param item: value to look for.
    :return: zero-based position of the last match, ``-1`` when none matches.
    """
    last_index = -1
    # Review note applied here: ``enumerate`` replaces ``zip(count(), ...)``.
    for index, self_item in enumerate(self.items):
        if self_item == item:
            last_index = index
    return last_index

def sub_stream(self, from_index_inclusive: int, to_index_exclusive: int) -> 'Stream[T]':
    """Return a new Stream over the items between the two positions.

    The start position is included and the end position is excluded; the
    slice is taken lazily while iterating ``self``.
    """
    window = islice(self, from_index_inclusive, to_index_exclusive)
    return Stream(window)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, could we bind __getslice__ to this so we can do stream[4:7]?


def indices(self) -> 'Stream[int]':
    """Return a Stream of the positions ``0 .. len(self) - 1``."""
    size = len(self)
    return Stream(range(size))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What example use would this function have?


def last_index(self) -> int:
    """Return the position of the final item, i.e. ``len(self) - 1``.

    For a stream of length zero this is ``-1``.
    """
    size = len(self)
    return size - 1

def all(self, condition: Filter) -> bool:
    """Report whether every remaining item satisfies ``condition``.

    Evaluation stops at the first item that fails, so nothing is collected
    into an intermediate list and the rest of ``self.items`` stays
    unconsumed. An empty stream yields ``True``.

    :param condition: predicate, adapted through ``expand`` before use.
    """
    matches = expand(condition)
    # Explicit loop rather than the builtin of the same name as this method.
    for item in self.items:
        if not matches(item):
            return False
    return True
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mola la implementación pero creo que no es muy performant, seguramente un for y salir cuando uno sea False es lo mejor. De todas formas python tiene un all() que igual puede ser rápido. Seguramente all(condition(x) for x in self.items)


def any(self, condition: Filter) -> bool:
    """Report whether at least one remaining item satisfies ``condition``.

    Evaluation stops at the first match, so nothing is collected into an
    intermediate list and the rest of ``self.items`` stays unconsumed.
    An empty stream yields ``False``.

    :param condition: predicate, adapted through ``expand`` before use.
    """
    matches = expand(condition)
    # Explicit loop rather than the builtin of the same name as this method.
    for item in self.items:
        if matches(item):
            return True
    return False
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lo mismo que all


def count(self) -> int:
    """Return the number of items, as reported by ``len(self)``."""
    size = len(self)
    return size

# TODO: Fix non laziness
def distinct(self) -> 'Stream[T]':
unique_items = []
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Podemos usar un set comprehension (yo también me acabo de enterar de que existen XD)

return Stream({x for x in self.items})

for item in self.to_list():
if item not in unique_items:
unique_items.append(item)
return Stream(unique_items)

# TODO: Fix non laziness
def distinct_by(self, selector: Transform) -> 'Stream[T]':
unique_items = []
Copy link
Collaborator

@biellls biellls Jul 10, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lazy implementation (untested)

def distinct_by(self, selector: Transform) -> 'Stream[T]':
    """Lazily keep the first item seen for each distinct selector key.

    Items are pulled from ``self.items`` only as the returned stream is
    iterated. Keys are tracked in a set, so they must be hashable.

    :param selector: key function, adapted through ``expand`` before use.
    :return: a new Stream yielding the original items (not their keys).
    """
    def _generate_distinct_by():
        unique_keys = set()
        key_of = expand(selector)
        for item in self.items:
            key = key_of(item)
            if key not in unique_keys:
                unique_keys.add(key)
                yield item

    return Stream(_generate_distinct_by())

unique_keys = []
for item in self.to_list():
key = expand(selector)(item)
if key not in unique_keys:
unique_keys.append(key)
unique_items.append(item)
return Stream(unique_items)

def drop(self, n: int) -> 'Stream[T]':
    """Skip up to ``n`` items and return a Stream over whatever remains.

    Items that are ``None`` are skipped like any other value; they are not
    mistaken for the end of the stream. When fewer than ``n`` items remain
    the result is simply empty. A non-positive ``n`` skips nothing.

    :param n: number of leading items to discard.
    """
    if n > 0:
        # islice(it, n, n) yields nothing but advances ``it`` by up to n items.
        next(islice(self.items, n, n), None)
    return Stream(self.items)

def drop_last(self) -> 'Stream[T]':
    """Return a Stream with every item of ``to_list()`` except the final one."""
    return Stream(self.to_list()[:-1])

# end kotlin functions

def map(self, func: Transform) -> 'Stream[V]':
    """Return a Stream that applies ``func`` to each item of ``self.items``.

    ``func`` is adapted through ``expand`` first; the mapping itself is lazy.
    """
    transform = expand(func)
    mapped = map(transform, self.items)
    return Stream(mapped)

Expand All @@ -51,13 +159,6 @@ def max(self, key: Optional[Transform] = None) -> T:
def take(self, n: int) -> 'Stream[T]':
    """Return a Stream over at most the first ``n`` items of this stream."""
    head = islice(self, n)
    return Stream(head)

def drop(self, n: int) -> 'Stream[T]':
    """Skip up to ``n`` items and return a Stream over whatever remains.

    Items that are ``None`` are skipped like any other value; they are not
    mistaken for the end of the stream. When fewer than ``n`` items remain
    the result is simply empty. A non-positive ``n`` skips nothing.

    :param n: number of leading items to discard.
    """
    if n > 0:
        # islice(it, n, n) yields nothing but advances ``it`` by up to n items.
        next(islice(self.items, n, n), None)
    return Stream(self.items)

def first(self) -> T:
    """Return the next item of ``self.items``.

    Raises ``StopIteration`` when no item is left.
    """
    head = next(self.items)
    return head

Expand All @@ -73,6 +174,17 @@ def append(self, item: T) -> 'Stream[T]':
def extend(self, item: T) -> 'Stream[T]':
    """Wrap ``item`` in a one-element Stream and hand it to ``self.chain``."""
    singleton = Stream((item,))
    return self.chain(singleton)

@lru_cache(1)
class AlreadyConsumed(Exception):
    """Raised when an uncached stream is materialised a second time."""

def to_list(self) -> List[T]:
return list(self.items)
if self.with_cache:
Copy link
Collaborator

@biellls biellls Jul 10, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make self.items a @property that returns the cached list when with_cache=True and cache is not None, and otherwise returns the generator (which is currently self.items; we can rename it to self._generator)?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The nice thing about this is that if we do that then every method in this class automatically uses the cache if it's available instead of throwing an exception, without changing any code.

if not self.cache:
self.cache = list(self.items)
return self.cache
else:
if self.is_consumed:
raise self.AlreadyConsumed
else:
self.is_consumed = True
return list(self.items)
152 changes: 147 additions & 5 deletions tests/unit/streams_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from itertools import count, cycle
from typing import Sequence, TypeVar

from pytest import raises

from python_streams import Stream

Expand All @@ -10,12 +11,157 @@ def test_iter():
assert i == x


def test___contains___when_item_is_contained():
    """The ``in`` operator finds an element that is present."""
    stream = Stream([1, 2])
    assert 1 in stream


def test___contains___when_item_is_not_contained():
    """The ``in`` operator rejects an element that is absent."""
    stream = Stream([1, 2])
    assert 3 not in stream


def test_contains_when_item_is_contained():
    """``contains`` is truthy for an element that is present."""
    stream = Stream([1, 2])
    assert stream.contains(1)


def test_contains_when_item_is_not_contained():
    """``contains`` is falsy for an element that is absent."""
    stream = Stream([1, 2])
    assert not stream.contains(3)


def test___contains___when_other_stream_is_contained():
    """A Stream whose items all occur in another Stream is ``in`` it."""
    subset = Stream([1, 3])
    superset = Stream([1, 2, 3, 4])
    assert subset in superset


def test___contains___when_other_stream_is_not_contained():
    """A Stream with an item missing from the other Stream is not ``in`` it."""
    candidate = Stream([1, 3])
    container = Stream([1])
    assert candidate not in container


def test___contains___when_other_iterable_is_contained():
    """A plain list whose items all occur in the Stream is ``in`` it."""
    stream = Stream([1, 2, 3, 4])
    assert [1, 3] in stream


def test_contains_all_when_other_stream_is_contained():
    """``contains_all`` is truthy when every item of the other Stream occurs."""
    superset = Stream([1, 2, 3, 4])
    subset = Stream([1, 3])
    assert superset.contains_all(subset)


def test_contains_all_when_other_stream_is_not_contained():
    """``contains_all`` is falsy when an item of the other Stream is missing."""
    container = Stream([1])
    candidate = Stream([1, 3])
    assert not container.contains_all(candidate)


def test_contains_all_when_other_iterable_is_contained():
    """``contains_all`` accepts a plain list as the collection to look for."""
    stream = Stream([1, 2, 3, 4])
    assert stream.contains_all([1, 3])


def test_get():
    """``get`` returns the item at the requested position."""
    stream = Stream([0, 1, 2, 3])
    assert stream.get(2) == 2


def test_get_when_index_too_high():
    """``get`` past the end raises ``StopIteration``."""
    stream = Stream([0, 1, 2, 3])
    with raises(StopIteration):
        stream.get(7)


def test___get_item___():
    """Subscript access returns the item at the requested position."""
    stream = Stream([0, 1, 2, 3])
    assert stream[2] == 2


def test___get_when___index_too_high():
    """Subscript access past the end raises ``StopIteration``.

    Uses ``stream[7]`` so that ``__getitem__`` itself is exercised rather
    than repeating the plain ``get`` test.
    """
    stream = Stream([0, 1, 2, 3])
    with raises(StopIteration):
        stream[7]


def test_index_of_when_item_in_stream():
    """``index_of`` returns the position of the first matching item.

    The items are deliberately not equal to their own positions, so an
    implementation that confuses items with indices cannot pass.
    """
    stream = Stream(['a', 'b', 'c', 'b'])
    assert stream.index_of('b') == 1


def test_is_empty_when_stream_is_empty():
    """A Stream built without items reports itself as empty."""
    stream = Stream()
    assert stream.is_empty()


def test_is_empty_when_stream_is_not_empty():
    """A Stream holding items does not report itself as empty."""
    stream = Stream([1, 2])
    assert not stream.is_empty()


def test_index_of_when_item_not_in_stream():
    """``index_of`` returns -1 when no item matches.

    The searched value (1) is a valid position but not an item, so an
    implementation that compares against positions cannot pass.
    """
    stream = Stream([5, 6, 7])
    assert stream.index_of(1) == -1


def test_last_index_of_when_two_items_in_stream():
    """``last_index_of`` reports the later of two matching positions."""
    stream = Stream([0, 1, 2, 1])
    assert stream.last_index_of(1) == 3


def test_last_index_of_when_item_not_in_stream():
    """``last_index_of`` returns -1 when no item matches."""
    stream = Stream([0, 1, 2])
    assert stream.last_index_of(4) == -1


def test_sub_stream():
    """``sub_stream`` keeps the start position and excludes the end position.

    Compares materialised lists so that missing or extra items fail the test.
    """
    stream = Stream([0, 1, 2, 3, 4])
    assert stream.sub_stream(1, 3).to_list() == [1, 2]


def test_indices():
    """``indices`` yields one position per item, starting at zero.

    Compares materialised lists so that missing or extra items fail the test.
    """
    stream = Stream(['a', 'b', 'c', 'd'])
    assert stream.indices().to_list() == [0, 1, 2, 3]


def test_last_index():
    """``last_index`` of a four-item Stream is 3."""
    stream = Stream(['a', 'b', 'c', 'd'])
    assert stream.last_index() == 3


def test_last_index_when_stream_is_empty():
    """``last_index`` of an empty Stream is -1."""
    stream = Stream()
    assert stream.last_index() == -1


def test_all_when_all_items_match():
    """``all`` is truthy when every item satisfies the predicate."""
    evens = Stream([2, 4, 6, 8])
    assert evens.all(lambda x: x % 2 == 0)


def test_all_when_not_all_items_match():
    """``all`` is falsy as soon as one item fails the predicate."""
    stream = Stream([2, 4, 7])
    assert not stream.all(lambda x: x % 2 == 0)


def test_any_when_an_item_match():
    """``any`` is truthy when at least one item satisfies the predicate."""
    stream = Stream([2, 7])
    assert stream.any(lambda x: x % 2 == 0)


def test_any_when_no_items_match():
    """``any`` is falsy when no item satisfies the predicate."""
    odds = Stream([1, 7])
    assert not odds.any(lambda x: x % 2 == 0)


def test_distinct():
    """``distinct`` keeps the first occurrence of each value, in order.

    Compares materialised lists so that missing or extra items fail the test.
    """
    stream = Stream([1, 1, 2, 2, 2, 3])
    assert stream.distinct().to_list() == [1, 2, 3]


def test_distinct_when_stream_is_empty():
    """``distinct`` of an empty Stream produces no items.

    Compares against an empty list so that any unexpected item fails the test.
    """
    assert Stream().distinct().to_list() == []


def test_distinct_by():
    """``distinct_by`` keeps the first item seen for each selector key.

    Compares materialised lists so that missing or extra items fail the test.
    """
    pairs = Stream([('a', 1), ('b', 2), ('b', 4)])
    assert pairs.distinct_by(lambda k, v: k).to_list() == [('a', 1), ('b', 2)]


def test_distinct_by_when_stream_is_empty():
    """``distinct_by`` of an empty Stream produces no items.

    Compares against an empty list so that any unexpected item fails the test.
    """
    assert Stream().distinct_by(lambda k, v: k).to_list() == []


def test_drop():
    """After dropping three items, the next item is the fourth one."""
    remaining = Stream([0, 1, 2, 3, 4, 5]).drop(3)
    assert remaining.first() == 3


def test_drop_last():
    """``drop_last`` removes exactly the final item.

    Compares materialised lists so that missing or extra items fail the test.
    """
    stream = Stream([0, 1, 2, 3])
    assert stream.drop_last().to_list() == [0, 1, 2]


def test_to_list():
    """With the default cache, ``to_list`` gives the same list on every call."""
    stream = Stream(['a', 'b', 'c'])
    expected = ['a', 'b', 'c']
    for _ in range(2):
        assert stream.to_list() == expected


def test_to_list_when_without_cache():
    """Without a cache, a second ``to_list`` raises ``AlreadyConsumed``."""
    stream = Stream(['a', 'b', 'c'], with_cache=False)
    first_call = stream.to_list()
    assert first_call == ['a', 'b', 'c']
    with raises(Stream.AlreadyConsumed):
        stream.to_list()


def test_map():
    """``map`` applies the function to every item, preserving order."""
    doubled = Stream([1, 5, 3]).map(lambda x: x * 2)
    assert doubled.to_list() == [2, 10, 6]

Expand All @@ -28,10 +174,6 @@ def test_take():
assert Stream(count(1, 2)).take(3).to_list() == [1, 3, 5]


def test_drop():
    """Dropping three items from an endless odd-number source leaves 7 next."""
    remaining = Stream(count(1, 2)).drop(3)
    assert remaining.first() == 7


def test_filter():
    """``filter`` works lazily on an endless source when followed by ``take``."""
    multiples_of_seven = Stream(count(1)).filter(lambda x: x % 7 == 0)
    first_four = multiples_of_seven.take(4)
    assert first_four.to_list() == [7, 14, 21, 28]

Expand Down