A high-level Azure Service Bus abstraction for publishing, consuming, and DLQ workflows.

asbflow

asbflow is a Python library built on top of the Microsoft Azure Service Bus SDK that abstracts the most common messaging workflows.

It provides a clean service API for:

  • publishing to topics and queues,
  • consuming from subscriptions and queues,
  • managing dead-letter queue (DLQ) operations.

The library is model-agnostic by default and integrates with Pydantic parsing when you want typed payload validation.
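
As a rough illustration of what "model-agnostic by default, typed on request" can look like, the sketch below passes a dict through untouched unless a model class is supplied. It is not asbflow's implementation: `parse_payload`, `SupportsModelValidate`, and `Alert` are hypothetical names, and `Alert` is a dependency-free stand-in that only mimics the `model_validate` classmethod a Pydantic v2 model exposes.

```python
from dataclasses import dataclass
from typing import Any, Optional, Protocol, Type


class SupportsModelValidate(Protocol):
    """Any class exposing a Pydantic-v2-style ``model_validate`` classmethod."""

    @classmethod
    def model_validate(cls, obj: Any) -> Any: ...


def parse_payload(
    raw: dict,
    model: Optional[Type[SupportsModelValidate]] = None,
    parse: bool = True,
) -> Any:
    """Return ``raw`` unchanged unless a model is given and parsing is enabled."""
    if model is None or not parse:
        return raw
    return model.model_validate(raw)


@dataclass
class Alert:
    """Hypothetical stand-in for a Pydantic model (assumption, keeps the sketch stdlib-only)."""

    id: str
    severity: str

    @classmethod
    def model_validate(cls, obj: dict) -> "Alert":
        # Minimal validation: both fields must be present and be strings.
        if not isinstance(obj.get("id"), str) or not isinstance(obj.get("severity"), str):
            raise ValueError("id and severity must be strings")
        return cls(id=obj["id"], severity=obj["severity"])


raw = {"id": "a1", "severity": "high"}
untyped = parse_payload(raw)             # model-agnostic: dict in, dict out
typed = parse_payload(raw, model=Alert)  # typed: validated Alert instance
```

With a real Pydantic model in place of `Alert`, the same duck-typed call would apply, since Pydantic v2 models provide `model_validate` as a classmethod.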

Why asbflow

  • Keep Azure Service Bus integration explicit but compact.
  • Reuse the same high-level APIs across projects.
  • Control parse behavior and settlement behavior per call.
  • Use strategy-based execution (sequential, thread_pool, async) without changing business code.
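
To make the last point concrete, here is a minimal standard-library sketch of strategy-based execution: the same handler runs under a strategy selected by name. The strategy names come from the list above; everything else (`execute`, `STRATEGIES`, the `run_*` helpers, `handle`) is hypothetical and does not describe how asbflow wires its strategies internally.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, Iterable, List

Handler = Callable[[Any], Any]


def run_sequential(handler: Handler, items: List[Any]) -> List[Any]:
    """Call the handler on each item, one after another."""
    return [handler(item) for item in items]


def run_thread_pool(handler: Handler, items: List[Any], max_workers: int = 4) -> List[Any]:
    """Call the handler concurrently in worker threads; map() keeps input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(handler, items))


def run_async(handler: Handler, items: List[Any]) -> List[Any]:
    """Drive the (synchronous) handler from an event loop via worker threads."""

    async def _gather() -> List[Any]:
        tasks = (asyncio.to_thread(handler, item) for item in items)
        return list(await asyncio.gather(*tasks))  # gather keeps input order

    return asyncio.run(_gather())


STRATEGIES: Dict[str, Callable[[Handler, List[Any]], List[Any]]] = {
    "sequential": run_sequential,
    "thread_pool": run_thread_pool,
    "async": run_async,
}


def execute(handler: Handler, items: Iterable[Any], strategy: str = "sequential") -> List[Any]:
    """Run the handler over items using the named strategy."""
    return STRATEGIES[strategy](handler, list(items))


def handle(message: dict) -> str:
    """Business code: unaware of which strategy runs it."""
    return f"{message['id']}:{message['severity']}"


messages = [{"id": "a1", "severity": "high"}, {"id": "a2", "severity": "critical"}]
results = {name: execute(handle, messages, strategy=name) for name in STRATEGIES}
```

The point of the pattern is that `handle` never changes; only the `strategy` argument does.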

Core Components

  • ASBPublisher: publish payloads (dict or Pydantic models).
  • ASBConsumer: consume (settling) or read (non-settling) messages.
  • ASBDLQManager: read/consume/redrive/purge DLQ messages.
  • ASBClientProvider: pluggable auth provider (connection string or managed identity).
  • Entity abstraction: topic/queue sender and receiver access via dedicated entity clients.

Installation

pip install asbflow

For local development:

pip install -e ".[dev]"

For notebook quickstarts:

pip install -e ".[notebook]"

Quick Start

from asbflow import (
    ASBConnectionConfig,
    ASBConsumer,
    ASBConsumerConfig,
    ASBPublisher,
    ASBPublisherConfig,
)

connection = ASBConnectionConfig(
    connection_string="<connection-string>",
)

publisher = ASBPublisher(
    connection=connection,
    publisher=ASBPublisherConfig(topic_name="<topic-name>"),
)

consumer = ASBConsumer(
    connection=connection,
    consumer=ASBConsumerConfig(
        topic_name="<topic-name>",
        subscription_name="<subscription-name>",
    ),
)

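# Publish a plain dict; parse=False skips typed (Pydantic) parsing.
publisher.publish(payload={"id": "a1", "severity": "high"}, parse=False)

# Consume from the subscription; with raise_on_error=False, failures are
# reported on the result object rather than raised.
result = consumer.consume(parse=False, raise_on_error=False)
print(result.succeeded, result.failed)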

Message Config (New in 1.0.2)

You can now override message metadata per publish call, and you can derive metadata dynamically from each payload.

from asbflow import ASBMessageConfig, ASBDynamicMessageConfig, MessageFieldMapping

# static per-call override
publisher.publish(
    payload={"id": "a1", "severity": "high"},
    message=ASBMessageConfig(message_id="fixed-id", subject="alerts"),
)

# dynamic per-payload metadata
dynamic_message = ASBDynamicMessageConfig(
    message_id=MessageFieldMapping(lambda payload: payload["id"] if isinstance(payload, dict) else None),
    subject=MessageFieldMapping(lambda payload: payload["severity"] if isinstance(payload, dict) else None),
)

publisher.publish(
    payload=[
        {"id": "a1", "severity": "high"},
        {"id": "a2", "severity": "critical"},
    ],
    chunk_size=1,
    message=dynamic_message,
)
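
For intuition, the dynamic case can be pictured as a template of extractor functions that is resolved once per payload. The sketch below is a self-contained approximation, not asbflow's code: `Metadata`, `DynamicMetadata`, `field_extractor`, and `chunked` are hypothetical names, and treating `chunk_size` as "payloads per batch" is an assumption.

```python
from dataclasses import dataclass
from typing import Any, Callable, Iterator, List, Optional

Extractor = Callable[[Any], Optional[str]]


@dataclass(frozen=True)
class Metadata:
    """Concrete metadata for a single message."""

    message_id: Optional[str] = None
    subject: Optional[str] = None


@dataclass(frozen=True)
class DynamicMetadata:
    """Template whose fields are computed from each payload."""

    message_id: Extractor
    subject: Extractor

    def resolve(self, payload: Any) -> Metadata:
        return Metadata(
            message_id=self.message_id(payload),
            subject=self.subject(payload),
        )


def field_extractor(name: str) -> Extractor:
    """Build an extractor that reads ``name`` from dict payloads, else returns None."""

    def extract(payload: Any) -> Optional[str]:
        return payload.get(name) if isinstance(payload, dict) else None

    return extract


def chunked(items: List[Any], size: int) -> Iterator[List[Any]]:
    """Yield consecutive slices of at most ``size`` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


template = DynamicMetadata(
    message_id=field_extractor("id"),
    subject=field_extractor("severity"),
)
payloads = [
    {"id": "a1", "severity": "high"},
    {"id": "a2", "severity": "critical"},
]
# One batch per chunk; each entry pairs a payload with its resolved metadata.
batches = [
    [(payload, template.resolve(payload)) for payload in chunk]
    for chunk in chunked(payloads, 1)
]
```

Unlike the lambdas above, `field_extractor` uses `dict.get`, so a missing key yields `None` instead of raising `KeyError`.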

A richer walkthrough is available in quickstart/, including quickstart/asbflow_quickstart.ipynb.

Public API Snapshot

  • ASBMessageConfig: concrete Service Bus message metadata.
  • ASBDynamicMessageConfig: payload-driven message metadata template.
  • MessageFieldMapping: extractor wrapper used by dynamic message config fields.
  • MessageConfigInput: alias of ASBMessageConfig | ASBDynamicMessageConfig (in asbflow.config.message).

Design Principles

  • Abstraction over boilerplate, not over behavior.
  • Safe defaults with explicit overrides.
  • Uniform result contracts (succeeded, failed, successes, failures).
  • Structured logging across services and strategies.
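
A uniform result contract can be sketched as one small container shared by every operation. The field names below mirror the list above, but the shape is an assumption (for example, that `succeeded` and `failed` are counts derived from the `successes` and `failures` collections); `OperationResult`, `process_all`, and `require_id` are hypothetical names rather than asbflow's classes.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Iterable, List, Tuple


@dataclass
class OperationResult:
    """Outcome of a batch operation: per-item results plus derived counts."""

    successes: List[Any] = field(default_factory=list)
    failures: List[Tuple[Any, Exception]] = field(default_factory=list)

    @property
    def succeeded(self) -> int:
        return len(self.successes)

    @property
    def failed(self) -> int:
        return len(self.failures)


def process_all(
    items: Iterable[Any],
    handler: Callable[[Any], Any],
    raise_on_error: bool = False,
) -> OperationResult:
    """Apply the handler to every item, collecting failures unless asked to raise."""
    result = OperationResult()
    for item in items:
        try:
            result.successes.append(handler(item))
        except Exception as exc:
            if raise_on_error:
                raise
            result.failures.append((item, exc))
    return result


def require_id(message: dict) -> str:
    """Example handler: fails with KeyError when the payload has no ``id``."""
    return message["id"]


result = process_all([{"id": "a1"}, {"severity": "high"}], require_id)
print(result.succeeded, result.failed)
```

Keeping the same four fields on every operation means calling code can log or branch on outcomes without knowing which service produced them.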

Development

Run tests:

pytest -q test/unit

Build package:

python -m build

CI/CD Branch Model

The repository ships with branch-oriented GitHub Actions:

  • dev: fast CI feedback (tests + quality checks).
  • main: full CI matrix + package build verification.
  • release: release validation and publish workflow.

Roadmap

  • More built-in payload parser adapters.
  • Additional reliability patterns around retries/backoff.
  • Optional observability integrations (metrics/tracing).

License

MIT. See LICENSE.
