
An abstraction layer for distributed computation


Fugue



Fugue is a pure abstraction layer that makes code portable across differing computing frameworks such as Pandas, Spark and Dask.

  • Framework-agnostic code: Write code once in native Python. Fugue makes it runnable on Pandas, Dask or Spark with minimal changes. Logic and code are decoupled from frameworks, even from Fugue itself. Fugue adapts to the user's code as well as to the underlying computing frameworks.
  • Rapid iterations for big data projects: Test code on smaller data, then reliably scale to Dask or Spark when ready. This shortens project iteration time and saves cluster expense, because clusters need to be spun up less often just to test code, and expensive mistakes are reduced.
  • Friendlier interface for Spark: Fugue handles some optimizations on Spark, making it easier for big data practitioners to focus on logic. Many Fugue users see performance gains in their Spark jobs. Fugue SQL extends Spark SQL into a programming language.
  • Highly testable code: Fugue naturally makes logic more testable because the code is in native Python. Unit tests carry over seamlessly from local workflows to distributed computing workflows.

Who is it for?

  • Big data practitioners looking to reduce compute costs and increase project velocity
  • Data practitioners who keep switching between data processing frameworks (Pandas, Spark, Dask)
  • Data engineers scaling data pipelines to handle bigger data in a consistent and reliable way
  • Data practitioners looking to write more testable code
  • Spark/Dask users who want to have an easier experience working with distributed computing
  • People who love using SQL: Fugue SQL extends standard SQL into a programming language

Key Features

Here is an example Fugue code snippet that illustrates some of the key features of the framework. A fillna function creates a new column named filled, which equals the value column except that None values are replaced by the value argument (0 by default).

from typing import Any, Dict, Iterable

from fugue import FugueWorkflow

# Creating sample data; some values are missing (None)
data = [
    ["A", "2020-01-01", 10],
    ["A", "2020-01-02", None],
    ["A", "2020-01-03", 30],
    ["B", "2020-01-01", 20],
    ["B", "2020-01-02", None],
    ["B", "2020-01-03", 40]
]
schema = "id:str,date:date,value:double"

# schema: *, filled:double
def fillna(df: Iterable[Dict[str, Any]], value: float = 0) -> Iterable[Dict[str, Any]]:
    # The hint above declares the output: all input columns (*) plus filled:double
    for row in df:
        # Fill only missing values; a genuine 0 is kept as-is
        row["filled"] = row["value"] if row["value"] is not None else value
        yield row

# The workflow runs on the default local engine when the with-block exits
with FugueWorkflow() as dag:
    df1 = dag.df(data, schema).transform(fillna)
    df1.show()
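Because fillna is ordinary Python over an iterable of dicts, it can be called directly without Fugue. The sketch below copies the transformer (with an explicit `is not None` check so that a real 0 is not overwritten) and feeds it a few hand-written rows; the row values are illustrative only.

```python
from typing import Any, Dict, Iterable


def fillna(df: Iterable[Dict[str, Any]], value: float = 0) -> Iterable[Dict[str, Any]]:
    for row in df:
        # Replace only missing values; 0.0 is a real value and is kept
        row["filled"] = row["value"] if row["value"] is not None else value
        yield row


rows = [
    {"id": "A", "date": "2020-01-01", "value": 10.0},
    {"id": "A", "date": "2020-01-02", "value": None},
    {"id": "A", "date": "2020-01-03", "value": 0.0},
]
result = list(fillna(rows, value=5))
print([r["filled"] for r in result])  # [10.0, 5, 0.0]
```

The same function can therefore be unit-tested with plain assertions before it is ever handed to an execution engine.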

Catch errors faster

Fugue builds a directed acyclic graph (DAG) before running code, so errors surface sooner and more of them are caught before expensive jobs run on a cluster. For example, a mismatch with the specified schema raises an error. In the code above, the schema hint comment (# schema: *, filled:double) is read, and the output schema is enforced during execution. Fugue extensions require a schema.
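To make the schema hint concrete, here is a toy sketch of what resolving `*, filled:double` against the input schema `id:str,date:date,value:double` means: `*` expands to every input column and the extra column is appended. The helpers `parse_schema`, `expand_hint` and `check_row` are hypothetical and are not how Fugue implements schemas; they only illustrate that the output schema can be resolved, and some mistakes rejected, before any data is processed.

```python
from typing import Dict, List, Tuple

Schema = List[Tuple[str, str]]


def parse_schema(expr: str) -> Schema:
    """Parse a string like "id:str,value:double" into (name, type) pairs."""
    pairs: Schema = []
    for part in expr.split(","):
        name, sep, tp = part.strip().partition(":")
        if not sep or not name or not tp:
            raise ValueError(f"invalid schema field: {part!r}")
        pairs.append((name.strip(), tp.strip()))
    return pairs


def expand_hint(hint: str, input_schema: str) -> Schema:
    """Resolve an output hint such as "*, filled:double" against the input schema.

    "*" stands for every input column. No data is touched here, so a bad
    hint (e.g. a duplicated column name) fails before any job runs.
    """
    out: Schema = []
    for part in hint.split(","):
        part = part.strip()
        out.extend(parse_schema(input_schema) if part == "*" else parse_schema(part))
    names = [name for name, _ in out]
    if len(names) != len(set(names)):
        raise ValueError(f"duplicate column names in output schema: {names}")
    return out


def check_row(row: Dict[str, object], schema: Schema) -> None:
    """Runtime check: the row must have exactly the declared columns, in order."""
    expected = [name for name, _ in schema]
    if list(row) != expected:
        raise ValueError(f"row columns {list(row)} do not match schema {expected}")


out_schema = expand_hint("*, filled:double", "id:str,date:date,value:double")
print(out_schema)
check_row({"id": "A", "date": "2020-01-01", "value": 10.0, "filled": 10.0}, out_schema)
```

With this toy, a hint such as `*, value:double` is rejected up front because it would duplicate the existing value column.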

Cross-platform execution

Notice that the fillna function above is written purely in native Python, so it still runs without Fugue. Fugue lets users write logic in Python and then port it to Pandas, Spark, or Dask, so they can focus on the logic rather than on the engine it will run on. To bring it to Spark, simply pass the SparkExecutionEngine into the FugueWorkflow as follows.

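from fugue import FugueWorkflow
from fugue_spark import SparkExecutionEngine

# Same data, schema and fillna as above; only the execution engine changes
with FugueWorkflow(SparkExecutionEngine) as dag:
    df1 = dag.df(data, schema).transform(fillna)
    df1.show()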

Similarly, for Dask, pass the DaskExecutionEngine into the FugueWorkflow instead.
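The idea behind swapping engines can be illustrated with a toy analogue in plain Python: the transformer stays the same while the object that executes it changes. `LocalEngine` and `PartitionedEngine` below are hypothetical classes written for this sketch, not Fugue's API; the second one merely mimics distributed execution by splitting rows into partitions and processing them on a thread pool.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, Iterable, List

Row = Dict[str, Any]
Transformer = Callable[[Iterable[Row]], Iterable[Row]]


class LocalEngine:
    """Hypothetical engine: runs the transformer over all rows in one pass."""

    def run(self, fn: Transformer, rows: List[Row]) -> List[Row]:
        return list(fn(iter(rows)))


class PartitionedEngine:
    """Hypothetical engine: splits rows into partitions processed on a thread pool."""

    def __init__(self, num_partitions: int = 2) -> None:
        self.num_partitions = num_partitions

    def run(self, fn: Transformer, rows: List[Row]) -> List[Row]:
        # Round-robin partitioning; output order may differ from input order
        parts = [rows[i::self.num_partitions] for i in range(self.num_partitions)]
        with ThreadPoolExecutor(max_workers=self.num_partitions) as pool:
            results = list(pool.map(lambda part: list(fn(iter(part))), parts))
        return [row for part in results for row in part]


def add_one(df: Iterable[Row]) -> Iterable[Row]:
    # Engine-agnostic logic: plain Python over an iterable of dicts
    for row in df:
        yield {**row, "x": row["x"] + 1}


data = [{"x": i} for i in range(5)]
for engine in (LocalEngine(), PartitionedEngine(num_partitions=2)):
    out = engine.run(add_one, data)
    print(type(engine).__name__, sorted(r["x"] for r in out))
```

Because the transformer only sees an iterable of rows, it does not need to know how the data was partitioned; row order is the one thing it should not rely on.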

Spark optimizations

Fugue makes Spark easier to use for people starting with distributed computing. For example, Fugue uses the constructed DAG to automatically persist dataframes that are used multiple times, which often speeds up users' Spark jobs.
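Spark evaluates lazily, so a DataFrame that feeds several downstream steps is recomputed for each of them unless it is cached or persisted. A simplified sketch of the auto-persist idea, assuming a hypothetical pipeline described as a dict of step dependencies (this is not Fugue's actual rule), is to count how many steps consume each output and flag those read more than once:

```python
from collections import Counter
from typing import Dict, List, Set

# Hypothetical pipeline: each step lists the steps whose output it reads
pipeline: Dict[str, List[str]] = {
    "load": [],
    "clean": ["load"],
    "daily_stats": ["clean"],
    "weekly_stats": ["clean"],
    "report": ["daily_stats", "weekly_stats"],
}


def steps_to_persist(dag: Dict[str, List[str]]) -> Set[str]:
    # Count how many downstream steps consume each step's output
    consumers = Counter(dep for deps in dag.values() for dep in deps)
    # Outputs read more than once are persist candidates, so the upstream
    # work is not recomputed for every consumer
    return {step for step, n in consumers.items() if n > 1}


print(steps_to_persist(pipeline))  # {'clean'}
```

Here only clean is flagged, because both daily_stats and weekly_stats read it; every other output has a single consumer.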

Access to framework configuration

Although Fugue simplifies the experience of using distributed computing frameworks, it does not prevent users from changing configuration when needed. For example, the Spark session can be configured as follows:

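from pyspark.sql import SparkSession
from fugue_spark import SparkExecutionEngine

# Configure the Spark session exactly as you would without Fugue
spark_session = (SparkSession
                 .builder
                 .config("spark.executor.cores", 4)
                 .config("fugue.dummy", "dummy")
                 .getOrCreate())

# Wrap the session in a Fugue engine, optionally passing additional configuration
engine = SparkExecutionEngine(spark_session, {"additional_conf": "abc"})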

Fugue SQL

Fugue SQL is a SQL-based language capable of expressing end-to-end workflows. The code below is equivalent to the fillna example above, except that the fill value is set to 10. It shows how to use a Python-defined transformer together with a standard SQL SELECT statement.

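from fugue_sql import FugueSQLWorkflow

# data, schema and fillna are defined in the first example;
# df1 and fillna are referenced by name inside the SQL
with FugueSQLWorkflow() as dag:
    df1 = dag.df(data, schema)
    dag("""
    SELECT id, date, value FROM df1
    TRANSFORM USING fillna (value=10)
    PRINT
    """)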

Alternatively, there is a simpler way:

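from fugue import ArrayDataFrame
from fugue_sql import fsql

df1 = ArrayDataFrame(data, schema)
# df1 and fillna are referenced by name inside the SQL
fsql("""
    SELECT id, date, value FROM df1
    TRANSFORM USING fillna (value=10)
    PRINT
""").run()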

Get started

The complete static docs are also available.

The best way to start is to go through the tutorials, which are provided as interactive notebooks.

Run the tutorial using Binder:

Binder

However, the tutorials run slowly on Binder because its machines are not powerful enough for a distributed framework such as Spark. Parallel executions can become sequential, so some of the performance comparison examples will not give representative numbers.

Run the tutorial using Docker

Alternatively, you should get decent performance by running the tutorial Docker image on your own machine:

docker run -p 8888:8888 fugueproject/tutorials:latest

Installation

pip install fugue

Fugue has these extras:

For example, a common use case is:

pip install fugue[sql,spark]

Jupyter Notebook Extension (since 0.5.1)

pip install fugue
jupyter nbextension install --py fugue_notebook
jupyter nbextension enable fugue_notebook --py

After installing the Jupyter extension, Fugue SQL inside %%fsql magic cells is syntax-highlighted.

These %%fsql magic cells can also be executed once the IPython extension is loaded. For example:

In cell 1

%load_ext fugue_notebook

In cell 2

%%fsql
CREATE [[0]] SCHEMA a:int
PRINT

In cell 3, to run on Dask instead:

%%fsql dask
CREATE [[0]] SCHEMA a:int
PRINT

Note that the fugue_notebook IPython extension can be loaded automatically at startup by configuring your Jupyter environment.

There is also an ad-hoc way to set up your notebook environment that requires no installation or startup-script changes. Run the following in the first cell of each notebook to get highlighting and runnable %%fsql cells:

from fugue_notebook import setup
setup()

Contributing

Feel free to message us on Slack. We also have contributing instructions.


Download files


Source Distribution

fugue-0.5.3.tar.gz (299.5 kB)

Uploaded Source

Built Distribution

fugue-0.5.3-py3-none-any.whl (368.7 kB)

Uploaded Python 3

File details

Details for the file fugue-0.5.3.tar.gz.

File metadata

  • Download URL: fugue-0.5.3.tar.gz
  • Upload date:
  • Size: 299.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.4.1 importlib_metadata/4.0.1 pkginfo/1.7.0 requests/2.25.1 requests-toolbelt/0.9.1 tqdm/4.60.0 CPython/3.7.10

File hashes

Hashes for fugue-0.5.3.tar.gz
Algorithm Hash digest
SHA256 109f5e4b7cc75db8f736b0ee08c75f22beb3db7182e9b0dfb17d8c54ef3cd4c6
MD5 b244773740221eb99ecc82c5640c32f6
BLAKE2b-256 dd96236f63778e8b0acc154bc96007ee8b602966e14da641d9975c1b65b8648a


File details

Details for the file fugue-0.5.3-py3-none-any.whl.

File metadata

  • Download URL: fugue-0.5.3-py3-none-any.whl
  • Upload date:
  • Size: 368.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.4.1 importlib_metadata/4.0.1 pkginfo/1.7.0 requests/2.25.1 requests-toolbelt/0.9.1 tqdm/4.60.0 CPython/3.7.10

File hashes

Hashes for fugue-0.5.3-py3-none-any.whl
Algorithm Hash digest
SHA256 950611192e1bfaf8e91916e587b4be45a697c6c9098e82d34b26531811fd9b87
MD5 8f833ac4e427ce910962d6d71298ea48
BLAKE2b-256 1d6a0db73cf707851a4a58cdaa7a04122ac75a1ea1d9d1361106769256b601ba

