
O(1) resume for large JSONL streams via byte-offset indexing


jsonl-resumable

Index JSONL files for instant random access and resumable iteration.

Requires Python 3.10+. License: MIT.

The problem

You have a 10GB JSONL file. Your processing script crashes at line 25 million. To resume, you have to iterate through all 25 million lines you already processed just to get back to where you were:

with open("huge.jsonl") as f:
    for i, line in enumerate(f):
        if i < 25_000_000:
            continue  # still reads and discards all 25 million earlier lines
        process(line)

This library builds a byte-offset index of your file so you can seek directly to any line:

from jsonl_resumable import JsonlIndex

index = JsonlIndex("huge.jsonl")
for event in index.iter_json_from(25_000_000):  # seeks straight there (after the one-time index build)
    process(event)

Install

pip install jsonl-resumable

Basic usage

from jsonl_resumable import JsonlIndex

# First run builds the index (takes a while for big files)
# Subsequent runs load it from disk
index = JsonlIndex("events.jsonl")

# Jump to any line
event = index.read_json(1_000_000)

# Iterate from a specific line
for event in index.iter_json_from(last_processed):
    process(event)

# If the file grew, update the index (only scans new bytes)
index.update()

Useful for data pipelines, log analysis, and ML training data: anywhere you're dealing with large JSONL files and don't want to start over every time something fails.

API

index = JsonlIndex("data.jsonl")

# Read a single line (parsed or raw)
index.read_json(1000)        # returns the parsed value (usually a dict or list)
index.read_line(1000)        # returns raw string
index[1000]                  # same as read_line

# Iterate starting from line N
index.iter_json_from(5000)   # yields parsed JSON
index.iter_from(5000)        # yields raw strings

# Async iteration (for web frameworks)
async for event in index.aiter_json_from(5000):
    await process(event)

# When the file grows
index.update()               # indexes new lines, returns count added

# Properties
index.total_lines
index.file_size

Constructor options:

JsonlIndex(
    "data.jsonl",
    checkpoint_interval=100,  # trade memory for speed (lower = more memory)
    index_path="custom.idx",  # custom index file location
    auto_save=True,           # save index to disk after build/update
)

You can also call rebuild() to force a full re-index, or save() to persist manually.
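The `checkpoint_interval` option and its memory/speed trade-off suggest the index can keep an offset only every N lines instead of for every line. The sketch below assumes exactly that: it stores one byte offset per `interval` lines, seeks to the nearest checkpoint at or before the requested line, then skips forward at most `interval - 1` lines. `SparseLineIndex` is a hypothetical class for illustration, not part of jsonl-resumable.

```python
class SparseLineIndex:
    """Hypothetical sparse index: one byte offset per `interval` lines."""

    def __init__(self, path: str, interval: int = 100) -> None:
        if interval < 1:
            raise ValueError("interval must be >= 1")
        self.path = path
        self.interval = interval
        self.checkpoints: list[int] = []  # offsets of lines 0, interval, 2*interval, ...
        self.total_lines = 0
        offset = 0
        with open(path, "rb") as f:
            for line in f:
                if self.total_lines % interval == 0:
                    self.checkpoints.append(offset)
                offset += len(line)
                self.total_lines += 1

    def read_line(self, n: int) -> str:
        if not 0 <= n < self.total_lines:
            raise IndexError(n)
        checkpoint, skip = divmod(n, self.interval)
        with open(self.path, "rb") as f:
            f.seek(self.checkpoints[checkpoint])  # jump to the nearest checkpoint
            for _ in range(skip):  # then read past at most interval - 1 lines
                f.readline()
            return f.readline().decode("utf-8").rstrip("\r\n")
```

With `interval=100`, a 1,000-line file needs only 10 stored offsets, and any read costs one seek plus up to 99 short line reads. Lowering the interval stores more offsets but shortens that forward scan.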

Incremental updates

If you're appending to your JSONL file over time, you don't need to rebuild the whole index:

index = JsonlIndex("events.jsonl")
print(index.total_lines)  # 1000

# ... later, after appending more data ...

new_count = index.update()
print(new_count)          # 50
print(index.total_lines)  # 1050

update() picks up where the index left off and only scans the new bytes.
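One way to scan only the new bytes is to remember where the last complete line ended, seek there, and index what follows. This sketch assumes one offset per line and deliberately holds back a trailing line with no newline (for example, a write still in progress) until a later call sees it completed. `IncrementalIndex` and its attributes are hypothetical names.

```python
class IncrementalIndex:
    """Hypothetical append-aware index storing the start offset of every line."""

    def __init__(self, path: str) -> None:
        self.path = path
        self.offsets: list[int] = []  # start offset of each complete line
        self.indexed_end = 0  # byte position just past the last complete line
        self.update()

    def update(self) -> int:
        """Index lines appended since the last call; return how many were added."""
        added = 0
        with open(self.path, "rb") as f:
            f.seek(self.indexed_end)  # never re-read bytes that are already indexed
            pos = self.indexed_end
            for line in f:
                if not line.endswith(b"\n"):
                    break  # partial last line: pick it up on a later update()
                self.offsets.append(pos)
                pos += len(line)
                added += 1
        self.indexed_end = pos
        return added

    @property
    def total_lines(self) -> int:
        return len(self.offsets)
```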

Async streaming

If you're building a web API and want to stream JSONL data without blocking, there's async support:

from jsonl_resumable import JsonlIndex

index = JsonlIndex("events.jsonl")

# Basic async iteration
async for event in index.aiter_json_from(start_line):
    await process(event)

# With a context manager (validates file state, guarantees cleanup)
async with index.async_stream(start_line=1000, limit=500) as stream:
    async for event in stream:
        await send_to_client(event)
    print(f"Sent {stream.yielded_count} events")

Works with FastAPI, Starlette, aiohttp, etc. The async methods use batched I/O internally, reading 100 lines per thread hop instead of one at a time, so you're not paying for a context switch on every line.
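Batching like this can be sketched with the standard library alone: a blocking helper reads up to `batch_size` lines from a byte offset, and an async generator awaits `asyncio.to_thread()` once per batch rather than once per line. This is an illustration under that assumption, not the library's code; `aiter_lines_from_offset`, `_read_batch`, and `batch_size` are hypothetical names.

```python
import asyncio
from collections.abc import AsyncIterator


def _read_batch(path: str, offset: int, batch_size: int) -> tuple[list[str], int]:
    """Blocking read of up to batch_size lines; returns them and the next offset."""
    lines: list[str] = []
    with open(path, "rb") as f:
        f.seek(offset)
        for _ in range(batch_size):
            raw = f.readline()
            if not raw:
                break  # end of file
            lines.append(raw.decode("utf-8").rstrip("\r\n"))
        return lines, f.tell()


async def aiter_lines_from_offset(
    path: str, offset: int = 0, batch_size: int = 100
) -> AsyncIterator[str]:
    """Yield lines starting at `offset`, paying one thread hop per batch."""
    while True:
        batch, offset = await asyncio.to_thread(_read_batch, path, offset, batch_size)
        if not batch:
            return
        for line in batch:
            yield line  # already in memory: no thread hop per line
```

A larger `batch_size` means fewer hops but more lines held in memory and a longer stall in the worker thread per batch.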

Handling bad data:

# Skip lines that aren't valid JSON
async for event in index.aiter_json_from(0, on_decode_error="skip"):
    process(event)

# Or get the raw string for invalid lines
async for event in index.aiter_json_from(0, on_decode_error="raw"):
    if isinstance(event, str):
        log_bad_line(event)
    else:
        process(event)
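The `"skip"` and `"raw"` behaviours can be expressed as a small policy function. This sketch adds a `"raise"` option as its default, which is an assumption: the text above does not say what happens when `on_decode_error` is omitted. `decode_line` and `decode_lines` are hypothetical helpers.

```python
import json
from collections.abc import Iterable, Iterator
from typing import Any, Literal

Policy = Literal["raise", "skip", "raw"]
_SKIP = object()  # sentinel meaning "drop this line"


def decode_line(line: str, on_decode_error: Policy = "raise") -> Any:
    """Parse one JSONL line; on invalid JSON, apply the chosen policy."""
    try:
        return json.loads(line)
    except json.JSONDecodeError:
        if on_decode_error == "skip":
            return _SKIP
        if on_decode_error == "raw":
            return line  # hand back the undecoded text
        raise  # "raise": let the caller see the error


def decode_lines(lines: Iterable[str], on_decode_error: Policy = "raise") -> Iterator[Any]:
    for line in lines:
        value = decode_line(line, on_decode_error)
        if value is not _SKIP:
            yield value
```

One caveat with `"raw"`: a valid line whose top-level value is a JSON string (for example `"hello"`) also decodes to a Python `str`, so an `isinstance(event, str)` check cannot tell it apart from a bad line if your data contains such values.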

How it works

The library scans your file once and records the byte offset of each line. These offsets get saved to {filename}.idx. When you want line N, it just does file.seek(offset) instead of reading through the whole file.
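The core idea fits in a few lines of standard-library Python. This sketch records the start offset of every line, saves the offsets as a flat array of 64-bit integers, and seeks straight to a requested line. The on-disk layout here (`array("q")`) is an assumption for illustration and not necessarily what the library writes to `{filename}.idx`.

```python
from array import array


def build_offsets(path: str) -> array:
    """Scan the file once and record the byte where each line starts."""
    offsets = array("q")  # signed 64-bit integers
    pos = 0
    with open(path, "rb") as f:  # binary mode so lengths are exact byte counts
        for line in f:
            offsets.append(pos)
            pos += len(line)
    return offsets


def save_offsets(offsets: array, index_path: str) -> None:
    with open(index_path, "wb") as f:
        offsets.tofile(f)


def load_offsets(index_path: str) -> array:
    offsets = array("q")
    with open(index_path, "rb") as f:
        offsets.frombytes(f.read())
    return offsets


def read_line(path: str, offsets: array, n: int) -> str:
    """Seek directly to line n instead of reading the n lines before it."""
    with open(path, "rb") as f:
        f.seek(offsets[n])
        return f.readline().decode("utf-8").rstrip("\r\n")
```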

If the file's size or modification time changes, it detects that and rebuilds automatically.
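A staleness check of this kind can be sketched by storing the file's size and modification time next to the offsets and comparing them with `os.stat()` on load. `FileFingerprint` and `is_stale` are hypothetical names, and the library's exact comparison may differ.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class FileFingerprint:
    size: int
    mtime_ns: int  # integer nanoseconds avoid float rounding in st_mtime

    @classmethod
    def of(cls, path: str) -> "FileFingerprint":
        st = os.stat(path)
        return cls(size=st.st_size, mtime_ns=st.st_mtime_ns)


def is_stale(path: str, saved: FileFingerprint) -> bool:
    """True if the file no longer matches what the index was built from."""
    return FileFingerprint.of(path) != saved
```

Timestamp resolution varies by filesystem, so a same-size rewrite within a single timestamp tick can slip past a size-and-mtime check; calling `rebuild()` explicitly covers that case.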

Examples

Checkpointing for crash recovery:

from pathlib import Path
from jsonl_resumable import JsonlIndex

checkpoint = Path("progress.txt")
index = JsonlIndex("events.jsonl")

start = int(checkpoint.read_text()) if checkpoint.exists() else 0

for i, event in enumerate(index.iter_json_from(start), start=start):
    process(event)
    if (i + 1) % 1000 == 0:
        # Store the NEXT line to process, so a restart doesn't redo line i
        checkpoint.write_text(str(i + 1))
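`write_text()` overwrites the file in place, so a crash mid-write can leave `progress.txt` empty or truncated. A common remedy is to write to a temporary file in the same directory and swap it in with `os.replace()`, which is atomic on POSIX when both paths are on the same filesystem. `save_checkpoint` and `load_checkpoint` are hypothetical helpers; `save_checkpoint` records the next line to process.

```python
import os
import tempfile
from pathlib import Path


def save_checkpoint(path: Path, next_line: int) -> None:
    """Atomically record the next line to process."""
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, prefix=path.name + ".")
    try:
        with os.fdopen(fd, "w") as f:
            f.write(str(next_line))
            f.flush()
            os.fsync(f.fileno())  # make sure the bytes reach disk before the swap
        os.replace(tmp_name, path)  # readers see the old value or the new one
    except BaseException:
        os.unlink(tmp_name)  # don't leave stray temp files behind
        raise


def load_checkpoint(path: Path) -> int:
    return int(path.read_text()) if path.exists() else 0
```

In the loop above, `save_checkpoint(checkpoint, i + 1)` would replace the `write_text()` call.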

Random sampling:

import random
from jsonl_resumable import JsonlIndex

index = JsonlIndex("training_data.jsonl")
sample_ids = random.sample(range(index.total_lines), k=min(1000, index.total_lines))
samples = [index.read_json(i) for i in sample_ids]

Tail (last N lines):

index = JsonlIndex("logs.jsonl")
for line in index.iter_from(max(0, index.total_lines - 100)):
    print(line)

FastAPI streaming endpoint:

import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from jsonl_resumable import JsonlIndex

app = FastAPI()
index = JsonlIndex("events.jsonl")

@app.get("/events/stream")
async def stream_events(start: int = 0):
    async def generate():
        async with index.async_stream(start_line=start) as stream:
            async for event in stream:
                yield f"data: {json.dumps(event)}\n\n"
    return StreamingResponse(generate(), media_type="text/event-stream")
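Server-sent events have a built-in resume hook: if each event carries an `id:` field, a reconnecting browser `EventSource` sends the last id back in the `Last-Event-ID` request header. The sketch below assumes you use the line number as the id, so a reconnect can continue from the following line. `format_sse` and `resume_line` are hypothetical helpers.

```python
import json
from typing import Any, Optional


def format_sse(line_no: int, event: Any) -> str:
    """Frame one JSON event as an SSE message whose id is its line number."""
    payload = json.dumps(event)  # default json.dumps output has no raw newlines
    return f"id: {line_no}\ndata: {payload}\n\n"


def resume_line(last_event_id: Optional[str], default: int = 0) -> int:
    """Map a Last-Event-ID header value to the next line to stream."""
    if last_event_id is None:
        return default
    try:
        return int(last_event_id) + 1
    except ValueError:
        return default  # malformed header: fall back to the requested start
```

In FastAPI the header can be read from a `Request` parameter with `request.headers.get("last-event-id")`, and the line number for each event is `start + i` when enumerating the stream.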

FAQ

How big is the index file?

About 15 bytes per line. A 10 million line file produces roughly a 150MB index.

What if the file gets modified (not just appended)?

The library compares file size and mtime. If something changed, it rebuilds. You can also call rebuild() explicitly.

Is it thread-safe?

Reads are fine from multiple threads. Don't call update() or rebuild() concurrently.

Why not linecache?

linecache loads the entire file into memory. This library keeps only the byte offsets and reads lines from disk on demand, so memory use depends on the size of the index rather than on the size of the data itself.

Do the async methods actually do async I/O?

Not exactly. They use asyncio.to_thread() to run blocking file reads in a thread pool. This keeps your event loop responsive, but the underlying I/O is still synchronous. For most cases this works well: the batched reads (100 lines per thread hop by default) keep the overhead low.

License

MIT

Download files

Source Distribution

jsonl_resumable-0.5.0.tar.gz (38.3 kB)

Built Distribution

jsonl_resumable-0.5.0-py3-none-any.whl (23.1 kB, Python 3)

File details

Details for the file jsonl_resumable-0.5.0.tar.gz.

File metadata

  • Download URL: jsonl_resumable-0.5.0.tar.gz
  • Size: 38.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for jsonl_resumable-0.5.0.tar.gz:

  • SHA256: e3cc009244800da322a28003617595bfcefed5d2671bc570f889d118b2971b7c
  • MD5: 8fe71cd78d8933d332a779c3f15a94e9
  • BLAKE2b-256: c74de02faa9450b35e7da5a395161eadbffb056f76c3a30752b813d70865d45d


Provenance

The following attestation bundles were made for jsonl_resumable-0.5.0.tar.gz:

Publisher: publish.yml on pranavtotla/jsonl-resumable


File details

Details for the file jsonl_resumable-0.5.0-py3-none-any.whl.

File hashes

Hashes for jsonl_resumable-0.5.0-py3-none-any.whl:

  • SHA256: 8afaede0d12c30b05bc0e80ec3e93a8a47f2380022e20cbb121f330663ffcaae
  • MD5: 23092a8df0e8a8b1f66caaff533de285
  • BLAKE2b-256: 68aa9c67d2b3ab14320fbdf4b23f67b1968c5d62c7268bd4cc331a9844ce86cc


Provenance

The following attestation bundles were made for jsonl_resumable-0.5.0-py3-none-any.whl:

Publisher: publish.yml on pranavtotla/jsonl-resumable

