
A comprehensive framework for asynchronous I/O operations and utilities.


[Japanese/日本語]

mtaio - Multi-threaded Async I/O Framework

Requires Python 3.11 or later.

mtaio is a comprehensive asynchronous I/O framework for Python, providing high-level abstractions for building efficient concurrent applications.

Features

  • Asynchronous Core

    • Task execution with concurrency control
    • Flexible queue implementations (Priority, LIFO)
    • Latch synchronization primitive
  • Caching

    • TTL-based caching with multiple eviction policies (LRU, LFU, FIFO)
    • Distributed cache with node replication
    • Automatic cache cleanup and monitoring
  • Data Processing

    • Observable pattern for reactive data handling
    • Pipeline processing with customizable stages
    • Stream processing with operators
  • Event Handling

    • Asynchronous event emitter with priority support
    • Channel-based communication
    • Pub/sub pattern implementation
  • Protocol Support

    • ASGI application framework
    • MQTT client implementation
    • Async IMAP and SMTP clients
  • Resource Management

    • Rate limiting with token bucket algorithm
    • Concurrency limiting
    • Timeout management
    • Resource monitoring

Installation

pip install mtaio

Quick Start

The example below combines a task executor, a rate limiter, and a timeout:

import asyncio

from mtaio import TaskExecutor, RateLimiter, TimeoutManager

async def some_operation(item):
    # Placeholder workload; replace with your own coroutine
    await asyncio.sleep(0.1)
    return item

async def main():
    # Create a task executor and a rate limiter
    executor = TaskExecutor()
    limiter = RateLimiter(rate=10.0)  # 10 operations per second

    @limiter.limit
    async def process_item(item):
        # Process with timeout
        async with TimeoutManager(5.0) as tm:
            result = await tm.run(some_operation(item))
            return result

    # Process multiple items concurrently, with a limit of 5
    items = range(100)
    results = await executor.map(process_item, items, limit=5)

if __name__ == "__main__":
    asyncio.run(main())
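
For readers who want to see what a timeout wrapper does underneath, here is a minimal sketch using only the standard library's asyncio.wait_for. It is not mtaio's TimeoutManager; the helper name run_with_timeout and its default parameter are hypothetical, chosen for illustration.

```python
import asyncio
from typing import Any, Awaitable


async def run_with_timeout(aw: Awaitable[Any], seconds: float, default: Any = None) -> Any:
    """Await `aw`, returning `default` if it takes longer than `seconds`."""
    try:
        # wait_for cancels the awaitable when the deadline passes.
        return await asyncio.wait_for(aw, timeout=seconds)
    except asyncio.TimeoutError:
        return default


async def demo() -> tuple[str, str]:
    async def work(delay: float) -> str:
        await asyncio.sleep(delay)
        return "done"

    # The first call finishes well inside its deadline; the second does not.
    fast = await run_with_timeout(work(0.0), seconds=0.5, default="timed out")
    slow = await run_with_timeout(work(0.5), seconds=0.01, default="timed out")
    return fast, slow


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Returning a default on timeout is one design choice; re-raising the exception is equally valid when the caller needs to distinguish a timeout from a real result.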

Documentation

Core Components

TaskExecutor

The TaskExecutor provides methods for executing coroutines with concurrency control:

from mtaio import TaskExecutor

async def example():
    executor = TaskExecutor()

    # Run multiple coroutines with a concurrency limit
    # (coro1 and coro2 stand in for your own coroutine functions)
    results = await executor.gather(
        coro1(),
        coro2(),
        limit=5
    )
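
The idea of a concurrency limit can be sketched with the standard library alone. The following is an illustration built on asyncio.Semaphore, not mtaio's implementation; gather_with_limit is a hypothetical name that only mirrors the shape of the call above.

```python
import asyncio
from typing import Any, Awaitable


async def gather_with_limit(*aws: Awaitable[Any], limit: int) -> list[Any]:
    """Await all awaitables, allowing at most `limit` to run at once."""
    if limit < 1:
        raise ValueError("limit must be at least 1")
    semaphore = asyncio.Semaphore(limit)

    async def run_one(aw: Awaitable[Any]) -> Any:
        # Each awaitable waits for a free slot before it starts.
        async with semaphore:
            return await aw

    # asyncio.gather returns results in the order the awaitables were passed.
    return await asyncio.gather(*(run_one(aw) for aw in aws))


async def demo() -> list[int]:
    async def square(n: int) -> int:
        await asyncio.sleep(0.01)
        return n * n

    return await gather_with_limit(*(square(n) for n in range(5)), limit=2)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```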

Caching

mtaio provides an in-memory TTL cache and a distributed cache that replicates across nodes:

from mtaio.cache import TTLCache, DistributedCache

async def cache_example():
    # Simple TTL cache
    cache = TTLCache[str](
        default_ttl=60.0,
        max_size=1000
    )
    
    # Distributed cache
    dist_cache = DistributedCache[str](
        nodes=[
            ('localhost', 5000),
            ('localhost', 5001)
        ]
    )
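
To make TTL expiry and LRU eviction concrete, here is a small standard-library sketch. It is an assumption-laden illustration, not how mtaio's TTLCache is implemented: the class SimpleTTLCache, its get and set methods, and the injectable clock parameter are all hypothetical.

```python
import time
from collections import OrderedDict
from typing import Callable, Generic, Optional, TypeVar

V = TypeVar("V")


class SimpleTTLCache(Generic[V]):
    """Hypothetical in-memory cache with per-entry expiry and LRU eviction."""

    def __init__(self, default_ttl: float, max_size: int,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._default_ttl = default_ttl
        self._max_size = max_size
        self._clock = clock
        # Maps key -> (value, expiry time); insertion order tracks recency of use.
        self._entries: OrderedDict = OrderedDict()

    def set(self, key: str, value: V, ttl: Optional[float] = None) -> None:
        expires_at = self._clock() + (self._default_ttl if ttl is None else ttl)
        self._entries[key] = (value, expires_at)
        self._entries.move_to_end(key)
        # Evict the least recently used entries once over capacity.
        while len(self._entries) > self._max_size:
            self._entries.popitem(last=False)

    def get(self, key: str) -> Optional[V]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if self._clock() >= expires_at:
            # Expired entries are dropped lazily, on access.
            del self._entries[key]
            return None
        self._entries.move_to_end(key)
        return value
```

Passing the clock in as a parameter keeps the sketch testable without sleeping; a production cache would typically also sweep expired entries in the background.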

Event Handling

The event system supports prioritized event handling:

from mtaio.events import EventEmitter

async def event_example():
    emitter = EventEmitter()

    @emitter.on("user_login")
    async def handle_login(event):
        user = event.data
        print(f"User {user.name} logged in")

    # user_data stands in for your own object with a .name attribute
    await emitter.emit("user_login", user_data)
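
Prioritized dispatch can be illustrated with a few lines of plain asyncio. The sketch below is not mtaio's EventEmitter: TinyEmitter and its priority keyword are hypothetical, and handlers here receive the payload directly rather than an event object.

```python
import asyncio
from typing import Any, Awaitable, Callable

Handler = Callable[[Any], Awaitable[None]]


class TinyEmitter:
    """Hypothetical emitter: handlers run in descending priority order."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[tuple[int, Handler]]] = {}

    def on(self, event: str, priority: int = 0) -> Callable[[Handler], Handler]:
        def register(handler: Handler) -> Handler:
            handlers = self._handlers.setdefault(event, [])
            handlers.append((priority, handler))
            # list.sort is stable, so equal priorities keep registration order.
            handlers.sort(key=lambda pair: -pair[0])
            return handler
        return register

    async def emit(self, event: str, data: Any) -> None:
        # Iterate over a copy so handlers may register others while running.
        for _, handler in list(self._handlers.get(event, [])):
            await handler(data)


async def demo() -> list[str]:
    emitter = TinyEmitter()
    calls: list[str] = []

    @emitter.on("user_login", priority=1)
    async def audit(user: str) -> None:
        calls.append(f"audit:{user}")

    @emitter.on("user_login", priority=10)
    async def greet(user: str) -> None:
        calls.append(f"greet:{user}")

    await emitter.emit("user_login", "alice")
    return calls


if __name__ == "__main__":
    print(asyncio.run(demo()))
```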

Advanced Features

Resource Management

Control resource usage with rate limiting and concurrency limits:

from mtaio.resources import ResourceLimiter

async def resource_example():
    limiter = ResourceLimiter(
        rate=10.0,      # 10 requests per second
        concurrency=5   # Max 5 concurrent executions
    )

    @limiter.limit
    async def limited_operation():
        # process_request stands in for your own coroutine
        await process_request()
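
The feature list mentions the token bucket algorithm for rate limiting. As a rough sketch of that algorithm using only the standard library (not mtaio's code; the TokenBucket class, its capacity parameter, and the injectable clock are assumptions made for illustration):

```python
import asyncio
import time
from typing import Callable


class TokenBucket:
    """Hypothetical token bucket: `rate` tokens per second, up to `capacity`."""

    def __init__(self, rate: float, capacity: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        if rate <= 0 or capacity <= 0:
            raise ValueError("rate and capacity must be positive")
        self._rate = rate
        self._capacity = capacity
        self._clock = clock
        self._tokens = capacity  # start with a full bucket
        self._updated = clock()

    def _refill(self) -> None:
        # Add tokens for the time elapsed since the last update, capped at capacity.
        now = self._clock()
        self._tokens = min(self._capacity,
                           self._tokens + (now - self._updated) * self._rate)
        self._updated = now

    def try_acquire(self) -> bool:
        """Take one token if available, without waiting."""
        self._refill()
        if self._tokens >= 1.0:
            self._tokens -= 1.0
            return True
        return False

    async def acquire(self) -> None:
        """Wait until a token is available, then take it."""
        while not self.try_acquire():
            # Sleep roughly until the next whole token is due.
            await asyncio.sleep((1.0 - self._tokens) / self._rate)
```

The capacity sets how large a burst is allowed, while the rate sets the sustained throughput; a limiter like the one above would call acquire before each operation.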

Monitoring

Monitor system resources and application performance:

from mtaio.monitoring import ResourceMonitor

async def monitor_example():
    monitor = ResourceMonitor()

    @monitor.on_threshold_exceeded
    async def handle_alert(metric, value, threshold):
        print(f"Alert: {metric} exceeded threshold")

    await monitor.start()
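
Threshold-based alerting boils down to sampling a metric and calling a handler when a reading is too high. The sketch below shows that loop with plain asyncio; it is not mtaio's ResourceMonitor, and watch_metric, its parameters, and the sample readings are all hypothetical.

```python
import asyncio
from typing import Awaitable, Callable, Optional

AlertHandler = Callable[[str, float, float], Awaitable[None]]


async def watch_metric(
    name: str,
    sample: Callable[[], float],
    threshold: float,
    on_exceeded: AlertHandler,
    interval: float = 1.0,
    max_samples: Optional[int] = None,
) -> None:
    """Poll `sample` and await `on_exceeded` for each reading above `threshold`."""
    taken = 0
    while max_samples is None or taken < max_samples:
        value = sample()
        if value > threshold:
            await on_exceeded(name, value, threshold)
        taken += 1
        await asyncio.sleep(interval)


async def demo() -> list[tuple[str, float, float]]:
    readings = iter([10.0, 95.0, 50.0, 99.0])  # made-up CPU percentages
    alerts: list[tuple[str, float, float]] = []

    async def handle_alert(metric: str, value: float, threshold: float) -> None:
        alerts.append((metric, value, threshold))

    await watch_metric("cpu", lambda: next(readings), 90.0, handle_alert,
                       interval=0.0, max_samples=4)
    return alerts


if __name__ == "__main__":
    print(asyncio.run(demo()))
```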

Contributing

We welcome contributions! Please follow these steps to contribute:

  1. Fork the repository: github.com/t3tra-dev/mtaio
  2. Create a feature branch: git checkout -b feat/your-feature
  3. Commit your changes: git commit -m 'Add a new feature'
  4. Push to the branch: git push origin feat/your-feature
  5. Open a Pull Request.

For any questions or support, feel free to open an issue in the repository's Issues section.

License

This project is licensed under the MIT License - see the LICENSE file for details.
