Skip to main content

No project description provided

Project description

Easy Message Queue (ezmq)

A lightweight, thread-based message queue implementation in Python for managing concurrent job execution with resource management and request-response patterns.

Quick Start

from ezmq import MessageQueue, Jobs, Job, Resource

# Define a resource
class DatabaseResource(Resource):
    def _enter(self):
        return connect_to_database(self.identifier)
    
    def _exit(self):
        self._resource.close()
    
    def _peek(self):
        return read_only_connection(self.identifier)

# Define a job
class ProcessDataJob(Job):
    required_resources = ['database']
    
    def execute(self, resources, **kwargs):
        data_id = kwargs.get('data_id')
        with resources['database'] as db:
            data = db.fetch(data_id)
            return {'status': 'processed', 'data': data}

# Register jobs
class MyJobs(Jobs):
    resources = {
        'database': DatabaseResource('main.db')
    }
    process_data: ProcessDataJob

# Create and use the queue
class MyQueue(MessageQueue):
    pass

mq = MyQueue(MyJobs)

# Send a job and get response
msg_id = mq.send('process_data', data_id=123)
response = mq.receive(msg_id, timeout=10.0)

if response.success:
    print(f"Result: {response.result}")
else:
    print(f"Error: {response.error}")

Core Components

Resource

Abstract base class for managing shared resources with automatic locking.

from ezmq import Resource

class MyResource(Resource):
    def _enter(self):
        # Initialize and return the resource
        return open_connection()
    
    def _exit(self):
        # Clean up the resource
        self._resource.close()
    
    def _peek(self):
        # Return read-only view without locking
        return get_read_only_view()

Job

Abstract base class for creating a function to be executed.

from ezmq import Job
from datetime import datetime

class EmailJob(Job):
    required_resources = ['smtp_server', 'template_engine']
    
    def execute(self, resources, **kwargs):
        to = kwargs['to']
        subject = kwargs['subject']
        
        with resources['smtp_server'] as smtp:
            smtp.send(to, subject)
        
        return {'sent': True, 'timestamp': datetime.now()}

Jobs

Container class for registering and managing job types.

from ezmq import Jobs

class MyJobs(Jobs):
    # Define shared resources
    resources = {
        'db': DatabaseResource('postgres://localhost/mydb'),
        'cache': CacheResource('redis://localhost'),
        'storage': FileStorageResource('/data')
    }
    
    # Register job types via type annotations
    send_email: SendEmailJob
    process_payment: ProcessPaymentJob
    generate_report: GenerateReportJob

MessageQueue

Main interface for sending jobs and receiving responses.

# Initialize with job types
mq = MessageQueue(MyJobs)

# Send job (returns UUID)
msg_id = mq.send('send_email', 
                 to='user@example.com',
                 subject='Welcome!')

# Receive response (blocking)
response = mq.receive(msg_id, timeout=30.0)

# Convenience method for simple cases
response = mq.send_and_receive('calculate', x=10, y=20)

Response Model

@dataclass
class Response:
    id: UUID           # Message ID this responds to
    success: bool      # Whether job succeeded
    error: str         # Error message if failed
    result: Any        # Return value from job.execute()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

ezmq-2025.11.11.2.tar.gz (68.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

ezmq-2025.11.11.2-py3-none-any.whl (83.9 kB view details)

Uploaded Python 3

File details

Details for the file ezmq-2025.11.11.2.tar.gz.

File metadata

  • Download URL: ezmq-2025.11.11.2.tar.gz
  • Upload date:
  • Size: 68.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.2.1 CPython/3.13.7 Windows/11

File hashes

Hashes for ezmq-2025.11.11.2.tar.gz
Algorithm Hash digest
SHA256 d5d2d10ffa3a0767d0d367cddfb54b9eaa4f32baa8e3f93877f5fe63f25591dc
MD5 cffdf6a05958333e67c90a95d5c2d0cd
BLAKE2b-256 0ef2022afc2c5447e3b6251e05cc546401e170ac81311a02e7b3d508791c8289

See more details on using hashes here.

File details

Details for the file ezmq-2025.11.11.2-py3-none-any.whl.

File metadata

  • Download URL: ezmq-2025.11.11.2-py3-none-any.whl
  • Upload date:
  • Size: 83.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.2.1 CPython/3.13.7 Windows/11

File hashes

Hashes for ezmq-2025.11.11.2-py3-none-any.whl
Algorithm Hash digest
SHA256 c4a4ddd3a288596477f6f64530cc6e53079014b1049566d44b0a356bec4e7c75
MD5 9fdc0211f871eaf6fbf511517beecfde
BLAKE2b-256 59efa59dee167f735d8c9eb47257aa2952660ceceb562849c5d3b87d4d001843

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page