[資料工程] Spark Python 介紹與實作

Jacky Fu
28 min readMar 8, 2023

--

https://www.bbc.com/zhongwen/trad/science-47320761

Before you start reading this article, if you’d like to read the Mandarin version, please click below link and enjoy :)

前言

今天要聊的對象是 Apache Spark,一個常用於處理巨量資料集的工具。他繼承了 Hadoop 體系中 MapReduce 的概念,將運算提升到記憶體層完成(in-memory),進而省去了 Hadoop 對磁碟進行大量讀寫(I/O)所耗費的時間。 相關研究顯示 Spark 在運算能力上,比 Hadoop 快了 10 倍以上,也因此在資料工程領域迅速地竄紅。

既然如此,人們有了 Spark 還需要 Hadoop 嗎?

兩者本質上還是存在著差異的,首先,為了有效處理大量數據,所以兩者都是以分散式集群的姿態存在。Hadoop 在基礎建設層面更加完善,除了透過 MapReduce 進行運算,還提供了一系列的儲存資源如 HDFS, HBASE 等等。Spark 則是專注於運算這件事,對標 Hadoop 就是 MapReduce 角色。也因此沒有誰取代誰的問題,反而 Spark 支援了從 HDFS 讀取資料的功能,兩者可以搭配著使用。那麼,Spark 一定要搭配從一個分散式的儲存體讀取資料嗎?答案也是否定的,Spark 也支援從本地端讀取單一文件,又或是讀取以 RDD 存在的序列資料集。簡而言之,在資料工程領域中,他們時常被搭配著使用,但也完全可以單獨被使用。

在進入 Spark 之前,我們要先介紹什麼是 RDD

RDD (Resilient Distributed Dataset)

RDD 分布式數據集是 Spark 用來表達資料單元的一種概念,其資料結構具有並行性、不可更動性、容錯性、惰性運算的特色。

  1. 並行性:由於 RDD 是將資料切分為多個單位,因此能分布在多個運算資源上,所以達到並行運算的特性。
  2. 不可更動性 (Immutable):正是因為將資料切分為多個單位,RDD 不允許我們對其資料本身做竄改,如果想要對資料做調整,只能透過操作產生新的 RDD。
  3. 容錯性:RDD 的容錯機制,並非透過 Replica 達成,而是透過資料的血緣關係 (lineage) 達成。在建立每一個 RDD 的時候,該 RDD 都會紀錄他得上游 (Parent) 是誰,以及他是經由什麼運算後產生的,因此當某個節點失效的時候,這些消失的 RDD 可以再次經由相同的步驟生成於其他節點並完成運算,藉此達到 RDD 的容錯特性。
  4. 惰性運算:在操作 RDD 的時候,通常會先有一系列的資料轉換 (我們稱為 Transformation),然後藉由某個特定運算生成結果(我們稱為 Action),所謂惰性運算的概念是,無論資料如何進行轉換,甚至轉換了數次,只有在發生 Action 的當下,RDD 才會真的按照轉換的需求開始動作然後生成結果。其缺點是如果程式碼在轉換的過程中發生問題,也要等到執行 Action 的時候才會報錯。優點則是,如果資料經過轉換量體大幅縮小,那再發生 Action 的時候,因為已知許多資料是無用的,則可以避免讀入龐大卻無用的資料,達到運算效能上的優化。

關於對 RDD 的操作,分成 4 大類:

  1. Create:首先初始化 SparkContext (簡稱 sc),接著使用 sc.textFile(), sc.Parallelize(), sc.makeRDD() 便能快速創建 RDD。
  2. Transformation:對 RDD 進行資料轉換的函數,常見的有 reduceByKey(), flatMap(), filter(), map(), reduceByKey()。
  3. Action:對 RDD 資料轉換後,用於生成運算結果的動作,常見的有 reduce(), first(), count(), collect(), take()。
  4. Persistence:由於 RDD 的惰性運算原理,當取用同一個 RDD 時,事實上機器是會從頭開始產生要求的 RDD 的。聽起來是不是很沒有效率?也因此當某一個 RDD 需要反覆被使用的時候,我們可以要求將該 RDD 持續地保存在記憶體中,避免他消失。如此一來後續的運算便能直接地取用該 RDD 進行運算。主要的方法有 cache(), persistence()。

Spark 的 Partition、Task、CPU Core、Worker、Executor 彼此的關係是什麼?

