from pyspark.sql import SparkSession
from pyspark.sql.functions import avg

Hands-on Introduction to DataFrame API
- DataFrames are lazily evaluated
- They are implemented on top of RDDs
- Spark does not process the data when a transformation is called; processing starts only when an action is called
- A Spark application starts by initializing a SparkSession
- In the case of a pyspark shell, the shell automatically creates the session in the variable spark for users

spark = (SparkSession
         .builder
         .appName("FirstProgram")
         .getOrCreate())

- A pyspark DataFrame can be created via pyspark.sql.SparkSession.createDataFrame by passing a list of lists, tuples, dictionaries, or pyspark.sql.Rows, a Pandas DataFrame, or an RDD (a sketch with some of these other input types follows the first example below)
- pyspark.sql.SparkSession.createDataFrame also takes a schema argument. If a schema is not provided, pyspark infers it from the data
data = spark.createDataFrame([("Amar",21),("Akbar",25),("John",28),("Harika",32),("Amar",35),("Akbar",40)],
                             ["name","age"])

data.show()

+------+---+
| name|age|
+------+---+
| Amar| 21|
| Akbar| 25|
| John| 28|
|Harika| 32|
| Amar| 35|
| Akbar| 40|
+------+---+
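As mentioned above, createDataFrame also accepts the other input types. A minimal sketch using pyspark.sql.Row objects and a Pandas DataFrame (the names and ages here are purely illustrative):

from pyspark.sql import Row
import pandas as pd

# From a list of Row objects; column names are taken from the Row fields
row_df = spark.createDataFrame([Row(name="Amar", age=21), Row(name="Akbar", age=25)])

# From a Pandas DataFrame; the schema is inferred from the Pandas dtypes
pdf = pd.DataFrame({"name": ["John", "Harika"], "age": [28, 32]})
pandas_based_df = spark.createDataFrame(pdf)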
DataFrame API
Schemas and Creating DataFrames
- Spark can infer the schema from the data
- If the dataset is large, schema inference becomes an overhead: Spark has to read a portion of the file just to ascertain the datatypes, which is expensive and time-consuming
- It is a good practice to define the schema upfront
Method-1 to define a Schema
author_info = [("John", "Doe", "The Great Gatsby", 1000.00, 5),
("Jane", "Smith", "To Kill a Mockingbird", 1200.05, 3),
("Bob", "Johnson", "Pride and Prejudice", 800.05, 4),
("Alice", "Davis", "The Catcher in the Rye", 900.00, 2),
("Charlie", "Brown", "Moby-Dick", 700.30, 6),
("Emily", "Wilson", "Wuthering Heights", 1100.05, 1),
("Frank", "Garcia", "1984", 1300.06, 7),
("Grace", "Martinez", "The Odyssey", 600.00, 3),
("Henry", "Anderson", "War and Peace", 1400.75, 8),
("Isabella", "Taylor", "The Divine Comedy", 500.00, 2)]from pyspark.sql.types import *
schema = StructType(
[StructField("name", StringType(), False),
StructField("surname", StringType(), False),
StructField("book", StringType(), False),
StructField("price", FloatType(), False),
StructField("rating", IntegerType(), False)
])

author_data = spark.createDataFrame(author_info, schema)

Viewing Data
author_data.show(4)

+-----+-------+--------------------+-------+------+
| name|surname| book| price|rating|
+-----+-------+--------------------+-------+------+
| John| Doe| The Great Gatsby| 1000.0| 5|
| Jane| Smith|To Kill a Mocking...|1200.05| 3|
| Bob|Johnson| Pride and Prejudice| 800.05| 4|
|Alice| Davis|The Catcher in th...| 900.0| 2|
+-----+-------+--------------------+-------+------+
only showing top 4 rows
author_data.show(2, vertical=True)

-RECORD 0-----------------------
name | John
surname | Doe
book | The Great Gatsby
price | 1000.0
rating | 5
-RECORD 1-----------------------
name | Jane
surname | Smith
book | To Kill a Mocking...
price | 1200.05
rating | 3
only showing top 2 rows
Checking the Schema
author_data.printSchema()

root
|-- name: string (nullable = false)
|-- surname: string (nullable = false)
|-- book: string (nullable = false)
|-- price: float (nullable = false)
|-- rating: integer (nullable = false)
author_data.schema

StructType([StructField('name', StringType(), False), StructField('surname', StringType(), False), StructField('book', StringType(), False), StructField('price', FloatType(), False), StructField('rating', IntegerType(), False)])
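The column names and types can also be read as plain Python values. A small sketch; the outputs in the comments are abbreviated:

# dtypes returns (column name, type name) pairs
author_data.dtypes            # [('name', 'string'), ('surname', 'string'), ...]

# fieldNames() on the StructType gives just the column names
author_data.schema.fieldNames()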
Method-2 to define a Schema
schema_2 = "name STRING, surname STRING, book STRING, price FLOAT, rating INT"

author_data_2 = spark.createDataFrame(author_info, schema_2)

author_data_2.show()

+--------+--------+--------------------+-------+------+
| name| surname| book| price|rating|
+--------+--------+--------------------+-------+------+
| John| Doe| The Great Gatsby| 1000.0| 5|
| Jane| Smith|To Kill a Mocking...|1200.05| 3|
| Bob| Johnson| Pride and Prejudice| 800.05| 4|
| Alice| Davis|The Catcher in th...| 900.0| 2|
| Charlie| Brown| Moby-Dick| 700.3| 6|
| Emily| Wilson| Wuthering Heights|1100.05| 1|
| Frank| Garcia| 1984|1300.06| 7|
| Grace|Martinez| The Odyssey| 600.0| 3|
| Henry|Anderson| War and Peace|1400.75| 8|
|Isabella| Taylor| The Divine Comedy| 500.0| 2|
+--------+--------+--------------------+-------+------+
author_data_2.printSchema()

root
|-- name: string (nullable = true)
|-- surname: string (nullable = true)
|-- book: string (nullable = true)
|-- price: float (nullable = true)
|-- rating: integer (nullable = true)
# Getting the column names of the pyspark DataFrame
author_data.columns

['name', 'surname', 'book', 'price', 'rating']
Summary of the DataFrame
(author_data.select("price","rating")
.describe()
.show())

+-------+-----------------+-----------------+
|summary| price| rating|
+-------+-----------------+-----------------+
| count| 10| 10|
| mean|950.1260131835937| 4.1|
| stddev|302.8737534314304|2.330951164939612|
| min| 500.0| 1|
| max| 1400.75| 8|
+-------+-----------------+-----------------+
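describe() reports count, mean, stddev, min and max. To pick specific statistics (including percentiles), DataFrame.summary() can be used instead; a short sketch:

(author_data.select("price", "rating")
 .summary("count", "min", "25%", "75%", "max")
 .show())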
DataFrame.collect() collects the data on the driver (equivalent to local data in Python). If the dataset is large and cannot be accommodated on the driver, it will throw an out-of-memory error.
# Avoid using collect() method
author_data.collect()

[Row(name='John', surname='Doe', book='The Great Gatsby', price=1000.0, rating=5),
Row(name='Jane', surname='Smith', book='To Kill a Mockingbird', price=1200.050048828125, rating=3),
Row(name='Bob', surname='Johnson', book='Pride and Prejudice', price=800.0499877929688, rating=4),
Row(name='Alice', surname='Davis', book='The Catcher in the Rye', price=900.0, rating=2),
Row(name='Charlie', surname='Brown', book='Moby-Dick', price=700.2999877929688, rating=6),
Row(name='Emily', surname='Wilson', book='Wuthering Heights', price=1100.050048828125, rating=1),
Row(name='Frank', surname='Garcia', book='1984', price=1300.06005859375, rating=7),
Row(name='Grace', surname='Martinez', book='The Odyssey', price=600.0, rating=3),
Row(name='Henry', surname='Anderson', book='War and Peace', price=1400.75, rating=8),
Row(name='Isabella', surname='Taylor', book='The Divine Comedy', price=500.0, rating=2)]
To avoid an out-of-memory error, use the DataFrame.take() or DataFrame.tail() methods
author_data.take(1)

[Row(name='John', surname='Doe', book='The Great Gatsby', price=1000.0, rating=5)]
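tail() works the same way but returns rows from the end of the DataFrame, and limit() keeps the result as a small DataFrame instead of bringing rows to the driver. A quick sketch:

# Last 2 rows as a list of Row objects on the driver
author_data.tail(2)

# First 3 rows kept as a DataFrame; nothing is collected until an action is called
author_data.limit(3).show()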
Convert pyspark dataframe to a pandas dataframe
pandas_df = author_data.toPandas()

type(pandas_df)

pandas.core.frame.DataFrame

type(author_data)

pyspark.sql.dataframe.DataFrame
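The conversion also works in the other direction: a Pandas DataFrame can be turned back into a pyspark DataFrame with createDataFrame. Keep in mind that toPandas(), like collect(), brings all the data to the driver, so it should only be used on small results. A minimal sketch:

# Pandas -> pyspark; the schema is inferred from the Pandas dtypes
spark_df_again = spark.createDataFrame(pandas_df)
spark_df_again.show(2)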
Selecting and Accessing Data
from pyspark.sql import Column
from pyspark.sql.functions import upper

author_data.select("book").show()

+--------------------+
| book|
+--------------------+
| The Great Gatsby|
|To Kill a Mocking...|
| Pride and Prejudice|
|The Catcher in th...|
| Moby-Dick|
| Wuthering Heights|
| 1984|
| The Odyssey|
| War and Peace|
| The Divine Comedy|
+--------------------+
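select() accepts Column expressions as well as plain column names, so derived values can be computed on the fly. A small sketch (the 10% discount is purely illustrative):

from pyspark.sql.functions import col

# Select a column by name alongside a derived expression with an alias
author_data.select("book", (col("price") * 0.9).alias("discounted_price")).show()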
# Creating a new column from existing column
author_data.withColumn('upper_title', upper(author_data.book)).show()

+--------+--------+--------------------+-------+------+--------------------+
| name| surname| book| price|rating| upper_title|
+--------+--------+--------------------+-------+------+--------------------+
| John| Doe| The Great Gatsby| 1000.0| 5| THE GREAT GATSBY|
| Jane| Smith|To Kill a Mocking...|1200.05| 3|TO KILL A MOCKING...|
| Bob| Johnson| Pride and Prejudice| 800.05| 4| PRIDE AND PREJUDICE|
| Alice| Davis|The Catcher in th...| 900.0| 2|THE CATCHER IN TH...|
| Charlie| Brown| Moby-Dick| 700.3| 6| MOBY-DICK|
| Emily| Wilson| Wuthering Heights|1100.05| 1| WUTHERING HEIGHTS|
| Frank| Garcia| 1984|1300.06| 7| 1984|
| Grace|Martinez| The Odyssey| 600.0| 3| THE ODYSSEY|
| Henry|Anderson| War and Peace|1400.75| 8| WAR AND PEACE|
|Isabella| Taylor| The Divine Comedy| 500.0| 2| THE DIVINE COMEDY|
+--------+--------+--------------------+-------+------+--------------------+
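Related column operations: withColumnRenamed() renames an existing column and drop() removes one. Both return a new DataFrame, since DataFrames are immutable. A short sketch:

# Rename an existing column
author_data.withColumnRenamed("book", "title").show(2)

# Drop a column
author_data.drop("rating").show(2)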
# Filtering the data
author_data.filter(author_data.price > 1000).show()

+-----+--------+--------------------+-------+------+
| name| surname| book| price|rating|
+-----+--------+--------------------+-------+------+
| Jane| Smith|To Kill a Mocking...|1200.05| 3|
|Emily| Wilson| Wuthering Heights|1100.05| 1|
|Frank| Garcia| 1984|1300.06| 7|
|Henry|Anderson| War and Peace|1400.75| 8|
+-----+--------+--------------------+-------+------+
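Multiple conditions can be combined with & (and), | (or) and ~ (not); each condition needs its own parentheses. where() is simply an alias for filter(). A sketch:

# Books priced above 1000 with a rating of at least 5
author_data.where((author_data.price > 1000) & (author_data.rating >= 5)).show()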
Applying a Function
- pyspark supports User Defined Functions (UDFs)
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('long') is a decorator used to define a Pandas UDF (User-Defined Function) in PySpark. The 'long' argument specifies the return type of the UDF. In this case, it indicates that the UDF returns a value of type long, which is an alias for bigint in PySpark.
Pandas UDFs allow you to use Pandas functions in PySpark and can provide significant performance improvements over regular UDFs. The return type must be specified when defining a Pandas UDF so that PySpark can correctly handle the data returned by the function.
@pandas_udf('long')
def pandas_multiply_ten(series: pd.Series) -> pd.Series:
    return series * 10

author_data.select(author_data.price, pandas_multiply_ten(author_data.price)).show()
+-------+--------------------------+
| price|pandas_multiply_ten(price)|
+-------+--------------------------+
| 1000.0| 10000|
|1200.05| 12000|
| 800.05| 8000|
| 900.0| 9000|
| 700.3| 7003|
|1100.05| 11000|
|1300.06| 13000|
| 600.0| 6000|
|1400.75| 14007|
| 500.0| 5000|
+-------+--------------------------+
price_df = author_data.select(author_data.price, pandas_multiply_ten(author_data.price))

type(price_df)

pyspark.sql.dataframe.DataFrame
Grouping
data = [
("A", 1, 100),
("B", 2, 200),
("A", 3, 300),
("B", 4, 400),
("C", 5, 500)
]

df = spark.createDataFrame(data, ["key", "value1", "value2"])

df.show()

+---+------+------+
|key|value1|value2|
+---+------+------+
| A| 1| 100|
| B| 2| 200|
| A| 3| 300|
| B| 4| 400|
| C| 5| 500|
+---+------+------+
df.groupby('key').avg().show()

+---+-----------+-----------+
|key|avg(value1)|avg(value2)|
+---+-----------+-----------+
| A| 2.0| 200.0|
| B| 3.0| 300.0|
| C| 5.0| 500.0|
+---+-----------+-----------+
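For more control than the blanket avg(), groupBy() can be combined with agg() and aggregate functions such as the avg imported at the top; aliases keep the result column names readable. A sketch:

from pyspark.sql.functions import avg, max as spark_max

df.groupBy("key").agg(
    avg("value1").alias("avg_value1"),
    spark_max("value2").alias("max_value2")
).show()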
Saving the Data
Save in CSV Format
df.write.csv('output.csv',header=True)
spark.read.csv('output.csv', header=True).show()

+---+------+------+
|key|value1|value2|
+---+------+------+
| B| 4| 400|
| A| 3| 300|
| B| 2| 200|
| A| 1| 100|
| C| 5| 500|
+---+------+------+
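Note that df.write.csv('output.csv', header=True) actually creates a directory named output.csv with one part file per partition, and it fails if the path already exists; the rows also come back in a different order because they are read from multiple part files. The save mode and the number of output files can be controlled; a sketch:

# Overwrite any existing output and write a single part file
(df.coalesce(1)
   .write
   .mode("overwrite")
   .csv('output.csv', header=True))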
Save in Parquet Format
df.write.parquet('output.parquet')
spark.read.parquet('output.parquet').show()

+---+------+------+
|key|value1|value2|
+---+------+------+
| A| 1| 100|
| B| 4| 400|
| A| 3| 300|
| B| 2| 200|
| C| 5| 500|
+---+------+------+
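Parquet is a columnar format and a common default for Spark pipelines. Larger datasets are often partitioned on disk by a column so that reads can skip irrelevant files; a sketch (the path output_partitioned.parquet is just an example name):

# Partition the output directory by the 'key' column
df.write.partitionBy("key").parquet('output_partitioned.parquet')

# Reading back; a filter on 'key' can prune partitions
spark.read.parquet('output_partitioned.parquet').filter("key = 'A'").show()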
Save in ORC format
df.write.orc('output.orc')
spark.read.orc('output.orc').show()

+---+------+------+
|key|value1|value2|
+---+------+------+
| C| 5| 500|
| B| 2| 200|
| B| 4| 400|
| A| 3| 300|
| A| 1| 100|
+---+------+------+