Skip to main content

An abstraction layer for distributed computation

Project description

Fugue

PyPI version PyPI pyversions PyPI license codecov Codacy Badge Downloads

Tutorials API Documentation Chat with us on slack!
Jupyter Book Badge Doc Slack Status

Fugue is a unified interface for distributed computing that lets users execute Python, pandas, and SQL code on Spark, Dask, and Ray with minimal rewrites.

The most common use cases are:

  • Parallelizing or scaling existing Python and pandas code by bringing it to Spark, Dask, or Ray with minimal rewrites.
  • Using FugueSQL to define end-to-end workflows on top of pandas, Spark, and Dask DataFrames. FugueSQL is an enhanced SQL interface that can invoke Python code with added keywords.
  • Maintaining one codebase for pandas, Spark, Dask, and Ray projects. Logic and execution are decoupled through Fugue, enabling users to be focused on their business logic rather than writing framework-specific code.
  • Improving iteration speed of big data projects. Fugue seamlessly scales execution to big data after local development and testing. By removing PySpark code, unit tests can be written in Python or pandas and ran quickly without spinning up a cluster.

For a more comprehensive overview of Fugue, read this article.

Fugue transform()

The simplest way to use Fugue is the transform() function. This lets users parallelize the execution of a single function by bringing it to Spark, Dask, or Ray. In the example below, the map_letter_to_food() function takes in a mapping and applies it on a column. This is just pandas and Python so far (without Fugue).

import pandas as pd
from typing import Dict

input_df = pd.DataFrame({"id":[0,1,2], "value": (["A", "B", "C"])})
map_dict = {"A": "Apple", "B": "Banana", "C": "Carrot"}

def map_letter_to_food(df: pd.DataFrame, mapping: Dict[str, str]) -> pd.DataFrame:
    df["value"] = df["value"].map(mapping)
    return df

Now, the map_letter_to_food() function is brought to the Spark execution engine by invoking the transform() function of Fugue. The output schema, params and engine are passed to the transform() call. The schema is needed because it's a requirement for distributed frameworks. A schema of "*" below means all input columns are in the output.

from pyspark.sql import SparkSession
from fugue import transform

spark = SparkSession.builder.getOrCreate()

df = transform(input_df,
               map_letter_to_food,
               schema="*",
               params=dict(mapping=map_dict),
               engine=spark
               )
df.show()
+---+------+
| id| value|
+---+------+
|  0| Apple|
|  1|Banana|
|  2|Carrot|
+---+------+
PySpark equivalent of Fugue transform()
from typing import Iterator, Union
from pyspark.sql.types import StructType
from pyspark.sql import DataFrame, SparkSession

spark_session = SparkSession.builder.getOrCreate()

def mapping_wrapper(dfs: Iterator[pd.DataFrame], mapping):
  for df in dfs:
      yield map_letter_to_food(df, mapping)

def run_map_letter_to_food(input_df: Union[DataFrame, pd.DataFrame], mapping):
  # conversion
  if isinstance(input_df, pd.DataFrame):
      sdf = spark_session.createDataFrame(input_df.copy())
  else:
      sdf = input_df.copy()

  schema = StructType(list(sdf.schema.fields))
  return sdf.mapInPandas(lambda dfs: mapping_wrapper(dfs, mapping),
                          schema=schema)

result = run_map_letter_to_food(input_df, map_dict)
result.show()

This syntax is simpler, cleaner, and more maintainable than the PySpark equivalent. At the same time, no edits were made to the original pandas-based function to bring it to Spark. It is still usable on pandas DataFrames. Because the Spark execution engine was used, the returned df is now a Spark DataFrame. Fugue transform() also supports Dask and Ray as execution engines alongside the default pandas-based engine.

FugueSQL

FugueSQL is a SQL-based language capable of expressing end-to-end data workflows. The map_letter_to_food() function above is used in the SQL expression below. This is how to use a Python-defined function along with the standard SQL SELECT statement.

from fugue_sql import fsql
import json

query = """
    SELECT id, value FROM input_df
    TRANSFORM USING map_letter_to_food(mapping={{mapping}}) SCHEMA *
    PRINT
    """
