Fugue Quickstart

Import the required libraries

import warnings
warnings.filterwarnings(action='ignore')
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression
from typing import List, Dict, Iterable, Any

Create a model in scikit-learn and run predictions with Spark


# create training data and fit a linear regression model
X = pd.DataFrame({"x_1": [1, 1, 2, 2], "x_2": [1, 2, 2, 3]})
y = np.dot(X, np.array([1, 2])) + 3
reg = LinearRegression().fit(X, y)
# define our predict function
def predict(df: pd.DataFrame, model: LinearRegression) -> pd.DataFrame:
    """
    Function to predict results using a pre-built model
    """
    return df.assign(predicted=model.predict(df))

# create test data
input_df = pd.DataFrame({"x_1": [3, 4, 6, 6], "x_2":[3, 3, 6, 6]})

# test the predict function
predict(input_df, reg)
   x_1  x_2  predicted
0    3    3       12.0
1    4    3       13.0
2    6    6       21.0
3    6    6       21.0
# import Fugue
from fugue import transform

# create a spark dataframe
sdf = spark.createDataFrame(input_df)

# use Fugue transform() to switch execution to Spark
result = transform(
    df=sdf,
    using=predict,
    schema="*,predicted:double",
    params=dict(model=reg),
    engine=spark
)

# display results
print(type(result))
result.show()
<class 'pyspark.sql.dataframe.DataFrame'>
+---+---+------------------+
|x_1|x_2|         predicted|
+---+---+------------------+
|  3|  3|              12.0|
|  4|  3|              13.0|
|  6|  6|20.999999999999996|
|  6|  6|20.999999999999996|
+---+---+------------------+

Run the predictions with Dask

# use transform() to run predict on the Dask engine
result = transform(
    df=input_df.copy(),
    using=predict,
    schema="*,predicted:double",
    params=dict(model=reg),
    engine="dask"
)

# display results
print(type(result))
result.compute().head()
<class 'dask.dataframe.core.DataFrame'>
   x_1  x_2  predicted
0    3    3       12.0
0    4    3       13.0
0    6    6       21.0
0    6    6       21.0

Return the output as a Pandas DataFrame

# use as_local=True to return a Pandas DataFrame
local_result = transform(
    df=input_df,
    using=predict,
    schema="*,predicted:double",
    params=dict(model=reg),
    engine="dask",
    as_local=True
)

print(type(local_result))
local_result.head()
<class 'pandas.core.frame.DataFrame'>
   x_1  x_2  predicted
0    3    3       12.0
1    4    3       13.0
2    6    6       21.0
3    6    6       21.0

Type Hints

The input type annotation tells Fugue what to convert the input data to before the function is applied, and the output type annotation tells Fugue how to convert the result back to a Pandas, Spark, Dask, or Ray DataFrame.
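
For example, the same logic can be written against native Python collections instead of Pandas. The function below is a minimal sketch (its name and schema are illustrative, not part of the quickstart model): Fugue converts each input partition to a list of dicts and converts the yielded dicts back to the engine's DataFrame type.

# a function written against Python collections instead of Pandas
def add_one(rows: List[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
    """
    Add 1 to column 'a' of every row.
    """
    for row in rows:
        yield {"a": row["a"], "a_plus_one": row["a"] + 1}

transform(
    df=pd.DataFrame({"a": [1, 2, 3]}),
    using=add_one,
    schema="a:int, a_plus_one:int"
    )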

Schema

When using the transform() function, the best practice is to provide a schema definition. The * in a schema expression means all existing columns; from there we can add new columns by appending “,column_name:type”.

df = pd.DataFrame({"a": [1,2,3], "b": [1,2,3], "c": [1,2,3]})
def add_col(df: pd.DataFrame) -> pd.DataFrame:
    """
    Function that adds a new column equal to column a + 1.
    """
    return df.assign(new_col=df["a"] + 1)

transform(
    df=df, 
    using=add_col, 
    schema="*,new_col:int"
    )
   a  b  c  new_col
0  1  1  1        2
1  2  2  2        3
2  3  3  3        4

Partitioning

The type hint conversion is applied at the partition level.

df = pd.DataFrame({"a": [1,2,3,4], "b": [1,2,3,4], "c": [1,2,3,4]})

def size(df: pd.DataFrame) -> Iterable[Dict[str, Any]]:
    """
    Function that returns the number of rows of the DataFrame it receives.
    """
    yield {"size": df.shape[0]}

transform(
    df=df, 
    using=size, 
    schema="size:int", 
    engine="dask",
    as_local=True
    )
   size
0     1
1     1
2     1
3     1

The type hint conversion happens on each partition. In the example above, the input was split into four partitions, so size() was called once per partition and each partition held a single row. We can control how the data is partitioned by specifying the column(s) to partition by.

df = pd.DataFrame({"col1": ["a","a","a","b","b","b"], 
                   "col2": [1,2,3,4,5,6]})
df
  col1  col2
0    a     1
1    a     2
2    a     3
3    b     4
4    b     5
5    b     6
def min_max(df: pd.DataFrame) -> List[Dict[str, Any]]:
    """
    Calculates the min and max of a given column based
    on the grouping of a separate column.
    """
    return [{"group": df.iloc[0]["col1"],
             "max": df['col2'].max(),
             "min": df['col2'].min()}]

transform(
    df=df, 
    using=min_max, 
    schema="group:str, max:int, min:int",
    partition={"by": "col1"}
    )
  group  max  min
0     a    3    1
1     b    6    4

We can use the transform() operation to save the output directly to a Parquet file. In the schema expression below, *-b means all existing columns except b.

df = pd.DataFrame({"a": [1,2,3], "b": [1,2,3], "c": [1,2,3]})
df.to_parquet("../data/df.parquet")
def drop_col(df: pd.DataFrame) -> pd.DataFrame:
    """
    A function that drops a column labelled 'b'.
    """
    return df.drop("b", axis=1)

transform(
    df="../data/df.parquet",
    using=drop_col,
    schema="*-b",
    engine=spark,
    save_path="../data/processed.parquet"
    )

pd.read_parquet("../data/processed.parquet/").head()
   a  c
0  1  1
1  2  2
2  3  3

This makes it easy to toggle between running Pandas on sampled data during development and running Spark, Dask, or Ray on the full dataset in production. We can use transform() to distribute the processing of a single step in our workflow without rewriting it.
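
As a minimal sketch of that toggle (reusing predict, input_df, sdf, and reg from above; the variable names are illustrative), only the engine argument changes between the two calls:

# during development: no engine means the function runs locally on Pandas
dev_result = transform(
    df=input_df,          # in practice, a small sample of the full data
    using=predict,
    schema="*,predicted:double",
    params=dict(model=reg)
    )

# in production: pass an engine to run the same code on the full dataset
prod_result = transform(
    df=sdf,
    using=predict,
    schema="*,predicted:double",
    params=dict(model=reg),
    engine=spark
    )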