Facilities to do with buffers, particularly CornuCopyBuffer, an automatically refilling buffer to support parsing of data streams.

Latest release 20240630:

  • CornuCopyBuffer: new optional .final_offset attribute indicating the anticipated largest offset, automatically set for regular files - aids progress bars etc.
  • Drop cs.py3, get pread from os.pread, require python>=3.3.
  • CornuCopyBuffer.close: set self.bufs = None to release memory and prevent accidental late use.
  • _FetchIterator: provide a mutex; SeekableIteratorMixin.seek, FDIterator.read,close: take the mutex.

Class CopyingIterator

Wrapper for an iterator that copies every item retrieved to a callable.

Method CopyingIterator.__init__(self, it, copy_to): Initialise with the iterator it and the callable copy_to.
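The wrapper idea can be sketched in a few lines. This is a minimal stand-in written for illustration, not the library's implementation; the class name CopyingIteratorSketch is hypothetical.

```python
class CopyingIteratorSketch:
    """Illustrative stand-in: pass each item to copy_to, then yield it."""

    def __init__(self, it, copy_to):
        self.it = iter(it)
        self.copy_to = copy_to

    def __iter__(self):
        return self

    def __next__(self):
        item = next(self.it)   # StopIteration propagates at end of input
        self.copy_to(item)     # hand a reference to the observer callable
        return item


seen = []
chunks = list(CopyingIteratorSketch([b'abc', b'def'], seen.append))
```

Here seen.append plays the role of the copy_to callable, so every chunk consumed by the caller is also recorded in seen.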

Class CornuCopyBuffer(cs.deco.Promotable)

An automatically refilling buffer intended to support parsing of data streams.

Its purpose is to aid binary parsers which do not themselves need to handle sources specially; CornuCopyBuffers are trivially made from bytes, iterables of bytes and file-like objects. See cs.binary for convenient parsing classes which work against CornuCopyBuffers.

Attributes:

  • buf: the first of any buffered leading chunks of unparsed data from the input, available for direct inspection by parsers; normally, however, parsers will use .extend and .take.
  • offset: the logical offset of the buffer; this excludes buffered data and unconsumed input data

Note: the initialiser may supply a cleanup function; although this will be called via the buffer's .__del__ method a prudent user of a buffer should call the .close() method when finished with the buffer to ensure prompt cleanup.

The primary methods supporting parsing of data streams are .extend() and .take(). Calling .extend(min_size) arranges that the internal buffer contains at least min_size bytes. Calling .take(size) fetches exactly size bytes from the internal buffer and the input source if necessary and returns them, adjusting the internal buffer.
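The refill-on-demand behaviour of .extend() and .take() can be illustrated with a simplified stand-in. The sketch below is not the library class: MiniBuffer is a hypothetical name, it joins chunks on every take for brevity, and it only models the offset, short_ok and EOFError behaviour described above.

```python
class MiniBuffer:
    """Simplified stand-in for a refilling parse buffer (not the library class)."""

    def __init__(self, input_data):
        self._it = iter(input_data)
        self.bufs = []      # buffered chunks not yet consumed
        self.offset = 0     # logical offset of the start of the buffer

    def __len__(self):
        return sum(len(b) for b in self.bufs)

    def extend(self, min_size, short_ok=False):
        """Fetch chunks until at least min_size bytes are buffered."""
        while len(self) < min_size:
            try:
                chunk = next(self._it)
            except StopIteration:
                if short_ok:
                    return
                raise EOFError(
                    "wanted %d bytes but only found %d" % (min_size, len(self))
                ) from None
            if chunk:
                self.bufs.append(chunk)

    def take(self, size, short_ok=False):
        """Return the next size bytes, refilling from the input as needed."""
        self.extend(size, short_ok=short_ok)
        data = b''.join(self.bufs)
        taken, rest = data[:size], data[size:]
        self.bufs = [rest] if rest else []
        self.offset += len(taken)
        return taken


bfr = MiniBuffer([b'abc', b'def', b'ghi'])
header = bfr.take(4)                 # spans the first two input chunks
buffered = len(bfr)                  # bytes left over from the second chunk
tail = bfr.take(10, short_ok=True)   # input runs out, so a short result
```

The parser never sees the chunk boundaries: take(4) returns b'abcd' even though the input arrived as three 3-byte chunks.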

len(CornuCopyBuffer) returns the length of any buffered data.

bool(CornuCopyBuffer) tests whether len() > 0.

Indexing a CornuCopyBuffer accesses the buffered data only, returning an individual byte's value (an int).

A CornuCopyBuffer is also iterable, yielding data in whatever sizes come from its input_data source, preceded by any content in the internal buffer.

A CornuCopyBuffer also supports the file methods .read, .tell and .seek, supporting drop-in use of the buffer in many file contexts. Backward seeks are not supported. .seek will take advantage of the input_data's .seek method if it has one, otherwise it will consume the input_data as required.

Method CornuCopyBuffer.__init__(self, input_data, buf=None, offset=0, seekable=None, copy_offsets=None, copy_chunks=None, close=None, progress=None, final_offset=None): Prepare the buffer.

Parameters:

  • input_data: an iterable of data chunks (bytes-like instances); if your data source is a file see the .from_file factory; if your data source is a file descriptor see the .from_fd factory.
  • buf: if not None, the initial state of the parse buffer
  • offset: logical offset of the start of the buffer, default 0
  • seekable: whether input_data has a working .seek method; the default is None meaning that it will be attempted on the first skip or seek
  • copy_offsets: if not None, a callable for parsers to report pertinent offsets via the buffer's .report_offset method
  • copy_chunks: if not None, every fetched data chunk is copied to this callable
  • close: an optional callable that may be provided for resource cleanup when the user of the buffer calls its .close() method.
  • progress: an optional cs.Progress.progress instance to which to report data consumed from input_data; any object supporting += is acceptable
  • final_offset: optional int specifying the largest offset expected to be reached, intended for uses such as callers presenting a progress indication; this is, for example, provided by CornuCopyBuffer.from_fd for regular files using the stat.st_size field

The input_data is an iterable whose iterator may have some optional additional properties:

  • seek: if present, this is a seek method after the fashion of file.seek; the buffer's seek, skip and skipto methods will take advantage of this if available.
  • offset: the current byte offset of the iterator; this is used during the buffer initialisation to compute input_data_displacement, the difference between the buffer's logical offset and the input data iterable's logical offset; if unavailable during initialisation this is presumed to be 0.
  • end_offset: the end offset of the iterator if known.

Method CornuCopyBuffer.__del__(self): Release resources when the object is deleted.

Method CornuCopyBuffer.__getitem__(self, index): Fetch from the internal buffer. This does not consume data from the internal buffer. Note that this is an expensive way to access the buffer, particularly if index is a slice.

If index is a slice, slice the join of the internal subbuffers. This is quite expensive and it is probably better to take or takev some data from the buffer.

Otherwise index should be an int and the corresponding buffered byte is returned.

This is usually not a very useful method; its primary use case is to probe the buffer to make a parsing decision instead of taking a byte off and (possibly) pushing it back.

Method CornuCopyBuffer.__len__(self): The length is the length of the internal buffer: data available without a fetch.

Method CornuCopyBuffer.__next__(self): Fetch a data chunk from the buffer.

Method CornuCopyBuffer.as_fd(self, maxlength=Ellipsis): Create a pipe and dispatch a Thread to copy up to maxlength bytes from bfr into it. Return the file descriptor of the read end of the pipe.

The default maxlength is Ellipsis, meaning to copy all data.

Note that the thread preemptively consumes from the buffer.

This is useful for passing buffer data to subprocesses.
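The pipe-plus-thread arrangement can be sketched with the standard library alone. This is an illustration of the general technique under stated assumptions, not the method's actual code: chunks_as_fd is a hypothetical helper which feeds a plain iterable of bytes chunks rather than a buffer, and it has no maxlength limit.

```python
import os
import threading


def chunks_as_fd(chunks):
    """Feed an iterable of bytes chunks into a pipe from a worker thread.

    Returns the read end of the pipe; the worker closes the write end
    once every chunk has been written.
    """
    rfd, wfd = os.pipe()

    def copy_to_pipe():
        try:
            for chunk in chunks:
                view = memoryview(chunk)
                while view:                       # os.write may be partial
                    written = os.write(wfd, view)
                    view = view[written:]
        finally:
            os.close(wfd)                         # signals EOF to the reader

    threading.Thread(target=copy_to_pipe, daemon=True).start()
    return rfd


rfd = chunks_as_fd([b'abc', b'def', b'ghi'])
with os.fdopen(rfd, 'rb') as f:
    received = f.read()
```

A file descriptor obtained this way could be handed to a subprocess as its standard input; because the worker thread runs ahead of the reader, data is consumed from the source as soon as the pipe has room.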

Method CornuCopyBuffer.at_eof(self): Test whether the buffer is at end of input.

Warning: this will fetch from the input_data if the buffer is empty and so it may block.

Method CornuCopyBuffer.bounded(self, end_offset) -> 'CornuCopyBuffer': Return a new CornuCopyBuffer operating on a bounded view of this buffer.

This supports parsing of the buffer contents without risk of consuming past a certain point, such as the known end of a packet structure.

Parameters:

  • end_offset: the ending offset of the new buffer. Note that this is an absolute offset, not a length.

The new buffer starts with the same offset as self and use of the new buffer affects self. After a flush both buffers will again have the same offset and the data consumed via the new buffer will also have been consumed from self.

Here is an example.

  • Make a buffer bfr with 9 bytes of data in 3 chunks.
  • Consume 2 bytes, advancing the offset to 2.
  • Make a new bounded buffer subbfr extending to offset 5. Its initial offset is also 2.
  • Iterate over it, yielding the remaining single byte chunk from b'abc' and then the first 2 bytes of b'def'. The new buffer's offset is now 5.
  • Try to take 2 more bytes from the new buffer - this fails.
  • Flush the new buffer, synchronising with the original. The original's offset is now also 5.
  • Take 2 bytes from the original buffer, which succeeds.

Example:

>>> bfr = CornuCopyBuffer([b'abc', b'def', b'ghi'])
>>> bfr.offset
0
>>> bfr.take(2)
b'ab'
>>> bfr.offset
2
>>> subbfr = bfr.bounded(5)
>>> subbfr.offset
2
>>> for bs in subbfr:
...   print(bs)
...
b'c'
b'de'
>>> subbfr.offset
5
>>> subbfr.take(2)
Traceback (most recent call last):
    ...
EOFError: insufficient input data, wanted 2 bytes but only found 0
>>> subbfr.flush()
>>> bfr.offset
5
>>> bfr.take(2)
b'fg'

WARNING: if the bounded buffer is not completely consumed then it is critical to call the new CornuCopyBuffer's .flush method to push any unconsumed buffer back into this buffer. Recommended practice is to always call .flush when finished with the new buffer. The CornuCopyBuffer.subbuffer method returns a context manager which does this automatically.

Also, because the new buffer may buffer some of the unconsumed data from this buffer, use of the original buffer should be suspended.

Property CornuCopyBuffer.buf: The first buffer, or b'' if nothing is buffered.

Method CornuCopyBuffer.byte0(self): Consume the leading byte and return it as an int (0..255).

Method CornuCopyBuffer.close(self): Close the buffer. This discards the internal buffer of "read but not consumed" data and calls the close callable supplied when the buffer was initialised, if any.

Property CornuCopyBuffer.end_offset: Return the end offset of the input data (in buffer ordinates) if known, otherwise None.

Note that this depends on the computation of the input_offset_displacement which takes place at the buffer initialisation, which in turn relies on the input_data.offset attribute, which at initialisation is presumed to be 0 if missing.

Method CornuCopyBuffer.extend(self, min_size, short_ok=False): Extend the buffer to at least min_size bytes.

If min_size is Ellipsis, extend the buffer to consume all the input. This should really only be used with bounded buffers in order to avoid unconstrained memory consumption.

If there are insufficient data available then an EOFError will be raised unless short_ok is true (default False) in which case the updated buffer will be short.

Method CornuCopyBuffer.from_bytes(bs, offset=0, length=None, **kw): Return a CornuCopyBuffer fed from the supplied bytes bs starting at offset and ending after length.

This is handy for callers parsing using buffers but handed bytes.

Parameters:

  • bs: the bytes
  • offset: a starting position for the data; the input data will start this far into the bytes
  • length: the maximum number of bytes to use; the input data will be cropped this far past the starting point; default: the number of bytes in bs after offset

Other keyword arguments are passed to the buffer constructor.
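The interaction of offset and length can be shown with a small cropping helper. This is a sketch under assumptions: crop_bytes is a hypothetical function, and clamping an over-long length to the end of the data is a choice made here, not confirmed library behaviour. It uses a memoryview so that the crop does not copy the data.

```python
def crop_bytes(bs, offset=0, length=None):
    """Return a memoryview of bs[offset:offset+length] without copying."""
    if offset < 0 or offset > len(bs):
        raise ValueError("offset out of range: %r" % (offset,))
    if length is None:
        length = len(bs) - offset          # default: everything after offset
    end = min(offset + length, len(bs))    # assumption: clamp to the data end
    return memoryview(bs)[offset:end]


view = crop_bytes(b'0123456789', offset=2, length=5)
rest = crop_bytes(b'0123456789', offset=7)
```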

Method CornuCopyBuffer.from_fd(fd, readsize=None, offset=None, final_offset=None, **kw): Return a new CornuCopyBuffer attached to an open file descriptor.

Internally this constructs a SeekableFDIterator for regular files or an FDIterator for other files, which provides the iteration that CornuCopyBuffer consumes, but also seek support if the underlying file descriptor is seekable.

Parameters:

  • fd: the operating system file descriptor
  • readsize: an optional preferred read size
  • offset: a starting position for the data; the file descriptor will seek to this offset, and the buffer will start with this offset

Other keyword arguments are passed to the buffer constructor.

Method CornuCopyBuffer.from_file(f, readsize=None, offset=None, final_offset=None, **kw): Return a new CornuCopyBuffer attached to an open file.

Internally this constructs a SeekableFileIterator, which provides the iteration that CornuCopyBuffer consumes and also seek support if the underlying file is seekable.

Parameters:

  • f: the file like object
  • readsize: an optional preferred read size
  • offset: a starting position for the data; the file will seek to this offset, and the buffer will start with this offset

Other keyword arguments are passed to the buffer constructor.

Method CornuCopyBuffer.from_filename(filename: str, offset=None, final_offset=None, **kw): Open the file named filename and return a new CornuCopyBuffer.

If offset is provided, skip to that position in the file. A negative offset skips to a position that far from the end of the file as determined by its Stat.st_size.

Other keyword arguments are passed to the buffer constructor.
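The negative offset rule amounts to adding the offset to the file size. The sketch below illustrates that arithmetic only; resolve_offset is a hypothetical helper, and clamping at 0 for offsets larger than the file is an assumption made for the example.

```python
import os
import tempfile


def resolve_offset(filename, offset):
    """Map a possibly negative offset to an absolute file position."""
    if offset is None:
        return 0
    if offset < 0:
        # measure back from the end of the file using its stat size
        return max(0, os.stat(filename).st_size + offset)
    return offset


# Demonstrate against a throwaway 100 byte file.
with tempfile.NamedTemporaryFile(delete=False) as tf:
    tf.write(b'x' * 100)
try:
    from_end = resolve_offset(tf.name, -16)
    from_start = resolve_offset(tf.name, 16)
finally:
    os.remove(tf.name)
```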

Method CornuCopyBuffer.from_mmap(fd, readsize=None, offset=None, **kw): Return a new CornuCopyBuffer attached to an mmap of an open file descriptor.

Internally this constructs a SeekableMMapIterator, which provides the iteration that CornuCopyBuffer consumes, but also seek support.

Parameters:

  • fd: the operating system file descriptor
  • readsize: an optional preferred read size
  • offset: a starting position for the data; the file descriptor will seek to this offset, and the buffer will start with this offset

Other keyword arguments are passed to the buffer constructor.

Method CornuCopyBuffer.hint(self, size): Hint that the caller is seeking at least size bytes.

If the input_data iterator has a hint method, this is passed to it.

Method CornuCopyBuffer.iter(self, maxlength): Yield chunks from the buffer up to maxlength in total or until EOF if maxlength is Ellipsis.

Method CornuCopyBuffer.next(self): Fetch a data chunk from the buffer.

Method CornuCopyBuffer.peek(self, size, short_ok=False): Examine the leading bytes of the buffer without consuming them, a take followed by a push. Returns the bytes.
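The "take followed by a push" formulation can be made concrete with a toy chunk buffer. PeekableChunks is a hypothetical class written for this illustration; it holds a fixed byte string rather than refilling from an input source.

```python
class PeekableChunks:
    """Toy chunk buffer showing peek as a take followed by a push."""

    def __init__(self, data, offset=0):
        self.bufs = [data] if data else []
        self.offset = offset

    def take(self, size):
        data = b''.join(self.bufs)
        if len(data) < size:
            raise EOFError("wanted %d bytes, found %d" % (size, len(data)))
        self.bufs = [data[size:]] if len(data) > size else []
        self.offset += size
        return data[:size]

    def push(self, bs):
        # Put bs back at the front and rewind the logical offset.
        if bs:
            self.bufs.insert(0, bs)
            self.offset -= len(bs)

    def peek(self, size):
        bs = self.take(size)
        self.push(bs)
        return bs


bfr = PeekableChunks(b'RIFFdata')
magic = bfr.peek(4)        # inspect a signature without consuming it
offset_after_peek = bfr.offset
everything = bfr.take(8)   # the peeked bytes are still available
```

Because push rewinds the offset by the length of the pushed chunk, the offset after a peek is the same as before it.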

Method CornuCopyBuffer.promote(obj): Promote obj to a CornuCopyBuffer, used by the @cs.deco.promote decorator.

Promotes:

  • int: assumed to be a file descriptor of a file open for binary read
  • str: assumed to be a filesystem pathname
  • bytes and byteslike objects: data
  • has a .read1 or .read method: assume a file open for binary read
  • iterable: assumed to be an iterable of byteslike objects

Method CornuCopyBuffer.push(self, bs): Push the chunk bs onto the front of the buffered data. Rewinds the logical .offset by the length of bs.

Method CornuCopyBuffer.read(self, size, one_fetch=False): Compatibility method to allow using the buffer like a file.

Parameters:

  • size: the desired data size
  • one_fetch: do a single data fetch, default False

In one_fetch mode the read behaves like a POSIX file read, returning up to size bytes from a single I/O operation.

Method CornuCopyBuffer.read1(self, size): Shorthand method for self.read(size,one_fetch=True).

Method CornuCopyBuffer.readline(self): Return a binary "line" from self, where a line is defined by its ending b'\n' delimiter. The final line from a buffer might not have a trailing newline; b'' is returned at EOF.

Example:

>>> bfr = CornuCopyBuffer([b'abc', b'def\nhij'])
>>> bfr.readline()
b'abcdef\n'
>>> bfr.readline()
b'hij'
>>> bfr.readline()
b''
>>> bfr.readline()
b''

Method CornuCopyBuffer.report_offset(self, offset): Report a pertinent offset.

Method CornuCopyBuffer.seek(self, offset, whence=None, short_ok=False): Compatibility method to allow using the buffer like a file. This returns the resulting absolute offset.

Parameters are as for io.seek except as noted below:

  • whence: (default os.SEEK_SET). This method only supports os.SEEK_SET and os.SEEK_CUR, and does not support seeking to a lower offset than the current buffer offset.
  • short_ok: (default False). If true, the seek may not reach the target if there is insufficient input_data: the position will be the end of the input_data, and the input_data will have been consumed; the caller must check the returned offset to confirm that it is as expected. If false, a ValueError will be raised; however, note that the input_data will still have been consumed.
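When the input has no .seek method, a forward seek can only be done by consuming data. The following sketch shows that fallback and the short_ok behaviour in isolation; skip_forward is a hypothetical function operating on a bare iterator of chunks, not the buffer's own code.

```python
def skip_forward(chunks, offset, new_offset, short_ok=False):
    """Consume an iterator of bytes chunks until new_offset is reached.

    Returns (offset, leftover): the offset reached and any bytes of the
    final chunk which lie beyond the target.
    """
    if new_offset < offset:
        raise ValueError("backward seeks are not supported")
    leftover = b''
    while offset < new_offset:
        try:
            chunk = next(chunks)
        except StopIteration:
            if short_ok:
                break               # caller must check the returned offset
            raise ValueError(
                "input ended at offset %d before %d" % (offset, new_offset)
            ) from None
        needed = new_offset - offset
        leftover = chunk[needed:]   # bytes past the target stay available
        offset += min(needed, len(chunk))
    return offset, leftover


source = iter([b'abc', b'def', b'ghi'])
reached, leftover = skip_forward(source, 0, 5)
short, _ = skip_forward(iter([b'abc']), 0, 10, short_ok=True)
```

In both modes the skipped data are gone from the iterator afterwards, which is why the returned offset is the only reliable indication of where the stream now stands.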

Method CornuCopyBuffer.selfcheck(self, msg=''): Integrity check for the buffer, useful during debugging.

Method CornuCopyBuffer.skip(self, toskip, copy_skip=None, short_ok=False): Advance position by toskip. Return the new offset.

Parameters:

  • toskip: the distance to advance
  • copy_skip: callable to receive skipped data.
  • short_ok: default False; if true then skip may return before toskip bytes have been skipped if there is insufficient input_data.

Method CornuCopyBuffer.skipto(self, new_offset, copy_skip=None, short_ok=False): Advance to position new_offset. Return the new offset.

Parameters:

  • new_offset: the target offset.
  • copy_skip: callable to receive skipped data.
  • short_ok: default False; if true then skipto may return before new_offset if there are insufficient input_data.

Return values:

  • buf: the new state of buf
  • offset: the final offset; this may be short if short_ok.

Method CornuCopyBuffer.subbuffer(self, end_offset): Context manager wrapper for .bounded which calls the .flush method automatically on exiting the context.

Example:

# avoid buffer overrun
with bfr.subbuffer(bfr.offset+128) as subbfr:
    id3v1 = ID3V1Frame.parse(subbfr)
    # ensure the whole buffer was consumed
    assert subbfr.at_eof()

Method CornuCopyBuffer.tail_extend(self, size): Extend method for parsers reading "tail"-like chunk streams, typically raw reads from a growing file.

This may read 0 bytes at EOF, but a future read may read more bytes if the file grows. Such an iterator can be obtained from cs.fileutils.read_from(..,tail_mode=True).

Method CornuCopyBuffer.take(self, size, short_ok=False): Return the next size bytes. Other arguments are as for .extend().

This is a thin wrapper for the .takev method.

Method CornuCopyBuffer.takev(self, size, short_ok=False): Return the next size bytes as a list of chunks (because the internal buffering is also a list of chunks). Other arguments are as for .extend().

See .take() to get a flat chunk instead of a list.

Method CornuCopyBuffer.tell(self): Compatibility method to allow using the buffer like a file.

Class FDIterator(_FetchIterator)

An iterator over the data of a file descriptor.

Method FDIterator.__init__(self, fd: int, offset=None, readsize=None, align=True): Initialise the iterator.

Parameters:

  • fd: file descriptor
  • offset: the initial logical offset, kept up to date by iteration; the default is the current file position.
  • readsize: a preferred read size; if omitted then DEFAULT_READSIZE will be stored
  • align: whether to align reads by default: if true then the iterator will do a short read to bring the offset into alignment with readsize; the default is True
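The alignment rule is simple modular arithmetic: the first read is shortened so that every later read starts on a multiple of readsize. The sketch below illustrates the calculation only; next_read_size is a hypothetical helper and the read size of 4096 is an example value, not the library's DEFAULT_READSIZE.

```python
def next_read_size(offset, readsize, align=True):
    """Size of the next read so that later reads start on a readsize boundary."""
    if align:
        misalignment = offset % readsize
        if misalignment:
            return readsize - misalignment   # short read up to the boundary
    return readsize


sizes = []
offset = 1000                  # start part way into a block
for _ in range(3):
    size = next_read_size(offset, 4096)
    sizes.append(size)
    offset += size
```

Starting at offset 1000, the first read is 3096 bytes, after which the offset is 4096 and all further reads are full sized.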

Method FDIterator.close(self): Close self.fd if it is not yet None.

Class FileIterator(_FetchIterator, SeekableIteratorMixin)

An iterator over the data of a file object.

Method FileIterator.__init__(self, fp, offset=None, readsize=None, align=False): Initialise the iterator.

Parameters:

  • fp: file object
  • offset: the initial logical offset, kept up to date by iteration; the default is 0.
  • readsize: a preferred read size; if omitted then DEFAULT_READSIZE will be stored
  • align: whether to align reads by default: if true then the iterator will do a short read to bring the offset into alignment with readsize; the default is False

Method FileIterator.close(self): Detach from the file. Does not call fp.close().

Class SeekableFDIterator(FDIterator, SeekableIteratorMixin)

An iterator over the data of a seekable file descriptor.

Property SeekableFDIterator.end_offset: The end offset of the file.

Class SeekableFileIterator(FileIterator)

An iterator over the data of a seekable file object.

Note: the iterator closes the file on del or if its .close method is called.

Method SeekableFileIterator.__init__(self, fp, offset=None, **kw): Initialise the iterator.

Parameters:

  • fp: file object
  • offset: the initial logical offset, kept up to date by iteration; the default is the current file position.
  • readsize: a preferred read size; if omitted then DEFAULT_READSIZE will be stored
  • align: whether to align reads by default: if true then the iterator will do a short read to bring the offset into alignment with readsize; the default is False

Method SeekableFileIterator.seek(self, new_offset, mode=0): Move the logical file pointer.

WARNING: moves the underlying file's pointer.

Class SeekableIteratorMixin

Mixin supplying a seek method which moves the logical offset.

Method SeekableIteratorMixin.seek(self, new_offset, mode=0): Move the logical offset.

Class SeekableMMapIterator(_FetchIterator, SeekableIteratorMixin)

An iterator over the data of a mappable file descriptor.

Method SeekableMMapIterator.__init__(self, fd: int, offset=None, readsize=None, align=True): Initialise the iterator.

Parameters:

  • offset: the initial logical offset, kept up to date by iteration; the default is the current file position.
  • readsize: a preferred read size; if omitted then DEFAULT_READSIZE will be stored
  • align: whether to align reads by default: if true then the iterator will do a short read to bring the offset into alignment with readsize; the default is True

Method SeekableMMapIterator.close(self): Close the mmap and detach.

Property SeekableMMapIterator.end_offset: The end offset of the mmap memoryview.

Release Log

Release 20240630:

  • CornuCopyBuffer: new optional .final_offset attribute indicating the anticipated largest offset, automatically set for regular files - aids progress bars etc.
  • Drop cs.py3, get pread from os.pread, require python>=3.3.
  • CornuCopyBuffer.close: set self.bufs = None to release memory and prevent accidental late use.
  • _FetchIterator: provide a mutex; SeekableIteratorMixin.seek, FDIterator.read,close: take the mutex.

Release 20240412: CornuCopyBuffer.buf: return b"" if nothing is buffered.

Release 20240316: Fixed release upload artifacts.

Release 20240201: CornuCopyBuffer: read1() method shorthand for read(..,one_fetch=True).

Release 20230401:

  • CornuCopyBuffer.promote: document accepting an iterable as an iterable of bytes.
  • CornuCopyBuffer: new readline() method to return a binary line from the buffer.
  • CornuCopyBuffer.promote: assume objects with a .read1 or .read are files.

Release 20230212.2:

  • BREAKING: drop @chunky, superseded by @promote for CornuCopyBuffer parameters.
  • CornuCopyBuffer: subclass Promotable.

Release 20230212.1: Add missing requirement for cs.gimmicks.

Release 20230212: CornuCopyBuffer: new promote() method to promote int,str,bytes, also assumes nonspecific iterables yield byteslike instances.

Release 20211208: CornuCopyBuffer.__init__: bugfix for self.input_data when copy_chunks is not None.

Release 20210316:

  • New CornuCopyBuffer.from_filename factory method.
  • New CornuCopyBuffer.peek method to examine the leading bytes from the buffer.

Release 20210306:

  • CornuCopyBuffer.from_file: improve the seekability test, handle files with no .tell().
  • CornuCopyBuffer.from_bytes: bugfix: set the buffer.offset to the supplied offset.
  • CornuCopyBuffer.from_bytes: queue a memoryview of the supplied bytes.

Release 20201102: CornuCopyBuffer: new optional progress parameter for reporting data consumed from input_data.

Release 20201021:

  • CornuCopyBuffer.from_file: changes to the test for a .seek method.
  • CornuCopyBuffer.read: call extend with short_ok=True.
  • CornuCopyBuffer.from_fd: record the fd as .fd, lets users os.fstat(bfr.fd).
  • New CornuCopyBuffer.as_fd method to return a readable file descriptor fed from the buffer by a Thread, intended for feeding subprocesses.
  • New CornuCopyBuffer.iter(maxlength) to return an iterator of up to maxlength bytes.
  • CornuCopyBuffer.__init__: new "close" parameter to release resources; new CornuCopyBuffer.close method to call this.
  • Some small fixes.

Release 20200517:

  • CornuCopyBuffer.skip: bugfix sanity check.
  • FileIterator: do not close the supplied file, just set self.fp=None.
  • Improve EOFError message text.

Release 20200328:

  • CornuCopyBuffer.takev: bugfix adjustment of buf.offset, was not always done.
  • CornuCopyBuffer.__getitem__: add slice support, note how expensive it is to use.

Release 20200229:

  • New CornuCopyBuffer.byte0() method consuming the next byte and returning it as an int.
  • CornuCopyBuffer.takev: bugfix for size=0, logic refactor.
  • CornuCopyBuffer: new .selfcheck method.

Release 20200130: CornuCopyBuffer.skip: bugfix adjustment of skipto for already buffered data.

Release 20191230.1: Docstring updates. Semantic changes were in the previous release.

Release 20191230:

  • CornuCopyBuffer: accept a size of Ellipsis in .take and .extend methods, indicating "all the remaining data".
  • CornuCopyBuffer: refactor the buffering, replacing .buf with .bufs as an array of chunks; this enables support for the new .push method and reduces memory copying.

Release 20181231: Small bugfix.

Release 20181108: New at_eof() method. Python 2 tweak to support incidental import by python 2 even if unused.

Release 20180823: Better handling of seekable and unseekable input data. Tiny bugfix for from_bytes sanity check.

Release 20180810:

  • Refactor SeekableFDIterator and SeekableFileIterator to subclass new SeekableIterator.
  • New SeekableMMapIterator to process a memory mapped file descriptor, intended for large files.
  • New CornuCopyBuffer.hint method to pass a length hint through to the input_data iterator if it has a hint method, causing it possibly to make a differently sized fetch.
  • SeekableIterator: new __del__ method calling self.close() - subclasses must provide a .close, which should be safe to call multiple times.
  • CornuCopyBuffer: add support for .offset and .end_offset optional attributes on the input_data iterator.
  • _BoundedBufferIterator: add .offset property plumbed to the underlying buffer offset.
  • New CornuCopyBuffer.from_mmap to make a mmap backed buffer so that large data can be returned without penalty.
  • Assorted fixes and doc improvements.

Release 20180805: Bugfixes for at_eof method and end_offset initialisation.

Release 20180726.1: Improve docstrings and release with better long_description.

Release 20180726: First PyPI release: CornuCopyBuffer and friends.
