Working with Apache Spark

Tanja Adžić
18 min readApr 27, 2024

--

If you are wondering about what does Apache Spark actually do, but you were afraid to ask, I got you. I’ll go over the basic components of working with Spark, and it will be enough to get you going with your next project, whether you are just starting out in the Data field, or already have experience. Spark is a very powerful tool, and has become a necessity in the Big Data field. It’s fast, scalable, and versatile. The benefits of having Spark under your belt are immense. Let’s spark!

Contents

Apache Spark Architecture

Spark applications consist of two primary processes: the driver program and executors:

  • driver program operates as a single process per application and is capable of running on a cluster node or externally as a client to the cluster. Its role is to execute the application’s user code, generate tasks, and dispatch them to the cluster.
  • executors are processes running multiple threads concurrently to handle workload for the cluster. They operate independently, with multiple executors potentially running on each node of the cluster, depending on the configuration.

Spark Context

The Spark Context starts when the application launches, and it must be created in the driver before working with DataFrames or RDDs. Any DataFrames or RDDs created under this context are tied to it, and the context has to remain active while they are active.

The driver program organizes user code into “Jobs” or parallelizable computations, which the Spark Context subsequently divides into tasks for execution on the cluster. These tasks, associated with different data subsets termed Partitions, can execute concurrently on Executors.

A Spark Worker is a cluster node responsible for processing tasks, while a Spark Executor efficiently utilizes local resources, such as memory and compute cores, assigning one task per available core: each Executor is alloted a set number of cores that each run one task at a time.

To increase cluster parallelism, we need to increase the number of executors and available cores. Tasks operate in separate threads until all cores are engaged, with task results either stored in new RDD partitions or returned to the driver upon completion. Optimally, the utilized cores should align with the total available cores per node.

Spark Stages and Shuffles

A Stage within a Spark job is a set of tasks that executors can execute on current data partitions. When tasks need data from other partitions, Spark initiates a Shuffle which marks the boundary between Stages. Shuffles are costly operations that data serialization, disk, and network I/O. This is because Shuffles enable tasks to access data from other dataset partitions outside the current partition, like when using a “groupby” operation.

This example of a Shuffle process shows two stages with a Shuffle between them. In Stage 1, a transformation is applied to dataset “a” with 2 partitions (“1a” and “2b”), resulting in dataset “b”. A shuffle operation, like “groupby”, is then performed to group keys of the same value together across partitions. The transformation results are placed in Stage 2, where the number of partitions remains the same, depending on the operation. Finally, the final results are sent to the driver program as an action, such as “collect”.

Overview of Apache Spark Cluster Modes

The Spark Cluster Manager serves as a link between an application and the cluster, and it acquires necessary resources for the said application. It operates independently outside the application, and it abstracts the cluster type, enabling resource allocation. During runtime, the Spark Context orchestrates task creation and communicates resource requirements to the cluster manager, which then allocates executor cores and memory resources. Once resources are secured, tasks are dispatched to executor processes for execution.

Types of Cluster Managers

Spark offers support for various cluster managers.

  • Spark Standalone cluster manager, comes with Spark installation, and is ideal for setting up simple clusters.
  • Apache Hadoop YARN is a cluster manager from the Hadoop project, and is defined as a general-purpose cluster manager.
  • Apache Mesos is a general-purpose cluster manager with additional benefits of enabling Spark to run alongside other frameworks.
  • Kubernetes is an open-source system for containerized applications, and it offers automation and scalability advantages.

Why use Spark Standalone?

Spark Standalone is built into the Spark installation, so there are no additional dependencies to deploy. It is a really fast way to setup a Spark cluster.

In the Spark Standalone setup there are two main components:

  • Workers execute on cluster nodes, initializing executor processes with reserved cores.
  • Master is responsible for connecting and adding Workers to the cluster. It’s crucial to avoid monopolizing node resources if the master and worker coexist.

To configure a Spark Standalone cluster manually, initiate the Master, assigning it a URL with a hostname and port number. Workers can then be started on any node using bi-directional communication with the master, which is enabled by the Master URL. Once the master and workers are operational, launching Spark applications on the cluster involves specifying the master URL as an argument.

# starting the spark master
./sbin/start-master.sh -h <hostname> -p <port_number>

# starting workers with the master url
./sbin/start-slave.sh spark://<master_url>:7077

# launching spark application on cluster bu specifying master url
./sbin/spark-submit --master spark://<master_url>:7077 <application_script>

Why use Apache Hadoop Yarn??

