A unified I/O management toolkit supporting threads, coroutines, files, databases, and data formats

Gatling

A unified I/O management toolkit for Python. Orchestrate processes, threads, and coroutines in one pipeline. Read and write files, databases, and queues through a consistent API.

pip install gatling

Requires Python 3.11+


Modules

| Module | Description |
| --- | --- |
| Runtime | Multi-stage pipeline: process + thread + coroutine workers |
| Storage — Table | Append-only TSV tables, PostgreSQL, SQLite |
| Storage — Queue | Thread-safe in-memory queue |
| Storage — Dict | In-memory dictionary with batch ops |
| Storage — SFS | Virtual file system with path routing |
| Define | ConstDefine for constants/keys, TableDefine for table schemas |
| HTTP | Async/sync HTTP client (GET/POST/PUT/DELETE) |
| File I/O | JSON, JSONL, Pickle, TOML, text, bytes, zstd compression |
| Watch | Stopwatch, function timing decorator |

TaskFlow Pipeline

Run CPU-bound, I/O-bound, and network-bound tasks in a single pipeline with automatic queue management.

from gatling.runtime.taskflow_manager import TaskFlowManager
from gatling.storage import MemoryQueue

# cpu_task, net_task, and disk_task are user-defined workers (not shown here).

if __name__ == '__main__':
    q_wait = MemoryQueue()
    for i in range(100):
        q_wait.put(i)

    tfm = TaskFlowManager(q_wait, retry_on_error=False)

    with tfm.execute(log_interval=1):
        tfm.register_process(cpu_task, worker=4)       # multiprocessing
        tfm.register_coroutine(net_task, worker=10)     # asyncio
        tfm.register_thread(disk_task, worker=4)        # threading

    results = list(tfm.get_qdone())

Workers can be functions (one-in-one-out) or generators (one-in-many-out). Stages chain automatically.
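
To make the two worker shapes concrete, here is a minimal sketch in plain Python. It does not use gatling's API: `run_stage` is a hypothetical helper that only mimics how a stage might collect results, and `square` and `split_digits` are made-up workers. A plain function contributes one output per input, while a generator contributes every value it yields.

```python
import inspect


def square(x):
    """Function worker: one input, one output."""
    return x * x


def split_digits(x):
    """Generator worker: one input, many outputs."""
    for ch in str(x):
        yield int(ch)


def run_stage(worker, items):
    """Hypothetical stage runner (not gatling's): flattens generator output."""
    out = []
    for item in items:
        if inspect.isgeneratorfunction(worker):
            out.extend(worker(item))   # one-in-many-out
        else:
            out.append(worker(item))   # one-in-one-out
    return out


stage1 = run_stage(square, [3, 12])        # [9, 144]
stage2 = run_stage(split_digits, stage1)   # [9, 1, 4, 4]
```

Feeding the output of one stage into the next is the chaining idea in miniature; in the real pipeline the queues between stages are managed for you.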


Define — ConstDefine / TableDefine

Two Enum-based classes: ConstDefine for constants and keys, TableDefine for table schemas. Both inherit from BaseDefine and share these operations:

| Method | Description |
| --- | --- |
| str(m) | Returns member name |
| "name" in Cls | Check if member exists by name |
| Cls.keys() | List of all member names |
| Cls.items() | List of [(name, value), ...] tuples (use dict(Cls.items()) for dict) |
| Cls.get(name, default) | Safe lookup, returns .value (default if not found) |
| Cls["name"] | Access .value by name (raises KeyError if missing) |
| dict(Cls) | Convert to {name: value} dict |
| Cls.Name | Access member (enum object with .name and .value) |
| len(Cls) | Number of members |
| for m in Cls | Iterate over members |

ConstDefine — constants and keys

from enum import auto
from gatling.define.constdefine import ConstDefine

class Config(ConstDefine):
    # direct values — any Python literal
    Port      = 8080
    Debug     = False
    Rate      = 0.001
    Name      = "my_app"
    # auto() — member name becomes the string value
    username  = auto()          # .value = "username"
    email     = auto()          # .value = "email"
    # None — single sentinel (only one None allowed per class)
    Secret    = None

Access:

Config.Port.value               # 8080
Config.Port.name                # "Port"
str(Config.Port)                # "Port"
Config.username.value           # "username"
Config.Secret.value             # None

Lookup:

"Port" in Config                # True
"nope" in Config                # False
Config.get("Port")              # 8080
Config.get("nope")              # None
Config.get("nope", "fallback") # "fallback"
Config["Port"]                  # 8080
dict(Config)                    # {'Port': 8080, 'Debug': False, ..., 'Secret': None}

Collection:

Config.keys()                   # ['Port', 'Debug', 'Rate', 'Name', 'username', 'email', 'Secret']
Config.items()                  # [('Port', 8080), ('Debug', False), ..., ('Secret', None)]
dict(Config.items())            # {'Port': 8080, 'Debug': False, ..., 'Secret': None}
len(Config)                     # 7
[m.name for m in Config]        # ['Port', 'Debug', 'Rate', 'Name', 'username', 'email', 'Secret']
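
For readers curious how name-based lookup like this can sit on top of the standard library's Enum, the following is a rough sketch, not gatling's actual implementation. `DefineMeta`, `SketchDefine`, and `Settings` are hypothetical names introduced only for illustration; the sketch assumes the behavior listed in the table above and reproduces it with an `EnumMeta` subclass.

```python
from enum import Enum, EnumMeta, auto


class DefineMeta(EnumMeta):
    """Hypothetical metaclass adding name-based, dict-like access to an Enum."""

    def __contains__(cls, name):
        return name in cls.__members__

    def __getitem__(cls, name):
        return cls.__members__[name].value  # KeyError if the name is missing

    def keys(cls):
        return list(cls.__members__)

    def items(cls):
        return [(name, member.value) for name, member in cls.__members__.items()]

    def get(cls, name, default=None):
        member = cls.__members__.get(name)
        return default if member is None else member.value


class SketchDefine(Enum, metaclass=DefineMeta):
    @staticmethod
    def _generate_next_value_(name, start, count, last_values):
        return name  # auto() resolves to the member's own name

    def __str__(self):
        return self.name


class Settings(SketchDefine):
    Port = 8080
    username = auto()
    Secret = None


print(Settings["Port"], Settings.username.value, dict(Settings))
```

Because the metaclass provides both `keys()` and `__getitem__`, `dict(Settings)` works without extra code. In a standard Enum a second member with an equal value becomes an alias of the first, which is one plausible reason a class can hold only a single None.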

TableDefine — typed table schema

All members must be Field (non-Field values raise TypeError).

import datetime
from gatling.define.tabledefine import TableDefine, Field

class Users(TableDefine):
    id        = Field(int, primary=True)
    name      = Field(str, nullable=False)
    score     = Field(float, default=0.0)
    active    = Field(bool, default=True)
    birthday  = Field(datetime.date)
    created   = Field(datetime.datetime)

Field attributes (via .value):

Users.name.value.dtype          # str
Users.name.value.default        # ""
Users.name.value.primary        # False
Users.name.value.nullable       # False
Users.score.value.default       # 0.0
Users.id.value.primary          # True

Serialize / deserialize:

Users.score.value.tostr(9.5)                            # "9.5"
Users.score.value.fmstr("9.5")                          # 9.5
Users.active.value.tostr(True)                           # "1"
Users.active.value.fmstr("0")                            # False
Users.birthday.value.tostr(datetime.date(2025, 6, 15))  # "2025-06-15"
Users.birthday.value.fmstr("2025-06-15")                 # datetime.date(2025, 6, 15)
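
The idea of inferring a serializer and deserializer from dtype can be sketched with a pair of lookup tables. This is an illustration under assumptions, not gatling's Field: `SketchField`, `_TO_STR`, and `_FROM_STR` are hypothetical names, and only the bool and date conventions shown above are modeled.

```python
import datetime
from dataclasses import dataclass

# Per-type converters; any type not listed falls back to str() / dtype().
_TO_STR = {
    bool: lambda v: "1" if v else "0",
    datetime.date: lambda v: v.isoformat(),
}
_FROM_STR = {
    bool: lambda s: s == "1",
    datetime.date: datetime.date.fromisoformat,
}


@dataclass(frozen=True)
class SketchField:
    """Hypothetical stand-in for a typed column definition."""
    dtype: type

    def tostr(self, value):
        return _TO_STR.get(self.dtype, str)(value)

    def fmstr(self, text):
        return _FROM_STR.get(self.dtype, self.dtype)(text)


score = SketchField(float)
active = SketchField(bool)
birthday = SketchField(datetime.date)

print(score.tostr(9.5), active.tostr(True), birthday.tostr(datetime.date(2025, 6, 15)))
```

The fallback to `str` and to the dtype constructor covers int, float, and str without any table entry, which is why only the types with non-obvious text forms need explicit converters.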

Lookup and collection (same as ConstDefine):

"name" in Users                 # True
Users.get("name")               # Field(dtype=str, ...)  (returns Field directly)
Users.keys()                    # ['id', 'name', 'score', 'active', 'birthday', 'created']
Users.items()                   # [('id', Field(...)), ('name', Field(...)), ...]
Users.get_name2dtype()          # {'id': int, 'name': str, 'score': float, ...}

SQL generation (PostgreSQL dialect):

from sqlalchemy import BigInteger, String, Float
from sqlalchemy.dialects.postgresql import TIMESTAMP, JSONB

class Posts(TableDefine):
    id        = Field(BigInteger, primary=True, autoincrement=True, comment="primary key")
    title     = Field(String(256), nullable=False)
    score     = Field(Float, default=0.0, index=True)
    tags      = Field(JSONB)
    created   = Field(TIMESTAMP(timezone=True), server_default="now()")

Posts.get_sql_create()          # CREATE TABLE IF NOT EXISTS "Posts" (id BIGSERIAL ... )
Posts.get_sql_drop()            # DROP TABLE IF EXISTS "Posts"

Field options:

| Option | Type | Description |
| --- | --- | --- |
| dtype | type | Python type (int, str, ...) or SQLAlchemy type (BigInteger, String(64), ...) |
| default | any | Default value (auto-inferred from dtype if omitted) |
| primary | bool | Primary key |
| nullable | bool | Allow NULL (default True) |
| unique | bool | Unique constraint |
| index | bool | Create index |
| autoincrement | bool | Auto-increment |
| server_default | str | SQL server-side default expression (e.g. "now()") |
| foreign_key | str | Foreign key reference (e.g. "users.id") |
| comment | str | Column comment |
| tostr | callable | Custom serializer (auto-inferred from dtype if omitted) |
| fmstr | callable | Custom deserializer (auto-inferred from dtype if omitted) |

Tables

All table types are created from the same schema object (schema in the snippets below); only construction of the table differs.

Create

# SQLite
from gatling.storage.g_table.sql.real_sqlite_table import SQLiteTable
ft = SQLiteTable("users", "app.db")
ft.create(schema)

# PostgreSQL
from gatling.storage.g_table.sql.a_pgsql_base import create_pool
from gatling.storage.g_table.sql.real_pgsql_table import PGSQLTable
pool = create_pool("postgresql://user:pass@localhost:5432/db")
ft = PGSQLTable("users", pool)
ft.create(schema)

# Append-only TSV file
from gatling.storage.g_table.append_only.real_tsv_table import TSVTable
ft = TSVTable("users.tsv")
ft.create(schema)

Usage (same for all SQL tables)

# Insert — single or batch
ft.insert({"id": 1, "name": "Alice", "score": 9.5})
ft.insert(
    {"id": 2, "name": "Bob",   "score": 8.0},
    {"id": 3, "name": "Carol", "score": 7.5},
)

# Query
ft.fetch(where={"name": "Alice"})
ft.fetch(order_by={"score": True}, limit=10)
ft.count(where={"score": 9.5})

# Update / Delete
ft.update({"score": 10.0}, where={"id": 1})
ft.delete(where={"id": 3})

# Transaction (rollback on exception)
with ft:
    ft.insert({"id": 4, "name": "Dan", "score": 6.0})
    ft.update({"score": 0.0}, where={"id": 2})
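
Rollback on exception is the same contract that a standard sqlite3 connection offers when used as a context manager. The sketch below uses sqlite3 directly rather than SQLiteTable, so it shows the underlying idea and makes no claim about how gatling implements it; the table and values are made up for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, score REAL)")
conn.execute("INSERT INTO users VALUES (1, 'Alice', 9.5)")
conn.commit()

try:
    with conn:  # commits if the block succeeds, rolls back if it raises
        conn.execute("UPDATE users SET score = 0.0 WHERE id = 1")
        raise RuntimeError("simulated failure mid-transaction")
except RuntimeError:
    pass

# The UPDATE was rolled back, so the original score is still there.
score = conn.execute("SELECT score FROM users WHERE id = 1").fetchone()[0]
print(score)
```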

TSV tables use append / extend and support indexing:

ft.append({"ts": "2025-01-01", "level": "INFO", "msg": "started"})
ft.extend([...])

with ft:
    print(len(ft))      # row count
    print(ft[0])         # first row
    print(ft[-1])        # last row
    print(ft[2:5])       # slice
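
An append-only table with list-style indexing can be approximated with the standard csv module. This is a simplified sketch, not TSVTable: `SketchTSV` is a hypothetical class that rereads the file on every access, takes a plain column list instead of a schema, and skips the `with` block used above.

```python
import csv
import tempfile
from pathlib import Path


class SketchTSV:
    """Hypothetical append-only TSV table; not gatling's TSVTable."""

    def __init__(self, path, columns):
        self.path = Path(path)
        self.columns = list(columns)
        if not self.path.exists():
            # Write the header once; later writes only ever append.
            with self.path.open("w", newline="", encoding="utf-8") as f:
                csv.writer(f, delimiter="\t").writerow(self.columns)

    def append(self, row):
        with self.path.open("a", newline="", encoding="utf-8") as f:
            csv.writer(f, delimiter="\t").writerow([row[c] for c in self.columns])

    def extend(self, rows):
        for row in rows:
            self.append(row)

    def _rows(self):
        with self.path.open(newline="", encoding="utf-8") as f:
            return list(csv.DictReader(f, delimiter="\t"))

    def __len__(self):
        return len(self._rows())

    def __getitem__(self, index):
        # Supports ints, negative ints, and slices via list indexing.
        return self._rows()[index]


log = SketchTSV(Path(tempfile.mkdtemp()) / "log.tsv", ["ts", "level", "msg"])
log.append({"ts": "2025-01-01", "level": "INFO", "msg": "started"})
log.extend([
    {"ts": "2025-01-02", "level": "WARN", "msg": "slow"},
    {"ts": "2025-01-03", "level": "INFO", "msg": "stopped"},
])
print(len(log), log[0], log[-1])
```

Rereading the whole file keeps the sketch short; a real implementation would more likely keep an offset index so that `ft[i]` does not scan every row.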

HTTP Client

from gatling.utility.http_fetch_fctns import sync_fetch_http, async_fetch_http, fwrap

# Sync
data, status, size = sync_fetch_http("https://httpbin.org/get", rtype="json")

# Async (await is only valid inside an async function)
data, status, size = await fwrap(async_fetch_http, target_url="https://httpbin.org/get", rtype="json")

File I/O

from gatling.utility.io_fctns import (
    save_json, read_json,
    save_jsonl, read_jsonl,
    save_text, read_text,
    save_pickle, read_pickle,
    save_bytes, read_bytes,
    read_toml, remove_file,
)

save_json({"key": "value"}, "data.json")
save_jsonl([{"a": 1}, {"a": 2}], "data.jsonl")
save_text("hello", "msg.txt")

Watch

import time

from gatling.utility.watch import Watch, watch_time

@watch_time
def slow():
    time.sleep(1)

w = Watch()
# ... work ...
print(w.see_seconds())    # interval since last check
print(w.total_seconds())  # total elapsed
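
The split between "interval since last check" and "total elapsed" can be sketched with `time.perf_counter`. The names `SketchWatch` and `sketch_watch_time` are hypothetical, and what the real decorator prints or returns is an assumption here; only the two timing semantics described in the comments above are modeled.

```python
import functools
import time


class SketchWatch:
    """Hypothetical stopwatch: lap time since last check plus total time."""

    def __init__(self):
        self._start = time.perf_counter()
        self._last = self._start

    def see_seconds(self):
        now = time.perf_counter()
        elapsed, self._last = now - self._last, now  # reset the lap marker
        return elapsed

    def total_seconds(self):
        return time.perf_counter() - self._start


def sketch_watch_time(func):
    """Hypothetical timing decorator: prints how long each call took."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            print(f"{func.__name__} took {time.perf_counter() - start:.4f}s")
    return wrapper


@sketch_watch_time
def slow():
    time.sleep(0.05)
    return "done"


w = SketchWatch()
result = slow()
first = w.see_seconds()    # covers the slow() call
second = w.see_seconds()   # only the gap since the previous check
total = w.total_seconds()
```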

License

MIT
