Pandas API on Spark

import warnings
warnings.filterwarnings("ignore")
import numpy as np
import pandas as pd
import pyspark.pandas as ps
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Enabling the Arrow optimization speeds up pandas<->Spark conversions;
# set it to "false" instead if you run into compatibility errors
# spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

Creating a pandas-on-Spark DataFrame

  • Ensure that the installed pandas version is below 2.0.0 (pyspark.pandas in older Spark releases is not compatible with pandas 2.x)
  • If DataFrame creation fails, disable the Arrow optimization and retry; a sketch of both checks follows this list
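A minimal sketch of both checks; the major-version test is ordinary Python and the config key is the standard Arrow switch, but treat the fallback itself as an assumption about your environment:

# Older pyspark.pandas releases are known to break with pandas 2.x
if int(pd.__version__.split('.')[0]) >= 2:
    print("pandas", pd.__version__, "detected; consider pinning pandas<2.0.0")

# Fallback if creation fails: turn off the Arrow optimization and retry
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")
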
psdf = ps.DataFrame(
    {'Name': ['John', 'Jane', 'Bob', 'Emily'],
     'Age': [32, 28, 45, 38],
     'Salary': [50000, 60000, 70000, 80000]},
    index=[0, 1, 2, 3])
type(psdf)
pyspark.pandas.frame.DataFrame
psdf
    Name  Age  Salary
0   John   32   50000
1   Jane   28   60000
2    Bob   45   70000
3  Emily   38   80000

Creating a pandas-on-Spark DataFrame from a pandas DataFrame

dates = pd.date_range('20130101', periods=12)
dates
DatetimeIndex(['2013-01-01', '2013-01-02', '2013-01-03', '2013-01-04',
               '2013-01-05', '2013-01-06', '2013-01-07', '2013-01-08',
               '2013-01-09', '2013-01-10', '2013-01-11', '2013-01-12'],
              dtype='datetime64[ns]', freq='D')
pdf = pd.DataFrame(np.random.randn(12, 4), index=dates, columns=list('ABCD'))
pdf
                   A         B         C         D
2013-01-01  0.504450 -0.647873  0.501804  1.529663
2013-01-02  1.017610  0.576785  0.852659  0.958808
2013-01-03  0.491779  0.221775  0.393922 -0.408026
2013-01-04 -1.445449 -1.336367  1.300324 -0.837560
2013-01-05  0.514641  0.338978  0.469635 -1.078154
2013-01-06 -0.587010  0.230809  0.594970  0.737837
2013-01-07 -1.088185 -1.151821 -0.649112  0.252638
2013-01-08 -0.741795 -0.505476  0.901358 -0.883953
2013-01-09  0.174919  1.079475 -0.324199  0.580551
2013-01-10 -0.554698  0.717864  0.246687  0.468863
2013-01-11  0.080391  0.991928 -0.895202 -0.377923
2013-01-12  0.925349 -3.217782  0.121457  0.379868
psdf = ps.from_pandas(pdf)
type(psdf)
pyspark.pandas.frame.DataFrame
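
The conversion also works in the other direction. Keep in mind that to_pandas() collects the entire distributed dataset into the driver's memory, so it only suits data that fits on one machine; a minimal sketch:

# Collects every partition to the driver; use only when the data is small
pdf_back = psdf.to_pandas()
type(pdf_back)
pandas.core.frame.DataFrame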

Creating a pandas-on-Spark DataFrame from a Spark DataFrame

  • A pandas-on-Spark DataFrame behaves like a pandas DataFrame
  • Spark DataFrames expose a different API than pandas-on-Spark DataFrames
  • A Spark DataFrame can be converted to a pandas-on-Spark DataFrame with pandas_api()
sdf = spark.createDataFrame(pdf)
type(sdf)
pyspark.sql.dataframe.DataFrame
sdf.show()
+-------------------+-------------------+-------------------+--------------------+
|                  A|                  B|                  C|                   D|
+-------------------+-------------------+-------------------+--------------------+
| 0.5044503970719988|-0.6478725002220486| 0.5018039867740487|  1.5296631312371947|
| 1.0176100017752658| 0.5767848285691982| 0.8526593238367942|  0.9588083914653783|
| 0.4917793190647551|0.22177470182967668|0.39392215574863115|-0.40802597968113213|
| -1.445449391380651|-1.3363673281973991|   1.30032389460604| -0.8375597718269587|
| 0.5146410948195219| 0.3389783463920524|0.46963549095271956| -1.0781543404783653|
|-0.5870101238455766|  0.230808796563566| 0.5949702674680807|  0.7378368815369615|
| -1.088185073708579| -1.151821497439794| -0.649112008379282| 0.25263810007989795|
|-0.7417945676129846|-0.5054756939534057| 0.9013577759531805| -0.8839527075280547|
|0.17491903257936225| 1.0794753492705051|-0.3241985395622288|  0.5805514330010838|
|-0.5546977314810425| 0.7178640983371047| 0.2466873044383975|  0.4688628451924725|
| 0.0803905150857272| 0.9919277671837536|-0.8952023471182896| -0.3779230216457262|
| 0.9253487378066081| -3.217781511915491|0.12145734870229986|   0.379868166502288|
+-------------------+-------------------+-------------------+--------------------+
psdf = sdf.pandas_api()
psdf
           A         B         C         D
 0  0.504450 -0.647873  0.501804  1.529663
 1  1.017610  0.576785  0.852659  0.958808
 2  0.491779  0.221775  0.393922 -0.408026
 3 -1.445449 -1.336367  1.300324 -0.837560
 4  0.514641  0.338978  0.469635 -1.078154
 5 -0.587010  0.230809  0.594970  0.737837
 6 -1.088185 -1.151821 -0.649112  0.252638
 7 -0.741795 -0.505476  0.901358 -0.883953
 8  0.174919  1.079475 -0.324199  0.580551
 9 -0.554698  0.717864  0.246687  0.468863
10  0.080391  0.991928 -0.895202 -0.377923
11  0.925349 -3.217782  0.121457  0.379868
type(psdf)
pyspark.pandas.frame.DataFrame
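
Going the other way, to_spark() returns the data as a native Spark DataFrame (by default the pandas index is dropped); a minimal sketch:

# Convert the pandas-on-Spark DataFrame back to a native Spark DataFrame
sdf_back = psdf.to_spark()
type(sdf_back)
pyspark.sql.dataframe.DataFrame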

Performing common pandas operations on a pandas-on-Spark DataFrame

psdf.describe()
               A          B          C          D
count  12.000000  12.000000  12.000000  12.000000
mean   -0.059000  -0.225142   0.292859   0.110218
std     0.804773   1.233923   0.645692   0.818947
min    -1.445449  -3.217782  -0.895202  -1.078154
25%    -0.741795  -1.151821  -0.324199  -0.837560
50%     0.080391   0.221775   0.393922   0.252638
75%     0.504450   0.576785   0.594970   0.580551
max     1.017610   1.079475   1.300324   1.529663
psdf.info()
<class 'pyspark.pandas.frame.DataFrame'>
Int64Index: 12 entries, 0 to 11
Data columns (total 4 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   A       12 non-null     float64
 1   B       12 non-null     float64
 2   C       12 non-null     float64
 3   D       12 non-null     float64
dtypes: float64(4)
psdf.dtypes
A    float64
B    float64
C    float64
D    float64
dtype: object
psdf.tail(5)
           A         B         C         D
 7 -0.741795 -0.505476  0.901358 -0.883953
 8  0.174919  1.079475 -0.324199  0.580551
 9 -0.554698  0.717864  0.246687  0.468863
10  0.080391  0.991928 -0.895202 -0.377923
11  0.925349 -3.217782  0.121457  0.379868
psdf.index
Int64Index([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], dtype='int64')
psdf.columns
Index(['A', 'B', 'C', 'D'], dtype='object')
psdf.to_numpy()  # collects the full dataset into driver memory; use only on small data
array([[ 0.5044504 , -0.6478725 ,  0.50180399,  1.52966313],
       [ 1.01761   ,  0.57678483,  0.85265932,  0.95880839],
       [ 0.49177932,  0.2217747 ,  0.39392216, -0.40802598],
       [-1.44544939, -1.33636733,  1.30032389, -0.83755977],
       [ 0.51464109,  0.33897835,  0.46963549, -1.07815434],
       [-0.58701012,  0.2308088 ,  0.59497027,  0.73783688],
       [-1.08818507, -1.1518215 , -0.64911201,  0.2526381 ],
       [-0.74179457, -0.50547569,  0.90135778, -0.88395271],
       [ 0.17491903,  1.07947535, -0.32419854,  0.58055143],
       [-0.55469773,  0.7178641 ,  0.2466873 ,  0.46886285],
       [ 0.08039052,  0.99192777, -0.89520235, -0.37792302],
       [ 0.92534874, -3.21778151,  0.12145735,  0.37986817]])
psdf.sort_index(ascending=False)
           A         B         C         D
11  0.925349 -3.217782  0.121457  0.379868
10  0.080391  0.991928 -0.895202 -0.377923
 9 -0.554698  0.717864  0.246687  0.468863
 8  0.174919  1.079475 -0.324199  0.580551
 7 -0.741795 -0.505476  0.901358 -0.883953
 6 -1.088185 -1.151821 -0.649112  0.252638
 5 -0.587010  0.230809  0.594970  0.737837
 4  0.514641  0.338978  0.469635 -1.078154
 3 -1.445449 -1.336367  1.300324 -0.837560
 2  0.491779  0.221775  0.393922 -0.408026
 1  1.017610  0.576785  0.852659  0.958808
 0  0.504450 -0.647873  0.501804  1.529663
psdf.sort_values(by='D')
           A         B         C         D
 4  0.514641  0.338978  0.469635 -1.078154
 7 -0.741795 -0.505476  0.901358 -0.883953
 3 -1.445449 -1.336367  1.300324 -0.837560
 2  0.491779  0.221775  0.393922 -0.408026
10  0.080391  0.991928 -0.895202 -0.377923
 6 -1.088185 -1.151821 -0.649112  0.252638
11  0.925349 -3.217782  0.121457  0.379868
 9 -0.554698  0.717864  0.246687  0.468863
 8  0.174919  1.079475 -0.324199  0.580551
 5 -0.587010  0.230809  0.594970  0.737837
 1  1.017610  0.576785  0.852659  0.958808
 0  0.504450 -0.647873  0.501804  1.529663
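
Boolean masking carries over from pandas as well; a minimal sketch selecting the rows where column A is positive:

# Keep only the rows with a positive value in column A
psdf[psdf['A'] > 0]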

Handling Missing Values

data = {'A': [1, 2, np.nan],
        'B': [4, 5, 6],
        'C': [7, 8, 9]}
pdf = pd.DataFrame(data)
pdf
     A  B  C
0  1.0  4  7
1  2.0  5  8
2  NaN  6  9
psdf = ps.from_pandas(pdf)
psdf.dropna(how='any')
     A  B  C
0  1.0  4  7
1  2.0  5  8
psdf.fillna(0)
     A  B  C
0  1.0  4  7
1  2.0  5  8
2  0.0  6  9
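
fillna() also accepts a per-column mapping, as in pandas. A minimal sketch that fills column A's gap with its own mean (computing the mean triggers a Spark job):

# Replace A's NaN with the column mean (1.5 for this data)
psdf.fillna({'A': psdf['A'].mean()})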

Grouping

data = ps.DataFrame(
    {'A': ['foo', 'bar', 'foo', 'bar', 'foo', 'bar', 'foo', 'foo', 'bar', 'foo', 'bar', 'foo', 'bar', 'foo', 'bar'],
     'B': ['one', 'one', 'two', 'three', 'two', 'two', 'one', 'three', 'two', 'two', 'one', 'three', 'one', 'three', 'two'],
     'C': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15],
     'D': [10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130, 140, 150]})
data
      A      B   C    D
 0  foo    one   1   10
 1  bar    one   2   20
 2  foo    two   3   30
 3  bar  three   4   40
 4  foo    two   5   50
 5  bar    two   6   60
 6  foo    one   7   70
 7  foo  three   8   80
 8  bar    two   9   90
 9  foo    two  10  100
10  bar    one  11  110
11  foo  three  12  120
12  bar    one  13  130
13  foo  three  14  140
14  bar    two  15  150
data.groupby('B').mean()
         C     D
B
one    6.8  68.0
two    8.0  80.0
three  9.5  95.0
data.groupby(['A', 'B']).mean()
                   C           D
A   B
foo one     4.000000   40.000000
bar one     8.666667   86.666667
foo two     6.000000   60.000000
bar three   4.000000   40.000000
    two    10.000000  100.000000
foo three  11.333333  113.333333
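
As in pandas, agg() takes a per-column mapping when different aggregations are needed; a minimal sketch:

# Sum C but take the max of D within each group
data.groupby('A').agg({'C': 'sum', 'D': 'max'})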

Saving data

# pandas-on-Spark writes a directory of part files in parallel,
# so the rows come back in a different order than they were written
data.to_csv('output.csv')
ps.read_csv('output.csv').head(5)
     A      B   C    D
0  foo  three  12  120
1  foo  three  14  140
2  foo  three   8   80
3  foo    two  10  100
4  bar  three   4   40
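Because the write is distributed, output.csv is a directory of part files and rows come back in arbitrary order. If order matters, one option is to persist the index as a column and sort on it after reading; a sketch, where the path and the 'row_id' column name are illustrative only:

# Store the index as a real column so the original order is recoverable
data.to_csv('output_ordered.csv', index_col='row_id')
ps.read_csv('output_ordered.csv', index_col='row_id').sort_index().head(5)
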
data.to_parquet('output.parquet')
ps.read_parquet('output.parquet').head(5)
     A      B   C    D
0  foo  three  12  120
1  bar  three   4   40
2  foo  three  14  140
3  foo  three   8   80
4  foo    two  10  100