Apache Hadoop YARN is a versatile cluster manager, and it is really popular in the big data ecosystem, although it requires more complex setup and configuration compared to Spark Standalone. To use Spark on an existing YARN cluster, specify the YARN keyword using the --master option, allowing Spark to easily make use of standard Hadoop configuration files.

./bin/spark-submit --master YARN <application_script>

What about Apache Mesos?

Apache Mesos is another supported cluster manager that offers dynamic partitioning between Spark and other frameworks, which enhances scalability. However, configuring Spark on Mesos may require additional setup based on specific requirements.

Why use Kubernets exactly?

For containerized deployments, Spark integrates easily with Kubernetes, using its native scheduler for streamlined deployment and management. Local mode facilitates application testing and debugging on a single machine, but it is not designed for optimal performance.

To deploy in local mode, specify the --master option with the ‘local’ keyword, indicating the desired number of cores for the executor. Utilizing all available cores involves substituting the number with an asterisk wildcard. It’s important to note that not all cluster configurations are applicable in local mode.

spark-submit --master local[<num_cores>] <application_script>

How to Run an Apache Spark Application

Spark provides a unified interface for application submission through the spark-submit script located in the bin/ directory.

spark-submit

The spark-submit script is compatible with all supported cluster types, and it offers numerous configuration options for applications and clusters. It has a unified interface that allows us to easily switch between local mode and cluster mode by modifying a single argument, doesn’t matter which language is used in the application (it can run Java and Python applications simultaneously as an example).

Once we use the spark-submit script the following takes place:

  1. it parses command line arguments
  2. it reads additional configurations specified in the conf/spark-defaults.conffolder
  3. it connects to the cluster manager specified in the --master argument or run in local mode if it’s setup that way
  4. it transfers application (JARs or .py files) and any additional files specified to be distributed and run in the cluster

Common spark-submit options

Certain ‘spark-submit’ options, like specifying the master and driver deploy mode, are mandatory. For Java or Scala applications packaged in JARs, specifying the full class name of the program entry point is necessary. Other options include driver deploy mode, executor resource settings, and cluster manager-specific configurations.

Additional Spark configurations can be set using the --conf argument followed by key=value pairs. For Java or Scala applications, the final arguments usually include the path to the application JAR. For Python applications, Python files can be added using the --py-files argument, connecting distribution to the cluster.

Let’s see two generic spark-submit examples that are launching SparkPi to a YARN and Standalone cluster, written in Scala and Python, with specific masters and additional argument which is how many samples to take in the cluster to compute Pi.

#scala example to YARN
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master YARN \
/path/to/examples.jar 1000

#python example to Spark Standalone
./bin/spark-submit --master spark://207.185.352:7077 \
examples/src/main/python/pi.py 1000

Application Dependencies

To manage dependencies effectively, it’s recommended to bundle application projects or libraries for both driver and cluster executor processes. For Java or Scala applications, creating an uber-JAR consolidating all dependencies is advisable. With Python dependencies can be included via virtual environments or the --py-files argument.

Spark Shell

The Spark Shell is available for Scala and Python. It offers interactive access to Spark APIs for data manipulation and analysis. Compatible with both local and cluster modes, Spark Shell initializes the SparkContext and SparkSession variables upon startup, enabling immediate data interaction. Expressions entered in the shell are evaluated in the driver, and actions on shell DataFrames trigger Spark jobs scheduled as tasks on the cluster.

A practical example showcases the Spark Scala shell’s initiation in local mode, showing us key information such as Spark’s log and Web UI addresses, initialized variable names, and version details. Additionally, a code snippet demonstrates DataFrame creation, transformation, and action execution within the Scala shell.

#Spark Scala shell in local mode
spark-shell --master local[*] \
--conf spark.driver.extraJavaOptions="-Dlog4j.configuration=file:/path/to/log4j.properties" \
--conf spark.executor.extraJavaOptions="-Dlog4j.configuration=file:/path/to/log4j.properties"

Using Apache Spark on IBM Cloud

Spark can be deployed either on a local cluster or on a cloud platform. Configuring nodes in a local cluster can be difficult in terms of visibility and communication, but by using a cloud environment this process is simplified and the deployment is easy through pre-configured defaults. This makes it easy to scale up the cluster to add nodes and increase computer power.

Different cloud providers give support for running Spark on different cluster managers. Running Spark on IBM Cloud offers additional advantages, including enterprise-grade security and integration with IBM’s big data solutions for AIOps (Artificial Intelligence for IT Operations) and applications like IBM Watson and the IBM Analytics Engine.

