A Python stream utility

Project description

A Java-like stream for Python

A stream works like Java's: everything is lazy. Elements are pulled iteratively, so we don't build large lists in memory.

A stream is like a list with a cursor: it can only be consumed once! What if you need it again? Build a new stream object.

A stream can be easily pipelined; you can chain calls, e.g. a mapped stream can be further reduced or filtered.

Stream(range(0, 1000)).filter(lambda x: x % 2 == 0).map(lambda x: x // 2).count() # You know what this does!
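
# For reference, the same computation in plain Python: the filter keeps the 500
# even numbers, map halves each one, and count just counts them.
sum(1 for x in range(0, 1000) if x % 2 == 0) # 500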

It also has parallel processing capability, making parallel programming easy.

Imagine managing many downloads this way:

# This can replace most use cases of parallel processing
Stream(urls).parallel_map(lambda x:download_to_file(x, "/tmp"), thread_count=10)

# Or you can reuse a thread pool.
Stream(urls).parallel_map(lambda x:download_to_file(x, "/tmp"), thread_pool=my_thread_pool_executor)
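
# Note: download_to_file is not part of this library; a minimal sketch of such a
# helper (name and behavior assumed for illustration) using only the standard library:
import os
import urllib.request

def download_to_file(url, directory):
    # Save the resource at url into directory, returning the local path.
    filename = os.path.join(directory, os.path.basename(url) or "download")
    urllib.request.urlretrieve(url, filename)
    return filename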

Installing

$ pip3 install pystream-wushilin

Importing

from pystream.pystream import Stream

Using

Creating: a stream can be created from an iterable or an iterator

# Create stream from iterable (e.g. collections)
# This might be bounded or unbounded
stream = Stream(range(0, 100))
stream = Stream([1,2,3,4,5,6])
dict1 = {'k1': 'v1', 'k2': 'v2'}

key_stream = Stream(dict1.keys())
value_stream = key_stream.map(lambda k: dict1[k])
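
# Dict views are iterable too, so you can stream the (key, value) pairs directly
# and use pick (shown later) to extract values:
Stream(dict1.items()).pick(1).for_each(print) # v1, v2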

# Create stream from iterator!
# This might be bounded or unbounded
string = "hello, world"
iterator = iter(string)
stream = Stream(iterator)

# Create stream from a file (read lines as stream):
# Use the "with" keyword so the stream can be closed automatically
with Stream.from_file_lines("example.txt") as stream:
    stream.for_each(print) # use the stream here

# Create stream from an iterative function;
# effectively [seed, func(seed), func(func(seed)), func(func(func(seed))), ...]
# This generates an unbounded stream; however, func can raise StopIteration to signal the end.
def adder(x):
    return x + 1

Stream.iterate(1, adder).limit(5) # [1, 2, 3, 4, 5]
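
# For intuition, Stream.iterate behaves like this plain generator
# (a sketch of the semantics, not the library's actual implementation):
def iterate_equiv(seed, func):
    value = seed
    while True:
        yield value # yields seed, func(seed), func(func(seed)), ...
        value = func(value)

Stream(iterate_equiv(1, adder)).limit(5).to_list() # [1, 2, 3, 4, 5]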

# A stream can optionally attach a begin function and an exit function so the "with" keyword works gracefully.
# begin_func is executed before consumption starts.
# exit_func is executed when the stream goes out of scope, so cleanup can be done.
# This is useful for streams backed by a network connection, a file, or a database that needs cleanup.
with Stream(dbcursor, begin_func=lambda: print("I am executed before the stream is consumed"), exit_func=dbcursor.close) as stream:
  stream.to_list() # or other ways you like

# Creating from a generator func. This stream is unbounded: don't reduce on it (count, sum, max, min, etc.) and don't parallel_map,
# but map, filter, limit, etc. are fine, since they are lazy.
# If your generator is bounded, raise StopIteration after the last element; then the stream is bounded.
s1 = Stream.generate(lambda:5) # infinite number of 5s, if you count, it hangs!
s1.limit(1000).sum() # should be 5000

a = 1
b = 1
def fib():
    global a
    global b
    a, b = b, a+b
    if a > 100000000000:
      raise StopIteration
    return a
# Generating a stream from the Fibonacci sequence, up to 100000000000.
# Generation is not an infinite loop; it is lazy!
Stream.generate(fib).limit(10).for_each(print)
1
2
3
5
8
13
21
34
55
89
# Quiz: How to generate random strings?
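# One possible answer, using Stream.generate with the standard random module
# (the library's random_strings helper, shown later, does this for you):
import random
import string

Stream.generate(lambda: "".join(random.choice(string.ascii_uppercase) for _ in range(5))).limit(3).for_each(print)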

Using streams

# Mapping
Stream([1,2,3]).map(lambda x: x+1).for_each(print)
2
3
4

# Mapping in parallel. Note: this consumes the entire stream and returns results in the original order.
# On an infinite stream this will run out of memory.
from time import sleep
from concurrent.futures import ThreadPoolExecutor

def slow_map(x):
    """ A slow mapping function that takes 2 seconds """
    sleep(2)
    return x * 2

Stream.generate(lambda:5).limit(10).parallel_map(slow_map).for_each(print) # default: 10 threads
Stream.generate(lambda:5).limit(10).parallel_map(slow_map, thread_count=20).for_each(print) # use 20 threads to map concurrently
thread_pool = ThreadPoolExecutor(max_workers=50)
Stream.generate(lambda:5).limit(10).parallel_map(slow_map, thread_pool=thread_pool).for_each(print) # reuse a thread pool

# All of the above calls take about 2 seconds, instead of the 20 seconds plain map would need.

# Filtering
Stream(range(0, 55)).filter(lambda x: x>50).for_each(print)
51
52
53
54

# Limiting
Stream(range(0, 1000000)).limit(5).for_each(print)
0
1
2
3
4

# Skipping
Stream(range(0, 100)).skip(95).for_each(print)
95
96
97
98
99

# Summing
Stream(range(0,5)).sum() # 10 (0 + 1 + 2 + 3 + 4)

# Max/Min
Stream(range(0, 5)).max() # 4
Stream(range(0, 5)).min() # 0

# Reducing
Stream(range(0, 5)).reduce(lambda x, y: x + y) # 10 -> same as sum!
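
# reduce folds the stream pairwise with your function, so other combiners work too:
Stream(range(0, 5)).reduce(lambda x, y: x if x > y else y) # 4 -> same as max!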

# Reading lines from a file with from_file_lines
with Stream.from_file_lines("readme.txt").with_index() as stream:
  stream.for_each(print)

(0, <line1>)
(1, <line2>)
(2, <line3>) ...

# With index
Stream([1,3,5,7,9]).with_index().for_each(print)
(0, 1)
(1, 3)
(2, 5)
(3, 7)
(4, 9)

# Counting
Stream(range(0, 100)).count() # 100 (0...99)

# Concatenating streams
s1 = Stream([1,2,3])
s2 = Stream([4,5,6])
(s1 + s2).count() # 6
s1.concat(s2).count() # 6
# Note: if you run both lines above, the second returns 0, since the first already consumed s1 and s2.
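
# To run both, rebuild the streams in between:
s1, s2 = Stream([1,2,3]), Stream([4,5,6])
(s1 + s2).count() # 6
s1, s2 = Stream([1,2,3]), Stream([4,5,6]) # streams are single-use, so rebuild
s1.concat(s2).count() # 6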

# Visiting each element with a function
Stream([1,2,3,4,5]).for_each(print)
1
2
3
4
5


# Convert to a list
list1 = Stream(range(0, 5)).to_list() # [0, 1, 2, 3, 4]
list2 = list(Stream(range(0, 5))) # [0, 1, 2, 3, 4], since the stream itself is iterable

# Picking one element of each tuple
stream = Stream(range(0, 10, 2)) # 0, 2, 4, 6, 8
indexed_stream = stream.with_index() # (0, 0), (1, 2), (2, 4), (3, 6), (4, 8)
indexed_stream.pick(0) # 0, 1, 2, 3, 4
indexed_stream.pick(1) # 0, 2, 4, 6, 8
indexed_stream.pick(3) # index out of bounds error
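
# Since each pick consumes the stream, rebuild it between calls:
Stream(range(0, 10, 2)).with_index().pick(0).to_list() # [0, 1, 2, 3, 4]
Stream(range(0, 10, 2)).with_index().pick(1).to_list() # [0, 2, 4, 6, 8]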

# Reducing with multiplication
Stream(range(0, 5)).reduce(lambda x, y: x * y) # 0 (0 * 1 * 2 * 3 * 4)

# Flatten
Stream([[1,2],[3,4]]).flatten() # [1,2,3,4]

# Packing
Stream([1,2,3,4,5,6,7]).pack(2) # [[1,2], [3,4], [5,6], [7, None]]
Stream([1,2,3,4,5,6,7]).pack(3).flatten() # [1,2,3,4,5,6,7,None,None]
# If None is not desired, filter it out yourself.
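# For example, with the documented filter:
Stream([1,2,3,4,5,6,7]).pack(3).flatten().filter(lambda x: x is not None).to_list() # [1,2,3,4,5,6,7]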

# Flat map
# When your mapping function returns a list, this call flattens the results.
Stream([[2, 5], [3, 3]]).flat_map(lambda x: [x[0] for _ in range(x[1])]).for_each(print) # five 2s then three 3s: [2,2,2,2,2,3,3,3]

# Ordering
Stream([4,3,2,1,5]).ordered(reverse=True).for_each(print) # [5,4,3,2,1]

# Uniq
Stream([1,1,2,3,4,4,5]).uniq().for_each(print) # [1,2,3,4,5]

# Repeating
print(Stream([1,2,3,4,5]).repeat(3).to_list()) # repeat(n) repeats the stream n times (default 2); this gives [1,2,3,4,5,1,2,3,4,5,1,2,3,4,5]

# To Set
Stream([1,2,3,4,5,5,6]).to_set() # {1,2,3,4,5,6}

# To map streams (packs of 2 or more). When a pack has more than 2 elements, the key is the first element and the value is the rest.
Stream(["k1", "v1", "k2", "v2"]).pack(2).to_maps() # [{'k1': 'v1'}, {'k2': 'v2'}]

# To a single map
Stream(["k1", "v1", "k2", "v2"]).pack(2).to_map() # [{k1:v1,k2:v2}]

Stream(["k1", "v1", "k2", "v2"]).to_map() # value error!
Stream(["k1", "v1", "k2", "v2"]).to_maps() # value error!

print(Stream(["k1", "v1", "k2", "v2"]).pack(3).to_map()) # {'k1': ['v1', 'k2'], 'v2': [None, None]}

# Repeating a stream
Stream(["I love python"]).repeat(20) # ["I love python", ..., "I love python"] (20 times)


# Splitting streams

# Note that pystreams can be split; the first split stream is primary, the others are secondary.
# Secondary streams behave as follows:
# - Consumption from a secondary depends on consumption from the primary; it blocks forever until the primary consumes elements.
# - Elements become available in a secondary stream as soon as the primary consumes them.
# - begin_func and exit_func are attached to the primary only. If you split a file stream, make sure the primary (s1) is closed.
import threading
from time import sleep

s1, s2, s3, s4 = Stream([1,2,3,4,5], lambda: print("Begin"), lambda: print("end")).split(4)

def slow_consume(name, stream):
    it = iter(stream)
    while True:
        try:
            i = next(it)
            print(f"{name} => consumed {i}")
        except StopIteration:
            break
        sleep(1)

def consume_asap(name, stream):
    for i in stream:
        print(f"{name} => consumed {i}")

with s1, s2, s3, s4:
    t1 = threading.Thread(target=slow_consume, args=("s1", s1))
    t2 = threading.Thread(target=consume_asap, args=("s2", s2))
    t3 = threading.Thread(target=consume_asap, args=("s3", s3))
    t4 = threading.Thread(target=consume_asap, args=("s4", s4))
    Stream([t1, t2, t3, t4]).for_each(lambda x: x.start())
    Stream([t1, t2, t3, t4]).for_each(lambda x: x.join())



# Random streams

# Create random strings from a charset, with a given length
Stream.random_strings("ABCDEFG", 12).limit(20).for_each(print)

# Create random chars from a charset. Default length is 5.
Stream.random_chars("ABCDEFG").limit(20).for_each(print)

# Create random integers in the [lower, upper) range
Stream.random_ints(0, 10).limit(20).for_each(print)

# Create random floats in the [0, scale) range
Stream.random_floats(10).limit(20).for_each(print)

# Upper case alphabetic strings of length 5
print("Upper")
Stream.random_alphabets(length=5, lower=False).limit(20).for_each(print)

# Lower case alphabetic strings of length 6
print("Lower")
Stream.random_alphabets(length=6, lower=True).limit(20).for_each(print)

# Upper case hex strings of length 5
print("Upper hex")
Stream.random_hex_strings(length=5).limit(20).for_each(print)

# Lower case hex strings of length 10
print("Lower hex")
Stream.random_hex_strings(length=10, lower=True).limit(20).for_each(print)

# Collect random strings into a list
list_new = [x for x in Stream.random_strings().limit(10)]
print(list_new)
