
AioPype - Flow based programming with asyncio


Python asynchronous data pipelines

aiopype allows running continuous data pipelines reliably, with a plain and simple approach to their development.

Aiopype creates a centralized message handler to allow every processor to work as an independent non-blocking message producer/consumer.

Aiopype has 4 main concepts:

  • Flow

  • Manager

  • Processor

  • Message Handler

Flow

The Flow is aiopype’s main component. A flow is the entrypoint for reliably running pipeline managers.

Flow is responsible for:

  • Starting all registered managers

  • Handling manager failures

  • Reporting errors

  • Restarting failed managers
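The responsibilities above amount to a supervision loop. The sketch below is purely illustrative and not aiopype’s actual API: `supervise`, `manager_factories`, and `report_error` are hypothetical names.

```python
import asyncio

async def supervise(manager_factories, report_error, retry_delay=0.1):
    """Hypothetical supervision loop: start every manager, report any
    failure, and restart the failed manager after a short delay."""
    async def run(factory):
        while True:
            manager = factory()           # (re)create the manager
            try:
                await manager.start()     # run until it exits or fails
            except Exception as exc:
                report_error(exc)         # report the failure...
                await asyncio.sleep(retry_delay)  # ...then restart
            else:
                return                    # clean exit: stop supervising

    await asyncio.gather(*(run(f) for f in manager_factories))
```

Each manager is supervised independently, so one failing pipeline never takes down the others.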

Manager

The manager is responsible for registering a data pipeline from top to bottom. This means it must register a source and connect it with its consumers, until the pipeline finally outputs.

Processor

A processor is a message consumer/producer.

Sources

Sources are special cases of processors. Their special characteristic is that they can run forever, and are the starting point of any pipeline.

Examples of sources may be:

  • A REST API poller

  • A WebSocket client

  • A Cron job
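All of these reduce to the same shape: a loop that runs indefinitely and emits whatever it gathers. A minimal sketch of a polling source, assuming hypothetical callables `fetch`, `emit`, and `should_stop` (this is not aiopype’s RestSource):

```python
import asyncio

async def poll_source(fetch, emit, interval, should_stop=lambda: False):
    """Hypothetical polling source: fetch a payload, emit it
    downstream, sleep, and repeat until told to stop."""
    while not should_stop():
        emit(await fetch())
        await asyncio.sleep(interval)
```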

Message handler

The message handler is the central piece that allows aiopype to scale.

A Flow will start one or more Sources as the starting point for each registered Manager. Once a Source produces an event, a message is dispatched and the message handler identifies and fires the corresponding handlers.

There are two available message handlers:

  • SyncProtocol

  • AsyncProtocol

SyncProtocol

The synchronous event handler is, as its name suggests, synchronous: once the source emits a message, it must be handled through to the end of the pipeline before the source can proceed with its normal behavior. This is good for development purposes but fails to meet the asynchronous, event-driven pattern required to allow component isolation.
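In other words, the synchronous handler is just an inline call chain. A hypothetical sketch, where `handlers` is an assumed event-name-to-callbacks mapping rather than aiopype’s internal structure:

```python
def sync_dispatch(handlers, event, *args):
    """Fire every handler registered for `event` inline: the caller
    (the source) blocks until the whole chain has run."""
    for handler in handlers.get(event, []):
        handler(*args)
```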

AsyncProtocol

The main difference between SyncProtocol and AsyncProtocol is that the latter uses a decoupled event loop to check whether there are new messages in the queue for processing, while the former simply starts processing received messages immediately. This allows total isolation of processors.
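The idea can be illustrated with an asyncio.Queue: the source only enqueues, and a separate consumer task drains the queue and fires the registered handlers. This is a sketch of the pattern, not aiopype’s implementation; `async_dispatch` and the `handlers` mapping are hypothetical:

```python
import asyncio

async def async_dispatch(queue, handlers):
    """Hypothetical decoupled dispatch loop: drain the message queue
    and fire the handlers registered for each event name, so producers
    never wait on consumers."""
    while True:
        event, args = await queue.get()
        for handler in handlers.get(event, []):
            handler(*args)
        queue.task_done()
```

Because producers only ever call `queue.put`, a slow consumer can never block a source.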

Example

Apple stock processor.

Source

Our source will be Yahoo Finance, gathering data on the AAPL ticker price. We’ll use aiopype’s RestSource as a base class.

from aiopype.sources import RestSource


class YahooRestSource(RestSource):
  """
  Yahoo REST API source.
  """
  def __init__(self, name, handler, symbol):
    super().__init__(
      name,
      handler,
      'http://finance.yahoo.com/webservice/v1/symbols/{}/quote?format=json&view=detail'.format(symbol), {
        'exception_threshold': 10,
        'request_interval': 30
      }
    )

Processor

Our sample processor will simply extract the price from the returned JSON.

from aiopype import Processor


class HandleRawData(Processor):
  def handle(self, data, time):
    self.emit('price', time, data['list']['resources'][0]['resource']['fields']['price'])

Output

Our output processor will write price data to a CSV file.

import csv


class CSVOutput(Processor):
  def __init__(self, name, handler, filename):
    super().__init__(name, handler)
    self.filename = filename

    with open(self.filename, 'w', newline = '') as csvfile:
      writer = csv.writer(csvfile, delimiter = ';')
      writer.writerow(['time', 'price'])

  def write(self, time, price):
    # Open in append mode: opening with 'w' here would truncate the
    # file and discard previously written rows on every message.
    with open(self.filename, 'a', newline = '') as csvfile:
      writer = csv.writer(csvfile, delimiter = ';')
      writer.writerow([time, price])

Manager

The manager will instantiate the Source, Processor, and Output. It will connect the Source’s data event to the Processor.handle handler, and the Processor’s price event to the Output.write handler. This will be our data pipeline.

from aiopype import Manager


class YahooManager(Manager):
  name = 'yahoo_apple'

  def __init__(self, handler):
    super().__init__(handler)
    self.processor = HandleRawData(self.build_processor_name('processor'), self.handler)
    self.source = YahooRestSource(self.build_processor_name('source'), self.handler, 'AAPL')
    self.writer = CSVOutput(self.build_processor_name('writer'), self.handler, 'yahoo_appl.csv')

    self.source.on('data', self.processor.handle)
    self.processor.on('price', self.writer.write)

Flow

Our flow config will have the yahoo_apple manager only.

from aiopype import AsyncFlow


class FlowConfig(object):
  FLOWS = ['yahoo_apple']

dataflow = AsyncFlow(FlowConfig())

Main method

The main method will simply start the dataflow.

if __name__ == "__main__":
  dataflow.start()

Running the example

Combine all the above code in a file called example.py and run:

python example.py

Clusters

WIP:

This decentralized mechanism makes distributed pipelines a possibility, if we have coordination between nodes.

Changelog

0.1.2 / 2016-07-06

  • #6 Handle exceptions from async protocol listener (@jAlpedrinha)

0.1.1 / 2016-07-05

  • #4 Avoid failure on pusherclient disconnection (@jAlpedrinha)

0.1.0 / 2016-07-05

  • #1 Add flow manager and processors (@jAlpedrinha)