AIOps

AIOps applies artificial intelligence to automate or enhance IT operations. It helps you in data collection, aggregation, pattern identification, and rapid issue diagnosis. Spark’s suitability for big data analytics makes it ideal for processing large infrastructure datasets and applying machine learning algorithms to predict or detect operational issues.

IBM Cloud Pak for Watson AIOps provides Spark-based solutions that correlate data across various operations tools, offering real-time insights and issue identification.

IBM Spectrum Conductor

IBM Spectrum Conductor is a multi-tenant platform, and it facilitates deploying and managing Spark and other frameworks on a shared resource cluster, enabling multiple Spark applications to run simultaneously with dynamic resource allocation and enterprise-grade security.

IBM Watson and IBM Analytics Engine

IBM Watson assists in creating production-ready AI and machine learning environments, streamlining setup and maintenance tasks to focus on enhancing Spark’s machine learning capabilities. IBM Analytics Engine works with Spark by offering a flexible and scalable analytics solution, using Apache Hadoop cluster framework to separate storage and compute, therefore optimizing resource utilization.

Setting Apache Spark Configuration

Configuring a Spark Application involves three different methods: properties, environment variables, and logging configuration.

  1. Spark properties allow adjustment and control over various application behaviors, including properties set with the driver and shared across the cluster.
  2. Environment variables are loaded on each machine so they can be adjusted on a per-machine basis. This is useful for accommodating hardware or software differences among cluster nodes.
  3. Logging is controlled by the log4j defaults file, which dictates what level of messages are logged to file or output during application execution.

Spark Configuration Location

Spark configuration files are located in the conf directory within the installation. Although no preexisting files exist after installation, Spark provides templates for each configuration type, which can be customized by removing the .template extension. These template files contain sample configurations for common settings, which can be enabled by uncommenting.

Spark Property Configuration

Configuring Spark properties can be set in several ways:

  1. Programmatically within the driver program, when creating a SparkSession or via a separate SparkConf object passed into the session constructor.
  2. Properties can also be set in the configuration file at ‘conf/spark-defaults.conf’, or
  3. Properties can also be set when launching the application using spark-submit, using provided options or the --conf option with a key-value pair.

Spark Property Precedence

Understanding how Spark properties are merged is really important. Programmatically set configurations take precedence, followed by those provided with spark-submit, with configurations from the spark-defaults.conf file having the lowest precedence.

Static and Dynamic Configuration

Static configuration involves settings written programmatically into the application itself. We would use static configuration for something that is unlikely to be changed or tweaked between application runs, such as application name or other properties related to the application only.

Dynamic configuration is useful to avoid hardcoding specific values in the application itself. This is usually done for configuration such as the master location, so that the application can be launched on a cluster by simply changing the master location. Other examples include setting dynamically how many cores are used or how much memory is reserved by each executor so that they can be properly tuned for whatever cluster the application is run on.

Using Environment Variables

Spark environment variables, loaded from conf/spark-env.sh on each machine in the cluster. Since these can be loaded differently for each machine, it can be helpful to configure them based on the specifics of each individual machine. Common use cases are to ensure that each machine uses the same Python executable by configuring the PYSPARK_PYTHON environment variable.

Configuring Spark Logging

Logging in Spark is controlled via log4j and configured through conf/log4j-properties. Adjustments can be made to log levels, determining which messages are displayed in the logs, and to direct log outputs.

Running Spark on Kubernetes

Kubernetes, often known as k8s , is a popular framework for orchestrating containerized applications across clusters. It is an open-source system, and it offers scalability and flexible deployments, whether in the cloud or on-premises. It efficiently manages distributed systems like Spark, providing resilience and flexibility in resource utilization, even in the face of application errors or node failures.

Key benefits of Kubernetes include:

  • network service discovery
  • cluster load balancing
  • automated scaling of systems for optimized resource management
  • orchestrating storage to maximize availability.

Although Kubernetes is primarily designed for cluster deployment, it also supports local hosting, commonly used for development environments like minikube, allowing local testing before production deployment in the cloud.

Containerization means that applications behave consistently across environments, helping in quicker issue identification and resolution. Deploying Kubernetes on the cloud is ideal for production environments due to its deployment ease and flexibility. Getting started with Kubernetes on the cloud can be achieved either through existing tools or certified Kubernetes providers offering turnkey solutions for rapid cluster setup.

