generic NSQ → MariaDB transporter with per-topic Python mapper classes
nsq2mariadb
Generic transporter for moving JSON messages from NSQ topics into
MariaDB tables. You write one Python Mapper class per
topic; the framework handles the NSQ subscription, schema bootstrap, and
transactional inserts.
The design mirrors nsq2arangodb, but because MariaDB is schema-full,
the per-topic schema and the JSON-to-row translation have to live in code
rather than configuration.
Installation
pip install git+https://github.com/larsborn/nsq2mariadb.git@v0.1.0
Or for local development:
git clone https://github.com/larsborn/nsq2mariadb.git
cd nsq2mariadb
python3 -m venv .venv
source .venv/bin/activate
pip install -U pip
pip install -e .
Requires Python 3.9+. Dependencies: pynsq, pymysql.
Usage
For each NSQ topic you want to consume, subclass Mapper:
from nsq2mariadb import Mapper

class OrderMapper(Mapper):
    topic = "orders"
    schema_sql = """
        CREATE TABLE IF NOT EXISTS `order` (
            id INT PRIMARY KEY,
            customer VARCHAR(255) NOT NULL,
            inserted_at DATETIME NOT NULL
        );
        CREATE TABLE IF NOT EXISTS order_item (
            order_id INT NOT NULL,
            position SMALLINT NOT NULL,
            sku VARCHAR(32) NOT NULL,
            PRIMARY KEY (order_id, position),
            FOREIGN KEY (order_id) REFERENCES `order`(id) ON DELETE CASCADE
        );
    """

    def transform(self, doc):
        yield "order", {
            "id": doc["id"],
            "customer": doc["customer"],
            "inserted_at": doc["inserted_at"],
        }
        for i, sku in enumerate(doc.get("items", [])):
            yield "order_item", {"order_id": doc["id"], "position": i, "sku": sku}
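To make the fan-out concrete, the sketch below runs transform() on a sample message. The Mapper base class here is a minimal stand-in (an assumption, so the snippet runs without the package installed), and the sample document is invented for illustration.

```python
class Mapper:
    """Hypothetical stand-in for nsq2mariadb.Mapper; the real base class may differ."""

    topic = ""
    schema_sql = ""

    def transform(self, doc):
        raise NotImplementedError


class OrderMapper(Mapper):
    topic = "orders"

    def transform(self, doc):
        # The parent row is yielded first so the child rows can satisfy the foreign key.
        yield "order", {
            "id": doc["id"],
            "customer": doc["customer"],
            "inserted_at": doc["inserted_at"],
        }
        for i, sku in enumerate(doc.get("items", [])):
            yield "order_item", {"order_id": doc["id"], "position": i, "sku": sku}


# Invented sample message, already decoded from JSON.
sample = {
    "id": 7,
    "customer": "ACME",
    "inserted_at": "2024-01-01 12:00:00",
    "items": ["A-1", "B-2"],
}

rows = list(OrderMapper().transform(sample))
for table, row in rows:
    print(table, row)
# order {'id': 7, 'customer': 'ACME', 'inserted_at': '2024-01-01 12:00:00'}
# order_item {'order_id': 7, 'position': 0, 'sku': 'A-1'}
# order_item {'order_id': 7, 'position': 1, 'sku': 'B-2'}
```

One message becomes one `order` row plus one `order_item` row per SKU. A message without an `items` key yields only the `order` row.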
Then wire it into the runner:
import logging

from nsq2mariadb import MariaDBConfig, Nsq2MariaDB, NsqConfig

logging.basicConfig(level=logging.INFO)

runner = Nsq2MariaDB(
    logger=logging.getLogger("nsq2mariadb"),
    mariadb_config=MariaDBConfig(
        host="mariadb", port=3306,
        user="orders", password="secret", database="orders",
    ),
    nsq_config=NsqConfig(
        address="nsq-nsqd-1", port=4150,
        channel="nsq2mariadb",
    ),
    mappers=[OrderMapper()],
)
runner.run()
On startup the framework opens one pymysql connection, executes every mapper's
schema_sql with CREATE TABLE IF NOT EXISTS semantics, and subscribes an
nsq.Reader per mapper. Each NSQ message is decoded, fanned out through the
mapper's transform(), and inserted in a single transaction with parameterized
INSERT IGNORE statements (so re-published messages are idempotent as long as
your primary key reflects content identity).
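The insert step described above could look roughly like the following. This is a sketch, not the framework's actual code: build_insert and write_rows are hypothetical names, and the only assumption about the connection is that it offers the cursor(), commit() and rollback() methods a pymysql connection provides.

```python
def build_insert(table, row):
    """Build a parameterized INSERT IGNORE for one (table, row) pair.

    Identifiers cannot be bound as parameters, so table and column names
    are quoted with backticks and must come from mapper code, never from
    message data. Values are bound through %s placeholders (pymysql style).
    """
    columns = list(row)
    column_list = ", ".join(f"`{c}`" for c in columns)
    placeholders = ", ".join(["%s"] * len(columns))
    sql = f"INSERT IGNORE INTO `{table}` ({column_list}) VALUES ({placeholders})"
    return sql, tuple(row[c] for c in columns)


def write_rows(conn, rows):
    """Insert every row produced from one message in a single transaction."""
    try:
        with conn.cursor() as cur:
            for table, row in rows:
                sql, params = build_insert(table, row)
                cur.execute(sql, params)
        conn.commit()
    except Exception:
        # Undo any partial writes, then let the caller decide what to do.
        conn.rollback()
        raise


sql, params = build_insert("order", {"id": 7, "customer": "ACME"})
print(sql)     # INSERT IGNORE INTO `order` (`id`, `customer`) VALUES (%s, %s)
print(params)  # (7, 'ACME')
```

With INSERT IGNORE, a second delivery of the same message hits the existing primary key and is skipped instead of raising a duplicate-key error.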
Error handling
| Failure mode | Behavior |
|---|---|
| JSON decode error | Logged with traceback, message FIN'd (dropped — won't fix). |
| pymysql.MySQLError during insert | Transaction rolled back, message FIN'd, traceback logged. |
| Connection drop | pymysql raises, propagates, process exits — relies on container restart. |
Schema mismatches (unknown column, missing table, FK violation) are programmer or schema bugs that loop forever if requeued, so we drop them loudly. Tune your log shipping accordingly.
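The drop-loudly policy can be sketched as a message handler. Everything here is illustrative: handle, RecordingConn and FakeDBError are hypothetical names, FakeDBError stands in for pymysql.MySQLError so the snippet runs without a database, and the return value relies on pynsq's convention that a handler returning True causes the message to be FIN'd.

```python
import json
import logging

log = logging.getLogger("nsq2mariadb.sketch")


class FakeDBError(Exception):
    """Stand-in for pymysql.MySQLError (assumption, keeps the sketch self-contained)."""


class RecordingConn:
    """Tiny fake connection that records commit and rollback calls."""

    def __init__(self):
        self.calls = []

    def commit(self):
        self.calls.append("commit")

    def rollback(self):
        self.calls.append("rollback")


def handle(body, conn, write, db_error=FakeDBError):
    """Process one message body; always return True so the message is FIN'd."""
    try:
        doc = json.loads(body)
    except ValueError:
        # Covers json.JSONDecodeError and invalid UTF-8: retrying cannot help.
        log.exception("undecodable message dropped")
        return True
    try:
        write(conn, doc)
        conn.commit()
    except db_error:
        # Requeueing a schema or programmer bug would loop forever, so drop it.
        conn.rollback()
        log.exception("insert failed, message dropped")
    return True


def broken_write(conn, doc):
    raise FakeDBError("Unknown column 'sku'")


conn = RecordingConn()
handle(b'{"id": 1}', conn, broken_write)
print(conn.calls)  # ['rollback']
```

Exceptions other than db_error are deliberately not caught in this sketch, so they propagate to the caller.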
Multiple topics per process
Pass several mappers to one Nsq2MariaDB instance — pynsq supports multiple
Readers in one IOLoop. They share the database connection but each runs an
independent NSQ subscription. Useful when a single project produces several
related topics (e.g. entries + runs) that you want to land in the same DB.
Releasing
Releases are auto-published to PyPI
by .github/workflows/publish.yml on every v* tag, via PyPI's Trusted
Publishers (OIDC — no API token stored in the repo).
To cut a release:
- Bump the version in setup.py.
- Commit and push to main.
- Tag and push: git tag v0.1.3 && git push origin v0.1.3.
The workflow builds an sdist + wheel and publishes them under the
pypi GitHub environment.
License
MIT.