[Data Engineering] Introduction and Implementation of Spark Python

Jacky Fu
16 min readMar 9, 2023

--

https://www.bbc.com/zhongwen/trad/science-47320761

Before you start reading this article, if you’d like to read the Mandarin version, please click below link and enjoy :)

Introduction

Today, we are going to talk about Apache Spark, a commonly used tool for processing massive datasets. It inherits the concept of MapReduce in the Hadoop ecosystem, but performs operations in memory, thereby eliminating the time-consuming disk I/O operations in Hadoop. Studies have shown that Spark is more than 10 times faster than Hadoop in terms of computing power, making it a popular tool in the field of data engineering.

So, if we have Spark, do we still need Hadoop?

The answer is yes, because there are still differences between the two. Both are distributed in the form of clusters in order to effectively process large amounts of data. Hadoop is more mature in terms of infrastructure, not only using MapReduce for computation, but also providing a series of storage resources such as HDFS and HBASE. On the other hand, Spark focuses on computation and plays the role of MapReduce in Hadoop. Therefore, there is no question of one replacing the other, and Spark also supports reading data from HDFS, making them compatible with each other. Additionally, Spark can also read data from a single file on the local machine or from a sequence data set stored as RDD. In short, they are often used together in the field of data engineering, but can also be used separately.

Before we delve into Spark, let’s first introduce RDD.

RDD (Resilient Distributed Dataset):

RDD is a concept used by Spark to express data units. Its data structure is characterized by parallelism, immutability, fault tolerance, and lazy evaluation.

  1. Parallelism: Since RDD splits data into multiple units, it can be distributed across multiple computing resources, achieving the characteristic of parallel computing.
  2. Immutability: Because RDD splits data into multiple units, it does not allow us to manipulate its data itself. If we want to adjust the data, we can only generate a new RDD through operation.
  3. Fault tolerance: The fault tolerance mechanism of RDD is not achieved through Replica, but through data lineage. When creating each RDD, it records who its upstream (Parent) is and how it was generated through an operation. Therefore, when a node fails, the disappeared RDD can be regenerated through the same steps on other nodes and complete the operation, thus achieving the fault tolerance feature of RDD.
  4. Lazy evaluation: When operating on RDD, there is usually a series of data transformations (called Transformations), and then generate results through specific operations (called Action). The concept of lazy evaluation is that regardless of how the data is transformed, even if it is transformed multiple times, RDD will only start working and generate results according to the Transformation requirements when the Action occurs. The disadvantage is that if there is a problem in the transformation process, the program will only throw an error when the Action is executed. The advantage is that if the data is significantly reduced in size after Transformation, when an Action occurs, it can avoid loading huge amounts of useless data because many of them are already known to be useless. This can achieve optimization of computational efficiency.

About operations on RDDs, they can be divided into four major categories:

  1. Create: Firstly, initialize SparkContext (sc), and then use sc.textFile(), sc.Parallelize(), sc.makeRDD() to quickly create RDDs.
  2. Transformation: Functions for data transformation on RDDs, commonly used functions include reduceByKey(), flatMap(), filter(), map(), and reduceByKey().
  3. Action: Actions used to generate operation results after data transformation on RDDs, commonly used actions include reduce(), first(), count(), collect(), and take().
  4. Persistence: Due to the lazy evaluation principle of RDD, when the same RDD is accessed, the machine actually starts to generate the requested RDD from scratch. This sounds inefficient, so when an RDD needs to be repeatedly used, we can request that the RDD be persistently stored in memory to avoid it disappearing. In this way, subsequent operations can directly access the RDD for operation. The main methods are cache() and persistence().

What is the relationship between Spark’s Partition, Task, CPU Core, Worker, and Executor?

A partition is the smallest unit of RDD after a file is read and split. The official recommendation is to set the number of partitions to be the same as the total number of computation cores in the Spark cluster.

In Spark’s Local mode, the default number of partitions is the same as the number of CPU cores on the machine. Local means that there is only one machine. If it has 5 CPU cores, in the best-case scenario, Local[*] can start 5 Executors to work in parallel, as if there were 5 Workers.

It should be clarified that the number of Workers is not necessarily the same as the number of Executors. In Spark cluster mode, we consider Workers as the number of nodes (machines), and Executors as a program for computing tasks. Because each Worker machine has its own resources (RAM) and capabilities (number of CPUs), one machine may launch more than one computing process.

