Spark SQL

Spark SQL:
  • Spark SQL is the engine on which the structured APIs are built
  • Can read and write data in a variety of formats
  • Can query data through JDBC/ODBC connectors (see the sketch after this list)
  • Provides a programmatic interface for interacting with structured data stored as tables or views in a database from a Spark application
  • Provides an interactive shell for writing SQL queries
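A minimal sketch of the JDBC connectivity mentioned above. The connection URL, table name, and credentials are hypothetical placeholders, and the matching JDBC driver jar would have to be available to the application:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read a database table over JDBC into a DataFrame
# (url, dbtable, user, and password are made-up example values)
jdbc_df = (spark.read.format("jdbc")
           .option("url", "jdbc:postgresql://localhost:5432/sampledb")
           .option("dbtable", "public.cars")
           .option("user", "spark_user")
           .option("password", "secret")
           .load())
jdbc_df.show(5)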

Using Spark SQL

  • Use the sql() method on a SparkSession instance
  • Example: spark.sql("SELECT * FROM table")
  • The output of spark.sql() is a DataFrame
from pyspark.sql import SparkSession
spark = (SparkSession
         .builder         
         .getOrCreate())
23/05/06 09:24:46 WARN Utils: Your hostname, thulasiram resolves to a loopback address: 127.0.1.1; using 192.168.0.105 instead (on interface wlp0s20f3)
23/05/06 09:24:46 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/06 09:24:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
path_to_data = '/home/thulasiram/personal/data_engineering/data/car_data.csv'
df = (spark.read.format("csv")
      .option("inferSchema", "true")
      .option("header", "true")
      .load(path_to_data))
df.show(5)
23/05/06 09:24:58 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , make, fuel_type, aspiration, num_of_doors, body_style, drive_wheels, engine_location, wheel_base, length, width, height, curb_weight, engine_type, num_of_cylinders, engine_size, fuel_system, compression_ratio, horsepower, peak_rpm, city_mpg, highway_mpg, price
 Schema: _c0, make, fuel_type, aspiration, num_of_doors, body_style, drive_wheels, engine_location, wheel_base, length, width, height, curb_weight, engine_type, num_of_cylinders, engine_size, fuel_system, compression_ratio, horsepower, peak_rpm, city_mpg, highway_mpg, price
Expected: _c0 but found: 
CSV file: file:///home/thulasiram/personal/data_engineering/data/car_data.csv
+---+-----------+---------+----------+------------+-----------+------------+---------------+----------+------+-----+------+-----------+-----------+----------------+-----------+-----------+-----------------+----------+--------+--------+-----------+-----+
|_c0|       make|fuel_type|aspiration|num_of_doors| body_style|drive_wheels|engine_location|wheel_base|length|width|height|curb_weight|engine_type|num_of_cylinders|engine_size|fuel_system|compression_ratio|horsepower|peak_rpm|city_mpg|highway_mpg|price|
+---+-----------+---------+----------+------------+-----------+------------+---------------+----------+------+-----+------+-----------+-----------+----------------+-----------+-----------+-----------------+----------+--------+--------+-----------+-----+
|  1|alfa-romero|      gas|       std|         two|convertible|         rwd|          front|      88.6| 168.8| 64.1|  48.8|       2548|       dohc|            four|        130|       mpfi|              9.0|       111|    5000|      21|         27|13495|
|  2|alfa-romero|      gas|       std|         two|convertible|         rwd|          front|      88.6| 168.8| 64.1|  48.8|       2548|       dohc|            four|        130|       mpfi|              9.0|       111|    5000|      21|         27|16500|
|  3|alfa-romero|      gas|       std|         two|  hatchback|         rwd|          front|      94.5| 171.2| 65.5|  52.4|       2823|       ohcv|             six|        152|       mpfi|              9.0|       154|    5000|      19|         26|16500|
|  4|       audi|      gas|       std|        four|      sedan|         fwd|          front|      99.8| 176.6| 66.2|  54.3|       2337|        ohc|            four|        109|       mpfi|             10.0|       102|    5500|      24|         30|13950|
|  5|       audi|      gas|       std|        four|      sedan|         4wd|          front|      99.4| 176.6| 66.4|  54.3|       2824|        ohc|            five|        136|       mpfi|              8.0|       115|    5500|      18|         22|17450|
+---+-----------+---------+----------+------------+-----------+------------+---------------+----------+------+-----+------+-----------+-----------+----------------+-----------+-----------+-----------------+----------+--------+--------+-----------+-----+
only showing top 5 rows
df.createGlobalTempView("cars_tbl")
tables = spark.catalog.listTables("global_temp")
for table in tables:
    print(table.name)
cars_tbl
spark.sql("""SELECT make, fuel_type, body_style
            FROM global_temp.cars_tbl
            limit 5""").show()
+-----------+---------+-----------+
|       make|fuel_type| body_style|
+-----------+---------+-----------+
|alfa-romero|      gas|convertible|
|alfa-romero|      gas|convertible|
|alfa-romero|      gas|  hatchback|
|       audi|      gas|      sedan|
|       audi|      gas|      sedan|
+-----------+---------+-----------+
spark.sql("""SELECT body_style, count(*) as body_style_count
            FROM global_temp.cars_tbl
            group by body_style
            order by body_style_count desc""").show()
+-----------+----------------+
| body_style|body_style_count|
+-----------+----------------+
|      sedan|              96|
|  hatchback|              70|
|      wagon|              25|
|    hardtop|               8|
|convertible|               6|
+-----------+----------------+
  • Using the createGlobalTempView() method, the DataFrame is registered as a global temporary view
  • Global temporary views are tied to a system database called global_temp
  • They are cross-session: a global temporary view can be accessed from any SparkSession within the same application (see the sketch after this list)
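A quick sketch of the cross-session behavior, assuming the cars_tbl global temporary view registered above still exists:
# A second SparkSession in the same application can still read the global
# temporary view through the global_temp database
another_session = spark.newSession()
another_session.sql("SELECT make, body_style FROM global_temp.cars_tbl LIMIT 3").show()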

Managed Vs Unmanaged Tables

  • For a managed table, Spark manages both the metadata and the data in the file store
  • For an unmanaged (external) table, Spark manages only the metadata; we manage the data ourselves
  • Managed table: ‘DROP TABLE table_name’ deletes both the metadata and the data
  • Unmanaged table: ‘DROP TABLE table_name’ deletes only the metadata and leaves the data in place (see the sketch after this list for checking a table's type)
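A small sketch for verifying a table's type, assuming the cars database and the tables created below already exist (tableType is 'MANAGED' for managed tables and 'EXTERNAL' for unmanaged ones):
# List the tables in the cars database along with their type
for t in spark.catalog.listTables("cars"):
    print(t.name, t.tableType)

# DESCRIBE EXTENDED also reports the table type and data location
spark.sql("DESCRIBE EXTENDED cars.managed_cars").show(truncate=False)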
spark.sql("CREATE DATABASE cars")
spark.sql("USE cars")
DataFrame[]

Creating a Managed table

Using SQL
# With spark.sql.legacy.createHiveTableByDefault set to False, CREATE TABLE
# without a USING clause creates a native data source table rather than a Hive SerDe table
spark.conf.set('spark.sql.legacy.createHiveTableByDefault', False)
spark.sql("CREATE TABLE managed_cars (make STRING, fuel_type STRING, body_style STRING, wheel_base FLOAT, length FLOAT, width FLOAT, height FLOAT)")
DataFrame[]
spark.catalog.listTables()
[Table(name='managed_cars', database='cars', description=None, tableType='MANAGED', isTemporary=False)]
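The managed table is created empty; here is a hedged sketch of populating and querying it with Spark SQL (the rows are made-up example values, not rows taken from car_data.csv):
# INSERT INTO works on the native data source table created above
spark.sql("""INSERT INTO managed_cars VALUES
    ('audi', 'gas', 'sedan', 99.8, 176.6, 66.2, 54.3),
    ('dodge', 'gas', 'hatchback', 93.7, 157.3, 63.8, 50.8)""")
spark.sql("SELECT * FROM managed_cars").show()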
Using DataFrame API
df.write.saveAsTable("managed_df_api_cars")
23/05/06 09:52:54 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , make, fuel_type, aspiration, num_of_doors, body_style, drive_wheels, engine_location, wheel_base, length, width, height, curb_weight, engine_type, num_of_cylinders, engine_size, fuel_system, compression_ratio, horsepower, peak_rpm, city_mpg, highway_mpg, price
 Schema: _c0, make, fuel_type, aspiration, num_of_doors, body_style, drive_wheels, engine_location, wheel_base, length, width, height, curb_weight, engine_type, num_of_cylinders, engine_size, fuel_system, compression_ratio, horsepower, peak_rpm, city_mpg, highway_mpg, price
Expected: _c0 but found: 
CSV file: file:///home/thulasiram/personal/data_engineering/data/car_data.csv
spark.sql("select * from managed_df_api_cars limit 3").show()
+---+-----------+---------+----------+------------+-----------+------------+---------------+----------+------+-----+------+-----------+-----------+----------------+-----------+-----------+-----------------+----------+--------+--------+-----------+-----+
|_c0|       make|fuel_type|aspiration|num_of_doors| body_style|drive_wheels|engine_location|wheel_base|length|width|height|curb_weight|engine_type|num_of_cylinders|engine_size|fuel_system|compression_ratio|horsepower|peak_rpm|city_mpg|highway_mpg|price|
+---+-----------+---------+----------+------------+-----------+------------+---------------+----------+------+-----+------+-----------+-----------+----------------+-----------+-----------+-----------------+----------+--------+--------+-----------+-----+
|  1|alfa-romero|      gas|       std|         two|convertible|         rwd|          front|      88.6| 168.8| 64.1|  48.8|       2548|       dohc|            four|        130|       mpfi|              9.0|       111|    5000|      21|         27|13495|
|  2|alfa-romero|      gas|       std|         two|convertible|         rwd|          front|      88.6| 168.8| 64.1|  48.8|       2548|       dohc|            four|        130|       mpfi|              9.0|       111|    5000|      21|         27|16500|
|  3|alfa-romero|      gas|       std|         two|  hatchback|         rwd|          front|      94.5| 171.2| 65.5|  52.4|       2823|       ohcv|             six|        152|       mpfi|              9.0|       154|    5000|      19|         26|16500|
+---+-----------+---------+----------+------------+-----------+------------+---------------+----------+------+-----+------+-----------+-----------+----------------+-----------+-----------+-----------------+----------+--------+--------+-----------+-----+

Creating Unmanaged table

  • We can create unmanaged tables from our own data sources that are accessible to our Spark application
Creating table from a CSV file using SQL
spark.sql("""CREATE TABLE unmanaged_cars (make STRING, fuel_type STRING,body_style STRING, wheel_base FLOAT, length FLOAT, width FLOAT, height FLOAT)
        USING csv OPTIONS(PATH '/home/thulasiram/personal/data_engineering/data/car_data.csv')""")
DataFrame[]
Creating table with DataFrame API
# Note: no format is specified, so the table is written in the default
# data source format (parquet), even though the target path ends in .csv
(df
 .write
 .option("path", "/home/thulasiram/personal/data_engineering/data/unmanaged_car_data.csv")
 .saveAsTable("unmanaged_df_api_cars"))
23/05/06 10:02:01 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , make, fuel_type, aspiration, num_of_doors, body_style, drive_wheels, engine_location, wheel_base, length, width, height, curb_weight, engine_type, num_of_cylinders, engine_size, fuel_system, compression_ratio, horsepower, peak_rpm, city_mpg, highway_mpg, price
 Schema: _c0, make, fuel_type, aspiration, num_of_doors, body_style, drive_wheels, engine_location, wheel_base, length, width, height, curb_weight, engine_type, num_of_cylinders, engine_size, fuel_system, compression_ratio, horsepower, peak_rpm, city_mpg, highway_mpg, price
Expected: _c0 but found: 
CSV file: file:///home/thulasiram/personal/data_engineering/data/car_data.csv

Creating Views

  • We can create views on top of existing tables
  • A view can be global (visible across all SparkSessions on a cluster) or session-scoped
  • Temporary views disappear when the SparkSession ends; global temporary views last until the Spark application terminates
df_hatchback = spark.sql("SELECT * FROM managed_df_api_cars WHERE body_style = 'hatchback'")
# Creating a temporary and global temporary view
df_hatchback.createOrReplaceGlobalTempView("cars_tbl_hatchback_gtemp_view")
df_hatchback.createOrReplaceTempView("cars_tbl_hatchback_temp_view")
spark.sql("SELECT * FROM global_temp.cars_tbl_hatchback_gtemp_view limit 5").show()
+---+-----------+---------+----------+------------+----------+------------+---------------+----------+------+-----+------+-----------+-----------+----------------+-----------+-----------+-----------------+----------+--------+--------+-----------+-----+
|_c0|       make|fuel_type|aspiration|num_of_doors|body_style|drive_wheels|engine_location|wheel_base|length|width|height|curb_weight|engine_type|num_of_cylinders|engine_size|fuel_system|compression_ratio|horsepower|peak_rpm|city_mpg|highway_mpg|price|
+---+-----------+---------+----------+------------+----------+------------+---------------+----------+------+-----+------+-----------+-----------+----------------+-----------+-----------+-----------------+----------+--------+--------+-----------+-----+
|  3|alfa-romero|      gas|       std|         two| hatchback|         rwd|          front|      94.5| 171.2| 65.5|  52.4|       2823|       ohcv|             six|        152|       mpfi|              9.0|       154|    5000|      19|         26|16500|
| 10|       audi|      gas|     turbo|         two| hatchback|         4wd|          front|      99.5| 178.2| 67.9|  52.0|       3053|        ohc|            five|        131|       mpfi|              7.0|       160|    5500|      16|         22|    ?|
| 19|  chevrolet|      gas|       std|         two| hatchback|         fwd|          front|      88.4| 141.1| 60.3|  53.2|       1488|          l|           three|         61|       2bbl|              9.5|        48|    5100|      47|         53| 5151|
| 20|  chevrolet|      gas|       std|         two| hatchback|         fwd|          front|      94.5| 155.9| 63.6|  52.0|       1874|        ohc|            four|         90|       2bbl|              9.6|        70|    5400|      38|         43| 6295|
| 22|      dodge|      gas|       std|         two| hatchback|         fwd|          front|      93.7| 157.3| 63.8|  50.8|       1876|        ohc|            four|         90|       2bbl|             9.41|        68|    5500|      37|         41| 5572|
+---+-----------+---------+----------+------------+----------+------------+---------------+----------+------+-----+------+-----------+-----------+----------------+-----------+-----------+-----------------+----------+--------+--------+-----------+-----+

Dropping Views

spark.catalog.dropGlobalTempView("cars_tbl_hatchback_gtemp_view")
spark.catalog.dropTempView("cars_tbl_hatchback_temp_view")
True

Viewing the Metadata

spark.catalog.listDatabases()
[Database(name='cars', description='', locationUri='file:/home/thulasiram/personal/data_engineering/spark/pyspark/spark-warehouse/cars.db'),
 Database(name='default', description='default database', locationUri='file:/home/thulasiram/personal/data_engineering/spark/pyspark/spark-warehouse')]
spark.catalog.listTables()
[Table(name='managed_cars', database='cars', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='managed_df_api_cars', database='cars', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='unmanaged_cars', database='cars', description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='unmanaged_df_api_cars', database='cars', description=None, tableType='EXTERNAL', isTemporary=False)]
spark.catalog.listColumns("managed_df_api_cars")
[Column(name='_c0', description=None, dataType='int', nullable=True, isPartition=False, isBucket=False),
 Column(name='make', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='fuel_type', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='aspiration', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='num_of_doors', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='body_style', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='drive_wheels', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='engine_location', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='wheel_base', description=None, dataType='double', nullable=True, isPartition=False, isBucket=False),
 Column(name='length', description=None, dataType='double', nullable=True, isPartition=False, isBucket=False),
 Column(name='width', description=None, dataType='double', nullable=True, isPartition=False, isBucket=False),
 Column(name='height', description=None, dataType='double', nullable=True, isPartition=False, isBucket=False),
 Column(name='curb_weight', description=None, dataType='int', nullable=True, isPartition=False, isBucket=False),
 Column(name='engine_type', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='num_of_cylinders', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='engine_size', description=None, dataType='int', nullable=True, isPartition=False, isBucket=False),
 Column(name='fuel_system', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='compression_ratio', description=None, dataType='double', nullable=True, isPartition=False, isBucket=False),
 Column(name='horsepower', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='peak_rpm', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='city_mpg', description=None, dataType='int', nullable=True, isPartition=False, isBucket=False),
 Column(name='highway_mpg', description=None, dataType='int', nullable=True, isPartition=False, isBucket=False),
 Column(name='price', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False)]

Summary

  • Create managed and unmanaged tables using Spark SQL and the DataFrame API
  • Use spark.sql() to issue queries on Spark SQL tables or views
  • Use the Catalog API to inspect the metadata