A high-level Azure Service Bus abstraction for publishing, consuming, and DLQ workflows.
asbflow
asbflow is a Python library built on top of the Microsoft Azure Service Bus SDK that abstracts the most common messaging workflows.
It provides a clean service API for:
- publishing to topics and queues,
- consuming from subscriptions and queues,
- managing dead-letter queue (DLQ) operations.
The library is model-agnostic by default and integrates with Pydantic parsing when you want typed payload validation.
Why asbflow
- Keep Azure Service Bus integration explicit but compact.
- Reuse the same high-level APIs across projects.
- Control parse behavior and settlement behavior per call.
- Use strategy-based execution (sequential, thread_pool, async) without changing business code.
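To illustrate what swapping strategies without touching business code can look like, here is a minimal standard-library sketch. The names `handle`, `run_sequential`, `run_thread_pool`, `run_async`, `STRATEGIES`, and `execute` are hypothetical and are not part of asbflow; only the three strategy names come from the list above.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, Iterable, List


def handle(message: str) -> str:
    """Business code: identical regardless of the strategy that runs it."""
    return message.upper()


def run_sequential(fn: Callable[[str], str], items: List[str]) -> List[str]:
    # Process items one by one on the calling thread.
    return [fn(item) for item in items]


def run_thread_pool(fn: Callable[[str], str], items: List[str]) -> List[str]:
    # Executor.map returns results in input order.
    with ThreadPoolExecutor(max_workers=4) as pool:
        return list(pool.map(fn, items))


def run_async(fn: Callable[[str], str], items: List[str]) -> List[str]:
    async def _gather() -> List[str]:
        # Offload each blocking call to a thread; gather keeps input order.
        tasks = (asyncio.to_thread(fn, item) for item in items)
        return list(await asyncio.gather(*tasks))

    return asyncio.run(_gather())


# Hypothetical registry keyed by the strategy names used in this README.
STRATEGIES: Dict[str, Callable[[Callable[[str], str], List[str]], List[str]]] = {
    "sequential": run_sequential,
    "thread_pool": run_thread_pool,
    "async": run_async,
}


def execute(strategy: str, fn: Callable[[str], str], items: Iterable[str]) -> List[str]:
    """Dispatch to the chosen strategy; the handler itself never changes."""
    return STRATEGIES[strategy](fn, list(items))
```

Calling `execute("thread_pool", handle, ["a", "b"])` and `execute("sequential", handle, ["a", "b"])` yields the same list; only the scheduling differs.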
Core Components
- ASBPublisher: publish payloads (dict or Pydantic models).
- ASBConsumer: consume (settling) or read (non-settling) messages.
- ASBDLQManager: read/consume/redrive/purge DLQ messages.
- ASBClientProvider: pluggable auth provider (connection string or managed identity).
- Entity abstraction: topic/queue sender and receiver access via dedicated entity clients.
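The difference between consuming (settling) and reading (non-settling) can be pictured with a toy in-memory model. This is only a sketch: `InMemoryQueue` is a hypothetical stand-in, not an asbflow or Azure SDK class, and it assumes that a settled message is removed from the entity while a read leaves it in place.

```python
from collections import deque
from typing import Deque, List


class InMemoryQueue:
    """Hypothetical stand-in for a queue or subscription (not asbflow code)."""

    def __init__(self, messages: List[str]) -> None:
        self._messages: Deque[str] = deque(messages)

    def read(self, max_count: int) -> List[str]:
        # Non-settling: return the first messages but leave them on the queue.
        return list(self._messages)[:max_count]

    def consume(self, max_count: int) -> List[str]:
        # Settling: every returned message is removed, as if it were completed.
        taken: List[str] = []
        while self._messages and len(taken) < max_count:
            taken.append(self._messages.popleft())
        return taken

    def __len__(self) -> int:
        return len(self._messages)


queue = InMemoryQueue(["m1", "m2", "m3"])
peeked = queue.read(max_count=2)       # messages stay available
remaining_after_read = len(queue)
consumed = queue.consume(max_count=2)  # messages are gone afterwards
remaining_after_consume = len(queue)
```

In this model a read is safe to repeat for inspection, while a consume is the step that actually drains the entity.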
Installation
pip install asbflow
For local development:
pip install -e .[dev]
For notebook quickstarts:
pip install -e .[notebook]
Quick Start
from asbflow import (
    ASBConnectionConfig,
    ASBConsumer,
    ASBConsumerConfig,
    ASBPublisher,
    ASBPublisherConfig,
)

connection = ASBConnectionConfig(
    connection_string="<connection-string>",
)

publisher = ASBPublisher(
    connection=connection,
    publisher=ASBPublisherConfig(topic_name="<topic-name>"),
)

consumer = ASBConsumer(
    connection=connection,
    consumer=ASBConsumerConfig(
        topic_name="<topic-name>",
        subscription_name="<subscription-name>",
    ),
)

publisher.publish(payload={"id": "a1", "severity": "high"}, parse=False)

result = consumer.consume(parse=False, raise_on_error=False)
print(result.succeeded, result.failed)
Message Config (New in 1.0.2)
You can now override message metadata per publish call, and you can derive metadata dynamically from each payload.
from asbflow import ASBMessageConfig, ASBDynamicMessageConfig, MessageFieldMapping

# static per-call override
publisher.publish(
    payload={"id": "a1", "severity": "high"},
    message=ASBMessageConfig(message_id="fixed-id", subject="alerts"),
)

# dynamic per-payload metadata
dynamic_message = ASBDynamicMessageConfig(
    message_id=MessageFieldMapping(lambda payload: payload["id"] if isinstance(payload, dict) else None),
    subject=MessageFieldMapping(lambda payload: payload["severity"] if isinstance(payload, dict) else None),
)

publisher.publish(
    payload=[
        {"id": "a1", "severity": "high"},
        {"id": "a2", "severity": "critical"},
    ],
    chunk_size=1,
    message=dynamic_message,
)
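To make the idea of payload-driven metadata concrete, the following standalone sketch shows one way a template of extractors could be resolved against each payload. It is an assumption about the general pattern, not asbflow's implementation: `FieldMapping`, `DynamicMessageTemplate`, and `dict_field` are hypothetical names introduced only for illustration.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional

Extractor = Callable[[Any], Optional[str]]


@dataclass(frozen=True)
class FieldMapping:
    """Hypothetical wrapper around an extractor callable."""

    extractor: Extractor

    def resolve(self, payload: Any) -> Optional[str]:
        return self.extractor(payload)


@dataclass(frozen=True)
class DynamicMessageTemplate:
    """Hypothetical template: one optional mapping per metadata field."""

    message_id: Optional[FieldMapping] = None
    subject: Optional[FieldMapping] = None

    def resolve(self, payload: Any) -> Dict[str, Optional[str]]:
        # Evaluate each configured mapping against this single payload.
        return {
            "message_id": self.message_id.resolve(payload) if self.message_id else None,
            "subject": self.subject.resolve(payload) if self.subject else None,
        }


def dict_field(key: str) -> FieldMapping:
    # Read `key` from dict payloads; yield None for anything else.
    return FieldMapping(lambda p: p.get(key) if isinstance(p, dict) else None)


template = DynamicMessageTemplate(
    message_id=dict_field("id"),
    subject=dict_field("severity"),
)
payloads = [
    {"id": "a1", "severity": "high"},
    {"id": "a2", "severity": "critical"},
]
# One resolved metadata dict per payload.
resolved = [template.resolve(p) for p in payloads]
```

Keeping the extractors as plain callables means the same template can be reused across publish calls while each message still gets its own identifier and subject.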
A richer walkthrough is available in quickstart/, including quickstart/asbflow_quickstart.ipynb.
Public API Snapshot
- ASBMessageConfig: concrete Service Bus message metadata.
- ASBDynamicMessageConfig: payload-driven message metadata template.
- MessageFieldMapping: extractor wrapper used by dynamic message config fields.
- MessageConfigInput: alias of ASBMessageConfig | ASBDynamicMessageConfig (in asbflow.config.message).
Design Principles
- Abstraction over boilerplate, not over behavior.
- Safe defaults with explicit overrides.
- Uniform result contracts (succeeded, failed, successes, failures).
- Structured logging across services and strategies.
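A uniform result contract of this shape could be sketched as below. The field names `succeeded`, `failed`, `successes`, and `failures` come from the list above; everything else is an assumption made for illustration, including the choice to treat `succeeded` and `failed` as counts, the `BatchResult` and `process_all` names, and the meaning given to `raise_on_error`. The actual asbflow result types may differ.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Iterable, List, Tuple


@dataclass
class BatchResult:
    """Hypothetical result object; asbflow's real types may differ."""

    successes: List[Any] = field(default_factory=list)
    failures: List[Tuple[Any, Exception]] = field(default_factory=list)

    @property
    def succeeded(self) -> int:
        # Assumed here to be a count of successful items.
        return len(self.successes)

    @property
    def failed(self) -> int:
        # Assumed here to be a count of failed items.
        return len(self.failures)


def process_all(
    items: Iterable[Any],
    handler: Callable[[Any], Any],
    raise_on_error: bool = False,
) -> BatchResult:
    """Run handler on every item and collect outcomes in one result object."""
    result = BatchResult()
    for item in items:
        try:
            result.successes.append(handler(item))
        except Exception as exc:
            if raise_on_error:
                raise
            # Record the failing item together with its exception.
            result.failures.append((item, exc))
    return result


result = process_all(["1", "x", "3"], int)
print(result.succeeded, result.failed)
```

Returning the same shape from every operation lets callers write one piece of reporting or alerting code for publish, consume, and DLQ paths.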
Development
Run tests:
pytest -q test/unit
Build package:
python -m build
CI/CD Branch Model
The repository ships with branch-oriented GitHub Actions:
- dev: fast CI feedback (tests + quality checks).
- main: full CI matrix + package build verification.
- release: release validation and publish workflow.
Roadmap
- More built-in payload parser adapters.
- Additional reliability patterns around retries/backoff.
- Optional observability integrations (metrics/tracing).
License
MIT. See LICENSE.