import warnings
warnings.filterwarnings(action='ignore')
Fugue Quickstart
Import the required libraries
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression
from typing import List, Dict, Iterable, Any
Create a model in scikit-learn and run predictions using Spark
X = pd.DataFrame({"x_1": [1, 1, 2, 2], "x_2": [1, 2, 2, 3]})
y = np.dot(X, np.array([1, 2])) + 3
reg = LinearRegression().fit(X, y)
# define our predict function
def predict(df: pd.DataFrame, model: LinearRegression) -> pd.DataFrame:
    """
    Function to predict results using a pre-built model.
    """
    return df.assign(predicted=model.predict(df))
# create test data
input_df = pd.DataFrame({"x_1": [3, 4, 6, 6], "x_2": [3, 3, 6, 6]})
# test the predict function
predict(input_df, reg)
|   | x_1 | x_2 | predicted |
|---|-----|-----|-----------|
| 0 | 3   | 3   | 12.0      |
| 1 | 4   | 3   | 13.0      |
| 2 | 6   | 6   | 21.0      |
| 3 | 6   | 6   | 21.0      |
# import Fugue
from fugue import transform
# create a spark dataframe
sdf = spark.createDataFrame(input_df)

# use Fugue transform to switch execution to Spark
result = transform(
    df=sdf,
    using=predict,
    schema="*,predicted:double",
    params=dict(model=reg),
    engine=spark
)
# display results
print(type(result))
result.show()
<class 'pyspark.sql.dataframe.DataFrame'>
+---+---+------------------+
|x_1|x_2| predicted|
+---+---+------------------+
| 3| 3| 12.0|
| 4| 3| 13.0|
| 6| 6|20.999999999999996|
| 6| 6|20.999999999999996|
+---+---+------------------+
Do the predictions in Dask
# use transform to run predict on the Dask engine
result = transform(
    df=input_df.copy(),
    using=predict,
    schema="*,predicted:double",
    params=dict(model=reg),
    engine="dask"
)
# display results
print(type(result))
result.compute().head()
<class 'dask.dataframe.core.DataFrame'>
|   | x_1 | x_2 | predicted |
|---|-----|-----|-----------|
| 0 | 3   | 3   | 12.0      |
| 0 | 4   | 3   | 13.0      |
| 0 | 6   | 6   | 21.0      |
| 0 | 6   | 6   | 21.0      |
Return the output as a Pandas DataFrame
# use as_local=True to return a Pandas DataFrame
local_result = transform(
    df=input_df,
    using=predict,
    schema="*,predicted:double",
    params=dict(model=reg),
    engine="dask",
    as_local=True
)
print(type(local_result))
local_result.head()
<class 'pandas.core.frame.DataFrame'>
|   | x_1 | x_2 | predicted |
|---|-----|-----|-----------|
| 0 | 3   | 3   | 12.0      |
| 1 | 4   | 3   | 13.0      |
| 2 | 6   | 6   | 21.0      |
| 3 | 6   | 6   | 21.0      |
Type Hints
The input type annotation tells Fugue what to convert the input data to before the function is applied, while the output type annotation tells Fugue how to convert the result back to a Pandas, Spark, Dask, or Ray DataFrame.
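For example, here is a minimal sketch (not part of the original notebook; the function names are hypothetical) of the same column-adding logic written with two different sets of annotations. Fugue converts each partition to the annotated input type and collects the annotated output type back into a DataFrame.
# a minimal sketch of equivalent functions with different type hints
def add_flag_df(df: pd.DataFrame) -> pd.DataFrame:
    # input arrives as a Pandas DataFrame; the returned DataFrame is converted back
    return df.assign(flag=1)

def add_flag_records(rows: List[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
    # input arrives as a list of dicts; yielded dicts are collected back into a DataFrame
    for row in rows:
        row["flag"] = 1
        yield row

sample = pd.DataFrame({"a": [1, 2, 3]})
# both calls produce the same result despite the different annotations
transform(df=sample, using=add_flag_df, schema="*,flag:int")
transform(df=sample, using=add_flag_records, schema="*,flag:int")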
Schema
When using the transform() function, the best practice is to provide a schema definition. The * in a schema expression means all existing columns; from there, we can add new columns by appending ",column_name:type".
df = pd.DataFrame({"a": [1,2,3], "b": [1,2,3], "c": [1,2,3]})
def add_col(df: pd.DataFrame) -> pd.DataFrame:
    """
    Function that creates a column with a value of column a + 1.
    """
    return df.assign(new_col=df["a"] + 1)
transform(
    df=df,
    using=add_col,
    schema="*,new_col:int"
)
|   | a | b | c | new_col |
|---|---|---|---|---------|
| 0 | 1 | 1 | 1 | 2       |
| 1 | 2 | 2 | 2 | 3       |
| 2 | 3 | 3 | 3 | 4       |
Partitioning
The type hint conversion is applied at the partition level.
df = pd.DataFrame({"a": [1,2,3,4], "b": [1,2,3,4], "c": [1,2,3,4]})
def size(df: pd.DataFrame) -> Iterable[Dict[str, Any]]:
    """
    Function that returns the number of rows of the DataFrame it receives
    (each partition, when the engine is distributed).
    """
    yield {"size": df.shape[0]}
transform(
    df=df,
    using=size,
    schema="size:int",
    engine="dask",
    as_local=True
)
|   | size |
|---|------|
| 0 | 1    |
| 1 | 1    |
| 2 | 1    |
| 3 | 1    |
The type hint conversion happens on each partition. We can control the partitioning by specifying the column(s) to partition by.
df = pd.DataFrame({"col1": ["a","a","a","b","b","b"],
                   "col2": [1,2,3,4,5,6]})
df
|   | col1 | col2 |
|---|------|------|
| 0 | a    | 1    |
| 1 | a    | 2    |
| 2 | a    | 3    |
| 3 | b    | 4    |
| 4 | b    | 5    |
| 5 | b    | 6    |
def min_max(df: pd.DataFrame) -> List[Dict[str, Any]]:
    """
    Calculates the min and max of a given column based
    on the grouping of a separate column.
    """
    return [{"group": df.iloc[0]["col1"],
             "max": df["col2"].max(),
             "min": df["col2"].min()}]
transform(
    df=df,
    using=min_max,
    schema="group:str, max:int, min:int",
    partition={"by": "col1"}
)
|   | group | max | min |
|---|-------|-----|-----|
| 0 | a     | 3   | 1   |
| 1 | b     | 6   | 4   |
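For contrast, a minimal sketch (not in the original notebook): without the partition argument, the local Pandas engine treats the whole DataFrame as a single partition, so min_max would return one row spanning both groups.
# same call without partitioning; the whole DataFrame is one partition here
transform(
    df=df,
    using=min_max,
    schema="group:str, max:int, min:int"
)
# expected: a single row with group "a", max 6, min 1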
We can use the transform() operation to save the output as a Parquet file
df = pd.DataFrame({"a": [1,2,3], "b": [1,2,3], "c": [1,2,3]})
df.to_parquet("../data/df.parquet")
def drop_col(df: pd.DataFrame) -> pd.DataFrame:
    """
    A function that drops a column labelled 'b'.
    """
    return df.drop("b", axis=1)
transform(="../data/df.parquet",
df=drop_col,
using="*-b",
schema=spark,
engine="../data/processed.parquet"
save_path
)
"../data/processed.parquet/").head() pd.read_parquet(
|   | a | c |
|---|---|---|
| 0 | 1 | 1 |
| 1 | 2 | 2 |
| 2 | 3 | 3 |
This single transform() expression makes it easy for users to toggle between running Pandas on sampled data and running Spark, Dask, or Ray on the full dataset, and it lets us distribute the processing of a single step in our workflow.
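As a minimal sketch of that toggle (the run_predict helper below is hypothetical and not part of the original notebook), the same transform() call can be pointed at different engines through a single argument:
# hypothetical helper illustrating the engine toggle described above
def run_predict(df, engine=None):
    # engine=None -> local Pandas execution; engine=spark or "dask" -> distributed
    return transform(
        df=df,
        using=predict,
        schema="*,predicted:double",
        params=dict(model=reg),
        engine=engine,
        as_local=True
    )

run_predict(input_df)             # quick iteration on sampled data with Pandas
run_predict(sdf, engine=spark)    # the full dataset on the Spark cluster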