AioPype - Flow-based programming with asyncio

aiopype

Python asynchronous data pipelines

aiopype allows running continuous data pipelines reliably, with a plain, simple approach to their development.

Aiopype creates a centralized message handler to allow every processor to work as an independent, non-blocking message producer/consumer.

Aiopype has four main concepts:

  • Flow

  • Manager

  • Processor

  • Message Handler

Flow

The Flow is aiopype’s main component. A flow is the entry point for reliably running pipeline managers.

Flow is responsible for:

  • Starting all registered managers

  • Handling manager failures

  • Reporting errors

  • Restarting failed managers
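
The responsibilities listed above can be pictured with plain asyncio. This is a minimal sketch and not aiopype’s implementation: supervise, make_flaky_manager and max_restarts are hypothetical names introduced only for illustration.

```python
import asyncio


async def supervise(name, manager_factory, max_restarts=3, errors=None):
  """Run a manager coroutine, restarting it when it raises (hypothetical helper)."""
  errors = errors if errors is not None else []
  restarts = 0
  while True:
    try:
      # Start (or restart) the manager and wait until it finishes.
      return await manager_factory()
    except Exception as exc:
      # Report the failure, then decide whether a restart is still allowed.
      errors.append((name, repr(exc)))
      if restarts >= max_restarts:
        raise
      restarts += 1


def make_flaky_manager(failures):
  """Build a manager that fails `failures` times before succeeding."""
  state = {'calls': 0}

  async def flaky_manager():
    state['calls'] += 1
    if state['calls'] <= failures:
      raise RuntimeError('manager crashed')
    return state['calls']

  return flaky_manager


errors = []
result = asyncio.run(supervise('yahoo_apple', make_flaky_manager(2), errors=errors))
print(result, len(errors))
```

Here the manager fails twice, both failures are recorded, and the third start completes. A real flow would likely add a delay between restarts, which this sketch leaves out.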

Manager

The manager is responsible for registering a data pipeline from top to bottom. This means it must register a source and connect it with its consumers, down to the pipeline’s final output.

Processor

A processor is a message consumer/producer.

Sources

Sources are special cases of processors. Their special characteristic is that they can run forever, and are the starting point of any pipeline.

Examples of sources may be:

  • A REST API poller

  • A WebSocket client

  • A cron job
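
As an illustration of the polling kind of source, here is a minimal sketch in plain asyncio. It does not use aiopype’s classes: poll, fetch, emit and max_polls are hypothetical names, and max_polls exists only so that the sketch terminates, whereas a real source would loop forever.

```python
import asyncio


async def poll(fetch, emit, interval, max_polls):
  """Call `fetch` repeatedly and emit each result as a 'data' event."""
  for _ in range(max_polls):
    emit('data', fetch())
    # Yield to the event loop between polls so other work can run.
    await asyncio.sleep(interval)


def run_demo():
  received = []
  readings = iter([101, 102, 103])  # stand-in for successive API responses

  def fetch():
    return next(readings)

  def emit(event, payload):
    received.append((event, payload))

  asyncio.run(poll(fetch, emit, interval=0, max_polls=3))
  return received


received = run_demo()
print(received)
```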

Message handler

The message handler is the central piece that allows aiopype to scale.

A Flow will start one or more Sources as the starting point for each registered Manager. Once a Source produces an event, a message is triggered and the message handler identifies and fires the corresponding callbacks.

There are two available message handlers:

  • SyncProtocol

  • AsyncProtocol

SyncProtocol

The synchronous message handler is, as its name suggests, synchronous: once the source emits a message, that message must be handled through to the end of the pipeline before the source can proceed with its normal behavior. This is good for development purposes but fails to meet the asynchronous, event-driven pattern required to allow component isolation.
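
The blocking behavior can be sketched with a tiny dispatcher. SyncDispatcher is a hypothetical stand-in written for this illustration, not aiopype’s SyncProtocol; it only shows that emit does not return until every downstream callback has finished.

```python
class SyncDispatcher:
  """Hypothetical synchronous message handler."""

  def __init__(self):
    self.listeners = {}

  def on(self, event, callback):
    self.listeners.setdefault(event, []).append(callback)

  def emit(self, event, *args):
    # Each callback runs to completion before emit() returns.
    for callback in self.listeners.get(event, []):
      callback(*args)


trace = []
dispatcher = SyncDispatcher()


def handle(value):
  trace.append('handle')
  dispatcher.emit('price', value * 2)


def write(price):
  trace.append(('write', price))


dispatcher.on('data', handle)
dispatcher.on('price', write)

trace.append('before emit')
dispatcher.emit('data', 21)
trace.append('after emit')
print(trace)
```

The whole pipeline (handle, then write) runs between the two markers, so a slow callback would stall the emitter.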

AsyncProtocol

The main difference between SyncProtocol and AsyncProtocol is that the latter uses a decoupled event loop to check whether there are new messages in the queue for processing, whereas the former simply starts processing received messages immediately. This decoupling allows total isolation of processors.
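
A queue-backed dispatcher makes the decoupling concrete. The sketch below assumes an asyncio.Queue drained by a separate task. QueueDispatcher and the 'stop' sentinel are hypothetical and are not taken from aiopype’s AsyncProtocol.

```python
import asyncio


class QueueDispatcher:
  """Hypothetical queue-backed message handler."""

  def __init__(self):
    self.listeners = {}
    self.queue = asyncio.Queue()

  def on(self, event, callback):
    self.listeners.setdefault(event, []).append(callback)

  def emit(self, event, *args):
    # The producer only enqueues the message and returns immediately.
    self.queue.put_nowait((event, args))

  async def run(self):
    # A separate loop drains the queue and fires the matching callbacks.
    while True:
      event, args = await self.queue.get()
      if event == 'stop':
        break
      for callback in self.listeners.get(event, []):
        callback(*args)


async def main():
  trace = []
  dispatcher = QueueDispatcher()
  dispatcher.on('data', lambda value: trace.append(('handle', value)))

  worker = asyncio.create_task(dispatcher.run())
  trace.append('before emit')
  dispatcher.emit('data', 21)
  trace.append('after emit')  # recorded before the callback has run
  dispatcher.emit('stop')
  await worker
  return trace


trace = asyncio.run(main())
print(trace)
```

Unlike the synchronous case, the emitter continues before the callback runs, so producers and consumers no longer block each other.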

Example

Apple stock processor.

Source

Our source will poll Yahoo Finance for the AAPL ticker price. We’ll use aiopype’s RestSource as a base class.

from aiopype.sources import RestSource


class YahooRestSource(RestSource):
  """
  Yahoo REST API source.
  """
  def __init__(self, name, handler, symbol):
    super().__init__(
      name,
      handler,
      'http://finance.yahoo.com/webservice/v1/symbols/{}/quote?format=json&view=detail'.format(symbol), {
        'exception_threshold': 10,
        'request_interval': 30
      }
    )

Processor

Our sample processor will simply extract the price from the returned JSON.

from aiopype import Processor


class HandleRawData(Processor):
  def handle(self, data, time):
    self.emit('price', time, data['list']['resources'][0]['resource']['fields']['price'])
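
The nested lookup implies a payload shaped roughly like the dictionary below. This sample is an assumption reconstructed from the lookup path in handle, not a captured API response. The price value is a made-up placeholder and extract_price is a hypothetical helper.

```python
# Hypothetical payload: only the keys read by HandleRawData.handle are included.
sample = {
  'list': {
    'resources': [
      {'resource': {'fields': {'price': '97.34'}}}
    ]
  }
}


def extract_price(data):
  """Follow the same lookup path as HandleRawData.handle."""
  return data['list']['resources'][0]['resource']['fields']['price']


price = extract_price(sample)
print(price)
```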

Output

Our output processor will write price data to a CSV file.

import csv


class CSVOutput(Processor):
  def __init__(self, name, handler, filename):
    super().__init__(name, handler)
    self.filename = filename

    # Create (or truncate) the file and write the header row once.
    with open(self.filename, 'w', newline='') as csvfile:
      writer = csv.writer(csvfile, delimiter=';')
      writer.writerow(['time', 'price'])

  def write(self, time, price):
    # Append, so the header and earlier rows are not overwritten.
    with open(self.filename, 'a', newline='') as csvfile:
      writer = csv.writer(csvfile, delimiter=';')
      writer.writerow([time, price])
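
The file mode matters when the file is reopened for every message: 'w' truncates the file on each open, while 'a' keeps earlier rows. The following sketch shows the difference using temporary files. write_rows and read_rows are hypothetical helpers and the rows are placeholder values.

```python
import csv
import os
import tempfile


def write_rows(filename, mode, rows):
  """Open the file once per row, as an output processor would per message."""
  for row in rows:
    with open(filename, mode, newline='') as csvfile:
      csv.writer(csvfile, delimiter=';').writerow(row)


def read_rows(filename):
  with open(filename, newline='') as csvfile:
    return list(csv.reader(csvfile, delimiter=';'))


rows = [['t1', '1.0'], ['t2', '2.0']]
with tempfile.TemporaryDirectory() as tmpdir:
  truncated = os.path.join(tmpdir, 'truncated.csv')
  appended = os.path.join(tmpdir, 'appended.csv')
  write_rows(truncated, 'w', rows)  # each open discards earlier rows
  write_rows(appended, 'a', rows)   # each open keeps earlier rows
  truncated_rows = read_rows(truncated)
  appended_rows = read_rows(appended)

print(truncated_rows)
print(appended_rows)
```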

Manager

The manager will instantiate the Source, Processor and Output. It will connect the Source’s data event to the Processor.handle handler and the Processor’s price event to the Output.write handler. This will be our data pipeline.

from aiopype import Manager


class YahooManager(Manager):
  name = 'yahoo_apple'

  def __init__(self, handler):
    super().__init__(handler)
    self.processor = HandleRawData(self.build_processor_name('processor'), self.handler)
    self.source = YahooRestSource(self.build_processor_name('source'), self.handler, 'AAPL')
    self.writer = CSVOutput(self.build_processor_name('writer'), self.handler, 'yahoo_appl.csv')

    self.source.on('data', self.processor.handle)
    self.processor.on('price', self.writer.write)

Flow

Our flow config will have the yahoo_apple manager only.

from aiopype import AsyncFlow


class FlowConfig(object):
  FLOWS = ['yahoo_apple']

dataflow = AsyncFlow(FlowConfig())

Main method

The main method simply starts the dataflow.

if __name__ == "__main__":
  dataflow.start()

Running the example

Combine all the above code into a file called example.py and run:

python example.py

Clusters

WIP:

This decentralized mechanism makes distributed pipelines a possibility, if we have coordination between nodes.

Changelog

0.1.4 / 2016-07-14

  • #10 Avoid unfinished flows (@jAlpedrinha)

0.1.3 / 2016-07-11

  • #8 Fix AsyncProtocol termination condition (@jAlpedrinha)

0.1.2 / 2016-07-06

  • #6 Handle exceptions from async protocol listener (@jAlpedrinha)

0.1.1 / 2016-07-05

  • #4 Avoid failure on pusherclient disconnection (@jAlpedrinha)

0.1.0 / 2016-07-05

  • #1 Add flow manager and processors (@jAlpedrinha)

