Tools to sink Kafka messages to a database table
dbsink
Read from a Kafka topic and sink to a database table, one row per message.
This is not unlike the Kafka Connect JdbcConnector, but this project has a much lower barrier to entry and doesn't require diving into the Kafka Connect ecosystem. I wrote the equivalent of this project using a custom JdbcConnector, and it was getting out of control and was basically untestable. So here we are.
You can choose to unpack the data as `avro`, `msgpack`, or the default `json`. `avro` requires an additional `registry` parameter.
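To make the three packing options concrete, here is a minimal sketch of decoding a raw message value by packing name. The `unpack` function is hypothetical and not part of dbsink; the msgpack branch assumes the third-party `msgpack` package, and Avro is deliberately left unimplemented because it also depends on the schema registry.

```python
import json


def unpack(raw: bytes, packing: str = "json"):
    """Decode a raw Kafka message value according to a packing name.

    Hypothetical helper for illustration only; it is not dbsink's internal API.
    """
    if packing == "json":
        # The default: UTF-8 encoded JSON text
        return json.loads(raw.decode("utf-8"))
    if packing == "msgpack":
        # Imported lazily so the json path works without msgpack installed
        import msgpack
        return msgpack.unpackb(raw, raw=False)
    if packing == "avro":
        # Avro decoding also needs the schema registry (the extra `registry` parameter)
        raise NotImplementedError("avro decoding needs a schema registry client")
    raise ValueError(f"unknown packing: {packing!r}")
```

Whatever the packing, the decoded Python object is what a mapping class receives as `value`.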
Docker images: https://hub.docker.com/r/axiom/dbsink/builds
WHY?
I needed to read from well-defined Kafka topics and store the results in a database table so collaborators could interact with the data in a more familiar way.
It is also very convenient and easy to set up PostgREST on top of the resulting tables to get a quick read-only REST API over the tabled messages.
Mapping messages to tables
You can define custom mappings between messages and tables using a Python class. You may register your custom mappings with the `dbsink.maps` entry point to have them available to `dbsink` at run-time.
```python
# In your package's setup.py, pass this as setup(..., entry_points=entry_points)
entry_points = {
    'dbsink.maps': [
        'YourCustomMap = you.custom.map.module:CustomMapClass',
        # ...
    ]
}
```
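Registering a map is only half of the picture; the host program still has to find it. As a sketch of the standard-library mechanism for reading an entry point group (not necessarily how dbsink does it internally), the hypothetical helper `discover_maps` below lists everything registered under `dbsink.maps`:

```python
from importlib.metadata import entry_points


def discover_maps(group: str = "dbsink.maps") -> dict:
    """Return {name: EntryPoint} for every plugin registered under `group`.

    Illustrative only: this shows the standard-library mechanism, not
    necessarily dbsink's own lookup code.
    """
    eps = entry_points()
    if hasattr(eps, "select"):
        # Python 3.10+: the returned object supports .select(group=...)
        selected = eps.select(group=group)
    else:
        # Python 3.8/3.9: entry_points() returns a dict keyed by group name
        selected = eps.get(group, [])
    return {ep.name: ep for ep in selected}
```

Calling `.load()` on a returned entry point, e.g. `discover_maps()["YourCustomMap"].load()`, imports the module and returns `CustomMapClass`.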
Custom mapping classes should inherit from the `BaseMap` class in `dbsink` and override the following properties and methods as needed:
- `upsert_constraint_name` - Name of the constraint to use for upserting data. Set to `None` to disable upserting. Use this class property when creating the upsert constraint on your table (see example below).
- `unique_index_name` - Unique index name based on the table name. Use this if defining a single unique index on your table.
- `sequence_name` - Unique sequence name based on the table name. Use this if defining a single sequence column on your table.
- `_check_key` - Checks the validity of a message's `key` before trying to sink. Return `True` if valid and raise an error if not.
- `_check_value` - Checks the validity of a message's `value` before trying to sink. Return `True` if valid and raise an error if not.
- `schema` - A list of SQLAlchemy `Column`, `Index`, and `Constraint` schema definitions to use in table creation and updating. This fully describes your table's schema.
- `message_to_values` - A function accepting `key` and `value` arguments and returning a tuple `key, dict`, where the dict is the `values` to pass to SQLAlchemy's `insert().values` method, i.e. the returned dict ends up in `insert(table).values(...)`. The `value` argument to this function will already be unpacked if `avro` or `msgpack` packing was specified.
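The contract above can be exercised without Kafka or a database. The sketch below uses a hypothetical `SketchBaseMap` stand-in (not dbsink's real `BaseMap`, which also provides the naming properties and schema handling) so that a map's validation and `message_to_values` logic can be unit-tested in plain Python. `TemperatureMap` and its `{"celsius": ...}` message format are invented for illustration.

```python
class SketchBaseMap:
    """Hypothetical stand-in for dbsink's BaseMap, for illustration only."""

    def _check_key(self, key):
        return True

    def _check_value(self, value):
        return True

    def check(self, key, value):
        # As in the StringMap example, check() runs _check_key and _check_value;
        # each returns True or raises.
        return self._check_key(key) and self._check_value(value)

    def message_to_values(self, key, value):
        raise NotImplementedError


class TemperatureMap(SketchBaseMap):
    """Hypothetical map for messages like {"celsius": 21.5}."""

    def _check_key(self, key):
        if not key:
            raise ValueError("messages must have a key")
        return True

    def _check_value(self, value):
        if "celsius" not in value:
            raise ValueError("value is missing 'celsius'")
        return True

    def message_to_values(self, key, value):
        # Raises if either check fails
        self.check(key, value)
        # This dict is what would be handed to insert(table).values(...)
        return key, {"sensor": key, "celsius": float(value["celsius"])}
```

Keeping validation in `_check_key`/`_check_value` and the column mapping in `message_to_values` means each piece can be tested with a plain dict.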
Example
A simple example is the `StringMap` mapping included with `dbsink`:
```python
from datetime import datetime

import pytz
import sqlalchemy as sql
import simplejson as json

from dbsink.maps import BaseMap


class StringMap(BaseMap):

    @property
    def upsert_constraint_name(self):
        return None  # Ignore upserts

    def _check_key(self, key):
        return True  # All keys are valid

    def _check_value(self, value):
        # Make sure values are JSON parsable
        _ = json.loads(json.dumps(value, ignore_nan=True))
        return True

    @property
    def schema(self):
        return [
            sql.Column('id', sql.Integer, sql.Sequence(self.sequence_name), primary_key=True),
            sql.Column('sinked', sql.DateTime(timezone=True), index=True),
            sql.Column('key', sql.String, default='', index=True),
            sql.Column('payload', sql.String)
        ]

    def message_to_values(self, key, value):
        # Raises if invalid. This calls `._check_key` and `._check_value`
        self.check(key, value)

        values = {
            'sinked': datetime.utcnow().replace(tzinfo=pytz.utc).isoformat(),
            'key': key,
            'payload': json.dumps(value),
        }

        return key, values
```
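To see what a single row looks like, the sketch below reproduces the `values` dict that `StringMap.message_to_values` builds, using only the standard library (`json` in place of `simplejson`, `timezone.utc` in place of `pytz.utc`). The function name `string_map_values` is hypothetical; the `id` column is filled by the sequence and is not part of the dict.

```python
import json
from datetime import datetime, timezone


def string_map_values(key, value):
    """Stdlib-only mirror of StringMap.message_to_values, for illustration.

    datetime.now(timezone.utc) yields the same timezone-aware UTC timestamp as
    datetime.utcnow().replace(tzinfo=pytz.utc), without the pytz dependency.
    """
    values = {
        "sinked": datetime.now(timezone.utc).isoformat(),
        "key": key,
        "payload": json.dumps(value),
    }
    return key, values


key, row = string_map_values("sensor-1", {"temp": 21.5})
print(row["payload"])  # {"temp": 21.5}
```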
Advanced Example
There are no restrictions on table schemas or on how you map the message data into the schema. Take the example below, which uses a PostGIS `Geography` column.
```python
from datetime import datetime

import pytz
import sqlalchemy as sql
import simplejson as json
from shapely.geometry import shape
from geoalchemy2.types import Geography

from dbsink.maps import BaseMap


class NamedGenericGeography(BaseMap):

    def _check_key(self, key):
        return True  # All keys are valid

    def _check_value(self, value):
        # Make sure values are JSON parsable
        _ = json.loads(json.dumps(value, ignore_nan=True))
        return True

    @property
    def schema(self):
        return [
            sql.Column('id', sql.Integer, sql.Sequence(self.sequence_name), primary_key=True),
            sql.Column('name', sql.String, default='', index=True),
            sql.Column('time', sql.DateTime(timezone=True), index=True),
            sql.Column('geom', Geography(srid=4326)),
            sql.Index(
                self.unique_index_name,
                'name',
                'time',
                unique=True,
            ),
            sql.UniqueConstraint(
                'name',
                'time',
                name=self.upsert_constraint_name
            )
        ]

    def message_to_values(self, key, value):
        """ Assumes a message format of
        {
            "time": 1000000000,  # unix epoch
            "name": "my cool thing",
            "geojson": {
                "geometry": {
                    "type": "Polygon",
                    "coordinates": [ [ [ -118.532116484818843, 32.107425500492766 ], [ -118.457544847012443, 32.107425500492702 ], [ -118.457544847012443, 32.054517056541435 ], [ -118.532116484818872, 32.054517056541464 ], [ -118.532116484818843, 32.107425500492766 ] ] ]
                }
            }
        }
        """
        # Raises if invalid
        self.check(key, value)

        # GeoJSON `geometry` attribute to WKT
        geometry = shape(value['geojson']['geometry']).wkt

        values = {
            'name': value['name'],
            'time': datetime.fromtimestamp(value['time'], pytz.utc).isoformat(),
            'geom': geometry
        }

        return key, values
```
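Because the mapping logic is plain Python, it can be checked offline. The sketch below reproduces the dict built by `NamedGenericGeography.message_to_values` with the standard library only. `polygon_to_wkt` and `geography_values` are hypothetical helpers; `polygon_to_wkt` is a Polygon-only, 2D-only stand-in for shapely's `shape(...).wkt`, so its number formatting may not match shapely's exactly.

```python
from datetime import datetime, timezone


def polygon_to_wkt(geometry):
    """Convert a 2D GeoJSON Polygon geometry dict to WKT text.

    Stdlib-only stand-in for shapely's shape(geometry).wkt (Polygons only).
    """
    if geometry["type"] != "Polygon":
        raise ValueError("only Polygon geometries are handled in this sketch")
    rings = []
    for ring in geometry["coordinates"]:
        # Each position is [longitude, latitude]; WKT separates x and y with a space
        points = ", ".join(f"{x} {y}" for x, y in ring)
        rings.append(f"({points})")
    return f"POLYGON ({', '.join(rings)})"


def geography_values(value):
    """Mirror of the dict NamedGenericGeography.message_to_values builds, minus checks."""
    return {
        "name": value["name"],
        # Unix epoch seconds -> timezone-aware ISO 8601 string in UTC
        "time": datetime.fromtimestamp(value["time"], timezone.utc).isoformat(),
        "geom": polygon_to_wkt(value["geojson"]["geometry"]),
    }
```

With the docstring's `"time": 1000000000`, the `time` value comes out as `2001-09-09T01:46:40+00:00`.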
Configuration
This program uses Click for its command-line interface. For all options, see the help:
```shell
$ dbsink --help
```
Environment Variables
All configuration options can be specified with environment variables using the pattern `DBSINK_[argument_name]=[value]`. For more information, see the Click documentation.
```shell
DBSINK_TOPIC="topic-to-listen-to" \
DBSINK_LOOKUP="StringMap" \
DBSINK_TABLE="MyCoolTable" \
DBSINK_CONSUMER="myconsumer" \
DBSINK_PACKING="msgpack" \
DBSINK_OFFSET="earliest" \
DBSINK_DROP="true" \
DBSINK_VERBOSE="1" \
dbsink
```
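As a sketch of the naming pattern only, the hypothetical helper below collects `DBSINK_*` variables into option names. Click supports this pattern natively (for example through `auto_envvar_prefix` or a per-option `envvar`), and it also converts each value to the option's declared type; which mechanism dbsink uses is not stated here, and this helper leaves every value as a string.

```python
import os


def dbsink_env_options(environ=None, prefix="DBSINK_"):
    """Collect DBSINK_* variables into {option_name: value}.

    Hypothetical illustration of the DBSINK_[argument_name]=[value] pattern;
    values stay as strings (no type conversion, unlike Click).
    """
    environ = os.environ if environ is None else environ
    return {
        name[len(prefix):].lower(): value
        for name, value in environ.items()
        if name.startswith(prefix)
    }
```

With the variables from the example above exported, it would return entries such as `'topic': 'topic-to-listen-to'` and `'offset': 'earliest'`.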
Testing
You can run the tests using `pytest`. To run the integration tests, start a database with `docker run -p 30300:5432 --name dbsink-int-testing-db -e POSTGRES_USER=sink -e POSTGRES_PASSWORD=sink -e POSTGRES_DB=sink -d mdillon/postgis:11` and run `pytest -m integration`.
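The `docker run` flags above determine how to reach the test database. As a sketch, they translate into the SQLAlchemy-style URL built below; the helper name `integration_db_url` is hypothetical, and how the integration tests actually receive their connection settings is not documented here.

```python
from urllib.parse import quote


def integration_db_url(user="sink", password="sink", host="localhost",
                       port=30300, database="sink"):
    """Build a PostgreSQL URL for the integration-test container.

    Defaults mirror the docker run flags: POSTGRES_USER, POSTGRES_PASSWORD,
    POSTGRES_DB, and host port 30300 mapped to the container's 5432.
    """
    # Percent-encode credentials in case they contain characters like '@' or ':'
    return (f"postgresql://{quote(user, safe='')}:{quote(password, safe='')}"
            f"@{host}:{port}/{database}")
```

The resulting string, `postgresql://sink:sink@localhost:30300/sink`, is the form `sqlalchemy.create_engine` accepts.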
File details
Details for the file dbsink-2.6.0.tar.gz.
File metadata
- Download URL: dbsink-2.6.0.tar.gz
- Upload date:
- Size: 17.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.23.0 setuptools/46.1.3 requests-toolbelt/0.9.1 tqdm/4.44.1 CPython/3.8.2
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 9b94b458e6887234bb7cd6e422f6cc41ea3dcbc758d829de5ad5aac8d16563ba |
| MD5 | 2411f078c69a3e5482573a3a6b9d7e04 |
| BLAKE2b-256 | 9ac0776ca96a601693e70c64a837519b1ffd919b0de3f77a6ee8578d14b70d06 |
File details
Details for the file dbsink-2.6.0-py3-none-any.whl.
File metadata
- Download URL: dbsink-2.6.0-py3-none-any.whl
- Upload date:
- Size: 15.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.23.0 setuptools/46.1.3 requests-toolbelt/0.9.1 tqdm/4.44.1 CPython/3.8.2
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | c72d186e9877e292bd34fa6b71f8cb3a6b3fd3cd6de09f569a66a7ccb4df424a |
| MD5 | cd51db33e8b81fd16dd1ee0a0571ff3c |
| BLAKE2b-256 | 17e3d782fa3da27c8897ef6dfcdb0f55fd8b2b408f17e74e01b0d88feb8837ef |