from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
Hands-on Introduction to DataFrame API
- DataFrames are lazily evaluated
- They are implemented on top of RDDs
- Spark will not process the data when a transformation is called; it starts processing only when an action is called (a short sketch after the first DataFrame example below illustrates this)
- A Spark application starts by initializing a SparkSession
- In the case of a pyspark shell, the shell automatically creates the session in the variable spark for users
spark = (SparkSession
         .builder
         .appName("FirstProgram")
         .getOrCreate())
- A pyspark DataFrame can be created via pyspark.sql.SparkSession.createDataFrame by passing a list of lists, tuples, dictionaries or pyspark.sql.Rows, a pandas DataFrame, or an RDD
- pyspark.sql.SparkSession.createDataFrame takes a schema argument. If a schema is not provided, pyspark infers the schema from the data
data = spark.createDataFrame([("Amar",21),("Akbar",25),("John",28),("Harika",32),("Amar",35),("Akbar",40)],
                             ["name","age"])
data.show()
+------+---+
| name|age|
+------+---+
| Amar| 21|
| Akbar| 25|
| John| 28|
|Harika| 32|
| Amar| 35|
| Akbar| 40|
+------+---+
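As a quick illustration of the lazy-evaluation bullets above, here is a small sketch using the data DataFrame just created; the filter() call is only a transformation, so nothing runs until an action is invoked:
# filter() is a transformation: Spark only records it in the query plan
adults = data.filter(data.age > 30)
# count() is an action: it triggers the actual computation (3 rows match here)
adults.count()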
DataFrame API
Schemas and Creating DataFrames
- Spark can infer the schema from the data
- If the dataset is large, it is an overhead for Spark to read a portion of the file and ascertain the datatypes; this can be expensive and time-consuming
- It is a good practice to define the schema upfront
Method-1 to define a Schema
author_info = [("John", "Doe", "The Great Gatsby", 1000.00, 5),
               ("Jane", "Smith", "To Kill a Mockingbird", 1200.05, 3),
               ("Bob", "Johnson", "Pride and Prejudice", 800.05, 4),
               ("Alice", "Davis", "The Catcher in the Rye", 900.00, 2),
               ("Charlie", "Brown", "Moby-Dick", 700.30, 6),
               ("Emily", "Wilson", "Wuthering Heights", 1100.05, 1),
               ("Frank", "Garcia", "1984", 1300.06, 7),
               ("Grace", "Martinez", "The Odyssey", 600.00, 3),
               ("Henry", "Anderson", "War and Peace", 1400.75, 8),
               ("Isabella", "Taylor", "The Divine Comedy", 500.00, 2)]
from pyspark.sql.types import *
schema = StructType(
    [StructField("name", StringType(), False),
     StructField("surname", StringType(), False),
     StructField("book", StringType(), False),
     StructField("price", FloatType(), False),
     StructField("rating", IntegerType(), False)])
author_data = spark.createDataFrame(author_info, schema)
Viewing Data
author_data.show(4)
+-----+-------+--------------------+-------+------+
| name|surname| book| price|rating|
+-----+-------+--------------------+-------+------+
| John| Doe| The Great Gatsby| 1000.0| 5|
| Jane| Smith|To Kill a Mocking...|1200.05| 3|
| Bob|Johnson| Pride and Prejudice| 800.05| 4|
|Alice| Davis|The Catcher in th...| 900.0| 2|
+-----+-------+--------------------+-------+------+
only showing top 4 rows
author_data.show(2, vertical=True)
-RECORD 0-----------------------
name | John
surname | Doe
book | The Great Gatsby
price | 1000.0
rating | 5
-RECORD 1-----------------------
name | Jane
surname | Smith
book | To Kill a Mocking...
price | 1200.05
rating | 3
only showing top 2 rows
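By default show() truncates long string values (such as the book titles above) to 20 characters. If needed, the truncate parameter controls this; a small optional sketch:
# Print full column values instead of truncating them to 20 characters
author_data.show(4, truncate=False)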
Checking the Schema
author_data.printSchema()
root
|-- name: string (nullable = false)
|-- surname: string (nullable = false)
|-- book: string (nullable = false)
|-- price: float (nullable = false)
|-- rating: integer (nullable = false)
author_data.schema
StructType([StructField('name', StringType(), False), StructField('surname', StringType(), False), StructField('book', StringType(), False), StructField('price', FloatType(), False), StructField('rating', IntegerType(), False)])
Method-2 to define a Schema
schema_2 = "name STRING, surname STRING, book STRING, price FLOAT, rating INT"
author_data_2 = spark.createDataFrame(author_info, schema_2)
author_data_2.show()
+--------+--------+--------------------+-------+------+
| name| surname| book| price|rating|
+--------+--------+--------------------+-------+------+
| John| Doe| The Great Gatsby| 1000.0| 5|
| Jane| Smith|To Kill a Mocking...|1200.05| 3|
| Bob| Johnson| Pride and Prejudice| 800.05| 4|
| Alice| Davis|The Catcher in th...| 900.0| 2|
| Charlie| Brown| Moby-Dick| 700.3| 6|
| Emily| Wilson| Wuthering Heights|1100.05| 1|
| Frank| Garcia| 1984|1300.06| 7|
| Grace|Martinez| The Odyssey| 600.0| 3|
| Henry|Anderson| War and Peace|1400.75| 8|
|Isabella| Taylor| The Divine Comedy| 500.0| 2|
+--------+--------+--------------------+-------+------+
author_data_2.printSchema()
root
|-- name: string (nullable = true)
|-- surname: string (nullable = true)
|-- book: string (nullable = true)
|-- price: float (nullable = true)
|-- rating: integer (nullable = true)
# Getting the column names of the pyspark DataFrame
author_data.columns
['name', 'surname', 'book', 'price', 'rating']
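Closely related, DataFrame.dtypes pairs each column name with its data type; a small sketch (the type names follow Spark's simple string representation):
# List of (column name, type) tuples, e.g. ('price', 'float'), ('rating', 'int')
author_data.dtypes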
Summary of the DataFrame
(author_data.select("price","rating")
 .describe()
 .show())
+-------+-----------------+-----------------+
|summary| price| rating|
+-------+-----------------+-----------------+
| count| 10| 10|
| mean|950.1260131835937| 4.1|
| stddev|302.8737534314304|2.330951164939612|
| min| 500.0| 1|
| max| 1400.75| 8|
+-------+-----------------+-----------------+
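describe() reports count, mean, stddev, min and max. If percentiles are also of interest, DataFrame.summary() can be used instead; a small sketch:
# summary() accepts the statistics to compute, including approximate percentiles
(author_data.select("price","rating")
 .summary("count", "mean", "min", "25%", "75%", "max")
 .show())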
DataFrame.collect() collects the data on the driver (equivalent to local data in Python). If the dataset is large and cannot be accommodated on the driver, it will throw an out-of-memory error.
# Avoid using collect() method
author_data.collect()
[Row(name='John', surname='Doe', book='The Great Gatsby', price=1000.0, rating=5),
Row(name='Jane', surname='Smith', book='To Kill a Mockingbird', price=1200.050048828125, rating=3),
Row(name='Bob', surname='Johnson', book='Pride and Prejudice', price=800.0499877929688, rating=4),
Row(name='Alice', surname='Davis', book='The Catcher in the Rye', price=900.0, rating=2),
Row(name='Charlie', surname='Brown', book='Moby-Dick', price=700.2999877929688, rating=6),
Row(name='Emily', surname='Wilson', book='Wuthering Heights', price=1100.050048828125, rating=1),
Row(name='Frank', surname='Garcia', book='1984', price=1300.06005859375, rating=7),
Row(name='Grace', surname='Martinez', book='The Odyssey', price=600.0, rating=3),
Row(name='Henry', surname='Anderson', book='War and Peace', price=1400.75, rating=8),
Row(name='Isabella', surname='Taylor', book='The Divine Comedy', price=500.0, rating=2)]
To avoid an out-of-memory error, use the DataFrame.take() or DataFrame.tail() methods
author_data.take(1)
[Row(name='John', surname='Doe', book='The Great Gatsby', price=1000.0, rating=5)]
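DataFrame.tail() works the same way but returns rows from the end of the DataFrame; a small sketch (output omitted):
# Returns the last row(s) as a list of Row objects, limiting what reaches the driver
author_data.tail(1)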
Converting a pyspark DataFrame to a pandas DataFrame
pandas_df = author_data.toPandas()
type(pandas_df)
pandas.core.frame.DataFrame
type(author_data)
pyspark.sql.dataframe.DataFrame
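Note that toPandas() also brings all the data to the driver, so the same memory caveat as collect() applies. The pandas DataFrame can be converted back to a pyspark DataFrame if needed; a small sketch:
# createDataFrame() also accepts a pandas DataFrame as input
author_data_back = spark.createDataFrame(pandas_df)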
Selecting and Accessing Data
from pyspark.sql import column
from pyspark.sql.functions import upper
author_data.select("book").show()
+--------------------+
| book|
+--------------------+
| The Great Gatsby|
|To Kill a Mocking...|
| Pride and Prejudice|
|The Catcher in th...|
| Moby-Dick|
| Wuthering Heights|
| 1984|
| The Odyssey|
| War and Peace|
| The Divine Comedy|
+--------------------+
# Creating a new column from existing column
author_data.withColumn('upper_title', upper(author_data.book)).show()
+--------+--------+--------------------+-------+------+--------------------+
| name| surname| book| price|rating| upper_title|
+--------+--------+--------------------+-------+------+--------------------+
| John| Doe| The Great Gatsby| 1000.0| 5| THE GREAT GATSBY|
| Jane| Smith|To Kill a Mocking...|1200.05| 3|TO KILL A MOCKING...|
| Bob| Johnson| Pride and Prejudice| 800.05| 4| PRIDE AND PREJUDICE|
| Alice| Davis|The Catcher in th...| 900.0| 2|THE CATCHER IN TH...|
| Charlie| Brown| Moby-Dick| 700.3| 6| MOBY-DICK|
| Emily| Wilson| Wuthering Heights|1100.05| 1| WUTHERING HEIGHTS|
| Frank| Garcia| 1984|1300.06| 7| 1984|
| Grace|Martinez| The Odyssey| 600.0| 3| THE ODYSSEY|
| Henry|Anderson| War and Peace|1400.75| 8| WAR AND PEACE|
|Isabella| Taylor| The Divine Comedy| 500.0| 2| THE DIVINE COMEDY|
+--------+--------+--------------------+-------+------+--------------------+
# Filtering the data
author_data.filter(author_data.price > 1000).show()
+-----+--------+--------------------+-------+------+
| name| surname| book| price|rating|
+-----+--------+--------------------+-------+------+
| Jane| Smith|To Kill a Mocking...|1200.05| 3|
|Emily| Wilson| Wuthering Heights|1100.05| 1|
|Frank| Garcia| 1984|1300.06| 7|
|Henry|Anderson| War and Peace|1400.75| 8|
+-----+--------+--------------------+-------+------+
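Filter conditions can be combined with the & (and) and | (or) operators, with each condition wrapped in parentheses; a small sketch on the same DataFrame:
# Keep rows where the price is above 1000 and the rating is at least 5
author_data.filter((author_data.price > 1000) & (author_data.rating >= 5)).show()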
Applying a Function
- pyspark supports User Defined Functions (UDFs)
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf('long') is a decorator used to define a Pandas UDF (User-Defined Function) in PySpark. The 'long' argument specifies the return type of the UDF; in this case, it indicates that the UDF returns a value of type long, which is an alias for bigint in PySpark.
Pandas UDFs allow you to use pandas functions in PySpark and can provide significant performance improvements over regular UDFs, because they operate on batches of rows as pandas Series instead of one row at a time. The return type must be specified when defining a Pandas UDF so that PySpark can correctly handle the data returned by the function.
@pandas_udf('long')
def pandas_multiply_ten(series: pd.Series) -> pd.Series:
return series*10
author_data.select(author_data.price,pandas_multiply_ten(author_data.price)).show()
+-------+--------------------------+
| price|pandas_multiply_ten(price)|
+-------+--------------------------+
| 1000.0| 10000|
|1200.05| 12000|
| 800.05| 8000|
| 900.0| 9000|
| 700.3| 7003|
|1100.05| 11000|
|1300.06| 13000|
| 600.0| 6000|
|1400.75| 14007|
| 500.0| 5000|
+-------+--------------------------+
price_df = author_data.select(author_data.price, pandas_multiply_ten(author_data.price))
type(price_df)
pyspark.sql.dataframe.DataFrame
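For comparison, an equivalent regular row-at-a-time UDF can be defined with pyspark.sql.functions.udf; this sketch is typically slower than the Pandas UDF above because it is invoked once per row instead of once per batch:
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
# A regular UDF receives one value at a time instead of a pd.Series
multiply_ten = udf(lambda price: int(price * 10), LongType())
author_data.select(author_data.price, multiply_ten(author_data.price)).show()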
Grouping
data = [
    ("A", 1, 100),
    ("B", 2, 200),
    ("A", 3, 300),
    ("B", 4, 400),
    ("C", 5, 500)
]
df = spark.createDataFrame(data, ["key", "value1", "value2"])
df.show()
+---+------+------+
|key|value1|value2|
+---+------+------+
| A| 1| 100|
| B| 2| 200|
| A| 3| 300|
| B| 4| 400|
| C| 5| 500|
+---+------+------+
df.groupby('key').avg().show()
+---+-----------+-----------+
|key|avg(value1)|avg(value2)|
+---+-----------+-----------+
| A| 2.0| 200.0|
| B| 3.0| 300.0|
| C| 5.0| 500.0|
+---+-----------+-----------+
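groupby() can also be combined with agg() to aggregate only selected columns and name the results explicitly; a small sketch using the avg function from pyspark.sql.functions (imported at the top of this section):
from pyspark.sql.functions import avg
# Aggregate specific columns and give the result columns readable names
df.groupby('key').agg(avg('value1').alias('avg_value1'),
                      avg('value2').alias('avg_value2')).show()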
Saving the data
Save in CSV Format
df.write.csv('output.csv', header=True)
spark.read.csv('output.csv', header=True).show()
+---+------+------+
|key|value1|value2|
+---+------+------+
| B| 4| 400|
| A| 3| 300|
| B| 2| 200|
| A| 1| 100|
| C| 5| 500|
+---+------+------+
Save in Parquet Format
df.write.parquet('output.parquet')
spark.read.parquet('output.parquet').show()
+---+------+------+
|key|value1|value2|
+---+------+------+
| A| 1| 100|
| B| 4| 400|
| A| 3| 300|
| B| 2| 200|
| C| 5| 500|
+---+------+------+
Save in ORC format
df.write.orc('output.orc')
spark.read.orc('output.orc').show()
+---+------+------+
|key|value1|value2|
+---+------+------+
| C| 5| 500|
| B| 2| 200|
| B| 4| 400|
| A| 3| 300|
| A| 1| 100|
+---+------+------+
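Note that each of the write calls above fails if the target path already exists. A save mode can be passed to control this behaviour; a small sketch:
# 'overwrite' replaces existing output; other modes are 'append', 'ignore'
# and 'error'/'errorifexists' (the default, which raises an error if the path exists)
df.write.mode('overwrite').parquet('output.parquet')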