
I/O extras


Oh! IO: The I/O tools that io doesn’t want you to have.

Ohio provides the missing links between Python’s built-in I/O primitives, to help ensure the efficiency, clarity and elegance of your code.

For higher-level examples of what Ohio can do for you, see Extensions.


csvio

Flexibly encode data to CSV format.

ohio.csv_text(rows, *writer_args, writer=csv.writer, **writer_kwargs)

Encode the specified iterable of rows into CSV text.

Data is encoded to an in-memory str, (rather than to the file system), via an internally-managed io.StringIO, (newly constructed for every invocation of csv_text).

For example:

>>> data = [
...     ('1/2/09 6:17', 'Product1', '1200', 'Mastercard', 'carolina'),
...     ('1/2/09 4:53', 'Product1', '1200', 'Visa', 'Betina'),
... ]

>>> encoded_csv = csv_text(data)

>>> encoded_csv[:80]
'1/2/09 6:17,Product1,1200,Mastercard,carolina\r\n1/2/09 4:53,Product1,1200,Visa,Be'

>>> encoded_csv.splitlines(keepends=True)
['1/2/09 6:17,Product1,1200,Mastercard,carolina\r\n',
 '1/2/09 4:53,Product1,1200,Visa,Betina\r\n']

By default, rows are encoded by built-in csv.writer. You may specify an alternate writer, and provide construction arguments:

>>> header = ('Transaction_date', 'Product', 'Price', 'Payment_Type', 'Name')

>>> data = [
...     {'Transaction_date': '1/2/09 6:17',
...      'Product': 'Product1',
...      'Price': '1200',
...      'Payment_Type': 'Mastercard',
...      'Name': 'carolina'},
...     {'Transaction_date': '1/2/09 4:53',
...      'Product': 'Product1',
...      'Price': '1200',
...      'Payment_Type': 'Visa',
...      'Name': 'Betina'},
... ]

>>> csv_text(data, writer=csv.DictWriter, fieldnames=header).splitlines(keepends=True)
['1/2/09 6:17,Product1,1200,Mastercard,carolina\r\n',
 '1/2/09 4:53,Product1,1200,Visa,Betina\r\n']

class ohio.CsvWriterTextIO(*writer_args, **writer_kwargs)

csv.writer-compatible interface to encode CSV & write to memory.

The writer instance may also be read, to retrieve written CSV, as it is written (iteratively).

Rather than write to the file system, an internal io.StringIO buffer is used to store output temporarily, until it is read. (Unlike ohio.csv_text, this buffer is reused across read/write cycles.)
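For example, (a minimal sketch of the write/read cycle; writerows here is the csv.writer-compatible method, and rows are encoded with the default dialect):

>>> csv_buffer = CsvWriterTextIO()
>>> csv_buffer.writerows([('1/2/09 6:17', 'Product1', '1200')])
>>> csv_buffer.read()
'1/2/09 6:17,Product1,1200\r\n'

Text is stored only until it is read; a subsequent read returns the empty string, until more rows are written:

>>> csv_buffer.read()
''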

class ohio.CsvDictWriterTextIO(*writer_args, **writer_kwargs)

CsvWriterTextIO which accepts row data in the form of dict.

Data is passed to csv.DictWriter.

See also: ohio.CsvWriterTextIO.
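For example, (a sketch presuming csv.DictWriter's fieldnames argument, as in the csv_text example above):

>>> csv_buffer = CsvDictWriterTextIO(fieldnames=['Product', 'Price'])
>>> csv_buffer.writerows([{'Product': 'Product1', 'Price': '1200'}])
>>> csv_buffer.read()
'Product1,1200\r\n'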

iterio

Provide a readable file-like interface to any iterable.

class ohio.IteratorTextIO(iterable)

Readable file-like interface for iterable text streams.

IteratorTextIO wraps any iterable of text for consumption like a file, offering methods readline(), read([size]), etc., (implemented via base class ohio.StreamTextIOBase).

For example, given a consumer which expects to read():

>>> def read_chunks(fdesc, chunk_size=1024):
...     get_chunk = lambda: fdesc.read(chunk_size)
...     yield from iter(get_chunk, '')

…And either streamed or in-memory text, (i.e. text which is not simply on the file system), such as this generator produces:

>>> def all_caps(fdesc):
...     for line in fdesc:
...         yield line.upper()

…We can connect these two interfaces via IteratorTextIO:

>>> with open('/usr/share/dict/words') as fdesc:
...     louder_words_lines = all_caps(fdesc)
...     with IteratorTextIO(louder_words_lines) as louder_words_desc:
...         louder_words_chunked = read_chunks(louder_words_desc)
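The wrapped stream may also be consumed directly, (a minimal illustration; note that text remaining from a partially-read chunk is buffered for subsequent reads):

>>> stream = IteratorTextIO(['Hello\n', 'World\n'])
>>> stream.read(5)
'Hello'
>>> stream.readline()
'\n'
>>> stream.readline()
'World\n'
>>> stream.read()
''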

pipeio

Efficiently connect read() and write() interfaces.

PipeTextIO provides a readable and iterable interface to text whose producer requires a writable interface.

In contrast to first writing such text to memory and then consuming it, PipeTextIO permits write operations only as necessary to fill its buffer, and thereby to fulfill read operations, (the writer running concurrently in its own thread). As such, PipeTextIO consumes a stable minimum of memory, and may significantly boost speed, with a minimum of boilerplate.

ohio.pipe_text(writer_func, *args, buffer_size=None, **kwargs)

Iteratively stream output written by given function through readable file-like interface.

Uses in-process writer thread, (which runs the given function), to mimic buffered text transfer, such as between the standard output and input of two piped processes.

Calls to write are blocked until required by calls to read.

Note: If at all possible, use a generator! Your iterative text-writing function can most likely be designed as a generator, (or as some sort of iterator). Its output can then, far more simply and easily, be streamed to some input. If your input must be read from a file-like object, see ohio.IteratorTextIO. If your output must be CSV-encoded, see ohio.csv_text and ohio.CsvWriterTextIO.
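For illustration of the note above, (both functions here are hypothetical), consider a writer requiring a file-like object, alongside its far simpler generator equivalent:

>>> def write_lines(file_like):
...     # writer-style: requires a file-like object, (and hence PipeTextIO)
...     file_like.write("one\n")
...     file_like.write("two\n")

>>> def generate_lines():
...     # generator-style: may be streamed directly, (no pipe required)
...     yield "one\n"
...     yield "two\n"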

PipeTextIO is suitable for situations where output must be written to a file-like object; its writes are made blocking, to enforce iterativity.

PipeTextIO is not “seekable,” but supports all other typical, read-write file-like features.

For example, consider the following callable, requiring a file-like object, to which to write:

>>> def write_output(file_like):
...     file_like.write("Hi there.\r\n")
...     print('[writer]', 'Yay I wrote one line')
...     file_like.write("Cool, right?\r\n")
...     print('[writer]', 'Finally ... I wrote a second line!')
...     file_like.write("All right, later :-)\r\n")
...     print('[writer]', "Done.")

Most typically, we might read this content as follows:

>>> with PipeTextIO(write_output) as pipe:
...     for line in pipe:
...         ...

And, this is recommended. However, for the sake of example, consider the following:

>>> pipe = PipeTextIO(write_output, buffer_size=1)

>>> pipe.read(5)
[writer] Yay I wrote one line
'Hi th'
[writer] Finally ... I wrote a second line!

>>> pipe.readline()
'ere.\r\n'

>>> pipe.readline()
'Cool, right?\r\n'
[writer] Done.

>>> pipe.read()
'All right, later :-)\r\n'

In the above example, write_output requires a file-like interface to which to write its output; and, we presume that there is no alternative to this implementation, (such as a generator), and that its output is large enough that we don’t want to hold it in memory. In the case that we don’t want this output written to the file system, PipeTextIO enables us to read it directly, in chunks.

  1. Initially, nothing is written.

     1. Upon requesting to read – in this case, only the first 5 bytes – the writer is initialized, and permitted to write its first chunk, (which happens to be one full line). This is retrieved from the write buffer, and sufficient to satisfy the read request.

     2. Having removed the first chunk from the write buffer, the writer is permitted to eagerly write its next chunk, (the second line), (but, no more than that).

  2. The second read request – for the remainder of the line – is fully satisfied by the first chunk retrieved from the write buffer. No more writing takes place.

  3. The third read request, for another line, retrieves the second chunk from the write buffer. The writer is permitted to write its final chunk to the write buffer.

  4. The final read request returns all remaining text, (retrieved from the write buffer).

Concretely, this is commonly useful with the PostgreSQL COPY command, for efficient data transfer, (and without the added complexity of the file system). While your database interface may vary, PipeTextIO enables the following syntax, for example to copy data into the database:

>>> def write_csv(file_like):
...     writer = csv.writer(file_like)
...     ...

>>> with PipeTextIO(write_csv) as pipe, \
...      connection.cursor() as cursor:
...     cursor.copy_from(pipe, 'my_table', format='csv')

…or, to copy data out of the database:

>>> with connection.cursor() as cursor:
...     writer = lambda pipe: cursor.copy_to(pipe,
...                                          'my_table',
...                                          format='csv')
...
...     with PipeTextIO(writer) as pipe:
...         reader = csv.reader(pipe)
...         ...

Alternatively, writer arguments may be passed to PipeTextIO:

>>> with connection.cursor() as cursor:
...     with PipeTextIO(cursor.copy_to,
...                     args=['my_table'],
...                     kwargs={'format': 'csv'}) as pipe:
...         reader = csv.reader(pipe)
...         ...

(But, bear in mind, the signature of the callable passed to PipeTextIO must be such that its first, anonymous argument is the PipeTextIO instance.)

Consider also the above example with the helper pipe_text:

>>> with connection.cursor() as cursor:
...     with pipe_text(cursor.copy_to,
...                    'my_table',
...                    format='csv') as pipe:
...         reader = csv.reader(pipe)
...         ...

baseio

Low-level primitives.

class ohio.StreamTextIOBase

Readable file-like abstract base class.

Concrete classes must implement method __next_chunk__ to return chunk(s) of the text to be read.
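For example, (a hypothetical concrete subclass; this sketch presumes that __next_chunk__ may signal exhaustion by raising StopIteration):

>>> class GreetingTextIO(StreamTextIOBase):
...     """Stream a fixed sequence of text chunks."""
...
...     def __init__(self):
...         super().__init__()
...         self._chunks = iter(['Hi ', 'there\n'])
...
...     def __next_chunk__(self):
...         # raises StopIteration once the sequence is exhausted
...         return next(self._chunks)

>>> GreetingTextIO().read()
'Hi there\n'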

exception ohio.IOClosed(*args)

Exception indicating an attempted operation on a file-like object which has been closed.
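For example, (illustrative; the exception's message is elided here):

>>> stream = IteratorTextIO(['text\n'])
>>> stream.close()
>>> stream.read()
Traceback (most recent call last):
...
ohio.IOClosed: ...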

Extensions

Modules integrating Ohio with the toolsets that need it.

Extensions for pandas

This module extends pandas.DataFrame with methods pg_copy_to and pg_copy_from.

To enable, simply import this module anywhere in your project, (most likely – just once, in its root module):

>>> import ohio.ext.pandas

For example, if you have just one module, you might add the import there; or, in a Python package, such as:

ohio/
    __init__.py
    baseio.py
    ...

you might add it to the package’s __init__.py, to ensure that extensions are loaded before your code, which uses them, is run.

NOTE: These extensions are intended for Pandas, and attempt to import pandas. Pandas must be available (installed) in your environment.

class ohio.ext.pandas.DataFramePgCopyTo(data_frame)

pg_copy_to: Copy DataFrame to database table via PostgreSQL COPY.

ohio.PipeTextIO enables the direct, in-process “piping” of DataFrame CSV into the “standard input” of the PostgreSQL COPY command, for quick, memory-efficient database persistence, (and without the needless involvement of the local file system).

For example, given a SQLAlchemy database connection engine and a Pandas DataFrame:

>>> from sqlalchemy import create_engine
>>> engine = create_engine('sqlite://', echo=False)

>>> df = pandas.DataFrame({'name' : ['User 1', 'User 2', 'User 3']})

We may simply invoke the DataFrame’s Ohio extension method, pg_copy_to:

>>> df.pg_copy_to('users', engine)

pg_copy_to supports all the same parameters as to_sql, (excepting parameter method).

In addition to the signature of to_sql, pg_copy_to accepts the optimization parameter buffer_size, which controls the maximum number of CSV-encoded write results to hold in memory prior to their being read into the database. Depending on use-case, increasing this value may speed up the operation, at the cost of additional memory – and vice-versa. buffer_size defaults to 100.
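For example, (a hypothetical tuning; the appropriate value depends upon your data and environment):

>>> df.pg_copy_to('users', engine, buffer_size=500)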

ohio.ext.pandas.to_sql_method_pg_copy_to(table, conn, keys, data_iter, buffer_size=100)

Write pandas data to table via stream through PostgreSQL COPY.

This implements a pandas to_sql “method”, with the added optional argument buffer_size.
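For example, (a sketch of direct use via pandas’s callable method parameter, with functools.partial binding the added buffer_size argument – presumably the wiring pg_copy_to performs for you):

>>> from functools import partial
>>> from ohio.ext.pandas import to_sql_method_pg_copy_to

>>> df.to_sql(
...     'users',
...     engine,
...     if_exists='append',
...     method=partial(to_sql_method_pg_copy_to, buffer_size=500),
... )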

ohio.ext.pandas.data_frame_pg_copy_from(sql, engine, index_col=None, parse_dates=False, columns=None, dtype=None, nrows=None, buffer_size=100)

pg_copy_from: Construct DataFrame from database table or query via PostgreSQL COPY.

ohio.PipeTextIO enables the direct, in-process “piping” of the PostgreSQL COPY command into Pandas read_csv, for quick, memory-efficient construction of DataFrame from database, (and without the needless involvement of the local file system).

For example, given a SQLAlchemy database connection engine:

>>> from sqlalchemy import create_engine
>>> engine = create_engine('sqlite://', echo=False)

We may simply invoke the DataFrame’s Ohio extension method, pg_copy_from:

>>> df = DataFrame.pg_copy_from('users', engine)

pg_copy_from supports many of the same parameters as read_sql and read_csv.

In addition, pg_copy_from accepts the optimization parameter buffer_size, which controls the maximum number of CSV-encoded results written by the database cursor to hold in memory prior to their being read into the DataFrame. Depending on use-case, increasing this value may speed up the operation, at the cost of additional memory – and vice-versa. buffer_size defaults to 100.
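For example, (illustrative, using parameters from the documented signature; index_col here presumes a hypothetical id column):

>>> df = DataFrame.pg_copy_from(
...     'users',
...     engine,
...     index_col='id',
...     buffer_size=500,
... )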

Benchmarking

Ohio extensions for pandas were benchmarked to test their speed and memory-efficiency relative both to pandas built-in functionality and to custom implementations which do not utilize Ohio.

Interfaces and syntactical niceties aside, Ohio generally features memory stability. Its tools enable pipelines which may also improve speed.

In the below benchmark, Ohio extensions pg_copy_from & pg_copy_to reduced memory consumption by 84% & 61%, and completed in 39% & 89% less time, relative to pandas built-ins read_sql & to_sql, (respectively).

Compared to purpose-built extensions – which utilized PostgreSQL COPY, but using io.StringIO in place of ohio.PipeTextIO – pg_copy_from & pg_copy_to still reduced memory consumption by 60% & 33%, respectively. pg_copy_from also completed in 16% less time than the io.StringIO version. pg_copy_to took on average 7% more time to complete than the io.StringIO version. (Speed improvements – which do not diminish Ohio’s memory efficiency – have been identified as a target for future work.)

The benchmarks plotted below were produced from averages and standard deviations over 3 randomized trials per target. Input data consisted of 896,677 rows across 83 columns: 1 of these of type timestamp, 51 integers and 31 floats. The benchmarking package, prof, is preserved in Ohio’s repository.

Figure: copy from database to DataFrame (https://raw.githubusercontent.com/dssg/ohio/0.3.0/doc/img/profile-copy-from-database-to-datafram-1554345457.svg?sanitize=true)
ohio_pg_copy_from_X

pg_copy_from(buffer_size=X)

A PostgreSQL database-connected cursor writes the results of COPY to a PipeTextIO, from which pandas constructs a DataFrame.

pandas_read_sql

pandas.read_sql()

Pandas constructs a DataFrame from a given database query.

pandas_read_sql_chunks_100

pandas.read_sql(chunksize=100)

Pandas is instructed to generate DataFrame slices of the database query result, and these slices are concatenated into a single frame, with: pandas.concat(chunks, copy=False).

pandas_read_csv_stringio

pandas.read_csv(StringIO())

A PostgreSQL database-connected cursor writes the results of COPY to a StringIO, from which pandas constructs a DataFrame.

Figure: copy from DataFrame to database (https://raw.githubusercontent.com/dssg/ohio/0.3.0/doc/img/profile-copy-from-dataframe-to-databas-1554320666.svg?sanitize=true)
ohio_pg_copy_to_X

pg_copy_to(buffer_size=X)

DataFrame data are written and encoded through a PipeTextIO, and read by a PostgreSQL database-connected cursor’s COPY command.

pandas_to_sql

pandas.DataFrame.to_sql()

Pandas inserts DataFrame data into the database row by row.

pandas_to_sql_multi_X

pandas.DataFrame.to_sql(method='multi', chunksize=X)

Pandas inserts DataFrame data into the database in chunks of rows.

copy_stringio_to_db

DataFrame data are written and encoded to a StringIO, and then read by a PostgreSQL database-connected cursor’s COPY command.
