Core library for worker services

Worker Core Library

The foundational Python library for all MeshSync worker services. It provides standardized implementations for storage, messaging, logging, and database access.

Architecture

C4 Component Diagram

C4Component
    title Component Diagram - Worker Core Library

    Container(worker_service, "Worker Service", "Python", "Consumes Core Library")
    
    Container_Boundary(core_lib, "Worker Core Library") {
        Component(storage_mod, "Storage Module", "S3/SFTP/GDrive", "Abstract Storage Provider")
        Component(bullmq_mod, "BullMQ Module", "Redis/BullMQ", "Queue Management")
        Component(elk_mod, "ELK Module", "Elasticsearch", "Structured Logging & Metrics")
        Component(db_mod, "Database Module", "SQLAlchemy", "Data Access Layer")
    }

    ContainerDb(redis, "Redis", "In-Memory", "Job Queues")
    ContainerDb(s3, "Object Storage", "S3/MinIO", "File Storage")
    ContainerDb(elasticsearch, "Elasticsearch", "Search Engine", "Logs & Metrics")

    Rel(worker_service, core_lib, "Uses", "Import")
    Rel(bullmq_mod, redis, "Manages Jobs", "Redis Protocol")
    Rel(storage_mod, s3, "Stores Files", "S3 API")
    Rel(elk_mod, elasticsearch, "Emits Logs", "REST API")

Traceability

This library implements the following requirements:

ID Title Status Implementation
[[FR-008]] Storage Discovery 🟢 Implemented src/core_lib_storage/
[[FR-035]] SFTP Discovery Analysis 🟢 Implemented src/core_lib_storage/sftp_storage.py (via paramiko)
[[ADR-006]] Generic Storage Provider 🟢 Implemented src/core_lib_storage/base_storage.py
[[ADR-007]] Event-Driven Worker Architecture 🟢 Implemented src/core_lib/core_lib_bullmq/

Remaining Tasks

Based on the gap analysis between requirements and implementation:

  • Testing: Increase unit test coverage for the core_lib_storage module.
  • Documentation: Generate API reference documentation for the library.
  • Dropbox Provider: Add a Dropbox storage provider when marketplace demand justifies it.

Closed by the gap analysis:

  • Storage Providers: S3 (boto3), SFTP (paramiko), and Google Drive (google-api-python-client) are all implemented.
  • ELK Migration: The canonical ELK module now lives in core_lib.core_lib_elk; the old path remains as a deprecation shim.
  • LLM Port Compliance: LLMMetadataService now implements the LLMPort ABC.

Overview

This is a Python library that provides shared functionality for worker services, including:

  • ELK Event Wrapper: High-level interface for emitting metrics and events to Elasticsearch with automatic field population and duration tracking
  • BullMQ integration with adapter pattern for flexible backend selection
  • Storage providers (S3, Google Drive, SFTP)
  • Authentication and credentials management
  • Database utilities
  • Common utility functions
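
Per [[ADR-006]], the storage providers share a generic abstract interface (src/core_lib_storage/base_storage.py). The method names below are illustrative, not the library's actual API; the toy in-memory backend exists only to exercise the interface:

```python
from abc import ABC, abstractmethod


class BaseStorageProvider(ABC):
    """Sketch of a generic storage provider interface (names hypothetical)."""

    @abstractmethod
    def list_files(self, prefix: str) -> list[str]: ...

    @abstractmethod
    def upload(self, local_path: str, remote_path: str) -> None: ...

    @abstractmethod
    def download(self, remote_path: str, local_path: str) -> None: ...


class InMemoryStorageProvider(BaseStorageProvider):
    """Toy backend: stores file bytes in a dict instead of S3/SFTP/GDrive."""

    def __init__(self) -> None:
        self._files: dict[str, bytes] = {}

    def list_files(self, prefix: str) -> list[str]:
        return sorted(k for k in self._files if k.startswith(prefix))

    def upload(self, local_path: str, remote_path: str) -> None:
        with open(local_path, "rb") as fh:
            self._files[remote_path] = fh.read()

    def download(self, remote_path: str, local_path: str) -> None:
        with open(local_path, "wb") as fh:
            fh.write(self._files[remote_path])
```

A real provider (e.g. the paramiko-backed SFTP one) would implement the same methods against its remote protocol, so workers can swap backends without code changes.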

Key Features

ELK Event Wrapper

The library provides a high-level interface for emitting structured metrics and events to Elasticsearch with automatic field population, duration tracking, and error handling.

Quick Example:

from worker_core_lib.elk import ElkEventWrapper

elk_wrapper = ElkEventWrapper(worker_name="worker-metadata-generation")

# Track LLM execution
async with elk_wrapper.track_llm_execution(
    model="gpt-4",
    user_id="user-123",
    prompt_type="metadata_generation"
) as tracker:
    result = await llm.generate(prompt)
    tracker.set_token_usage(input=1000, output=500, total_cost=0.05)

Features:

  • 6 specialized tracking methods: LLM execution, Blender operations, file processing, marketplace API calls, database queries, custom operations
  • Automatic metrics: Duration calculation, timestamp, source, environment
  • Context manager pattern: Automatic cleanup and metric emission
  • Error tracking: Captures exceptions with type and message
  • Flexible metrics: Add any custom metrics via set_metric()
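
As an illustration of the context-manager pattern described above, here is a minimal self-contained sketch: track_operation and the event sink are assumptions, not the wrapper's real API, and the list stands in for an Elasticsearch index call.

```python
import asyncio
import time
from contextlib import asynccontextmanager


class _Tracker:
    """Collects custom metrics set inside the tracked block."""

    def __init__(self) -> None:
        self.metrics: dict = {}

    def set_metric(self, name: str, value) -> None:
        self.metrics[name] = value


@asynccontextmanager
async def track_operation(worker_name: str, operation: str, sink: list):
    """Emit one event with duration, status, and any custom metrics."""
    tracker = _Tracker()
    start = time.monotonic()
    event = {"worker": worker_name, "operation": operation}
    try:
        yield tracker
        event["status"] = "success"
    except Exception as exc:
        # Error tracking: capture the exception type, then re-raise.
        event["status"] = "error"
        event["error_type"] = type(exc).__name__
        raise
    finally:
        # Automatic metrics: duration is always recorded, even on failure.
        event["duration_ms"] = (time.monotonic() - start) * 1000
        event.update(tracker.metrics)
        sink.append(event)  # stand-in for indexing into Elasticsearch
```

Usage mirrors the quick example above: enter the context, do the work, and set any extra metrics on the tracker before the block exits.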

Use Cases:

  • Track LLM token usage and costs across OpenAI, Anthropic, etc.
  • Monitor Blender rendering performance (vertices, faces, render time)
  • Measure file conversion times and compression ratios
  • Track marketplace API rate limits and response times
  • Monitor database query performance

Worker Backend Adapter Pattern

The library now supports multiple backend implementations through the adapter pattern:

  • BullMQ Adapter: Uses BullMQ with Redis for queue management (default)
  • External Worker Backend Adapter: Integrates with a dedicated external worker backend service

Switch between backends using environment variables:

# Use BullMQ (default)
export WORKER_BACKEND_TYPE=bullmq
export REDIS_HOST=localhost
export REDIS_PORT=6379

# Or use external worker backend
export WORKER_BACKEND_TYPE=external
export WORKER_BACKEND_URL=https://worker-backend.example.com
export WORKER_BACKEND_API_KEY=your-api-key

See Worker Backend Adapter Pattern Documentation for detailed information.

Installation

From PyPI

pip install worker-core-lib

From Source

pip install -e .

From GitHub Container Registry (GHCR)

The Docker image is automatically built and published to GHCR:

docker pull ghcr.io/mesh-sync/worker-core-lib:latest

Development

Building Locally

Build the Python package:

pip install build
python -m build

Build the Docker image:

docker build -t worker-core-lib:local .

Running Tests

pip install -e .
pytest

CI/CD Pipeline

This repository uses GitHub Actions for continuous integration and deployment:

Workflow 1: Build, Test, and Deploy to GHCR

Location: .github/workflows/docker-publish.yml

Triggers:

  • Push to master, main, or develop branches
  • Pull requests to master, main, or develop branches

Jobs:

  1. Build and Test:
    • Sets up Python 3.10 (matching the Dockerfile)
    • Installs dependencies
    • Runs pytest (all tests)
    • Builds Python package
    • Determines version based on branch
    • Builds Docker image
    • Pushes to GHCR (only on push, not PRs)

Container Registry: Images are published to GitHub Container Registry (GHCR) at ghcr.io/mesh-sync/worker-core-lib

Versioning Strategy:

  • master/main branch: Uses semantic version from pyproject.toml (e.g., 0.0.1)
  • develop branch: Appends -SNAPSHOT suffix (e.g., 0.0.1-SNAPSHOT)
  • Pull requests: Appends -pr<number> (e.g., 0.0.1-pr6) - built but not pushed to registry
  • Other branches: Appends sanitized branch name (e.g., 0.0.1-feature-xyz)

Tags:

  • <version> - Version-specific tag (e.g., 0.0.1 or 0.0.1-SNAPSHOT)
  • latest - Latest build from master/main branch only
  • master, main, or develop - Branch-specific builds
  • sha-<commit> - Build with commit SHA for traceability
  • Pull request builds are validated but not pushed to the registry

Workflow 2: Build and Publish to PyPI

Location: .github/workflows/pypi-publish.yml

Triggers:

  • Push to master, main, or develop branches (build and test only)
  • Pull requests (build and test only)
  • Push tags matching v*.*.* pattern (e.g., v0.0.1, v1.2.3) - triggers publish

Jobs:

  1. Build and Test:

    • Sets up Python 3.10
    • Installs dependencies
    • Runs pytest
    • Builds Python package
    • Validates with twine
    • Uploads artifacts
  2. Publish to PyPI (only on version tags):

    • Downloads build artifacts
    • Publishes to PyPI using trusted publishing (no manual tokens needed)

PyPI Package: Published at https://pypi.org/project/worker-core-lib/

Accessing Container Images

Images are publicly available from GHCR:

# Pull latest release
docker pull ghcr.io/mesh-sync/worker-core-lib:latest

# Pull specific version
docker pull ghcr.io/mesh-sync/worker-core-lib:0.0.1

# Pull develop snapshot
docker pull ghcr.io/mesh-sync/worker-core-lib:0.0.1-SNAPSHOT
docker pull ghcr.io/mesh-sync/worker-core-lib:develop

# Pull specific branch
docker pull ghcr.io/mesh-sync/worker-core-lib:master

# Pull specific commit
docker pull ghcr.io/mesh-sync/worker-core-lib:sha-abc1234

Manual Publishing with Justfile

You can also manually build and publish using the Justfile:

Docker Image Management

# Get current version (will append -SNAPSHOT on develop branch)
just version

# Build Docker image with appropriate version tag
just build

# Push to GHCR (requires docker login to ghcr.io)
just publish

PyPI Package Management

# Get current version
just get-version

# Build Python package
just build-package

# Check package validity
just check-package

# Show package contents
just show-package

# Test package installation locally
just test-install

# Publish to PyPI (requires PyPI credentials)
just publish-pypi

# Publish to Test PyPI (for testing)
just publish-test-pypi

Version Management

# Set specific version
just set-version 1.2.3

# Bump patch version (0.0.1 -> 0.0.2)
just bump-patch

# Bump minor version (0.0.1 -> 0.1.0)
just bump-minor

# Bump major version (0.0.1 -> 1.0.0)
just bump-major

# Complete release workflow (bump, commit, tag, push)
just release patch   # or minor, or major

Publishing a New Version to PyPI

To publish a new version to PyPI:

  1. Update the version (using one of these methods):

    # Option 1: Use the release command (recommended)
    just release patch  # or minor, or major
    
    # Option 2: Manual bump and tag
    just bump-patch
    git add pyproject.toml
    git commit -m "Bump version to $(just get-version)"
    git push
    just tag-and-publish
    
  2. The GitHub Actions workflow will automatically:

    • Build and test the package
    • Publish to PyPI when the tag is pushed
  3. Verify the package at https://pypi.org/project/worker-core-lib/

Setting Up PyPI Trusted Publishing (Recommended)

For the GitHub Actions workflow to publish to PyPI without manual tokens:

  1. Go to https://pypi.org/manage/account/publishing/
  2. Add a new "pending publisher":
    • PyPI Project Name: worker-core-lib
    • Owner: Mesh-Sync
    • Repository name: worker-core-lib
    • Workflow name: pypi-publish.yml
    • Environment name: pypi

Manual PyPI Publishing (Alternative)

If you prefer to publish manually without GitHub Actions:

# Set PyPI credentials (one time setup)
export TWINE_USERNAME=__token__
export TWINE_PASSWORD=<your-pypi-token>

# Or configure ~/.pypirc

# Build and publish
just publish-pypi

Usage

Using the Adapter Pattern

from core_lib.core_lib_bullmq import (
    BaseWorker, 
    QueueManager,
    WorkerBackendFactory
)

# Get the configured adapter
adapter = WorkerBackendFactory.get_adapter()

# Add a job
job_id = await QueueManager.safe_add_job(
    "my-queue",
    "process-task",
    {"input": "data"},
    {"priority": 1}
)

# Create a worker
class MyWorker(BaseWorker):
    async def process(self, job, job_token):
        # Process the job
        return {"result": "success"}

worker = MyWorker(queue_name="my-queue", use_adapter=True)

BullMQ-like Features

The library now provides BullMQ-like functionality for creating child jobs, job flows, and common patterns:

Creating Child Jobs

from core_lib.core_lib_bullmq import JobContext, QueueManager

class ParentWorker(BaseWorker):
    async def process(self, job, job_token):
        # Create JobContext with QueueManager
        context = JobContext(job, result_queue=None, queue_manager=QueueManager)
        
        # Add child jobs
        child_id = await context.add_child_job(
            'child-queue',
            'child-task',
            {'data': 'child data'}
        )
        
        # Signal waiting for children
        await context.move_to_waiting_children()
        
        return {'status': 'waiting_for_children'}

Using FlowBuilder for Complex Job Dependencies

from core_lib.core_lib_bullmq import FlowBuilder, QueueManager

# Create a flow with dependencies
flow = FlowBuilder()
parent = flow.add_job('queue', 'parent-task', {'input': 'data'})
child1 = flow.add_child(parent, 'queue', 'child-1', {'data': 'c1'})
child2 = flow.add_child(parent, 'queue', 'child-2', {'data': 'c2'})
flow.add_child(child1, 'queue', 'grandchild', {'data': 'gc'})

# Execute the flow (jobs added bottom-up with dependencies)
job_ids = await flow.execute(QueueManager)
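
Bottom-up means every child job is created before its parent is enqueued, so the parent can declare dependencies on jobs that already exist. A minimal self-contained sketch of that traversal (FlowBuilder's real internals may differ):

```python
def execute_flow(jobs, children, add_job):
    """Add jobs depth-first so children are created before their parents.

    jobs: mapping of job id -> payload
    children: mapping of parent id -> list of child ids
    add_job: callable invoked once per job, children first
    """
    order = []

    def visit(job_id):
        for child in children.get(job_id, []):
            visit(child)          # recurse into the subtree first
        order.append(add_job(job_id, jobs[job_id]))

    # Roots are jobs that are nobody's child.
    roots = set(jobs) - {c for kids in children.values() for c in kids}
    for root in sorted(roots):
        visit(root)
    return order
```

With the flow from the example above, the grandchild is added first, then child-1, child-2, and finally the parent.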

BullMQ Helper Methods

from core_lib.core_lib_bullmq import BullMQHelpers, QueueManager

# Add job with retry
job_id = await BullMQHelpers.add_job_with_retry(
    QueueManager, 'queue', 'task', {'data': 'value'},
    max_attempts=5, backoff_delay=1000
)

# Add delayed job
job_id = await BullMQHelpers.add_delayed_job(
    QueueManager, 'queue', 'task', {'data': 'value'},
    delay_ms=60000  # 1 minute
)

# Add job with priority
job_id = await BullMQHelpers.add_job_with_priority(
    QueueManager, 'queue', 'task', {'data': 'value'},
    priority=10
)

Using External Worker Backend (mesh-sync-worker-backend-client)

The library integrates with the mesh-sync-worker-backend-client package:

# Set environment variables
export WORKER_BACKEND_TYPE=external
export WORKER_BACKEND_URL=https://worker-backend.example.com
export WORKER_BACKEND_API_KEY=your-api-key
# No code changes needed - QueueManager automatically uses the external backend
job_id = await QueueManager.safe_add_job("queue", "task", {"data": "value"})

See the example code and documentation for more details.

License

MIT License