Partition 是一個文件被讀入之後,被切分成 RDD 的最小單位。其數量官方建議設定與 Spark 集群內的總運算核心數一致。
在 Spark Local 模式中,預設的 partition 數目與機器的 CPU 核心數目是相同的,所謂 Local 代表只有一台 Master 主機,假設他有 5 個 CPU 核心,最多的情況下 Local [*] 可以同時啟動 5 個 Executor 並行地工作,如同有 5 個 Worker 一般。
需要澄清的是,Worker 數目不一定等於 Executor 數目。在 Spark 集群模式中,我們把 Worker 想成節點的數目 (機器的數目),Executor 想成一個運算任務的程序。由於每個 Worker 機台會有各自的資源 (RAM) 跟能力 (CPU 數),因此一台機器也可能發起不只一個運算程序。
想像一個文件被切分成了 10 個 Partition,意味著接下來會需要產生 10 個獨立的 Task 來對他做運算,這時我們有 2 台 Worker 各有配有 5 個 CPU,產生的 10 個 Executor 將並行地消化掉這 10 個 Task,然後取得結果。當然如果 Partition 數量更多,比如說有 25 個,那每次只能處理 10 個 Task 的前提下,概念上就需要反覆處理 3 次,而第 3 次會有 5 個 CPU 資源閒置。反過來說,如果 Partition 數量只有 2 個,那 1 次就能處理完,但同時會有 8 個 CPU 資源閒置。
很明顯地,隨著資源配置的不同,不同 Partition 有可能分散到不同機器,也可能被分配到相同的機器上進行運算。所以千萬不要誤會,Partition 與機器 (Worker) 數量之間的關係,他們未必是一對一。
權衡 Partition 數量與運算資源配置很吃實務經驗的累積,但也再次說明官方建議 Partition 設定與 Spark 集群內的總運算核心數一致,是有其道理的。

RDD 跟 HDFS 差別在哪裡?

在精神上,RDD 繼承了 HDFS 分散式的儲存結構,但根本上兩者可以說是無關的。我們將 HDFS 文件切分的單位稱為 Block,RDD 的切分稱為 Partition,兩者都是以分散的方式存在,所以很容易讓人誤以為 Block 數目等於讀入的 RDD Partition 數目。事實上,讀者們可以搭配下方圖片幫助理解,無論當初 HDFS 是如何儲存資料的,在讀入之後將被重新分配給我們預先設定好的 Partition 數量,以 RDD 的姿態存在。

https://blog.csdn.net/xjping0794/article/details/78218312

喘口氣,下面我們進入 Spark

https://www.debrapasquella.com/2018/05/taking-breather.html

Spark 架構

先看看 Spark 的官方介紹

Apache Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, pandas API on Spark for pandas workloads, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for incremental computation and stream processing.

簡而言之,Spark 適用於大數據處理,提供了一系列的工具幫助我們分析、運算,並支援多種程式語言。下方的圖片我覺得將上述這段話詮釋地挺好的,讀者們可以參考。

https://ogirardot.wordpress.com/2015/05/29/rdds-are-the-new-bytecode-of-apache-spark/

來點名詞解釋

Spark DataFrame:

Spark SQL and DataFrames — Spark 3.3.2 Documentation

講到 DataFrame 其實就是資料表格,我們將 Spark DataFrame 簡單理解成基於 Spark 框架下的資料表格服務。值得注意的是,這裏的表格並非真實存在,而是一種抽象的“概念”。原因很簡單,因為根本上他仍是以 RDD 的方式對資料進行儲存,也就是説這些資料仍是平行分散且不可更動的。所謂 DataFrame 只是基於 RDD 再加上 Cloumn 的概念,並透過 Spark 提供的一系列 API ,允許使用者以關聯性表格的方式來操作資料。

https://myapollo.com.tw/blog/pyspark-2/?

看了上面的敘述,可能會覺得有點複雜。慶幸的是,8 成以上的情境,我們可以運用既有對 Pandas DataFrame 的認知與方法,對 Spark DataFrame 進行同樣的操作。例如兩者都支持統計、聚合、過濾、合併、SQL語法、函數運算等等,甚至兩者也可以透過 API 進行轉換。

那麼,Spark DataFrame 與 Pandas DataFrame 有什麼不同嗎?

