generic NSQ → MariaDB transporter with per-topic Python mapper classes
nsq2mariadb
Generic transporter for moving JSON messages from NSQ topics into
MariaDB tables. You write one Python Mapper class per
topic; the framework handles the NSQ subscription, schema bootstrap, and
transactional inserts.
The shape mirrors nsq2arangodb, but because MariaDB is schema-full, the
per-topic schema and the JSON-to-row translation have to live in code rather
than in configuration.
Installation
From PyPI (normal use)
pip install nsq2mariadb
Pin a specific version in production:
pip install nsq2mariadb==0.1.2
Or in a requirements.txt:
nsq2mariadb==0.1.2
Requires Python 3.9+. Dependencies: pynsq, pymysql (both pulled in
automatically).
Dev install (working on nsq2mariadb itself)
git clone https://github.com/larsborn/nsq2mariadb.git
cd nsq2mariadb
python3 -m venv .venv
source .venv/bin/activate # Windows bash: source .venv/Scripts/activate
pip install -U pip
pip install -e . # install the package in editable mode
pip install build twine # only needed if you'll cut releases locally
Verify the install:
make test # runs the unittest suite
make version # prints the current version from setup.py
See CLAUDE.md for the full release flow and packaging notes.
Usage
For each NSQ topic you want to consume, subclass Mapper:
from nsq2mariadb import Mapper


class OrderMapper(Mapper):
    topic = "orders"
    schema_sql = """
        CREATE TABLE IF NOT EXISTS `order` (
            id INT PRIMARY KEY,
            customer VARCHAR(255) NOT NULL,
            inserted_at DATETIME NOT NULL
        );
        CREATE TABLE IF NOT EXISTS order_item (
            order_id INT NOT NULL,
            position SMALLINT NOT NULL,
            sku VARCHAR(32) NOT NULL,
            PRIMARY KEY (order_id, position),
            FOREIGN KEY (order_id) REFERENCES `order`(id) ON DELETE CASCADE
        );
    """

    def transform(self, doc):
        yield "order", {
            "id": doc["id"],
            "customer": doc["customer"],
            "inserted_at": doc["inserted_at"],
        }
        for i, sku in enumerate(doc.get("items", [])):
            yield "order_item", {"order_id": doc["id"], "position": i, "sku": sku}
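To see what this mapper emits, the transform logic can be exercised on a sample message without a broker or a database. The sketch below copies the generator body into a plain function so it runs without nsq2mariadb installed. The sample document is made up for illustration and is not taken from a real topic.

```python
from typing import Any, Dict, Iterator, Tuple

Row = Tuple[str, Dict[str, Any]]


def transform(doc: Dict[str, Any]) -> Iterator[Row]:
    """Same body as OrderMapper.transform, written as a free function."""
    yield "order", {
        "id": doc["id"],
        "customer": doc["customer"],
        "inserted_at": doc["inserted_at"],
    }
    for i, sku in enumerate(doc.get("items", [])):
        yield "order_item", {"order_id": doc["id"], "position": i, "sku": sku}


# Hypothetical message payload, for illustration only.
sample = {
    "id": 7,
    "customer": "ACME",
    "inserted_at": "2024-01-01 12:00:00",
    "items": ["SKU-1", "SKU-2"],
}

rows = list(transform(sample))
for table, row in rows:
    print(table, row)
```

One message therefore fans out into one `order` row followed by one `order_item` row per entry in `items`, with the list index used as `position`.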
Then wire it into the runner:
import logging

from nsq2mariadb import MariaDBConfig, Nsq2MariaDB, NsqConfig

logging.basicConfig(level=logging.INFO)

runner = Nsq2MariaDB(
    logger=logging.getLogger("nsq2mariadb"),
    mariadb_config=MariaDBConfig(
        host="mariadb", port=3306,
        user="orders", password="secret", database="orders",
    ),
    nsq_config=NsqConfig(
        address="nsq-nsqd-1", port=4150,
        channel="nsq2mariadb",
    ),
    mappers=[OrderMapper()],
)
runner.run()
On startup the framework opens one pymysql connection, executes every mapper's
schema_sql with CREATE TABLE IF NOT EXISTS semantics, and subscribes an
nsq.Reader per mapper. Each NSQ message is decoded, fanned out through the
mapper's transform(), and inserted in a single transaction with parameterized
INSERT IGNORE statements (so re-published messages are idempotent as long as
your primary key reflects content identity).
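As a rough illustration of what a parameterized INSERT IGNORE looks like for one `(table, row)` pair, here is a minimal sketch. The helper name `build_insert_ignore` is hypothetical and this is not the framework's actual code; it only assumes the `%s` placeholder style that pymysql accepts in `cursor.execute(sql, params)`.

```python
from typing import Any, Dict, List, Tuple


def quote_identifier(name: str) -> str:
    """Backtick-quote a table or column name, doubling embedded backticks."""
    return "`" + name.replace("`", "``") + "`"


def build_insert_ignore(table: str, row: Dict[str, Any]) -> Tuple[str, List[Any]]:
    """Return (sql, params) for one row, using %s placeholders for values."""
    columns = list(row)
    # Identifiers cannot be bound as parameters, so they are quoted instead;
    # values never appear in the SQL text and always go through placeholders.
    column_list = ", ".join(quote_identifier(c) for c in columns)
    placeholders = ", ".join(["%s"] * len(columns))
    sql = (
        f"INSERT IGNORE INTO {quote_identifier(table)} "
        f"({column_list}) VALUES ({placeholders})"
    )
    return sql, [row[c] for c in columns]


sql, params = build_insert_ignore(
    "order_item", {"order_id": 7, "position": 0, "sku": "SKU-1"}
)
print(sql)
print(params)
```

Because the statement is INSERT IGNORE, a second delivery of the same message hits the existing primary key and is skipped rather than raising a duplicate-key error, which is what makes redelivery harmless when the key reflects content identity.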
Error handling
| Failure mode | Behavior |
|---|---|
| JSON decode error | Logged with traceback, message FIN'd (dropped; retrying won't fix it). |
| pymysql.MySQLError during insert | Transaction rolled back, message FIN'd, traceback logged. |
| Connection drop | pymysql raises, the exception propagates, and the process exits; recovery relies on a container restart. |
Schema mismatches (unknown column, missing table, FK violation) are programmer or schema bugs that loop forever if requeued, so we drop them loudly. Tune your log shipping accordingly.
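The drop-on-failure policy in the table can be sketched end to end with the standard library. This is an illustration under stated assumptions, not the framework's implementation: `sqlite3` stands in for MariaDB (`INSERT OR IGNORE` plays the role of INSERT IGNORE), `FakeMessage` is a hypothetical stand-in for an NSQ message, and the table is named `orders` only to avoid quoting a reserved word.

```python
import json
import logging
import sqlite3

logger = logging.getLogger("nsq2mariadb-sketch")


class FakeMessage:
    """Hypothetical stand-in for an NSQ message: a body plus finish()."""

    def __init__(self, body: bytes) -> None:
        self.body = body
        self.finished = False

    def finish(self) -> None:
        self.finished = True  # FIN: tell NSQ not to redeliver


def transform(doc):
    """Toy mapper: one parent row plus one child row per item."""
    yield "orders", {"id": doc["id"], "customer": doc["customer"]}
    for i, sku in enumerate(doc.get("items", [])):
        yield "order_item", {"order_id": doc["id"], "position": i, "sku": sku}


def handle(conn: sqlite3.Connection, message: FakeMessage) -> bool:
    """Insert all rows of one message atomically; FIN the message either way."""
    try:
        doc = json.loads(message.body)
    except ValueError:
        logger.exception("JSON decode error, dropping message")
        message.finish()
        return False
    try:
        for table, row in transform(doc):
            columns = ", ".join(row)
            marks = ", ".join("?" for _ in row)
            conn.execute(
                f"INSERT OR IGNORE INTO {table} ({columns}) VALUES ({marks})",
                list(row.values()),
            )
        conn.commit()
        stored = True
    except sqlite3.Error:
        conn.rollback()  # undo the partial write from this message
        logger.exception("insert failed, dropping message")
        stored = False
    message.finish()
    return stored


conn = sqlite3.connect(":memory:")
# Only the parent table exists, so any message with items hits a missing table.
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, customer TEXT NOT NULL)")

bodies = [
    b'{"id": 1, "customer": "ACME"}',  # stored
    b'{"id": 1, "customer": "ACME"}',  # redelivery, silently ignored
    b"not json",  # decode error
    b'{"id": 2, "customer": "Bolt", "items": ["SKU-1"]}',  # rolled back
]
messages = [FakeMessage(b) for b in bodies]
results = [handle(conn, m) for m in messages]
stored_ids = [r[0] for r in conn.execute("SELECT id FROM orders ORDER BY id")]
print(results, stored_ids)
```

The last message shows why the insert is wrapped in one transaction: its parent row is written first, the child insert then fails on the missing table, and the rollback removes the parent row again so no half-written order remains.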
Multiple topics per process
Pass several mappers to one Nsq2MariaDB instance — pynsq supports multiple
Readers in one IOLoop. They share the database connection but each runs an
independent NSQ subscription. Useful when a single project produces several
related topics (e.g. entries + runs) that you want to land in the same DB.
Releasing
Releases are auto-published to PyPI
by .github/workflows/publish.yml on every v* tag, via PyPI's Trusted
Publishers (OIDC — no API token stored in the repo).
To cut a release, from a clean main:
make release-patch # X.Y.Z -> X.Y.Z+1 (bug fixes)
make release-minor # X.Y.Z -> X.Y+1.0 (new features, backwards compatible)
make release-major # X.Y.Z -> X+1.0.0 (breaking changes)
Each target bumps setup.py, commits with chore: bump <version>, creates
a v<version> tag, and pushes both. The GitHub Actions workflow picks up the
tag and uploads to PyPI.
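The three bump rules amount to simple arithmetic on the version triple. The sketch below spells them out; the function name `bump` is hypothetical, and this is not the Makefile's actual implementation, only the X.Y.Z rules listed above.

```python
def bump(version: str, part: str) -> str:
    """Return the next version for a 'patch', 'minor', or 'major' release."""
    major, minor, patch = (int(piece) for piece in version.split("."))
    if part == "patch":
        return f"{major}.{minor}.{patch + 1}"
    if part == "minor":
        # A minor release resets the patch component.
        return f"{major}.{minor + 1}.0"
    if part == "major":
        # A major release resets both minor and patch.
        return f"{major + 1}.0.0"
    raise ValueError(f"unknown release part: {part!r}")


for part in ("patch", "minor", "major"):
    print(part, bump("0.1.2", part))
```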
Use make dry-release-patch (etc.) to preview without modifying anything.
See CLAUDE.md for safety checks, prerequisites, and the
manual-fallback flow.
License
MIT.