A Python stream utility
Project description
A Java-like stream for Python.
Like Java streams, everything is lazy: elements are processed one at a time, so no large list is built in memory.
A stream is like a list with a cursor: it can only be consumed once! Need it again? Build a new stream object.
Streams can be easily pipelined: calls chain together, e.g. a mapped stream can be further reduced or filtered.
Stream(range(0, 1000)).filter(lambda x: x % 2 == 0).map(lambda x: x // 2).count() # You know what this does!
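For comparison, the same pipeline expressed with plain generator expressions — a sketch of what the lazy evaluation amounts to, not the library's actual implementation:

```python
# Lazy pipeline with built-in generators: nothing is materialized until consumed.
evens = (x for x in range(0, 1000) if x % 2 == 0)  # filter
halved = (x // 2 for x in evens)                   # map
count = sum(1 for _ in halved)                     # count consumes the pipeline
print(count)  # 500
```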
It also has parallel processing capability, making parallel programming easy.
Imagine managing many downloads this way:
# This can replace most use cases of parallel processing
Stream(urls).parallel_map(lambda x:download_to_file(x, "/tmp"), thread_count=10)
# or you can reuse a thread pool.
Stream(urls).parallel_map(lambda x:download_to_file(x, "/tmp"), thread_pool=my_thread_pool_executor)
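For readers without the library at hand, roughly the same behavior can be sketched with the standard `concurrent.futures` module (`download_to_file` above is the document's placeholder; `slow_square` below is a hypothetical stand-in):

```python
from concurrent.futures import ThreadPoolExecutor

def slow_square(x):
    # Hypothetical stand-in for a slow task such as download_to_file
    return x * x

items = [1, 2, 3, 4, 5]
with ThreadPoolExecutor(max_workers=10) as pool:
    # Executor.map runs tasks concurrently and preserves input order,
    # which matches the described parallel_map behavior
    results = list(pool.map(slow_square, items))
print(results)  # [1, 4, 9, 16, 25]
```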
Installing
$ pip3 install pystream-wushilin
Importing
from pystream.pystream import Stream
Using
Creating: a stream can be created from an iterable or an iterator
# Create stream from iterable (e.g. collections)
# This might be bounded or unbounded
stream = Stream(range(0, 100))
stream = Stream([1,2,3,4,5,6])
dict1 = {'k1': 'v1', 'k2': 'v2'}
key_stream = Stream(dict1.keys())
value_stream = key_stream.map(lambda k: dict1[k])
# Create stream from iterator!
# This might be bounded or unbounded
string = "hello, world"
iterator = iter(string)
stream = Stream(iterator)
# Create stream from a file (read lines as stream):
# You have to use the "with" keyword so the stream can be closed
with Stream.from_file_lines("example.txt") as stream:
    ...  # use stream here
# Create stream from an iterative function
# effectively, [seed, func(seed), func(func(seed)), func(func(func(seed))), ...]
# This generates an unbounded stream; however, func can raise StopIteration to signal EOF
def adder(x):
    return x + 1

Stream.iterate(1, adder).limit(5) # [1, 2, 3, 4, 5]
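`Stream.iterate` can be mimicked with a tiny generator plus `itertools.islice`; this is a sketch of the semantics, not the library's code:

```python
from itertools import islice

def iterate(seed, func):
    # Lazily yield seed, func(seed), func(func(seed)), ...
    while True:
        yield seed
        seed = func(seed)

first_five = list(islice(iterate(1, lambda x: x + 1), 5))
print(first_five)  # [1, 2, 3, 4, 5]
```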
# A stream can optionally attach a begin function and an exit function so the "with" keyword can clean up gracefully.
# begin_func is executed before consumption
# exit_func is executed after the stream goes out of scope, so cleanup can be done.
# This is useful for streams backed by the network, a file, or a database, where something needs cleaning up
with Stream(dbcursor, begin_func=lambda x: print("I am executed before stream is consumed"), exit_func=dbcursor.close) as stream:
    stream.to_list() # or consume any other way you like
# Creating from a generator func; this stream is unbounded. Don't reduce on it (count, sum, max, min, etc.), and don't parallel_map,
# but map, filter, limit, etc. are fine, since they are lazy.
# If your generator is bounded, raise StopIteration after the last element; then the stream is bounded.
# This is typically unbounded, unless the generator raises StopIteration
s1 = Stream.generate(lambda: 5) # an infinite number of 5s; if you count, it hangs!
s1.limit(1000).sum() # should be 5000
a = 1
b = 1
def fib():
    global a
    global b
    a, b = b, a + b
    if a > 100000000000:
        raise StopIteration
    return a
# Generating a stream from the Fibonacci sequence, up to 100000000000.
# Generating is not an infinite loop; it is lazy!
Stream.generate(fib).limit(10).for_each(print)
1
2
3
5
8
13
21
34
55
89
# Quiz: How to generate random strings?
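One possible answer to the quiz, using plain `random` plus the generate pattern above (the library also ships random-string helpers, shown later):

```python
import random

def random_string(charset="ABCDEFG", length=12):
    # Build one random string of the given length from charset
    return "".join(random.choice(charset) for _ in range(length))

s = random_string()
print(len(s))  # 12
# An unbounded stream of such strings would then be Stream.generate(random_string)
```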
Using stream
# Mapping
Stream([1,2,3]).map(lambda x: x+1).for_each(print)
2
3
4
# Mapping in parallel. Note this consumes the entire stream and returns results in the original order.
# On an infinite stream, this will cause an out-of-memory error.
from time import sleep

def slow_map(x):
    """A slow mapping function that takes 2 seconds"""
    sleep(2)
    return x * 2
Stream.generate(lambda:5).limit(10).parallel_map(slow_map).for_each(print) # default using 10 threads
Stream.generate(lambda:5).limit(10).parallel_map(slow_map, thread_count=20).for_each(print) # using 20 threads to map concurrently
from concurrent.futures import ThreadPoolExecutor
thread_pool = ThreadPoolExecutor(max_workers=50)
Stream.generate(lambda:5).limit(10).parallel_map(slow_map, thread_pool=thread_pool).for_each(print) # re-use thread pool
# All of the above calls take about 2 seconds, instead of the 20 seconds map would take in place of parallel_map
# Filtering
Stream(range(0, 55)).filter(lambda x: x>50).for_each(print)
51
52
53
54
# Limiting
Stream(range(0, 1000000)).limit(5).for_each(print)
0
1
2
3
4
# Skipping
Stream(range(0, 100)).skip(95).for_each(print)
95
96
97
98
99
# Summing
Stream(range(0,5)).sum() # 10 (0 + 1 + 2 + 3 + 4)
# Max/Min
Stream(range(0, 5)).max() # 4
Stream(range(0, 5)).min() # 0
# Reducing
Stream(range(0, 5)).reduce(lambda x, y: x + y) # 10 -> same as sum!
# Reading from file from_file_lines
with Stream.from_file_lines("readme.txt").with_index() as stream:
    stream.for_each(print)
(0, <line1>)
(1, <line2>)
(2, <line3>) ...
# With index
Stream([1,3,5,7,9]).with_index().for_each(print)
(0, 1)
(1, 3)
(2, 5)
(3, 7)
(4, 9)
# Counting
Stream(range(0, 100)).count() # 100 (0...99)
# Concatenating streams
s1 = Stream([1,2,3])
s2 = Stream([4,5,6])
(s1 + s2).count() # 6
s1.concat(s2).count() #6
# Note: if you run both lines above, the second prints 0, since the first already consumed s1 and s2.
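The consume-once behavior is just how Python iterators work; a minimal illustration with built-ins:

```python
# A plain iterator shows the same consume-once behavior as a stream
it = iter([1, 2, 3])
first = list(it)   # [1, 2, 3]
second = list(it)  # [] -- the iterator is already exhausted
print(first, second)
```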
# visiting with a func
Stream([1,2,3,4,5]).for_each(print)
1
2
3
4
5
# convert to list
list1 = Stream(range(0, 5)).to_list() # [0, 1, 2, 3, 4]
list2 = list(Stream(range(0, 5))) # [0, 1, 2, 3, 4], since a stream is itself iterable
# picking from tuple for each element
stream = Stream(range(0, 10, 2)) # 0, 2, 4, 6, 8
indexed_stream = stream.with_index() # (0, 0), (1, 2), (2, 4), (3, 6), (4, 8)
indexed_stream.pick(0) # 0, 1, 2, 3, 4 (the indexes)
indexed_stream.pick(1) # 0, 2, 4, 6, 8 (the values; a stream is consumed once, so rebuild it between picks)
indexed_stream.pick(3) # index out of bound error
# Reducing
Stream(range(0, 5)).reduce(lambda x, y: x * y) # 0 (0 * 1 * 2 * 3 * 4)
# Flatten
Stream([[1,2],[3,4]]).flatten() # [1,2,3,4]
# Packing
Stream([1,2,3,4,5,6,7]).pack(2) # [[1,2], [3,4], [5,6], [7, None]]
Stream([1,2,3,4,5,6,7]).pack(3).flatten() # [1,2,3,4,5,6,7,None,None]
# If None not desired, filter them yourself.
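Packing with None padding is essentially the classic itertools "grouper" recipe; a sketch with built-ins, including the None filtering mentioned above:

```python
from itertools import zip_longest

def pack(iterable, n):
    # Group elements into n-tuples, padding the last group with None
    return zip_longest(*[iter(iterable)] * n)

packed = list(pack([1, 2, 3, 4, 5, 6, 7], 2))
print(packed)  # [(1, 2), (3, 4), (5, 6), (7, None)]

# Filtering out the padding after flattening:
flat = [x for group in packed for x in group if x is not None]
print(flat)  # [1, 2, 3, 4, 5, 6, 7]
```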
# Flat map
# When your mapping returns a list, this call flattens it.
Stream([[2, 5], [3, 3]]).flat_map(lambda x: [x[0] for _ in range(x[1])]).for_each(print) # gives you 5 x 2s and 3 x 3s [2,2,2,2,2,3,3,3]
# Ordering
Stream([4,3,2,1,5]).ordered(reverse=True).for_each(print) # [5,4,3,2,1]
# Uniq
Stream([1,1,2,3,4,4,5]).uniq().for_each(print) # [1,2,3,4,5]
# Repeating
print(Stream([1,2,3,4,5]).repeat(3).to_list()) # repeat() defaults to 2 times; repeat(3) gives [1,2,3,4,5,1,2,3,4,5,1,2,3,4,5]
# To Set
Stream([1,2,3,4,5,5,6]).to_set() # {1,2,3,4,5,6}
# To a stream of maps (packs of 2, or more!). When a pack has more than 2 elements, the key is the first element of the pack and the value is the rest.
Stream(["k1", "v1", "k2", "v2"]).pack(2).to_maps() # [{'k1': 'v1'}, {'k2': 'v2'}]
# To a single map
Stream(["k1", "v1", "k2", "v2"]).pack(2).to_map() # {'k1': 'v1', 'k2': 'v2'}
Stream(["k1", "v1", "k2", "v2"]).to_map() # value error!
Stream(["k1", "v1", "k2", "v2"]).to_maps() # value error!
print(Stream(["k1", "v1", "k2", "v2"]).pack(3).to_map()) # {'k1': ['v1', 'k2'], 'v2': [None, None]}
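The pack(2).to_map() result can be checked against plain slicing and dict(): even-indexed elements become keys, odd-indexed elements become values:

```python
flat = ["k1", "v1", "k2", "v2"]
# Pair every even-indexed element with the following odd-indexed one
result = dict(zip(flat[0::2], flat[1::2]))
print(result)  # {'k1': 'v1', 'k2': 'v2'}
```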
# Repeating stream
Stream(["I love python"]).repeat(20) #["I love python", .... "I love python"] (20 times)
# Splitting a stream
# Note that pystreams can be split; the first split stream is the primary, the others are secondary.
# Secondary streams behave as follows:
# Consumption from a secondary depends on consumption from the primary, and it blocks forever until the primary consumes the element.
# Elements become available in a secondary as soon as the primary consumes them.
# begin_func and exit_func are attached to the primary only. If you split a file stream, make sure the primary (s1) is closed.
#
s1, s2, s3, s4 = Stream([1,2,3,4,5], lambda:print("Begin"), lambda:print("end")).split(4)
def slow_consume(name, stream):
    it = iter(stream)
    while True:
        try:
            i = next(it)
            print(f"{name} => consumed {i}")
        except StopIteration:
            break
        sleep(1)

def consume_asap(name, stream):
    for i in stream:
        print(f"{name} => consumed {i}")
with s1, s2, s3, s4:
    t1 = threading.Thread(target=slow_consume, args=("s1", s1))
    t2 = threading.Thread(target=consume_asap, args=("s2", s2))
    t3 = threading.Thread(target=consume_asap, args=("s3", s3))
    t4 = threading.Thread(target=consume_asap, args=("s4", s4))
    Stream([t1, t2, t3, t4]).for_each(lambda x: x.start())
    Stream([t1, t2, t3, t4]).for_each(lambda x: x.join())
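itertools.tee offers a related fan-out with different semantics: each branch buffers independently, with no primary/secondary blocking, so it is not a drop-in replacement for split():

```python
from itertools import tee

# Each tee branch can be consumed independently; tee buffers elements
# internally rather than making branches wait on a primary.
a, b = tee([1, 2, 3, 4, 5], 2)
la = list(a)
lb = list(b)
print(la)  # [1, 2, 3, 4, 5]
print(lb)  # [1, 2, 3, 4, 5]
```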
# Create random strings from a charset, with length 12
Stream.random_strings("ABCDEFG", 12).limit(20).for_each(print)
# Default length is 5
Stream.random_chars("ABCDEFG").limit(20).for_each(print)
# Create random integers with [lower, upper) range
Stream.random_ints(0, 10).limit(20).for_each(print)
# Create floats with scale [0, scale)
Stream.random_floats(10).limit(20).for_each(print)
# Random alphabetic strings; lower=False gives upper case
print("Upper")
Stream.random_alphabets(length=5, lower=False).limit(20).for_each(print)
# Lower case
print("Lower")
Stream.random_alphabets(length=6, lower=True).limit(20).for_each(print)
# Upper case hex of length 5
print("Upper hex")
Stream.random_hex_strings(length=5).limit(20).for_each(print)
# Lower case hex of length 10
print("Lower hex")
Stream.random_hex_strings(length=10, lower=True).limit(20).for_each(print)
# Random strings in a list
list_new = [x for x in Stream.random_strings().limit(10)]
print(list_new)
File details
Details for the file pystream_wushilin-1.0.7.tar.gz.
File metadata
- Download URL: pystream_wushilin-1.0.7.tar.gz
- Upload date:
- Size: 18.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.4.1 importlib_metadata/4.6.0 pkginfo/1.7.0 requests/2.25.1 requests-toolbelt/0.9.1 tqdm/4.61.1 CPython/3.9.6
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 316c7968e94c3aa98ebbd52c0a7ebc3ee735c02ffd02d91854b1188a3d84b69c |
| MD5 | cf28726cf95ee5064c3065b38a35d3c1 |
| BLAKE2b-256 | b0633a3994a69be978916a0c377128f97d6f09e4a8cce5978e400c3942cae2c7 |
File details
Details for the file pystream_wushilin-1.0.7-py3-none-any.whl.
File metadata
- Download URL: pystream_wushilin-1.0.7-py3-none-any.whl
- Upload date:
- Size: 15.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.4.1 importlib_metadata/4.6.0 pkginfo/1.7.0 requests/2.25.1 requests-toolbelt/0.9.1 tqdm/4.61.1 CPython/3.9.6
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 5100d20a304353a03e4bf21f789db653e6f6338a594015be59973dc65b48eb67 |
| MD5 | beee923f3cab09c3efb9e8448b872e19 |
| BLAKE2b-256 | 63c816ff5e7b46af244e5e0270bff4d675f52d513c4b9d23b42513abbdcc5915 |