An abstraction layer for distributed computation

Fugue


Fugue is a unified interface for distributed computing that lets users execute Python, pandas, and SQL code on Spark and Dask without rewrites. It is meant for:

  • Data scientists and analysts who want to focus on defining logic rather than worrying about execution
  • SQL lovers who want to use SQL to define end-to-end workflows in pandas, Spark, and Dask
  • Data scientists using pandas who want to take advantage of Spark or Dask with minimal effort
  • Big data practitioners who find testing code costly and slow
  • Data teams with big data projects that struggle to maintain code

Select Features

  • Cross-framework code: Write code once in native Python, SQL, or pandas then execute it on Dask or Spark with no rewrites. Logic and execution are decoupled through Fugue, enabling users to leverage the Spark and Dask engines without learning the specific framework syntax.
  • Rapid iterations for big data projects: Test code on smaller data, then reliably scale to Dask or Spark when ready. This accelerates project iteration time and reduces expensive mistakes.
  • Friendlier interface for Spark: Users can get Python/pandas code running on Spark with significantly less effort compared to PySpark. FugueSQL extends SparkSQL to be a more complete programming language.
  • Highly testable code: Fugue makes logic more testable because all code is written in native Python. Unit tests scale seamlessly from local workflows to distributed computing workflows.

Fugue Transform

The simplest way to use Fugue is the transform() function. It lets users parallelize the execution of a single function by bringing it to Spark or Dask. In the example below, the map_letter_to_food() function takes in a mapping and applies it to a column. This is just pandas and Python so far (without Fugue).

import pandas as pd
from typing import Dict

input_df = pd.DataFrame({"id": [0, 1, 2], "value": ["A", "B", "C"]})
map_dict = {"A": "Apple", "B": "Banana", "C": "Carrot"}

def map_letter_to_food(df: pd.DataFrame, mapping: Dict[str, str]) -> pd.DataFrame:
    df["value"] = df["value"].map(mapping)
    return df

Now, the map_letter_to_food() function is brought to the Spark execution engine by invoking Fugue's transform() function. The output schema, params, and engine are passed to the transform() call. The schema is needed because Spark requires it. A schema of "*" below means all input columns are in the output.

from fugue import transform
from fugue_spark import SparkExecutionEngine

df = transform(input_df,
               map_letter_to_food,
               schema="*",
               params=dict(mapping=map_dict),
               engine=SparkExecutionEngine
            )
df.show()
+---+------+
| id| value|
+---+------+
|  0| Apple|
|  1|Banana|
|  2|Carrot|
+---+------+
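
To make the meaning of the "*" schema concrete, here is a minimal sketch in plain Python of how a wildcard can expand to the input columns. This is an illustration only and not Fugue's schema parser: resolve_output_columns is a hypothetical helper, the column types are made up for the example, and the "*, length:int" form (wildcard plus one extra column) is an assumption used to show the idea.

```python
from typing import List, Tuple

Column = Tuple[str, str]  # (name, type), e.g. ("id", "long")


def resolve_output_columns(input_columns: List[Column], schema_expr: str) -> List[Column]:
    """Expand a schema expression against the input columns.

    Illustration only: "*" stands for every input column, and any other
    comma-separated entry is read as "name:type".
    """
    output: List[Column] = []
    for part in schema_expr.split(","):
        part = part.strip()
        if part == "*":
            # The wildcard copies all input columns into the output.
            output.extend(input_columns)
        elif part:
            name, col_type = part.split(":", 1)
            output.append((name.strip(), col_type.strip()))
    return output


input_columns = [("id", "long"), ("value", "str")]
print(resolve_output_columns(input_columns, "*"))
print(resolve_output_columns(input_columns, "*, length:int"))
```

With "*" alone, the output columns are exactly the input columns, which matches the example above where map_letter_to_food() changes values but not the set of columns.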

PySpark equivalent of Fugue transform:

from typing import Iterator, Union
from pyspark.sql.types import StructType
from pyspark.sql import DataFrame, SparkSession

spark_session = SparkSession.builder.getOrCreate()

def mapping_wrapper(dfs: Iterator[pd.DataFrame], mapping):
  for df in dfs:
      yield map_letter_to_food(df, mapping)

def run_map_letter_to_food(input_df: Union[DataFrame, pd.DataFrame], mapping):
  # Convert pandas input to a Spark DataFrame; Spark input is used as is
  # (Spark DataFrames are immutable and have no copy() method)
  if isinstance(input_df, pd.DataFrame):
      sdf = spark_session.createDataFrame(input_df.copy())
  else:
      sdf = input_df

  # The output schema is the same as the input schema
  schema = StructType(list(sdf.schema.fields))
  return sdf.mapInPandas(lambda dfs: mapping_wrapper(dfs, mapping),
                          schema=schema)

result = run_map_letter_to_food(input_df, map_dict)
result.show()

The Fugue transform() syntax is simpler, cleaner, and more maintainable than the PySpark equivalent. At the same time, no edits were made to the original pandas-based function to bring it to Spark. It is still usable on pandas DataFrames. Because the Spark execution engine was used, the returned df is now a Spark DataFrame. Fugue transform() also supports DaskExecutionEngine and the pandas-based NativeExecutionEngine.
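
Because map_letter_to_food() is plain pandas code, it can be checked locally without Fugue or a cluster. The following is a minimal sketch of such a check. The test function name and the assertions are illustrative and not part of Fugue. Note that the function assigns to a column of the frame it receives, so the sketch passes a copy to keep the input unchanged.

```python
from typing import Dict

import pandas as pd


def map_letter_to_food(df: pd.DataFrame, mapping: Dict[str, str]) -> pd.DataFrame:
    # Same logic as in the example above: replace each letter with its food name.
    df["value"] = df["value"].map(mapping)
    return df


def test_map_letter_to_food() -> None:
    # Hypothetical unit test; it runs on plain pandas, no Spark or Dask required.
    input_df = pd.DataFrame({"id": [0, 1, 2], "value": ["A", "B", "C"]})
    mapping = {"A": "Apple", "B": "Banana", "C": "Carrot"}

    # Pass a copy because the function modifies the frame it is given.
    result = map_letter_to_food(input_df.copy(), mapping)

    assert result["value"].tolist() == ["Apple", "Banana", "Carrot"]
    assert result["id"].tolist() == [0, 1, 2]
    # The original input still holds the letters.
    assert input_df["value"].tolist() == ["A", "B", "C"]


test_map_letter_to_food()
```

The same function, unchanged, is what gets passed to transform() when the logic is moved to a distributed engine.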

FugueSQL

FugueSQL is a SQL-based language capable of expressing end-to-end workflows. The map_letter_to_food() function above is used in the SQL expression below, which shows how to use a Python-defined transformer along with the standard SQL SELECT statement.

from fugue_sql import fsql
import json

query = """
    SELECT id, value FROM input_df
    TRANSFORM USING map_letter_to_food(mapping={{mapping}}) SCHEMA *
    PRINT
    """
map_dict_str = json.dumps(map_dict)

fsql(query,mapping=map_dict_str).run()

For FugueSQL, we can change the engine by passing it to the run() method: fsql(query,mapping=map_dict_str).run("spark").
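
The mapping is passed through json.dumps() because the {{mapping}} placeholder is filled with text before the query runs. The sketch below imitates that step with a plain string replacement to show what the query looks like after substitution. The replacement approach is an assumption made for illustration: the real substitution happens inside fsql(), and render_placeholder is a hypothetical helper, not a Fugue function.

```python
import json

query = """
    SELECT id, value FROM input_df
    TRANSFORM USING map_letter_to_food(mapping={{mapping}}) SCHEMA *
    PRINT
    """

map_dict = {"A": "Apple", "B": "Banana", "C": "Carrot"}


def render_placeholder(template: str, name: str, value: str) -> str:
    # Hypothetical stand-in for the templating step: swap {{name}} for value.
    return template.replace("{{" + name + "}}", value)


# json.dumps turns the dict into a literal that can be embedded in the query text.
map_dict_str = json.dumps(map_dict)
rendered = render_placeholder(query, "mapping", map_dict_str)
print(rendered)
```

The printed query contains mapping={"A": "Apple", "B": "Banana", "C": "Carrot"} in place of the placeholder, which is why the dictionary has to be serialized to a string first.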

Jupyter Notebook Extension

There is an accompanying notebook extension for FugueSQL that lets users use the %%fsql cell magic. The extension also provides syntax highlighting for FugueSQL cells. (Syntax highlighting is not yet available for JupyterLab.)

The notebook environment can be set up by calling the setup() function in the first cell of a notebook:

from fugue_notebook import setup
setup()

Note that the fugue_notebook IPython extension can also be loaded automatically at startup by configuring your Jupyter environment.

Installation

Fugue can be installed through pip by using:

pip install fugue

It also has optional extras. For example, a common use case is:

pip install "fugue[sql,spark]"

To install the notebook extension (after installing Fugue):

jupyter nbextension install --py fugue_notebook
jupyter nbextension enable fugue_notebook --py

Getting Started

The best way to get started with Fugue is to work through the tutorials.

The tutorials can also be run in an interactive notebook environment through Binder or Docker:

Using Binder

Note that the tutorials run slowly on Binder because its machines are not powerful enough for a distributed framework such as Spark. Parallel executions can become sequential, so some of the performance comparison examples will not give you the correct numbers.

Using Docker

Alternatively, you should get decent performance by running this Docker image on your own machine:

docker run -p 8888:8888 fugueproject/tutorials:latest

For the API docs, click here

Further Resources

For a list of our conference presentations, blog posts, and other content, check the Resources page in the tutorials.

Community and Contributing

Feel free to message us on Slack. We also have contributing instructions.
