FugueSQL

FugueSQL can be used on top of Pandas, Spark, and Dask. FugueSQL queries are parsed and then executed on the underlying engine.

import warnings
warnings.filterwarnings(action='ignore')

# register the %%fsql cell magic for classic notebooks
from fugue_notebook import setup
setup(is_lab=False)
import pandas as pd

df = pd.DataFrame({"col1": ["A","A","A","B","B","B"], "col2": [1,2,3,4,5,6]})
df2 = pd.DataFrame({"col1": ["A", "B"], "col3": [1, 2]})

Run FugueSQL

%%fsql
   SELECT df.col1, df.col2, df2.col3
     FROM df
LEFT JOIN df2
       ON df.col1 = df2.col1
    WHERE df.col1 = "A"
    PRINT
  col1  col2  col3
0    A     1     1
1    A     2     1
2    A     3     1
schema: col1:str,col2:long,col3:long
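For comparison, the same left join and filter can be written in plain Pandas. This is a sketch using the same `df` and `df2` defined above; it is not part of the FugueSQL workflow, just a reference point:

```python
import pandas as pd

df = pd.DataFrame({"col1": ["A", "A", "A", "B", "B", "B"], "col2": [1, 2, 3, 4, 5, 6]})
df2 = pd.DataFrame({"col1": ["A", "B"], "col3": [1, 2]})

# LEFT JOIN on col1, then keep only the "A" rows
joined = df.merge(df2, on="col1", how="left")
result = joined[joined["col1"] == "A"]
print(result)
```

The FugueSQL version expresses the same logic, but can run unchanged on Spark or Dask.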

Using a FugueSQL DataFrame in Python

%%fsql
SELECT *
  FROM df
 YIELD DATAFRAME AS result
print(type(result))
print(result.native.head())
<class 'fugue.dataframe.pandas_dataframe.PandasDataFrame'>
  col1  col2
0    A     1
1    A     2
2    A     3
3    B     4
4    B     5

Loading files

%%fsql
df = LOAD "../data/processed.parquet"

new = SELECT *
        FROM df
       YIELD DATAFRAME AS result
print(result.native)
   a  c
0  1  1
1  2  2
2  3  3
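LOAD can also take an optional COLUMNS clause to read only a subset of columns. This is a sketch following the cell above; the path and column name come from that example:

```sql
%%fsql
-- read only column "a" from the same file
df = LOAD "../data/processed.parquet" COLUMNS a
SELECT *
  FROM df
 PRINT
```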

Common table expressions (CTEs) are also supported by FugueSQL.
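For example, a standard WITH clause works inside an fsql cell. This sketch assumes the `df` defined earlier; the name `totals` is illustrative:

```sql
%%fsql
WITH totals AS (
    SELECT col1, SUM(col2) AS total
      FROM df
     GROUP BY col1
)
SELECT *
  FROM totals
 PRINT
```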

Using Python code in SQL

df = pd.DataFrame({"col1": ["A","A","A","B","B","B"], "col2": [1,2,3,4,5,6]})

# schema: *+col2:float
def std_dev(df: pd.DataFrame) -> pd.DataFrame:
    # scale col2 by the maximum within the partition it receives
    return df.assign(col2=df['col2']/df['col2'].max())

The function above is defined to handle one group of data at a time. To apply it per group, we first partition the DataFrame by group using the PREPARTITION and TRANSFORM keywords of FugueSQL.

%%fsql
TRANSFORM df PREPARTITION BY col1 USING std_dev
PRINT
  col1      col2
0    A  0.333333
1    A  0.666667
2    A  1.000000
3    B  0.666667
4    B  0.833333
5    B  1.000000
schema: col1:str,col2:float
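For reference, TRANSFORM with PREPARTITION BY is conceptually similar to a Pandas groupby-apply. A minimal sketch of the equivalent single-machine logic, reusing the transformer above:

```python
import pandas as pd

df = pd.DataFrame({"col1": ["A", "A", "A", "B", "B", "B"], "col2": [1, 2, 3, 4, 5, 6]})

def std_dev(gdf: pd.DataFrame) -> pd.DataFrame:
    # same per-group logic as the Fugue transformer
    return gdf.assign(col2=gdf["col2"] / gdf["col2"].max())

# apply the function to each group, mirroring PREPARTITION BY col1
out = pd.concat(std_dev(g) for _, g in df.groupby("col1")).reset_index(drop=True)
print(out)
```

The difference is that Fugue's PREPARTITION maps to real physical partitions on Spark or Dask, so the same code scales beyond one machine.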

Run SQL code using the DuckDB, Spark, or Dask engine

Fugue supports Pandas, Spark, Dask, and DuckDB. For operations on a laptop or single machine, DuckDB may give significant improvements over Pandas because it has a query optimizer.

For data that is too large to process on a single machine, Spark or Dask can be used. All we need to do is specify the engine in the cell. For example, to run on DuckDB we can do:

%%fsql duckdb
TRANSFORM df PREPARTITION BY col1 USING std_dev
PRINT
  col1      col2
0    A  0.333333
1    A  0.666667
2    A  1.000000
3    B  0.666667
4    B  0.833333
5    B  1.000000
schema: col1:str,col2:float

Similarly, to run the same query on Spark:

%%fsql spark
TRANSFORM df PREPARTITION BY col1 USING std_dev
PRINT
  col1      col2
0    A  0.333333
1    A  0.666667
2    A  1.000000
3    B  0.666667
4    B  0.833333
5    B  1.000000
schema: col1:str,col2:float