A unified I/O management toolkit supporting threads, coroutines, files, databases, and data formats

Gatling

A unified I/O management toolkit for Python. Orchestrate processes, threads, and coroutines in one pipeline. Read and write files, databases, and queues through a consistent API.

pip install gatling

Requires Python 3.11+


Modules

| Module | Description |
|---|---|
| Runtime | Multi-stage pipeline: process + thread + coroutine workers |
| Storage — Table | Append-only TSV tables, PostgreSQL, SQLite |
| Storage — Queue | Thread-safe in-memory queue |
| Storage — Dict | In-memory dictionary with batch ops |
| Storage — SFS | Virtual file system with path routing |
| Define | ConstDefine for constants/keys, TableDefine for table schemas |
| HTTP | Async/sync HTTP client (GET/POST/PUT/DELETE) |
| File I/O | JSON, JSONL, Pickle, TOML, text, bytes, zstd compression |
| Watch | Stopwatch, function timing decorator |

TaskFlow Pipeline

Run CPU-bound, I/O-bound, and network-bound tasks in a single pipeline with automatic queue management.

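from gatling.runtime.taskflow_manager import TaskFlowManager
from gatling.storage import MemoryQueue

# Placeholder workers for illustration only (signatures assumed); substitute your own.
# They live at module level so that worker processes can import them.
def cpu_task(x):             # CPU-bound, one-in-one-out
    return x * x

async def net_task(x):       # network-bound coroutine
    return x

def disk_task(x):            # disk-bound
    return x

if __name__ == '__main__':
    # Seed the input queue
    q_wait = MemoryQueue()
    for i in range(100):
        q_wait.put(i)

    tfm = TaskFlowManager(q_wait, retry_on_error=False)

    with tfm.execute(log_interval=1):
        tfm.register_process(cpu_task, worker=4)       # multiprocessing
        tfm.register_coroutine(net_task, worker=10)     # asyncio
        tfm.register_thread(disk_task, worker=4)        # threading

    results = list(tfm.get_qdone())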

Workers can be functions (one-in-one-out) or generators (one-in-many-out). Stages chain automatically.
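
The difference between the two worker shapes can be shown without Gatling itself. The sketch below is not how TaskFlowManager is implemented: `run_stage`, `square`, and `fan_out` are hypothetical names, and the stages run sequentially in a single thread purely to show how a one-in-one-out function and a one-in-many-out generator each feed the next stage.

```python
import inspect
from typing import Any, Callable, Iterable


def run_stage(worker: Callable[[Any], Any], items: Iterable[Any]) -> list[Any]:
    """Apply one worker to every item and collect everything it emits."""
    out: list[Any] = []
    for item in items:
        if inspect.isgeneratorfunction(worker):
            out.extend(worker(item))   # generator: one input, many outputs
        else:
            out.append(worker(item))   # function: one input, one output
    return out


def square(x: int) -> int:
    return x * x


def fan_out(x: int):
    yield x
    yield -x


# The output of one stage becomes the input of the next.
stage1 = run_stage(square, range(3))
stage2 = run_stage(fan_out, stage1)
print(stage1)  # [0, 1, 4]
print(stage2)  # [0, 0, 1, -1, 4, -4]
```

A real pipeline would place a queue between the stages and run each stage's workers concurrently; the chaining logic stays the same.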


Define — ConstDefine / TableDefine

Two Enum-based classes for defining constants. Both inherit from BaseDefine and share:

| Method | Description |
|---|---|
| str(m) | Returns member name |
| "name" in Cls | Check if member exists by name |
| Cls.keys() | List of all member names |
| Cls.items() | List of [(name, value), ...] tuples (use dict(Cls.items()) for dict) |
| Cls.get(name, default) | Safe lookup, returns .value (default if not found) |
| Cls["name"] | Access .value by name (raises KeyError if missing) |
| dict(Cls) | Convert to {name: value} dict |
| Cls.Name | Access member (enum object with .name and .value) |
| len(Cls) | Number of members |
| for m in Cls | Iterate over members |
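
For readers curious how an interface like this can sit on top of the standard enum module, here is a minimal sketch. It is an assumption about one possible design, not BaseDefine's actual code: `SketchMeta`, `SketchDefine`, and `Settings` are hypothetical names. The class-level lookups live on a metaclass, and `_generate_next_value_` makes `auto()` yield the member name.

```python
from enum import Enum, EnumMeta, auto


class SketchMeta(EnumMeta):
    """Class-level, name-based lookups that return .value."""

    def __contains__(cls, name):
        return name in cls.__members__

    def __getitem__(cls, name):
        return cls.__members__[name].value   # KeyError if the name is missing

    def keys(cls):
        return list(cls.__members__)

    def items(cls):
        return [(n, m.value) for n, m in cls.__members__.items()]

    def get(cls, name, default=None):
        member = cls.__members__.get(name)
        return default if member is None else member.value


class SketchDefine(Enum, metaclass=SketchMeta):
    def _generate_next_value_(name, start, count, last_values):
        return name   # auto() takes the member name as its value

    def __str__(self):
        return self.name


class Settings(SketchDefine):
    Port = 8080
    username = auto()


print(dict(Settings))  # {'Port': 8080, 'username': 'username'}
```

`dict(Settings)` works because `dict()` accepts any object that exposes `keys()` and `__getitem__`, which the metaclass supplies at class level.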

ConstDefine — constants and keys

from enum import auto
from gatling.define.constdefine import ConstDefine

class Config(ConstDefine):
    # direct values — any Python literal
    Port      = 8080
    Debug     = False
    Rate      = 0.001
    Name      = "my_app"
    # auto() — member name becomes the string value
    username  = auto()          # .value = "username"
    email     = auto()          # .value = "email"
    # None — single sentinel (only one None allowed per class)
    Secret    = None

Access:

Config.Port.value               # 8080
Config.Port.name                # "Port"
str(Config.Port)                # "Port"
Config.username.value           # "username"
Config.Secret.value             # None

Lookup:

"Port" in Config                # True
"nope" in Config                # False
Config.get("Port")              # 8080
Config.get("nope")              # None
Config.get("nope", "fallback") # "fallback"
Config["Port"]                  # 8080
dict(Config)                    # {'Port': 8080, 'Debug': False, ..., 'Secret': None}

Collection:

Config.keys()                   # ['Port', 'Debug', 'Rate', 'Name', 'username', 'email', 'Secret']
Config.items()                  # [('Port', 8080), ('Debug', False), ..., ('Secret', None)]
dict(Config.items())            # {'Port': 8080, 'Debug': False, ..., 'Secret': None}
len(Config)                     # 7
[m.name for m in Config]        # ['Port', 'Debug', 'Rate', 'Name', 'username', 'email', 'Secret']

TableDefine — typed table schema

Every member must be a Field instance; any other value raises TypeError.

import datetime
from gatling.define.tabledefine import TableDefine, Field

class Users(TableDefine):
    id        = Field(int, primary=True)
    name      = Field(str, nullable=False)
    score     = Field(float, default=0.0)
    active    = Field(bool, default=True)
    birthday  = Field(datetime.date)
    created   = Field(datetime.datetime)

Field attributes (via .value):

Users.name.value.dtype          # str
Users.name.value.default        # ""
Users.name.value.primary        # False
Users.name.value.nullable       # False
Users.score.value.default       # 0.0
Users.id.value.primary          # True

Serialize / deserialize:

Users.score.value.tostr(9.5)                            # "9.5"
Users.score.value.fmstr("9.5")                          # 9.5
Users.active.value.tostr(True)                           # "1"
Users.active.value.fmstr("0")                            # False
Users.birthday.value.tostr(datetime.date(2025, 6, 15))  # "2025-06-15"
Users.birthday.value.fmstr("2025-06-15")                 # datetime.date(2025, 6, 15)
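
The serializers above are inferred from each field's dtype. One way such inference could work is a lookup table keyed by type, sketched below with the standard library only. `CODECS` and `infer_codec` are hypothetical names, and the table is an assumption chosen to reproduce the outputs shown above, not Gatling's internal mapping.

```python
import datetime
from typing import Callable

# Hypothetical codec table: dtype -> (to-string, from-string).
CODECS: dict[type, tuple[Callable, Callable]] = {
    int: (str, int),
    float: (str, float),
    str: (str, str),
    bool: (lambda v: "1" if v else "0", lambda s: s == "1"),
    datetime.date: (lambda d: d.isoformat(), datetime.date.fromisoformat),
    datetime.datetime: (lambda d: d.isoformat(), datetime.datetime.fromisoformat),
}


def infer_codec(dtype: type) -> tuple[Callable, Callable]:
    """Return (tostr, fmstr) for a dtype, or raise if none is registered."""
    try:
        return CODECS[dtype]
    except KeyError:
        raise TypeError(f"no codec registered for {dtype!r}") from None


bool_tostr, bool_fmstr = infer_codec(bool)
date_tostr, date_fmstr = infer_codec(datetime.date)

print(bool_tostr(True))                        # 1
print(date_tostr(datetime.date(2025, 6, 15)))  # 2025-06-15
```

An explicit tostr or fmstr passed to a field would simply take precedence over the table entry.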

Lookup and collection (same as ConstDefine):

"name" in Users                 # True
Users.get("name")               # Field(dtype=str, ...)  (returns Field directly)
Users.keys()                    # ['id', 'name', 'score', 'active', 'birthday', 'created']
Users.items()                   # [('id', Field(...)), ('name', Field(...)), ...]
Users.get_name2dtype()          # {'id': int, 'name': str, 'score': float, ...}

SQL generation (PostgreSQL dialect):

from sqlalchemy import BigInteger, String, Float
from sqlalchemy.dialects.postgresql import TIMESTAMP, JSONB

class Posts(TableDefine):
    id        = Field(BigInteger, primary=True, autoincrement=True, comment="primary key")
    title     = Field(String(256), nullable=False)
    score     = Field(Float, default=0.0, index=True)
    tags      = Field(JSONB)
    created   = Field(TIMESTAMP(timezone=True), server_default="now()")

Posts.get_sql_create()          # CREATE TABLE IF NOT EXISTS "Posts" (id BIGSERIAL ... )
Posts.get_sql_drop()            # DROP TABLE IF EXISTS "Posts"

Field options:

| Option | Type | Description |
|---|---|---|
| dtype | type | Python type (int, str, ...) or SQLAlchemy type (BigInteger, String(64), ...) |
| default | any | Default value (auto-inferred from dtype if omitted) |
| primary | bool | Primary key |
| nullable | bool | Allow NULL (default True) |
| unique | bool | Unique constraint |
| index | bool | Create index |
| autoincrement | bool | Auto-increment |
| server_default | str | SQL server-side default expression (e.g. "now()") |
| foreign_key | str | Foreign key reference (e.g. "users.id") |
| comment | str | Column comment |
| tostr | callable | Custom serializer (auto-inferred from dtype if omitted) |
| fmstr | callable | Custom deserializer (auto-inferred from dtype if omitted) |

Tables

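All table types are created from the same schema; only construction differs. In the snippets below, schema is the table's schema definition, presumably a TableDefine class such as Users above.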

Create

# SQLite
from gatling.storage.g_table.sql.real_sqlite_table import SQLiteTable
ft = SQLiteTable("users", "app.db")
ft.create(schema)

# PostgreSQL
from gatling.storage.g_table.sql.a_pgsql_base import create_pool
from gatling.storage.g_table.sql.real_pgsql_table import PGSQLTable
pool = create_pool("postgresql://user:pass@localhost:5432/db")
ft = PGSQLTable("users", pool)
ft.create(schema)

# Append-only TSV file
from gatling.storage.g_table.append_only.real_tsv_table import TSVTable
ft = TSVTable("users.tsv")
ft.create(schema)

Usage (same for all SQL tables)

# Insert — single or batch
ft.insert({"id": 1, "name": "Alice", "score": 9.5})
ft.insert(
    {"id": 2, "name": "Bob",   "score": 8.0},
    {"id": 3, "name": "Carol", "score": 7.5},
)

# Query
ft.fetch(where={"name": "Alice"})
ft.fetch(order_by={"score": True}, limit=10)
ft.count(where={"score": 9.5})

# Update / Delete
ft.update({"score": 10.0}, where={"id": 1})
ft.delete(where={"id": 3})

# Transaction (rollback on exception)
with ft:
    ft.insert({"id": 4, "name": "Dan", "score": 6.0})
    ft.update({"score": 0.0}, where={"id": 2})
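
The rollback-on-exception behavior follows the same pattern as the standard library's sqlite3 connection context manager. The sketch below uses only sqlite3, not Gatling's table classes, to show what that guarantee means in practice: a failure inside the `with` block leaves the table as it was.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, score REAL)")
conn.execute("INSERT INTO users VALUES (1, 'Alice', 9.5)")
conn.commit()

try:
    with conn:  # commits on success, rolls back if the block raises
        conn.execute("INSERT INTO users VALUES (4, 'Dan', 6.0)")
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass

# Dan's row was rolled back; only Alice remains.
count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
print(count)  # 1
```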

TSV tables use append / extend and support indexing:

ft.append({"ts": "2025-01-01", "level": "INFO", "msg": "started"})
ft.extend([...])

with ft:
    print(len(ft))      # row count
    print(ft[0])         # first row
    print(ft[-1])        # last row
    print(ft[2:5])       # slice
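
To make the append-plus-indexing model concrete, here is a toy append-only TSV log built on the standard csv module. `TinyTSV` is a hypothetical class and not TSVTable's implementation; it rereads the whole file on every access, which a real table would presumably avoid.

```python
import csv
import os
import tempfile


class TinyTSV:
    """Hypothetical append-only TSV log with list-style reads."""

    def __init__(self, path, columns):
        self.path, self.columns = path, list(columns)
        if not os.path.exists(path):
            # Write the header row once, when the file is created.
            with open(path, "w", newline="", encoding="utf-8") as f:
                csv.writer(f, delimiter="\t").writerow(self.columns)

    def append(self, row):
        # Rows are only ever added at the end of the file.
        with open(self.path, "a", newline="", encoding="utf-8") as f:
            csv.writer(f, delimiter="\t").writerow([row[c] for c in self.columns])

    def _rows(self):
        with open(self.path, newline="", encoding="utf-8") as f:
            return list(csv.DictReader(f, delimiter="\t"))

    def __len__(self):
        return len(self._rows())

    def __getitem__(self, key):
        return self._rows()[key]   # int or slice, as with a list


path = os.path.join(tempfile.mkdtemp(), "log.tsv")
log = TinyTSV(path, ["ts", "level", "msg"])
log.append({"ts": "2025-01-01", "level": "INFO", "msg": "started"})
log.append({"ts": "2025-01-02", "level": "WARN", "msg": "slow"})

print(len(log))         # 2
print(log[-1]["msg"])   # slow
```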

HTTP Client

import asyncio

from gatling.utility.http_fetch_fctns import sync_fetch_http, async_fetch_http, fwrap

# Sync
data, status, size = sync_fetch_http("https://httpbin.org/get", rtype="json")

# Async: await is only valid inside a coroutine, so wrap the call and run it
async def main():
    return await fwrap(async_fetch_http, target_url="https://httpbin.org/get", rtype="json")

data, status, size = asyncio.run(main())

File I/O

from gatling.utility.io_fctns import (
    save_json, read_json,
    save_jsonl, read_jsonl,
    save_text, read_text,
    save_pickle, read_pickle,
    save_bytes, read_bytes,
    read_toml, remove_file,
)

save_json({"key": "value"}, "data.json")
save_jsonl([{"a": 1}, {"a": 2}], "data.jsonl")
save_text("hello", "msg.txt")
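
For reference, JSONL conventionally stores one JSON document per line. The round trip below uses only the standard library to show that layout. `write_jsonl` and `load_jsonl` are hypothetical stand-ins, and whether Gatling's save_jsonl and read_jsonl write exactly this form is an assumption.

```python
import json
import os
import tempfile


def write_jsonl(records, path):
    """Write each record as one compact JSON document per line."""
    with open(path, "w", encoding="utf-8") as f:
        for record in records:
            f.write(json.dumps(record, ensure_ascii=False) + "\n")


def load_jsonl(path):
    """Read the file back, skipping blank lines."""
    with open(path, encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]


path = os.path.join(tempfile.mkdtemp(), "data.jsonl")
write_jsonl([{"a": 1}, {"a": 2}], path)
loaded = load_jsonl(path)
print(loaded)  # [{'a': 1}, {'a': 2}]
```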

Watch

import time

from gatling.utility.watch import Watch, watch_time

@watch_time
def slow():
    time.sleep(1)

w = Watch()
# ... work ...
print(w.see_seconds())    # interval since last check
print(w.total_seconds())  # total elapsed
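
The difference between the two readings, interval since the last check versus total elapsed, can be sketched with `time.perf_counter`. `TinyWatch` is a hypothetical stand-in that mirrors the two method names above; it is not Gatling's Watch class.

```python
import time


class TinyWatch:
    """Hypothetical stopwatch tracking both a lap time and a total."""

    def __init__(self):
        self._start = self._last = time.perf_counter()

    def see_seconds(self):
        # Time since the previous call (or since creation), then reset the lap.
        now = time.perf_counter()
        delta, self._last = now - self._last, now
        return delta

    def total_seconds(self):
        # Time since creation; unaffected by see_seconds().
        return time.perf_counter() - self._start


w = TinyWatch()
time.sleep(0.02)
first = w.see_seconds()    # roughly the first sleep
time.sleep(0.02)
second = w.see_seconds()   # roughly the second sleep only
total = w.total_seconds()  # covers both sleeps
```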

License

MIT


