Skip to main content

Nonblocking stream reads using Queue and a pump thread

Project description

Nonblocking Stream Queue

This is a simple python package for nonblocking reading from streams.

It could be expanded for general nonblocking usage if desired.

A simple nonblocking queue reader is provided, with implementation from https://stackoverflow.com/questions/375427/a-non-blocking-read-on-a-subprocess-pipe-in-python/4896288#4896288 .

When constructed, the reader spawns a thread and begins reading everything from the stream.

Installation

$ python3 -m pip install nonblocking-stream-queue

Usage

import sys
import nonblocking_stream_queue as nonblocking

reader = nonblocking.Reader(
    sys.stdin, # object to read from
    max_size=4096, # max size of each read
    lines=False, # whether or not to break reads into lines
    max_count=None, # max queued reads
    drop_timeout=None, # time to wait for queue to drain before dropping
    drop_older=False, # which end of the queue to drop from
    pre_cb=None, # if set, data = (pre_cb(), read())
    post_cb=None, # if set, data = post_cb(data)
    drop_cb=None, # if set, call with dropped data
    verbose=False, # if set, display progress filling max_count
)

print(reader.read_one()) # outputs up to 4096 characters, or None if nothing queued
print(reader.read_many()) # outputs all 4096 character chunks queued
print(''.join(reader.read_many())) # outputs all queued text
reader.block() # waits for data to be present or end, returns number of reads queued
reader.is_pumping() # False if data has ended

Timestamping

import sys, time
import nonblocking_stream_queue as nonblocking

reader = nonblocking.Reader(
    sys.stdin.buffer,
    pre_cb=lambda: time.time(),
    post_cb=lambda time_data_tuple: (time_data_tuple[0], time_data_tuple[1], time.time())
)

while reader.block():
    start_time, data, end_time = reader.read_one()
    print(start_time, data, end_time)

Lines

import sys
import nonblocking_stream_queue as nonblocking

reader = nonblocking.Reader(
    sys.stdin,
    lines=True
)

while reader.block():
    print(reader.read_one().rstrip()) # outputs each line of text that is input

Other solutions

There are likely many other existing solutions to this common task.

Here's one:

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

nonblocking_stream_queue-0.1.4.tar.gz (3.6 kB view details)

Uploaded Source

Built Distribution

File details

Details for the file nonblocking_stream_queue-0.1.4.tar.gz.

File metadata

File hashes

Hashes for nonblocking_stream_queue-0.1.4.tar.gz
Algorithm Hash digest
SHA256 fd3cf4112b3a194984a4078a2c588222a389ee1ad58727817c3d2a656aa5907f
MD5 c00f6807478051bbcc7ae1f26f98ba88
BLAKE2b-256 4fa79dbf258b9ad19706ba14ea5b5af17567518ec105e4fba02dffd1346c7030

See more details on using hashes here.

File details

Details for the file nonblocking_stream_queue-0.1.4-py3-none-any.whl.

File metadata

File hashes

Hashes for nonblocking_stream_queue-0.1.4-py3-none-any.whl
Algorithm Hash digest
SHA256 7f4c10303648f33272a31292e2118e86535bf427fea63a1db463ba30c835655d
MD5 0be12a4ab9738ef99b4e12862d115e13
BLAKE2b-256 34200c66086b5492e15bf0d10d8f162b678f2f7530b63ea46c9af76ab21ecbc9

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page