A comprehensive framework for asynchronous I/O operations and utilities.
mtaio - Multi-threaded Async I/O Framework
Requires Python 3.11 or later.
mtaio is a comprehensive asynchronous I/O framework for Python, providing high-level abstractions for building efficient concurrent applications.
Features
- Asynchronous Core
  - Task execution with concurrency control
  - Flexible queue implementations (Priority, LIFO)
  - Latch synchronization primitive
- Caching
  - TTL-based caching with multiple eviction policies (LRU, LFU, FIFO)
  - Distributed cache with node replication
  - Automatic cache cleanup and monitoring
- Data Processing
  - Observable pattern for reactive data handling
  - Pipeline processing with customizable stages
  - Stream processing with operators
- Event Handling
  - Asynchronous event emitter with priority support
  - Channel-based communication
  - Pub/sub pattern implementation
- Protocol Support
  - ASGI application framework
  - MQTT client implementation
  - Async IMAP and SMTP clients
- Resource Management
  - Rate limiting with token bucket algorithm
  - Concurrency limiting
  - Timeout management
  - Resource monitoring
Installation
pip install mtaio
Quick Start
Here's a simple example that demonstrates some core features:
import asyncio

from mtaio import TaskExecutor, RateLimiter, TimeoutManager

async def main():
    # Create a task executor and a rate limiter
    executor = TaskExecutor()
    limiter = RateLimiter(rate=10.0)  # 10 operations per second

    @limiter.limit
    async def process_item(item):
        # Process with timeout;
        # some_operation is a placeholder for your own coroutine function
        async with TimeoutManager(5.0) as tm:
            result = await tm.run(some_operation(item))
            return result

    # Process multiple items concurrently
    items = range(100)
    results = await executor.map(process_item, items, limit=5)

if __name__ == "__main__":
    asyncio.run(main())
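To make the timeout step in the example above concrete, here is a minimal standard-library sketch of the same idea. It assumes nothing about how mtaio's TimeoutManager works internally; `slow_operation` and `run_with_timeout` are hypothetical names used only for illustration.

```python
import asyncio


async def slow_operation(delay: float) -> str:
    # Hypothetical stand-in for real work: sleeps, then returns a marker.
    await asyncio.sleep(delay)
    return "done"


async def run_with_timeout(delay: float, timeout: float) -> str:
    # asyncio.wait_for cancels the awaited coroutine once the timeout expires
    # and raises asyncio.TimeoutError, which is translated into a marker here.
    try:
        return await asyncio.wait_for(slow_operation(delay), timeout)
    except asyncio.TimeoutError:
        return "timed out"
```

Calling `asyncio.run(run_with_timeout(0.01, 1.0))` lets the operation finish, while a timeout shorter than the delay takes the fallback branch.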
Documentation
Core Components
TaskExecutor
The TaskExecutor provides methods for executing coroutines with concurrency control:
from mtaio import TaskExecutor

async def example():
    executor = TaskExecutor()
    # Run multiple coroutines with a concurrency limit
    # (coro1 and coro2 are placeholders for your own coroutine functions)
    results = await executor.gather(
        coro1(),
        coro2(),
        limit=5
    )
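The idea of a concurrency limit on a gather call can be sketched with `asyncio.Semaphore`. This is only an illustration of the technique under that assumption, not mtaio's implementation; `bounded_gather` is a hypothetical helper name.

```python
import asyncio
from typing import Any, Awaitable


async def bounded_gather(*aws: Awaitable[Any], limit: int) -> list[Any]:
    # At most `limit` awaitables hold the semaphore, and so run, at once.
    semaphore = asyncio.Semaphore(limit)

    async def guarded(aw: Awaitable[Any]) -> Any:
        async with semaphore:
            return await aw

    # asyncio.gather returns results in the order the awaitables were passed.
    return await asyncio.gather(*(guarded(aw) for aw in aws))
```

A semaphore keeps the sketch short; a worker-pool design would bound the number of scheduled tasks as well, at the cost of more code.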
Caching
mtaio provides flexible caching solutions:
from mtaio.cache import TTLCache, DistributedCache

async def cache_example():
    # Simple TTL cache
    cache = TTLCache[str](
        default_ttl=60.0,
        max_size=1000
    )
    # Distributed cache
    dist_cache = DistributedCache[str](
        nodes=[
            ('localhost', 5000),
            ('localhost', 5001)
        ]
    )
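For readers unfamiliar with TTL caching combined with LRU eviction, the following synchronous sketch shows the basic mechanics using only the standard library. It is not mtaio's TTLCache: the class name `SimpleTTLCache` and its `get`/`set` methods are assumptions made for this illustration, with `default_ttl` and `max_size` borrowed from the example above.

```python
import time
from collections import OrderedDict
from typing import Any, Optional


class SimpleTTLCache:
    """Hypothetical TTL cache with LRU eviction (illustration only)."""

    def __init__(self, default_ttl: float, max_size: int) -> None:
        self.default_ttl = default_ttl
        self.max_size = max_size
        # Maps key -> (expiry timestamp, value); order tracks recency of use.
        self._items: OrderedDict = OrderedDict()

    def set(self, key: str, value: Any, ttl: Optional[float] = None) -> None:
        lifetime = self.default_ttl if ttl is None else ttl
        self._items[key] = (time.monotonic() + lifetime, value)
        self._items.move_to_end(key)  # mark as most recently used
        while len(self._items) > self.max_size:
            self._items.popitem(last=False)  # evict the least recently used

    def get(self, key: str, default: Any = None) -> Any:
        entry = self._items.get(key)
        if entry is None:
            return default
        expires_at, value = entry
        if time.monotonic() >= expires_at:
            del self._items[key]  # lazily drop the expired entry
            return default
        self._items.move_to_end(key)  # a read also counts as a use
        return value
```

Expired entries are removed lazily on read here; a background cleanup task, as the feature list suggests mtaio offers, would be a separate addition.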
Event Handling
The event system supports prioritized event handling:
from mtaio.events import EventEmitter

async def event_example():
    emitter = EventEmitter()

    @emitter.on("user_login")
    async def handle_login(event):
        user = event.data
        print(f"User {user.name} logged in")

    # user_data is a placeholder for your own payload object
    await emitter.emit("user_login", user_data)
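Prioritized handling can be pictured as keeping each event's handlers sorted by priority and awaiting them in that order. The sketch below is a simplified illustration, not mtaio's EventEmitter: the `TinyEmitter` class, the `priority` keyword, and the "higher number runs first" rule are all assumptions.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable

Handler = Callable[[Any], Awaitable[None]]


class TinyEmitter:
    """Hypothetical async emitter with handler priorities (illustration only)."""

    def __init__(self) -> None:
        # Maps event name -> list of (priority, handler) pairs.
        self._handlers = defaultdict(list)

    def on(self, event: str, priority: int = 0) -> Callable[[Handler], Handler]:
        def register(handler: Handler) -> Handler:
            self._handlers[event].append((priority, handler))
            # Stable sort: higher priority first, registration order on ties.
            self._handlers[event].sort(key=lambda pair: -pair[0])
            return handler

        return register

    async def emit(self, event: str, data: Any) -> None:
        # Iterate over a copy so handlers may register others while running.
        for _, handler in list(self._handlers[event]):
            await handler(data)
```

Handlers are awaited one after another in this sketch, so a slow high-priority handler delays the rest; running them concurrently would be a different design choice.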
Advanced Features
Resource Management
Control resource usage with rate and concurrency limits:
from mtaio.resources import ResourceLimiter

async def resource_example():
    limiter = ResourceLimiter(
        rate=10.0,      # 10 requests per second
        concurrency=5   # Max 5 concurrent executions
    )

    @limiter.limit
    async def limited_operation():
        # process_request is a placeholder for your own coroutine function
        await process_request()
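The feature list names the token bucket algorithm for rate limiting. As a rough sketch of that algorithm in general (not of mtaio's ResourceLimiter), tokens accumulate at `rate` per second up to `capacity`, and each operation spends one. The `TokenBucket` class, its `capacity` parameter, and the injectable `now` argument are assumptions for illustration.

```python
import time
from typing import Optional


class TokenBucket:
    """Hypothetical token bucket (illustration only)."""

    def __init__(self, rate: float, capacity: float, now: Optional[float] = None) -> None:
        self.rate = rate          # tokens added per second
        self.capacity = capacity  # maximum burst size
        self.tokens = capacity    # start with a full bucket
        self.updated = time.monotonic() if now is None else now

    def try_acquire(self, now: Optional[float] = None) -> bool:
        # `now` can be injected for deterministic tests; defaults to the clock.
        current = time.monotonic() if now is None else now
        elapsed = current - self.updated
        # Refill in proportion to elapsed time, never beyond capacity.
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.updated = current
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False
```

An async limiter would typically sleep until the next token is available instead of returning False; that waiting logic is left out to keep the refill arithmetic visible.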
Monitoring
Monitor system resources and application performance:
from mtaio.monitoring import ResourceMonitor

async def monitor_example():
    monitor = ResourceMonitor()

    @monitor.on_threshold_exceeded
    async def handle_alert(metric, value, threshold):
        print(f"Alert: {metric} exceeded threshold")

    await monitor.start()
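The threshold callback pattern used above can be sketched without any metric collection. In this simplified illustration, values are reported by the caller rather than sampled from the system, and the `ThresholdWatcher` class and its `report` method are hypothetical names, not part of mtaio.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

AlertHandler = Callable[[str, float, float], Awaitable[None]]


class ThresholdWatcher:
    """Hypothetical threshold alerting helper (illustration only)."""

    def __init__(self, thresholds: Dict[str, float]) -> None:
        self.thresholds = thresholds
        self._handlers: List[AlertHandler] = []

    def on_threshold_exceeded(self, handler: AlertHandler) -> AlertHandler:
        # Usable as a decorator; returns the handler unchanged.
        self._handlers.append(handler)
        return handler

    async def report(self, metric: str, value: float) -> None:
        threshold = self.thresholds.get(metric)
        # Metrics without a configured threshold never trigger alerts.
        if threshold is not None and value > threshold:
            for handler in self._handlers:
                await handler(metric, value, threshold)
```

A real monitor would call something like `report` from a periodic sampling loop; here the caller drives it directly so the alert logic stays easy to follow.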
Contributing
We welcome contributions! Please follow these steps to contribute:
- Fork the repository: github.com/t3tra-dev/mtaio.
- Create a feature branch: git checkout -b feat/your-feature
- Commit your changes: git commit -m 'Add a new feature'
- Push to the branch: git push origin feat/your-feature
- Open a Pull Request.
For any questions or support, feel free to open an issue in the repository's Issues section.
License
This project is licensed under the MIT License - see the LICENSE file for details.
File details
Details for the file mtaio-0.1.0.tar.gz.
File metadata
- Download URL: mtaio-0.1.0.tar.gz
- Upload date:
- Size: 67.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.1 CPython/3.12.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | bfdb4c597f00833ed75b56eb29dd369b5bf0866f298f9c3666e99d07dc4a1372 |
| MD5 | 465e5cda7c3b587f76d3647611625b96 |
| BLAKE2b-256 | c7b7ff4131ce7adfa18f946590693e9f72a589e38b651b709faf90c2bd22ed51 |
File details
Details for the file mtaio-0.1.0-py3-none-any.whl.
File metadata
- Download URL: mtaio-0.1.0-py3-none-any.whl
- Upload date:
- Size: 78.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.1 CPython/3.12.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | cfa814a09429ed93c04a43fd4cae991cb24bcbdddbdedd7a69be9a082b420396 |
| MD5 | ae37e56d4a0717efaddca543041f2175 |
| BLAKE2b-256 | 39dd62a0e312a6246452a65988fcdde786432e7402fe0e02279b10b8da2ee162 |