Spark versions 2.3 and above support running on Kubernetes as an alternative deployment mode, offering several advantages. Containerization enhances application portability, simplifies dependency management, and supports efficient resource sharing among multiple concurrent Spark applications.

Submitting Spark Apps on Kubernetes

Submitting a Spark application to a Kubernetes cluster involves setting the --master option to the Kubernetes API server URL and port. Spark creates a driver and executors within Kubernetes pods. A pod is a tightly coupled groups of containers with shared resources. The application can be deployed in client or cluster mode, with additional considerations for client mode ensuring executor communication with the driver program.

To make sure proper cleanup after application is completed, it’s essential to set the spark.kubernetes.driver.pod.name to the driver’s pod name, facilitating executor pod cleanup along with the driver pod.

The Apache Spark User Interface

When a Spark application is being run, as the driver program creates a SparkContext, Spark starts a web server that serves as the application user interface. You can connect to the UI web server by entering the hostname of the driver followed by port 4040 in a browser once that application is running:

http://<driver-node>:4040

The web server runs for the duration of the Spark application, so once the SparkContext stops, the server shuts down and the application UI is no longer accessible.

Why use Spark UI?

The Spark UI provides valuable insights organized into multiple tabs, each offering valuable information about the running application.

  • Jobs tab displays job status
  • Stages tab reports task states within each stage
  • Storage tab shows the size of persisted RDDs or DataFrames.
  • Environment tab lists environment variables and system properties
  • Executor tab summarizes memory and disk usage for active executors.

Depending on the application type, additional tabs may appear. For example, SQL queries trigger the display of the SQL tab, presenting metrics for each query.

Jobs

Let’s check out the Jobs tab. It shows an event timeline indicating the start of driver and executor processes, as well as job creation. Detailed information about completed jobs, including duration and task metrics, is seen below:

Exploring Job Details gives us insights into the different stages of a specific job, including timing, completed stages, and various job metrics such as input/output sizes and shuffle data. Stage Details offer even more granular information, accessible via hyperlinks within the UI.

Stages

Moving on to the Stages tab, all stages are listed based on their current state: completed, active, or pending. Clicking on individual stage IDs reveals detailed task information.

Storage

The Storage tab provides insights into cached RDDs:

Environment

The Environment tab lists Spark configuration properties and resource profiles:

Executor

The Executors tab offers a summary table and metrics for active or dead executors, along with a detailed breakdown of metrics for each executor. Additionally, links to stdout and stderr log messages are offered:

Monitoring Application Progress

Running a Spark application can be time-consuming and there are chances of failures happening. When issues happen, for example a task fails due to faulty worker nodes, it is important to identify and resolve it, to prevent resource wastage. The Spark Application UI is as an important tool for monitoring running applications, and it offers status information and access to critical details.

The Application UI helps in:

  • quick identification of failed jobs and tasks
  • fast access to locate inefficient operations
  • application workflow optimization

Application Flow

A Spark application typically comprises parallel jobs, each involving multiple stages organized into a Directed Acyclic Graph (DAG). Tasks within stages are scheduled on the cluster, with subsequent stages initiated upon completion of dependencies.

Let’s see an example workflow:

  1. Spark jobs divide into stages, which connect as a directed acyclic graph (DAG)
  2. Tasks for the current stage are scheduled on the cluster
  3. As the stage completes all the tasks, the next dependent stage in the DAG begins
  4. The job continues through the DAG until all stages are complete

If any tasks within a stage fail, after several sttempts, Spark marks the task, stage, and job as failed and stops the application. The UI visualizes this workflow, enabling users to monitor task statuses, durations, and data processing metrics.

Now let’s see an example of an application workflow using PySpark, involving loading a Parquet file, selecting columns, caching data, grouping by a column, and aggregating data:

#read parquet file
df = spark.read.parquet('users.parquet')

#select columns of choice and cache to memory
df = df.select('country', 'salary').cache()

#groupby and aggregation
mean_salaries = df.groupBy('country').agg({'salary':'mean'}).collect()

Initiating the application triggers job creation, which shows in the UI’s Jobs tab, which then displays job details, including stages and DAG connections. Selecting a specific job shows its stages and tasks, as well as insights into task states and durations.

The UI offers access to executor logs and detailed task metrics, heping in troubleshooting and performance optimization. Once all jobs are done, the SparkContext shuts down, and the UI is not accessible anymore. However, enabling event logging allows post-application analysis via the Spark History server.

To view the Application UI post-application, make sure event logging is enabled and start the History server using the provided command. Access the History server via the host URL and default port to review completed applications and their UIs.

