Stream
: Lazy-evaluating sequential collection type¶
-
class
carriage.
Stream
(iterable, *, pipeline=None)¶ An iterable wrapper for building a lazy-evaluating sequence transformation pipeline.
A Stream is created from any iterable object, such as a list, a tuple, or an iterator, including an infinite one.
>>> strm = Stream(range(10))
>>> strm = Stream([1, 2, 3])
Some classmethods are provided for creating common Stream instances.
>>> strm = Stream.range(0, 10, 2)
>>> strm = Stream.count(0, 5)
A Stream instance is immutable. Calling a transformation function creates a new Stream instance every time. Because evaluation is lazy, no duplicated data is generated.
>>> strm1 = Stream.range(5, 10)
>>> strm2 = strm1.map(lambda n: n * 2)
>>> strm3 = strm1.map(lambda n: n * 3)
>>> strm1 is strm2 or strm1 is strm3 or strm2 is strm3
False
To evaluate a Stream instance, call an action function.
>>> strm = Stream.range(5, 10).map(lambda n: n * 2).take(3)
>>> strm.sum()
36
>>> strm.to_list()
[10, 12, 14]
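The split between transformations and actions described above can be illustrated with a small standard-library sketch. The `LazyPipeline` class and the `double` helper are hypothetical names used only for illustration; this is not how carriage implements Stream, only a minimal model of the same idea.

```python
from itertools import islice


class LazyPipeline:
    """Hypothetical stand-in for Stream: keeps the source and the pending steps."""

    def __init__(self, iterable, steps=()):
        self._iterable = iterable
        self._steps = tuple(steps)

    def map(self, func):
        # A transformation returns a new object; nothing is computed yet.
        step = lambda it: map(func, it)
        return LazyPipeline(self._iterable, self._steps + (step,))

    def take(self, n):
        step = lambda it: islice(it, n)
        return LazyPipeline(self._iterable, self._steps + (step,))

    def to_list(self):
        # An action runs the stored steps over the source.
        it = iter(self._iterable)
        for step in self._steps:
            it = step(it)
        return list(it)


calls = []


def double(n):
    calls.append(n)  # record every evaluation
    return n * 2


base = LazyPipeline(range(5, 10))
doubled = base.map(double).take(3)
calls_before_action = list(calls)  # still empty: nothing has been evaluated
result = doubled.to_list()         # [10, 12, 14]
```

Only the three elements that `take(3)` needs are ever passed to `double`, which is the practical benefit of deferring work until an action is called.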
-
accumulate
(func=None)¶ Create a new Stream by applying itertools.accumulate to the elements.
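Since this method is described in terms of `itertools.accumulate`, a short look at that standard-library function may help. The sketch below calls `itertools.accumulate` directly rather than going through Stream.

```python
from itertools import accumulate
import operator

# Without a function, accumulate yields running sums.
running_sum = list(accumulate([1, 2, 3, 4]))                      # [1, 3, 6, 10]

# With a two-argument function, it yields running results of that function.
running_product = list(accumulate([1, 2, 3, 4], operator.mul))    # [1, 2, 6, 24]
```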
-
appended
(elem)¶ Create a new Stream that extends source Stream with another element.
-
cache
()¶ Cache result
-
chunk
(n, strict=False)¶ Divide elements into chunks of n elements. With strict=True, a trailing chunk with fewer than n elements is dropped.
>>> s = Stream.range(5)
>>> s.chunk(2).to_list()
[Row(f0=0, f1=1), Row(f0=2, f1=3), Row(f0=4)]
>>> s.chunk(2, strict=True).to_list()
[Row(f0=0, f1=1), Row(f0=2, f1=3)]
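The chunking behaviour shown above can be approximated with the standard library. The `chunk` function below is a hypothetical helper that yields plain tuples instead of Row objects; it is a sketch of the behaviour, not carriage's code.

```python
from itertools import islice


def chunk(iterable, n, strict=False):
    """Yield tuples of up to n elements; drop a short final tuple when strict."""
    it = iter(iterable)
    while True:
        group = tuple(islice(it, n))
        if not group:
            return
        if strict and len(group) < n:
            return
        yield group


loose = list(chunk(range(5), 2))                    # [(0, 1), (2, 3), (4,)]
strict_only = list(chunk(range(5), 2, strict=True)) # [(0, 1), (2, 3)]
```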
-
classmethod
count
(start, step=1)¶ Create an infinite Stream of evenly spaced values, beginning at start and advancing by step.
>>> Stream.count(0, 3).take(3).to_list() [0, 3, 6]
-
classmethod
cycle
(iterable)¶ Create a Stream cycling an iterable
>>> Stream.cycle([1,2]).take(5).to_list() [1, 2, 1, 2, 1]
-
dict_as_row
(fields=None)¶ Create a new Stream with elements as Row objects
>>> stm = Stream([{'name': 'John', 'age': 35},
...               {'name': 'Frank', 'age': 28}])
>>> stm.dict_as_row().to_list()
[Row(name='John', age=35), Row(name='Frank', age=28)]
>>> stm.dict_as_row(['age', 'name']).to_list()
[Row(age=35, name='John'), Row(age=28, name='Frank')]
-
distincted
(key_func=None)¶ Create a new Stream with non-repeating elements. Elements keep the order of their first occurrence in the source Stream.
>>> Stream.range(10).distincted(lambda n: n//3).to_list() [0, 3, 6, 9]
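Order-preserving de-duplication by key can be sketched with a set of already-seen keys. The `distinct` function is a hypothetical helper, and it assumes the key values are hashable; whether carriage has the same requirement is not stated in this section.

```python
def distinct(iterable, key_func=None):
    """Yield each element whose key has not been seen before, in source order."""
    seen = set()
    for elem in iterable:
        key = elem if key_func is None else key_func(elem)
        if key not in seen:
            seen.add(key)
            yield elem


by_third = list(distinct(range(10), lambda n: n // 3))  # [0, 3, 6, 9]
letters = list(distinct("abca"))                        # ['a', 'b', 'c']
```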
-
drop
(n)¶ Create a new Stream with the first n elements dropped
>>> Stream(dict(a=3, b=4, c=5).items()).drop(2).to_list() [('c', 5)]
-
drop_while
(pred)¶ Create a new Stream without elements as long as predicate evaluates to true.
-
dropwhile
(pred)¶ Create a new Stream without elements as long as predicate evaluates to true.
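Neither drop_while nor dropwhile comes with an example here. Assuming they follow the semantics of `itertools.dropwhile` (an assumption based on the name and description), the standard-library function shows the key point: only the leading run of matching elements is dropped.

```python
from itertools import dropwhile

# Elements are skipped only until the predicate first fails;
# the later 1 is kept even though it also satisfies n < 3.
remaining = list(dropwhile(lambda n: n < 3, [1, 2, 3, 1, 5]))  # [3, 1, 5]
```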
-
extended
(iterable)¶ Create a new Stream that extends source Stream with another iterable
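As a rough standard-library analogue of extended and appended, `itertools.chain` concatenates iterables lazily. This only illustrates the idea; it does not claim that carriage uses `chain` internally.

```python
from itertools import chain

source = [1, 2, 3]

# "extended": follow the source with every element of another iterable.
extended = list(chain(source, [4, 5]))   # [1, 2, 3, 4, 5]

# "appended": follow the source with one extra element.
appended = list(chain(source, [4]))      # [1, 2, 3, 4]
```

The source list itself is left unchanged in both cases.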
-
filter
(pred)¶ Create a new Stream containing only the elements that pass the predicate
>>> Stream.range(10).filter(lambda n: n % 2 == 0).to_list() [0, 2, 4, 6, 8]
-
filter_false
(pred)¶ Create a new Stream containing only the elements that do not pass the predicate
>>> Stream.range(10).filter_false(lambda n: n % 2 == 0).to_list() [1, 3, 5, 7, 9]
-
find
(pred)¶ Get the first element satisfying the predicate
>>> Stream.range(5, 100).find(lambda n: n % 7 == 0) 7
Returns: Return type: element
-
find_opt
(pred)¶ Optionally get the first element satisfying the predicate. Return Some(element) if one exists; otherwise return Nothing.
>>> Stream.range(5, 100).find_opt(lambda n: n * 3 + 5 == 40)
Nothing
>>> Stream.range(5, 100).find_opt(lambda n: n % 7 == 0)
Some(7)
Returns: Return type: Optional[element]
-
first
()¶ Get first element
>>> Stream(dict(a=3, b=4, c=5).items()).first() ('a', 3)
Returns: Return type: element
-
first_opt
()¶ Get the first element as Some(element), or Nothing if it does not exist
Returns: Return type: Optional[element]
-
flat_map
(to_iterable_func)¶ Apply function to each element, then flatten the result.
>>> Stream([1, 2, 3]).flat_map(range).to_list() [0, 0, 1, 0, 1, 2]
Returns: Return type: Stream
-
flatten
()¶ flatten each element
>>> Stream([(1, 2), (3, 4)]).flatten().to_list() [1, 2, 3, 4]
Returns: Return type: Stream
-
fold_left
(func, initial)¶ Apply a function of two arguments cumulatively to the elements in Stream from left to right.
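A left fold with an initial value matches what `functools.reduce` does when given its third argument. The sketch below uses `functools.reduce` directly as a stand-in; that fold_left delegates to it is an assumption.

```python
from functools import reduce

# The accumulator starts at the initial value and absorbs elements left to right.
total = reduce(lambda acc, n: acc + n, [1, 2, 3], 10)          # ((10 + 1) + 2) + 3 = 16

# The accumulator may have a different type from the elements.
as_text = reduce(lambda acc, n: acc + str(n), [1, 2, 3], "")   # '123'

# With an initial value, an empty input simply returns that value.
empty = reduce(lambda acc, n: acc + n, [], 0)                  # 0
```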
-
for_each
(func)¶ Call function for each element
>>> s = Stream.range(3)
>>> s.for_each(print)
0
1
2
-
get
(index, default=None)¶ Get the item at the given index. Return the default value if it does not exist.
>>> s = Stream.range(5, 12)
>>> s.get(3)
8
>>> s.get(10) is None
True
>>> s.get(10, 0)
0
Returns: Return type: element
-
get_opt
(index)¶ Optionally get the item at the given index. Return Some(value) if it exists; otherwise return Nothing.
>>> s = Stream.range(5, 12)
>>> s.get_opt(3)
Some(8)
>>> s.get_opt(10)
Nothing
>>> s.get_opt(10).get_or(0)
0
>>> s.get_opt(3).map(lambda n: n * 2).get_or(0)
16
>>> s.get_opt(10).map(lambda n: n * 2).get_or(0)
0
Returns: Return type: Optional[element]
-
group_by_as_map
(key_func=None)¶ Group values into a Map keyed by the result of the key function.
Compared to group_by_as_stream, there are some pros and cons.
Pros:
- Elements don’t need to be sorted by the key function first. You can call map_group_by at any time and get a correct grouping result.
Cons:
- The key function has to return a hashable value.
- Not lazy-evaluating. Consumes more memory while grouping, because all elements are read before the Map is returned.
>>> Stream.range(10).group_by_as_map(key_func=lambda n: n % 3) Map({0: Array([0, 3, 6, 9]), 1: Array([1, 4, 7]), 2: Array([2, 5, 8])})
-
group_by_as_stream
(key=None)¶ Create a new Stream using itertools.groupby from the standard library, which sequentially groups elements as long as the key function evaluates to the same value.
Compared to group_by_as_map, there are some pros and cons.
Cons:
- Elements should be sorted by the key function first, or elements with the same key may be split into different groups.
Pros:
- The key function doesn’t have to return a hashable value. It can be any type that supports __eq__.
- Lazy-evaluating. Consumes less memory while grouping. Yields a group as soon as possible.
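The trade-off between the two grouping methods can be seen with the standard library alone. The sketch below uses `itertools.groupby` for the sequential approach and a plain dict for the map approach; the `key` function and variable names are illustrative.

```python
from collections import defaultdict
from itertools import groupby


def key(n):
    return n % 3


# Sequential grouping on unsorted input: equal keys that are not adjacent
# end up in separate groups.
unsorted_groups = [(k, list(g)) for k, g in groupby(range(6), key)]
# [(0, [0]), (1, [1]), (2, [2]), (0, [3]), (1, [4]), (2, [5])]

# Sorting by the key first brings equal keys together.
sorted_groups = [(k, list(g)) for k, g in groupby(sorted(range(6), key=key), key)]
# [(0, [0, 3]), (1, [1, 4]), (2, [2, 5])]

# Map-style grouping needs no sorting, but requires hashable keys and
# reads every element before any group is complete.
as_map = defaultdict(list)
for n in range(6):
    as_map[key(n)].append(n)
```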
-
interpose
(sep)¶ Create a new Stream by interposing a separator between elements.
>>> Stream.range(5, 10).interpose(0).to_list() [5, 0, 6, 0, 7, 0, 8, 0, 9]
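Interposing can be written as a small generator. The `interpose` function below is a hypothetical stand-alone helper that reproduces the behaviour in the example above; it is not taken from carriage.

```python
def interpose(iterable, sep):
    """Yield the elements of iterable with sep between each adjacent pair."""
    it = iter(iterable)
    try:
        first = next(it)
    except StopIteration:
        return  # empty input: nothing to yield, and no separator either
    yield first
    for elem in it:
        yield sep
        yield elem


spaced = list(interpose(range(5, 10), 0))  # [5, 0, 6, 0, 7, 0, 8, 0, 9]
```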
-
classmethod
iterate
(func, x)¶ Create a Stream by recursively applying a function to its last return value.
>>> def multiply2(x): return x * 2
>>> Stream.iterate(multiply2, 3).take(4).to_list()
[3, 6, 12, 24]
-
last
()¶ Get last element
Returns: Return type: element
-
last_opt
()¶ Get the last element as Some(element), or Nothing if it does not exist
Returns: Return type: Optional[element]
-
len
()¶ Get the length of the Stream
Returns: Return type: int
-
make_string
(elem_format='{elem!r}', start='[', elem_sep=', ', end=']')¶ Make string from elements
>>> Stream.range(5, 8).make_string()
'[5, 6, 7]'
>>> print(Stream.range(5, 8).make_string(elem_sep='\n', start='', end='', elem_format='{index}: {elem}'))
0: 5
1: 6
2: 7
-
map
(func)¶ Create a new Stream by applying function to each element
>>> Stream.range(5, 8).map(lambda x: x * 2).to_list() [10, 12, 14]
Returns: Return type: Stream
-
mean
()¶ Get the average of elements.
>>> Array.range(10).mean() 4.5
-
nlargest
(n, key=None)¶ Get the n largest elements.
>>> Stream([1, 5, 2, 3, 6]).nlargest(2).to_list() [6, 5]
-
nsmallest
(n, key=None)¶ Get the n smallest elements.
>>> Stream([1, 5, 2, 3, 6]).nsmallest(2).to_list() [1, 2]
-
pluck
(key)¶ Create a new Stream of values by evaluating elem[key] for each element.
>>> s = Stream([dict(x=3, y=4), dict(x=4, y=5), dict(x=8, y=9)])
>>> s.pluck('x').to_list()
[3, 4, 8]
Returns: Return type: Stream[element[key]]
-
pluck_attr
(attr)¶ Create a new Stream of Optional values by evaluating elem.attr of each element. Get Some(value) if attr exists for that element, otherwise get the Nothing singleton.
>>> from carriage import Row
>>> s = Stream([Row(x=3, y=4), Row(x=4, y=5), Row(x=8, y=9)])
>>> s.pluck_attr('x').to_list()
[3, 4, 8]
Returns: Return type: Stream[type of element.attr]
-
pluck_opt
(key)¶ Create a new Stream of Optional values by evaluating elem[key] for each element. Get Some(value) if the key exists for that element, otherwise get the Nothing singleton.
>>> s = Stream([dict(x=3, y=4), dict(y=5), dict(x=8, y=9)])
>>> s.pluck_opt('x').to_list()
[Some(3), Nothing, Some(8)]
>>> s.pluck_opt('x').map(lambda n_opt: n_opt.get_or(1)).to_list()
[3, 1, 8]
Returns: Return type: Stream[Optional(type of element[key])]
-
classmethod
range
(start, end=None, step=1)¶ Create a Stream from range.
>>> Stream.range(2, 10, 2).to_list() [2, 4, 6, 8] >>> Stream.range(3).to_list() [0, 1, 2]
-
classmethod
read_txt
(path)¶ Create from a text file. Treat lines as elements and remove newline character.
>>> Stream.read_txt(path)
Parameters: path (str or path or file object) – path to the input file
-
reduce
(func)¶ Apply a function of two arguments cumulatively to the elements in Stream from left to right.
-
classmethod
repeat
(elems, times=None)¶ Create a Stream repeating elems
>>> Stream.repeat(1, 3).to_list()
[1, 1, 1]
>>> Stream.repeat([1, 2, 3], 2).to_list()
[[1, 2, 3], [1, 2, 3]]
-
classmethod
repeatedly
(func, times=None)¶ Create a Stream by repeatedly calling a zero-argument function
>>> def counter():
...     counter.num += 1
...     return counter.num
>>> counter.num = -1
>>> Stream.repeatedly(counter, 5).to_list()
[0, 1, 2, 3, 4]
-
reversed
()¶ Create a new reversed Stream.
>>> Stream(['a', 'b', 'c']).reversed().to_list() ['c', 'b', 'a']
-
second
()¶ Get second element
>>> Stream(dict(a=3, b=4, c=5).items()).second() ('b', 4)
Returns: Return type: element
-
second_opt
()¶ Get the second element as Some(element), or Nothing if it does not exist
Returns: Return type: Optional[element]
-
show_pipeline
(n=2)¶ Show pipeline and some examples for debugging
>>> def mul_2(x):
...     return x*2
>>> (Stream
...  .range(10)
...  .map(mul_2)
...  .nlargest(3)
...  .show_pipeline(2))
range(0, 10)
[0] 0
[1] 1
-> map(<function mul_2 at 0x10a1dbd08>)
[0] 0
[1] 2
-> nlargest(3)
[0] 2
[1] 0
-
slice
(start, stop, step=None)¶ Create a Stream from the slice of items.
>>> Stream(list(range(10))).slice(5, 8).to_list() [5, 6, 7]
Returns: Return type: Stream[element]
-
sliding_window
(n, step=1)¶ Create a new Stream whose elements are sliding windows over the source elements.
>>> (Stream('they have the same meaning'.split())
...  .sliding_window(3)
...  .to_list())
[('they', 'have', 'the'), ('have', 'the', 'same'), ('the', 'same', 'meaning')]
>>> (Stream('they have the same meaning'.split())
...  .sliding_window(3, step=2)
...  .to_list())
[('they', 'have', 'the'), ('the', 'same', 'meaning')]
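A sliding window with a step can be sketched using a bounded deque. The `sliding_window` function below is a hypothetical helper that reproduces the two examples above; it assumes that an incomplete trailing window is dropped, which this section does not spell out.

```python
from collections import deque
from itertools import islice


def sliding_window(iterable, n, step=1):
    """Yield n-element tuples, advancing by step elements between windows."""
    it = iter(iterable)
    window = deque(islice(it, n), maxlen=n)
    if len(window) < n:
        return  # fewer than n elements: no full window exists
    yield tuple(window)
    while True:
        new = list(islice(it, step))
        if len(new) < step:
            return  # assumption: a partial advance does not produce a window
        window.extend(new)  # maxlen discards the oldest elements
        yield tuple(window)


words = 'they have the same meaning'.split()
step_one = list(sliding_window(words, 3))
step_two = list(sliding_window(words, 3, step=2))
```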
-
sorted
(key=None, reverse=False)¶ Create a new sorted Stream.
-
split_after
(pred)¶ Create a new Stream of Arrays by splitting after each element passing predicate.
>>> Stream.range(10).split_after(lambda n: n % 3 == 2).to_list() [Array([0, 1, 2]), Array([3, 4, 5]), Array([6, 7, 8]), Array([9])]
-
split_before
(pred)¶ Create a new Stream of Arrays by splitting before each element passing predicate.
>>> Stream.range(10).split_before(lambda n: n % 3 == 2).to_list() [Array([0, 1]), Array([2, 3, 4]), Array([5, 6, 7]), Array([8, 9])]
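The difference between split_after and split_before is where the matching element lands. The two generators below are hypothetical helpers that yield plain lists rather than Array objects and reproduce the examples above.

```python
def split_after(iterable, pred):
    """Close the current group right after each element passing pred."""
    group = []
    for elem in iterable:
        group.append(elem)
        if pred(elem):
            yield group
            group = []
    if group:
        yield group


def split_before(iterable, pred):
    """Start a new group right before each element passing pred."""
    group = []
    for elem in iterable:
        if pred(elem) and group:
            yield group
            group = []
        group.append(elem)
    if group:
        yield group


def is_two_mod_three(n):
    return n % 3 == 2


after = list(split_after(range(10), is_two_mod_three))
before = list(split_before(range(10), is_two_mod_three))
```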
-
star_for_each
(func)¶ Call the function for each element, unpacking the element as the argument tuple
>>> s = Stream(['a', 'b', 'c']).zip_index(1)
>>> s.star_for_each(lambda c, i: print(f'{i}:{c}'))
1:a
2:b
3:c
-
starmap
(func)¶ Create a new Stream by evaluating the function with the argument tuple from each element, i.e. func(*elem). This is convenient when all elements in the Stream are iterable and you want to pass each item of an element as a separate argument to the function.
>>> Stream([(1, 2), (3, 4)]).starmap(lambda a, b: a+b).to_list()
[3, 7]
>>> Stream([(1, 2), (3, 4)]).map(lambda a_b: a_b[0]+a_b[1]).to_list()
[3, 7]
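The same unpacking idea exists in the standard library as `itertools.starmap`, shown here on its own as a point of comparison.

```python
from itertools import starmap

# Each tuple is unpacked into positional arguments: func(*elem).
sums = list(starmap(lambda a, b: a + b, [(1, 2), (3, 4)]))   # [3, 7]

# Existing multi-argument functions can be used directly.
powers = list(starmap(pow, [(2, 3), (3, 2)]))                # [8, 9]
```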
-
sum
()¶ Get sum of elements
-
tail
()¶ Create a new Stream with first element dropped
>>> Stream(dict(a=3, b=4, c=5).items()).tail().to_list() [('b', 4), ('c', 5)]
-
take
(n)¶ Create a new Stream containing only the first n elements
>>> Stream(dict(a=3, b=4, c=5).items()).take(2).to_list() [('a', 3), ('b', 4)]
-
take_while
(pred)¶ Create a new Stream with successive elements as long as predicate evaluates to true.
>>> Stream.range(10).take_while(lambda n: n % 5 < 3).to_list() [0, 1, 2]
-
takewhile
(pred)¶ Create a new Stream with successive elements as long as predicate evaluates to true.
>>> Stream.range(10).take_while(lambda n: n % 5 < 3).to_list() [0, 1, 2]
-
tap
(tag='', n=5, msg_format='{tag}:{index}: {elem}')¶ A debugging tool. This method creates a new Stream with the same elements. While the Stream is evaluated, it prints the first n elements.
>>> (Stream.range(3).tap('orig')
...  .map(lambda x: x * 2).tap_with(lambda i, e: f'{i} -> {e}')
...  .accumulate(lambda a, b: a + b).tap('acc')
...  .tap(msg_format='end\n')
...  .to_list())
orig:0: 0
0 -> 0
acc:0: 0
end
orig:1: 1
1 -> 2
acc:1: 2
end
orig:2: 2
2 -> 4
acc:2: 6
end
[0, 2, 6]
-
tap_with
(func, n=5)¶ A debugging tool. This method creates a new Stream with the same elements. While the Stream is evaluated, it calls the function with the index and element of each of the first n elements, then prints the return value.
>>> (Stream.range(3).tap('orig')
...  .map(lambda x: x * 2).tap('x2')
...  .accumulate(lambda a, b: a + b).tap('acc')
...  .to_list())
orig:0: 0
x2:0: 0
acc:0: 0
orig:1: 1
x2:1: 2
acc:1: 2
orig:2: 2
x2:2: 4
acc:2: 6
[0, 2, 6]
Parameters:
- func (func(index, elem) -> Any) – Function for building the printing object.
- n (int) – The first n elements will be printed.
-
tee
(n=2)¶ Copy the Stream into multiple Streams with the same elements.
>>> itr = iter(range(3, 6))
>>> s1 = Stream(itr).map(lambda x: x * 2)
>>> s2, s3 = s1.tee(2)
>>> s2.map(lambda x: x * 2).to_list()
[12, 16, 20]
>>> s3.map(lambda x: x * 3).to_list()
[18, 24, 30]
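The reason for copying a stream becomes clear with a one-shot iterator: a second pass over it finds nothing. The sketch below shows the problem and the fix using `itertools.tee` from the standard library; it does not claim anything about how carriage's tee is implemented.

```python
from itertools import tee

# A plain iterator can be consumed only once.
source = iter(range(3, 6))
first_pass = [x * 2 for x in source]    # [6, 8, 10]
second_pass = [x * 3 for x in source]   # [] because source is exhausted

# tee gives independent iterators over the same underlying elements.
source = iter(range(3, 6))
left, right = tee(source, 2)
doubled = [x * 2 for x in left]         # [6, 8, 10]
tripled = [x * 3 for x in right]        # [9, 12, 15]
```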
-
to_array
()¶ Convert to an Array
>>> Stream.range(5, 8, 2).zip_index().to_array() Array([Row(value=5, index=0), Row(value=7, index=1)])
Returns: Return type: Array
-
to_dict
()¶ Convert to a dict
>>> Stream.range(5, 10, 2).zip_index().to_dict() {5: 0, 7: 1, 9: 2}
Returns: Return type: dict
-
to_list
()¶ Convert to a list.
>>> Stream.range(5, 10, 2).to_list() [5, 7, 9]
Returns: Return type: list
-
to_map
()¶ Convert to a Map
>>> Stream.range(5, 10, 2).zip_index().to_map() Map({5: 0, 7: 1, 9: 2})
Returns: Return type: Map
-
to_series
()¶ Convert to a pandas Series
>>> Stream.range(5, 10, 2).to_series() 0 5 1 7 2 9 dtype: int64
Returns: Return type: pandas.Series
-
to_set
()¶ Convert to a set
>>> Stream.cycle([1, 2, 3]).take(5).to_set() {1, 2, 3}
Returns: Return type: set
-
to_streamtable
()¶ Convert to StreamTable
All elements should be in Row type
Returns: Return type: StreamTable
-
tuple_as_row
(fields)¶ Create a new Stream with elements as Row objects
>>> Stream([(1, 2), (3, 4)]).tuple_as_row(['x', 'y']).to_list() [Row(x=1, y=2), Row(x=3, y=4)]
-
unique
(key_func=None)¶ Create a new Stream of unique elements
>>> Stream.range(10).unique(lambda x: x // 3).to_list() [0, 3, 6, 9]
-
without
(*elems)¶ Create a new Stream without specified elements.
>>> Stream.range(10).without(3, 6, 9).to_list() [0, 1, 2, 4, 5, 7, 8]
Returns: Return type: Stream[element]
-
write_txt
(path, sep='\n')¶ Write into a text file.
All elements are converted with str() before being written to the file.
>>> Stream.range(10).write_txt('nums.txt')
- path : str or path or file object
- path to the output file
- sep : str
- element separator. Defaults to '\n'.
-
zip
(*iterables)¶ Create a new Stream by zipping elements with other iterables.
>>> Stream.range(5, 8).zip([1,2,3]).to_list() [Row(f0=5, f1=1), Row(f0=6, f1=2), Row(f0=7, f1=3)]
>>> Stream.range(5, 8).zip([1,2,3], [9, 10, 11]).to_list() [Row(f0=5, f1=1, f2=9), Row(f0=6, f1=2, f2=10), Row(f0=7, f1=3, f2=11)]
>>> Stream.range(5, 8).zip([1,2]).to_list() [Row(f0=5, f1=1), Row(f0=6, f1=2)]
>>> import itertools as itt >>> Stream.range(5, 8).zip(itt.count(10)).to_list() [Row(f0=5, f1=10), Row(f0=6, f1=11), Row(f0=7, f1=12)]
-
zip_index
(start=0)¶ Create a new Stream by zipping elements with index.
>>> Stream(['a', 'b', 'c']).zip_index().to_list() [Row(value='a', index=0), Row(value='b', index=1), Row(value='c', index=2)]
>>> Stream(['a', 'b', 'c']).zip_index(1).to_list() [Row(value='a', index=1), Row(value='b', index=2), Row(value='c', index=3)]
-
zip_longest
(*iterables, fillvalue=None)¶ Create a new Stream by zipping elements with other iterables as long as possible.
>>> Stream.range(5, 8).zip_longest([1,2]).to_list() [Row(f0=5, f1=1), Row(f0=6, f1=2), Row(f0=7, f1=None)]
>>> Stream.range(5, 8).zip_longest([1,2], fillvalue=0).to_list() [Row(f0=5, f1=1), Row(f0=6, f1=2), Row(f0=7, f1=0)]
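The contrast between zip and zip_longest mirrors the built-in `zip` and `itertools.zip_longest`. The sketch below uses those directly, so the results are plain tuples rather than Row objects.

```python
from itertools import zip_longest

# zip stops at the shortest input.
short = list(zip(range(5, 8), [1, 2]))                        # [(5, 1), (6, 2)]

# zip_longest continues to the longest input, padding with None by default.
padded = list(zip_longest(range(5, 8), [1, 2]))               # [(5, 1), (6, 2), (7, None)]

# fillvalue replaces the default padding.
filled = list(zip_longest(range(5, 8), [1, 2], fillvalue=0))  # [(5, 1), (6, 2), (7, 0)]
```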
-
zip_next
(fillvalue=None)¶ Create a new Stream by zipping elements with next one.
>>> Stream.range(5, 8).zip_next().to_list() [Row(curr=5, prev=6), Row(curr=6, prev=7), Row(curr=7, prev=None)]
>>> Stream.range(5, 8).zip_next(fillvalue=1).to_list() [Row(curr=5, prev=6), Row(curr=6, prev=7), Row(curr=7, prev=1)]
-
zip_prev
(fillvalue=None)¶ Create a new Stream by zipping elements with previous one.
>>> Stream.range(5, 8).zip_prev().to_list() [Row(curr=5, prev=None), Row(curr=6, prev=5), Row(curr=7, prev=6)]
>>> Stream.range(5, 8).zip_prev(fillvalue=0).to_list() [Row(curr=5, prev=0), Row(curr=6, prev=5), Row(curr=7, prev=6)]
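Pairing each element with its neighbour can be sketched by running two copies of the input one position apart. The `zip_prev` and `zip_next` functions below are hypothetical stand-alone helpers that return plain tuples; they illustrate the pairing only and are not carriage's implementation.

```python
from itertools import chain, islice, tee


def zip_prev(iterable, fillvalue=None):
    """Pair each element with the one before it; the first gets fillvalue."""
    curr, prev = tee(iterable)
    return zip(curr, chain([fillvalue], prev))


def zip_next(iterable, fillvalue=None):
    """Pair each element with the one after it; the last gets fillvalue."""
    curr, nxt = tee(iterable)
    return zip(curr, chain(islice(nxt, 1, None), [fillvalue]))


with_prev = list(zip_prev(range(5, 8)))                # [(5, None), (6, 5), (7, 6)]
with_next = list(zip_next(range(5, 8), fillvalue=1))   # [(5, 6), (6, 7), (7, 1)]
```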
-