Imagine a file is split into 10 partitions, which means that 10 independent tasks need to be generated to perform computation. If there are 2 Workers, each with 5 CPUs, the 10 Executors generated will consume the 10 tasks in parallel and obtain the results. Of course, if there are more partitions, such as 25, and only 10 tasks can be processed at a time, then the conceptually repeated process needs to be performed 3 times, and 5 CPU resources will be idle during the third time. Conversely, if there are only 2 partitions, it can be processed in one go, but at the same time, 8 CPU resources will be idle.

Obviously, with different resource configurations, different partitions may be distributed to different machines or assigned to the same machine for computation. So do not misunderstand that there is a one-to-one relationship between the number of partitions and the number of machines (Workers), they may not be one-to-one.

Balancing the number of partitions and computation resource configuration depends on accumulated practical experience, but it is also worth mentioning that the official recommendation to set the number of partitions to be the same as the total number of computation cores in the Spark cluster is reasonable.

What is the difference between RDD and HDFS?

In spirit, RDD inherits the distributed storage structure of HDFS, but fundamentally they can be considered unrelated. We call the unit of splitting HDFS files “Blocks” and the splitting of RDD “Partitions”, both of which exist in a distributed manner. Therefore, it is easy for people to mistakenly assume that the number of Blocks is equal to the number of RDD Partitions that are read in. In fact, we can refer to the figure below to help understand that, regardless of how HDFS originally stored the data, it will be reallocated to the number of partitions that we pre-set after it is read in, and exist in the form of an RDD.

https://blog.csdn.net/xjping0794/article/details/78218312

Taking a breather, now we’re entering Spark.

https://www.debrapasquella.com/2018/05/taking-breather.html

Spark Architecture

Let’s start by looking at the official introduction of Spark:

Apache Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, pandas API on Spark for pandas workloads, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for incremental computation and stream processing.

In summary, Spark is suitable for large-scale data processing and provides a range of tools to help with analysis, computation, and supports multiple programming languages. The image below does a good job of interpreting the above statement and readers can refer to it.

https://ogirardot.wordpress.com/2015/05/29/rdds-are-the-new-bytecode-of-apache-spark/

Let’s have some Terminology Explanations

Spark DataFrame

Speaking of DataFrame, it is actually a data table. We can simply understand Spark DataFrame as a data table service based on the Spark framework. It is worth noting that the table here does not actually exist, but is an abstract “concept”. The reason is simple, because it is still storing data in an RDD manner, which means that the data is still parallelly distributed and immutable. The so-called DataFrame is just based on the concept of RDD plus Column, and allows users to operate data in a relational table manner through a series of APIs provided by Spark.

https://myapollo.com.tw/blog/pyspark-2/?

After reading the above description, it may seem a bit complicated. Fortunately, in more than 80% of scenarios, we can use existing knowledge and methods of Pandas DataFrame to perform the same operations on Spark DataFrame. For example, both support statistics, aggregation, filtering, merging, SQL syntax, function operations, etc., and even both can be converted through APIs.

So, what are the differences between Spark DataFrame and Pandas DataFrame?

Here I summarize the 3 most critical differences:

  1. Data Characteristics: The essence of Spark DataFrame is RDD, which can achieve parallel computation through distributed storage, but its data is immutable. In contrast, Pandas DataFrame can modify data itself because it is stored locally and does not support parallel computing.
  2. Automatic Indexing: Pandas DataFrame generates indexes automatically when created, while Spark DataFrame creates them only when needed.
  3. Data Comparison: Pandas DataFrame allows the use of the diff syntax to compare the numerical differences between the upper and lower rows. However, Spark DataFrame does not support this feature because RDDs cause independent and possibly distributed storage of the upper and lower rows.

Another type of data format, Spark DataSet, what is it?

We can view Spark DataSet as an extension of the concept of Spark DataFrame. We know that Spark DataFrame is used to process structured data, while Spark DataSet can accept unstructured data. Although it is unstructured, compared to the unorganized RDD, Spark DataSet still gives definitions to the data. The official also refers to Spark DataFrame as Spark DataSet [Row], because Spark DataFrame can be seen as a special case of Spark DataSet when it has structured data. Therefore, the official recommends using Spark DataSet as the preferred object of operation. Currently, it supports two languages, Java and Scala. Although Python does not yet have official API support, relying on the dynamic language features of Python, many Python syntaxes have already supported operations on DataSet.

Let’s take a look at some code to experience Spark DataFrame.

#!/usr/bin/env python

from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
df.show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+

Spark SQL:

