
BSPump is a real-time stream processor for Python 3.6+

Project description

Join the chat at https://gitter.im/TeskaLabs/bspump

Documentation

https://bitswanpump.readthedocs.io/en/latest/index.html

Principles

  • Write once, use many times

  • Everything is a stream

  • Schema-less

  • Kappa architecture

  • Real-Time

  • High performance

  • Simple to use and well documented, so anyone can write their own stream processor

  • Asynchronous via Python 3.5+ async/await and asyncio

  • Event-driven architecture / Reactor pattern

  • Single-threaded core but compatible with threads

  • Compatible with PyPy, a just-in-time compiler that can make Python code more than 5x faster

  • Good citizen of the Python ecosystem

  • Modularized
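The "single-threaded core but compatible with threads" principle can be illustrated with plain asyncio. The sketch below does not use BSPump; `blocking_lookup` and `handle_event` are hypothetical names, and it assumes Python 3.7+ for `asyncio.run`. Coroutines run on one event loop thread, and a blocking call is handed to the default thread pool so the loop stays responsive.

```python
import asyncio
import threading
import time


def blocking_lookup(value):
    # Stand-in for a blocking call, e.g. a client library without async support.
    time.sleep(0.01)
    return value * 2, threading.current_thread().name


async def handle_event(loop, event):
    # Runs on the event loop thread; the blocking part is offloaded
    # to the loop's default thread pool executor.
    return await loop.run_in_executor(None, blocking_lookup, event)


async def main():
    loop = asyncio.get_running_loop()
    # Several events are handled concurrently on a single event loop.
    return await asyncio.gather(*(handle_event(loop, e) for e in [1, 2, 3]))


def run_demo():
    return asyncio.run(main())


if __name__ == "__main__":
    for result, worker in run_demo():
        print(result, "computed on", worker)
```

`asyncio.gather` returns results in the order the events were submitted, even if the worker threads finish in a different order.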

Stream processor example

#!/usr/bin/env python3
import bspump
import bspump.socket
import bspump.common
import bspump.elasticsearch

class MyPipeline(bspump.Pipeline):
    def __init__(self, app):
        super().__init__(app)
        self.build(
            bspump.socket.TCPStreamSource(app, self),
            bspump.common.JSONParserProcessor(app, self),
            bspump.elasticsearch.ElasticSearchSink(app, self, "ESConnection")
        )


if __name__ == '__main__':
    app = bspump.BSPumpApplication()
    svc = app.get_service("bspump.PumpService")
    svc.add_connection(bspump.elasticsearch.ElasticSearchConnection(app, "ESConnection"))
    svc.add_pipeline(MyPipeline(app))
    app.run()
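To make the source, processor and sink roles in the pipeline above easier to see, here is a stand-alone sketch that mimics the same flow with plain asyncio. It is not BSPump's implementation: `source`, `json_parser`, `add_length` and `run_pipeline` are hypothetical names, and dropping an event when a processor returns `None` is an assumption of this sketch.

```python
import asyncio
import json


async def source(lines):
    # Hypothetical source: yields raw lines as they "arrive".
    for line in lines:
        yield line
        await asyncio.sleep(0)  # let other tasks run between events


def json_parser(context, event):
    # Processor: parse a raw JSON string into a dict.
    return json.loads(event)


def add_length(context, event):
    # Processor: enrich the event with the length of its message.
    event["length"] = len(event.get("message", ""))
    return event


async def run_pipeline(lines, processors, sink):
    async for event in source(lines):
        context = {}
        for processor in processors:
            event = processor(context, event)
            if event is None:
                break  # assumption of this sketch: None drops the event
        else:
            sink.append(event)  # reached only if no processor dropped it


def run_demo():
    sink = []
    lines = ['{"message": "hello"}', '{"message": "bspump"}']
    asyncio.run(run_pipeline(lines, [json_parser, add_length], sink))
    return sink


if __name__ == "__main__":
    print(run_demo())
```

Each processor takes a context and an event and returns the (possibly modified) event, which is the same shape of data the unit test section feeds in as (context, event) pairs.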

Video tutorial

http://img.youtube.com/vi/QvjiPxO4w6w/0.jpg

Build

Docker build

The Dockerfile and instructions are in a separate repository.

PyPI release

Releases are made from a git tag (format: vYY.MM), for example: git tag -a v19.07

Following the PyPI packaging guide, generate the distribution package and upload it using the following command: python -m twine upload dist/*
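A small helper can guard against mistyped release tags. The sketch below assumes that vYY.MM means a two-digit year and a two-digit, zero-padded month, as the v19.07 example suggests; `release_tag` and `is_release_tag` are hypothetical names, not part of BSPump.

```python
import re
from datetime import date

# Assumed tag format: "v", two-digit year, dot, two-digit month.
TAG_PATTERN = re.compile(r"v\d{2}\.\d{2}")


def release_tag(day):
    # Build a tag such as "v19.07" from a date.
    return "v{:02d}.{:02d}".format(day.year % 100, day.month)


def is_release_tag(tag):
    # True only if the whole string matches the assumed format.
    return TAG_PATTERN.fullmatch(tag) is not None


if __name__ == "__main__":
    print(release_tag(date.today()))
```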

Blank application setup

You can clone a blank application from its own repository.

Available technologies

  • bspump.amqp AMQP/RabbitMQ connection, source and sink

  • bspump.avro Apache Avro file source and sink

  • bspump.common Common processors and parsers

  • bspump.elasticsearch ElasticSearch connection, source and sink

  • bspump.file File sources and sinks (plain files, JSON, CSV)

  • bspump.filter Content, Attribute and TimeDrift filters

  • bspump.http.client HTTP client source, WebSocket client sink

  • bspump.http.web HTTP server source and sink, WebSocket server source

  • bspump.influxdb InfluxDB connection and sink

  • bspump.kafka Kafka connection, source and sink

  • bspump.mail SMTP connection and sink

  • bspump.mongodb MongoDB connection and lookup

  • bspump.mysql MySQL connection, source and sink

  • bspump.parquet Apache Parquet file sink

  • bspump.postgresql PostgreSQL connection and sink

  • bspump.slack Slack connection and sink

  • bspump.socket TCP source, UDP source

  • bspump.trigger Opportunistic, PubSub and Periodic triggers

  • bspump.crypto Cryptography

    • Hashing: SHA224, SHA256, SHA384, SHA512, SHA1, MD5, BLAKE2b, BLAKE2s

    • Symmetric Encryption: AES 128, AES 192, AES 256

  • bspump.declarative Declarative processors and expressions

  • bspump.analyzer

    • Time Window analyzer

    • Session analyzer

    • Geographical analyzer

    • Time Drift analyzer

  • bspump.lookup

    • GeoIP Lookup

  • bspump.unittest

    • Interface for testing Processors / Pipelines

  • bspump.web Pump API endpoints for pipelines, lookups etc.
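As an illustration of what a hashing step on an event might look like, the sketch below uses only Python's standard hashlib, which provides every hash algorithm named in the list above. It does not use bspump.crypto; `HASHES` and `hash_field` are hypothetical names, and the event is assumed to be a plain dict.

```python
import hashlib

# Hash names from the list above mapped to hashlib constructors.
HASHES = {
    "SHA224": hashlib.sha224,
    "SHA256": hashlib.sha256,
    "SHA384": hashlib.sha384,
    "SHA512": hashlib.sha512,
    "SHA1": hashlib.sha1,
    "MD5": hashlib.md5,
    "BLAKE2b": hashlib.blake2b,
    "BLAKE2s": hashlib.blake2s,
}


def hash_field(event, field, algorithm="SHA256"):
    # Return a copy of the event with one field replaced by its hex digest.
    data = str(event[field]).encode("utf-8")
    hashed = dict(event)
    hashed[field] = HASHES[algorithm](data).hexdigest()
    return hashed


if __name__ == "__main__":
    print(hash_field({"user": "alice", "action": "login"}, "user"))
```

Returning a copy rather than mutating the input keeps the original event available to any other consumer; whether that matters depends on how the surrounding pipeline shares events.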

Google Sheet with the technology compatibility matrix: https://docs.google.com/spreadsheets/d/1L1DvSuHuhKUyZ3FEFxqEKNpSoamPH2Z1ZaFuHyageoI/edit?usp=sharing

High-level architecture

Schema of BSPump high-level architecture

Unit test

from unittest.mock import MagicMock
from bspump.unittest import ProcessorTestCase

import my_project.processor  # your own module that defines MyProcessor


class MyProcessorTestCase(ProcessorTestCase):

    def test_my_processor(self):

        # set up the processor under test
        self.set_up_processor(my_project.processor.MyProcessor, "proc-arg", proc="key_arg")

        # mock methods to suit your needs on the pipeline ...
        self.Pipeline.method = MagicMock()

        # ... or on the processor instance
        self.Pipeline.Processor.method = MagicMock()

        output = self.execute(
            [(None, {'foo': 'bar'})]  # (context, event) pairs
        )

        # assert output
        self.assertEqual(
            [event for context, event in output],
            [{'FOO': 'BAR'}]
        )

        # assert expected calls on `self.Pipeline.method` or `self.Pipeline.Processor.method`;
        # `expected` is a placeholder for a dict of the keyword arguments you expect
        self.Pipeline.Processor.method.assert_called_with(**expected)

Running unit tests

python3 -m unittest test

Replace test with the location of your unit test module.
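For readers who want to try the (context, event) testing pattern without installing BSPump, here is a stand-alone sketch using only the standard unittest module. `upper_processor` and `execute` are hypothetical stand-ins, chosen so that the input {'foo': 'bar'} yields {'FOO': 'BAR'} as in the test case above; they are not the bspump.unittest API.

```python
import unittest


def upper_processor(context, event):
    # Hypothetical processor logic: upper-case every key and value.
    return {str(k).upper(): str(v).upper() for k, v in event.items()}


def execute(processor, pairs):
    # Feed (context, event) pairs through the processor, keeping each context.
    return [(context, processor(context, event)) for context, event in pairs]


class UpperProcessorTestCase(unittest.TestCase):

    def test_upper_processor(self):
        output = execute(upper_processor, [(None, {"foo": "bar"})])
        self.assertEqual(
            [event for _, event in output],
            [{"FOO": "BAR"}],
        )


def run_tests():
    # Run the test case programmatically and return the result object.
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(UpperProcessorTestCase)
    return unittest.TextTestRunner(verbosity=0).run(suite)


if __name__ == "__main__":
    unittest.main()
```

Saved as a module, this can be run with the same python3 -m unittest command shown above.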

Licence

BSPump is open-source software, available under the BSD 3-Clause License.

