No project description provided
Project description
Easy Message Queue (ezmq)
A lightweight, thread-based message queue implementation in Python for managing concurrent job execution with resource management and request-response patterns.
Quick Start
from ezmq import MessageQueue, Jobs, Job, Resource
# Define a resource
class DatabaseResource(Resource):
def _enter(self):
return connect_to_database(self.identifier)
def _exit(self):
self._resource.close()
def _peek(self):
return read_only_connection(self.identifier)
# Define a job
class ProcessDataJob(Job):
required_resources = ['database']
def execute(self, resources, **kwargs):
data_id = kwargs.get('data_id')
with resources['database'] as db:
data = db.fetch(data_id)
return {'status': 'processed', 'data': data}
# Register jobs
class MyJobs(Jobs):
resources = {
'database': DatabaseResource('main.db')
}
process_data: ProcessDataJob
# Create and use the queue
class MyQueue(MessageQueue):
pass
mq = MyQueue(MyJobs)
# Send a job and get response
msg_id = mq.send('process_data', data_id=123)
response = mq.receive(msg_id, timeout=10.0)
if response.success:
print(f"Result: {response.result}")
else:
print(f"Error: {response.error}")
Core Components
Resource
Abstract base class for managing shared resources with automatic locking.
from ezmq import Resource
class MyResource(Resource):
def _enter(self):
# Initialize and return the resource
return open_connection()
def _exit(self):
# Clean up the resource
self._resource.close()
def _peek(self):
# Return read-only view without locking
return get_read_only_view()
Job
Abstract base class for creating a function to be executed.
from ezmq import Job
from datetime import datetime
class EmailJob(Job):
required_resources = ['smtp_server', 'template_engine']
def execute(self, resources, **kwargs):
to = kwargs['to']
subject = kwargs['subject']
with resources['smtp_server'] as smtp:
smtp.send(to, subject)
return {'sent': True, 'timestamp': datetime.now()}
Jobs
Container class for registering and managing job types.
from ezmq import Jobs
class MyJobs(Jobs):
# Define shared resources
resources = {
'db': DatabaseResource('postgres://localhost/mydb'),
'cache': CacheResource('redis://localhost'),
'storage': FileStorageResource('/data')
}
# Register job types via type annotations
send_email: SendEmailJob
process_payment: ProcessPaymentJob
generate_report: GenerateReportJob
MessageQueue
Main interface for sending jobs and receiving responses.
# Initialize with job types
mq = MessageQueue(MyJobs)
# Send job (returns UUID)
msg_id = mq.send('send_email',
to='user@example.com',
subject='Welcome!')
# Receive response (blocking)
response = mq.receive(msg_id, timeout=30.0)
# Convenience method for simple cases
response = mq.send_and_receive('calculate', x=10, y=20)
Response Model
@dataclass
class Response:
id: UUID # Message ID this responds to
success: bool # Whether job succeeded
error: str # Error message if failed
result: Any # Return value from job.execute()
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file ezmq-2025.12.4.0.tar.gz.
File metadata
- Download URL: ezmq-2025.12.4.0.tar.gz
- Upload date:
- Size: 216.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.2.1 CPython/3.13.7 Windows/11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
652561e117d84b4e33344f1802e25661116199461e3e3b1a956ea7084517289a
|
|
| MD5 |
207579f28e0982d5194069c0f3ef88fc
|
|
| BLAKE2b-256 |
ae70287afd0a93bdb804924025aa3540542c2301241254153d60ca29b15b5184
|
File details
Details for the file ezmq-2025.12.4.0-py3-none-any.whl.
File metadata
- Download URL: ezmq-2025.12.4.0-py3-none-any.whl
- Upload date:
- Size: 257.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.2.1 CPython/3.13.7 Windows/11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
d5154cc9a8f200ecde88996fab285992331f44f747bac4e972e7e5133e02dfbf
|
|
| MD5 |
a7a71ccbb8ae2b378a11d57ec936a8f7
|
|
| BLAKE2b-256 |
af4ce6a2924df4913d69ae45fd19b920181e6e194c1fbefae772c5d9a7cb3c8f
|