
Introduction to Apache Spark


Nov 21, 2021


APACHE SPARK

Apache Spark is a tool widely used by data engineers, data scientists, and machine learning engineers. It was designed and built in 2009 at UC Berkeley as the evolution of the old paradigm that used Hadoop with the MapReduce algorithm. Apache Spark acts as a distributed system, spreading workloads across the memory of the different nodes of a cluster. It is simpler and easier to use, it is 10 to 20 times faster than Hadoop MapReduce, and it is modular, meaning it can be used for different kinds of workloads. It focuses on speed and parallel computing instead of storage, and that is the main difference from Apache Hadoop.

MEMORY PROCESSING

  • The price of memory decreases every year, so adding more memory to each server in a cluster does not significantly increase infrastructure expenses.
  • Many datasets fit into the memory of a modern computer.
  • Memory is fast; going to disk is not a good idea when you want to run small operations.
  • There is quantitative evidence of the speed of Apache Spark versus Hadoop MapReduce.

COMPONENTS

The general overview of the architecture and its components is in the image below. All the components are divided into the APIs and the Apache Spark core.

[Image: Apache Spark components and APIs overview]

DISTRIBUTED EXECUTION

Spark is a distributed processing engine whose components work in collaboration on a cluster of servers, so it is important to understand how these components work together. The image below (left section) shows an Apache Spark cluster in standalone mode; for each Apache Spark application, Spark runs one driver and several executors. Here we will describe each component and the process.

  • Driver: It is responsible for initiating the SparkSession, establishing communication with the cluster manager, and requesting resources (CPU and memory) from the cluster manager for the different executors. All Spark operations are transformed into DAG computations and distributed across the executors.
  • SparkSession: It is the object that gives you access to all kinds of operations and data in Spark; through it you reach all the functionality of Apache Spark (a minimal example of creating one follows this list).
  • Executor: The executors communicate with the driver and are responsible for executing tasks inside each worker. Executors reserve memory and CPU on the workers, are fully dedicated to one application, and end when the application completes. A Spark application consists of many executors, often working in parallel, and a worker node hosts a finite number of executors at a given time; if an application requires more executors than are available, they are scheduled to run after other executors finish their tasks and release resources.
  • Cluster manager: It is responsible for managing the cluster resources together with the master node, and it also monitors the worker nodes. The cluster manager can be separated from the master node when an architecture using YARN or Mesos is chosen. The image below shows a standalone-mode architecture, where the master node and the cluster manager perform the same functions; in this mode, the master acts as its own cluster manager.
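For reference, here is a minimal PySpark sketch of creating a SparkSession from the driver; the application name, master URL, and resource values are illustrative assumptions.

```python
from pyspark.sql import SparkSession

# The driver builds (or reuses) the SparkSession, the single entry point it uses
# to talk to the cluster manager and request resources for the executors.
spark = (
    SparkSession.builder
    .appName("intro-to-spark")                 # hypothetical application name
    .master("local[4]")                        # local mode with 4 threads; on a cluster use "spark://...", "yarn" or "mesos://..."
    .config("spark.executor.memory", "2g")     # memory requested for each executor (illustrative)
    .config("spark.executor.cores", "2")       # cores requested for each executor (illustrative)
    .getOrCreate()
)

print(spark.version)
```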

DEPLOYMENT MODES

The deployment modes of an Apache Spark cluster specify where exactly the driver will be executed. There are two types:

  1. Client Mode: The driver runs on the user’s local machine and the executors on the cluster.
  2. Cluster Mode: The driver and executors run in the cluster.

There are several types of cluster manager (see the sketch after this list):

  1. Standalone cluster
  2. YARN
  3. Mesos
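The deployment mode is normally chosen when the application is submitted (for example, spark-submit accepts --deploy-mode client or --deploy-mode cluster), while the master URL selects the cluster manager. The sketch below only illustrates the master URL part; the host names and ports are hypothetical.

```python
from pyspark.sql import SparkSession

# The master URL determines which cluster manager the driver talks to.
masters = {
    "standalone": "spark://master-host:7077",  # Spark standalone cluster (hypothetical host)
    "yarn": "yarn",                            # Hadoop YARN, resolved from the Hadoop configuration
    "mesos": "mesos://mesos-host:5050",        # Apache Mesos (hypothetical host)
    "local": "local[*]",                       # no cluster manager at all, useful for testing
}

spark = (
    SparkSession.builder
    .appName("cluster-manager-demo")
    .master(masters["local"])                  # pick one of the URLs above
    .getOrCreate()
)
```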

Finally, the image below (right section) explains the following:

The driver converts the Spark application into one or more jobs, and each job is a DAG; the execution plan of the entire application is based on this concept. Each job is divided into one or more stages, which are created depending on the type of operation that needs to be executed and whether operations can run in parallel. Each stage, in turn, is composed of tasks, which represent the minimum unit of execution: each task is processed by a single core or CPU and receives a specific partition of the data, so an executor with 4 cores can process up to 4 tasks in parallel.

[Image: Apache Spark cluster architecture (left) and job/stage/task decomposition (right)]
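To make the decomposition into jobs, stages, and tasks more concrete, here is a small sketch (the dataset and column names are made up): the groupBy introduces a shuffle, so the job triggered by the action is split into stages, and each stage runs one task per partition.

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.range(0, 1_000_000, numPartitions=8)     # 8 partitions -> up to 8 tasks per stage

# Narrow transformation: no data movement, stays in the same stage.
doubled = df.withColumn("double", F.col("id") * 2)

# Wide transformation: requires a shuffle, so Spark inserts a stage boundary here.
counts = doubled.groupBy((F.col("id") % 10).alias("bucket")).count()

# Nothing has run so far; this action turns the DAG into a job made of stages and tasks.
counts.show()
```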

The basic flow of a Spark application consists of reading heterogeneous data from an external source, executing the transformations, computations, and aggregations needed to change its form, and then storing the result in a new structure that is loaded into a destination data source. The process of computing and storing the result of the computation relies on three basic APIs.

APIs

  • RDD: All the APIs in Apache Spark are ultimately transformed into RDDs; everything in Spark is an RDD. An RDD is resilient (fault tolerant), partitioned (split into data chunks), distributed (chunks are spread across all executors of the cluster), and immutable (you cannot modify the content of an existing RDD). With an RDD you can indicate how many partitions to use; in the word-count example sketched after this list, 4 partitions are assigned to 4 executors, with 2 executors per worker node, so each chunk of the dataset goes to one executor. On an RDD you can execute two types of operations, transformations and actions: the first map splits the dataset and assigns each chunk to an executor, the second map generates a key-value pair with a 1 for each word, and the last line, the aggregation by key, triggers a shuffle.
  • DATAFRAME: It is like a database table, where a schema is defined and each column has an assigned or inferred data type. You can impose order at each step of the transformation process using aggregations and filters, and you always know which data types are present at each phase of the computation. This makes it easier for Spark to know what to do and how to do it, so the translation from DataFrame to RDD is better, which leads to a better execution plan.
  • DATASET: It is a collection of strongly typed JVM objects and can only be used with Java or Scala. Datasets are compile-time type-safe; unlike the DataFrame, the types are not dynamically inferred or assigned at runtime as in Python or R, only at compile time.
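The word-count example from the RDD bullet could look roughly like the sketch below; the input file "data.txt" and the partition count are hypothetical.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read a text file into an RDD split into 4 partitions across the executors.
rdd = spark.sparkContext.textFile("data.txt", minPartitions=4)

counts = (
    rdd.flatMap(lambda line: line.split(" "))   # split each line into words (transformation)
       .map(lambda word: (word, 1))             # emit a (word, 1) key-value pair (transformation)
       .reduceByKey(lambda a, b: a + b)         # aggregate by key; this step triggers a shuffle
)

print(counts.take(10))                          # action: only now is the DAG executed
```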

RDD (DISADVANTAGES)

With an RDD you are not indicating to the engine what you want to do and how to do it. Spark will not know whether you are executing a join, a filter, a select, or a specific aggregation; it only sees a lambda expression operating on generic objects, so it cannot know which data types are being operated on. That makes it harder for Spark to optimize the execution plan, because it cannot tell whether there is a more efficient way to express the computation in a different order. This is only achieved with the other APIs, DataFrames and Datasets.

WHEN TO USE AN RDD

Although it is not recommended to use them, I would say that their use is appropriate if you need to tell Spark how exactly a query should be executed.

[Image: RDD example with partitions distributed across executors]

SPARK SQL

The job of the Spark SQL engine is to build queries efficiently and generate code that is as compact as possible. Spark SQL generates a compact, optimized final execution plan for the JVM, and it also provides a bridge to and from external tools via JDBC and ODBC drivers. All Spark components are unified under a common abstraction for DataFrames and Datasets in Java/Scala, Python, and R, which simplifies working with structured data. Spark reads and writes structured data with a defined schema in specific file formats such as JSON, CSV, Avro, and Parquet, and can expose this data as temporary tables.
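As an illustration of the temporary-table workflow, here is a hedged sketch; the file "sales.parquet" and its columns are assumptions made up for the example.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

sales = spark.read.parquet("sales.parquet")       # structured data with a schema
sales.createOrReplaceTempView("sales")            # expose the DataFrame as a temporary table

top_customers = spark.sql("""
    SELECT customer_id, SUM(amount) AS total
    FROM sales
    GROUP BY customer_id
    ORDER BY total DESC
    LIMIT 10
""")
top_customers.show()
```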

CATALYST OPTIMIZER

Spark SQL’s Catalyst Optimizer improves the performance of SQL queries. Catalyst automatically transforms relational queries to execute them more efficiently using techniques such as filtering and indexing, and ensuring that joins from data sources are performed in the most efficient order. Furthermore, its design allows the Spark community to implement and extend the optimizer with new functionality.

The Catalyst Optimizer takes a query and converts it into an execution plan. The query commonly goes through four transformation phases (see the image and the sketch below):

  1. Analysis: Spark SQL starts by generating an abstract syntax tree (AST) for the SQL query or the DataFrame query. In this initial phase, every column and table name is looked up in an internal catalog that contains all objects: column names, data types, functions, and databases. Once everything is resolved, the query moves to the next phase.
  2. Logical Optimization: At this level the Catalyst optimizer generates different plans and, using the cost optimizer, assigns a cost to each one. These plans are represented as operator trees composed of expressions (column transformations, filters, conditions, joins) and relational operators (filter, join, and DataFrame transformations).
  3. Physical Plan: In this phase Spark SQL generates one or more physical plans based on the optimized logical plan from the previous phase; a physical plan contains a more specific description of how things should happen using the operators already defined. A clear example: the logical plan uses a Join operator, and the physical plan decides which type of join should be used, a SortMergeJoin or a BroadcastHashJoin.
  4. Code Generation: This is the final phase of optimization; at this point efficient Java bytecode has been generated to run on each machine, so Spark acts as a compiler. Code generation reduces the boxing of primitive data types and, more importantly, avoids the costly use of polymorphic functions.
[Image: The four transformation phases of the Catalyst Optimizer]
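You can watch these phases from the outside with explain(): passing True prints the parsed and analyzed logical plans, the optimized logical plan, and the physical plan. The dataset and columns below are hypothetical.

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

orders = spark.read.parquet("orders.parquet")     # hypothetical dataset

report = (
    orders.filter(F.col("amount") > 100)
          .groupBy("country")
          .agg(F.sum("amount").alias("total"))
)

report.explain(True)   # shows how Catalyst turns the query into a physical plan
```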

TUNING APACHE SPARK

Large workloads in Spark are called batch jobs and are commonly scheduled to run daily. Apache Spark is able to handle batches measured in terabytes of data, and there is always a probability of failure when executing these jobs. Most of the time, success depends on how well you understand the main components of Spark: the driver, the executors, and how the shuffles that occur in each executor are managed. Here are the most common techniques for optimizing workloads to reduce the probability of failure with Apache Spark:

Dynamic resource allocation

When you enable dynamic resource allocation, the driver requests more resources only when necessary. This way computing resources are managed better, and more resources are added only during sudden peaks. There are scenarios where analytics are done on demand and a large number of SQL queries are expected at specific times of the day; in these cases dynamic resource allocation takes care of releasing idle executors or adding new ones only when needed. The relevant settings are listed below and sketched right after the list:

  • spark.dynamicAllocation.enabled
  • spark.dynamicAllocation.minExecutors
  • spark.dynamicAllocation.schedulerBacklogTimeout
  • spark.dynamicAllocation.maxExecutors
  • spark.dynamicAllocation.executorIdleTimeout
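A minimal sketch of wiring these properties into the session, assuming illustrative values (the timeouts and executor counts are not recommendations):

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("dynamic-allocation-demo")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "2")              # illustrative value
    .config("spark.dynamicAllocation.maxExecutors", "20")             # illustrative value
    .config("spark.dynamicAllocation.schedulerBacklogTimeout", "1m")  # wait this long before requesting more executors
    .config("spark.dynamicAllocation.executorIdleTimeout", "2min")    # release executors idle for this long
    .config("spark.shuffle.service.enabled", "true")                  # usually needed so shuffle data survives executor removal
    .getOrCreate()
)
```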

Scaling Spark Executor

Enabling the previous option is not enough to guarantee the absence of failures; you also need to ensure that each new executor that starts has enough memory. The amount of memory available to each executor is controlled by spark.executor.memory. The image below shows how memory is distributed inside an executor (the executor memory layout); different types of queries require different amounts of memory. A configuration sketch follows the list below.

  • Shuffle Memory: used for shuffles, sorts, joins, and any kind of aggregation.
  • User Memory: used for storing user-generated caches, data structures, and partitions derived from DataFrames.
  • Reserved Memory: stores internal Spark objects; it is reserved by the system.
  • Memory Buffer: VM overheads.
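A hedged sketch of the related settings, with made-up values that are not sizing recommendations:

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("executor-memory-demo")
    .config("spark.executor.memory", "8g")           # on-heap memory per executor
    .config("spark.executor.memoryOverhead", "1g")   # extra off-heap / VM overhead buffer
    .config("spark.memory.fraction", "0.6")          # share of the heap for execution (shuffle) and storage
    .config("spark.memory.storageFraction", "0.5")   # portion of the above protected for cached data
    .getOrCreate()
)
```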

Scaling External Shuffle Service

During map and shuffle operations, Spark writes and reads "shuffle files" from the hard drive, so there is quite a lot of I/O, which can end up being a bottleneck: for every shuffle fetch, the same index file is opened and read again. It is more efficient to avoid opening the same file several times by caching the index file with its information, which also limits the number of reads.
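Assuming the external shuffle service is running on the workers (for example as a YARN auxiliary service), a sketch of the related settings could look like this; the cache size is illustrative.

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("shuffle-service-demo")
    .config("spark.shuffle.service.enabled", "true")           # serve shuffle files from an external service on each worker
    .config("spark.shuffle.service.index.cache.size", "128m")  # cache shuffle index files instead of re-reading them
    .getOrCreate()
)
```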

Maximizing Spark Parallelism

Partitions are the atomic units of parallelism: a single task, represented by a single thread, runs on one CPU core and processes a single partition. Therefore, the number of partitions should be chosen based on the number of cores the executors have; if more partitions are created than there are cores, the extra partitions will be waiting. Apache Spark reads data from disk in blocks of a defined size, by default between 64 and 128 MB, and a partition is represented as a collection of these blocks. The partition size can be set with spark.sql.files.maxPartitionBytes, the number of partitions of a DataFrame can be changed with the .repartition(numberOfPartitions) method, and the number of shuffle partitions is configured with spark.sql.shuffle.partitions, which determines how many partitions are sent over the network to the executors.
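A short sketch tying these knobs together (the file name and values are hypothetical):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

spark.conf.set("spark.sql.files.maxPartitionBytes", str(128 * 1024 * 1024))  # ~128 MB per partition when reading files
spark.conf.set("spark.sql.shuffle.partitions", "200")                        # partitions produced by every shuffle

df = spark.read.parquet("events.parquet")       # hypothetical dataset
print(df.rdd.getNumPartitions())

# Match the total number of cores, e.g. 4 executors with 4 cores each.
df = df.repartition(16)
```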

Cache and Persist

DataFrames that are read or used frequently, for example in transformations that repeat across your data pipelines or ETLs, can be cached or persisted to avoid recomputing them.

  • Cache: stores the partitions across all executors, as long as memory allows. A DataFrame can be cached partially, but a partition cannot: if you have 4 partitions and only 2.5 of them fit in memory, only 2 will be cached.
  • Persist: unlike cache, persist lets you choose the storage level; there are 6 storage levels.

Before using either of these two techniques, make sure the DataFrames used in your ETLs are not too large, and regardless of their size, avoid caching or persisting DataFrames that are not involved in frequent and expensive transformations.
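A brief sketch of both techniques; the file names are made up and the storage level is just one of the available options.

```python
from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

lookup = spark.read.parquet("customers.parquet")    # hypothetical DataFrame that is reused often
lookup.cache()                                      # default storage level for DataFrames
lookup.count()                                      # an action is needed to materialize the cache

events = spark.read.parquet("events.parquet")
events.persist(StorageLevel.MEMORY_AND_DISK)        # persist lets you pick the storage level explicitly

# ...transformations that reuse `lookup` and `events`...

lookup.unpersist()                                  # release the memory when no longer needed
events.unpersist()
```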

[Image: Executor Memory Layout]

JOINS

Some of the most common operations in advanced analytics are the JOIN operations (inner join, right join, left join, outer join). Two DataFrames in Spark can be joined using a column they have in common, just as in a relational database. All these operations generate a large amount of data movement across the executors, in other words a shuffle. To reduce this data movement in an Apache Spark cluster there are 5 join strategies:

  1. Sort Merge Join
  2. Broadcast Hash Join
  3. Shuffle Hash Join
  4. Broadcast Nested Loop Join
  5. Cartesian Product Join

Sort Merge Join (SMJ) and Broadcast Hash Join (BHJ) are the most commonly used:

SORT MERGE JOIN (SMJ)

All rows of a dataframe with the same KEY are placed in the same partition and in the same executor (Hash key partitioned), which causes an initial exchange between executors.

The phases are as follows:

  • Both tables are partitioned using a JOIN KEY across all partitions in the cluster.
  • The data rows in each partition are sorted in parallel.
  • The already partitioned and sorted data is merged by iterating over the rows and joining those that share the same KEY.

When to use the SMJ?

  • When both tables are considerably large.
  • If the data in the columns involved in the join has a nearly uniform distribution.

What if the data in the column is skewed?

  • Add more resources to the cluster.
  • Use the .repartition() method on the join key, as in the sketch below.
  • Consider using an iterative Broadcast JOIN strategy.
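As a rough sketch of a sort-merge join between two large DataFrames (tables, columns, and the partition count are hypothetical), repartitioning both sides on the join key and asking explicitly for a merge join could look like this:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

orders = spark.read.parquet("orders.parquet")          # hypothetical large table
customers = spark.read.parquet("customers.parquet")    # hypothetical large table

# Co-locate rows with the same key, then request a sort-merge join with the "merge" hint.
joined = (
    orders.repartition(200, "customer_id")
          .hint("merge")
          .join(customers.repartition(200, "customer_id"), "customer_id")
)
joined.explain()    # the physical plan should show a SortMergeJoin
```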

BROADCAST HASH JOIN (BHJ)

When you intend to do a JOIN between two dataframes and one of them is a small dataframe or one with few records, you should broadcast the small dataframe to all the executors of the cluster.

The phases are as follows:

  • The small dataframe is broadcast through all executors.
  • The partitions of the largest dataframe are distributed evenly across all executors.
  • The join is made between the partition in each executor and the small dataframe based on the KEY.

When to use BHJ?

  • If one of the dataframes involved in the JOIN is small.
  • If you want to considerably reduce the shuffle in the cluster.
  • If the KEYs involved in the JOIN are not sorted.

What if the dataframe is not small?

  • The Iterative Broadcast Join strategy could be used.
  • Use the Sort merge Join strategy.
  • Configure the maximum size allowed for the smaller dataframe using spark.sql.autoBroadcastJoinThreshold; by default it is 10 MB. See the sketch below.
[Image: Sort Merge Join and Broadcast Hash Join]
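A hedged sketch of a broadcast hash join (table and column names are made up): the small lookup table is shipped to every executor, and the threshold for automatic broadcasting can be raised if needed.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.getOrCreate()

transactions = spark.read.parquet("transactions.parquet")   # hypothetical large table
countries = spark.read.parquet("countries.parquet")         # hypothetical small lookup table

# Explicitly broadcast the small side so no shuffle of the large table is needed.
enriched = transactions.join(broadcast(countries), "country_code")
enriched.explain()    # the physical plan should show a BroadcastHashJoin

# Or let Spark broadcast automatically any table under the threshold (10 MB by default; value illustrative).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(50 * 1024 * 1024))
```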

APACHE SPARK PROVIDERS

Some of the cloud technologies that offer the Apache Spark platform as a service:

  1. Databricks: It was founded by the creators of Apache Spark, and this same technology is the technological base of its business model. The Community Edition can help you get started with this technology completely free.
  2. Google Dataproc: lets you create fully managed and secure Apache Spark clusters.
  3. AWS EMR: in my opinion the best option, because Amazon is the execution provider of the Databricks clusters. Elastic MapReduce lets you create Apache Spark clusters and at the same time offers options to reduce computation costs by adapting only to the necessary peaks, using on-demand and spot instances, and to control via IaC the steps associated with an application running in a Spark environment. In the example below I attached a basic project where you can create an EMR cluster in AWS from your computer and control the level of granularity of the code for the entire application.

That article explains the reasons why I separated the application into steps:

This other example can help you set up an Apache Spark cluster in standalone mode using Docker.


Written by Ramses Alexander Coraspe Valdez

Very passionate about data engineering and technology; I love to design, create, test, and write about ideas. I hope you like my articles.
