Stream: Lazy-evaluating sequential collection type

class carriage.Stream(iterable, *, pipeline=None)

An iterable wrapper for building a lazy-evaluating sequence transformation pipeline.

A Stream is created from any iterable object, such as a list, tuple, or iterator, including an infinite one.

>>> strm = Stream(range(10))
>>> strm = Stream([1, 2, 3])

Some classmethods are provided for creating common Stream instances.

>>> strm = Stream.range(0, 10, 2)
>>> strm = Stream.count(0, 5)

A Stream instance is immutable. Calling a transformation function creates a new Stream instance every time. Because evaluation is lazy, no data is duplicated when a new instance is created.

>>> strm1 = Stream.range(5, 10)
>>> strm2 = strm1.map(lambda n: n * 2)
>>> strm3 = strm1.map(lambda n: n * 3)
>>> strm1 is strm2 or strm1 is strm3 or strm2 is strm3
False

To evaluate a Stream instance, call an action function.

>>> strm = Stream.range(5, 10).map(lambda n: n * 2).take(3)
>>> strm.sum()
36
>>> strm.to_list()
[10, 12, 14]
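
As a rough illustration of the lazy evaluation described above, the sketch below builds the same pipeline with standard-library iterators only. It does not use carriage and is not how Stream is implemented. The double function and the calls list are hypothetical names added to record which elements are actually processed.

```python
from itertools import count, islice

calls = []  # hypothetical log of the elements that were actually processed


def double(n):
    calls.append(n)
    return n * 2


# Building the pipeline does no work: map and islice are both lazy.
pipeline = islice(map(double, count(5)), 3)
calls_before = list(calls)

# Consuming the pipeline pulls only the three elements that are needed,
# even though count(5) is infinite.
result = list(pipeline)
```

After this runs, calls_before is empty, result is [10, 12, 14], and calls is [5, 6, 7].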
accumulate(func=None)

Create a new Stream by applying itertools.accumulate to the elements.
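
For reference, this is how itertools.accumulate itself behaves in the standard library. The sketch assumes the Stream method passes func straight through; that is an assumption, not something stated in this reference.

```python
import operator
from itertools import accumulate

numbers = [1, 2, 3, 4]

# With no function, accumulate yields running sums.
running_sum = list(accumulate(numbers))

# With a two-argument function, it yields running results of that function.
running_product = list(accumulate(numbers, operator.mul))
```

Here running_sum is [1, 3, 6, 10] and running_product is [1, 2, 6, 24].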

appended(elem)

Create a new Stream that extends source Stream with another element.

cache()

Cache result

chunk(n, strict=False)

Divide elements into chunks of n elements. With strict=True, a trailing chunk shorter than n is dropped.

>>> s = Stream.range(5)
>>> s.chunk(2).to_list()
[Row(f0=0, f1=1), Row(f0=2, f1=3), Row(f0=4)]
>>> s.chunk(2, strict=True).to_list()
[Row(f0=0, f1=1), Row(f0=2, f1=3)]
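
The chunking behavior shown above can be approximated with the standard library. This is a minimal sketch under assumptions: the free function chunk is hypothetical, it yields plain tuples rather than Row objects, and it is not carriage's implementation.

```python
from itertools import islice


def chunk(iterable, n, strict=False):
    """Yield tuples of up to n consecutive elements (hypothetical helper)."""
    it = iter(iterable)
    while True:
        piece = tuple(islice(it, n))
        if not piece:
            return
        if strict and len(piece) < n:
            # In strict mode an incomplete trailing chunk is discarded.
            return
        yield piece


loose = list(chunk(range(5), 2))
strict = list(chunk(range(5), 2, strict=True))
```

Here loose is [(0, 1), (2, 3), (4,)] and strict is [(0, 1), (2, 3)].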
classmethod count(start, step=1)

Create an infinite Stream that counts up from start in increments of step.

>>> Stream.count(0, 3).take(3).to_list()
[0, 3, 6]
classmethod cycle(iterable)

Create a Stream that cycles through an iterable.

>>> Stream.cycle([1,2]).take(5).to_list()
[1, 2, 1, 2, 1]
dict_as_row(fields=None)

Create a new Stream with elements as Row objects

>>> stm = Stream([{'name': 'John', 'age': 35},
...               {'name': 'Frank', 'age': 28}])
>>> stm.dict_as_row().to_list()
[Row(name='John', age=35), Row(name='Frank', age=28)]
>>> stm.dict_as_row(['age', 'name']).to_list()
[Row(age=35, name='John'), Row(age=28, name='Frank')]
distincted(key_func=None)

Create a new Stream with non-repeating elements. Elements keep the order of their first occurrence in the source Stream.

>>> Stream.range(10).distincted(lambda n: n//3).to_list()
[0, 3, 6, 9]
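
One common way to get order-preserving de-duplication is to remember the keys already seen. The sketch below is a hypothetical stand-alone helper, not carriage's code, and it assumes the keys are hashable.

```python
def distinct(iterable, key_func=None):
    """Yield elements whose key has not been seen before (hypothetical helper)."""
    seen = set()
    for elem in iterable:
        key = elem if key_func is None else key_func(elem)
        if key not in seen:
            seen.add(key)
            yield elem


by_third = list(distinct(range(10), lambda n: n // 3))
plain = list(distinct([3, 1, 3, 2, 1]))
```

Here by_third is [0, 3, 6, 9] and plain is [3, 1, 2].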
drop(n)

Create a new Stream with the first n elements dropped.

>>> Stream(dict(a=3, b=4, c=5).items()).drop(2).to_list()
[('c', 5)]
drop_while(pred)

Create a new Stream that drops leading elements as long as the predicate evaluates to true.

dropwhile(pred)

Create a new Stream that drops leading elements as long as the predicate evaluates to true.

extended(iterable)

Create a new Stream that extends source Stream with another iterable

filter(pred)

Create a new Stream containing only the elements that pass the predicate.

>>> Stream.range(10).filter(lambda n: n % 2 == 0).to_list()
[0, 2, 4, 6, 8]
filter_false(pred)

Create a new Stream containing only the elements that do not pass the predicate.

>>> Stream.range(10).filter_false(lambda n: n % 2 == 0).to_list()
[1, 3, 5, 7, 9]
find(pred)

Get the first element satisfying the predicate.

>>> Stream.range(5, 100).find(lambda n: n % 7 == 0)
7
Returns:
Return type:element
find_opt(pred)

Optionally get the first element satisfying the predicate. Return Some(element) if one exists; otherwise return Nothing.

>>> Stream.range(5, 100).find_opt(lambda n: n * 3 + 5 == 40)
Nothing
>>> Stream.range(5, 100).find_opt(lambda n: n % 7 == 0)
Some(7)
Returns:
Return type:Optional[element]
first()

Get first element

>>> Stream(dict(a=3, b=4, c=5).items()).first()
('a', 3)
Returns:
Return type:element
first_opt()

Get the first element as Some(element), or Nothing if it does not exist.

Returns:
Return type:Optional[element]
flat_map(to_iterable_func)

Apply function to each element, then flatten the result.

>>> Stream([1, 2, 3]).flat_map(range).to_list()
[0, 0, 1, 0, 1, 2]
Returns:
Return type:Stream
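
The map-then-flatten idea has a direct standard-library counterpart. The sketch below uses a hypothetical free function named flat_map and returns a plain iterator, not a Stream.

```python
from itertools import chain


def flat_map(func, iterable):
    """Apply func to each element, then chain the resulting iterables."""
    return chain.from_iterable(map(func, iterable))


flattened = list(flat_map(range, [1, 2, 3]))
```

Here flattened is [0, 0, 1, 0, 1, 2], the same values as in the example above.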
flatten()

Create a new Stream by flattening each element.

>>> Stream([(1, 2), (3, 4)]).flatten().to_list()
[1, 2, 3, 4]
Returns:
Return type:Stream
fold_left(func, initial)

Apply a function of two arguments cumulatively to the elements in Stream from left to right.
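
Left-to-right folding with an initial value matches what functools.reduce does in the standard library. The sketch assumes func receives the accumulated value first and the next element second, as functools.reduce does; the argument order used by carriage is not stated here.

```python
import operator
from functools import reduce

# ((10 - 1) - 2) - 3
difference = reduce(operator.sub, [1, 2, 3], 10)

# Building a string makes the left-to-right grouping visible.
trace = reduce(lambda acc, x: f'({acc}+{x})', [1, 2, 3], '0')
```

Here difference is 4 and trace is '(((0+1)+2)+3)'.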

for_each(func)

Call function for each element

>>> s = Stream.range(3)
>>> s.for_each(print)
0
1
2
get(index, default=None)

Get the item at the given index. Return the default value if the index does not exist.

>>> s = Stream.range(5, 12)
>>> s.get(3)
8
>>> s.get(10) is None
True
>>> s.get(10, 0)
0
Returns:
Return type:element
get_opt(index)

Optionally get the item at the given index. Return Some(value) if it exists; otherwise return Nothing.

>>> s = Stream.range(5, 12)
>>> s.get_opt(3)
Some(8)
>>> s.get_opt(10)
Nothing
>>> s.get_opt(10).get_or(0)
0
>>> s.get_opt(3).map(lambda n: n * 2).get_or(0)
16
>>> s.get_opt(10).map(lambda n: n * 2).get_or(0)
0
Returns:
Return type:Optional[element]
group_by_as_map(key_func=None)

Group values into a Map keyed by the result of the key function.

Compared with group_by_as_stream, there are some pros and cons.

Pros:

  • Elements don’t need to be sorted by the key function first. You can call group_by_as_map at any time and get a correct grouping result.

Cons:

  • Key function has to be evaluated to a hashable value.
  • Not lazy-evaluating. Consumes more memory while grouping, and no group is available until all elements have been consumed.

>>> Stream.range(10).group_by_as_map(key_func=lambda n: n % 3)
Map({0: Array([0, 3, 6, 9]), 1: Array([1, 4, 7]), 2: Array([2, 5, 8])})
group_by_as_stream(key=None)

Create a new Stream using itertools.groupby from the standard library, which sequentially groups elements as long as the key function evaluates to the same value.

Compared with group_by_as_map, there are some pros and cons.

Cons:

  • Elements should be sorted by the key function first, or elements with the same key may be broken into different groups.

Pros:

  • The key function doesn’t have to return a hashable value. It can return any type that supports __eq__.
  • Lazy-evaluating. Consumes less memory while grouping. Yields a group as soon as possible.
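
The trade-off between the two grouping styles can be seen with the standard library alone. This sketch does not call carriage. It contrasts itertools.groupby (sequential grouping) with a dict of lists (hash-based grouping); parity is a hypothetical key function.

```python
from collections import defaultdict
from itertools import groupby


def parity(n):
    return n % 2


data = [0, 1, 2, 3, 4, 5]

# Sequential grouping on unsorted input splits equal keys into separate runs.
unsorted_runs = [(k, list(g)) for k, g in groupby(data, parity)]

# Sorting by the key first gives one group per key.
sorted_runs = [(k, list(g)) for k, g in groupby(sorted(data, key=parity), parity)]

# Hash-based grouping needs no sorting but holds every element in memory.
mapped = defaultdict(list)
for n in data:
    mapped[parity(n)].append(n)
```

Here unsorted_runs contains six single-element groups, while sorted_runs is [(0, [0, 2, 4]), (1, [1, 3, 5])] and mapped holds the same two groups.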
interpose(sep)

Create a new Stream by interposing a separator between elements.

>>> Stream.range(5, 10).interpose(0).to_list()
[5, 0, 6, 0, 7, 0, 8, 0, 9]
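
A generator is one simple way to express interposing. The helper below is a hypothetical stand-alone sketch, not the library's implementation.

```python
def interpose(iterable, sep):
    """Yield elements with sep between each adjacent pair (hypothetical helper)."""
    it = iter(iterable)
    try:
        yield next(it)
    except StopIteration:
        return  # empty input produces no output and no separator
    for elem in it:
        yield sep
        yield elem


separated = list(interpose(range(5, 10), 0))
```

Here separated is [5, 0, 6, 0, 7, 0, 8, 0, 9]. A single-element input yields that element with no separator.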
classmethod iterate(func, x)

Create a Stream by repeatedly applying a function to its previous return value, starting from x.

>>> def multiply2(x): return x * 2
>>> Stream.iterate(multiply2, 3).take(4).to_list()
[3, 6, 12, 24]
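
The sequence x, func(x), func(func(x)), and so on can be written as an infinite generator. This is a sketch with a hypothetical free function named iterate; islice plays the role of take.

```python
from itertools import islice


def iterate(func, x):
    """Yield x, func(x), func(func(x)), ... without end."""
    while True:
        yield x
        x = func(x)


doubled = list(islice(iterate(lambda v: v * 2, 3), 4))
```

Here doubled is [3, 6, 12, 24].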
last()

Get last element

Returns:
Return type:element
last_opt()

Get the last element as Some(element), or Nothing if it does not exist.

Returns:
Return type:Optional[element]
len()

Get the length of the Stream

Returns:
Return type:int
make_string(elem_format='{elem!r}', start='[', elem_sep=', ', end=']')

Make string from elements

>>> Stream.range(5, 8).make_string()
'[5, 6, 7]'
>>> print(Stream.range(5, 8).make_string(elem_sep='\n', start='', end='', elem_format='{index}: {elem}'))
0: 5
1: 6
2: 7
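
The formatting shown above is consistent with calling str.format on each element with index and elem fields. The sketch below is a guess at that behavior using a hypothetical free function, not the library's code.

```python
def make_string(elems, elem_format='{elem!r}', start='[', elem_sep=', ', end=']'):
    """Join formatted elements between start and end (hypothetical helper)."""
    body = elem_sep.join(
        elem_format.format(index=i, elem=e) for i, e in enumerate(elems)
    )
    return start + body + end


default_text = make_string(range(5, 8))
indexed_text = make_string(
    range(5, 8), elem_sep='\n', start='', end='', elem_format='{index}: {elem}'
)
```

Here default_text is '[5, 6, 7]' and indexed_text is the three lines '0: 5', '1: 6', '2: 7' joined by newlines.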
map(func)

Create a new Stream by applying function to each element

>>> Stream.range(5, 8).map(lambda x: x * 2).to_list()
[10, 12, 14]
Returns:
Return type:Stream
mean()

Get the average of elements.

>>> Stream.range(10).mean()
4.5
nlargest(n, key=None)

Get the n largest elements.

>>> Stream([1, 5, 2, 3, 6]).nlargest(2).to_list()
[6, 5]
nsmallest(n, key=None)

Get the n smallest elements.

>>> Stream([1, 5, 2, 3, 6]).nsmallest(2).to_list()
[1, 2]
pluck(key)

Create a new Stream of values by evaluating elem[key] for each element.

>>> s = Stream([dict(x=3, y=4), dict(x=4, y=5), dict(x=8, y=9)])
>>> s.pluck('x').to_list()
[3, 4, 8]
Returns:
Return type:Stream[element[key]]
pluck_attr(attr)

Create a new Stream of values by evaluating elem.attr for each element.

>>> from carriage import Row
>>> s = Stream([Row(x=3, y=4), Row(x=4, y=5), Row(x=8, y=9)])
>>> s.pluck_attr('x').to_list()
[3, 4, 8]
Returns:
Return type:Stream[type of element.attr]
pluck_opt(key)

Create a new Stream of Optional values by evaluating elem[key] for each element. Get Some(value) if the key exists for that element, otherwise get Nothing singleton.

>>> s = Stream([dict(x=3, y=4), dict(y=5), dict(x=8, y=9)])
>>> s.pluck_opt('x').to_list()
[Some(3), Nothing, Some(8)]
>>> s.pluck_opt('x').map(lambda n_opt: n_opt.get_or(1)).to_list()
[3, 1, 8]
Returns:
Return type:Stream[Optional(type of element[key])]
classmethod range(start, end=None, step=1)

Create a Stream from range.

>>> Stream.range(2, 10, 2).to_list()
[2, 4, 6, 8]
>>> Stream.range(3).to_list()
[0, 1, 2]
classmethod read_txt(path)

Create a Stream from a text file. Lines are treated as elements, with the newline character removed.

>>> Stream.read_txt(path)
Parameters:
  • path (str or path or file object) – path to the input file
reduce(func)

Apply a function of two arguments cumulatively to the elements in Stream from left to right.

classmethod repeat(elems, times=None)

Create a Stream repeating elems

>>> Stream.repeat(1, 3).to_list()
[1, 1, 1]
>>> Stream.repeat([1, 2, 3], 2).to_list()
[[1, 2, 3], [1, 2, 3]]
classmethod repeatedly(func, times=None)

Create a Stream by repeatedly calling a zero-parameter function.

>>> def counter():
...     counter.num += 1
...     return counter.num
>>> counter.num = -1
>>> Stream.repeatedly(counter, 5).to_list()
[0, 1, 2, 3, 4]
reversed()

Create a new reversed Stream.

>>> Stream(['a', 'b', 'c']).reversed().to_list()
['c', 'b', 'a']
second()

Get second element

>>> Stream(dict(a=3, b=4, c=5).items()).second()
('b', 4)
Returns:
Return type:element
second_opt()

Get the second element as Some(element), or Nothing if it does not exist.

Returns:
Return type:Optional[element]
show_pipeline(n=2)

Show pipeline and some examples for debugging

>>> def mul_2(x):
...     return x*2
>>> (Stream
...  .range(10)
...  .map(mul_2)
...  .nlargest(3)
...  .show_pipeline(2))  
range(0, 10)
    [0] 0
    [1] 1
 -> map(<function mul_2 at 0x10a1dbd08>)
    [0] 0
    [1] 2
 -> nlargest(3)
    [0] 2
    [1] 0
slice(start, stop, step=None)

Create a Stream from the slice of items.

>>> Stream(list(range(10))).slice(5, 8).to_list()
[5, 6, 7]
Returns:
Return type:Stream[element]
sliding_window(n, step=1)

Create a new Stream whose elements are sliding windows of n consecutive source elements.

>>> (Stream('they have the same meaning'.split())
...  .sliding_window(3)
...  .to_list())
[('they', 'have', 'the'), ('have', 'the', 'same'), ('the', 'same', 'meaning')]
>>> (Stream('they have the same meaning'.split())
...  .sliding_window(3, step=2)
...  .to_list())
[('they', 'have', 'the'), ('the', 'same', 'meaning')]
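
A bounded deque gives a compact way to produce sliding windows. The helper below is a hypothetical stand-alone sketch that reproduces the two examples above; it is not carriage's implementation.

```python
from collections import deque


def sliding_window(iterable, n, step=1):
    """Yield tuples of n consecutive elements, advancing by step each time."""
    window = deque(maxlen=n)  # the oldest element falls out automatically
    for i, elem in enumerate(iterable):
        window.append(elem)
        # The window ending at position i starts at i - (n - 1).
        if len(window) == n and (i - (n - 1)) % step == 0:
            yield tuple(window)


words = 'they have the same meaning'.split()
every = list(sliding_window(words, 3))
every_other = list(sliding_window(words, 3, step=2))
```

Here every has three windows and every_other has two, matching the outputs shown above.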
sorted(key=None, reverse=False)

Create a new sorted Stream.

split_after(pred)

Create a new Stream of Arrays by splitting after each element passing predicate.

>>> Stream.range(10).split_after(lambda n: n % 3 == 2).to_list()
[Array([0, 1, 2]), Array([3, 4, 5]), Array([6, 7, 8]), Array([9])]
split_before(pred)

Create a new Stream of Arrays by splitting before each element passing predicate.

>>> Stream.range(10).split_before(lambda n: n % 3 == 2).to_list()
[Array([0, 1]), Array([2, 3, 4]), Array([5, 6, 7]), Array([8, 9])]
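
The difference between splitting after and splitting before a matching element comes down to when the buffer is flushed. The two hypothetical helpers below yield plain lists rather than Array objects. Never emitting an empty group is an assumption, since this reference does not state the library's behavior in that case.

```python
def split_after(iterable, pred):
    buf = []
    for elem in iterable:
        buf.append(elem)
        if pred(elem):  # the matching element closes the current group
            yield buf
            buf = []
    if buf:
        yield buf


def split_before(iterable, pred):
    buf = []
    for elem in iterable:
        if pred(elem) and buf:  # the matching element opens a new group
            yield buf
            buf = []
        buf.append(elem)
    if buf:
        yield buf


def is_two_mod_three(n):
    return n % 3 == 2


after = list(split_after(range(10), is_two_mod_three))
before = list(split_before(range(10), is_two_mod_three))
```

Here after is [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]] and before is [[0, 1], [2, 3, 4], [5, 6, 7], [8, 9]].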
star_for_each(func)

Call the function for each element, unpacking the element as an argument tuple.

>>> s = Stream(['a', 'b', 'c']).zip_index(1)
>>> s.star_for_each(lambda c, i: print(f'{i}:{c}'))
1:a
2:b
3:c
starmap(func)

Create a new Stream by calling the function with each element unpacked as an argument tuple, i.e. func(*elem). This is convenient when every element in the Stream is iterable and you want each item of an element passed as a separate argument.

>>> Stream([(1, 2), (3, 4)]).starmap(lambda a, b: a+b).to_list()
[3, 7]
>>> Stream([(1, 2), (3, 4)]).map(lambda a_b: a_b[0]+a_b[1]).to_list()
[3, 7]
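
The standard library offers the same argument unpacking through itertools.starmap. The sketch below uses it directly and does not involve carriage.

```python
from itertools import starmap

pairs = [(1, 2), (3, 4)]

# Each tuple is unpacked into the function's positional arguments.
sums = list(starmap(lambda a, b: a + b, pairs))

# Built-in functions that take several arguments work the same way.
powers = list(starmap(pow, [(2, 3), (3, 2)]))
```

Here sums is [3, 7] and powers is [8, 9].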
sum()

Get sum of elements

tail()

Create a new Stream with first element dropped

>>> Stream(dict(a=3, b=4, c=5).items()).tail().to_list()
[('b', 4), ('c', 5)]
take(n)

Create a new Stream containing only the first n elements.

>>> Stream(dict(a=3, b=4, c=5).items()).take(2).to_list()
[('a', 3), ('b', 4)]
take_while(pred)

Create a new Stream with successive elements as long as predicate evaluates to true.

>>> Stream.range(10).take_while(lambda n: n % 5 < 3).to_list()
[0, 1, 2]
takewhile(pred)

Create a new Stream with successive elements as long as predicate evaluates to true.

>>> Stream.range(10).takewhile(lambda n: n % 5 < 3).to_list()
[0, 1, 2]
tap(tag='', n=5, msg_format='{tag}:{index}: {elem}')

A debugging tool. This method creates a new Stream with the same elements. While the Stream is being evaluated, it prints the first n elements.

>>> (Stream.range(3).tap('orig')
...  .map(lambda x: x * 2).tap_with(lambda i, e: f'{i} -> {e}')
...  .accumulate(lambda a, b: a + b).tap('acc')
...  .tap(msg_format='end\n')
...  .to_list())
orig:0: 0
0 -> 0
acc:0: 0
end

orig:1: 1
1 -> 2
acc:1: 2
end

orig:2: 2
2 -> 4
acc:2: 6
end

[0, 2, 6]
tap_with(func, n=5)

A debugging tool. This method creates a new Stream with the same elements. While the Stream is being evaluated, it calls the function with the index and element, then prints the return value for the first n elements.

>>> (Stream.range(3).tap('orig')
...  .map(lambda x: x * 2).tap_with(lambda i, e: f'x2:{i}: {e}')
...  .accumulate(lambda a, b: a + b).tap('acc')
...  .to_list())
orig:0: 0
x2:0: 0
acc:0: 0
orig:1: 1
x2:1: 2
acc:1: 2
orig:2: 2
x2:2: 4
acc:2: 6
[0, 2, 6]
Parameters:
  • func (func(index, elem) -> Any) – Function for building the printing object.
  • n (int) – The first n elements are printed.
tee(n=2)

Copy the Stream into n Streams with the same elements.

>>> itr = iter(range(3, 6))
>>> s1 = Stream(itr).map(lambda x: x * 2)
>>> s2, s3 = s1.tee(2)
>>> s2.map(lambda x: x * 2).to_list()
[12, 16, 20]
>>> s3.map(lambda x: x * 3).to_list()
[18, 24, 30]
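
Copying matters most when the source is a one-shot iterator, as in the example above. The sketch below shows the underlying issue with itertools.tee from the standard library. Whether Stream.tee is built on it is an assumption.

```python
from itertools import tee

# A plain iterator can be consumed only once.
one_shot = iter(range(3, 6))
first_pass = list(one_shot)
second_pass = list(one_shot)  # already exhausted

# tee buffers the items so that each copy sees all of them.
left, right = tee(iter(range(3, 6)), 2)
left_items = list(left)
right_items = list(right)
```

Here second_pass is empty, while left_items and right_items are both [3, 4, 5].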
to_array()

Convert to an Array

>>> Stream.range(5, 8, 2).zip_index().to_array()
Array([Row(value=5, index=0), Row(value=7, index=1)])
Returns:
Return type:Array
to_dict()

Convert to a dict

>>> Stream.range(5, 10, 2).zip_index().to_dict()
{5: 0, 7: 1, 9: 2}
Returns:
Return type:dict
to_list()

Convert to a list.

>>> Stream.range(5, 10, 2).to_list()
[5, 7, 9]
Returns:
Return type:list
to_map()

Convert to a Map

>>> Stream.range(5, 10, 2).zip_index().to_map()
Map({5: 0, 7: 1, 9: 2})
Returns:
Return type:Map
to_series()

Convert to a pandas Series

>>> Stream.range(5, 10, 2).to_series()
0    5
1    7
2    9
dtype: int64
Returns:
Return type:pandas.Series
to_set()

Convert to a set

>>> Stream.cycle([1, 2, 3]).take(5).to_set()
{1, 2, 3}
Returns:
Return type:set
to_streamtable()

Convert to StreamTable

All elements should be in Row type

Returns:
Return type:StreamTable
tuple_as_row(fields)

Create a new Stream with elements as Row objects

>>> Stream([(1, 2), (3, 4)]).tuple_as_row(['x', 'y']).to_list()
[Row(x=1, y=2), Row(x=3, y=4)]
unique(key_func=None)

Create a new Stream of unique elements

>>> Stream.range(10).unique(lambda x: x // 3).to_list()
[0, 3, 6, 9]
value_counts()

Get a Counter instance of element counts

Returns:
Return type:Map[E, int]
without(*elems)

Create a new Stream without specified elements.

>>> Stream.range(10).without(3, 6, 9).to_list()
[0, 1, 2, 4, 5, 7, 8]
Returns:
Return type:Stream[element]
write_txt(path, sep='\n')

Write into a text file.

str() is applied to every element before it is written to the file.

>>> Stream.range(10).write_txt('nums.txt')
Parameters:
  • path (str or path or file object) – path to the output file
  • sep (str) – element separator. Defaults to '\n'.
zip(*iterables)

Create a new Stream by zipping elements with other iterables.

>>> Stream.range(5, 8).zip([1,2,3]).to_list()
[Row(f0=5, f1=1), Row(f0=6, f1=2), Row(f0=7, f1=3)]
>>> Stream.range(5, 8).zip([1,2,3], [9, 10, 11]).to_list()
[Row(f0=5, f1=1, f2=9), Row(f0=6, f1=2, f2=10), Row(f0=7, f1=3, f2=11)]
>>> Stream.range(5, 8).zip([1,2]).to_list()
[Row(f0=5, f1=1), Row(f0=6, f1=2)]
>>> import itertools as itt
>>> Stream.range(5, 8).zip(itt.count(10)).to_list()
[Row(f0=5, f1=10), Row(f0=6, f1=11), Row(f0=7, f1=12)]
zip_index(start=0)

Create a new Stream by zipping elements with index.

>>> Stream(['a', 'b', 'c']).zip_index().to_list()
[Row(value='a', index=0), Row(value='b', index=1), Row(value='c', index=2)]
>>> Stream(['a', 'b', 'c']).zip_index(1).to_list()
[Row(value='a', index=1), Row(value='b', index=2), Row(value='c', index=3)]
zip_longest(*iterables, fillvalue=None)

Create a new Stream by zipping elements with other iterables as long as possible.

>>> Stream.range(5, 8).zip_longest([1,2]).to_list()
[Row(f0=5, f1=1), Row(f0=6, f1=2), Row(f0=7, f1=None)]
>>> Stream.range(5, 8).zip_longest([1,2], fillvalue=0).to_list()
[Row(f0=5, f1=1), Row(f0=6, f1=2), Row(f0=7, f1=0)]
zip_next(fillvalue=None)

Create a new Stream by zipping elements with next one.

>>> Stream.range(5, 8).zip_next().to_list()
[Row(curr=5, prev=6), Row(curr=6, prev=7), Row(curr=7, prev=None)]
>>> Stream.range(5, 8).zip_next(fillvalue=1).to_list()
[Row(curr=5, prev=6), Row(curr=6, prev=7), Row(curr=7, prev=1)]
zip_prev(fillvalue=None)

Create a new Stream by zipping elements with previous one.

>>> Stream.range(5, 8).zip_prev().to_list()
[Row(curr=5, prev=None), Row(curr=6, prev=5), Row(curr=7, prev=6)]
>>> Stream.range(5, 8).zip_prev(fillvalue=0).to_list()
[Row(curr=5, prev=0), Row(curr=6, prev=5), Row(curr=7, prev=6)]
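
Pairing each element with its predecessor needs only one remembered value. The helper below is a hypothetical stand-alone sketch that yields plain (curr, prev) tuples instead of Row objects.

```python
def zip_prev(iterable, fillvalue=None):
    """Yield (curr, prev) pairs; the first pair uses fillvalue as prev."""
    prev = fillvalue
    for curr in iterable:
        yield (curr, prev)
        prev = curr


default_fill = list(zip_prev(range(5, 8)))
zero_fill = list(zip_prev(range(5, 8), fillvalue=0))
```

Here default_fill is [(5, None), (6, 5), (7, 6)] and zero_fill is [(5, 0), (6, 5), (7, 6)].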