Skip to main content

A unified I/O management toolkit supporting threads, coroutines, files, databases, and data formats

Project description

🧩 Gatling Utility Library

Gatling is a lightweight asynchronous utility library built on aiohttp, asyncio, and threading. It provides concurrent HTTP requests, coroutine-thread orchestration, data pipelines, and handy file utilities.


📦 Installation

pip install gatling

📁 Module Overview

Module Description
http_client.py Async/sync HTTP request handling
coroutine_thread_mana.py Thread + coroutine concurrent task manager
file_utils.py Common file read/write helpers
taskflow_manager.py Multi-stage task pipeline system
watch.py Stopwatch and timing tools

🌐 1. HTTP Client Module

File: gatling/utility/http_client.py

Provides unified async/sync HTTP request helpers supporting GET, POST, PUT, and DELETE.

Example

from gatling.utility.http_fetch_fctns import sync_fetch_http, async_fetch_http, fwrap
import asyncio, aiohttp

target_url = "https://httpbin.org/get"
# --- Synchronous request ---
result, status, size = sync_fetch_http(target_url)
print(status, size, result[:80])


# --- Asynchronous request ---
async def main():
    res, status, size = await fwrap(async_fetch_http, target_url=target_url, rtype="json")
    print(res)


asyncio.run(main())

Main functions

  • async_fetch_http(...): Generic async HTTP fetcher
  • fwrap(...): Safely manages aiohttp session lifecycle
  • sync_fetch_http(...): Simple synchronous wrapper (for scripts)

🧵 2. Coroutine & Thread Manager

File: gatling/runtime/runtime_task_manager_thread.py

File: gatling/runtime/runtime_task_manager_coroutine.py

A hybrid thread + coroutine manager that can run both sync and async tasks concurrently.

Example

from gatling.runtime.runtime_task_manager_thread import RuntimeTaskManagerThread
import time


# --- Async task ---
def async_fctn(name, delay=0.1):
    print(f"{name} running")
    time.sleep(delay)


def async_iter(name, delay=0.1):
    for i in range(5):
        sent = f"{name}-{i}"
        yield sent
        time.sleep(delay)  # simulate async source


# Async mode
print("--- Function ---")
m = RuntimeTaskManagerThread(async_fctn, args=("fctn",), kwargs={"delay": 0.1})
m.start(worker=2)
time.sleep(0.5)
m.stop()

print("--- Function with Context Manager ---")
m = RuntimeTaskManagerThread(async_fctn, args=("fctn",), kwargs={"delay": 0.1})
with m.execute(worker=2):
    time.sleep(0.5)

print("--- Iterator ---")
m = RuntimeTaskManagerThread(async_iter, args=("iter",), kwargs={"delay": 0.1})
m.start(worker=2)
time.sleep(0.5)
m.stop()

print("--- Iterator with Context Manager ---")
m = RuntimeTaskManagerThread(async_iter, args=("iter",), kwargs={"delay": 0.1})
with m.execute(worker=2):
    time.sleep(0.5)
from gatling.runtime.runtime_task_manager_coroutine import RuntimeTaskManagerCoroutine
from gatling.runtime.runtime_task_manager_thread import RuntimeTaskManagerThread
import asyncio, time


# --- Async task ---

async def async_fctn(name, delay=0.5):
    print(f"{name} running")
    await asyncio.sleep(delay)


async def async_iter(name, delay=0.1):
    for i in range(10):
        sent = f"{name}-{i}"
        print(sent)
        yield sent
        await asyncio.sleep(delay)  # simulate async source


# Async mode
print("--- Async Function ---")
m = RuntimeTaskManagerCoroutine(async_fctn, args=("async_fctn",), kwargs={"delay": 0.1})
m.start(worker=2)
time.sleep(0.5)
m.stop()

print("--- Async Function with Context Manager ---")
m = RuntimeTaskManagerCoroutine(async_fctn, args=("async_fctn",), kwargs={"delay": 0.1})
with m.execute(worker=2):
    time.sleep(0.5)

print("--- Async Iterator ---")
m = RuntimeTaskManagerCoroutine(async_iter, args=("async_iter",), kwargs={"delay": 0.1})
m.start(worker=2)
time.sleep(0.5)
m.stop()

