Utility library for async database operations, HTTP requests, and parallel execution
esuls
A Python utility library for async database operations, HTTP requests, and parallel execution.
Features
- AsyncDB - Type-safe async SQLite with dataclass schemas
- Async HTTP client - High-performance HTTP client with retry logic and connection pooling
- Parallel utilities - Async parallel execution with concurrency control
- CloudFlare bypass - curl-cffi integration for bypassing protections
Installation
# With pip
pip install esuls
# With uv
uv pip install esuls
Usage
Parallel Execution
import asyncio
from esuls import run_parallel

async def fetch_data(id):
    await asyncio.sleep(1)
    return f"Data {id}"

async def main():
    # Run multiple async functions in parallel with concurrency limit
    results = await run_parallel(
        lambda: fetch_data(1),
        lambda: fetch_data(2),
        lambda: fetch_data(3),
        limit=20  # Max concurrent tasks
    )
    print(results)

asyncio.run(main())
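For readers curious how a concurrency limit of this kind can work, here is a minimal sketch built on asyncio.Semaphore. It is not the esuls implementation: run_limited is a hypothetical name, and the only thing taken from the example above is the calling convention (zero-argument callables plus a limit keyword).

```python
import asyncio
from typing import Any, Awaitable, Callable


async def run_limited(*factories: Callable[[], Awaitable[Any]], limit: int = 20) -> list:
    """Run zero-argument coroutine factories concurrently, at most `limit` at a time."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(factory):
        # Hold one semaphore slot for the whole lifetime of the coroutine.
        async with semaphore:
            return await factory()

    # gather() returns results in the same order as its arguments.
    return await asyncio.gather(*(guarded(f) for f in factories))


async def fetch_data(item_id: int) -> str:
    await asyncio.sleep(0.01)
    return f"Data {item_id}"


async def demo() -> list:
    # i=i binds the loop variable at lambda creation time.
    return await run_limited(*(lambda i=i: fetch_data(i) for i in range(1, 4)), limit=2)


print(asyncio.run(demo()))  # ['Data 1', 'Data 2', 'Data 3']
```

Passing callables rather than coroutine objects means no coroutine is created until a slot is free, which is probably why the example above wraps each call in a lambda.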
Database Client (AsyncDB)
Built on SQLAlchemy 2.0 Core async. Dataclass-as-schema ergonomics; SQLite
behind the scenes with a tuned PRAGMA set (WAL, foreign_keys=ON, mmap_size,
temp_store=MEMORY, wal_autocheckpoint, busy_timeout=30s). Two engines per
db: a StaticPool writer (single conn, serialised by an in-process write
lock) and a 4-conn reader pool for concurrent WAL reads.
import asyncio
from dataclasses import dataclass, field
from esuls import AsyncDB, BaseModel

@dataclass
class User(BaseModel):  # id + created_at + updated_at, all UTC
    name: str = field(default="", metadata={"index": True})
    email: str = field(default="", metadata={"unique": True})
    age: int = 0

async def main():
    db = AsyncDB("users.db", "users", User)

    # Single upsert (idempotent by primary key).
    await db.save(User(name="Alice", email="alice@example.com", age=30))

    # Atomic batch: one round-trip, fail-fast on any bad item.
    await db.save_batch([
        User(name="Bob", email="bob@example.com", age=25),
        User(name="Charlie", email="charlie@example.com", age=35),
    ])

    # Best-effort batch: per-item loop, log+skip rotten items.
    await db.save_each([User(name="x", email=""), User(name="y", email="z@e.com")])

    # Queries.
    results = await db.find(name="Alice")
    adults = await db.find(age__gte=18, order_by="-age")
    count = await db.count(age__gte=18)
    user = await db.get_by_id("some-uuid")

    # Bulk updates / deletes (both require at least one filter).
    n = await db.update_many({"status": "verified"}, email__like="%@example.com")
    n = await db.delete_many(age__lt=18)

    # Aggregations: count / count_distinct / sum / avg / min / max + GROUP BY + HAVING.
    by_age = await db.aggregate(
        group_by="age",
        count=True, count_distinct="email",
        having={"count__gte": 2},
        order_by="-count",
    )

    # Stream big result sets without materialising in memory.
    async for u in db.stream(order_by="created_at", batch_size=500):
        process(u)

    # Project specific columns (returns List[dict], not List[User]).
    rows = await db.find_columns(["id", "email"], age__gte=18)

    # Always close: PRAGMA optimize + wal_checkpoint(TRUNCATE) on exit.
    await db.close()

asyncio.run(main())
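To make the PRAGMA set mentioned above more concrete, the sketch below applies a few of those settings with the standard-library sqlite3 module and reads them back. It is only an illustration, not how AsyncDB configures its engines: open_tuned and read_pragma are hypothetical helpers, and mmap_size and wal_autocheckpoint are left out because the text above does not give their values.

```python
import sqlite3
import tempfile
from pathlib import Path

# Subset of the settings named in the description; values for the
# remaining PRAGMAs are not stated there, so they are omitted here.
PRAGMAS = {
    "journal_mode": "WAL",
    "foreign_keys": "ON",
    "temp_store": "MEMORY",
    "busy_timeout": 30000,  # milliseconds, i.e. 30s
}


def open_tuned(path) -> sqlite3.Connection:
    """Open a SQLite file and apply the PRAGMA set to this connection."""
    conn = sqlite3.connect(path)
    for name, value in PRAGMAS.items():
        conn.execute(f"PRAGMA {name}={value}")
    return conn


def read_pragma(conn: sqlite3.Connection, name: str):
    """Return the current value of a PRAGMA as SQLite reports it."""
    return conn.execute(f"PRAGMA {name}").fetchone()[0]


with tempfile.TemporaryDirectory() as tmp:
    conn = open_tuned(Path(tmp) / "demo.db")
    settings = {name: read_pragma(conn, name) for name in PRAGMAS}
    conn.close()

print(settings)
```

SQLite reports these back in its own terms: the journal mode as a lowercase string, foreign_keys as 0 or 1, and temp_store as an integer code (2 means MEMORY). Except for journal_mode=WAL, which is stored in the database file, these settings apply per connection, so every new connection in a pool needs them applied again.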
Query operators (suffix on filter kwargs, e.g. age__gte=18):
- eq (default), neq, gt, gte, lt, lte
- like: SQL LIKE
- in, not_in: pass any iterable
- is_null=True/False, not_null=True/False
- between=(lo, hi): inclusive
HAVING in aggregate() accepts the same suffixes — but on alias names
(count, count_distinct_<col>, sum_<col>, …), not raw columns.
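As a rough illustration of how suffix-style filters like these can be turned into SQL, the sketch below maps keyword arguments to a parameterised WHERE clause and runs it with the standard-library sqlite3 module. build_where is a hypothetical helper written for this example; it is not part of esuls and makes no claim about how AsyncDB builds its queries.

```python
import sqlite3

# Operators that map directly to a binary SQL comparison.
_BINARY = {"eq": "=", "neq": "!=", "gt": ">", "gte": ">=",
           "lt": "<", "lte": "<=", "like": "LIKE"}


def build_where(**filters):
    """Translate kwargs such as age__gte=18 into (sql_fragment, params)."""
    clauses, params = [], []
    for key, value in filters.items():
        column, _, op = key.partition("__")
        op = op or "eq"  # a bare column name means equality
        if not column.isidentifier():
            raise ValueError(f"bad column name: {column!r}")
        if op in _BINARY:
            clauses.append(f"{column} {_BINARY[op]} ?")
            params.append(value)
        elif op in ("in", "not_in"):
            values = list(value)  # accept any iterable
            marks = ", ".join("?" * len(values))
            keyword = "IN" if op == "in" else "NOT IN"
            clauses.append(f"{column} {keyword} ({marks})")
            params.extend(values)
        elif op in ("is_null", "not_null"):
            want_null = value if op == "is_null" else not value
            clauses.append(f"{column} IS NULL" if want_null else f"{column} IS NOT NULL")
        elif op == "between":
            lo, hi = value
            clauses.append(f"{column} BETWEEN ? AND ?")  # inclusive on both ends
            params.extend([lo, hi])
        else:
            raise ValueError(f"unknown operator: {op!r}")
    return " AND ".join(clauses), params


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, age INTEGER)")
conn.executemany("INSERT INTO users VALUES (?, ?)",
                 [("Alice", 30), ("Bob", 25), ("Kid", 12)])

where, params = build_where(age__gte=18, name__like="A%")
rows = conn.execute(f"SELECT name FROM users WHERE {where}", params).fetchall()
print(where, params, rows)  # age >= ? AND name LIKE ? [18, 'A%'] [('Alice',)]
```

Values always travel as bound parameters; only column names and operators are interpolated, and both are checked first.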
Schema base classes
Pick the base class that matches the table's needs:
| Base | Primary key | Timestamps | Use for |
|---|---|---|---|
| IdModel | str (UUID4) | none | Cache / lookup tables with no created_at/updated_at columns |
| IntIdModel | int autoincrement | none | High-throughput tables where UUID string PK fragments the B-tree |
| TimestampedModel | str (UUID4) | created_at + updated_at (UTC) | The default; BaseModel is an alias for it |
| TimestampedIntModel | int autoincrement | created_at + updated_at (UTC) | Combines int PK perf + auto-managed timestamps |
BaseModel is a backward-compat alias for TimestampedModel. Timestamps
use the public utcnow() clock (always timezone-aware UTC).
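The following sketch shows one way a UUID4 primary key and timezone-aware UTC timestamps can be layered with plain dataclasses. The class names SketchIdModel and SketchTimestampedModel are made up for this example, and the local utcnow here is a stand-in; the real esuls base classes may be built differently.

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone


def utcnow() -> datetime:
    """Timezone-aware current time in UTC (stand-in for the library's clock)."""
    return datetime.now(timezone.utc)


@dataclass
class SketchIdModel:
    # A fresh UUID4 string per instance; default_factory avoids a shared default.
    id: str = field(default_factory=lambda: str(uuid.uuid4()))


@dataclass
class SketchTimestampedModel(SketchIdModel):
    created_at: datetime = field(default_factory=utcnow)
    updated_at: datetime = field(default_factory=utcnow)


@dataclass
class Note(SketchTimestampedModel):
    body: str = ""


note = Note(body="hello")
print(note.id, note.created_at.isoformat())
```

In this sketch every inherited field has a default, so fields declared on a subclass need defaults too; otherwise the dataclass decorator raises a TypeError about a non-default argument following a default one.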
File-based migrations
Drop NNN_*.py scripts next to the db file under migrations/. Each
exports version, description, and async def upgrade(conn):
# my_app/migrations/001_add_email_index.py
from sqlalchemy import text

version = 1
description = "Index users.email for login lookup"

async def upgrade(conn):
    await conn.execute(text("CREATE INDEX idx_users_email ON users(email)"))
AsyncDB picks them up on first use, applies pending ones inside the same
transaction as schema init (atomic), and stamps PRAGMA user_version.
Fresh databases leap-frog to the latest version (the dataclass already
represents the post-migration state). Migrations are forward-only.
# Inspect without opening a db:
from pathlib import Path
from esuls import discover_migrations
print(discover_migrations(Path("my_app/migrations")))

# Inspect with applied/pending status:
db = AsyncDB(...)
print(await db.list_migrations())
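The forward-only, version-stamped flow described above can be sketched with the standard-library sqlite3 module. This is an illustration under stated assumptions rather than the esuls migration runner: Migration and apply_pending are hypothetical names, the migrations are plain synchronous callables held in a list instead of files on disk, and the fresh-database shortcut is not modelled.

```python
import sqlite3
from typing import Callable, NamedTuple


class Migration(NamedTuple):
    version: int
    description: str
    upgrade: Callable[[sqlite3.Connection], None]


def apply_pending(conn: sqlite3.Connection, migrations: list) -> list:
    """Apply migrations newer than PRAGMA user_version, in order, in one transaction."""
    current = conn.execute("PRAGMA user_version").fetchone()[0]
    pending = sorted((m for m in migrations if m.version > current),
                     key=lambda m: m.version)
    if not pending:
        return []
    conn.execute("BEGIN")
    try:
        for migration in pending:
            migration.upgrade(conn)
        # user_version lives in the database header and is written
        # transactionally, so it rolls back together with the schema changes.
        conn.execute(f"PRAGMA user_version = {int(pending[-1].version)}")
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise
    return [m.version for m in pending]


# isolation_level=None puts sqlite3 in autocommit mode, so BEGIN/COMMIT are ours.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE users (id TEXT PRIMARY KEY, email TEXT)")

migrations = [
    Migration(1, "Index users.email for login lookup",
              lambda c: c.execute("CREATE INDEX idx_users_email ON users(email)")),
]

first = apply_pending(conn, migrations)
second = apply_pending(conn, migrations)
print(first, second)  # [1] []
```

Running the function twice shows the idempotence that version stamping buys: the second call finds nothing newer than the stored version and does no work.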
Concurrency model
- Writes serialise through a per-loop, per-db-path asyncio.Lock + a single physical writer connection. No SQLITE_BUSY surfaces to the caller; transient contention is retried with exponential backoff (jittered) up to max_retries (configurable per call).
- Reads use a 4-conn pool: true concurrent reads under WAL.
- Cross-loop reuse is supported: the same AsyncDB instance survives multiple asyncio.run() calls because locks/engines are keyed on the running loop.
- await db.close() (or async with AsyncDB(...)) is required for clean fd release. Skipping it emits a ResourceWarning when the engines are GC'd; aiosqlite's worker thread is daemon-marked so the process never hangs on missing close.
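The write path described above (a lock around the writer plus jittered exponential backoff on transient failures) can be sketched in a few lines of asyncio. Everything here is illustrative: TransientError, retry_with_backoff and the delay values are assumptions made for the example, not names or defaults from esuls.

```python
import asyncio
import random


class TransientError(Exception):
    """Stand-in for a retryable failure such as 'database is locked'."""


async def retry_with_backoff(op, *, max_retries=5, base_delay=0.01, max_delay=1.0):
    """Await op(); on TransientError retry with jittered exponential backoff."""
    for attempt in range(max_retries + 1):
        try:
            return await op()
        except TransientError:
            if attempt == max_retries:
                raise  # retries exhausted: surface the error
            # Full jitter: sleep a random amount up to the exponential cap.
            cap = min(max_delay, base_delay * 2 ** attempt)
            await asyncio.sleep(random.uniform(0, cap))


async def demo():
    write_lock = asyncio.Lock()  # serialises writers within one event loop
    calls = 0

    async def flaky_write():
        nonlocal calls
        async with write_lock:
            calls += 1
            if calls < 3:
                raise TransientError("database is locked")
            return "written"

    result = await retry_with_backoff(flaky_write)
    return result, calls


print(asyncio.run(demo()))  # ('written', 3)
```

The lock is released between attempts because the retry loop sits outside it, so a waiting writer can make progress while another one backs off.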
HTTP Request Client
import asyncio
from esuls import AsyncRequest, make_request

# Using context manager (recommended for multiple requests)
async def example1():
    async with AsyncRequest() as client:
        response = await client.request(
            url="https://api.example.com/data",
            method="GET",
            add_user_agent=True,
            max_attempt=3,
            timeout_request=30
        )
        if response:
            data = response.json()
            print(data)

# Using standalone function (uses shared connection pool)
async def example2():
    response = await make_request(
        url="https://api.example.com/users",
        method="POST",
        json_data={"name": "Alice", "email": "alice@example.com"},
        headers={"Authorization": "Bearer token"},
        max_attempt=5,
        force_response=True  # Return response even on error
    )
    if response:
        print(response.status_code)
        print(response.text)

asyncio.run(example1())
Request Parameters:
- url - Request URL
- method - HTTP method (GET, POST, PUT, DELETE, etc.)
- headers - Request headers
- cookies - Cookies dict
- params - URL parameters
- json_data - JSON body
- files - Multipart file upload
- proxy - Proxy URL
- timeout_request - Timeout in seconds (default: 60)
- max_attempt - Max retry attempts (default: 10)
- force_response - Return response even on error (default: False)
- json_response - Validate JSON response (default: False)
- json_response_check - Check for key in JSON response
- skip_response - Skip if text contains pattern(s)
- exception_sleep - Delay between retries in seconds (default: 10)
- add_user_agent - Add random User-Agent header (default: False)
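The validation-related parameters (json_response, json_response_check, skip_response) are only described in one line each, so the sketch below shows one plausible reading of how such checks could decide whether a response body is accepted or retried. is_acceptable is a hypothetical helper written for this example; the library's actual rules may differ.

```python
import json
from typing import Iterable, Optional


def is_acceptable(
    text: str,
    *,
    json_response: bool = False,
    json_response_check: Optional[str] = None,
    skip_response: Iterable[str] = (),
) -> bool:
    """Return True if a response body passes the configured checks (assumed semantics)."""
    # Reject bodies containing any of the skip patterns.
    if any(pattern in text for pattern in skip_response):
        return False
    # Optionally require the body to parse as JSON.
    if json_response or json_response_check is not None:
        try:
            payload = json.loads(text)
        except json.JSONDecodeError:
            return False
        # Optionally require a top-level key to be present.
        if json_response_check is not None:
            return isinstance(payload, dict) and json_response_check in payload
    return True


print(is_acceptable('{"data": [1, 2]}', json_response_check="data"))                  # True
print(is_acceptable("<html>Access denied</html>", skip_response=["Access denied"]))  # False
```

A retry loop could call a check like this after each attempt and sleep for exception_sleep seconds before trying again when it returns False.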
CloudFlare Bypass
import asyncio
from esuls import make_request_cffi

async def fetch_protected_page():
    html = await make_request_cffi("https://protected-site.com")
    if html:
        print(html)

asyncio.run(fetch_protected_page())
Development
Project Structure
utils/
├── pyproject.toml
├── README.md
├── LICENSE
└── src/
    └── esuls/
        ├── __init__.py
        ├── utils.py          # Parallel execution utilities
        ├── db_cli.py         # AsyncDB with dataclass schemas
        └── request_cli.py    # Async HTTP client
Local Development Installation
# Navigate to the project
cd utils
# Install in editable mode with uv
uv pip install -e .
# Or with pip
pip install -e .
Building and Publishing
# With uv
uv build && twine upload dist/*
# Or with traditional tools
pip install build twine
python -m build
twine upload dist/*
Advanced Features
AsyncDB Schema Definition
from dataclasses import dataclass, field
from esuls import BaseModel
from datetime import datetime
from typing import Optional, List
import enum

class Status(enum.Enum):
    ACTIVE = "active"
    INACTIVE = "inactive"

@dataclass
class User(BaseModel):
    # BaseModel provides: id, created_at, updated_at

    # Indexed, unique field (given a default, as in the usage example)
    email: str = field(default="", metadata={"index": True, "unique": True})

    # Simple fields
    name: str = ""
    age: int = 0

    # Enum support
    status: Status = Status.ACTIVE

    # JSON-serialized complex types
    tags: List[str] = field(default_factory=list)

    # Optional fields
    phone: Optional[str] = None

    # Table constraints (optional)
    __table_constraints__ = [
        "CHECK (age >= 0)"
    ]
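The schema above mentions enum support and JSON-serialized complex types. As a rough picture of what that can mean at the storage layer, the sketch below flattens a dataclass instance into values SQLite can store directly. Profile and to_row are hypothetical names for this example, and the conversion rules are assumptions, not the ones AsyncDB uses.

```python
import enum
import json
from dataclasses import dataclass, field, fields
from typing import List, Optional


class Status(enum.Enum):
    ACTIVE = "active"
    INACTIVE = "inactive"


@dataclass
class Profile:
    name: str = ""
    status: Status = Status.ACTIVE
    tags: List[str] = field(default_factory=list)
    phone: Optional[str] = None


def to_row(obj) -> dict:
    """Flatten a dataclass instance into SQLite-friendly column values."""
    row = {}
    for f in fields(obj):
        value = getattr(obj, f.name)
        if isinstance(value, enum.Enum):
            value = value.value        # store the enum's underlying value
        elif isinstance(value, (list, dict)):
            value = json.dumps(value)  # store containers as JSON text
        row[f.name] = value            # str, int and None pass through unchanged
    return row


row = to_row(Profile(name="Alice", tags=["admin", "beta"]))
print(row)
# {'name': 'Alice', 'status': 'active', 'tags': '["admin", "beta"]', 'phone': None}
```

Reading a row back would apply the inverse step per field, for example Status(row["status"]) and json.loads(row["tags"]).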
Connection Pooling & Performance
The HTTP client uses:
- Shared connection pool (prevents "too many open files" errors)
- Automatic retry with exponential backoff
- SSL optimization
- Random User-Agent rotation
- Cookie and header persistence
License
MIT License
File details
Details for the file esuls-0.2.1.tar.gz.
File metadata
- Download URL: esuls-0.2.1.tar.gz
- Upload date:
- Size: 109.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.4
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 347cb69b330a8a60ef167cda253a358e098626dcbe6ea49945f8851cb54f5f0d |
| MD5 | 66bdf9cef62f4036363c5755cfb591d9 |
| BLAKE2b-256 | 7407b7ed4b5c7660da25e64b170c469ce425ba35d7e7556e82033b830cc1ecbc |
File details
Details for the file esuls-0.2.1-py3-none-any.whl.
File metadata
- Download URL: esuls-0.2.1-py3-none-any.whl
- Upload date:
- Size: 113.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.4
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 6cae67cbad40775d8c1b3f2fb6388252d77b171210b3f73df86d69553d4b1720 |
| MD5 | 7121bae68f54a20adbf83a4ff10b140b |
| BLAKE2b-256 | bf2343aa7c24f580e803671adbc6359357e447d9c28780faf865ce65f4cfd035 |