A python stream utility
Project description
A Java-like stream library for Python.
As in Java, everything is lazy: elements are processed iteratively, so no large list is built in memory.
A stream is like a list with a cursor: it can only be consumed once! Need it again? Build a new Stream object.
Streams are easy to pipeline: calls can be chained, e.g. a mapped stream can be further reduced or filtered.
Stream(range(0, 1000)).filter(lambda x: x % 2 == 0).map(lambda x: x // 2).count()  # 500 -- you know what this does!
It also has parallel processing capability, which makes parallel programming easy.
Imagine managing many downloads this way:
# This can replace most use cases of parallel processing
Stream(urls).parallel_map(lambda x:download_to_file(x, "/tmp"), thread_count=10)
# or you can reuse a thread pool.
Stream(urls).parallel_map(lambda x:download_to_file(x, "/tmp"), thread_pool=my_thread_pool_executor)
Installing
$ pip3 install pystream-wushilin
Importing
from pystream.pystream import Stream
Using
Creating: a stream can be created from an iterable or an iterator
# Create stream from iterable (e.g. collections)
# This might be bounded or unbounded
stream = Stream(range(0, 100))
stream = Stream([1,2,3,4,5,6])
dict1 = {'k1': 'v1', 'k2': 'v2'}
key_stream = Stream(dict1.keys())
value_stream = key_stream.map(lambda k: dict1[k])  # note: this consumes key_stream
# Create stream from iterator!
# This might be bounded or unbounded
string = "hello, world"
iterator = iter(string)
stream = Stream(iterator)
# Create stream from a file (read lines as stream):
# You have to use the "with" keyword so the stream can be closed
with Stream.from_file_lines("example.txt") as stream:
    # use stream, e.g.:
    stream.for_each(print)
# Create stream from iterative function
# effectively, [seed, func(seed), func(func(seed)), func(func(func(seed))), ...]
# The generated stream is unbounded; however, func can raise StopIteration to signal the end (EOF)
def adder(x):
    """Return the value that follows *x*, i.e. ``x + 1``.

    Serves as the step function handed to ``Stream.iterate`` in the example.
    """
    incremented = x + 1
    return incremented
Stream.iterate(1, adder).limit(5)  # [1, 2, 3, 4, 5] -- the seed comes first, then adder(seed), ...
# A stream can optionally take a begin function and an exit function, so it can be used gracefully with the "with" keyword.
# begin_func is executed before the stream is consumed
# exit_func is executed after the stream goes out of scope, so cleanup can be done.
# This is useful for streams that come from the network, a file, or a database that has something to clean up
with Stream(dbcursor, begin_func=lambda x:print("I am executed before stream is consumed"), exit_func=dbcursor.close) as stream:
stream.to_list() # or other ways you like
# Creating from a generator func: this stream is unbounded. Don't reduce it (count, sum, max, min, etc.) and don't parallel_map it,
# but map, filter, limit, etc. are fine, since they are lazy.
# If your generator is bounded, raise StopIteration after the last element; then this stream is bounded.
# This is typically unbounded, unless the generator raises StopIteration
s1 = Stream.generate(lambda:5) # infinite number of 5s, if you count, it hangs!
s1.limit(1000).sum() # should be 5000
a = 1
b = 1


def fib():
    """Return the next Fibonacci number and advance the module-level ``a``/``b`` pair.

    Starting from a = b = 1, successive calls return 1, 2, 3, 5, 8, ... .
    Once the next value would exceed 100000000000, the call raises
    StopIteration. The example uses this to end a ``Stream.generate`` stream.
    The state has already advanced when that happens.

    NOTE(review): if ``Stream.generate`` calls this from inside a Python
    generator, PEP 479 turns the StopIteration into a RuntimeError. Confirm
    against the library implementation.
    """
    global a, b
    upper_bound = 100000000000
    following = a + b
    a = b
    b = following
    if a > upper_bound:
        raise StopIteration
    return a
# Generating a stream from the Fibonacci sequence, up to 100000000000.
# Generating is not an infinite loop; it is lazy!
Stream.generate(fib).limit(10).for_each(print)
1
2
3
5
8
13
21
34
55
89
# Quiz: How to generate random strings?
Using stream
# Mapping
Stream([1,2,3]).map(lambda x: x+1).for_each(print)
2
3
4
# Mapping in parallel. Note this consumes the entire stream and returns the results in the original order. If the stream is infinite, this will cause an out-of-memory error
def slow_map(x, delay=2):
    """A slow mapping function: sleep *delay* seconds, then return ``x * 2``.

    *delay* defaults to 2 seconds, which the timing comment after the
    parallel_map calls relies on. Pass ``delay=0`` for a quick check.
    """
    # The original snippet called sleep() without ever importing it.
    from time import sleep

    sleep(delay)
    return x * 2
Stream.generate(lambda:5).limit(10).parallel_map(slow_map).for_each(print) # default using 10 threads
Stream.generate(lambda:5).limit(10).parallel_map(slow_map, thread_count=20).for_each(print) # using 20 threads to map concurrently
thread_pool = ThreadPoolExecutor(max_workers=50)
Stream.generate(lambda:5).limit(10).parallel_map(slow_map, thread_pool=thread_pool).for_each(print) # re-use thread pool
# Each of the above calls takes about 2 seconds, instead of 20 seconds if executed with map instead of parallel_map
# Filtering
Stream(range(0, 55)).filter(lambda x: x>50).for_each(print)
51
52
53
54
# Limiting
Stream(range(0, 1000000)).limit(5).for_each(print)
0
1
2
3
4
# Skipping
Stream(range(0, 100)).skip(95).for_each(print)
95
96
97
98
99
# Summing
Stream(range(0,5)).sum() # 10 (0 + 1 + 2 + 3 + 4)
# Max/Min
Stream(range(0, 5)).max() # 4
Stream(range(0, 5)).min() # 0
# Reducing
Stream(range(0, 5)).reduce(lambda x, y: x + y) # 10 -> same as sum!
# Reading lines from a file with from_file_lines
with Stream.from_file_lines("readme.txt").with_index() as stream:
stream.for_each(print)
(0, <line1>)
(1, <line2>)
(2, <line3>) ...
# With index
Stream([1,3,5,7,9]).with_index().for_each(print)
(0, 1)
(1, 3)
(2, 5)
(3, 7)
(4, 9)
# Counting
Stream(range(0, 100)).count() # 100 (0...99)
# Concatenating streams
s1 = Stream([1,2,3])
s2 = Stream([4,5,6])
(s1 + s2).count() # 6
s1.concat(s2).count() #6
# Note: if you run both of the above, the second line gives 0, because the first one already consumed s1 and s2.
# Visiting each element with a func
Stream([1,2,3,4,5]).for_each(print)
1
2
3
4
5
# convert to list
list1 = Stream(range(0, 5)).to_list() # [0, 1, 2, 3, 4]
list2 = list(Stream(range(0, 5)))  # [0, 1, 2, 3, 4] since the stream itself is iterable
# Picking one item from the tuple of each element
stream = Stream(range(0, 10, 2)) # 0, 2, 4, 6, 8
indexed_stream = stream.with_index() # (0, 0), (1, 2), (2, 4), (3, 6), (4, 8)
indexed_stream.pick(0) # 0, 1, 2, 3, 4
indexed_stream.pick(1) # 0, 2, 4, 6, 8
indexed_stream.pick(3) # Index Out of Bound error
# Reducing
Stream(range(0, 5)).reduce(lambda x, y: x * y) # 0 (0 * 1 * 2 * 3 * 4)
# Flatten
Stream([[1, 2], [3, 4]]).flatten()  # [1, 2, 3, 4]
# Packing
Stream([1,2,3,4,5,6,7]).pack(2) # [[1,2], [3,4], [5,6], [7, None]]
Stream([1, 2, 3, 4, 5, 6, 7]).pack(3).flatten()  # [1, 2, 3, 4, 5, 6, 7, None, None]
# If the None padding is not desired, filter it out yourself.
# Flat map
# When your mapping returns a list, this call flattens it.
Stream([[2, 5], [3, 3]]).flat_map(lambda x: [x[0] for _ in range(x[1])]).for_each(print) # gives you 5 x 2s and 3 x 3s [2,2,2,2,2,3,3,3]
# Ordering
Stream([4, 3, 2, 1, 5]).ordered(reverse=True).for_each(print)  # [5, 4, 3, 2, 1]
# Uniq
Stream([1,1,2,3,4,4,5]).uniq().for_each(print) # [1,2,3,4,5]
# Repeating
print(Stream([1, 2, 3, 4, 5]).repeat(3).to_list())  # repeat() repeats 2 times by default; repeat(3) gives [1,2,3,4,5,1,2,3,4,5,1,2,3,4,5]
# To Set
Stream([1,2,3,4,5,5,6]).to_set() # {1,2,3,4,5,6}
# To a stream of maps (packs of 2, or more!). When a pack has more than 2 elements, the key is the first element of the pack and the value is the rest of the pack.
Stream(["k1", "v1", "k2", "v2"]).pack(2).to_maps() # [{k1:v1}, {k2:v2}]
# To a single map
Stream(["k1", "v1", "k2", "v2"]).pack(2).to_map()  # {k1: v1, k2: v2} -- a single map, not a list
Stream(["k1", "v1", "k2", "v2"]).to_map() # value error!
Stream(["k1", "v1", "k2", "v2"]).to_maps() # value error!
print(Stream(["k1", "v1", "k2", "v2"]).pack(3).to_map()) # {'k1': ['v1', 'k2'], 'v2': [None, None]}
# Repeating stream
Stream(["I love python"]).repeat(20) #["I love python", .... "I love python"] (20 times)
# Splitting a stream
# Note that pystreams can be split; the first split stream is the primary, the others are secondaries.
# Secondary streams behave as follows:
# Consumption of a secondary depends on consumption by the primary: it blocks until the primary has consumed the element.
# An element becomes available in the secondary streams as soon as the primary consumes it.
# begin_func and exit_func are attached to the primary only. If you split a file stream, make sure the first one (s1) is closed.
#
s1, s2, s3, s4 = Stream([1,2,3,4,5], lambda:print("Begin"), lambda:print("end")).split(4)
def slow_consume(name, stream, delay=1):
    """Consume *stream* one element at a time, pausing after each element.

    For every element, prints ``"<name> => consumed <element>"`` and then
    sleeps *delay* seconds (default 1) before pulling the next one.
    """
    # The original snippet called sleep() without ever importing it.
    from time import sleep

    # A plain for-loop replaces the manual __next__/StopIteration handling
    # and no longer shadows the built-in ``iter``.
    for item in stream:
        print(f"{name} => consumed {item}")
        sleep(delay)
def consume_asap(name, stream):
    """Drain *stream* without pausing, printing each element as it arrives.

    Each element produces one line of the form
    ``"<name> => consumed <element>"``.
    """
    for element in stream:
        message = f"{name} => consumed {element}"
        print(message)
with s1, s2, s3, s4:
t1 = threading.Thread(target=slow_consume, args=("s1", s1))
t2 = threading.Thread(target=consume_asap, args=("s2", s2))
t3 = threading.Thread(target=consume_asap, args=("s3", s3))
t4 = threading.Thread(target=consume_asap, args=("s4", s4))
Stream([t1, t2, t3, t4]).for_each(lambda x: x.start())
Stream([t1, t2, t3, t4]).for_each(lambda x: x.join())
# Create random string from charset, with length
Stream.random_strings("ABCDEFG", 12).limit(20).for_each(print)
# Default length is 5
Stream.random_chars("ABCDEFG").limit(20).for_each(print)
# Create random integers with [lower, upper) range
Stream.random_ints(0, 10).limit(20).for_each(print)
# Create floats with scale [0, scale)
Stream.random_floats(10).limit(20).for_each(print)
# Upper case alphabetic strings of length 5
print("Upper")
Stream.random_alphabets(length=5, lower=False).limit(20).for_each(print)
# Lower case alphabetic strings of length 6
print("Lower")
Stream.random_alphabets(length=6, lower=True).limit(20).for_each(print)
# Upper case hex strings of length 5
print("Upper hex")
Stream.random_hex_strings(length=5).limit(20).for_each(print)
# Lower case hex strings of length 10
print("Lower hex")
Stream.random_hex_strings(length=10, lower=True).limit(20).for_each(print)
# Random string in list
# A stream is iterable, so list() collects it directly (no copy comprehension needed).
list_new = list(Stream.random_strings().limit(10))
print(list_new)
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
pystream_wushilin-1.0.7.tar.gz
(18.1 kB
view hashes)
Built Distribution
Close
Hashes for pystream_wushilin-1.0.7-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 5100d20a304353a03e4bf21f789db653e6f6338a594015be59973dc65b48eb67 |
|
MD5 | beee923f3cab09c3efb9e8448b872e19 |
|
BLAKE2b-256 | 63c816ff5e7b46af244e5e0270bff4d675f52d513c4b9d23b42513abbdcc5915 |