Iterable data processing Python library
Iterable Data
Iterable Data is a Python library for reading and writing data files row by row through a consistent, iterator-based interface. It provides a unified API, similar to csv.DictReader, that works across many data formats (CSV, JSON, Parquet, XML, and more).
The library simplifies data processing and conversion between formats while preserving complex nested data structures (unlike pandas DataFrames, which require flattening them).
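As a stdlib-only illustration (independent of this library) of why row-wise formats can preserve nesting, a JSON Lines round trip keeps nested dictionaries intact where a flat table could not:

```python
import json

# A row with a nested structure, as a row-by-row reader would yield it.
row = {
    "name": "Alice",
    "contacts": {"email": "a@example.com", "phones": ["+1-555", "+1-556"]},
}

# JSON Lines: one JSON object per line; nesting survives the round trip.
line = json.dumps(row)
restored = json.loads(line)

print(restored == row)  # True
```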
Features
- Unified API: Single interface for reading/writing multiple data formats
- Automatic Format Detection: Detects file type and compression from filename or content (magic numbers and heuristics)
- Format Capability Reporting: Programmatically query format capabilities (read/write/bulk/totals/streaming/tables)
- Support for Compression: Works seamlessly with compressed files
- Preserves Nested Data: Handles complex nested structures as Python dictionaries
- DuckDB Integration: Optional DuckDB engine for high-performance queries with pushdown optimizations
- Pipeline Processing: Built-in pipeline support for data transformation
- Encoding Detection: Automatic encoding and delimiter detection for text files
- Bulk Operations: Efficient batch reading and writing
- Table Listing: Discover available tables, sheets, and datasets in multi-table formats
- Context Manager Support: Use `with` statements for automatic resource cleanup
- DataFrame Bridges: Convert iterable data to Pandas, Polars, and Dask DataFrames with one-liner methods
- Cloud Storage Support: Direct access to S3, GCS, and Azure Blob Storage via URI schemes
- Atomic Writes: Production-safe file writing with temporary files and atomic renames
- Bulk File Conversion: Convert multiple files at once using glob patterns or directories
- Progress Tracking and Metrics: Built-in progress bars, callbacks, and structured metrics objects
- Error Handling Controls: Configurable error policies and structured error logging
- Type Hints and Type Safety: Complete type annotations with typed helper functions for dataclasses and Pydantic models
Supported File Types
Core Formats
- JSON - Standard JSON files
- JSONL/NDJSON - JSON Lines format (one JSON object per line)
- JSON-LD - JSON for Linking Data (RDF format)
- CSV/TSV - Comma and tab-separated values
- Annotated CSV - CSV with type annotations and metadata
- CSVW - CSV on the Web (with metadata)
- PSV/SSV - Pipe and semicolon-separated values
- LTSV - Labeled Tab-Separated Values
- FWF - Fixed Width Format
- XML - XML files with configurable tag parsing
- ZIP XML - XML files within ZIP archives
- HTML - HTML files with table extraction
Binary Formats
- BSON - Binary JSON format
- MessagePack - Efficient binary serialization
- CBOR - Concise Binary Object Representation
- UBJSON - Universal Binary JSON
- SMILE - Binary JSON variant
- Bencode - BitTorrent encoding format
- Avro - Apache Avro binary format
- Pickle - Python pickle format
Columnar & Analytics Formats
- Parquet - Apache Parquet columnar format
- ORC - Optimized Row Columnar format
- Arrow/Feather - Apache Arrow columnar format
- Lance - Modern columnar format optimized for ML and vector search
- Vortex - Modern columnar format with fast random access
- Delta Lake - Delta Lake format
- Iceberg - Apache Iceberg format
- Hudi - Apache Hudi format
Database Formats
- SQLite - SQLite database files
- DBF - dBase/FoxPro database files
- MySQL Dump - MySQL dump files
- PostgreSQL Copy - PostgreSQL COPY format
- DuckDB - DuckDB database files
Statistical Formats
- SAS - SAS data files
- Stata - Stata data files
- SPSS - SPSS data files
- R Data - R RDS and RData files
- PX - PC-Axis format
- ARFF - Attribute-Relation File Format (Weka format)
Scientific Formats
- NetCDF - Network Common Data Form for scientific data
- HDF5 - Hierarchical Data Format
Geospatial Formats
- GeoJSON - Geographic JSON format
- GeoPackage - OGC GeoPackage format
- GML - Geography Markup Language
- KML - Keyhole Markup Language
- Shapefile - ESRI Shapefile format
- MVT/PBF - Mapbox Vector Tiles
- TopoJSON - Topology-preserving GeoJSON extension
RDF & Semantic Formats
- JSON-LD - JSON for Linking Data
- RDF/XML - RDF in XML format
- Turtle - Terse RDF Triple Language
- N-Triples - Line-based RDF format
- N-Quads - N-Triples with context
Feed Formats
- Atom - Atom Syndication Format
- RSS - Rich Site Summary feed format
Network Formats
- PCAP - Packet Capture format
- PCAPNG - PCAP Next Generation format
Log & Event Formats
- Apache Log - Apache access/error logs
- CEF - Common Event Format
- GELF - Graylog Extended Log Format
- WARC - Web ARChive format
- CDX - Web archive index format
- ILP - InfluxDB Line Protocol
Email Formats
- EML - Email message format
- MBOX - Mailbox format
- MHTML - MIME HTML format
Configuration Formats
- INI - INI configuration files
- TOML - Tom's Obvious Minimal Language
- YAML - YAML Ain't Markup Language
- HOCON - Human-Optimized Config Object Notation
- EDN - Extensible Data Notation
Office Formats
- XLS/XLSX - Microsoft Excel files
- ODS - OpenDocument Spreadsheet
CAD Formats
- DXF - AutoCAD Drawing Exchange Format
Streaming & Big Data Formats
- Kafka - Apache Kafka format
- Pulsar - Apache Pulsar format
- Flink - Apache Flink format
- Beam - Apache Beam format
- RecordIO - RecordIO format
- SequenceFile - Hadoop SequenceFile
- TFRecord - TensorFlow Record format
Protocol & Serialization Formats
- Protocol Buffers - Google Protocol Buffers
- Cap'n Proto - Cap'n Proto serialization
- FlatBuffers - FlatBuffers serialization
- FlexBuffers - FlexBuffers format
- Thrift - Apache Thrift format
- ASN.1 - ASN.1 encoding format
- Ion - Amazon Ion format
Other Formats
- VCF - Variant Call Format (genomics)
- iCal - iCalendar format
- LDIF - LDAP Data Interchange Format
- TXT - Plain text files
Supported Compression Codecs
- GZip (.gz)
- BZip2 (.bz2)
- LZMA (.xz, .lzma)
- LZ4 (.lz4)
- ZIP (.zip)
- Brotli (.br)
- ZStandard (.zst, .zstd)
- Snappy (.snappy, .sz)
- LZO (.lzo, .lzop)
- SZIP (.sz)
- 7z (.7z)
Requirements
Python 3.10+
Installation
pip install iterabledata
Or install from source:
git clone https://github.com/datenoio/iterabledata.git
cd iterabledata
pip install .
Quick Start
Basic Reading
from iterable.helpers.detect import open_iterable

# Automatically detects format and compression
# Using context manager (recommended)
with open_iterable('data.csv.gz') as source:
    for row in source:
        print(row)
        # Process your data here
# File is automatically closed

# Or manually (still supported)
source = open_iterable('data.csv.gz')
for row in source:
    print(row)
source.close()
Writing Data
from iterable.helpers.detect import open_iterable

# Write compressed JSONL file
# Using context manager (recommended)
with open_iterable('output.jsonl.zst', mode='w') as dest:
    for item in my_data:
        dest.write(item)
# File is automatically closed

# Or manually (still supported)
dest = open_iterable('output.jsonl.zst', mode='w')
for item in my_data:
    dest.write(item)
dest.close()
Usage Examples
Reading Compressed CSV Files
from iterable.helpers.detect import open_iterable

# Read compressed CSV file (supports .gz, .bz2, .xz, .zst, .lz4, .br, .snappy, .lzo)
source = open_iterable('data.csv.xz')
n = 0
for row in source:
    n += 1
    # Process row data
    if n % 1000 == 0:
        print(f'Processed {n} rows')
source.close()
Reading Different Formats
from iterable.helpers.detect import open_iterable

# Read JSONL file
jsonl_file = open_iterable('data.jsonl')
for row in jsonl_file:
    print(row)
jsonl_file.close()

# Read Parquet file
parquet_file = open_iterable('data.parquet')
for row in parquet_file:
    print(row)
parquet_file.close()

# Read XML file (specify tag name)
xml_file = open_iterable('data.xml', iterableargs={'tagname': 'item'})
for row in xml_file:
    print(row)
xml_file.close()

# Read Excel file
xlsx_file = open_iterable('data.xlsx')
for row in xlsx_file:
    print(row)
xlsx_file.close()
Format Detection and Encoding
from iterable.helpers.detect import open_iterable, detect_file_type, detect_file_type_from_content
from iterable.helpers.utils import detect_encoding, detect_delimiter

# Detect file type and compression (uses filename extension)
result = detect_file_type('data.csv.gz')
print(f"Type: {result['datatype']}, Codec: {result['codec']}")

# Content-based detection (for files without extensions or streams)
with open('data.unknown', 'rb') as f:
    detected_format = detect_file_type_from_content(f)
    print(f"Detected format: {detected_format}")  # e.g., 'parquet', 'json', 'csv'

# open_iterable() automatically uses content-based detection as a fallback.
# Works with files without extensions, streams, or incorrect extensions.
with open_iterable('data.unknown') as source:  # Detects from content
    for row in source:
        print(row)

# Detect encoding for CSV files
encoding_info = detect_encoding('data.csv')
print(f"Encoding: {encoding_info['encoding']}, Confidence: {encoding_info['confidence']}")

# Detect delimiter for CSV files
delimiter = detect_delimiter('data.csv', encoding=encoding_info['encoding'])

# Open with detected settings
source = open_iterable('data.csv', iterableargs={
    'encoding': encoding_info['encoding'],
    'delimiter': delimiter
})
Error Handling
IterableData provides a comprehensive exception hierarchy and configurable error handling:
from iterable.helpers.detect import open_iterable
from iterable.exceptions import (
    FormatDetectionError,
    FormatNotSupportedError,
    FormatParseError,
    CodecError
)

# Basic exception handling
try:
    with open_iterable('data.unknown') as source:
        for row in source:
            process(row)
except FormatDetectionError as e:
    print(f"Could not detect format: {e.reason}")
    # Try with explicit format or check file content
except FormatNotSupportedError as e:
    print(f"Format '{e.format_id}' not supported: {e.reason}")
    # Install missing dependencies or use different format
except FormatParseError as e:
    print(f"Failed to parse {e.format_id} format")
    if e.position:
        print(f"Error at position: {e.position}")
except CodecError as e:
    print(f"Compression error with {e.codec_name}: {e.message}")
    # Check file integrity or try different codec
except Exception as e:
    print(f"Unexpected error: {e}")
Configurable Error Policies: Control how malformed records are handled:
# Skip malformed records and continue processing
with open_iterable(
    'data.csv',
    iterableargs={'on_error': 'skip', 'error_log': 'errors.log'}
) as src:
    for row in src:
        process(row)  # Only processes valid rows

# Warn on errors but continue processing
with open_iterable(
    'data.jsonl',
    iterableargs={'on_error': 'warn', 'error_log': 'errors.log'}
) as src:
    for row in src:
        process(row)  # Warnings logged, processing continues

# Default: raise exceptions immediately (existing behavior)
with open_iterable('data.csv', iterableargs={'on_error': 'raise'}) as src:
    for row in src:
        process(row)
Error Logging: Structured JSON logs with context (filename, row number, byte offset, error message, original line).
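Such a JSON-lines error log is straightforward to post-process. The field names below are assumptions for illustration; check the library's actual log output for the exact keys:

```python
import json

# Hypothetical error-log records with the kinds of context fields
# described above (filename, row number, byte offset, message, line).
log_lines = [
    '{"filename": "data.csv", "row": 17, "offset": 834, "error": "bad field count", "line": "a,b"}',
    '{"filename": "data.csv", "row": 42, "offset": 2110, "error": "invalid utf-8", "line": "???"}',
]

errors = [json.loads(line) for line in log_lines]

# Summarize failing row numbers per source file
by_file = {}
for e in errors:
    by_file.setdefault(e["filename"], []).append(e["row"])

print(by_file)  # {'data.csv': [17, 42]}
```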
See Exception Hierarchy documentation for complete exception reference.
Querying Format Capabilities
from iterable.helpers.capabilities import (
    get_format_capabilities,
    get_capability,
    list_all_capabilities
)

# Get all capabilities for a format
caps = get_format_capabilities("csv")
print(f"CSV readable: {caps['readable']}")
print(f"CSV writable: {caps['writable']}")
print(f"CSV supports totals: {caps['totals']}")
print(f"CSV supports tables: {caps['tables']}")

# Query a specific capability
is_writable = get_capability("json", "writable")
has_totals = get_capability("parquet", "totals")
supports_tables = get_capability("xlsx", "tables")

# List capabilities for all formats
all_caps = list_all_capabilities()
for format_id, capabilities in all_caps.items():
    if capabilities.get("tables"):
        print(f"{format_id} supports multiple tables")
Format Conversion
from iterable.helpers.detect import open_iterable
from iterable.convert.core import convert

# Simple format conversion
convert('input.jsonl.gz', 'output.parquet')

# Convert with options
convert(
    'input.csv.xz',
    'output.jsonl.zst',
    iterableargs={'delimiter': ';', 'encoding': 'utf-8'},
    batch_size=10000
)

# Convert and flatten nested structures
convert(
    'input.jsonl',
    'output.csv',
    is_flatten=True,
    batch_size=50000
)
Atomic Writes for Production Safety
Use atomic writes to ensure output files are never left in a partially written state:
from iterable.convert.core import convert
from iterable.pipeline.core import pipeline

# Convert with atomic writes (production-safe)
result = convert('input.csv', 'output.parquet', atomic=True)
# Output file only appears when conversion completes successfully

# Atomic writes in pipelines
pipeline(
    source=source,
    destination=destination,
    process_func=transform_func,
    atomic=True  # Ensures destination file is only created on success
)
Benefits: Prevents data corruption from crashes, interruptions, or mid-process failures. Original files are preserved on failure.
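The underlying technique can be sketched with the standard library alone: write to a temporary file in the destination directory, then atomically rename it into place with os.replace(). This is a generic sketch of the pattern, not the library's internal code:

```python
import os
import pathlib
import tempfile

def atomic_write(path: str, data: bytes) -> None:
    """Write data to path so readers never observe a partial file."""
    dirname = os.path.dirname(os.path.abspath(path))
    # The temp file must live on the same filesystem for os.replace()
    # to be an atomic rename.
    fd, tmp = tempfile.mkstemp(dir=dirname, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp, path)  # atomic: target appears fully written or not at all
    except BaseException:
        os.unlink(tmp)  # on failure, remove the temp file; target is untouched
        raise

out = pathlib.Path(tempfile.mkdtemp()) / "result.jsonl"
atomic_write(str(out), b'{"ok": true}\n')
print(out.read_bytes())  # b'{"ok": true}\n'
```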
Bulk File Conversion
Convert multiple files at once using glob patterns, directories, or file lists:
from iterable.convert.core import bulk_convert

# Convert all CSV files matching glob pattern
result = bulk_convert('data/raw/*.csv.gz', 'data/processed/', to_ext='parquet')

# Convert with custom filename pattern
result = bulk_convert('data/*.csv', 'output/', pattern='{name}.parquet')

# Convert entire directory
result = bulk_convert('data/raw/', 'data/processed/', to_ext='parquet')

# Access results
print(f"Converted {result.successful_files}/{result.total_files} files")
print(f"Total rows: {result.total_rows_out}")
print(f"Throughput: {result.throughput:.0f} rows/second")

# Check individual file results
for file_result in result.file_results:
    if file_result.success:
        print(f"✓ {file_result.source_file}: {file_result.result.rows_out} rows")
    else:
        print(f"✗ {file_result.source_file}: {file_result.error}")
Features: Error resilience (continues if one file fails), aggregated metrics, flexible output naming with placeholders ({name}, {stem}, {ext}).
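One plausible way to expand such placeholders, sketched with pathlib. The exact semantics of {name} versus {stem} here are assumptions for illustration, not the library's documented behavior:

```python
from pathlib import Path

def expand_pattern(source: str, pattern: str) -> str:
    """Expand output-name placeholders for one input file.

    Assumed semantics (illustrative only): {name} drops the last suffix,
    {stem} drops every suffix (useful for .csv.gz), {ext} is the last
    suffix without the dot.
    """
    p = Path(source)
    stem = p.name.split(".")[0]
    return pattern.format(name=p.stem, stem=stem, ext=p.suffix.lstrip("."))

print(expand_pattern("data/raw/events.csv.gz", "{stem}.parquet"))  # events.parquet
print(expand_pattern("data/raw/events.csv", "{name}.parquet"))     # events.parquet
```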
Progress Tracking and Metrics
Track conversion and pipeline progress with callbacks, progress bars, and structured metrics:
from iterable.convert.core import convert
from iterable.pipeline.core import pipeline

# Progress callback for conversions
def progress_cb(stats):
    print(f"Progress: {stats['rows_read']} rows read, "
          f"{stats['rows_written']} rows written, "
          f"{stats.get('elapsed', 0):.2f}s elapsed")

# Convert with progress tracking
result = convert(
    'input.csv',
    'output.parquet',
    progress=progress_cb,
    show_progress=True  # Also shows tqdm progress bar
)

# Access conversion metrics
print(f"Converted {result.rows_out} rows in {result.elapsed_seconds:.2f}s")
print(f"Read {result.bytes_read} bytes, wrote {result.bytes_written} bytes")

# Pipeline with progress and metrics
result = pipeline(
    source=source,
    destination=destination,
    process_func=transform_func,
    progress=progress_cb  # Progress callback
)

# Access pipeline metrics (supports both attribute and dict access)
print(f"Processed {result.rows_processed} rows")
print(f"Throughput: {result.throughput:.0f} rows/second")
print(f"Exceptions: {result.exceptions}")
# Backward compatible: result['rec_count'] also works
Features: Real-time progress callbacks, automatic progress bars with tqdm, structured metrics objects (ConversionResult, PipelineResult).
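The callback contract described above, a worker periodically invoking a callable with a stats dictionary, can be sketched generically. The field names mirror the README's example; the function itself is illustrative, not part of the library:

```python
import time

def run_with_progress(rows, progress=None, every=2):
    """Copy rows to a list, invoking progress(stats) every `every` rows."""
    start = time.monotonic()
    stats = {"rows_read": 0, "rows_written": 0, "elapsed": 0.0}
    out = []
    for row in rows:
        stats["rows_read"] += 1
        out.append(row)
        stats["rows_written"] += 1
        stats["elapsed"] = time.monotonic() - start
        if progress and stats["rows_read"] % every == 0:
            progress(dict(stats))  # pass a snapshot, not the live dict
    return out, stats

seen = []
rows, stats = run_with_progress(range(5), progress=seen.append)
print(stats["rows_read"], len(seen))  # 5 2
```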
Using Pipeline for Data Processing
from iterable.helpers.detect import open_iterable
from iterable.pipeline.core import pipeline

source = open_iterable('input.parquet')
destination = open_iterable('output.jsonl.xz', mode='w')

def transform_record(record, state):
    """Transform each record"""
    # Add processing logic
    out = {}
    for key in ['name', 'email', 'age']:
        if key in record:
            out[key] = record[key]
    return out

def progress_callback(stats, state):
    """Called every trigger_on records"""
    print(f"Processed {stats['rec_count']} records, "
          f"Duration: {stats.get('duration', 0):.2f}s")

def final_callback(stats, state):
    """Called when processing completes"""
    print(f"Total records: {stats['rec_count']}")
    print(f"Total time: {stats['duration']:.2f}s")

result = pipeline(
    source=source,
    destination=destination,
    process_func=transform_record,
    trigger_func=progress_callback,
    trigger_on=1000,
    final_func=final_callback,
    start_state={},
    atomic=True  # Use atomic writes for production safety
)

# Access pipeline metrics
print(f"Throughput: {result.throughput:.0f} rows/second")

source.close()
destination.close()
Manual Format and Codec Usage
from iterable.datatypes.jsonl import JSONLinesIterable
from iterable.datatypes.bsonf import BSONIterable
from iterable.codecs.gzipcodec import GZIPCodec
from iterable.codecs.lzmacodec import LZMACodec

# Read gzipped JSONL
read_codec = GZIPCodec('input.jsonl.gz', mode='r', open_it=True)
reader = JSONLinesIterable(codec=read_codec)

# Write LZMA compressed BSON
write_codec = LZMACodec('output.bson.xz', mode='wb', open_it=False)
writer = BSONIterable(codec=write_codec, mode='w')

for row in reader:
    writer.write(row)

reader.close()
writer.close()
Cloud Storage Support
Read and write data directly from cloud object storage (S3, GCS, Azure):
from iterable.helpers.detect import open_iterable

# Read from S3
with open_iterable('s3://my-bucket/data/events.csv') as source:
    for row in source:
        print(row)

# Read compressed file from GCS
with open_iterable('gs://my-bucket/data/events.jsonl.gz') as source:
    for row in source:
        process(row)

# Write to Azure Blob Storage
with open_iterable(
    'az://my-container/output/results.jsonl',
    mode='w',
    iterableargs={'storage_options': {'connection_string': '...'}}
) as dest:
    dest.write({'name': 'Alice', 'age': 30})
    dest.write({'name': 'Bob', 'age': 25})
Supported Providers:
- Amazon S3: `s3://` and `s3a://` schemes
- Google Cloud Storage: `gs://` and `gcs://` schemes
- Azure Blob Storage: `az://`, `abfs://`, and `abfss://` schemes
Installation: pip install iterabledata[cloud]
Note: DuckDB engine does not support cloud storage URIs; use engine='internal' (default).
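Scheme-based dispatch of this kind can be sketched with urllib.parse. The scheme table below mirrors the provider list above, while the classify() helper itself is hypothetical:

```python
from urllib.parse import urlparse

# Scheme-to-provider table, taken from the supported-providers list above.
PROVIDERS = {
    "s3": "aws", "s3a": "aws",
    "gs": "gcp", "gcs": "gcp",
    "az": "azure", "abfs": "azure", "abfss": "azure",
}

def classify(uri: str):
    """Return (provider, bucket, key) for a cloud URI, or None for local paths."""
    parsed = urlparse(uri)
    provider = PROVIDERS.get(parsed.scheme)
    if provider is None:
        return None
    return provider, parsed.netloc, parsed.path.lstrip("/")

print(classify("s3://my-bucket/data/events.csv"))  # ('aws', 'my-bucket', 'data/events.csv')
print(classify("data/events.csv"))                 # None
```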
Using DuckDB Engine with Pushdown Optimizations
The DuckDB engine provides high-performance querying with advanced optimizations:
from iterable.helpers.detect import open_iterable

# Basic DuckDB usage
source = open_iterable('data.csv.gz', engine='duckdb')
total = source.totals()  # Fast counting
for row in source:
    print(row)
source.close()

# Column projection pushdown (only read specified columns)
with open_iterable(
    'data.csv',
    engine='duckdb',
    iterableargs={'columns': ['name', 'age']}  # Reduces I/O and memory
) as src:
    for row in src:
        process(row)

# Filter pushdown (filter at database level)
with open_iterable(
    'data.csv',
    engine='duckdb',
    iterableargs={'filter': "age > 18 AND status = 'active'"}
) as src:
    for row in src:
        process(row)

# Combined column projection and filtering
with open_iterable(
    'data.parquet',
    engine='duckdb',
    iterableargs={
        'columns': ['name', 'age', 'email'],
        'filter': 'age > 18'
    }
) as src:
    for row in src:
        process(row)

# Direct SQL query support
with open_iterable(
    'data.parquet',
    engine='duckdb',
    iterableargs={
        'query': "SELECT name, age FROM read_parquet('data.parquet') WHERE age > 18 ORDER BY age DESC LIMIT 100"
    }
) as src:
    for row in src:
        process(row)
Supported Formats: CSV, JSONL, NDJSON, JSON, Parquet
Supported Codecs: GZIP, ZStandard (.zst)
Benefits: Reduced I/O, lower memory usage, faster processing through database-level optimizations
Bulk Operations
from iterable.helpers.detect import open_iterable

source = open_iterable('input.jsonl')
destination = open_iterable('output.parquet', mode='w')

# Read and write in batches for better performance
batch = []
for row in source:
    batch.append(row)
    if len(batch) >= 10000:
        destination.write_bulk(batch)
        batch = []

# Write remaining records
if batch:
    destination.write_bulk(batch)

source.close()
destination.close()
Working with Excel Files
from iterable.helpers.detect import open_iterable

# Read Excel file (specify sheet or page)
xls_file = open_iterable('data.xlsx', iterableargs={'page': 0})
for row in xls_file:
    print(row)
xls_file.close()

# Read specific sheet in XLSX
xlsx_file = open_iterable('data.xlsx', iterableargs={'page': 'Sheet2'})
XML Processing
from iterable.helpers.detect import open_iterable

# Parse XML with specific tag name
xml_file = open_iterable(
    'data.xml',
    iterableargs={
        'tagname': 'book',
        'prefix_strip': True  # Strip XML namespace prefixes
    }
)
for item in xml_file:
    print(item)
xml_file.close()
DataFrame Bridges
Convert iterable data to Pandas, Polars, or Dask DataFrames:
from iterable.helpers.detect import open_iterable

# Convert to Pandas DataFrame
with open_iterable('data.csv.gz') as source:
    df = source.to_pandas()
    print(df.head())

# Chunked processing for large files
with open_iterable('large_data.csv') as source:
    for df_chunk in source.to_pandas(chunksize=100_000):
        # Process each chunk
        result = df_chunk.groupby('category').sum()
        process_chunk(result)

# Convert to Polars DataFrame
with open_iterable('data.csv.gz') as source:
    df = source.to_polars()
    print(df.head())

# Convert to Dask DataFrame (single file)
with open_iterable('data.csv.gz') as source:
    ddf = source.to_dask()
    result = ddf.groupby('category').sum().compute()

# Multi-file Dask DataFrame (automatic format detection)
from iterable.helpers.bridges import to_dask

ddf = to_dask(['file1.csv', 'file2.jsonl', 'file3.parquet'])
result = ddf.groupby('category').sum().compute()
Note: DataFrame bridges require optional dependencies. Install with:
pip install iterabledata[dataframes] # All DataFrame libraries
# Or individually:
pip install pandas
pip install polars
pip install "dask[dataframe]"
Type Hints and Type Safety
IterableData includes complete type annotations and typed helper functions for modern Python development:
from iterable.helpers.detect import open_iterable
from iterable.helpers.typed import as_dataclasses, as_pydantic
from dataclasses import dataclass
from pydantic import BaseModel

# Type aliases for better code readability
from iterable.types import Row, IterableArgs, CodecArgs

# Convert to dataclasses for type safety
@dataclass
class Person:
    name: str
    age: int
    email: str | None = None

with open_iterable('people.csv') as source:
    for person in as_dataclasses(source, Person):
        # Full IDE autocomplete and type checking
        print(person.name, person.age)

# Convert to Pydantic models with validation
class PersonModel(BaseModel):
    name: str
    age: int
    email: str | None = None

with open_iterable('people.jsonl') as source:
    for person in as_pydantic(source, PersonModel, validate=True):
        # Automatic schema validation
        print(person.name, person.age)
        # Access as Pydantic model with all features
Benefits:
- Complete type annotations across the public API
- `py.typed` marker file enables mypy, pyright, and other type checkers
- Typed helpers provide IDE autocomplete and type safety
- Pydantic validation catches schema issues early
Installation: pip install iterabledata[pydantic] for Pydantic support
Advanced: Converting Compressed XML to Parquet
from iterable.datatypes.xml import XMLIterable
from iterable.datatypes.parquet import ParquetIterable
from iterable.codecs.bz2codec import BZIP2Codec

# Read compressed XML
read_codec = BZIP2Codec('data.xml.bz2', mode='r')
reader = XMLIterable(codec=read_codec, tagname='page')

# Write to Parquet with schema adaptation
writer = ParquetIterable(
    'output.parquet',
    mode='w',
    use_pandas=False,
    adapt_schema=True,
    batch_size=10000
)

batch = []
for row in reader:
    batch.append(row)
    if len(batch) >= 10000:
        writer.write_bulk(batch)
        batch = []

if batch:
    writer.write_bulk(batch)

reader.close()
writer.close()
API Reference
Main Functions
open_iterable(filename, mode='r', engine='internal', codecargs={}, iterableargs={})
Opens a file and returns an iterable object.
Parameters:
- `filename` (str): Path to the file (supports local files and cloud storage URIs: `s3://`, `gs://`, `az://`)
- `mode` (str): File mode ('r' for read, 'w' for write)
- `engine` (str): Processing engine ('internal' or 'duckdb')
- `codecargs` (dict): Arguments for codec initialization
- `iterableargs` (dict): Arguments for iterable initialization
- `columns` (list[str]): For DuckDB engine, only read specified columns (pushdown optimization)
- `filter` (str | callable): For DuckDB engine, filter rows at database level (SQL string or Python callable)
- `query` (str): For DuckDB engine, execute custom SQL query (read-only)
- `on_error` (str): Error policy ('raise', 'skip', or 'warn')
- `error_log` (str | file-like): Path or file object for structured error logging
- `storage_options` (dict): Cloud storage authentication options
Returns: Iterable object for the detected file type
detect_file_type(filename)
Detects file type and compression codec from filename.
Returns: Dictionary with `success`, `datatype`, and `codec` keys
convert(fromfile, tofile, iterableargs={}, toiterableargs={}, scan_limit=1000, batch_size=50000, silent=True, is_flatten=False, use_totals=False, progress=None, show_progress=False, atomic=False) -> ConversionResult
Converts data between formats.
Parameters:
- `fromfile` (str): Source file path
- `tofile` (str): Destination file path
- `iterableargs` (dict): Options for reading source file
- `toiterableargs` (dict): Options for writing destination file
- `scan_limit` (int): Number of records to scan for schema detection
- `batch_size` (int): Batch size for bulk operations
- `silent` (bool): Suppress progress output
- `is_flatten` (bool): Flatten nested structures
- `use_totals` (bool): Use total count for progress tracking (if available)
- `progress` (callable): Optional callback function receiving progress stats dictionary
- `show_progress` (bool): Display progress bar using tqdm (if available)
- `atomic` (bool): Write to temporary file and atomically rename on success
Returns: ConversionResult object with:
- `rows_in` (int): Total rows read
- `rows_out` (int): Total rows written
- `elapsed_seconds` (float): Conversion time
- `bytes_read` (int | None): Bytes read (if available)
- `bytes_written` (int | None): Bytes written (if available)
- `errors` (list[Exception]): List of errors encountered
bulk_convert(source, destination, pattern=None, to_ext=None, **kwargs) -> BulkConversionResult
Convert multiple files at once using glob patterns, directories, or file lists.
Parameters:
- `source` (str): Glob pattern, directory path, or file path
- `destination` (str): Output directory or filename pattern
- `pattern` (str): Filename pattern with placeholders (`{name}`, `{stem}`, `{ext}`)
- `to_ext` (str): Replace file extension (e.g., 'parquet')
- `**kwargs`: All parameters from the `convert()` function
Returns: BulkConversionResult object with:
- `total_files` (int): Total files processed
- `successful_files` (int): Files successfully converted
- `failed_files` (int): Files that failed
- `total_rows_in` (int): Total rows read across all files
- `total_rows_out` (int): Total rows written across all files
- `total_elapsed_seconds` (float): Total conversion time
- `file_results` (list[FileConversionResult]): Per-file results
- `errors` (list[Exception]): All errors encountered
- `throughput` (float | None): Rows per second
pipeline(source, destination, process_func, trigger_func=None, trigger_on=1000, final_func=None, reset_iterables=True, skip_nulls=True, start_state=None, debug=False, batch_size=1000, progress=None, atomic=False) -> PipelineResult
Execute a data processing pipeline.
Parameters:
- `source` (BaseIterable): Source iterable to read from
- `destination` (BaseIterable | None): Destination iterable to write to
- `process_func` (callable): Function to process each record
- `trigger_func` (callable | None): Function called periodically during processing
- `trigger_on` (int): Number of records between trigger function calls
- `final_func` (callable | None): Function called after processing completes
- `reset_iterables` (bool): Reset iterables before processing
- `skip_nulls` (bool): Skip None results from process_func
- `start_state` (dict | None): Initial state dictionary
- `debug` (bool): Raise exceptions instead of catching them
- `batch_size` (int): Number of records to batch before writing
- `progress` (callable | None): Optional callback function for progress updates
- `atomic` (bool): Use atomic writes if destination is a file
Returns: PipelineResult object with:
- `rows_processed` (int): Total rows processed
- `elapsed_seconds` (float): Processing time
- `throughput` (float | None): Rows per second
- `exceptions` (int): Number of exceptions encountered
- `nulls` (int): Number of null results
- Supports both attribute access (`result.rows_processed`) and dictionary access (`result['rec_count']`) for backward compatibility
Iterable Methods
All iterable objects support:
- `read(skip_empty=True) -> Row` - Read single record
- `read_bulk(num=DEFAULT_BULK_NUMBER) -> list[Row]` - Read multiple records
- `write(record)` - Write single record
- `write_bulk(records)` - Write multiple records
- `reset()` - Reset iterator to beginning
- `close()` - Close file handles
- `to_pandas(chunksize=None)` - Convert to pandas DataFrame (optional chunked processing)
- `to_polars(chunksize=None)` - Convert to Polars DataFrame (optional chunked processing)
- `to_dask(chunksize=1000000)` - Convert to Dask DataFrame
- `list_tables(filename=None) -> list[str] | None` - List available tables/sheets/datasets
- `has_tables() -> bool` - Check if format supports multiple tables
Helper Functions
as_dataclasses(iterable, dataclass_type, skip_empty=True) -> Iterator[T]
Convert dict-based rows from an iterable into dataclass instances.
Parameters:
- `iterable` (BaseIterable): The iterable to read rows from
- `dataclass_type` (type[T]): The dataclass type to convert rows to
- `skip_empty` (bool): Whether to skip empty rows
Returns: Iterator of dataclass instances
as_pydantic(iterable, model_type, skip_empty=True, validate=True) -> Iterator[T]
Convert dict-based rows from an iterable into Pydantic model instances.
Parameters:
- `iterable` (BaseIterable): The iterable to read rows from
- `model_type` (type[T]): The Pydantic model type to convert rows to
- `skip_empty` (bool): Whether to skip empty rows
- `validate` (bool): Whether to validate rows against the model schema
Returns: Iterator of Pydantic model instances
Raises: ImportError if pydantic is not installed
to_dask(files, chunksize=1000000, **iterableargs) -> DaskDataFrame
Convert multiple files to a unified Dask DataFrame with automatic format detection.
Parameters:
- `files` (str | list[str]): Single file path or list of file paths
- `chunksize` (int): Number of rows per partition
- `**iterableargs`: Additional arguments to pass to `open_iterable()` for each file
Returns: Dask DataFrame containing data from all files
Raises: ImportError if dask or pandas is not installed
Engines
Internal Engine (Default)
The internal engine uses pure Python implementations for all formats. It supports all file types and compression codecs.
DuckDB Engine
The DuckDB engine provides high-performance querying capabilities for supported formats:
- Formats: CSV, JSONL, NDJSON, JSON, Parquet
- Codecs: GZIP, ZStandard (.zst)
- Features: Fast querying, totals counting, SQL-like operations
Use engine='duckdb' when opening files:
source = open_iterable('data.csv.gz', engine='duckdb')
Examples Directory
See the examples directory for more complete examples:
- `simplewiki/` - Processing Wikipedia XML dumps
More Examples and Tests
See the tests directory for comprehensive usage examples and test cases.
AI Integration Guides
IterableData can be integrated with AI platforms and frameworks for intelligent data processing:
- AI Frameworks - Integration with LangChain, CrewAI, and AutoGen
  - Tool creation for data reading and format conversion
  - Schema inference and data quality analysis
  - Multi-agent workflows for data processing
- OpenAI - Direct OpenAI API integration (GPT-4, GPT-3.5, etc.)
  - Function calling and Assistants API
  - Structured outputs for consistent results
  - Natural language data analysis and transformation
- Claude - Anthropic Claude AI integration
  - Claude API integration with tools support
  - Intelligent data analysis and schema inference
  - Format conversion with AI guidance
  - Data quality assessment and documentation
- Gemini - Google Gemini AI integration
  - Natural language data analysis
  - Intelligent format conversion with AI guidance
  - Schema documentation and data quality assessment
  - Function calling integration
These guides provide patterns, examples, and best practices for combining IterableData's unified data interface with AI capabilities.
Related Projects
This library is used in:
- undatum - Command line data processing tool
- datacrafter - Data processing ETL engine
License
MIT License
Contributing
Contributions are welcome! Please feel free to submit pull requests or open issues.
Changelog
See CHANGELOG.md for detailed version history.
Version 1.0.11 (2026-01-25)
- Atomic Writes: Production-safe file writing with temporary files and atomic renames
- Bulk File Conversion: Convert multiple files at once using glob patterns, directories, or file lists
- Observability Features: Progress tracking, metrics objects, and progress bars for conversions and pipelines
- Cloud Storage Support: Direct access to S3, GCS, and Azure Blob Storage via URI schemes
- DuckDB Engine Pushdown Optimizations: Column projection, filter pushdown, and direct SQL query support
- Error Handling Controls: Configurable error policies (`on_error`) and structured error logging (`error_log`)
- Type Hints and Typed Helpers: Complete type annotations with `as_dataclasses()` and `as_pydantic()` helper functions
- Vortex Format Support: Added support for reading and writing Vortex columnar data files
- Enhanced Format Detection: Added content-based format detection using magic numbers and heuristics for files without extensions, streams, and files with incorrect extensions
- Exception Hierarchy: Added a comprehensive exception hierarchy (`IterableDataError`, `FormatError`, `CodecError`, etc.) for better error handling
- Format Capability Reporting: Added a programmatic API to query format capabilities (`get_format_capabilities()`, `list_all_capabilities()`, `get_capability()`)
- Table Listing Support: Added `list_tables()` and `has_tables()` methods for discovering tables, sheets, and datasets in multi-table formats
Version 1.0.8 (2026-01-05)
- AI Integration Guides: Added comprehensive guides for LangChain, CrewAI, AutoGen, and Google Gemini AI
- Documentation: Added capability matrix and enhanced API documentation
- Development Tools: Added benchmarking and utility scripts
- Code Improvements: Enhanced format detection, codecs, and data type handlers
- Examples: Added ZIP XML processing example
Version 1.0.7 (2024-12-15)
- Major Format Expansion: Added support for 50+ new data formats across multiple categories
- Enhanced Compression: Added LZO, Snappy, and SZIP codec support
- CI/CD: Added GitHub Actions workflows for automated testing and deployment
- Documentation: Complete documentation site with Docusaurus
- Testing: Comprehensive test suite for all formats
Version 1.0.6
- Comprehensive documentation enhancements
- GitHub Actions release workflow
- Improved examples and use cases
Version 1.0.5
- DuckDB engine support
- Enhanced format detection
- Pipeline processing framework
- Bulk operations support
File details
Details for the file iterabledata-1.0.11.tar.gz.
File metadata
- Download URL: iterabledata-1.0.11.tar.gz
- Upload date:
- Size: 252.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | `626e3dcb5080d5080d9125fe014e11f84c0671c1c90d31784e8d91aedecb6525` |
| MD5 | `9ce254655838aa67a152536796136853` |
| BLAKE2b-256 | `62fbca63cca7b0377a864752baa25ab91d9a459ca1f494a1c19ef55defd8c381` |
File details
Details for the file iterabledata-1.0.11-py3-none-any.whl.
File metadata
- Download URL: iterabledata-1.0.11-py3-none-any.whl
- Upload date:
- Size: 237.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | `afd87895ee906713fdb82393b93c469ec07d971b417af24f8f2f522d28705cc2` |
| MD5 | `697826cd754386a4099b5a2ea8553799` |
| BLAKE2b-256 | `89a4048a3037c6afbcecb57e55b01a63e8e2f2784d519d8039de9dfb0e260575` |