dbsink
Tools to sink Kafka messages to a database table.
Read from a Kafka topic and sink to a database table, one row per message.
This is not unlike the Kafka Connect JdbcConnector. This project has a much lower barrier to entry and doesn't require diving into the Kafka Connect ecosystem. I wrote the equivalent of this project as a custom JdbcConnector and it was getting out of control and was basically un-testable. So here we are.
You can choose to unpack the message data as avro, msgpack, or the default, json. avro requires an additional registry parameter.
Docker images: https://hub.docker.com/r/axiom/dbsink/builds
WHY?
I needed to read from well-defined Kafka topics and store the results in a database table so collaborators could interact with the data in a more familiar way.
It is also very convenient to set up PostgREST on top of the resulting tables to get a quick read-only REST API on top of the sinked messages.
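As an illustration, a minimal PostgREST configuration pointed at the sink database could look like the following. The connection string and role name are placeholders; creating the role and granting it read access are up to you.

# postgrest.conf -- illustrative values only
db-uri = "postgres://sink:sink@localhost:5432/sink"
db-schema = "public"
db-anon-role = "web_anon"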
Mapping messages to tables
You can define custom mappings between messages and tables using a Python class. Register your custom mappings with the dbsink.maps entrypoint to make them available to dbsink at run-time:
entry_points = {
'dbsink.maps': [
'YourCustomMap = you.custom.map.module:CustomMapClass',
# ...
]
}
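For reference, a minimal setup.py carrying that registration might look like this (the package name and module path are placeholders):

from setuptools import setup, find_packages

setup(
    name='your-custom-maps',  # placeholder package name
    version='0.1.0',
    packages=find_packages(),
    entry_points={
        'dbsink.maps': [
            'YourCustomMap = you.custom.map.module:CustomMapClass',
        ]
    },
)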
Custom mapping classes should inherit from the BaseMap class in dbsink and override the following properties and methods as needed:
- upsert_constraint_name - Name of the constraint to use for upserting data. Set to None to disable upserting. Use this class property when creating the upsert constraint on your table (see example below).
- unique_index_name - Unique index name based on the table name. Use this if defining a single unique index on your table.
- sequence_name - Unique sequence name based on the table name. Use this if defining a single sequence column on your table.
- _check_key - Checks the validity of a message's key before trying to sink. Return True if valid and raise an error if not.
- _check_value - Checks the validity of a message's value before trying to sink. Return True if valid and raise an error if not.
- schema - A list of SQLAlchemy Column, Index, and Constraint schema definitions to use in table creation and updating. This fully describes your table's schema.
- message_to_values - A function accepting key and value arguments and returning a tuple (key, dict), where the dict holds the values to pass to SQLAlchemy's insert().values() method. The value argument to this function will already be unpacked if avro or msgpack packing was specified.

    insert(table).values(
        # dict_returned_ends_up_here
    )
Example
A simple example is the StringMap mapping included with dbsink:
from datetime import datetime

import pytz
import sqlalchemy as sql
import simplejson as json

from dbsink.maps import BaseMap


class StringMap(BaseMap):

    @property
    def upsert_constraint_name(self):
        return None  # Ignore upserts

    def _check_key(self, key):
        return True  # All keys are valid

    def _check_value(self, value):
        # Make sure values are JSON parsable
        _ = json.loads(json.dumps(value, ignore_nan=True))
        return True

    @property
    def schema(self):
        return [
            sql.Column('id', sql.Integer, sql.Sequence(self.sequence_name), primary_key=True),
            sql.Column('sinked', sql.DateTime(timezone=True), index=True),
            sql.Column('key', sql.String, default='', index=True),
            sql.Column('payload', sql.String)
        ]

    def message_to_values(self, key, value):
        # Raises if invalid. This calls `_check_key` and `_check_value`
        self.check(key, value)

        values = {
            'sinked': datetime.utcnow().replace(tzinfo=pytz.utc).isoformat(),
            'key': key,
            'payload': json.dumps(value),
        }

        return key, values
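To make the contract concrete: the dict returned by message_to_values is passed straight to SQLAlchemy's insert().values() method, as noted above. Here is a small standalone sketch of that hand-off; the table, engine, and values are illustrative stand-ins, not dbsink internals.

import sqlalchemy as sql

# Illustrative stand-in for a table built from a mapping's `schema`
metadata = sql.MetaData()
table = sql.Table(
    'demo', metadata,
    sql.Column('key', sql.String),
    sql.Column('payload', sql.String),
)

engine = sql.create_engine('sqlite://')  # in-memory database for the sketch
metadata.create_all(engine)

# Pretend this tuple came from a mapping's message_to_values(...)
key, values = 'k1', {'key': 'k1', 'payload': '{"a": 1}'}

with engine.begin() as conn:
    conn.execute(sql.insert(table).values(values))  # dict_returned_ends_up_here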
Advanced Example
There are no restrictions on table schemas or on how you map message data into the schema. Take the example below, which uses a PostGIS geography column.
from datetime import datetime

import pytz
import sqlalchemy as sql
import simplejson as json
from shapely.geometry import shape
from geoalchemy2.types import Geography

from dbsink.maps import BaseMap


class NamedGenericGeography(BaseMap):

    def _check_key(self, key):
        return True  # All keys are valid

    def _check_value(self, value):
        # Make sure values are JSON parsable
        _ = json.loads(json.dumps(value, ignore_nan=True))
        return True

    @property
    def schema(self):
        return [
            sql.Column('id', sql.Integer, sql.Sequence(self.sequence_name), primary_key=True),
            sql.Column('name', sql.String, default='', index=True),
            sql.Column('time', sql.DateTime(timezone=True), index=True),
            sql.Column('geom', Geography(srid=4326)),
            sql.Index(
                self.unique_index_name,
                'name',
                'time',
                unique=True,
            ),
            sql.UniqueConstraint(
                'name',
                'time',
                name=self.upsert_constraint_name
            )
        ]

    def message_to_values(self, key, value):
        """ Assumes a message format of
        {
            "time": 1000000000,  # unix epoch
            "name": "my cool thing",
            "geojson": {
                "geometry": {
                    "type": "Polygon",
                    "coordinates": [ [ [ -118.532116484818843, 32.107425500492766 ], [ -118.457544847012443, 32.107425500492702 ], [ -118.457544847012443, 32.054517056541435 ], [ -118.532116484818872, 32.054517056541464 ], [ -118.532116484818843, 32.107425500492766 ] ] ]
                }
            }
        }
        """
        # Raises if invalid
        self.check(key, value)

        # GeoJSON `geometry` attribute to WKT
        geometry = shape(value['geojson']['geometry']).wkt

        values = {
            'name': value['name'],
            'time': datetime.fromtimestamp(value['time'], pytz.utc).isoformat(),
            'geom': geometry,
        }

        return key, values
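Since this mapping defines upsert_constraint_name, it is worth seeing how a named unique constraint enables upserts. dbsink handles this internally; the following is only a standalone sketch using SQLAlchemy's PostgreSQL dialect, with illustrative table and constraint names.

import sqlalchemy as sql
from sqlalchemy.dialects.postgresql import insert as pg_insert

metadata = sql.MetaData()
table = sql.Table(
    'named_generic_geography', metadata,
    sql.Column('name', sql.String),
    sql.Column('time', sql.DateTime(timezone=True)),
    sql.UniqueConstraint('name', 'time', name='demo_upsert_constraint'),
)

values = {'name': 'my cool thing', 'time': '2019-01-01T00:00:00+00:00'}

# Renders INSERT ... ON CONFLICT ON CONSTRAINT ... DO UPDATE
stmt = pg_insert(table).values(values).on_conflict_do_update(
    constraint='demo_upsert_constraint',
    set_=values,
)
# Execute `stmt` against a PostgreSQL engine to insert-or-update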
Configuration
This program uses Click for its command-line interface. For all options, use the help:
$ dbsink --help
Environment Variables
All configuration options can also be specified with environment variables using the pattern DBSINK_[argument_name]=[value]. For more information see the Click documentation.
DBSINK_TOPIC="topic-to-listen-to" \
DBSINK_LOOKUP="StringMap" \
DBSINK_TABLE="MyCoolTable" \
DBSINK_CONSUMER="myconsumer" \
DBSINK_PACKING="msgpack" \
DBSINK_OFFSET="earlist" \
DBSINK_DROP="true" \
DBSINK_VERBOSE="1" \
dbsink
Testing
You can run the tests with pytest. To run the integration tests, start a PostGIS database first:

$ docker run -p 30300:5432 --name dbsink-int-testing-db -e POSTGRES_USER=sink -e POSTGRES_PASSWORD=sink -e POSTGRES_DB=sink -d mdillon/postgis:11

Then run:

$ pytest -m integration
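When you are finished, remove the test database container:

$ docker rm -f dbsink-int-testing-db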