map_dict_str = json.dumps(map_dict)

fsql(query,mapping=map_dict_str).run()

For FugueSQL, we can change the engine by passing it to the run() method: fsql(query,mapping=map_dict_str).run(spark).

Installation

Fugue can be installed through pip or conda. For example:

pip install fugue

It also has the following installation extras:

  • spark: to support Spark as the ExecutionEngine
  • dask: to support Dask as the ExecutionEngine.
  • ray: to support Ray as the ExecutionEngine.
  • duckdb: to support DuckDB as the ExecutionEngine, read details.
  • ibis: to enable Ibis for Fugue workflows, read details.
  • cpp_sql_parser: to enable the CPP antlr parser for Fugue SQL. It can be 50+ times faster than the pure Python parser. For the main Python versions and platforms, there is already pre-built binaries, but for the remaining, it needs a C++ compiler to build on the fly.

For example a common use case is:

pip install fugue[duckdb,spark]

Note if you already installed Spark or DuckDB independently, Fugue is able to automatically use them without installing the extras.

Getting Started

The best way to get started with Fugue is to work through the 10 minute tutorials:

The tutorials can also be run in an interactive notebook environment through binder or Docker:

Using binder

Binder

Note it runs slow on binder because the machine on binder isn't powerful enough for a distributed framework such as Spark. Parallel executions can become sequential, so some of the performance comparison examples will not give you the correct numbers.

Using Docker

Alternatively, you should get decent performance by running this Docker image on your own machine:

docker run -p 8888:8888 fugueproject/tutorials:latest

Jupyter Notebook Extension

There is an accompanying notebook extension for FugueSQL that lets users use the %%fsql cell magic. The extension also provides syntax highlighting for FugueSQL cells. It works for both classic notebook and Jupyter Lab. More details can be found in the installation instructions.

FugueSQL gif

Ecosystem

By being an abstraction layer, Fugue can be used with a lot of other open-source projects seamlessly.

Python backends:

FugueSQL backends:

  • Pandas - FugueSQL can run on Pandas
  • Duckdb - in-process SQL OLAP database management
  • dask-sql - SQL interface for Dask
  • SparkSQL

Fugue is available as a backend or can integrate with the following projects:

Further Resources

View some of our latest conferences presentations and content. For a more complete list, check the Content page in the tutorials.

Case Studies

Blogs

Conferences

Community and Contributing

Feel free to message us on Slack. We also have contributing instructions.

Project details


Release history Release notifications | RSS feed

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

fugue-0.8.0.dev3.tar.gz (264.8 kB view details)

Uploaded Source

Built Distribution

fugue-0.8.0.dev3-py3-none-any.whl (352.0 kB view details)

Uploaded Python 3

File details

Details for the file fugue-0.8.0.dev3.tar.gz.

File metadata

  • Download URL: fugue-0.8.0.dev3.tar.gz
  • Upload date:
  • Size: 264.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.8.15

File hashes

Hashes for fugue-0.8.0.dev3.tar.gz
Algorithm Hash digest
SHA256 4b32310243a189a3fea7571df545c692cb553c0e637875ad9a3590c9246b9a60
MD5 fc033311ddcc81619cf08026aea43c82
BLAKE2b-256 4511bbd7f4a7014ee0a656bc0654ed78611ada39505a6492bc84b64fd3d3e20d

See more details on using hashes here.

File details

Details for the file fugue-0.8.0.dev3-py3-none-any.whl.

File metadata

  • Download URL: fugue-0.8.0.dev3-py3-none-any.whl
  • Upload date:
  • Size: 352.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.8.15

File hashes

Hashes for fugue-0.8.0.dev3-py3-none-any.whl
Algorithm Hash digest
SHA256 78ffe879c96e84c69e2bb4082ea54aa167ce316fc4c7ef4aa346eea239ce4419
MD5 0dceb9abf786f3ee07ecf7bb2039872c
BLAKE2b-256 c44eaefae9dde2f63370701861fba58d57636f8e3b2c681cf059665338756e24

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page