Worker Core Library

Core library for worker services in the Mesh-Sync platform.

Overview

This is a Python library that provides shared functionality for worker services, including:

  • ELK Event Wrapper: High-level interface for emitting metrics and events to Elasticsearch with automatic field population and duration tracking
  • BullMQ integration with adapter pattern for flexible backend selection
  • Storage providers (S3, Google Drive, SFTP)
  • Authentication and credentials management
  • Database utilities
  • Common utility functions

Key Features

ELK Event Wrapper

The library provides a high-level interface for emitting structured metrics and events to Elasticsearch with automatic field population, duration tracking, and error handling.

Quick Example:

from worker_core_lib.elk import ElkEventWrapper

elk_wrapper = ElkEventWrapper(worker_name="worker-metadata-generation")

# Track LLM execution
async with elk_wrapper.track_llm_execution(
    model="gpt-4",
    user_id="user-123",
    prompt_type="metadata_generation"
) as tracker:
    result = await llm.generate(prompt)
    tracker.set_token_usage(input=1000, output=500, total_cost=0.05)

Features:

  • 6 specialized tracking methods: LLM execution, Blender operations, file processing, marketplace API calls, database queries, custom operations
  • Automatic metrics: Duration calculation, timestamp, source, environment
  • Context manager pattern: Automatic cleanup and metric emission
  • Error tracking: Captures exceptions with type and message
  • Flexible metrics: Add any custom metrics via set_metric()
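The tracking methods above share a common shape: an async context manager starts a clock on entry, lets the caller attach metrics, and records duration on exit. A minimal, self-contained sketch of that pattern (the `Tracker` class and `track_operation` function here are illustrative placeholders, not the library's actual API):

```python
import time
from contextlib import asynccontextmanager

class Tracker:
    """Collects metrics for one tracked operation."""
    def __init__(self):
        self.metrics = {}

    def set_metric(self, name, value):
        self.metrics[name] = value

@asynccontextmanager
async def track_operation(operation_name):
    # On entry: start the clock. On exit: record the duration; a real
    # wrapper would also emit the collected metrics to Elasticsearch here.
    tracker = Tracker()
    start = time.monotonic()
    try:
        yield tracker
    finally:
        tracker.set_metric("operation", operation_name)
        tracker.set_metric("duration_ms", (time.monotonic() - start) * 1000)
```

The `finally` block is what guarantees metric emission even when the tracked body raises, which is how error tracking can piggyback on the same mechanism.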

Use Cases:

  • Track LLM token usage and costs across OpenAI, Anthropic, etc.
  • Monitor Blender rendering performance (vertices, faces, render time)
  • Measure file conversion times and compression ratios
  • Track marketplace API rate limits and response times
  • Monitor database query performance

Worker Backend Adapter Pattern

The library now supports multiple backend implementations through the adapter pattern:

  • BullMQ Adapter: Uses BullMQ with Redis for queue management (default)
  • External Worker Backend Adapter: Integrates with a dedicated external worker backend service

Switch between backends using environment variables:

# Use BullMQ (default)
export WORKER_BACKEND_TYPE=bullmq
export REDIS_HOST=localhost
export REDIS_PORT=6379

# Or use external worker backend
export WORKER_BACKEND_TYPE=external
export WORKER_BACKEND_URL=https://worker-backend.example.com
export WORKER_BACKEND_API_KEY=your-api-key

See Worker Backend Adapter Pattern Documentation for detailed information.
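The environment-variable switch above amounts to a small factory that inspects `WORKER_BACKEND_TYPE` and constructs the matching adapter. A sketch of that selection logic (the backend classes below are illustrative placeholders, not the library's actual adapters):

```python
import os

class BullMQBackend:
    """Placeholder for the BullMQ/Redis-backed adapter."""
    pass

class ExternalBackend:
    """Placeholder for the external worker-backend adapter."""
    def __init__(self, url, api_key):
        self.url = url
        self.api_key = api_key

def select_backend():
    # Defaults to BullMQ, matching the library's documented default.
    backend_type = os.environ.get("WORKER_BACKEND_TYPE", "bullmq")
    if backend_type == "external":
        return ExternalBackend(
            os.environ["WORKER_BACKEND_URL"],
            os.environ["WORKER_BACKEND_API_KEY"],
        )
    return BullMQBackend()
```

Keeping the decision behind a single factory call is what lets workers switch backends with no code changes.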

Installation

From PyPI

pip install worker-core-lib

From Source

git clone https://github.com/Mesh-Sync/worker-core-lib.git
cd worker-core-lib
pip install -e .

From GitHub Container Registry (GHCR)

The Docker image is automatically built and published to GHCR:

docker pull ghcr.io/mesh-sync/worker-core-lib:latest

Development

Building Locally

Build the Python package:

pip install build
python -m build

Build the Docker image:

docker build -t worker-core-lib:local .

Running Tests

pip install -e .
pytest

CI/CD Pipeline

This repository uses GitHub Actions for continuous integration and deployment:

Workflow 1: Build, Test, and Deploy to GHCR

Location: .github/workflows/docker-publish.yml

Triggers:

  • Push to master, main, or develop branches
  • Pull requests to master, main, or develop branches

Jobs:

  1. Build and Test:
    • Sets up Python 3.10 (matching Dockerfile)
    • Installs dependencies
    • Runs pytest (all tests)
    • Builds Python package
    • Determines version based on branch
    • Builds Docker image
    • Pushes to GHCR (only on push, not PRs)

Container Registry: Images are published to GitHub Container Registry (GHCR) at ghcr.io/mesh-sync/worker-core-lib

Versioning Strategy:

  • master/main branch: Uses semantic version from pyproject.toml (e.g., 0.0.1)
  • develop branch: Appends -SNAPSHOT suffix (e.g., 0.0.1-SNAPSHOT)
  • Pull requests: Appends -pr<number> (e.g., 0.0.1-pr6) - built but not pushed to registry
  • Other branches: Appends sanitized branch name (e.g., 0.0.1-feature-xyz)

Tags:

  • <version> - Version-specific tag (e.g., 0.0.1 or 0.0.1-SNAPSHOT)
  • latest - Latest build from master/main branch only
  • master, main, or develop - Branch-specific builds
  • sha-<commit> - Build with commit SHA for traceability
  • Pull request builds are validated but not pushed to the registry
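The versioning strategy above is a simple mapping from branch (or PR number) to a version suffix, and could be sketched as:

```python
import re

def compute_version(base, branch, pr_number=None):
    """Derive the image/package version from the base version in
    pyproject.toml and the branch being built (sketch of the strategy
    described above, not the workflow's actual script)."""
    if pr_number is not None:
        return f"{base}-pr{pr_number}"        # built but never pushed
    if branch in ("master", "main"):
        return base                           # plain semantic version
    if branch == "develop":
        return f"{base}-SNAPSHOT"
    # Other branches: append the sanitized branch name
    return f"{base}-{re.sub(r'[^A-Za-z0-9.-]', '-', branch)}"
```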

Workflow 2: Build and Publish to PyPI

Location: .github/workflows/pypi-publish.yml

Triggers:

  • Push to master, main, or develop branches (build and test only)
  • Pull requests (build and test only)
  • Push tags matching v*.*.* pattern (e.g., v0.0.1, v1.2.3) - triggers publish

Jobs:

  1. Build and Test:

    • Sets up Python 3.10
    • Installs dependencies
    • Runs pytest
    • Builds Python package
    • Validates with twine
    • Uploads artifacts
  2. Publish to PyPI (only on version tags):

    • Downloads build artifacts
    • Publishes to PyPI using trusted publishing (no manual tokens needed)

PyPI Package: Published at https://pypi.org/project/worker-core-lib/

Accessing Container Images

Images are publicly available from GHCR:

# Pull latest release
docker pull ghcr.io/mesh-sync/worker-core-lib:latest

# Pull specific version
docker pull ghcr.io/mesh-sync/worker-core-lib:0.0.1

# Pull develop snapshot
docker pull ghcr.io/mesh-sync/worker-core-lib:0.0.1-SNAPSHOT
docker pull ghcr.io/mesh-sync/worker-core-lib:develop

# Pull specific branch
docker pull ghcr.io/mesh-sync/worker-core-lib:master

# Pull specific commit
docker pull ghcr.io/mesh-sync/worker-core-lib:sha-abc1234

Manual Publishing with Justfile

You can also manually build and publish using the Justfile:

Docker Image Management

# Get current version (will append -SNAPSHOT on develop branch)
just version

# Build Docker image with appropriate version tag
just build

# Push to GHCR (requires docker login to ghcr.io)
just publish

PyPI Package Management

# Get current version
just get-version

# Build Python package
just build-package

# Check package validity
just check-package

# Show package contents
just show-package

# Test package installation locally
just test-install

# Publish to PyPI (requires PyPI credentials)
just publish-pypi

# Publish to Test PyPI (for testing)
just publish-test-pypi

Version Management

# Set specific version
just set-version 1.2.3

# Bump patch version (0.0.1 -> 0.0.2)
just bump-patch

# Bump minor version (0.0.1 -> 0.1.0)
just bump-minor

# Bump major version (0.0.1 -> 1.0.0)
just bump-major

# Complete release workflow (bump, commit, tag, push)
just release patch   # or minor, or major

Publishing a New Version to PyPI

To publish a new version to PyPI:

  1. Update the version (using one of these methods):

    # Option 1: Use the release command (recommended)
    just release patch  # or minor, or major
    
    # Option 2: Manual bump and tag
    just bump-patch
    git add pyproject.toml
    git commit -m "Bump version to $(just get-version)"
    git push
    just tag-and-publish
    
  2. The GitHub Actions workflow will automatically:

    • Build and test the package
    • Publish to PyPI when the tag is pushed
  3. Verify the package at https://pypi.org/project/worker-core-lib/

Setting Up PyPI Trusted Publishing (Recommended)

For the GitHub Actions workflow to publish to PyPI without manual tokens:

  1. Go to https://pypi.org/manage/account/publishing/
  2. Add a new "pending publisher":
    • PyPI Project Name: worker-core-lib
    • Owner: Mesh-Sync
    • Repository name: worker-core-lib
    • Workflow name: pypi-publish.yml
    • Environment name: pypi

Manual PyPI Publishing (Alternative)

If you prefer to publish manually without GitHub Actions:

# Set PyPI credentials (one time setup)
export TWINE_USERNAME=__token__
export TWINE_PASSWORD=<your-pypi-token>

# Or configure ~/.pypirc

# Build and publish
just publish-pypi

Usage

Using the Adapter Pattern

from core_lib.core_lib_bullmq import (
    BaseWorker, 
    QueueManager,
    WorkerBackendFactory
)

# Get the configured adapter
adapter = WorkerBackendFactory.get_adapter()

# Add a job
job_id = await QueueManager.safe_add_job(
    "my-queue",
    "process-task",
    {"input": "data"},
    {"priority": 1}
)

# Create a worker
class MyWorker(BaseWorker):
    async def process(self, job, job_token):
        # Process the job
        return {"result": "success"}

worker = MyWorker(queue_name="my-queue", use_adapter=True)

BullMQ-like Features

The library now provides BullMQ-like functionality for creating child jobs, job flows, and common patterns:

Creating Child Jobs

from core_lib.core_lib_bullmq import JobContext, QueueManager

class ParentWorker(BaseWorker):
    async def process(self, job, job_token):
        # Create JobContext with QueueManager
        context = JobContext(job, result_queue=None, queue_manager=QueueManager)
        
        # Add child jobs
        child_id = await context.add_child_job(
            'child-queue',
            'child-task',
            {'data': 'child data'}
        )
        
        # Signal waiting for children
        await context.move_to_waiting_children()
        
        return {'status': 'waiting_for_children'}

Using FlowBuilder for Complex Job Dependencies

from core_lib.core_lib_bullmq import FlowBuilder, QueueManager

# Create a flow with dependencies
flow = FlowBuilder()
parent = flow.add_job('queue', 'parent-task', {'input': 'data'})
child1 = flow.add_child(parent, 'queue', 'child-1', {'data': 'c1'})
child2 = flow.add_child(parent, 'queue', 'child-2', {'data': 'c2'})
flow.add_child(child1, 'queue', 'grandchild', {'data': 'gc'})

# Execute the flow (jobs added bottom-up with dependencies)
job_ids = await flow.execute(QueueManager)
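"Bottom-up" here means children must be enqueued before the parents that depend on them. A self-contained sketch of how a flow builder can derive that order from parent links (the `Flow` class below is illustrative, not the library's `FlowBuilder`):

```python
class Flow:
    """Toy flow builder: tracks parent links, yields bottom-up order."""
    def __init__(self):
        self.jobs = []  # each entry: (queue, name, data, parent_index)

    def add_job(self, queue, name, data, parent=None):
        self.jobs.append((queue, name, data, parent))
        return len(self.jobs) - 1  # handle used to attach children

    def add_child(self, parent, queue, name, data):
        return self.add_job(queue, name, data, parent=parent)

    def execution_order(self):
        # Deepest descendants are enqueued first, so every parent can
        # reference its already-created children as dependencies.
        def depth(idx):
            d, parent = 0, self.jobs[idx][3]
            while parent is not None:
                d, parent = d + 1, self.jobs[parent][3]
            return d
        return sorted(range(len(self.jobs)), key=depth, reverse=True)
```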

BullMQ Helper Methods

from core_lib.core_lib_bullmq import BullMQHelpers, QueueManager

# Add job with retry
job_id = await BullMQHelpers.add_job_with_retry(
    QueueManager, 'queue', 'task', {'data': 'value'},
    max_attempts=5, backoff_delay=1000
)

# Add delayed job
job_id = await BullMQHelpers.add_delayed_job(
    QueueManager, 'queue', 'task', {'data': 'value'},
    delay_ms=60000  # 1 minute
)

# Add job with priority
job_id = await BullMQHelpers.add_job_with_priority(
    QueueManager, 'queue', 'task', {'data': 'value'},
    priority=10
)

Using External Worker Backend (mesh-sync-worker-backend-client)

The library integrates with the mesh-sync-worker-backend-client package:

# Set environment variables (shell)
export WORKER_BACKEND_TYPE=external
export WORKER_BACKEND_URL=https://worker-backend.example.com
export WORKER_BACKEND_API_KEY=your-api-key

# No code changes needed - QueueManager automatically uses the external backend
job_id = await QueueManager.safe_add_job("queue", "task", {"data": "value"})

See the example code and documentation for more details.

License

MIT License
