Scramjet in Python

⭐ Star us on GitHub — it motivates us a lot! 🚀

Scramjet Framework

Scramjet is a simple reactive stream programming framework. Code is written by chaining functions that transform the streamed data, including the well-known map, filter and reduce.

The main advantage of Scramjet is that it runs asynchronous operations on your data streams concurrently. The same API handles both synchronous and asynchronous transformations, so you can "map" a stream from any source and call any number of APIs consecutively.
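To make the idea of one API for both kinds of functions concrete, here is a minimal sketch using only asyncio. The helper name concurrent_map is hypothetical and this is not how Scramjet is implemented; it only illustrates accepting a plain or an async function and running the calls concurrently.

```python
import asyncio
import inspect


async def concurrent_map(func, chunks):
    """Apply func to every chunk; func may be a plain or an async function."""

    async def call(chunk):
        result = func(chunk)
        # Await only when the function returned an awaitable.
        if inspect.isawaitable(result):
            result = await result
        return result

    # gather() runs the calls concurrently and keeps the input order.
    return await asyncio.gather(*(call(chunk) for chunk in chunks))


async def slow_double(x):
    await asyncio.sleep(0.01)  # stands in for a network call
    return x * 2


sync_result = asyncio.run(concurrent_map(lambda x: x + 1, [1, 2, 3]))
async_result = asyncio.run(concurrent_map(slow_double, [1, 2, 3]))
print(sync_result)   # [2, 3, 4]
print(async_result)  # [2, 4, 6]
```

Because the three slow_double calls sleep at the same time, the async run takes roughly one sleep interval rather than three.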

Originally written on top of Node.js object streams, Scramjet is now being ported to Python. This repository contains that port.

Tested with Python 3.8.10 and Ubuntu 20.04.

Installation

Scramjet Framework is published on PyPI as scramjet-framework-py (a pre-release version), so it can be installed with pip:

pip install scramjet-framework-py

To use it as a project dependency, add scramjet-framework-py to your requirements.txt file and install it with pip.

Usage

The basic building block of Scramjet is the Stream class. It reads input in chunks, performs operations on these chunks and produces an iterable output that can be collected and written somewhere.

A stream is created with the read_from class method. It accepts any iterable, or an object implementing a .read() method, as the input and returns a Stream instance.
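As a rough illustration of what "any iterable or an object implementing .read()" can mean, the sketch below turns both kinds of input into chunks. The function iter_chunks and its chunk_size parameter are hypothetical, and the rule that .read() is used only when a chunk size is given is an assumption of this sketch, not documented behaviour of read_from.

```python
import io


def iter_chunks(source, chunk_size=None):
    """Yield chunks from an iterable, or from .read() when chunk_size is set."""
    if chunk_size is not None:
        # File-like input: read fixed-size pieces until read() returns nothing.
        while True:
            chunk = source.read(chunk_size)
            if not chunk:
                break
            yield chunk
    else:
        # Any iterable: every item is one chunk (for text files, one line).
        yield from source


from_list = list(iter_chunks([1, 2, 3]))
from_lines = list(iter_chunks(io.StringIO("a\nb\n")))
from_reads = list(iter_chunks(io.StringIO("abcdefghij"), chunk_size=4))
print(from_list)   # [1, 2, 3]
print(from_lines)  # ['a\n', 'b\n']
print(from_reads)  # ['abcd', 'efgh', 'ij']
```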

Transforming a stream:

  • map - transform each chunk in a stream using the specified function.
  • filter - keep only chunks for which the specified function evaluates to True.
  • flatmap - run the specified function on each chunk, and return all of its results as separate chunks.
  • batch - convert a stream of chunks into a stream of lists of chunks.

Each of these methods returns the modified stream, so they can be chained like this: some_stream.map(...).filter(...).batch(...)
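The semantics of the four transformations can be sketched with plain Python generators. This is a stand-in written for illustration, not Scramjet code: flatmap and batch here are hypothetical helper functions, and batching by a fixed size is a simplification, since how the library decides where a batch ends is not described in this section.

```python
from itertools import islice


def flatmap(func, chunks):
    """Yield every item of func(chunk) as a separate chunk."""
    for chunk in chunks:
        yield from func(chunk)


def batch(chunks, size):
    """Group chunks into lists of at most `size` items."""
    iterator = iter(chunks)
    while True:
        group = list(islice(iterator, size))
        if not group:
            return
        yield group


lines = ["a b", "c", "d e f"]
words = list(flatmap(str.split, lines))         # flatmap: one word per chunk
upper = list(map(str.upper, words))             # map: transform each chunk
no_c = list(filter(lambda w: w != "C", upper))  # filter: drop unwanted chunks
pairs = list(batch(no_c, 2))                    # batch: lists of chunks
print(words)  # ['a', 'b', 'c', 'd', 'e', 'f']
print(pairs)  # [['A', 'B'], ['D', 'E'], ['F']]
```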

Collecting data from the stream (asynchronous):

  • write_to - write all resulting stream chunks into a target.
  • to_list - return a list with all stream chunks.
  • reduce - combine all chunks into a single value using the specified function.
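For comparison, these are the standard-library counterparts of the three collecting operations. The sketch is synchronous and uses no Scramjet code, whereas the Stream methods listed above are asynchronous and have to be awaited.

```python
import functools
import io

chunks = ["1", "2", "3"]

# to_list: gather every chunk into a list.
as_list = list(chunks)

# reduce: fold all chunks into a single value.
total = functools.reduce(lambda acc, chunk: acc + int(chunk), chunks, 0)

# write_to: push every chunk into a writable target.
target = io.StringIO()
for chunk in chunks:
    target.write(chunk + "\n")
written = target.getvalue()

print(as_list)  # ['1', '2', '3']
print(total)    # 6
```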

Examples :books:

Let's say we have a fruits.csv file like this:

orange,sweet,1
lemon,sour,2
pigface,salty,5
banana,sweet,3
cranberries,bitter,6

and we want to write the names of the sweet fruits to a separate file. To do this, write an async function like this:

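import asyncio

from scramjet.streams import Stream  # import path assumed; adjust to your installation


async def write_sweet_fruits():
    with open("misc/fruits.csv") as file_in, open("sweet.txt", "w") as file_out:
        await (
            Stream
            .read_from(file_in)
            .map(lambda line: line.split(','))  # split each CSV line into fields
            .filter(lambda record: record[1] == "sweet")
            .map(lambda record: f"{record[0]}\n")  # keep only the fruit name
            .write_to(file_out)
        )


asyncio.run(write_sweet_fruits())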

and that's it!
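As a cross-check of what sweet.txt should contain, the same selection can be written with only the standard library. This sketch uses in-memory files instead of misc/fruits.csv and does not use Scramjet; the variable name _number for the third column is a placeholder, since the meaning of that column is not stated.

```python
import csv
import io

FRUITS_CSV = """\
orange,sweet,1
lemon,sour,2
pigface,salty,5
banana,sweet,3
cranberries,bitter,6
"""

file_in = io.StringIO(FRUITS_CSV)
file_out = io.StringIO()

# Keep the name of every fruit whose second column is "sweet".
for name, taste, _number in csv.reader(file_in):
    if taste == "sweet":
        file_out.write(f"{name}\n")

sweet = file_out.getvalue()
print(sweet, end="")
```

The output lists orange and banana, one per line, which is what the Stream pipeline above is expected to write.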

You can find more examples in the hello_datastream.py file. They require no dependencies beyond the standard library, so you can run them with:

python hello_datastream.py

Requesting Features

Anything missing? Or maybe there is something that would make using Scramjet Framework easier or more efficient? Don't hesitate to file a new feature request! We really appreciate all feedback.

Reporting bugs

If you have found a bug, or inconsistent or confusing behavior, please file a new bug report.

Contributing

You can contribute to this project by giving us feedback (reporting bugs and requesting features) and also by writing code yourself!

The easiest way is to create a fork of this repository and then create a pull request with all your changes. In most cases, you should branch from and target the main branch.

Please refer to the Development Setup section for how to set up this project.

Development Setup

  1. Install a Python 3 interpreter on your computer. Refer to the official docs.

  2. Install the git version control system. Refer to the official docs.

  3. Clone this repository:

git clone git@github.com:scramjetorg/framework-python.git

  4. Create and activate a virtualenv:

sudo apt install python3-virtualenv
virtualenv -p python3 venv
. venv/bin/activate

  5. Check the Python version:

$ python --version
Python 3.8.10

  6. Install dependencies:

pip install -r dev-requirements.txt

  7. Run test cases (with the virtualenv activated):

pytest

:bulb: HINT: add a filename if you want to limit which tests are run

  8. If you want to enable detailed debug logging, set one of the following env variables:

PYFCA_DEBUG=1       # debug pyfca
DATASTREAM_DEBUG=1  # debug datastream
SCRAMJET_DEBUG=1    # debug both
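The relationship between the three variables can be sketched as follows. The function debug_enabled is hypothetical and is not taken from the project; treating any non-empty value as "on" is also an assumption of this sketch. It only illustrates that SCRAMJET_DEBUG acts as a switch for both components.

```python
import os


def debug_enabled(component, env=None):
    """Return True if debug logging is requested for 'pyfca' or 'datastream'."""
    env = os.environ if env is None else env
    specific = f"{component.upper()}_DEBUG"
    # SCRAMJET_DEBUG switches on both components at once.
    return bool(env.get(specific) or env.get("SCRAMJET_DEBUG"))


pyfca_only = {"PYFCA_DEBUG": "1"}
both = {"SCRAMJET_DEBUG": "1"}
print(debug_enabled("pyfca", pyfca_only))       # True
print(debug_enabled("datastream", pyfca_only))  # False
print(debug_enabled("datastream", both))        # True
```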
