DataFrames and Spark SQL

Tanja Adžić
7 min read · Apr 17, 2024


DataFrames and Spark SQL are very important parts of Apache Spark, one of the leading distributed computing frameworks. DataFrames provide a structured data abstraction, while Spark SQL offers a SQL interface for querying DataFrames; together they simplify data processing tasks and enable efficient analysis of large-scale datasets, i.e. Big Data. I’ll explore and define some of the key points of DataFrames and Spark SQL in this blog. Let’s get started!



RDDs in Parallel Programming and Spark

Resilient Distributed Datasets (RDDs) are Spark’s primary data abstraction: immutable collections of elements partitioned across a cluster’s nodes. Let’s explore the RDD operations:

Transformations

In Spark, a Transformation generates a new RDD from an existing one. These operations are “lazy,” meaning results are computed only when “actions” are invoked.

For instance, the map transformation applies a function to each element, yielding a new RDD. Actions, such as reduce, aggregate RDD elements and return results to the driver program.
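As a minimal illustration (a sketch with made-up values, not code from the original course), here is how map and reduce look in PySpark:

from pyspark.sql import SparkSession

#build a SparkSession; its SparkContext exposes the RDD API
spark = SparkSession.builder.appName("rdd-basics").getOrCreate()
sc = spark.sparkContext

numbers = sc.parallelize([1, 2, 3, 4, 5])

#map is a transformation: it only records the operation, nothing runs yet
squares = numbers.map(lambda x: x * x)

#reduce is an action: it triggers the computation and returns a value to the driver
total = squares.reduce(lambda a, b: a + b)
print(total)  # 55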

DAG

Behind the scenes, Spark employs a Directed Acyclic Graph (DAG) and a DAG Scheduler to execute RDD operations. Visualize the DAG as a graph composed of vertices representing RDDs and edges representing transformations or actions.

DAGs enable fault tolerance, which is crucial in distributed computing. When a node fails, Spark uses the lineage recorded in the DAG to recompute the lost partitions on a healthy node. Spark constructs the DAG when RDDs are created; the DAG Scheduler then turns the graph of transformations into stages and tasks for execution, and once an action runs, the driver program receives the computed results.
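To make the lineage idea concrete, here is a small sketch (not from the original post) that prints an RDD’s debug string, i.e. the chain of transformations Spark would replay if a partition were lost:

#assumes the SparkContext sc from the previous example
rdd = sc.parallelize(range(10)) \
        .map(lambda x: x + 1) \
        .filter(lambda x: x % 2 == 0)

#toDebugString() returns the recorded lineage (this RDD's piece of the DAG)
print(rdd.toDebugString().decode("utf-8"))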

Transformation and Action examples

RDDs offer many transformations and actions. Besides map and reduce, filter selectively retains dataset elements, while distinct returns the unique elements. flatMap, similar to map, can map each input item to multiple output items. Common actions include take, collect, and takeOrdered, which return results to the driver program.
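A brief, hedged sketch of these operations in PySpark, with sample data invented for the example:

#assumes the SparkContext sc from the earlier example
words = sc.parallelize(["spark", "rdd", "spark", "dataframe"])

#filter keeps only the elements that satisfy a predicate
long_words = words.filter(lambda w: len(w) > 3)

#distinct removes duplicate elements
unique_words = words.distinct()

#flatMap maps each input item to zero or more output items
letters = words.flatMap(lambda w: list(w))

#actions return results to the driver
print(long_words.collect())    # every matching element
print(unique_words.take(2))    # the first two elements
print(letters.takeOrdered(3))  # the three smallest elements in sorted order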

Detailed information on transformations and actions is available on Spark’s official website.

The key points are:

  • transformations merely describe the operations to be applied;
  • actions deliver computed values to the driver program.

An example scenario: a function, f(x), decrements x by one. Applying this function as a transformation via the map operation applies it to every element in every partition of the dataset. Subsequently, the collect action consolidates the partitioned results and delivers them to the driver.
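A minimal PySpark sketch of that scenario, with invented input values:

#assumes the SparkContext sc from the earlier examples
data = sc.parallelize([1, 2, 3, 4], numSlices=2)  # spread across two partitions

#transformation: apply f(x) = x - 1 to every element of every partition
decremented = data.map(lambda x: x - 1)

#action: collect gathers the partitioned results back to the driver
print(decremented.collect())  # [0, 1, 2, 3]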

Dataframes and Datasets

Datasets represent the latest data abstraction in Spark, offering an API to access distributed data collections. Unlike DataFrames, datasets consist of strongly typed Java Virtual Machine (JVM) objects: the dataset’s datatype is declared explicitly at creation, which enhances reliability.

Datasets combine the advantages of RDDs, such as support for lambda functions, with the benefits of Spark SQL’s optimized execution engine. Additionally, datasets are immutable, and encoders convert the JVM objects to and from Spark’s tabular representation. Datasets also extend the DataFrame API: conceptually, a DataFrame is simply a dataset of generic, untyped Row objects.

Benefits of Datasets in Spark

Currently, dataset APIs are accessible only in statically typed languages like Scala and Java, ensuring compile-time type safety. This early error detection reduces development and operational costs. Furthermore, datasets outperform RDDs, especially in aggregate queries, leveraging query optimization through Catalyst and Tungsten.

Also, datasets optimize memory usage by understanding data structures, facilitating efficient caching. They provide high-level aggregate operations like sum, average, join, and group-by, enhancing convenience.

Creating a dataset

Creating datasets in Spark involves various methods. In Scala, the toDS function converts a sequence into a dataset. Alternatively, datasets can be generated from text or JSON files, with explicit schema declarations if necessary:

//needed in a standalone program for toDS() and as[...] (imported automatically in spark-shell)
import spark.implicits._

//dataset created from a sequence
val ds = Seq("alpha", "beta", "gamma").toDS()

//dataset created from a txt file
val ds2 = spark.read.text("/text_folder/file.txt").as[String]

//dataset created from a json file (Customer is a user-defined case class matching the JSON schema)
val ds3 = spark.read.json("/customer.json").as[Customer]

Datasets vs DataFrames

Comparing datasets with DataFrames highlights datasets’ strengths. Datasets provide compile-time type safety, which DataFrames do not, and the Dataset API is unified across Java and Scala, whereas DataFrames are also available in Python and R. Additionally, datasets build upon DataFrames, which in turn are constructed on RDDs, showcasing Spark’s evolutionary development.

Catalyst and Tungsten

The primary objective of Spark SQL Optimization is to enhance the runtime performance of SQL queries, reducing both time and memory consumption which results in cost savings for organizations. This optimization is achieved through both rule-based and cost-based approaches.

Catalyst

Catalyst, the built-in query optimizer in Spark SQL, is designed to be extensible, allowing developers to incorporate new optimization techniques with ease. Its rule-based optimizations operate on predefined rules, validating aspects such as indexing and whether a query accesses only the columns it needs. Cost-based optimization, in turn, compares alternative query paths to minimize time and memory usage, a crucial consideration for complex queries over large datasets.

An analogy: optimizing SQL queries is similar to optimizing travel routes for a car journey, where rule-based actions correspond to maintaining the vehicle, while cost-based actions focus on selecting the most efficient route.

Behind the scenes, Catalyst employs a tree data structure and executes four main phases: analysis, logical optimization, physical planning, and code generation. During these phases, it progresses from analyzing the query to generating optimized physical execution plans using a combination of rule-based and cost-based techniques.

(Figure: the Catalyst optimization phases)
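One way to peek at the output of these phases (a hedged illustration using the public DataFrame API, not Catalyst internals) is explain(True), which prints the parsed, analyzed, and optimized logical plans along with the physical plan:

from pyspark.sql import functions as F

#assumes the SparkSession spark from the earlier examples;
#the tiny DataFrame below is invented just to have something to plan
df = spark.createDataFrame([(1, "a"), (2, "b"), (3, "a")], ["id", "key"])

#extended=True prints the parsed, analyzed, and optimized logical plans plus the physical plan
df.filter(F.col("id") > 1).groupBy("key").count().explain(True)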

Tungsten

Tungsten is another optimization component in Spark. It prioritizes CPU performance over I/O by managing memory explicitly and by using cache-friendly data structures with stride-based memory access. It also supports on-demand JVM bytecode generation to reduce CPU overhead.

By focusing on CPU efficiency and memory optimization, Tungsten significantly enhances Spark’s performance, particularly in data processing tasks, ensuring efficient resource utilization and faster query execution.

ETL with Dataframes

The fundamental DataFrame operations are:

  1. Reading (read the data)
  2. Analysis (analyze the data)
  3. Transformation (transform the data)
  4. Loading (load data into a database)
  5. Writing (write data back to disk)

This process is also known as the ETL process: Extract, Transform, Load.

First, Spark reads the data and loads it into a DataFrame. Subsequently, dataset analysis involves inspecting columns, data types, and basic statistics, with potential for trend analysis and more complex operations.

Transformation enables data refinement through filtering, sorting, or joining, crucial for preparing data for downstream applications. Finally, loading transformed data back into a database or file system completes the ETL process, making data accessible for analytics, machine learning, or other services.

Another common approach, widespread in big data scenarios, is ELT (Extract, Load, Transform). With ELT, raw data resides in a data lake and transformations are tailored to each project’s needs, as opposed to the predefined transformations applied in ETL pipelines and data warehouses.

Read the data

Reading data involves loading files directly into Spark DataFrames or creating Spark DataFrames from existing ones, such as pandas DataFrames. For instance, a dataset about car models’ performance is loaded, featuring parameters like miles per gallon and horsepower.
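A hedged sketch of this step, assuming a local mtcars.csv file like the one used later in the post:

import pandas as pd

#option 1: read a CSV file directly into a Spark DataFrame
#(header=True takes column names from the first row, inferSchema=True guesses the types)
cars_df = spark.read.csv("mtcars.csv", header=True, inferSchema=True)

#option 2: create a Spark DataFrame from an existing pandas DataFrame
pdf = pd.read_csv("mtcars.csv")
cars_df = spark.createDataFrame(pdf)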

Analyzing data

Analysis entails examining column data types and dataset schema, followed by detailed inspection using functions like show and select.
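For example, continuing with the hypothetical cars_df DataFrame from above:

#inspect column names and data types
cars_df.printSchema()

#look at the first few rows
cars_df.show(5)

#basic statistics for a single column
cars_df.select("mpg").describe().show()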

Transform data

Transformation involves refining data by filtering, joining, or performing columnar operations, depending on downstream requirements.

For example, filtering can isolate cars with specific mileage criteria, while aggregation can summarize data by grouping records based on shared attributes like the number of cylinders.
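A short sketch of both operations, again assuming the cars_df DataFrame with its mpg and cyl columns:

#filtering: keep only cars that do more than 20 miles per gallon
efficient_cars = cars_df.filter(cars_df["mpg"] > 20)

#aggregation: group records by number of cylinders and average their mpg
cars_df.groupBy("cyl").agg({"mpg": "avg"}).show()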

Loading or Exporting data

Finally, the ETL pipeline exports data to disk or to other databases, completing the cycle. This step involves writing data to disk as JSON files or exporting it to databases like Postgres, so the data can be integrated and used by other applications.
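A sketch of this step; the output path, table name, and Postgres connection details below are placeholders rather than values from the original post, and the JDBC export assumes a Postgres driver on Spark’s classpath:

#write the transformed data to disk as JSON
efficient_cars.write.mode("overwrite").json("output/efficient_cars_json")

#or export it to a Postgres table over JDBC (all connection values are hypothetical)
efficient_cars.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/analytics") \
    .option("dbtable", "efficient_cars") \
    .option("user", "spark_user") \
    .option("password", "spark_password") \
    .mode("append") \
    .save()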

Real-world use case of Spark SQL

Here’s a quick overview of Spark SQL: It’s a module within Spark tailored for structured data processing, enabling SQL queries on Spark DataFrames with support for Java, Scala, Python, and R APIs.

Creating a View in Spark SQL

To utilize SQL queries effectively, the first step is creating a table view. Spark SQL offers both temporary and global temporary table views. Temporary views are scoped to the current Spark session, while global temporary views are tied to the application and can be accessed from multiple Spark sessions.

Here is an example of a temporary view using PySpark in Python. After creating a DataFrame from a JSON file, a temporary view named “people” is created, allowing subsequent SQL queries:

#dataframe from file
df = spark.read.json('people.json')

#temp view
df.createTempView('people')

#running sql query
spark.sql("SELECT * FROM people").show()
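For comparison, a global temporary view (a hedged sketch extending the example above, with an invented view name) lives in the reserved global_temp database and remains visible to other sessions of the same application:

#global temp view
df.createGlobalTempView('people_global')

#queries must qualify the view with global_temp
spark.sql("SELECT * FROM global_temp.people_global").show()

#it is also visible from a new session within the same application
spark.newSession().sql("SELECT * FROM global_temp.people_global").show()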

Aggregating data

Aggregation is a standard process in Spark SQL, presenting grouped statistics. DataFrames provide built-in aggregation functions like count, average, and max. Alternatively, aggregation can be performed programmatically via SQL queries and table views.

For instance, grouping cars by cylinder count can be achieved using DataFrame functions or by creating a temporary view named “cars” and executing a SQL query to group by cylinders.

#loading data
import pandas as pd
mtcars = pd.read_csv('mtcars.csv')
sdf = spark.createDataFrame(mtcars)

#viewing the first five values of the mpg column
sdf.select('mpg').show(5)

#aggregating cars by num of cylinders using dataframe func
sdf.groupby(['cyl']).agg({"wt": "count"}).sort("count(wt)", ascending=False).show(5)

#aggregating cars by num of cylinders using temp view
sdf.createTempView('cars')
spark.sql("SELECT cyl, COUNT(*) FROM cars GROUP BY cyl ORDER BY 2 DESC").show(5)

Spark SQL supports various data sources, including Parquet, a columnar format compatible with many data processing systems. It also seamlessly reads and writes JSON datasets, inferring schemas. Additionally, Spark SQL interacts with Hive, enabling data reading and writing from Hive storage.
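As a brief, hedged illustration of the Parquet and JSON interfaces (the file paths are placeholders, and sdf is the cars DataFrame from the previous example):

#write a dataframe to parquet and read it back
sdf.write.mode("overwrite").parquet("cars.parquet")
parquet_df = spark.read.parquet("cars.parquet")

#read a json dataset; spark infers the schema automatically
json_df = spark.read.json("people.json")
json_df.printSchema()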

DISCLAIMER

Disclaimer: The notes and information presented in this blog post were compiled during the course “Introduction to Big Data with Spark and Hadoop” and are intended to provide an educational overview of the subject matter for personal use.