筆者整理了 3 點個人認為最關鍵的差異:

  1. 資料特性:Spark DataFrame 的根本是 RDD,分散儲存可以達到平行化的運算,然而其資料具有不可更動性。反觀 Pandas DataFrame 單機儲存雖不支持平行運算,但資料本身是可改變的。
  2. 自動索引:Pandas DataFrame 建立時會自動產生索引,反觀 Spark DataFrame 則是需要時才自行創建。
  3. 資料比對:Pandas DataFrame 允許使用 diff 語法來比較上下兩行的數值差異,然而 Spark DataFrame 不支持的原因也顯而易見,RDD 導致前後兩行獨立且可能分散地儲存。

還有一種資料格式,Spark DataSet,他是誰?

讀者們可以將 Spark DataSet 視為 Spark DataFrame 的概念延伸,我們知道 Spark DataFrame 他是用來處理結構化的資料,而 Spark DataSet 則可以接受非結構化的資料,雖說是非結構化,但相較於毫無章法的 RDD,Spark DataSet 仍然對資料賦予定義。官方又稱 Spark DataFrame 為 Spark DataSet [Row],原因是 Spark DataFrame 可視為 Spark DataSet 擁有結構化資料時的一種特例。也因此,官方建議優先使用 Spark DataSet 作為操作對象,目前支持 Java、Scala 兩種語言,Python 雖說尚未有用官方 API 支持,但仰賴於 Python 動態語言的特性,許多 Python 的語法皆已支持對 DataSet 進行操作。

看 code,感受一下 Spark DataFrame

#!/usr/bin/env python

from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
df.show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+

Spark SQL:

Data Sources — Spark 3.3.2 Documentation

Spark SQL 是一種 Spark 提供的模組,可用來操作結構化的資料。他支持多種資料來源,例如 Json, Parquet, Avaro, CSV 等…,藉由將這些資料來源先讀成 Spark DataFrame 的形式後,便能以類似 SQL 的語法調用資料,達到資料清洗、運算、分析的目的。

值得一提的是,Spark SQL 針對 Hive 進行了整合,所以我們可以透過 Spark SQL 對 Hive table 進行查詢,既有的 Hive SQL 也能相對無痛轉移到 Spark SQL 進行使用。

那麼, Spark SQL 與 Hive 的差別是?

Hive 的誕生,讓 Hadoop 生態有一個更好友善的資料操作媒介,使用者以 SQL 語法下達指令,底層便自動以 MapReduce 對存放在 HDFS 的資料進行運算。雖說 SQL 語法相當的方便,但由於 MapReduce 容易牽涉到大量的磁碟讀寫,速度並非 Hive 的優勢。 Spark SQL 的存在筆者認為概念相當接近 Hive,根本上兩者都沒有實體的資料表,都是透過抽象化的指令對資料進行運算。主要的差異在於 Spark 的運算發生於記憶體層,故使用 Spark SQL 前須先將資料讀入成 RDD 存放於記憶體,當然,發生於記憶體層的運算速度便是優勢。

看 code,感受一下 Spark SQL

```python
#!/usr/bin/env python

from pyspark.sql import SparkSession

spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()

# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
df.show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+

# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+
```

Spark Streaming:

Structured Streaming Programming Guide — Spark 3.3.2 Documentation

Spark Streaming 是 Spark 家族中用來處理串流資料的模組,在處理資料的時候,他會以 RDD 的形式讀取一系列的資料,我們將這種連續輸入的 RDD 稱作 DStream (Discretized Streams)。 DStream 產生的方式相當多元,比如說我們可以讓 Spark 持續監聽某個資料夾 (Folder Path) 或端口 (Socket),一旦有新的文件產生,便會讀取然後運算。當然如果文件不斷地產生,便會持續地讀取然後完成運算。另外,從既有的 RDD 佇列 (Queue) 或平台 (如 Kafka、Flume) 讀入 DStream 也是相當常見的。

https://spark.apache.org/docs/latest/streaming-programming-guide.html#a-quick-example

Spark Streaming 可以近乎即時,但可能不是即時。

嚴格來講,Spark Streaming 是以 Micro-Batch 的方式在處理資料,會說批次處理,是因為當我們建立 StreamingContext 物件的時候,必須主動地提供一個時間單位,比如說 5 秒,那麼 Spark 就會以 5 秒作為時間間隔,將讀入的 DStream 重新進行切分,然後才依序處理。