Spark SQL is a module provided by Spark that can be used to manipulate structured data. It supports multiple data sources such as JSON, Parquet, Avro, CSV, etc. By first reading these data sources as Spark DataFrames, data can be called using SQL-like syntax to achieve data cleaning, computation, and analysis. It is worth mentioning that Spark SQL integrates with Hive, so we can query Hive tables through Spark SQL, and existing Hive SQL can also be used relatively seamlessly with Spark SQL.

So, what is the difference between Spark SQL and Hive?

The emergence of Hive has made data operations more friendly for the Hadoop ecosystem. Users can issue instructions using SQL syntax, and the underlying system will automatically use MapReduce to perform calculations on the data stored in HDFS. Although SQL syntax is convenient, MapReduce is prone to large-scale disk I/O, making it not the fastest option. The concept of Spark SQL is very similar to Hive. Both do not have physical data tables, and data is processed through abstract instructions. The main difference is that Spark’s computation happens at the memory level, so data needs to be read into RDD and stored in memory before using Spark SQL. Of course, the advantage of memory-level computation is speed.

Let’s take a look at some code to experience Spark SQL.

```python
#!/usr/bin/env python

from pyspark.sql import SparkSession

spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()

# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
df.show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+

# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+
```

Spark Streaming:

Spark Streaming is a module in the Spark family used to process streaming data. When processing data, it reads a series of data in the form of RDDs. We call this continuous input RDD a DStream (Discretized Streams). There are many ways to generate a DStream, for example, we can let Spark continuously monitor a folder path or a socket. Once a new file is generated, it will be read and computed. Of course, if files are continuously generated, they will be continuously read and computed. In addition, reading DStream from existing RDD queues or platforms (such as Kafka, Flume) is also quite common.

https://spark.apache.org/docs/latest/streaming-programming-guide.html#a-quick-example

Spark Streaming can be almost real-time, but not necessarily real-time.

Strictly speaking, Spark Streaming processes data in a micro-batch manner, and is called batch processing because we have to provide a time unit, such as 5 seconds, when creating a StreamingContext object. Spark will then use 5 seconds as the time interval to re-split the incoming DStream and process it sequentially.

If there is only one data input today, in the worst case, it may take up to 5 seconds for the data to be processed, rather than being processed immediately once the resources permit it. Of course, if there is a continuous stream of data being input at the moment, it will feel like real-time streaming processing (even though it is done in batches).

Some readers may wonder why not set an extremely small time interval, such as directly reducing it to 1 millisecond. As long as the system inputs no more than one piece of data every 1 millisecond, can’t we achieve so-called record-by-record real-time streaming processing? In theory, this is possible, but in reality, it may not be feasible because we must consider whether the data processing speed can keep up with the time interval. In other words, the system processing speed must be faster than the data input speed; otherwise, it will only cause a large number of tasks to queue up, which can easily lead to unstable system operation. (Note: by default, spark.streaming.concurrentJobs = 1, which means that only one task is processed at a time. Under the premise of sufficient hardware resources, it is possible to increase this number to achieve parallel task processing, but data tracking will be more complex, and we won’t discuss it further here.)

Generally speaking, the time unit is recommended to be between 500 milliseconds and several seconds. However, the actual setting depends on the specific data scenario. A conservative approach is to set a larger interval first and gradually optimize it.

https://www.analyticsvidhya.com/blog/2020/11/introduction-to-spark-streaming-add-new-tag/

Let’s take a look at some code to experience Spark Streaming.

#!/usr/bin/env python

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working thread and batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)

# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))

# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()

ssc.start() # Start the computation
ssc.awaitTermination() # Wait for the computation to terminate

# Run following commands
# nc -lk 9999
# /usr/local/spark/bin/spark-submit streaming-example.py localhost 9999

In addition, the official documentation of Spark states that Spark Streaming will no longer be updated, and instead, the new generation is Spark Structured Streaming. Simply put, it is built on top of Spark SQL, imagine an infinitely growing DataFrame called the “Unbounded Input Table,” which continuously grows through merging, and is repeatedly computed or analyzed using Spark SQL syntax.

Spark Streaming is the previous generation of Spark’s streaming engine. There are no longer updates to Spark Streaming and it’s a legacy project. There is a newer and easier to use streaming engine in Spark called Structured Streaming. You should use Spark Structured Streaming for your streaming applications and pipelines.

MLlib:

As the name suggests, MLlib is a series of ML-related libraries provided by Spark. Its purpose is to make ML-related tasks run faster and better through the Spark framework. These tasks include machine learning clustering, classification, collaborative filtering, basic statistics, regression problems, data dimensionality reduction, feature transformation, feature extraction, and more.

