Facilities to do with buffers, particularly CornuCopyBuffer, an automatically refilling buffer to support parsing of data streams.
Latest release 20240630:
- CornuCopyBuffer: new optional .final_offset attribute indicating the anticipated largest offset, automatically set for regular files - aids progress bars etc.
- Drop cs.py3, get pread from os.pread, require python>=3.3.
- CornuCopyBuffer.close: set self.bufs = None to release memory and prevent accidental late use.
- _FetchIterator: provide a mutex; SeekableIteratorMixin.seek, FDIterator.read,close: take the mutex.
Class CopyingIterator
Wrapper for an iterator that copies every item retrieved to a callable.
Method CopyingIterator.__init__(self, it, copy_to):
Initialise with the iterator it and the callable copy_to.
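A minimal sketch of the wrapping behaviour described above, assuming only that each item is passed to copy_to before being returned. The class name CopyingIteratorSketch is hypothetical; this is an illustration, not the cs.buffer source.

```python
from typing import Any, Callable, Iterable, Iterator


class CopyingIteratorSketch:
    """Hypothetical stand-in for CopyingIterator: wraps an iterator and
    passes every item it yields to copy_to before returning it."""

    def __init__(self, it: Iterable[Any], copy_to: Callable[[Any], None]):
        self.it: Iterator[Any] = iter(it)
        self.copy_to = copy_to

    def __iter__(self) -> "CopyingIteratorSketch":
        return self

    def __next__(self) -> Any:
        item = next(self.it)  # StopIteration propagates at the end
        self.copy_to(item)    # copy first, then hand the item on
        return item


copied = []
chunks = list(CopyingIteratorSketch([b"ab", b"cd"], copied.append))
# chunks == copied == [b"ab", b"cd"]
```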
Class CornuCopyBuffer(cs.deco.Promotable)
An automatically refilling buffer intended to support parsing of data streams.
Its purpose is to aid binary parsers which do not themselves need to handle sources specially; CornuCopyBuffers are trivially made from bytes, iterables of bytes and file-like objects.
See cs.binary for convenient parsing classes which work against CornuCopyBuffers.
Attributes:
- buf: the first of any buffered leading chunks of unparsed data from the input, available for direct inspection by parsers; normally, however, parsers will use .extend and .take.
- offset: the logical offset of the buffer; this excludes buffered data and unconsumed input data.
Note: the initialiser may supply a cleanup function; although this will be called via the buffer's .__del__ method, a prudent user of a buffer should call the .close() method when finished with the buffer to ensure prompt cleanup.
The primary methods supporting parsing of data streams are .extend() and .take().
Calling .extend(min_size) arranges that the internal buffer contains at least min_size bytes.
Calling .take(size) fetches exactly size bytes from the internal buffer, and from the input source if necessary, and returns them, adjusting the internal buffer.
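To make the extend/take contract concrete, here is a much simplified model over an iterable of byte chunks. ToyBuffer is a hypothetical name and the code is not the cs.buffer implementation: it joins all buffered chunks on every take for clarity, whereas the real buffer keeps a list of chunks to reduce copying. It also shows that len() counts only data already buffered.

```python
from typing import Iterable, Iterator, List


class ToyBuffer:
    """Hypothetical, much simplified model of the extend/take contract.
    Not the cs.buffer implementation."""

    def __init__(self, chunks: Iterable[bytes]):
        self._input: Iterator[bytes] = iter(chunks)
        self.bufs: List[bytes] = []  # read but not yet consumed chunks
        self.offset = 0              # logical offset of the next unconsumed byte

    def __len__(self) -> int:
        # Like len(CornuCopyBuffer): only the data already buffered.
        return sum(len(bs) for bs in self.bufs)

    def extend(self, min_size: int, short_ok: bool = False) -> None:
        # Fetch input chunks until at least min_size bytes are buffered.
        while len(self) < min_size:
            try:
                self.bufs.append(next(self._input))
            except StopIteration:
                if short_ok:
                    return
                raise EOFError(
                    f"insufficient input data, wanted {min_size} bytes"
                    f" but only found {len(self)}"
                ) from None

    def take(self, size: int, short_ok: bool = False) -> bytes:
        # Ensure enough data is buffered, then split it at size.
        self.extend(size, short_ok=short_ok)
        data = b"".join(self.bufs)
        taken, rest = data[:size], data[size:]
        self.bufs = [rest] if rest else []
        self.offset += len(taken)
        return taken


bfr = ToyBuffer([b"abc", b"def", b"ghi"])
first = bfr.take(4)                 # b"abcd"; b"ef" stays buffered
buffered_after_first = len(bfr)     # 2
tail = bfr.take(10, short_ok=True)  # only b"efghi" remains
```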
len(CornuCopyBuffer) returns the length of any buffered data.
bool(CornuCopyBuffer) tests whether len() > 0.
Indexing a CornuCopyBuffer accesses the buffered data only, returning an individual byte's value (an int).
A CornuCopyBuffer is also iterable, yielding data in whatever sizes come from its input_data source, preceded by any content in the internal buffer.
A CornuCopyBuffer also supports the file methods .read, .tell and .seek, supporting drop-in use of the buffer in many file contexts. Backward seeks are not supported. .seek will take advantage of the input_data's .seek method if it has one; otherwise it will consume the input_data as required.
Method CornuCopyBuffer.__init__(self, input_data, buf=None, offset=0, seekable=None, copy_offsets=None, copy_chunks=None, close=None, progress=None, final_offset=None):
Prepare the buffer.
Parameters:
- input_data: an iterable of data chunks (bytes-like instances); if your data source is a file see the .from_file factory; if your data source is a file descriptor see the .from_fd factory.
- buf: if not None, the initial state of the parse buffer
- offset: logical offset of the start of the buffer, default 0
- seekable: whether input_data has a working .seek method; the default is None, meaning that it will be attempted on the first skip or seek
- copy_offsets: if not None, a callable for parsers to report pertinent offsets via the buffer's .report_offset method
- copy_chunks: if not None, every fetched data chunk is copied to this callable
- close: an optional callable that may be provided for resource cleanup when the user of the buffer calls its .close() method
- progress: an optional cs.Progress.progress instance to which to report data consumed from input_data; any object supporting += is acceptable
- final_offset: optional int specifying the largest offset expected to be reached, intended for uses such as callers presenting a progress indication; this is, for example, provided by CornuCopyBuffer.from_fd for regular files using the stat.st_size field
The input_data is an iterable whose iterator may have some optional additional properties:
- seek: if present, this is a seek method after the fashion of file.seek; the buffer's seek, skip and skipto methods will take advantage of this if available.
- offset: the current byte offset of the iterator; this is used during the buffer initialisation to compute input_data_displacement, the difference between the buffer's logical offset and the input data iterable's logical offset; if unavailable during initialisation this is presumed to be 0.
- end_offset: the end offset of the iterator if known.
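The optional iterator properties listed above can be illustrated with a small in-memory iterator. BytesChunkIterator is a hypothetical class written for this sketch; it only assumes that seek behaves after the fashion of file.seek and that offset and end_offset are plain attributes.

```python
import io
from typing import Iterator


class BytesChunkIterator:
    """Hypothetical input_data iterator over in-memory bytes that exposes
    the optional seek, offset and end_offset properties described above."""

    def __init__(self, data: bytes, readsize: int = 4):
        self._data = memoryview(data)
        self._readsize = readsize
        self.offset = 0              # current byte offset of the iterator
        self.end_offset = len(data)  # the end offset is known for bytes

    def __iter__(self) -> Iterator[bytes]:
        return self

    def __next__(self) -> bytes:
        if self.offset >= self.end_offset:
            raise StopIteration
        chunk = bytes(self._data[self.offset:self.offset + self._readsize])
        self.offset += len(chunk)
        return chunk

    def seek(self, new_offset: int, whence: int = io.SEEK_SET) -> int:
        # After the fashion of file.seek: reposition, return the new offset.
        if whence == io.SEEK_SET:
            pos = new_offset
        elif whence == io.SEEK_CUR:
            pos = self.offset + new_offset
        elif whence == io.SEEK_END:
            pos = self.end_offset + new_offset
        else:
            raise ValueError(f"unsupported whence: {whence!r}")
        if pos < 0:
            raise ValueError(f"negative seek position {pos}")
        self.offset = pos
        return pos


it = BytesChunkIterator(b"abcdefghij")
first = next(it)   # b"abcd"
it.seek(8)         # skip ahead without reading b"efgh"
rest = list(it)    # [b"ij"]
```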
Method CornuCopyBuffer.__del__(self):
Release resources when the object is deleted.
Method CornuCopyBuffer.__getitem__(self, index):
Fetch from the internal buffer.
This does not consume data from the internal buffer.
Note that this is an expensive way to access the buffer, particularly if index is a slice.
If index is a slice, slice the join of the internal subbuffers. This is quite expensive and it is probably better to take or takev some data from the buffer.
Otherwise index should be an int and the corresponding buffered byte is returned.
This is usually not a very useful method; its primary use case is to probe the buffer to make a parsing decision instead of taking a byte off and (possibly) pushing it back.
Method CornuCopyBuffer.__len__(self):
The length is the length of the internal buffer: data available without a fetch.
Method CornuCopyBuffer.__next__(self):
Fetch a data chunk from the buffer.
Method CornuCopyBuffer.as_fd(self, maxlength=Ellipsis):
Create a pipe and dispatch a Thread to copy up to maxlength bytes from this buffer into it. Return the file descriptor of the read end of the pipe.
The default maxlength is Ellipsis, meaning to copy all data.
Note that the thread preemptively consumes from the buffer.
This is useful for passing buffer data to subprocesses.
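A minimal sketch of the pipe-and-thread technique described for as_fd, using only os.pipe, os.write and threading. chunks_as_fd is a hypothetical name; unlike as_fd it takes a plain iterable of bytes rather than a buffer, and it always copies everything (there is no maxlength).

```python
import os
import threading
from typing import Iterable


def chunks_as_fd(chunks: Iterable[bytes]) -> int:
    """Hypothetical helper: return the read end of a pipe that a daemon
    thread fills from chunks, closing the write end when done."""
    rfd, wfd = os.pipe()

    def copy_to_pipe() -> None:
        try:
            for chunk in chunks:
                view = memoryview(chunk).cast("B")  # byte-oriented view
                while view:
                    # os.write may accept fewer bytes than offered.
                    written = os.write(wfd, view)
                    view = view[written:]
        finally:
            # Closing the write end is what lets the reader see EOF.
            os.close(wfd)

    threading.Thread(target=copy_to_pipe, daemon=True).start()
    return rfd


fd = chunks_as_fd([b"abc", b"def"])
with os.fdopen(fd, "rb") as f:
    data = f.read()  # b"abcdef"
```

The returned descriptor could equally be handed to a subprocess, for example as the stdin argument of subprocess.run.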
Method CornuCopyBuffer.at_eof(self):
Test whether the buffer is at end of input.
Warning: this will fetch from the input_data if the buffer is empty and so it may block.
Method CornuCopyBuffer.bounded(self, end_offset) -> 'CornuCopyBuffer':
Return a new CornuCopyBuffer operating on a bounded view of this buffer.
This supports parsing of the buffer contents without risk of consuming past a certain point, such as the known end of a packet structure.
Parameters:
- end_offset: the ending offset of the new buffer. Note that this is an absolute offset, not a length.
The new buffer starts with the same offset as self and use of the new buffer affects self. After a flush both buffers will again have the same offset and the data consumed via the new buffer will also have been consumed from self.
Here is an example.
- Make a buffer bfr with 9 bytes of data in 3 chunks.
- Consume 2 bytes, advancing the offset to 2.
- Make a new bounded buffer subbfr extending to offset 5. Its initial offset is also 2.
- Iterate over it, yielding the remaining single byte chunk from b'abc' and then the first 2 bytes of b'def'. The new buffer's offset is now 5.
- Try to take 2 more bytes from the new buffer; this fails.
- Flush the new buffer, synchronising with the original. The original's offset is now also 5.
- Take 2 bytes from the original buffer, which succeeds.
Example:
>>> bfr = CornuCopyBuffer([b'abc', b'def', b'ghi'])
>>> bfr.offset
0
>>> bfr.take(2)
b'ab'
>>> bfr.offset
2
>>> subbfr = bfr.bounded(5)
>>> subbfr.offset
2
>>> for bs in subbfr:
...     print(bs)
...
b'c'
b'de'
>>> subbfr.offset
5
>>> subbfr.take(2)
Traceback (most recent call last):
...
EOFError: insufficient input data, wanted 2 bytes but only found 0
>>> subbfr.flush()
>>> bfr.offset
5
>>> bfr.take(2)
b'fg'
WARNING: if the bounded buffer is not completely consumed then it is critical to call the new CornuCopyBuffer's .flush method to push any unconsumed buffer back into this buffer.
Recommended practice is to always call .flush when finished with the new buffer.
The CornuCopyBuffer.subbuffer method returns a context manager which does this automatically.
Also, because the new buffer may buffer some of the unconsumed data from this buffer, use of the original buffer should be suspended.
Property CornuCopyBuffer.buf:
The first buffer, or b'' if nothing is buffered.
Method CornuCopyBuffer.byte0(self):
Consume the leading byte and return it as an int (0..255).
Method CornuCopyBuffer.close(self):
Close the buffer.
This discards the internal buffer of "read but not consumed" data and calls the close callable supplied when the buffer was initialised, if any.
Property CornuCopyBuffer.end_offset:
Return the end offset of the input data (in buffer ordinates) if known, otherwise None.
Note that this depends on the computation of the input_offset_displacement which takes place at the buffer initialisation, which in turn relies on the input_data.offset attribute, which at initialisation is presumed to be 0 if missing.
Method CornuCopyBuffer.extend(self, min_size, short_ok=False):
Extend the buffer to at least min_size bytes.
If min_size is Ellipsis, extend the buffer to consume all the input. This should really only be used with bounded buffers in order to avoid unconstrained memory consumption.
If there are insufficient data available then an EOFError will be raised unless short_ok is true (default False), in which case the updated buffer will be short.
Method CornuCopyBuffer.from_bytes(bs, offset=0, length=None, **kw):
Return a CornuCopyBuffer fed from the supplied bytes bs starting at offset and ending after length.
This is handy for callers parsing using buffers but handed bytes.
Parameters:
- bs: the bytes
- offset: a starting position for the data; the input data will start this far into the bytes
- length: the maximum number of bytes to use; the input data will be cropped this far past the starting point; default: the number of bytes in bs after offset
Other keyword arguments are passed to the buffer constructor.
Method CornuCopyBuffer.from_fd(fd, readsize=None, offset=None, final_offset=None, **kw):
Return a new CornuCopyBuffer attached to an open file descriptor.
Internally this constructs a SeekableFDIterator for regular files or an FDIterator for other files, which provides the iteration that CornuCopyBuffer consumes and also seek support if the underlying file descriptor is seekable.
Parameters:
- fd: the operating system file descriptor
- readsize: an optional preferred read size
- offset: a starting position for the data; the file descriptor will seek to this offset, and the buffer will start with this offset
Other keyword arguments are passed to the buffer constructor.
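The regular-file test that from_fd is described as making, and the st_size value it uses for final_offset, can be sketched with os.fstat and stat.S_ISREG. fd_kind is a hypothetical helper; it does not construct any iterator.

```python
import os
import stat
import tempfile
from typing import Optional, Tuple


def fd_kind(fd: int) -> Tuple[bool, Optional[int]]:
    """Hypothetical helper: report whether fd is a regular file and, if so,
    its size, the sort of value from_fd is described as using for
    final_offset."""
    st = os.fstat(fd)
    if stat.S_ISREG(st.st_mode):
        return True, st.st_size
    return False, None


with tempfile.TemporaryFile() as tf:
    tf.write(b"hello")
    tf.flush()
    regular = fd_kind(tf.fileno())  # (True, 5)

rfd, wfd = os.pipe()
piped = fd_kind(rfd)                # (False, None): a pipe is not regular
os.close(rfd)
os.close(wfd)
```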
Method CornuCopyBuffer.from_file(f, readsize=None, offset=None, final_offset=None, **kw):
Return a new CornuCopyBuffer attached to an open file.
Internally this constructs a SeekableFileIterator, which provides the iteration that CornuCopyBuffer consumes and also seek support if the underlying file is seekable.
Parameters:
- f: the file-like object
- readsize: an optional preferred read size
- offset: a starting position for the data; the file will seek to this offset, and the buffer will start with this offset
Other keyword arguments are passed to the buffer constructor.
Method CornuCopyBuffer.from_filename(filename: str, offset=None, final_offset=None, **kw):
Open the file named filename and return a new CornuCopyBuffer.
If offset is provided, skip to that position in the file. A negative offset skips to a position that far from the end of the file as determined by its Stat.st_size.
Other keyword arguments are passed to the buffer constructor.
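The negative offset rule above amounts to adding the offset to the file size. absolute_offset is a hypothetical helper; the ValueError for a position before the start of the file is an assumption of this sketch, not documented behaviour of from_filename.

```python
import os
import tempfile


def absolute_offset(filename: str, offset: int) -> int:
    """Hypothetical helper: a negative offset counts back from the end of
    the file, using the size reported by os.stat."""
    if offset >= 0:
        return offset
    pos = os.stat(filename).st_size + offset
    if pos < 0:
        # Assumption for this sketch: reject positions before the start.
        raise ValueError(f"offset {offset} is before the start of {filename!r}")
    return pos


with tempfile.NamedTemporaryFile(delete=False) as tf:
    tf.write(b"0123456789")
try:
    last3 = absolute_offset(tf.name, -3)  # 7: the last 3 bytes start here
    start = absolute_offset(tf.name, 2)   # 2: non-negative offsets pass through
finally:
    os.remove(tf.name)
```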
Method CornuCopyBuffer.from_mmap(fd, readsize=None, offset=None, **kw):
Return a new CornuCopyBuffer attached to an mmap of an open file descriptor.
Internally this constructs a SeekableMMapIterator, which provides the iteration that CornuCopyBuffer consumes and also seek support.
Parameters:
- fd: the operating system file descriptor
- readsize: an optional preferred read size
- offset: a starting position for the data; the file descriptor will seek to this offset, and the buffer will start with this offset
Other keyword arguments are passed to the buffer constructor.
Method CornuCopyBuffer.hint(self, size):
Hint that the caller is seeking at least size bytes. If the input_data iterator has a hint method, this is passed to it.
Method CornuCopyBuffer.iter(self, maxlength):
Yield chunks from the buffer up to maxlength in total, or until EOF if maxlength is Ellipsis.
Method CornuCopyBuffer.next(self):
Fetch a data chunk from the buffer.
Method CornuCopyBuffer.peek(self, size, short_ok=False):
Examine the leading bytes of the buffer without consuming them; this is a take followed by a push.
Return the bytes.
Method CornuCopyBuffer.promote(obj):
Promote obj to a CornuCopyBuffer, used by the @cs.deco.promote decorator.
Promotes:
- int: assumed to be a file descriptor of a file open for binary read
- str: assumed to be a filesystem pathname
- bytes and bytes-like objects: data
- has a .read1 or .read method: assume a file open for binary read
- iterable: assumed to be an iterable of bytes-like objects
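The order of the promotion rules above matters, because str and bytes are themselves iterable. This hypothetical classifier, promotion_kind, only reports which rule would apply; the exact bytes-like check (bytes, bytearray, memoryview) is an assumption of the sketch, not cs.buffer's test.

```python
import io
from typing import Any


def promotion_kind(obj: Any) -> str:
    """Hypothetical classifier following the promotion rules listed above.
    The order matters: str and bytes are themselves iterable."""
    if isinstance(obj, int):
        return "file descriptor"
    if isinstance(obj, str):
        return "pathname"
    if isinstance(obj, (bytes, bytearray, memoryview)):  # assumed bytes-like set
        return "bytes"
    if hasattr(obj, "read1") or hasattr(obj, "read"):
        return "binary file"
    try:
        iter(obj)
    except TypeError:
        raise TypeError(f"cannot promote {type(obj).__name__}") from None
    return "iterable of bytes"


kinds = [
    promotion_kind(x)
    for x in (0, "data.bin", b"xy", io.BytesIO(b"xy"), [b"x", b"y"])
]
```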
Method CornuCopyBuffer.push(self, bs):
Push the chunk bs onto the front of the buffered data.
Rewinds the logical .offset by the length of bs.
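The peek and push descriptions fit together: peeking takes bytes, then pushes them back, which rewinds the offset. PushBackBuffer is a hypothetical minimal model working on a single bytes object, not the cs.buffer implementation.

```python
from typing import List


class PushBackBuffer:
    """Hypothetical minimal buffer showing peek as a take followed by push."""

    def __init__(self, data: bytes):
        self.bufs: List[bytes] = [data] if data else []
        self.offset = 0  # logical offset of the next unconsumed byte

    def take(self, size: int) -> bytes:
        data = b"".join(self.bufs)
        if len(data) < size:
            raise EOFError(f"wanted {size} bytes but only found {len(data)}")
        self.bufs = [data[size:]] if len(data) > size else []
        self.offset += size
        return data[:size]

    def push(self, bs: bytes) -> None:
        # Put bs back at the front and rewind the offset by its length.
        if bs:
            self.bufs.insert(0, bs)
            self.offset -= len(bs)

    def peek(self, size: int) -> bytes:
        bs = self.take(size)
        self.push(bs)
        return bs


bfr = PushBackBuffer(b"hello")
peeked = bfr.peek(2)               # b"he"
offset_after_peek = bfr.offset     # back to 0
taken = bfr.take(3)                # b"hel", offset now 3
```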
Method CornuCopyBuffer.read(self, size, one_fetch=False):
Compatibility method to allow using the buffer like a file.
Parameters:
- size: the desired data size
- one_fetch: do a single data fetch, default False
In one_fetch mode the read behaves like a POSIX file read, returning up to size bytes from a single I/O operation.
Method CornuCopyBuffer.read1(self, size):
Shorthand method for self.read(size, one_fetch=True).
Method CornuCopyBuffer.readline(self):
Return a binary "line" from self, where a line is defined by its ending b'\n' delimiter.
The final line from a buffer might not have a trailing newline; b'' is returned at EOF.
Example:
>>> bfr = CornuCopyBuffer([b'abc', b'def\nhij'])
>>> bfr.readline()
b'abcdef\n'
>>> bfr.readline()
b'hij'
>>> bfr.readline()
b''
>>> bfr.readline()
b''
Method CornuCopyBuffer.report_offset(self, offset):
Report a pertinent offset.
Method CornuCopyBuffer.seek(self, offset, whence=None, short_ok=False):
Compatibility method to allow using the buffer like a file.
This returns the resulting absolute offset.
Parameters are as for io.IOBase.seek except as noted below:
- whence: (default os.SEEK_SET). This method only supports os.SEEK_SET and os.SEEK_CUR, and does not support seeking to a lower offset than the current buffer offset.
- short_ok: (default False). If true, the seek may not reach the target if there is insufficient input_data; the position will be the end of the input_data, and the input_data will have been consumed; the caller must check the returned offset to confirm that it is as expected. If false, a ValueError will be raised; however, note that the input_data will still have been consumed.
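For input_data without a seek method, a forward-only seek has to read and discard data, as described above. forward_seek is a hypothetical function over a plain chunk iterator; it returns the new offset and the unconsumed tail of the last chunk, which a real buffer would keep buffered.

```python
import os
from typing import Iterator, Tuple


def forward_seek(chunks: Iterator[bytes], current: int, offset: int,
                 whence: int = os.SEEK_SET,
                 short_ok: bool = False) -> Tuple[int, bytes]:
    """Hypothetical forward-only seek over a non-seekable chunk iterator
    whose next byte is at logical offset current. Returns the new offset
    and any unconsumed tail of the last chunk read."""
    if whence == os.SEEK_SET:
        target = offset
    elif whence == os.SEEK_CUR:
        target = current + offset
    else:
        raise ValueError(f"unsupported whence: {whence!r}")
    if target < current:
        raise ValueError(f"cannot seek backwards from {current} to {target}")
    pos, leftover = current, b""
    while pos < target:
        try:
            chunk = next(chunks)
        except StopIteration:
            if short_ok:
                break  # caller must check the returned offset
            raise ValueError(
                f"insufficient input data: reached {pos}, wanted {target}"
            ) from None
        need = target - pos
        if len(chunk) > need:
            leftover = chunk[need:]  # keep the bytes past the target
            pos = target
        else:
            pos += len(chunk)        # discard the whole chunk
    return pos, leftover


it = iter([b"abc", b"def", b"ghi"])
where, tail = forward_seek(it, 0, 5)                      # (5, b"f")
after = next(it)                                          # b"ghi"
short = forward_seek(iter([b"ab"]), 0, 5, short_ok=True)  # (2, b"")
```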
Method CornuCopyBuffer.selfcheck(self, msg=''):
Integrity check for the buffer, useful during debugging.
Method CornuCopyBuffer.skip(self, toskip, copy_skip=None, short_ok=False):
Advance position by toskip. Return the new offset.
Parameters:
- toskip: the distance to advance
- copy_skip: callable to receive skipped data.
- short_ok: default False; if true then skip may return before toskip bytes if there is insufficient input_data.
Method CornuCopyBuffer.skipto(self, new_offset, copy_skip=None, short_ok=False):
Advance to position new_offset. Return the new offset.
Parameters:
- new_offset: the target offset.
- copy_skip: callable to receive skipped data.
- short_ok: default False; if true then skipto may return before new_offset if there is insufficient input_data.
Return values:
- buf: the new state of buf
- offset: the final offset; this may be short if short_ok.
Method CornuCopyBuffer.subbuffer(self, end_offset):
Context manager wrapper for .bounded which calls the .flush method automatically on exiting the context.
Example:
# avoid buffer overrun
with bfr.subbuffer(bfr.offset+128) as subbfr:
    id3v1 = ID3V1Frame.parse(subbfr)
    # ensure the whole buffer was consumed
    assert subbfr.at_eof()
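The "flush on exit" behaviour of subbuffer follows the usual contextlib pattern of a try/finally around a yield. This is a sketch under that assumption: flushing and FakeSubBuffer are hypothetical names, and something like flushing(bfr.bounded(end_offset)) is only roughly what subbuffer provides.

```python
from contextlib import contextmanager
from typing import Iterator, TypeVar

T = TypeVar("T")


@contextmanager
def flushing(subbfr: T) -> Iterator[T]:
    """Hypothetical sketch of the subbuffer pattern: hand out the bounded
    buffer, then always call its .flush() on exit, even after an error."""
    try:
        yield subbfr
    finally:
        subbfr.flush()  # type: ignore[attr-defined]


class FakeSubBuffer:
    """Hypothetical stand-in that just records whether flush was called."""

    def __init__(self) -> None:
        self.flushed = False

    def flush(self) -> None:
        self.flushed = True


fake = FakeSubBuffer()
try:
    with flushing(fake):
        raise RuntimeError("parse failed")  # simulated parser error
except RuntimeError:
    pass
# fake.flushed is True: the flush ran despite the exception
```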
Method CornuCopyBuffer.tail_extend(self, size):
Extend method for parsers reading "tail"-like chunk streams, typically raw reads from a growing file.
This may read 0 bytes at EOF, but a future read may read more bytes if the file grows.
Such an iterator can be obtained from cs.fileutils.read_from(..,tail_mode=True).
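A "tail"-like chunk stream as described above can be sketched with os.read, which returns b'' at end of file but returns new data once the file grows. tail_chunks is a hypothetical generator, not cs.fileutils.read_from.

```python
import os
import tempfile
from typing import Iterator


def tail_chunks(fd: int, readsize: int = 4096) -> Iterator[bytes]:
    """Hypothetical tail-mode reader: at EOF it yields b'' rather than
    stopping, so a later next() can return data appended to the file."""
    while True:
        yield os.read(fd, readsize)


with tempfile.NamedTemporaryFile(delete=False) as tf:
    tf.write(b"abc")
rfd = os.open(tf.name, os.O_RDONLY)
try:
    reader = tail_chunks(rfd)
    first = next(reader)    # b"abc"
    at_eof = next(reader)   # b"" while the file has not grown
    with open(tf.name, "ab") as f:
        f.write(b"def")     # the file grows
    later = next(reader)    # b"def"
finally:
    os.close(rfd)
    os.remove(tf.name)
```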
Method CornuCopyBuffer.take(self, size, short_ok=False):
Return the next size bytes.
Other arguments are as for .extend().
This is a thin wrapper for the .takev method.
Method CornuCopyBuffer.takev(self, size, short_ok=False):
Return the next size bytes as a list of chunks (because the internal buffering is also a list of chunks).
Other arguments are as for .extend().
See .take() to get a flat chunk instead of a list.
Method CornuCopyBuffer.tell(self):
Compatibility method to allow using the buffer like a file.
Class FDIterator(_FetchIterator)
An iterator over the data of a file descriptor.
Method FDIterator.__init__(self, fd: int, offset=None, readsize=None, align=True):
Initialise the iterator.
Parameters:
- fd: file descriptor
- offset: the initial logical offset, kept up to date by iteration; the default is the current file position.
- readsize: a preferred read size; if omitted then DEFAULT_READSIZE will be stored
- align: whether to align reads by default: if true then the iterator will do a short read to bring the offset into alignment with readsize; the default is True
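The align behaviour above can be sketched with os.pread (which the release notes say this package uses): when the starting offset is not a multiple of readsize, the first read is shortened to reach the next boundary. aligned_pread_chunks is a hypothetical generator, and os.pread is only available on Unix.

```python
import os
import tempfile
from typing import Iterator


def aligned_pread_chunks(fd: int, offset: int = 0, readsize: int = 8192,
                         align: bool = True) -> Iterator[bytes]:
    """Hypothetical reader: with align true, the first read is shortened so
    that subsequent reads start on multiples of readsize."""
    size = readsize
    if align and offset % readsize:
        size = readsize - offset % readsize  # short read to the next boundary
    while True:
        # Unix-only; pread does not move the file descriptor's position.
        chunk = os.pread(fd, size, offset)
        if not chunk:
            return
        yield chunk
        offset += len(chunk)
        size = readsize


with tempfile.TemporaryFile() as tf:
    tf.write(b"0123456789abcdefghij")
    tf.flush()
    chunks = list(aligned_pread_chunks(tf.fileno(), offset=3, readsize=8))
# chunks == [b"34567", b"89abcdef", b"ghij"]
```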
Method FDIterator.close(self):
Close self.fd if it is not already None.
Class FileIterator(_FetchIterator, SeekableIteratorMixin)
An iterator over the data of a file object.
Method FileIterator.__init__(self, fp, offset=None, readsize=None, align=False):
Initialise the iterator.
Parameters:
- fp: file object
- offset: the initial logical offset, kept up to date by iteration; the default is 0.
- readsize: a preferred read size; if omitted then DEFAULT_READSIZE will be stored
- align: whether to align reads by default: if true then the iterator will do a short read to bring the offset into alignment with readsize; the default is False
Method FileIterator.close(self):
Detach from the file. Does not call fp.close().
Class SeekableFDIterator(FDIterator, SeekableIteratorMixin)
An iterator over the data of a seekable file descriptor.
Property SeekableFDIterator.end_offset:
The end offset of the file.
Class SeekableFileIterator(FileIterator)
An iterator over the data of a seekable file object.
Note: the iterator closes the file on del or if its .close method is called.
Method SeekableFileIterator.__init__(self, fp, offset=None, **kw):
Initialise the iterator.
Parameters:
- fp: file object
- offset: the initial logical offset, kept up to date by iteration; the default is the current file position.
- readsize: a preferred read size; if omitted then DEFAULT_READSIZE will be stored
- align: whether to align reads by default: if true then the iterator will do a short read to bring the offset into alignment with readsize; the default is False
Method SeekableFileIterator.seek(self, new_offset, mode=0):
Move the logical file pointer.
WARNING: moves the underlying file's pointer.
Class SeekableIteratorMixin
Mixin supplying a logical offset with a seek method.
Method SeekableIteratorMixin.seek(self, new_offset, mode=0):
Move the logical offset.
Class SeekableMMapIterator(_FetchIterator, SeekableIteratorMixin)
An iterator over the data of a mappable file descriptor.
Method SeekableMMapIterator.__init__(self, fd: int, offset=None, readsize=None, align=True):
Initialise the iterator.
Parameters:
- offset: the initial logical offset, kept up to date by iteration; the default is the current file position.
- readsize: a preferred read size; if omitted then DEFAULT_READSIZE will be stored
- align: whether to align reads by default: if true then the iterator will do a short read to bring the offset into alignment with readsize; the default is True
Method SeekableMMapIterator.close(self):
Close the mmap and detach.
Property SeekableMMapIterator.end_offset:
The end offset of the mmap memoryview.
Release Log
Release 20240630:
- CornuCopyBuffer: new optional .final_offset attribute indicating the anticipated largest offset, automatically set for regular files - aids progress bars etc.
- Drop cs.py3, get pread from os.pread, require python>=3.3.
- CornuCopyBuffer.close: set self.bufs = None to release memory and prevent accidental late use.
- _FetchIterator: provide a mutex; SeekableIteratorMixin.seek, FDIterator.read,close: take the mutex.
Release 20240412: CornuCopyBuffer.buf: return b"" if nothing is buffered.
Release 20240316: Fixed release upload artifacts.
Release 20240201: CornuCopyBuffer: read1() method shorthand for read(..,one_fetch=True).
Release 20230401:
- CornuCopyBuffer.promote: document accepting an iterable as an iterable of bytes.
- CornuCopyBuffer: new readline() method to return a binary line from the buffer.
- CornuCopyBuffer.promote: assume objects with a .read1 or .read are files.
Release 20230212.2:
- BREAKING: drop @chunky, superseded by @promote for CornuCopyBuffer parameters.
- CornuCopyBuffer: subclass Promotable.
Release 20230212.1: Add missing requirement for cs.gimmicks.
Release 20230212: CornuCopyBuffer: new promote() method to promote int,str,bytes, also assumes nonspecific iterables yield byteslike instances.
Release 20211208: CornuCopyBuffer.__init__: bugfix for self.input_data when copy_chunks is not None.
Release 20210316:
- New CornuCopyBuffer.from_filename factory method.
- New CornuCopyBuffer.peek method to examine the leading bytes from the buffer.
Release 20210306:
- CornuCopyBuffer.from_file: improve the seekability test, handle files with no .tell().
- CornuCopyBuffer.from_bytes: bugfix: set the buffer.offset to the supplied offset.
- CornuCopyBuffer.from_bytes: queue a memoryview of the supplied bytes.
Release 20201102:
CornuCopyBuffer: new optional progress parameter for reporting data consumed from input_data.
Release 20201021:
- CornuCopyBuffer.from_file: changes to the test for a .seek method.
- CornuCopyBuffer.read: call extend with short_ok=True.
- CornuCopyBuffer.from_fd: record the fd as .fd, lets users os.fstat(bfr.fd).
- New CornuCopyBuffer.as_fd method to return a readable file descriptor fed from the buffer by a Thread, intended for feeding subprocesses.
- New CornuCopyBuffer.iter(maxlength) to return an iterator of up to maxlength bytes.
- CornuCopyBuffer.__init__: new "close" parameter to release resources; new CornuCopyBuffer.close method to call this.
- Some small fixes.
Release 20200517:
- CornuCopyBuffer.skip: bugfix sanity check.
- FileIterator: do not close the supplied file, just set self.fp=None.
- Improve EOFError message text.
Release 20200328:
- CornuCopyBuffer.takev: bugfix adjustment of buf.offset, was not always done.
- CornuCopyBuffer.__getitem__: add slice support, note how expensive it is to use.
Release 20200229:
- New CornuCopyBuffer.byte0() method consuming the next byte and returning it as an int.
- CornuCopyBuffer.takev: bugfix for size=0, logic refactor.
- CornuCopyBuffer: new .selfcheck method.
Release 20200130: CornuCopyBuffer.skip: bugfix adjustment of skipto for already buffered data.
Release 20191230.1: Docstring updates. Semantic changes were in the previous release.
Release 20191230:
- CornuCopyBuffer: accept a size of Ellipsis in .take and .extend methods, indicating "all the remaining data".
- CornuCopyBuffer: refactor the buffering, replacing .buf with .bufs as an array of chunks; this enables support for the new .push method and reduces memory copying.
Release 20181231: Small bugfix.
Release 20181108: New at_eof() method. Python 2 tweak to support incidental import by python 2 even if unused.
Release 20180823: Better handling of seekable and unseekable input data. Tiny bugfix for from_bytes sanity check.
Release 20180810:
- Refactor SeekableFDIterator and SeekableFileIterator to subclass new SeekableIterator.
- New SeekableMMapIterator to process a memory mapped file descriptor, intended for large files.
- New CornuCopyBuffer.hint method to pass a length hint through to the input_data iterator if it has a hint method, causing it possibly to make a differently sized fetch.
- SeekableIterator: new __del__ method calling self.close(); subclasses must provide a .close, which should be safe to call multiple times.
- CornuCopyBuffer: add support for .offset and .end_offset optional attributes on the input_data iterator.
- _BoundedBufferIterator: add .offset property plumbed to the underlying buffer offset.
- New CornuCopyBuffer.from_mmap to make a mmap backed buffer so that large data can be returned without penalty.
- Assorted fixes and doc improvements.
Release 20180805: Bugfixes for at_eof method and end_offset initialisation.
Release 20180726.1: Improve docstrings and release with better long_description.
Release 20180726: First PyPI release: CornuCopyBuffer and friends.