Debugging Apache Spark Application Issues

Running a Spark application on a cluster is a complex task with chances of application failure. Common areas of Spark application issues are:

  • user code
  • system configurations
  • dependencies
  • resource allocation
  • network communication

User code issues

User code has the driver program and serialized functions executed by executors in parallel. The SparkContext is created in the driver, while DataFrame operations are serialized and executed as tasks across the cluster. Spark terminates when syntax errors or serialization issues happen, while reporting task errors to the driver and canceling related executor tasks.

Application dependencies issues

Application dependencies include application files (python scripts, JAR files, required data files), and application libraries. Dependencies must be available on all cluster nodes. Failure to install required libraries or discrepancies in versions across nodes leads to errors. If an error occurs, it is best to examine the event log for stack trace errors, and identify the issue.

Resource issues

Resource availability is critical; insufficient CPU cores or memory can prevent task execution. Tasks may remain in the scheduling queue indefinitely if workers lack resources, leading to timeout errors. Monitoring the UI or event logs helps diagnose these resource-related failures.

Log files

Accessing log files in the Spark installation directory provides detailed insights into application failure causes. Each application is assigned a unique ID, and corresponding log files for stdout and stderr are stored under ‘work/<application-id>/’. Spark standalone clusters generate additional log files in the ‘log/’ directory.

Understanding Memory Resources

When running a Spark application, it’s important to set memory limits for both the driver and executor processes. These limits prevent the application from using all available cluster memory, which then creates optimal performance.

Exceeding memory limits can lead to data spilling to disk or out-of-memory errors, interupting application performance. It’s really important to consider memory settings for both executors and drivers carefully.

Memory Settings Considerations

Executors require memory for processing and caching, but excessive caching can lead to out-of-memory errors. Meanwhile, the driver handles data collection, broadcasting variables, and managing results. Large datasets may surpass the driver’s memory capacity, it is important to carefully filter the data.

Spark Unified Memory

In Spark, executor and storage memory share a unified region within the Java Heap Space. Storage memory can acquire all available memory when executor memory is unused, and vice versa. However, storage memory eviction by executor memory is limited, ensuring optimal performance for various workloads without user intervention.

Data Persistance

Data persistence, or caching, in Spark allows storing intermediate calculations for reuse, enhancing performance, especially for iterative workloads like machine learning. Caching data in memory or disk reduces the need to reload data from the source, significantly speeding up operations.

Setting Memory on submit

Configuring memory for executors is typically done when submitting the application to the cluster. It’s essential to allocate enough memory for each application to run effectively without exhausting all available resources. Oversubscribing resources may lead to performance degradation.

Setting Worker Node resources

In a Spark standalone cluster, specifying total memory and CPU cores for each worker ensures efficient resource utilization. However, be cautious in order to avoid assigning more resources than available on the physical machine, which can degrade performance.

Understanding Processor Resources

CPU cores are crucial resources allocated to both the driver and executor processes in Spark applications. Executors can process tasks concurrently, limited by the number of cores assigned to the application. Once tasks complete processing, the cores become available for other tasks.

In a cluster, the number of available cores is finite. If all cores are occupied, applications must wait for tasks to finish before utilizing cores. Spark queues tasks once launched, assigning them to available executors to maximize parallelism. Tasks not immediately scheduled wait in the queue until cores become available.

Customizing core settings overrides default behavior. You can specify the number of cores per executor using the --executor-cores argument during application submission. Tasks only start when the required cores become available. Alternatively, the --total-executor-cores argument specifies the total cores for the entire cluster, not per executor.

In a Spark standalone cluster, manually starting a worker allows specifying the number of cores using the --cores argument. The default behavior is to utilize all available cores, but it's common to start one worker per node for optimal resource utilization.

Consider an example with a standalone cluster featuring one worker node and six cores. Suppose two identical applications are submitted consecutively, each requesting four cores per executor. The first application utilizes four cores, leaving two available. The second application waits until four cores are free before starting.

If the second application requests only two cores, it can start immediately, but overall processing time might increase due to running fewer tasks concurrently. Balancing Spark application performance involves optimizing workload configuration and cluster resources through careful tuning.

DISCLAIMER

Disclaimer: The notes and information presented in this blog post were compiled during the course “Introduction to Big Data with Spark and Hadoop” and are intended to provide an educational overview of the subject matter for personal use.

--

--

Tanja Adžić

Data Scientist and aspiring Data Engineer, skilled in Python, SQL. I love to solve problems.