When calling ML API, compared to RDD-based API, after Spark 2.0, the official recommends using DataFrame-based API as the preferred one, relying on the structured format of DataFrame, which is more user-friendly to use. In addition, users can use the persistent function of Spark to keep the model in memory for repeated use.

MLlib Pipeline:

Machine learning tasks often involve multiple steps, so the concept of a pipeline is introduced. A Transformer is a concept that performs data transformation on a DataFrame. For example, a model is a type of Transformer that adds a column of predicted results to the DataFrame. An Estimator is a mathematical method or algorithm, such as linear regression. We can feed the DataFrame into the Estimator to generate a model (Transformer). In other words, the Spark MLlib Pipeline is a process composed of a series of Estimators and Transformers. Of course, the Pipeline also supports parameter adjustments, and users can continuously optimize the machine learning process and model for task results.

Let’s take a look at some code to experience MLlib.

#!/usr/bin/env python

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

lines = spark.read.text("data/mllib/als/sample_movielens_ratings.txt").rdd
parts = lines.map(lambda row: row.value.split("::"))
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
rating=float(p[2]), timestamp=int(p[3])))
ratings = spark.createDataFrame(ratingsRDD)
(training, test) = ratings.randomSplit([0.8, 0.2])

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
coldStartStrategy="drop")
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)

# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)

# Generate top 10 movie recommendations for a specified set of users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)

# Generate top 10 user recommendations for a specified set of movies
movies = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)

GraphX

Regarding GraphX, the official documentation lacks resources specifically for Python operation, so we won’t describe it in detail. In simple terms, GraphX is still based on Spark RDD but incorporates the abstraction concept of graphs, allowing Spark to perform computations on graphs, even in parallel. Interested readers are advised to refer to the official documentation for more information.

Finally, let’s implement it.

To begin with, install the environment first.

I have written some small examples on this GitHub repo for interested readers to visit. After cloning, there will be a docker-compose.yaml file for quick deployment of the Spark environment. We use the pyspark-notebook image, which will synchronize the creation of a Jupyter notebook for us to operate on. Among them, we set the initial path of the notebook to the tutorial folder and use 8888 as the web port. (The following 3 steps)

  1. git clone https://github.com/hwf87/pyspark-jupyter-tutorial.git
  2. cd ./pyspark-jupyter-tutorial
  3. docker-compose up

After executing docker-compose up, accessing http://localhost:8888 will bring up the login interface for Jupyter Notebook. The login password can be found in the console output, which should contain the text http://127.0.0.1:8888/?token={YOUR_TOKEN}. Replace YOUR_TOKEN with the actual password. After logging in, you will see the following screen.

I have prepared pyspark_demo.ipynb as an example code, demonstrating the practical use of RDD, Spark DataFrame, Spark SQL, MLlib, and Spark (Structured) Streaming. The details of the code are left for readers to experience, and there should be no overly complex parts!

Bonus Questions

https://everylittled.com/article/138392

What is the difference between Map() and foreach()?

The reason why these two methods are easily confused is that both require passing in a function that will be applied to each row of the RDD. The difference is that Map is a Transformation where we define a return value from the function, and the function produces a new RDD after execution. foreach, on the other hand, is an Action and does not require a return value from the function. Instead, the function has some side effect, such as counting with an Accumulator.

What is Spark Shuffle?

Shuffle is used in operations such as Repartition, ByKey, and Join. In Spark, data is stored in RDDs, and when the data is distributed, there may be situations where we cannot perform operations on individual partition. For example, with GroupByKey, the same key may exist in various partitions, so the Shuffle mechanism is triggered to redistribute the RDD partitions before the calculation is completed. Additionally, the Shuffle process includes disk reading, network access, and data serialization, which are typically much slower than pure memory operations. Therefore, it is recommended to pay attention to performance when Shuffle happens.

How to submit a Spark job?

./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
<your_pyspark_code.py> \
[other-arguments]

Conclusion

Finally, it’s done (cheers~~~), this is probably the longest article I’ve ever written. If you have read this article carefully, you will find that I often ask and answer myself (a total of 9 questions), it is a journey to help myself clarify the Spark architecture! Finally, I spent more time reading the official documentation than I expected. The official documentation of Spark is really well-written and highly recommended for everyone to take a look.

--

--

Jacky Fu

Write code with the left hand, run an e-commerce business with the right. Develop the habit of thinking carefully and then sprinting to execute. 🌱