Hands-on Introduction to DataFrame API

  • DataFrames are lazily evaluated
  • They are implemented on top of RDDs
  • Spark does not process the data when a transformation is called; processing starts only when an action is invoked (see the lazy-evaluation sketch after the SparkSession example below)
  • A Spark application starts by initializing a SparkSession
  • In the pyspark shell, the session is created automatically and exposed in the variable spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
spark = (SparkSession
    .builder
    .appName("FirstProgram")
    .getOrCreate())
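Building up a query does not trigger any computation; only an action such as count() or show() does. A minimal illustration, assuming the spark session created above (the lazy_df name is just for this sketch):
from pyspark.sql.functions import col

# Transformations only: Spark records the plan, nothing runs yet
lazy_df = spark.range(10).withColumn("doubled", col("id") * 2)
# Action: the plan is executed now and 10 is returned
lazy_df.count()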
  • A pyspark DataFrame can be created via pyspark.sql.SparkSession.createDataFrame by passing a list of lists, tuples, dictionaries, or pyspark.sql.Row objects, a pandas DataFrame, or an RDD (two of these alternatives are sketched after the example below)
  • pyspark.sql.SparkSession.createDataFrame takes an optional schema argument; if a schema is not provided, pyspark infers it from the data

data = spark.createDataFrame([("Amar",21),("Akbar",25),("John",28),("Harika",32),("Amar",35),("Akbar",40)],
                             ["name","age"])
data.show()
+------+---+
|  name|age|
+------+---+
|  Amar| 21|
| Akbar| 25|
|  John| 28|
|Harika| 32|
|  Amar| 35|
| Akbar| 40|
+------+---+
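The same DataFrame could also be built from the other accepted inputs, for example Row objects or a pandas DataFrame (a minimal sketch; the pdf variable is just illustrative):
from pyspark.sql import Row
import pandas as pd

# From a list of Row objects
spark.createDataFrame([Row(name="Amar", age=21), Row(name="Akbar", age=25)]).show()

# From a pandas DataFrame
pdf = pd.DataFrame({"name": ["John", "Harika"], "age": [28, 32]})
spark.createDataFrame(pdf).show()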

DataFrame API

Spark Python Data Types
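
PySpark's data types live in pyspark.sql.types; a few of the types used in the schemas below, shown purely for illustration:
from pyspark.sql.types import StringType, IntegerType, LongType, FloatType, DoubleType

# Each type object describes how Spark stores and serializes a column
StringType()   # text
IntegerType()  # 32-bit integers (INT)
LongType()     # 64-bit integers (BIGINT / 'long')
FloatType()    # single-precision floats
DoubleType()   # double-precision floats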

Schemas and Creating DataFrames

  • Spark can infer the schema from the data
  • If the dataset is large, it is an overhead for Spark to read a portion of the file in order to ascertain the datatypes, which is expensive and time-consuming
  • It is a good practice to define the schema upfront

Method-1 to define a Schema

author_info = [("John", "Doe", "The Great Gatsby", 1000.00, 5),
        ("Jane", "Smith", "To Kill a Mockingbird", 1200.05, 3),
        ("Bob", "Johnson", "Pride and Prejudice", 800.05, 4),
        ("Alice", "Davis", "The Catcher in the Rye", 900.00, 2),
        ("Charlie", "Brown", "Moby-Dick", 700.30, 6),
        ("Emily", "Wilson", "Wuthering Heights", 1100.05, 1),
        ("Frank", "Garcia", "1984", 1300.06, 7),
        ("Grace", "Martinez", "The Odyssey", 600.00, 3),
        ("Henry", "Anderson", "War and Peace", 1400.75, 8),
        ("Isabella", "Taylor", "The Divine Comedy", 500.00, 2)]
from pyspark.sql.types import *
# StructField takes (field name, data type, nullable); False marks the field as non-nullable
schema = StructType(
    [StructField("name", StringType(), False),
     StructField("surname", StringType(), False),
     StructField("book", StringType(), False),
     StructField("price", FloatType(), False),
     StructField("rating", IntegerType(), False)
     ])
author_data = spark.createDataFrame(author_info,schema)

Viewing Data

author_data.show(4)
+-----+-------+--------------------+-------+------+
| name|surname|                book|  price|rating|
+-----+-------+--------------------+-------+------+
| John|    Doe|    The Great Gatsby| 1000.0|     5|
| Jane|  Smith|To Kill a Mocking...|1200.05|     3|
|  Bob|Johnson| Pride and Prejudice| 800.05|     4|
|Alice|  Davis|The Catcher in th...|  900.0|     2|
+-----+-------+--------------------+-------+------+
only showing top 4 rows
author_data.show(2,vertical=True)
-RECORD 0-----------------------
 name    | John                 
 surname | Doe                  
 book    | The Great Gatsby     
 price   | 1000.0               
 rating  | 5                    
-RECORD 1-----------------------
 name    | Jane                 
 surname | Smith                
 book    | To Kill a Mocking... 
 price   | 1200.05              
 rating  | 3                    
only showing top 2 rows

Checking the Schema

author_data.printSchema()
root
 |-- name: string (nullable = false)
 |-- surname: string (nullable = false)
 |-- book: string (nullable = false)
 |-- price: float (nullable = false)
 |-- rating: integer (nullable = false)
author_data.schema
StructType([StructField('name', StringType(), False), StructField('surname', StringType(), False), StructField('book', StringType(), False), StructField('price', FloatType(), False), StructField('rating', IntegerType(), False)])

Method-2 to define a Schema

schema_2 = "name STRING, surname STRING, book STRING, price FLOAT, rating INT"
author_data_2 = spark.createDataFrame(author_info,schema_2)
author_data_2.show()
+--------+--------+--------------------+-------+------+
|    name| surname|                book|  price|rating|
+--------+--------+--------------------+-------+------+
|    John|     Doe|    The Great Gatsby| 1000.0|     5|
|    Jane|   Smith|To Kill a Mocking...|1200.05|     3|
|     Bob| Johnson| Pride and Prejudice| 800.05|     4|
|   Alice|   Davis|The Catcher in th...|  900.0|     2|
| Charlie|   Brown|           Moby-Dick|  700.3|     6|
|   Emily|  Wilson|   Wuthering Heights|1100.05|     1|
|   Frank|  Garcia|                1984|1300.06|     7|
|   Grace|Martinez|         The Odyssey|  600.0|     3|
|   Henry|Anderson|       War and Peace|1400.75|     8|
|Isabella|  Taylor|   The Divine Comedy|  500.0|     2|
+--------+--------+--------------------+-------+------+
author_data_2.printSchema()
root
 |-- name: string (nullable = true)
 |-- surname: string (nullable = true)
 |-- book: string (nullable = true)
 |-- price: float (nullable = true)
 |-- rating: integer (nullable = true)
# Getting the column names of the pyspark DataFrame
author_data.columns
['name', 'surname', 'book', 'price', 'rating']

Summary of the DataFrame

(author_data.select("price","rating")
            .describe()
            .show())
+-------+-----------------+-----------------+
|summary|            price|           rating|
+-------+-----------------+-----------------+
|  count|               10|               10|
|   mean|950.1260131835937|              4.1|
| stddev|302.8737534314304|2.330951164939612|
|    min|            500.0|                1|
|    max|          1400.75|                8|
+-------+-----------------+-----------------+

DataFrame.collect() collects the data on the driver (equivalent to local data in Python). If the dataset is large and cannot be accommodated on the driver, it will throw an out-of-memory error.

# Avoid using collect() method
author_data.collect()
[Row(name='John', surname='Doe', book='The Great Gatsby', price=1000.0, rating=5),
 Row(name='Jane', surname='Smith', book='To Kill a Mockingbird', price=1200.050048828125, rating=3),
 Row(name='Bob', surname='Johnson', book='Pride and Prejudice', price=800.0499877929688, rating=4),
 Row(name='Alice', surname='Davis', book='The Catcher in the Rye', price=900.0, rating=2),
 Row(name='Charlie', surname='Brown', book='Moby-Dick', price=700.2999877929688, rating=6),
 Row(name='Emily', surname='Wilson', book='Wuthering Heights', price=1100.050048828125, rating=1),
 Row(name='Frank', surname='Garcia', book='1984', price=1300.06005859375, rating=7),
 Row(name='Grace', surname='Martinez', book='The Odyssey', price=600.0, rating=3),
 Row(name='Henry', surname='Anderson', book='War and Peace', price=1400.75, rating=8),
 Row(name='Isabella', surname='Taylor', book='The Divine Comedy', price=500.0, rating=2)]

To avoid out-of-memory errors, use the DataFrame.take() or DataFrame.tail() methods (tail() is sketched after the take() example below).

author_data.take(1)
[Row(name='John', surname='Doe', book='The Great Gatsby', price=1000.0, rating=5)]
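DataFrame.tail() similarly returns only a bounded number of rows, taken from the end of the DataFrame (a minimal sketch):
# Returns the last 2 rows as a list of Row objects; the full dataset is still scanned
author_data.tail(2)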

Converting a PySpark DataFrame to a Pandas DataFrame

pandas_df = author_data.toPandas()
type(pandas_df)
pandas.core.frame.DataFrame
type(author_data)
pyspark.sql.dataframe.DataFrame

Selecting and Accessing Data

from pyspark.sql import Column
from pyspark.sql.functions import upper
author_data.select("book").show()
+--------------------+
|                book|
+--------------------+
|    The Great Gatsby|
|To Kill a Mocking...|
| Pride and Prejudice|
|The Catcher in th...|
|           Moby-Dick|
|   Wuthering Heights|
|                1984|
|         The Odyssey|
|       War and Peace|
|   The Divine Comedy|
+--------------------+
# Creating a new column from existing column
author_data.withColumn('upper_title', upper(author_data.book)).show()
+--------+--------+--------------------+-------+------+--------------------+
|    name| surname|                book|  price|rating|         upper_title|
+--------+--------+--------------------+-------+------+--------------------+
|    John|     Doe|    The Great Gatsby| 1000.0|     5|    THE GREAT GATSBY|
|    Jane|   Smith|To Kill a Mocking...|1200.05|     3|TO KILL A MOCKING...|
|     Bob| Johnson| Pride and Prejudice| 800.05|     4| PRIDE AND PREJUDICE|
|   Alice|   Davis|The Catcher in th...|  900.0|     2|THE CATCHER IN TH...|
| Charlie|   Brown|           Moby-Dick|  700.3|     6|           MOBY-DICK|
|   Emily|  Wilson|   Wuthering Heights|1100.05|     1|   WUTHERING HEIGHTS|
|   Frank|  Garcia|                1984|1300.06|     7|                1984|
|   Grace|Martinez|         The Odyssey|  600.0|     3|         THE ODYSSEY|
|   Henry|Anderson|       War and Peace|1400.75|     8|       WAR AND PEACE|
|Isabella|  Taylor|   The Divine Comedy|  500.0|     2|   THE DIVINE COMEDY|
+--------+--------+--------------------+-------+------+--------------------+
# Filtering the data
author_data.filter(author_data.price > 1000).show()
+-----+--------+--------------------+-------+------+
| name| surname|                book|  price|rating|
+-----+--------+--------------------+-------+------+
| Jane|   Smith|To Kill a Mocking...|1200.05|     3|
|Emily|  Wilson|   Wuthering Heights|1100.05|     1|
|Frank|  Garcia|                1984|1300.06|     7|
|Henry|Anderson|       War and Peace|1400.75|     8|
+-----+--------+--------------------+-------+------+
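Filter conditions can also be built with col() and combined with boolean operators; a minimal sketch against the same columns used above:
from pyspark.sql.functions import col

# Conditions combined with & (and) / | (or) must each be parenthesized
author_data.filter((col("price") > 1000) & (col("rating") > 5)).show()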

Applying a Function

  • pyspark supports User Defined Functions (UDFs)
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('long') is a decorator used to define a Pandas UDF (User-Defined Function) in PySpark. The 'long' argument specifies the return type of the UDF. In this case, it indicates that the UDF returns a value of type long, which is an alias for bigint in PySpark.

Pandas UDFs let you use pandas operations in PySpark; because they receive whole batches of values as pandas Series, they can provide significant performance improvements over regular row-at-a-time UDFs (a comparison sketch follows the example below). The return type must be specified when defining a Pandas UDF so that PySpark can correctly handle the data returned by the function.

@pandas_udf('long')
def pandas_multiply_ten(series: pd.Series) -> pd.Series:
    return series*10
author_data.select(author_data.price,pandas_multiply_ten(author_data.price)).show()
+-------+--------------------------+
|  price|pandas_multiply_ten(price)|
+-------+--------------------------+
| 1000.0|                     10000|
|1200.05|                     12000|
| 800.05|                      8000|
|  900.0|                      9000|
|  700.3|                      7003|
|1100.05|                     11000|
|1300.06|                     13000|
|  600.0|                      6000|
|1400.75|                     14007|
|  500.0|                      5000|
+-------+--------------------------+
price_df = author_data.select(author_data.price,pandas_multiply_ten(author_data.price))
type(price_df)
pyspark.sql.dataframe.DataFrame
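For comparison, the same computation written as a regular row-at-a-time UDF might look like the sketch below; Python is invoked once per row instead of once per batch, which is typically slower:
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

# Row-at-a-time UDF: called for every value, with per-row serialization overhead
multiply_ten = udf(lambda price: int(price * 10), LongType())
author_data.select(author_data.price, multiply_ten(author_data.price)).show()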

Grouping

data = [
    ("A", 1, 100),
    ("B", 2, 200),
    ("A", 3, 300),
    ("B", 4, 400),
    ("C", 5, 500)
]
df = spark.createDataFrame(data,["key", "value1", "value2"])
df.show()
+---+------+------+
|key|value1|value2|
+---+------+------+
|  A|     1|   100|
|  B|     2|   200|
|  A|     3|   300|
|  B|     4|   400|
|  C|     5|   500|
+---+------+------+
df.groupby('key').avg().show()
+---+-----------+-----------+
|key|avg(value1)|avg(value2)|
+---+-----------+-----------+
|  A|        2.0|      200.0|
|  B|        3.0|      300.0|
|  C|        5.0|      500.0|
+---+-----------+-----------+
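Specific aggregations can be requested with agg(); a minimal sketch using the avg function imported at the top of this section:
# Aggregate a single column and give the result a readable alias
df.groupBy("key").agg(avg("value2").alias("avg_value2")).show()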

Saving the Data

Save in CSV Format

df.write.csv('output.csv',header=True)
spark.read.csv('output.csv',header=True).show()
+---+------+------+
|key|value1|value2|
+---+------+------+
|  B|     4|   400|
|  A|     3|   300|
|  B|     2|   200|
|  A|     1|   100|
|  C|     5|   500|
+---+------+------+
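Note that spark.read.csv loads every column as a string by default; type inference can be requested (a minimal sketch):
# Ask the CSV reader to infer column types (requires an extra pass over the data)
spark.read.csv('output.csv', header=True, inferSchema=True).printSchema()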

Save in Parquet Format

df.write.parquet('output.parquet')
spark.read.parquet('output.parquet').show()
+---+------+------+
|key|value1|value2|
+---+------+------+
|  A|     1|   100|
|  B|     4|   400|
|  A|     3|   300|
|  B|     2|   200|
|  C|     5|   500|
+---+------+------+

Save in ORC Format

df.write.orc('output.orc')
spark.read.orc('output.orc').show()
+---+------+------+
|key|value1|value2|
+---+------+------+
|  C|     5|   500|
|  B|     2|   200|
|  B|     4|   400|
|  A|     3|   300|
|  A|     1|   100|
+---+------+------+
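
Each of these writers fails if the output path already exists; a save mode can be passed to change that behavior (a minimal sketch):
# 'overwrite' replaces any existing output; other modes are 'append', 'ignore' and 'error' (the default)
df.write.mode('overwrite').parquet('output.parquet')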