generic NSQ → MariaDB transporter with per-topic Python mapper classes

nsq2mariadb

Generic transporter for moving JSON messages from NSQ topics into MariaDB tables. You write one Python Mapper class per topic; the framework handles the NSQ subscription, schema bootstrap, and transactional inserts.

The shape mirrors nsq2arangodb, but because MariaDB is schema-full, the per-topic schema and JSON-to-row translation have to live in code rather than configuration.

Installation

From PyPI (normal use)

pip install nsq2mariadb

Pin a specific version in production:

pip install nsq2mariadb==0.1.2

Or in a requirements.txt:

nsq2mariadb==0.1.2

Requires Python 3.9+. Dependencies: pynsq, pymysql (both pulled in automatically).

Dev install (working on nsq2mariadb itself)

git clone https://github.com/larsborn/nsq2mariadb.git
cd nsq2mariadb
python3 -m venv .venv
source .venv/bin/activate     # Windows bash: source .venv/Scripts/activate
pip install -U pip
pip install -e .              # install the package in editable mode
pip install build twine       # only needed if you'll cut releases locally

Verify the install:

make test       # runs the unittest suite
make version    # prints the current version from setup.py

See CLAUDE.md for the full release flow and packaging notes.

Usage

For each NSQ topic you want to consume, subclass Mapper:

from nsq2mariadb import Mapper

class OrderMapper(Mapper):
    topic = "orders"
    schema_sql = """
        CREATE TABLE IF NOT EXISTS `order` (
            id          INT PRIMARY KEY,
            customer    VARCHAR(255) NOT NULL,
            inserted_at DATETIME     NOT NULL
        );
        CREATE TABLE IF NOT EXISTS order_item (
            order_id INT          NOT NULL,
            position SMALLINT     NOT NULL,
            sku      VARCHAR(32)  NOT NULL,
            PRIMARY KEY (order_id, position),
            FOREIGN KEY (order_id) REFERENCES `order`(id) ON DELETE CASCADE
        );
    """

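    def transform(self, doc):
        # Each yield is a (table name, {column: value}) pair for one row.
        # The parent row comes first so the foreign key on order_item resolves.
        yield "order", {
            "id":          doc["id"],
            "customer":    doc["customer"],
            "inserted_at": doc["inserted_at"],
        }
        # One order_item row per SKU; the list index becomes the position.
        for i, sku in enumerate(doc.get("items", [])):
            yield "order_item", {"order_id": doc["id"], "position": i, "sku": sku}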
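
To see what a mapper hands to the framework, the generator can be exercised on its own. The sketch below copies the body of OrderMapper.transform into a plain function so that it runs without nsq2mariadb installed. The sample message is made up for illustration.

```python
def transform(doc):
    """Stand-alone copy of OrderMapper.transform (no framework needed)."""
    yield "order", {
        "id": doc["id"],
        "customer": doc["customer"],
        "inserted_at": doc["inserted_at"],
    }
    for i, sku in enumerate(doc.get("items", [])):
        yield "order_item", {"order_id": doc["id"], "position": i, "sku": sku}


# Hypothetical message body, already decoded from JSON.
sample = {
    "id": 42,
    "customer": "ACME",
    "inserted_at": "2024-01-01 12:00:00",
    "items": ["SKU-1", "SKU-2"],
}

rows = list(transform(sample))
for table, row in rows:
    print(table, row)
```

One message becomes three rows here: one for order and one order_item per SKU.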

Then wire it into the runner:

import logging
from nsq2mariadb import MariaDBConfig, Nsq2MariaDB, NsqConfig

logging.basicConfig(level=logging.INFO)

runner = Nsq2MariaDB(
    logger=logging.getLogger("nsq2mariadb"),
    mariadb_config=MariaDBConfig(
        host="mariadb", port=3306,
        user="orders", password="secret", database="orders",
    ),
    nsq_config=NsqConfig(
        address="nsq-nsqd-1", port=4150,
        channel="nsq2mariadb",
    ),
    mappers=[OrderMapper()],
)
runner.run()

On startup the framework opens one pymysql connection, executes every mapper's schema_sql with CREATE TABLE IF NOT EXISTS semantics, and subscribes one nsq.Reader per mapper. Each NSQ message is decoded, fanned out through the mapper's transform(), and inserted in a single transaction using parameterized INSERT IGNORE statements. Re-published messages are therefore idempotent, as long as your primary key reflects content identity.
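
As a rough illustration of what "parameterized INSERT IGNORE" means for one yielded row, the sketch below turns a (table, row) pair into a statement and a parameter tuple. It is not the library's actual implementation; build_insert_ignore is a hypothetical helper, and the %s placeholders follow pymysql's parameter style.

```python
def build_insert_ignore(table, row):
    """Return (sql, params) for one row.

    Values travel as parameters, never inside the SQL text. Table and
    column names come from mapper code, not from the message.
    """
    columns = list(row)
    column_list = ", ".join(f"`{c}`" for c in columns)
    placeholders = ", ".join(["%s"] * len(columns))
    sql = f"INSERT IGNORE INTO `{table}` ({column_list}) VALUES ({placeholders})"
    return sql, tuple(row[c] for c in columns)


sql, params = build_insert_ignore(
    "order_item", {"order_id": 42, "position": 0, "sku": "SKU-1"}
)
print(sql)
print(params)
```

With pymysql, such a pair would be passed to cursor.execute(sql, params). Because of IGNORE, a second insert with the same primary key is skipped rather than raising a duplicate-key error, which is what makes redelivery harmless.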

Error handling

Failure mode                        Behavior
JSON decode error                   Logged with traceback; message FIN'd (dropped, since a retry won't fix it).
pymysql.MySQLError during insert    Transaction rolled back; message FIN'd; traceback logged.
Connection drop                     pymysql raises, the exception propagates, and the process exits; recovery relies on a container restart.

Schema mismatches (unknown column, missing table, FK violation) are programmer or schema bugs that loop forever if requeued, so we drop them loudly. Tune your log shipping accordingly.
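
The first two rows of the table can be sketched as a small handler. This is an assumption-laden illustration, not the library's code: handle_message, insert_rows, and InsertError are hypothetical names, and InsertError stands in for pymysql.MySQLError so the example has no dependencies. Connection drops are not modeled.

```python
import json
import logging

logger = logging.getLogger("nsq2mariadb.sketch")


class InsertError(Exception):
    """Stand-in for pymysql.MySQLError so the sketch has no dependencies."""


def handle_message(body, insert_rows):
    """Process one raw message body and report what happened to it.

    In every outcome below the message would be FIN'd, never requeued.
    """
    try:
        doc = json.loads(body)
    except ValueError:  # json.JSONDecodeError subclasses ValueError
        logger.exception("JSON decode error, dropping message")
        return "dropped: invalid JSON"
    try:
        insert_rows(doc)  # hypothetical callable: commits, or rolls back and raises
    except InsertError:
        logger.exception("insert failed, dropping message")
        return "dropped: insert failed"
    return "inserted"


def failing_insert(doc):
    """Simulates a schema bug such as an unknown column."""
    raise InsertError("unknown column")


print(handle_message(b"{not json", lambda doc: None))
print(handle_message(b'{"id": 1}', failing_insert))
print(handle_message(b'{"id": 1}', lambda doc: None))
```

With pynsq, a synchronous message handler signals FIN by returning True, so a real handler would return True on all three paths and only the log output would differ.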

Multiple topics per process

Pass several mappers to one Nsq2MariaDB instance — pynsq supports multiple Readers in one IOLoop. They share the database connection but each runs an independent NSQ subscription. Useful when a single project produces several related topics (e.g. entries + runs) that you want to land in the same DB.
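
A minimal sketch of the entries + runs case follows. To stay runnable without the package, it defines a local stand-in for the Mapper base class; the table names, columns, and message fields are invented for illustration.

```python
class Mapper:
    """Local stand-in for nsq2mariadb.Mapper so the sketch runs on its own."""

    topic = None
    schema_sql = ""

    def transform(self, doc):
        raise NotImplementedError


class EntryMapper(Mapper):
    topic = "entries"
    schema_sql = """
        CREATE TABLE IF NOT EXISTS entry (
            id    INT PRIMARY KEY,
            title VARCHAR(255) NOT NULL
        );
    """

    def transform(self, doc):
        yield "entry", {"id": doc["id"], "title": doc["title"]}


class RunMapper(Mapper):
    topic = "runs"
    schema_sql = """
        CREATE TABLE IF NOT EXISTS run (
            id       INT PRIMARY KEY,
            entry_id INT NOT NULL
        );
    """

    def transform(self, doc):
        yield "run", {"id": doc["id"], "entry_id": doc["entry_id"]}


# One list, one process: each mapper gets its own NSQ subscription.
mappers = [EntryMapper(), RunMapper()]
topics = [m.topic for m in mappers]
print(topics)
```

With the real package, Mapper would be imported from nsq2mariadb and the list passed as mappers=mappers to Nsq2MariaDB, exactly as in the single-mapper runner above.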

Releasing

Releases are auto-published to PyPI by .github/workflows/publish.yml on every v* tag, via PyPI's Trusted Publishers (OIDC — no API token stored in the repo).

To cut a release, from a clean main:

make release-patch   # X.Y.Z -> X.Y.Z+1   (bug fixes)
make release-minor   # X.Y.Z -> X.Y+1.0   (new features, backwards compatible)
make release-major   # X.Y.Z -> X+1.0.0   (breaking changes)

Each target bumps setup.py, commits with chore: bump <version>, creates a v<version> tag, and pushes both. The GitHub Actions workflow picks up the tag and uploads to PyPI.
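
The version arithmetic behind the three targets can be written out as follows. This only illustrates the numbering rule; the Makefile's actual bump logic is not shown in this README, and bump is a hypothetical helper.

```python
def bump(version, part):
    """Return the next version string for a 'major', 'minor', or 'patch' bump."""
    major, minor, patch = (int(p) for p in version.split("."))
    if part == "major":
        return f"{major + 1}.0.0"
    if part == "minor":
        return f"{major}.{minor + 1}.0"
    if part == "patch":
        return f"{major}.{minor}.{patch + 1}"
    raise ValueError(f"unknown part: {part!r}")


for part in ("patch", "minor", "major"):
    print(part, bump("0.1.2", part))
```

Note that a minor bump resets the patch number and a major bump resets both lower components.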

Use make dry-release-patch (etc.) to preview without modifying anything.

See CLAUDE.md for safety checks, prerequisites, and the manual-fallback flow.

License

MIT.
