
A Python stream utility

Project description

A Java-like stream for Python

A stream is like Java's: everything is lazy. Using an iterative approach, we don't build large lists in memory.
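
For instance, nothing below is computed until the terminal to_list() call, and only the five requested elements are ever produced (a minimal sketch, assuming an unbounded itertools.count iterator as the source):

import itertools
Stream(itertools.count(0)).map(lambda x: x * x).limit(5).to_list() # [0, 1, 4, 9, 16]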

A stream is like a list with a cursor: it can only be consumed once! What if I need it again? Build a new stream object.
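
A minimal sketch of this single-use behavior (a consumed stream yields nothing, so count() returns 0, as the concat example later also notes):

s = Stream([1, 2, 3])
s.count()                  # 3
s.count()                  # 0 -- already consumed
Stream([1, 2, 3]).count()  # 3 -- build a new stream instead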

A stream can be easily pipelined: you can chain calls, e.g. a mapped stream can be further reduced or filtered.

Stream(range(0, 1000)).filter(lambda x: x % 2 == 0).map(lambda x: x // 2).count() # You know what this does!

It also has parallel processing capability, making parallel programming easy.

Imagine managing many downloads this way:

# This can replace most use cases of parallel processing
Stream(urls).parallel_map(lambda x:download_to_file(x, "/tmp"), thread_count=10)

# Or you can reuse a thread pool.
Stream(urls).parallel_map(lambda x:download_to_file(x, "/tmp"), thread_pool=my_thread_pool_executor)

Installing

$ pip3 install pystream-wushilin

Importing

from pystream.pystream import Stream

Using

Creating: a stream can be created from an iterable or an iterator

# Create stream from iterable (e.g. collections)
# This might be bounded or unbounded
stream = Stream(range(0, 100))
stream = Stream([1,2,3,4,5,6])
dict1 = {'k1': 'v1', 'k2': 'v2'}

key_stream = Stream(dict1.keys())
value_stream = key_stream.map(lambda k: dict1[k])

# Create stream from iterator!
# This might be bounded or unbounded
string = "hello, world"
iterator = iter(string)
stream = Stream(iterator)

# Create stream from a file (read lines as stream):
# You have to use the "with" keyword so the stream can be closed
with Stream.from_file_lines("example.txt") as stream:
    stream.for_each(print)  # use the stream here

# Create stream from iterative function
# effectively, [seed, func(seed), func(func(seed)), func(func(func(seed))), ...]
# This generates an unbounded stream; however, the function can raise StopIteration to signal EOF
def adder(x):
    return x + 1

Stream.iterate(1, adder).limit(5) # [1, 2, 3, 4, 5]

# A stream can optionally attach a begin function and an exit function so the "with" keyword works gracefully.
# begin_func is executed before consumption starts
# exit_func is executed after the stream goes out of scope, so cleanup can be done.
# This is useful for streams backed by a network connection, a file, or a database that needs cleanup
with Stream(dbcursor, begin_func=lambda: print("I am executed before stream is consumed"), exit_func=dbcursor.close) as stream:
  stream.to_list() # or other ways you like

# Creating from a generator func. This stream is typically unbounded: don't reduce on it (count, sum, max, min, etc.) and don't parallel_map,
# but map, filter, limit, etc. are fine, since they are lazy.
# If your generator is bounded, raise StopIteration after the last element; the stream is then bounded.
s1 = Stream.generate(lambda:5) # infinite number of 5s, if you count, it hangs!
s1.limit(1000).sum() # should be 5000

a = 1
b = 1
def fib():
    global a
    global b
    a, b = b, a+b
    if a > 100000000000:
      raise StopIteration
    return a
# Generating a stream from the Fibonacci sequence, up to 100000000000.
# generate() is not an infinite loop; it is lazy!
Stream.generate(fib).limit(10).for_each(print)
1
2
3
5
8
13
21
34
55
89
# Quiz: How to generate random strings?
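# One possible answer (a sketch using Stream.generate with the standard random module;
# the built-in Stream.random_strings helper is shown later):
import random
charset = "ABCDEFG"
Stream.generate(lambda: ''.join(random.choice(charset) for _ in range(5))).limit(3).for_each(print)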

Using stream

# Mapping
Stream([1,2,3]).map(lambda x: x+1).for_each(print)
2
3
4

# Mapping in parallel. Note this consumes the entire stream and returns results in the original order.
# On an infinite stream this will cause an out-of-memory error.
from time import sleep
from concurrent.futures import ThreadPoolExecutor

def slow_map(x):
    """ A slow mapping function that takes 2 seconds """
    sleep(2)
    return x * 2

Stream.generate(lambda:5).limit(10).parallel_map(slow_map).for_each(print) # default: 10 threads
Stream.generate(lambda:5).limit(10).parallel_map(slow_map, thread_count=20).for_each(print) # use 20 threads to map concurrently
thread_pool = ThreadPoolExecutor(max_workers=50)
Stream.generate(lambda:5).limit(10).parallel_map(slow_map, thread_pool=thread_pool).for_each(print) # reuse an existing thread pool

# All of the above calls take about 2 seconds, instead of the 20 seconds plain map would take

# Filtering
Stream(range(0, 55)).filter(lambda x: x>50).for_each(print)
51
52
53
54

# Limiting
Stream(range(0, 1000000)).limit(5).for_each(print)
0
1
2
3
4

# Skipping
Stream(range(0, 100)).skip(95).for_each(print)
95
96
97
98
99

# Summing
Stream(range(0,5)).sum() # 10 (0 + 1 + 2 + 3 + 4)

# Max/Min
Stream(range(0, 5)).max() # 4
Stream(range(0, 5)).min() # 0

# Reducing
Stream(range(0, 5)).reduce(lambda x, y: x + y) # 10 -> same as sum!

# Reading from a file with from_file_lines
with Stream.from_file_lines("readme.txt").with_index() as stream:
  stream.for_each(print)

(0, <line1>)
(1, <line2>)
(2, <line3>) ...

# With index
Stream([1,3,5,7,9]).with_index().for_each(print)
(0, 1)
(1, 3)
(2, 5)
(3, 7)
(4, 9)

# Counting
Stream(range(0, 100)).count() # 100 (0...99)

# Concatenating streams
s1 = Stream([1,2,3])
s2 = Stream([4,5,6])
(s1 + s2).count() # 6
s1.concat(s2).count() #6
# Note: if you run both of the above, the second line returns 0, since the first already consumed s1 and s2.
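# If you need both results, rebuild the streams before the second call:
s1, s2 = Stream([1,2,3]), Stream([4,5,6])
s1.concat(s2).count() # 6 again, on fresh streams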

# Visiting each element with a function
Stream([1,2,3,4,5]).for_each(print)
1
2
3
4
5


# Convert to a list
list1 = Stream(range(0, 5)).to_list() # [0, 1, 2, 3, 4]
list2 = list(Stream(range(0, 5))) # [0, 1, 2, 3, 4] since the stream itself is iterable

# Picking from the tuple of each element
stream = Stream(range(0, 10, 2)) # 0, 2, 4, 6, 8
indexed_stream = stream.with_index() # (0, 0), (1, 2), (2, 4), (3, 6), (4, 8)
indexed_stream.pick(0) # 0, 1, 2, 3, 4
indexed_stream.pick(1) # 0, 2, 4, 6, 8
indexed_stream.pick(3) # Index Out of Bound error

# Reducing
Stream(range(0, 5)).reduce(lambda x, y: x * y) # 0 (0 * 1 * 2 * 3 * 4) 

# Flatten
Stream([[1,2],[3,4]]).flatten() # [1,2,3,4]

# Packing
Stream([1,2,3,4,5,6,7]).pack(2) # [[1,2], [3,4], [5,6], [7, None]]
Stream([1,2,3,4,5,6,7]).pack(3).flatten() # [1,2,3,4,5,6,7,None,None]
# If None is not desired, filter it out yourself.
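# For example, drop the padding with filter (a one-liner using only calls shown above):
Stream([1,2,3,4,5,6,7]).pack(2).flatten().filter(lambda x: x is not None).to_list() # [1,2,3,4,5,6,7]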

# Flat map
# When your mapping returns a list, this call flattens it.
Stream([[2, 5], [3, 3]]).flat_map(lambda x: [x[0] for _ in range(x[1])]).for_each(print) # gives you 5 x 2s and 3 x 3s [2,2,2,2,2,3,3,3]

# Ordering
Stream([4,3,2,1,5]).ordered(reverse=True).for_each(print) # [5,4,3,2,1]

# Uniq
Stream([1,1,2,3,4,4,5]).uniq().for_each(print) # [1,2,3,4,5]

# Repeating
print(Stream([1,2,3,4,5]).repeat(3).to_list()) # repeat(n) repeats n times (default 2); this gives you [1,2,3,4,5,1,2,3,4,5,1,2,3,4,5]

# To Set
Stream([1,2,3,4,5,5,6]).to_set() # {1,2,3,4,5,6}

# To a stream of maps (packs of 2 or more). When a pack has more than 2 elements, the key is the first element of the pack and the value is the rest.
Stream(["k1", "v1", "k2", "v2"]).pack(2).to_maps() # [{'k1': 'v1'}, {'k2': 'v2'}]

# To a single map
Stream(["k1", "v1", "k2", "v2"]).pack(2).to_map() # [{k1:v1,k2:v2}]

Stream(["k1", "v1", "k2", "v2"]).to_map() # value error!
Stream(["k1", "v1", "k2", "v2"]).to_maps() # value error!

print(Stream(["k1", "v1", "k2", "v2"]).pack(3).to_map()) # {'k1': ['v1', 'k2'], 'v2': [None, None]}

# Repeating stream
Stream(["I love python"]).repeat(20) #["I love python", .... "I love python"] (20 times)


# Splitting a stream

# Note that pystreams can be split; the first split stream is the primary, the others are secondary.
# A secondary stream behaves as follows:
# Consuming from a secondary depends on consumption from the primary, and it blocks forever until the primary consumes the corresponding element.
# Elements become available in the secondary streams as soon as the primary consumes them.
# begin_func and exit_func are attached to the primary only. If you split a file stream, please make sure the first one (s1) is closed.
#
import threading
from time import sleep

s1, s2, s3, s4 = Stream([1,2,3,4,5], lambda:print("Begin"), lambda:print("end")).split(4)

def slow_consume(name, stream):
    it = iter(stream)
    while True:
        try:
            i = next(it)
            print(f"{name} => consumed {i}")
        except StopIteration:
            break
        sleep(1)

def consume_asap(name, stream):
    for i in stream:
        print(f"{name} => consumed {i}")

with s1, s2, s3, s4:
    t1 = threading.Thread(target=slow_consume, args=("s1", s1))
    t2 = threading.Thread(target=consume_asap, args=("s2", s2))
    t3 = threading.Thread(target=consume_asap, args=("s3", s3))
    t4 = threading.Thread(target=consume_asap, args=("s4", s4))
    Stream([t1, t2, t3, t4]).for_each(lambda x: x.start())
    Stream([t1, t2, t3, t4]).for_each(lambda x: x.join())



# Create random strings from a charset, with a given length
Stream.random_strings("ABCDEFG", 12).limit(20).for_each(print)

# Default length is 5
Stream.random_chars("ABCDEFG").limit(20).for_each(print)

# Create random integers in the [lower, upper) range
Stream.random_ints(0, 10).limit(20).for_each(print)

# Create random floats in the [0, scale) range
Stream.random_floats(10).limit(20).for_each(print)

# Upper case alphabetic strings of length 5
print("Upper")
Stream.random_alphabets(length=5, lower=False).limit(20).for_each(print)

# Lower case alphabetic strings of length 6
print("Lower")
Stream.random_alphabets(length=6, lower=True).limit(20).for_each(print)

# Upper case hex strings of length 5
print("Upper hex")
Stream.random_hex_strings(length=5).limit(20).for_each(print)

# Lower case hex strings of length 10
print("Lower hex")
Stream.random_hex_strings(length=10, lower=True).limit(20).for_each(print)

# Random strings collected into a list
list_new = [x for x in Stream.random_strings().limit(10)]
print(list_new)