假設今天只有一筆資料輸入,那在最差的情況下,可能需要等到 5 秒鐘後,資料才真正被處理,而非一旦資源許可便立即地處理。當然,如果當前有源源不絕的資料正在輸入,那體感上會很像即時的串流處理 (即便他是一批一批地做)。

可能會有讀者好奇,為什麼不直接設定一個極小的時間間隔,例如直接縮小到 1 毫秒,只要系統每 1 毫秒輸入不超過一筆資料,是不是就能實現所謂的 record-by-record 的即時串流處理了?理論上是的,但現實上可能不允許這麼操作,主要原因是我們必須考量資料處理的速度是否能跟上這個設定的時間區間,也就是說系統處理資料的速度必須比他接受資料的速度來得快,否則只會造成大量的任務排隊,容易造成系統運行的不穩定。(註:預設 spark.streaming.concurrentJobs = 1,也就是原則上一次只處理一項任務,硬體資源足夠的前提下,是可以調大數字達到並行任務處理的,但追蹤資料也相對複雜,這裡不多做論述)

一般來說,時間單位建議介於 500 毫秒到數秒鐘之間。但究竟要設定多少,仍取決於實際的資料場景。可以先採保守的態度設定較大的區間,然後逐步地進行調優。

https://www.analyticsvidhya.com/blog/2020/11/introduction-to-spark-streaming-add-new-tag/

看 code,感受一下 Spark Streaming

#!/usr/bin/env python

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working thread and batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)

# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))

# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()

ssc.start() # Start the computation
ssc.awaitTermination() # Wait for the computation to terminate

# Run following commands
# nc -lk 9999
# /usr/local/spark/bin/spark-submit streaming-example.py localhost 9999

另外,Spark 官方文件說明 Spark Streaming 功能已不會再做更新,取而代之的是新生代是 Spark Structure Streaming,簡單來說,他的理念建築在 Spark SQL 之上,想像有一個可以無限增長的 DataFrame,官方稱作 Unbounded Input Table,所謂串流便是以併入的方式持續增長該無邊界表格,再反覆以 Spaprk SQL 的語法對資料進行運算或分析。

Spark Streaming is the previous generation of Spark’s streaming engine. There are no longer updates to Spark Streaming and it’s a legacy project. There is a newer and easier to use streaming engine in Spark called Structured Streaming. You should use Spark Structured Streaming for your streaming applications and pipelines.

MLlib:

Collaborative Filtering — Spark 3.3.2 Documentation

顧名思義, MLlib 是 Spark 提供的一系列 ML 相關的 library,其目的是為了讓 ML 相關的任務能藉由 Spark 框架運行地更快更好。這些任務包含機器學習的分群、分類、協同過濾、基礎統計、回歸問題、資料降維、特徵轉換、特徵萃取等等。

在調用機器學習 API 時,相較於 RDD 為基礎的 API,Spark2.0 之後官方建議優先使用 DataFrame 為基礎的 API,仰賴 DataFrame 結構化的格式,API 使用起來更為友善。此外,使用者可以透過 Spark 持久化的功能,將模型持續地暫存在記憶體中反覆使用。

MLlib Pipeline

機器學習任務時常不是一個步驟完成,所以有了流程的概念。Transformer 是一種對 DataFrame 進行資料轉換的概念,例如模型就是一種 Transformer,透過模型可以對 DataFrame 增加行預測結果的欄位。Estimator 是一種數學方法或演算法,例如線性回歸,我們可以將 DataFrame 餵入 Estimator 然後產生模型 (Transformer)。也就是說,Spark MLlib Pipeline 是由一系列 Estimator 與 Transformer 所構成的流程。當然,Pipeline 也支持參數的調整,使用者可以針對任務結果對機器學習的流程與模型做持續性地優化。

看 code,感受一下 MLlib

#!/usr/bin/env python

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

lines = spark.read.text("data/mllib/als/sample_movielens_ratings.txt").rdd
parts = lines.map(lambda row: row.value.split("::"))
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
rating=float(p[2]), timestamp=int(p[3])))
ratings = spark.createDataFrame(ratingsRDD)
(training, test) = ratings.randomSplit([0.8, 0.2])

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
coldStartStrategy="drop")
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)

# Generate top 10 movie recommendations for a specified set of users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
# Generate top 10 user recommendations for a specified set of movies
movies = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)

GraphX:

GraphX — Spark 3.3.2 Documentation