print("--- Async Iterator with Context Manager ---")
m = RuntimeTaskManagerCoroutine(async_iter, args=("async_iter",), kwargs={"delay": 0.1})
with m.execute(worker=2):
    time.sleep(0.5)

Main methods

  • .start(worker:int): Starts the workers
  • .stop(): Stops all threads safely

💾 3. File Utility Module

File: gatling/utility/io_fctns.py

Convenient helpers for reading and writing JSON, JSONL, Pickle, TOML, text, and byte files.

Example

from gatling.utility.io_fctns import *

save_json({"a": 1}, "data.json")
print(read_json("data.json"))
remove_file("data.json")

save_jsonl([{"x": 1}, {"x": 2}], "data.jsonl")
print(read_jsonl("data.jsonl"))

remove_file("data.jsonl")

save_text("Hello world", "msg.txt")
print(read_text("msg.txt"))

remove_file("msg.txt")

Main functions

  • save_json / read_json
  • save_jsonl / read_jsonl
  • save_text / read_text
  • save_pickle / read_pickle
  • save_bytes / read_bytes
  • read_toml
  • remove_file

🔄 4. Task Flow Manager

File: gatling/utility/taskflow_manager.py

Builds a multi-stage processing pipeline — combining threads, coroutines, and queues. Each stage can be synchronous or asynchronous.

Example

from gatling.runtime.taskflow_manager import TaskFlowManager
from gatling.storage.queue.memory_queue import MemoryQueue
import asyncio, time


def sync_square(x):
    time.sleep(0.2)
    return x * x


async def async_double(x):
    await asyncio.sleep(0.3)
    return x * 2


def sync_to_str(x):
    return f"Result<{x}>"


q_wait = MemoryQueue()
q_done = MemoryQueue()

# Queue must have items before starting.
for i in range(5):
    q_wait.put(i)

tfm = TaskFlowManager(q_wait, q_done, retry_on_error=False)
with tfm.execute(log_interval=1):
    tfm.register_thread(sync_square, worker=2)
    tfm.register_coroutine(async_double, worker=5)
    tfm.register_thread(sync_to_str, worker=2)

print(list(q_done))

Main classes

  • TaskFlowManager: Coordinates multi-stage parallel workflows
  • TaskQueueTracker: Monitors queue states, errors, and speed metrics

⏱️ 5. Watch Utility

File: gatling/utility/watch.py

A simple stopwatch for timing operations, plus a decorator for measuring function execution time.

Example

from gatling.utility.watch import Watch, watch_time
import time


@watch_time
def slow_func():
    time.sleep(1)


slow_func()

w = Watch()
time.sleep(0.5)
print("Δt:", w.see_seconds(), "Total:", w.total_seconds())

Main items

  • Watch: Manual stopwatch class for measuring intervals
  • watch_time: Decorator that prints function execution time

Project details


Release history Release notifications | RSS feed

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

gatling-0.2.5.0.tar.gz (21.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

gatling-0.2.5.0-py3-none-any.whl (30.1 kB view details)

Uploaded Python 3

File details

Details for the file gatling-0.2.5.0.tar.gz.

File metadata

  • Download URL: gatling-0.2.5.0.tar.gz
  • Upload date:
  • Size: 21.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for gatling-0.2.5.0.tar.gz
Algorithm Hash digest
SHA256 4a4d3021b1ce51183aacdc188a40056c060fd241e6d49683e10d5c8584beb8ca
MD5 ced1aca326d7e7510dd57b55833708fe
BLAKE2b-256 eb2d182eb14cfb06dd4868d6a28b40559a75946fcc168962fd552663436dadaf

See more details on using hashes here.

File details

Details for the file gatling-0.2.5.0-py3-none-any.whl.

File metadata

  • Download URL: gatling-0.2.5.0-py3-none-any.whl
  • Upload date:
  • Size: 30.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for gatling-0.2.5.0-py3-none-any.whl
Algorithm Hash digest
SHA256 20110ba8ae63f3373d199004e41af4b29a0f04ae62350f2bf72cde32808a2796
MD5 35f9650f4bdf65b37005264978cae320
BLAKE2b-256 0f4835c74100c8e78e11456bd9a4c399a5cda35b4c1ff986c6941207d9d4faa9

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page