

Project description

csv-turbo

High-performance CSV reader/writer for Python — streaming, typed columns, large files.


Features

  • Streaming by default — iterate rows without loading the entire file into memory
  • Typed columns — declare int, float, bool, date, datetime schemas and get cast values automatically
  • Schema validation — catch missing/extra columns and type errors early
  • Dialect detection — auto-detect delimiter, quoting style, and line endings
  • Pipeline API — composable lazy transformations: filter, map, select, rename, chunk, sort, unique
  • Column statistics — profile any CSV with counts, nulls, min/max, mean, std dev, top-N values
  • Built-in dialects — RFC 4180, TSV, semicolon, pipe, Excel presets
  • Zero dependencies — pure Python ≥ 3.9, standard library only

Installation

pip install csv-turbo

Quick Start

Reading

from csv_turbo import CsvReader

for row in CsvReader("data.csv"):
    print(row["name"], row["score"])

Writing

from csv_turbo import CsvWriter

rows = [
    {"id": 1, "name": "Alice", "score": 9.5},
    {"id": 2, "name": "Bob",   "score": 8.0},
]

with CsvWriter("output.csv") as w:
    w.writerows(rows)

Typed Schema

from csv_turbo import CsvReader, Schema, ColumnDef, INT, FLOAT, STRING, DATE

schema = Schema([
    ColumnDef("id",    INT,    nullable=False),
    ColumnDef("name",  STRING),
    ColumnDef("score", FLOAT,  nullable=True, default=0.0),
    ColumnDef("dob",   DATE),
])

for row in CsvReader("data.csv", schema=schema):
    # row["id"] is int, row["score"] is float, row["dob"] is datetime.date
    print(row["id"], row["score"])

Streaming Large Files (Chunked)

from csv_turbo import CsvReader

reader = CsvReader("huge_file.csv")
for batch in reader.chunks(500):
    db.bulk_insert(batch)           # list of 500 row dicts

Pipeline API

from csv_turbo import CsvReader
from csv_turbo.streaming import Pipeline

result = (
    Pipeline(CsvReader("sales.csv", schema=schema))
    .filter(lambda r: r["amount"] > 100)
    .map(lambda r: {**r, "vat": round(r["amount"] * 0.2, 2)})
    .select("id", "name", "amount", "vat")
    .sort(key=lambda r: r["amount"], reverse=True)
    .take(50)
    .to_list()
)

Write pipeline output directly to CSV:

(
    Pipeline(CsvReader("raw.csv"))
    .where(status="active")
    .drop("internal_notes")
    .write_csv("clean.csv")
)

Dialect Detection

from csv_turbo.dialect import detect_dialect_from_file

dialect = detect_dialect_from_file("european_export.csv")
print(dialect.delimiter)   # likely ";"

for row in CsvReader("european_export.csv", delimiter=dialect.delimiter):
    ...
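
If you want a feel for what delimiter detection involves, the standard library's csv.Sniffer performs a comparable (though less featureful) guess. This is a plain-stdlib sketch, not csv-turbo's detect_dialect_from_file:

```python
import csv

# A small semicolon-delimited sample, as a European export might look.
sample = "id;name;score\n1;Alice;9,5\n2;Bob;8,0\n"

# csv.Sniffer inspects the sample text and guesses the dialect.
dialect = csv.Sniffer().sniff(sample)
print(dialect.delimiter)   # ";"
```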

Use a preset dialect:

from csv_turbo import TSV, CsvReader

for row in CsvReader("data.tsv", delimiter=TSV.delimiter):
    ...

Column Statistics / Profiling

from csv_turbo import CsvReader, profile

rows = CsvReader("data.csv").read_all()
data_profile = profile(rows, top_n=5)
print(data_profile)

# Access per-column stats
stats = data_profile.column("score")
print(stats.mean, stats.std_dev, stats.null_count)

Infer Schema Automatically

from csv_turbo import CsvReader

schema = CsvReader.infer_schema("data.csv", sample_size=500)
print(schema)   # Schema([id:int, name:string, score:float, dob:date])

for row in CsvReader("data.csv", schema=schema):
    print(row)
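
Schema inference of this kind typically tries progressively looser casts over a sample of values. The helper below (infer_type is a hypothetical name, not part of csv-turbo) sketches the idea with plain Python:

```python
def infer_type(values):
    """Guess a column type from sample string values: int, then float, else string."""
    for caster, name in ((int, "int"), (float, "float")):
        try:
            for v in values:
                caster(v)        # every sample value must cast cleanly
            return name
        except ValueError:
            pass                 # fall through to the next, looser type
    return "string"

print(infer_type(["1", "2", "3"]))     # "int"
print(infer_type(["1.5", "2"]))        # "float"
print(infer_type(["Alice", "Bob"]))    # "string"
```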

Read from String (Testing / Inline Data)

from csv_turbo import CsvReader

csv_text = "id,name,score\n1,Alice,9.5\n2,Bob,8.0\n"
for row in CsvReader.from_string(csv_text):
    print(row)

Write to String

from csv_turbo import CsvWriter

csv_str = CsvWriter.to_string([
    {"x": 1, "y": 2},
    {"x": 3, "y": 4},
])
print(csv_str)

API Reference

CsvReader

CsvReader(
    source,            # str | Path | file-like
    *,
    schema=None,       # Schema for typed casting
    delimiter=",",
    quotechar='"',
    encoding="utf-8",
    has_header=True,
    skip_blank_lines=True,
    strip_whitespace=True,
    null_values={""},  # Values treated as None
    row_filter=None,   # Callable[[dict], bool]
    row_transform=None, # Callable[[dict], dict]
    strict=False,      # Raise on extra columns
    max_errors=0,      # Tolerated cast errors
)
Method / Property Description
__iter__() Iterate rows as dicts
chunks(size) Iterate in batches
read_all() Load all rows into a list
count_rows() Count without materialising
headers Column names after first read
error_count Cast errors encountered
CsvReader.from_string(text) Create from a CSV string
CsvReader.infer_schema(path) Auto-detect schema
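
Since the reader yields plain dicts, its behaviour on simple comma-delimited input can be approximated with the standard library. This sketch (read_rows is a hypothetical helper, not csv-turbo code) mimics CsvReader.from_string minus the casting, null handling, and error tracking:

```python
import csv
import io

def read_rows(text):
    """Yield one dict per data row, keyed by the header line."""
    for row in csv.DictReader(io.StringIO(text)):
        yield dict(row)

rows = list(read_rows("id,name\n1,Alice\n2,Bob\n"))
print(rows[0])   # {'id': '1', 'name': 'Alice'}
```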

CsvWriter

CsvWriter(
    destination,           # str | Path | file-like
    *,
    fieldnames=None,       # Column order
    delimiter=",",
    quotechar='"',
    encoding="utf-8",
    write_header=True,
    formatter=None,        # Callable[[Any], str]
    column_formatters={},  # Per-column formatters
    row_transform=None,
    append=False,
    buffer_size=8192,
)
Method Description
writerow(row) Write one dict row
writerows(rows) Write an iterable of rows
rows_written Count of rows written
CsvWriter.to_string(rows) Render to string
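
The writer side has an equally small stdlib analogue. This sketch (rows_to_string is a hypothetical helper, not csv-turbo code) shows roughly what CsvWriter.to_string produces for dict rows: a header line followed by the values:

```python
import csv
import io

def rows_to_string(rows):
    """Render dict rows to a CSV string, header first."""
    buf = io.StringIO()
    # Column order comes from the first row's keys; "\n" keeps output platform-neutral.
    writer = csv.DictWriter(buf, fieldnames=list(rows[0]), lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)
    return buf.getvalue()

print(rows_to_string([{"x": 1, "y": 2}, {"x": 3, "y": 4}]))
# x,y
# 1,2
# 3,4
```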

Schema & ColumnDef

Schema([
    ColumnDef(name, type, nullable=True, default=None, alias=None),
    ...
])

Built-in type singletons: STRING, INT, FLOAT, BOOL, DATE, DATETIME

Parameterised constructors: IntType(min_value=0), FloatType(precision=2), DateType(fmt="%d/%m/%Y"), StringType(max_length=255)
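
The parameterised types suggest straightforward casting semantics. As a rough illustration (cast_date is a hypothetical helper, not csv-turbo internals), a DateType(fmt="%d/%m/%Y") cast can be sketched with datetime.strptime:

```python
from datetime import date, datetime

def cast_date(value, fmt="%d/%m/%Y", nullable=True, default=None):
    """Cast a raw CSV cell to datetime.date, treating "" as null."""
    if value == "":
        if nullable:
            return default
        raise ValueError("non-nullable column got an empty value")
    return datetime.strptime(value, fmt).date()

print(cast_date("31/12/2024"))   # 2024-12-31
print(cast_date(""))             # None
```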

Pipeline

Method Description
.filter(fn) Keep rows where fn(row) is True
.map(fn) Transform each row
.select(*cols) Keep only named columns
.rename(mapping) Rename columns
.drop(*cols) Remove columns
.add_field(name, fn) Compute a new column
.where(**kwargs) Equality filter shorthand
.skip(n) Skip first n rows
.take(n) Keep first n rows
.chunk(size) Group into batches
.sort(key) Sort (materialises)
.unique(key) Deduplicate rows
.peek(fn) Side-effect per row
.to_list() Materialise to list
.to_dict(key) Index by column
.count() Count rows
.first() First row or None
.aggregate(col, ...) Sum/mean/min/max
.write_csv(path) Write to file
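
Pipeline methods are lazy wrappers over ordinary iteration, so a chain like .filter(...).map(...).take(n) is roughly equivalent to this plain-Python sketch (inline data, no csv-turbo imports):

```python
from itertools import islice

rows = [
    {"id": 1, "amount": 250.0},
    {"id": 2, "amount": 40.0},
    {"id": 3, "amount": 120.0},
]

# .filter -> generator expression; .map -> rebuilt dict; .take -> islice
kept = (r for r in rows if r["amount"] > 100)
with_vat = ({**r, "vat": round(r["amount"] * 0.2, 2)} for r in kept)
result = list(islice(with_vat, 50))

print(result)
# [{'id': 1, 'amount': 250.0, 'vat': 50.0}, {'id': 3, 'amount': 120.0, 'vat': 24.0}]
```

Nothing runs until the final list() call, which is the same laziness the Pipeline table describes (sort and chunk being the steps that must materialise).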

profile(rows)

data_profile = profile(rows, top_n=10)
# DataProfile.row_count, .columns, .column(name)
# ColumnStats.count, .null_count, .fill_rate, .unique_count
# ColumnStats.mean, .std_dev, .min_value, .max_value, .median (numeric)
# ColumnStats.min_length, .max_length, .avg_length (string)
# ColumnStats.top_values — list of (value, count)
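
The per-column numbers can be reproduced for a single column with the standard library. This sketch computes a null count, mean, and top values the way the ColumnStats fields describe them (plain Python, not the profile API):

```python
from collections import Counter
from statistics import mean

scores = [9.5, 8.0, None, 9.5, 7.0]   # one column's values, None = null

non_null = [v for v in scores if v is not None]
null_count = len(scores) - len(non_null)
top_values = Counter(non_null).most_common(2)   # like top_values with top_n=2

print(null_count)          # 1
print(mean(non_null))      # 8.5
print(top_values[0])       # (9.5, 2)
```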

Error Handling

All exceptions inherit from CsvTurboError:

Exception When
ParseError Malformed CSV structure
TypeCastError Value cannot be cast to declared type
SchemaValidationError Missing/extra columns
WriteError Write failure
ConfigurationError Invalid reader/writer options

from csv_turbo import CsvReader, TypeCastError, SchemaValidationError

try:
    for row in CsvReader("data.csv", schema=schema, max_errors=5):
        process(row)
except TypeCastError as e:
    print(f"Bad value '{e.value}' in column '{e.column}' on line {e.line}")
except SchemaValidationError as e:
    print(f"Missing columns: {e.missing_columns}")

License

MIT © Vladyslav Zaiets

Download files


Source Distribution

csv_turbo-1.0.0.tar.gz (19.0 kB)


Built Distribution


csv_turbo-1.0.0-py3-none-any.whl (24.2 kB)


File details

Details for the file csv_turbo-1.0.0.tar.gz.

File metadata

  • Download URL: csv_turbo-1.0.0.tar.gz
  • Upload date:
  • Size: 19.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for csv_turbo-1.0.0.tar.gz
Algorithm Hash digest
SHA256 4a6181e23dfc36efebaaa32723c876b3d23425d0c1461b9312e42ec7854286f0
MD5 2b176e60486bb9d33a52f1fb6e45d0ec
BLAKE2b-256 01ea13d2c24957c12cb10e3f83d125f3582deea6d22a5326bdce28ef84cba729


File details

Details for the file csv_turbo-1.0.0-py3-none-any.whl.

File metadata

  • Download URL: csv_turbo-1.0.0-py3-none-any.whl
  • Upload date:
  • Size: 24.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for csv_turbo-1.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 2b8675d8eec067608e71ca2f96369d8e24f3f54f3c300e47e619abffd16ab5d6
MD5 700b6c7978bccebbdaf3d772cb45b928
BLAKE2b-256 d8a4fb963089f7a3a33ea19f806da87176b65ccba3ea0915bb481272818fcb8a

