# avrokit

Python utilities for working with Avro data files, with support for local files, GCS, S3, and HTTP/HTTPS URLs.
## Features
- Unified URL Interface: Read and write Avro files transparently across local filesystem, GCS, S3, and HTTP/HTTPS
- CLI Tools: Comprehensive command-line tools for common Avro operations
- Python API: Rich programmatic interface for reading, writing, and transforming Avro data
- Partitioned I/O: Support for reading and writing partitioned/multi-file datasets
- Async Support: Non-blocking I/O primitives for high-throughput applications
- Parquet Conversion: Bidirectional conversion between Avro and Parquet formats
- Schema Operations: Schema extraction, validation, and evolution checking
## Installation

### Basic Installation

```shell
pip install avrokit
```

### With Cloud Storage Support

```shell
# AWS S3 support
pip install avrokit[aws]

# Google Cloud Storage support
pip install avrokit[gcp]

# All extras (S3 + GCS)
pip install avrokit[all]
```

### Development Installation

```shell
git clone https://github.com/brandtg/avrokit.git
cd avrokit
poetry env use python3.12
make install
```
## Quick Start

### CLI Usage

```shell
# View statistics for an Avro file
avrokit stats gs://bucket/data.avro

# Convert Avro to JSON
avrokit tojson s3://bucket/data.avro > output.json

# Extract sample records
avrokit cat file:///path/to/data.avro --limit 10

# Get schema
avrokit getschema data.avro

# Convert to Parquet
avrokit toparquet data.avro output.parquet

# Concatenate multiple files
avrokit concat file1.avro file2.avro s3://bucket/file3.avro output.avro
```
### Python API

```python
from avrokit import avro_reader, avro_records, avro_writer, avro_schema, parse_url

# Read records from any URL
url = parse_url('gs://bucket/data.avro', mode='rb')
for record in avro_records(url):
    print(record)

# Write Avro data
schema = avro_schema({
    'type': 'record',
    'name': 'User',
    'fields': [
        {'name': 'name', 'type': 'string'},
        {'name': 'age', 'type': 'int'}
    ]
})

url = parse_url('output.avro', mode='wb')
with avro_writer(url, schema) as writer:
    writer.append({'name': 'Alice', 'age': 30})
    writer.append({'name': 'Bob', 'age': 25})
```
## Architecture

avrokit is organized into four main layers:
### 1. URL Layer (`avrokit.url`)

Provides a unified abstraction over different storage backends. All URL types implement the `URL` protocol with common operations:

- `URL` (base protocol): Abstract interface for all URL types
- `FileURL`: Local filesystem access with glob pattern support
- `GCSURL`: Google Cloud Storage via `google-cloud-storage`
- `S3URL`: Amazon S3 via `boto3`
- `HTTPURL`: HTTP/HTTPS with range request support for streaming
The `parse_url()` factory function automatically instantiates the correct URL class based on the scheme:

```python
from avrokit.url import parse_url

# All of these return the appropriate URL subclass
url1 = parse_url('file:///path/to/data.avro')      # FileURL
url2 = parse_url('gs://bucket/data.avro')          # GCSURL
url3 = parse_url('s3://bucket/data.avro')          # S3URL
url4 = parse_url('https://example.com/data.avro')  # HTTPURL
```
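This kind of scheme-based factory dispatch can be pictured with a minimal sketch. The class bodies and the `_SCHEMES` table below are illustrative stand-ins, not avrokit's actual implementation:

```python
from urllib.parse import urlparse

# Illustrative stand-ins for the URL classes described above.
class FileURL:
    def __init__(self, url):
        self.url = url

class GCSURL(FileURL): pass
class S3URL(FileURL): pass
class HTTPURL(FileURL): pass

# Map URL scheme to handler class; schemeless paths are local files.
_SCHEMES = {
    '': FileURL, 'file': FileURL,
    'gs': GCSURL, 's3': S3URL,
    'http': HTTPURL, 'https': HTTPURL,
}

def parse_url(url):
    """Instantiate the URL class matching the scheme (sketch)."""
    scheme = urlparse(url).scheme
    cls = _SCHEMES.get(scheme)
    if cls is None:
        raise ValueError(f'Unsupported scheme: {scheme!r}')
    return cls(url)

print(type(parse_url('gs://bucket/data.avro')).__name__)  # GCSURL
```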
### 2. I/O Layer (`avrokit.io`)

High-level Avro reading and writing primitives built on the URL layer:

Reading:

- `avro_reader(url)`: Context manager for `DataFileReader`
- `avro_records(url)`: Generator yielding records as dictionaries
- `PartitionedAvroReader`: Reads from multiple files or glob patterns as a single stream

Writing:

- `avro_writer(url, schema)`: Context manager for `DataFileWriter`
- `PartitionedAvroWriter`: Writes to multiple files with configurable size limits
- `TimePartitionedAvroWriter`: Time-based partitioning (e.g., hourly/daily files)

Schema Operations:

- `avro_schema(dict)`: Create schema from dictionary
- `read_avro_schema(url)`: Extract schema from file
- `validate_avro_schema_evolution()`: Check backward/forward compatibility
- `add_avro_schema_fields()`: Schema augmentation utilities

Utilities:

- `compact_avro_data()`: Remove deleted/updated records by key
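The compaction semantics described above (dropping superseded and deleted records by key) can be sketched in plain Python. The `deleted` tombstone convention and the `compact_records` helper below are illustrative assumptions, not avrokit's actual `compact_avro_data()` signature:

```python
def compact_records(records, key_field, deleted_field='deleted'):
    """Keep only the last record seen for each key, dropping tombstones.

    A sketch of log-compaction semantics, not avrokit's implementation.
    """
    latest = {}
    for record in records:
        latest[record[key_field]] = record  # later records win
    # Drop records whose final state is a deletion marker.
    return [r for r in latest.values() if not r.get(deleted_field)]

records = [
    {'id': 1, 'v': 'a'},
    {'id': 2, 'v': 'b'},
    {'id': 1, 'v': 'a2'},                   # update supersedes the first row
    {'id': 2, 'v': 'b', 'deleted': True},   # tombstone removes id=2
]
print(compact_records(records, 'id'))  # [{'id': 1, 'v': 'a2'}]
```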
### 3. Async I/O Layer (`avrokit.asyncio`)

Non-blocking primitives for async applications:

- `DeferredAvroWriter`: Async writer that batches records and flushes in the background
- `BlockingQueueAvroReader`: Queue-based async reader for producer/consumer patterns
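The deferred-writer pattern (cheap appends, batched flushes from a background task) can be illustrated with a generic asyncio sketch. `BatchingWriter` and its `sink` callable are hypothetical names for illustration, not avrokit's `DeferredAvroWriter` API:

```python
import asyncio

class BatchingWriter:
    """Sketch of the deferred-writer pattern: append() just enqueues,
    while a background task drains batches to a sink callable."""

    def __init__(self, sink, batch_size=100):
        self.sink = sink            # receives a list of records per flush
        self.batch_size = batch_size
        self.queue = asyncio.Queue()
        self._task = None

    async def __aenter__(self):
        self._task = asyncio.create_task(self._drain())
        return self

    async def __aexit__(self, *exc):
        await self.queue.put(None)  # sentinel: flush remainder and stop
        await self._task

    async def append(self, record):
        await self.queue.put(record)

    async def _drain(self):
        batch = []
        while True:
            record = await self.queue.get()
            if record is None or len(batch) >= self.batch_size:
                if batch:
                    self.sink(batch)  # flush the accumulated batch
                    batch = []
                if record is None:
                    return
            if record is not None:
                batch.append(record)

written = []

async def main():
    async with BatchingWriter(written.extend, batch_size=2) as w:
        for i in range(5):
            await w.append({'n': i})

asyncio.run(main())
print(written)  # [{'n': 0}, {'n': 1}, {'n': 2}, {'n': 3}, {'n': 4}]
```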
### 4. Tools Layer (`avrokit.tools`)

CLI commands implemented as classes following the `Tool` protocol:

| Tool | Command | Description |
|---|---|---|
| `CatTool` | `cat` | Extract sample records with optional random sampling |
| `ConcatTool` | `concat` | Concatenate multiple Avro files into one |
| `CountTool` | `count` | Count records in files |
| `FileSortTool` | `filesort` | Sort records by key across multiple files |
| `FromParquetTool` | `fromparquet` | Convert Parquet to Avro |
| `GetMetaTool` | `getmeta` | Extract file metadata (schema, compression, etc.) |
| `GetSchemaTool` | `getschema` | Extract and print schema |
| `HttpServerTool` | `httpserver` | Serve Avro files over HTTP with filtering/sampling |
| `PartitionTool` | `partition` | Split files into multiple partitions |
| `RepairTool` | `repair` | Fix corrupted Avro files |
| `StatsTool` | `stats` | Compute statistics (count, nulls, sizes) |
| `ToJsonTool` | `tojson` | Convert to newline-delimited JSON |
| `ToParquetTool` | `toparquet` | Convert to Parquet format |
## CLI Reference

### Global Options

```shell
avrokit --debug <command>  # Enable debug logging
```

### Available Commands

#### cat - Extract Sample Records

```shell
avrokit cat FILE [OPTIONS]
  --limit N        Maximum records to output (default: 10)
  --sample-rate F  Random sampling rate (0.0-1.0)
```
#### concat - Concatenate Files

```shell
avrokit concat INPUT1 [INPUT2 ...] OUTPUT
```

INPUT can be local files, GCS, S3, or HTTP URLs. The OUTPUT schema must be compatible with all inputs.
#### count - Count Records

```shell
avrokit count FILE [FILE ...]
```

Returns the total record count across all files.
#### filesort - Sort Records

```shell
avrokit filesort INPUT OUTPUT --keys KEY1 [KEY2 ...]
```

Sorts records by the specified keys using an external merge sort.
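External merge sort works by sorting bounded chunks and k-way merging them. A minimal in-memory sketch of the idea (real implementations spill each sorted chunk to disk, which is what makes the approach work for files larger than memory):

```python
import heapq
from operator import itemgetter

def external_sort(records, keys, chunk_size=1000):
    """Sketch of external merge sort: sort fixed-size chunks, then
    k-way merge them. Chunks stay in memory here for brevity."""
    key = itemgetter(*keys)
    chunks = []
    for start in range(0, len(records), chunk_size):
        # In a real external sort, each sorted chunk is spilled to disk.
        chunks.append(sorted(records[start:start + chunk_size], key=key))
    # heapq.merge streams the chunks in key order without re-sorting.
    return list(heapq.merge(*chunks, key=key))

rows = [{'user': 'b', 'ts': 2}, {'user': 'a', 'ts': 3}, {'user': 'a', 'ts': 1}]
print(external_sort(rows, ['user', 'ts'], chunk_size=2))
# [{'user': 'a', 'ts': 1}, {'user': 'a', 'ts': 3}, {'user': 'b', 'ts': 2}]
```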
#### fromparquet - Parquet to Avro

```shell
avrokit fromparquet INPUT.parquet OUTPUT.avro
```

#### getmeta - File Metadata

```shell
avrokit getmeta FILE
```

Outputs: schema, codec, sync marker, block count, etc.
#### getschema - Extract Schema

```shell
avrokit getschema FILE
```

Outputs the Avro schema as JSON.

#### httpserver - HTTP Server

```shell
avrokit httpserver --port 8080 --files "*.avro"
```

Serves Avro files with filtering and sampling support.
#### partition - Partition Files

```shell
avrokit partition INPUT OUTPUT --count N
```

Splits INPUT into N approximately equal partitions.

#### repair - Fix Corrupted Files

```shell
avrokit repair INPUT OUTPUT
```

Attempts to recover readable records from damaged files.
#### stats - Compute Statistics

```shell
avrokit stats FILE [FILE ...]
```

Outputs: record count, file sizes, null counts per field.
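The per-field null counting that `stats` reports can be sketched over plain dict records. `field_stats` is an illustrative helper, not part of avrokit:

```python
from collections import Counter

def field_stats(records):
    """Sketch of the per-field null counting a stats tool reports."""
    nulls = Counter()
    for record in records:
        for field, value in record.items():
            if value is None:
                nulls[field] += 1
    return {'count': len(records), 'nulls': dict(nulls)}

rows = [{'name': 'a', 'age': None},
        {'name': None, 'age': 3},
        {'name': 'c', 'age': None}]
print(field_stats(rows))  # {'count': 3, 'nulls': {'age': 2, 'name': 1}}
```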
#### tojson - Convert to JSON

```shell
avrokit tojson FILE [FILE ...]
```

Outputs one JSON record per line (newline-delimited JSON).
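Newline-delimited JSON simply places one serialized record per line, which lets downstream tools stream records without parsing the whole file. A minimal sketch:

```python
import json

def to_ndjson(records):
    """One JSON object per line -- newline-delimited JSON (NDJSON)."""
    return '\n'.join(json.dumps(r, sort_keys=True) for r in records)

print(to_ndjson([{'name': 'Alice', 'age': 30}, {'name': 'Bob', 'age': 25}]))
# {"age": 30, "name": "Alice"}
# {"age": 25, "name": "Bob"}
```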
#### toparquet - Convert to Parquet

```shell
avrokit toparquet INPUT.avro OUTPUT.parquet
```

Maps Avro types to their Parquet equivalents.
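The exact Avro-to-Parquet type mapping avrokit uses is not spelled out in this README; the table below lists the conventional primitive correspondences as an assumption for illustration:

```python
# Conventional Avro -> Parquet primitive correspondences (an
# illustrative assumption; avrokit's exact mapping may differ).
AVRO_TO_PARQUET = {
    'null':    'NULL',
    'boolean': 'BOOLEAN',
    'int':     'INT32',
    'long':    'INT64',
    'float':   'FLOAT',
    'double':  'DOUBLE',
    'bytes':   'BYTE_ARRAY',
    'string':  'BYTE_ARRAY (UTF8)',
    'fixed':   'FIXED_LEN_BYTE_ARRAY',
}

def map_field(avro_type):
    """Look up the Parquet physical type for an Avro primitive."""
    return AVRO_TO_PARQUET.get(avro_type, 'unsupported')

print(map_field('long'))  # INT64
```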
## Development

### Prerequisites

- Python 3.12+
- Poetry
- Docker (for GCS/S3 integration tests)

### Setup

```shell
# Set up Python environment
poetry env use python3.12

# Install dependencies
make install

# Pull fake-gcs-server for testing
docker pull fsouza/fake-gcs-server
```

### Running Tests

```shell
# Run all tests
make test

# Run with coverage
make test-coverage

# Run tests in parallel (default)
poetry run pytest -n auto
```

### Code Quality

```shell
# Format code
make format

# Lint code
make lint

# Type checking
make typecheck

# Add license headers
make license
```
### Building

```shell
make build  # Build wheel and source distribution
```
### Releasing to PyPI

Releases are automated via GitHub Actions using Trusted Publishing (OIDC), so no API tokens are needed.

One-time PyPI setup:

1. Log into PyPI
2. Go to your project → Publishing
3. Add a new pending publisher:
   - Owner: `brandtg`
   - Repository: `avrokit`
   - Workflow name: `release.yml`
4. Save the configuration

To release a new version:

1. Update the version in `pyproject.toml` (e.g., `0.1.1`)
2. Commit and push the version bump
3. Go to GitHub → Releases → Draft a new release
4. Create a new tag matching the version (e.g., `v0.1.1`)
5. Click Publish release

The GitHub Actions workflow will automatically build and publish to PyPI.
## Python API Examples

### Working with Partitioned Files

```python
from avrokit import PartitionedAvroReader, PartitionedAvroWriter, parse_url

# Read from multiple files as a single stream
url = parse_url('data-*.avro', mode='rb')
with PartitionedAvroReader(url) as reader:
    for record in reader:
        process(record)

# Write to multiple files (roll every 10000 records)
url = parse_url('output/', mode='wb')
with PartitionedAvroWriter(url, schema, max_records=10000) as writer:
    for record in records:
        writer.append(record)
        if should_roll():
            writer.roll()  # Create new partition file
```
### Time-Based Partitioning

```python
from avrokit import TimePartitionedAvroWriter, parse_url

# Creates hourly files: output/2024/01/15/10.avro
url = parse_url('output/', mode='wb')
with TimePartitionedAvroWriter(
    url,
    schema,
    time_granularity='hour',
    time_field='timestamp'
) as writer:
    for record in records:
        writer.append(record, timestamp=record['timestamp'])
```
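The hourly path layout shown in the comment above (`output/2024/01/15/10.avro`) can be derived from a record timestamp like this; `partition_path` is an illustrative helper, not avrokit's API:

```python
from datetime import datetime, timezone

def partition_path(base, ts, granularity='hour'):
    """Sketch of time-based partition naming such as
    output/2024/01/15/10.avro, derived from a UNIX timestamp."""
    dt = datetime.fromtimestamp(ts, tz=timezone.utc)
    parts = [f'{dt.year:04d}', f'{dt.month:02d}', f'{dt.day:02d}']
    if granularity == 'hour':
        parts.append(f'{dt.hour:02d}')
    return base + '/'.join(parts) + '.avro'

# 1705314600 is 2024-01-15T10:30:00Z
print(partition_path('output/', 1705314600))  # output/2024/01/15/10.avro
```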
### Async Operations

```python
import asyncio
from avrokit.asyncio import DeferredAvroWriter
from avrokit import parse_url, avro_schema

async def write_async():
    url = parse_url('output.avro', mode='wb')
    schema = avro_schema({...})
    async with DeferredAvroWriter(url, schema) as writer:
        for record in records:
            await writer.append(record)
        # Flushes happen automatically in the background

asyncio.run(write_async())
```
### Schema Evolution

```python
from avrokit import validate_avro_schema_evolution, read_avro_schema
from avrokit.url import parse_url

# Check if the new schema is backward compatible
reader_schema = read_avro_schema(parse_url('old.avro', 'rb'))
writer_schema = read_avro_schema(parse_url('new.avro', 'rb'))
is_valid = validate_avro_schema_evolution(
    reader_schema,
    writer_schema,
    strategy='backward'  # or 'forward', 'full'
)
```
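A drastically simplified backward-compatibility check (can a reader using the new schema decode data written with the old one?) might look like the sketch below. Real Avro schema resolution also handles type promotion, unions, and aliases, which this sketch ignores:

```python
def is_backward_compatible(reader_schema, writer_schema):
    """Sketch: every reader field absent from the writer schema must
    carry a default, or old data cannot be decoded. Ignores the type
    promotion, unions, and aliases of real Avro resolution."""
    writer_fields = {f['name']: f for f in writer_schema['fields']}
    for field in reader_schema['fields']:
        writer_field = writer_fields.get(field['name'])
        if writer_field is None:
            if 'default' not in field:
                return False  # new field added without a default
        elif writer_field['type'] != field['type']:
            return False      # type changed (no promotion handled here)
    return True

old = {'fields': [{'name': 'name', 'type': 'string'}]}
new = {'fields': [{'name': 'name', 'type': 'string'},
                  {'name': 'age', 'type': 'int', 'default': 0}]}
print(is_backward_compatible(new, old))  # True
```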
## License

Apache-2.0 - See LICENSE for details.

## Contributing

Contributions welcome! Please ensure:

- Code follows the existing style (enforced by `make format` and `make lint`)
- All tests pass (`make test`)
- Type checks pass (`make typecheck`)
- License headers are present (`make license`)
## File details

Details for the file `avrokit-0.0.6.tar.gz`.

### File metadata

- Download URL: avrokit-0.0.6.tar.gz
- Upload date:
- Size: 114.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7

### File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | `ea220a9eb2c5136dc796b181a73ea286be64a3d19d0abe8423c904134ab4f7cb` |
| MD5 | `2dc1efa5b66cf3260ff94233ec83608e` |
| BLAKE2b-256 | `72b5950ff36a50098ed68346462d1eb37d646b7227ae15d3123ee2e98c742f62` |
### Provenance

The following attestation bundle was made for `avrokit-0.0.6.tar.gz`:

Publisher: `release.yml` on `brandtg/avrokit`

- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: avrokit-0.0.6.tar.gz
- Subject digest: ea220a9eb2c5136dc796b181a73ea286be64a3d19d0abe8423c904134ab4f7cb
- Sigstore transparency entry: 1191936211
- Sigstore integration time:
- Permalink: brandtg/avrokit@e6cefb1612cf4766f4c2a77ad32a6ebc7712bbdb
- Branch / Tag: refs/tags/v0.0.6
- Owner: https://github.com/brandtg
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: release.yml@e6cefb1612cf4766f4c2a77ad32a6ebc7712bbdb
- Trigger Event: release
## File details

Details for the file `avrokit-0.0.6-py3-none-any.whl`.

### File metadata

- Download URL: avrokit-0.0.6-py3-none-any.whl
- Upload date:
- Size: 10.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7

### File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | `95f9fce79e81a59bed72353a9416ca8e13a4ebed5b9054a19010856efa380883` |
| MD5 | `7d7b567d8a7f9f6320225dba653c2fea` |
| BLAKE2b-256 | `d9d0104381aabf60555c7dff00aa07e0040b73fad773f8f2f8300ff12a1258f5` |

### Provenance

The following attestation bundle was made for `avrokit-0.0.6-py3-none-any.whl`:

Publisher: `release.yml` on `brandtg/avrokit`

- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: avrokit-0.0.6-py3-none-any.whl
- Subject digest: 95f9fce79e81a59bed72353a9416ca8e13a4ebed5b9054a19010856efa380883
- Sigstore transparency entry: 1191936220
- Sigstore integration time:
- Permalink: brandtg/avrokit@e6cefb1612cf4766f4c2a77ad32a6ebc7712bbdb
- Branch / Tag: refs/tags/v0.0.6
- Owner: https://github.com/brandtg
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: release.yml@e6cefb1612cf4766f4c2a77ad32a6ebc7712bbdb
- Trigger Event: release