A high-level Azure Service Bus abstraction for publishing, consuming, and DLQ workflows.

asbflow

asbflow is a Python library built on top of the Microsoft Azure Service Bus SDK that abstracts the most common messaging workflows.

It provides a clean service API for:

  • publishing to topics and queues,
  • consuming from subscriptions and queues,
  • managing dead-letter queue (DLQ) operations.

The library is model-agnostic by default and integrates with Pydantic parsing when you want typed payload validation.

Why asbflow

  • Keep Azure Service Bus integration explicit but compact.
  • Reuse the same high-level APIs across projects.
  • Control parse behavior and settlement behavior per call.
  • Use strategy-based execution (sequential, thread_pool, async) without changing business code.
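To illustrate the last point, here is a minimal sketch of what strategy-based execution can look like in plain Python. It is not asbflow's implementation: `run_with_strategy` and `handle` are hypothetical names, and only the three strategy labels (sequential, thread_pool, async) come from the list above. The idea is that the business function stays the same while the execution strategy changes.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List


def handle(payload: dict) -> str:
    """Hypothetical business function: unchanged across strategies."""
    return f"{payload['id']}:{payload['severity']}"


def run_with_strategy(
    func: Callable[[dict], str],
    payloads: Iterable[dict],
    strategy: str = "sequential",
) -> List[str]:
    """Apply func to every payload using the named strategy (illustrative only)."""
    items = list(payloads)
    if strategy == "sequential":
        return [func(item) for item in items]
    if strategy == "thread_pool":
        # executor.map returns results in input order.
        with ThreadPoolExecutor(max_workers=4) as executor:
            return list(executor.map(func, items))
    if strategy == "async":
        async def _gather() -> List[str]:
            # Run the blocking function in worker threads and await all results.
            tasks = (asyncio.to_thread(func, item) for item in items)
            return list(await asyncio.gather(*tasks))
        return asyncio.run(_gather())
    raise ValueError(f"unknown strategy: {strategy!r}")


payloads = [{"id": "a1", "severity": "high"}, {"id": "a2", "severity": "critical"}]
results = {
    name: run_with_strategy(handle, payloads, name)
    for name in ("sequential", "thread_pool", "async")
}
```

All three strategies return the same list here, so callers can switch between them without touching `handle`.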

Core Components

  • ASBPublisher: publish payloads (dict or Pydantic models).
  • ASBConsumer: consume (settling) or read (non-settling) messages.
  • ASBDLQManager: read/consume/redrive/purge DLQ messages.
  • ASBClientProvider: pluggable auth provider (connection string or managed identity).
  • Entity abstraction: topic/queue sender and receiver access via dedicated entity clients.
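The difference between consume (settling) and read (non-settling), and the DLQ operations listed above, can be pictured with a small in-memory stand-in. This is a toy model, not asbflow or the Azure SDK: `ToyQueue` and its methods are hypothetical, and real Service Bus settlement involves message locks and delivery counts that are left out here.

```python
from collections import deque
from typing import Deque, List


class ToyQueue:
    """Hypothetical in-memory queue with a dead-letter side queue."""

    def __init__(self) -> None:
        self.active: Deque[dict] = deque()
        self.dead_letter: Deque[dict] = deque()

    def send(self, message: dict) -> None:
        self.active.append(message)

    def read(self, max_count: int = 10) -> List[dict]:
        """Non-settling: return copies and leave the messages on the queue."""
        return [dict(m) for m in list(self.active)[:max_count]]

    def consume(self, max_count: int = 10) -> List[dict]:
        """Settling: remove messages; ones without an 'id' go to the DLQ."""
        handled: List[dict] = []
        for _ in range(min(max_count, len(self.active))):
            message = self.active.popleft()
            if "id" in message:
                handled.append(message)
            else:
                self.dead_letter.append(message)
        return handled

    def redrive(self) -> int:
        """Move every dead-lettered message back onto the active queue."""
        moved = len(self.dead_letter)
        while self.dead_letter:
            self.active.append(self.dead_letter.popleft())
        return moved

    def purge(self) -> int:
        """Discard every dead-lettered message."""
        purged = len(self.dead_letter)
        self.dead_letter.clear()
        return purged


queue = ToyQueue()
queue.send({"id": "a1", "severity": "high"})
queue.send({"severity": "unknown"})  # malformed: no id

peeked = queue.read()                 # messages stay queued after a read
remaining_after_read = len(queue.active)
consumed = queue.consume()            # valid message settled, malformed one dead-lettered
dlq_size = len(queue.dead_letter)
redriven = queue.redrive()            # malformed message returns to the active queue
```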

Installation

pip install asbflow

For local development:

pip install -e ".[dev]"

For notebook quickstarts:

pip install -e ".[notebook]"

Quick Start

from asbflow import (
    ASBConnectionConfig,
    ASBConsumer,
    ASBConsumerConfig,
    ASBPublisher,
    ASBPublisherConfig,
)

connection = ASBConnectionConfig(
    connection_string="<connection-string>",
)

publisher = ASBPublisher(
    connection=connection,
    publisher=ASBPublisherConfig(topic_name="<topic-name>"),
)

consumer = ASBConsumer(
    connection=connection,
    consumer=ASBConsumerConfig(
        topic_name="<topic-name>",
        subscription_name="<subscription-name>",
    ),
)

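# parse=False: skip Pydantic parsing and publish the dict as-is.
publisher.publish(payload={"id": "a1", "severity": "high"}, parse=False)

# raise_on_error=False: failures are reported on the result instead of raised.
result = consumer.consume(parse=False, raise_on_error=False)
print(result.succeeded, result.failed)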

Message Config

You can override message metadata on each publish call, either with static values or with values derived dynamically from each payload.

from asbflow import ASBMessageConfig, ASBDynamicMessageConfig, MessageFieldMapping

# static per-call override
publisher.publish(
    payload={"id": "a1", "severity": "high"},
    message=ASBMessageConfig(message_id="fixed-id", subject="alerts"),
)

# dynamic per-payload metadata
dynamic_message = ASBDynamicMessageConfig(
    message_id=MessageFieldMapping(lambda payload: payload["id"] if isinstance(payload, dict) else None),
    subject=MessageFieldMapping(lambda payload: payload["severity"] if isinstance(payload, dict) else None),
)

publisher.publish(
    payload=[
        {"id": "a1", "severity": "high"},
        {"id": "a2", "severity": "critical"},
    ],
    chunk_size=1,
    message=dynamic_message,
)

A richer walkthrough is available in quickstart/, including quickstart/asbflow_quickstart.ipynb.

Public API Snapshot

  • ASBMessageConfig: concrete Service Bus message metadata.
  • ASBDynamicMessageConfig: payload-driven message metadata template.
  • MessageFieldMapping: extractor wrapper used by dynamic message config fields.
  • MessageConfigInput: alias of ASBMessageConfig | ASBDynamicMessageConfig (in asbflow.config.message).
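As a rough mental model of payload-driven metadata, the sketch below resolves a set of extractor callables against each payload and groups the payloads into chunks. `resolve_metadata` and `chunked` are hypothetical helpers written for illustration; how asbflow resolves `ASBDynamicMessageConfig` or applies `chunk_size` internally is not described in this README.

```python
from typing import Any, Callable, Dict, Iterator, List, Optional

Extractor = Callable[[Any], Optional[str]]


def resolve_metadata(payload: Any, template: Dict[str, Extractor]) -> Dict[str, str]:
    """Call each extractor on the payload; skip fields that resolve to None."""
    resolved: Dict[str, str] = {}
    for field_name, extractor in template.items():
        value = extractor(payload)
        if value is not None:
            resolved[field_name] = value
    return resolved


def chunked(items: List[Any], chunk_size: int) -> Iterator[List[Any]]:
    """Yield consecutive slices of at most chunk_size items."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    for start in range(0, len(items), chunk_size):
        yield items[start:start + chunk_size]


# Same extractors as the dynamic example above, held in a plain dict.
template: Dict[str, Extractor] = {
    "message_id": lambda p: p["id"] if isinstance(p, dict) else None,
    "subject": lambda p: p["severity"] if isinstance(p, dict) else None,
}
payloads = [{"id": "a1", "severity": "high"}, {"id": "a2", "severity": "critical"}]

# One list of resolved metadata per chunk of payloads.
batches = [
    [resolve_metadata(p, template) for p in chunk]
    for chunk in chunked(payloads, chunk_size=1)
]
```

Returning None from an extractor for non-dict payloads, as the lambdas do, lets the same template be reused when a payload has no matching field.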

Design Principles

  • Abstraction over boilerplate, not over behavior.
  • Safe defaults with explicit overrides.
  • Uniform result contracts (succeeded, failed, successes, failures).
  • Structured logging across services and strategies.
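A uniform result contract of this shape could be modeled as follows. This is a sketch under assumptions: `BatchResult` and `process_all` are hypothetical names, and treating succeeded and failed as counts derived from the successes and failures lists is a guess, not something this README specifies.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Iterable, List, Tuple


@dataclass
class BatchResult:
    """Hypothetical result contract; field names mirror the list above."""

    successes: List[Any] = field(default_factory=list)
    failures: List[Tuple[Any, Exception]] = field(default_factory=list)

    @property
    def succeeded(self) -> int:
        # Assumption: a count of successful items.
        return len(self.successes)

    @property
    def failed(self) -> int:
        # Assumption: a count of failed items.
        return len(self.failures)


def process_all(items: Iterable[Any], handler: Callable[[Any], Any]) -> BatchResult:
    """Collect per-item outcomes instead of raising on the first error."""
    result = BatchResult()
    for item in items:
        try:
            result.successes.append(handler(item))
        except Exception as exc:  # recorded on the result, not re-raised
            result.failures.append((item, exc))
    return result


# The second payload has no "id", so the handler raises KeyError for it.
result = process_all([{"id": "a1"}, {}], lambda payload: payload["id"])
```

Keeping the failing item next to its exception makes it straightforward to log or retry individual failures after a batch completes.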

Development

Run tests:

pytest -q test/unit

Build package:

python -m build

CI/CD Branch Model

The repository ships with branch-oriented GitHub Actions workflows:

  • dev: fast CI feedback (tests + quality checks).
  • main: full CI matrix + package build verification.
  • release: release validation and publish workflow.

Roadmap

The active roadmap and release follow-ups are tracked in TODO.md.

License

MIT. See LICENSE.