有關 GraphX,官方文件針對以 Python 操作的資源相對缺乏,筆者決定不多做描述。簡單的理解,GraphX 仍是以 Spark RDD 為基礎,但加入了圖形的抽象概念,讓 Spark 也可以對圖形進行演算,甚至是平行運算。請有興趣的讀者到官方查閱相關說明文件。

最後了,動手實作一下吧

開門見山,先安裝好環境。

筆者動手寫了一些小範例放在這個 github repo ,有興趣的讀者可以自行參觀。Colne 下來之後,會有一個 docker-compose.yaml 用來快速部署 Spark 環境,我們使用的 pyspark-notebook image,他會同步幫我們建立 jupyter notebook 方便操作。其中,notebook 的初始路徑我們設定為 tutorial 資料夾,並以 8888 作為網頁端口。(底下 3 個步驟)

  1. git clone https://github.com/hwf87/pyspark-jupyter-tutorial.git
  2. cd ./pyspark-jupyter-tutorial
  3. docker-compose up

執行 docker-compose up 後,訪問 http://localhost:8888 會看到 Notebook 的登入介面,登入密碼可以在啟動的 console 找到 http://127.0.0.1:8888/?token={YOUR_TOKEN} 的字眼,當中 YOUR_TOKEN 就是密碼。完成登入後會看到下方的畫面。

筆者整理了 pyspark_demo.ipynb 作為範例程式碼,依序展示 RDD, Spark DataFrame, Spark SQL, MLlib, Spark (Structured) Streaming 的實際操作。程式碼的細節就留給讀者們自行體驗囉,相信不會有太深奧的部分!

主委加碼問問題

https://everylittled.com/article/138392

Map() 與 foreach() 的差別在哪裡?

兩者容易混淆的原因,是因為他們都需要傳入一個函數,並以該函數對 RDD 的每一列執行操作。不同的是,Map 是 Transformation,我們會定義函數的回傳值,讓 Map 執行完後產生一個新的 RDD。foreach 是 Action,
傳入的函數不需要有回傳值,而是在函數執行過程中,造成某種外部性的影響,常見的應用是搭配計數器 (Accumulator) 使用。

Spark Shuffle 是什麼?

Shuffle 就是重新洗牌的概念,有關 Repartition, ByKey, Join 相關的行為,基本上都存在 Shuffle。我們知道 Spark 是以 RDD 儲存資料,當資料是分佈的,便存在我們無法單獨對區塊做運算的情況。例如 GroupByKey,由於相同的 Key 可能存在於各個區塊,所以在計算前會觸發 Shuffle 機制,重新分配 RDD 的區塊後,才完成運算。另外,在 Shuffle 的過程中,包含了硬碟讀取、網路存取以及資料序列化的行為,相較純粹的記憶體操作,通常會慢上許多,建議要多留意效能方面的表現。

如何 submit 一個 spark job?

./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
<your_pyspark_code.py> \
[other-arguments]

結語

終於完成了 (歡呼~~~),這應該是我目前寫過最長的文章了。如果有仔細讀完這篇的朋友,你會發現文內我時常自問自答 (總共問了 9 個問題),整體算是一個幫助自己更加釐清 Spark 架構的旅程吧!最後,這次花在閱讀官方文件的時間也比我想像的多,不得稱讚 Spark 的官方文件真的寫得很齊全,相當推薦讀者們去翻翻看。

參考文章

  1. Spark 基本介紹
  2. PySpark Tutorial
  3. PySpark Interview Questions
  4. Introduction to Distributed File System — HackMD
  5. RDD、DataFrame和DataSet的区别 | Spark 教程
  6. Spark Streaming 教程 | Spark 教程
  7. Spark 基本介紹
  8. Spark:任務中如何確定spark分區數、task數目、core個數、worker節點個數、excutor數量
  9. Apache Spark™ — Unified Engine for large-scale data analytics
  10. Spark、Hive、Hbase比较_好啊啊啊啊的博客-CSDN博客_spark hive
  11. Data Sources — Spark 3.3.2 Documentation
  12. RDD介紹 · parallel_processing

如果你覺得文章對你有幫助,請給我一些掌聲。按住拍手不放 10 秒,每人最多可以拍 50下哦!文章會陸續更新,主要關於數據分析、資料開發筆記,以及一些個人想法的文章,歡迎有興趣的朋友們追蹤分享呦!

--

--

Jacky Fu

Write code with the left hand, run an e-commerce business with the right. Develop the habit of thinking carefully and then sprinting to execute. 🌱