二、 如何避免數據傾斜
2.1 避免數據源傾斜-HDFS
Spark通過 textFile(path, minPartitions) 方法讀取文件時,使用 TextInputFormat。對于不可切分的文件,每個文件對應一個 Split 從而對應一個 Partition。此時各文件大小是否一致,很大程度上決定了是否存在數據源側的數據傾斜。另外,對于不可切分的壓縮文件,即使壓縮后的文件大 小一致,它所包含的實際數據量也可能差別很多,因為源文件數據重復度越高,壓縮比越高。反過來, 即使壓縮文件大小接近,但由于壓縮比可能差距很大,所需處理的數據量差距也可能很大。此時可通過在數據生成端將不可切分文件存儲為可切分文件,或者保證各文件包含數據量相同的方式避免數據傾斜。
# 對于不可切分文件可能出現數據傾斜,對于可切分文件,一般來說,不存在數據傾斜問題。
1. 可切分: 基本上不會! 默認數據塊大小:128M
2. 不可切分: 源文件不均勻,最終導致 分布式引用程序計算產生數據傾斜 日志:每一個小時生成一個日志文件
2.2 避免數據源傾斜-Kaka
Topic 主題: 分布式的組織形式: 分區, 既然要進行數據分區,那就有可能產生數據分布不均勻
以 Spark Stream 通過 DirectStream 方式讀取 Kafka 數據為例。由于 Kafka 的每一個 Partition 對應 Spark 的一個 Task(Partition),所以 Kafka 內相關 Topic 的各 Partition 之間數據是否平衡,直接決 定 Spark 處理該數據時是否會產生數據傾斜。
Kafka 某一 Topic 內消息在不同 Partition 之間的分布,主要由 Producer 端所使用的 Partitioner 實現 類決定。如果使用隨機 Partitioner,則每條消息會隨機發送到一個 Partition 中,從而從概率上來講, 各 Partition 間的數據會達到平衡。此時源 Stage(直接讀取 Kafka 數據的 Stage)不會產生數據傾斜。
但很多時候,業務場景可能會要求將具備同一特征的數據順序消費,此時就需要將具有相同特征的數據 放于同一個 Partition 中。一個典型的場景是,需要將同一個用戶相關的PV信息置于同一個 Partition 中。此時,如果產生了數據傾斜,則需要通過其它方式處理。
* 以 Spark Stream 通過 DirectStream 方式讀取 Kafka 數據為例。由于 Kafka 的每一個 Partition 對應 Spark 的一個 Task(Partition),所以 Kafka 內相關 Topic 的各 Partition 之間數據是否平衡,直接決 定 Spark 處理該數據時是否會產生數據傾斜。
* Kafka 某一 Topic 內消息在不同 Partition 之間的分布,主要由 Producer 端所使用的 Partitioner 實現 類決定。如果使用隨機 Partitioner,則每條消息會隨機發送到一個 Partition 中,從而從概率上來講, 各 Partition 間的數據會達到平衡。此時源 Stage(直接讀取 Kafka 數據的 Stage)不會產生數據傾斜。
* 但很多時候,業務場景可能會要求將具備同一特征的數據順序消費,此時就需要將具有相同特征的數據 放于同一個 Partition 中。一個典型的場景是,需要將同一個用戶相關的PV信息置于同一個 Partition 中。此時,如果產生了數據傾斜,則需要通過其它方式處理。
2.3 定位處理邏輯 - Stage 和 Task
歸根結底,數據傾斜產生的原因,就是兩個 stage 中的 shuffle 過程導致的。所以我們只需要研究Shuffle 算子即可。我們知道了導致數據傾斜的問題就是 shuffle 算子,所以我們先去找到代碼中的 shuffle 的算子,比如 distinct、groupByKey、reduceByKey、aggergateByKey、join、cogroup、repartition 等,那么問 題一定就出現在這里。spark的執行,按照hsuffle算子分成多個stage來執行。
* 如果 Spark Application 運行過程中,出現數據傾斜,可以通過 web 管理監控界面,查看 各stage 的運行情況,如果某一個 stage 的運行很長,并且這個 stage 的大部分Task都運行很快,則
2.4 查看導致傾斜的key的數據分布情況
知道了數據傾斜發生在哪里之后,通常需要分析一下那個執行了shuffle操作并且導致了數據傾斜的 RDD/Hive表,查看一下其中key的分布情況。這主要是為之后選擇哪一種技術方案提供依據。針對不同 的key分布與不同的shuffle算子組合起來的各種情況,可能需要選擇不同的技術方案來解決。此時根據你執行操作的情況不同,可以有很多種查看key分布的方式:
1. 如果是Spark SQL中的group by、join語句導致的數據傾斜,那么就查詢一下 SQL 中使用的表的key 分布情況。
2. 如果是對 Spark RDD執行shuffle算子導致的數據傾斜,那么可以在Spark作業中加入查看 key 分布 的代碼,比如 RDD.countByKey()。然后對統計出來的各個key出現的次數,collect/take到客戶端打印 一下,就可以看到key的分布情況。
舉例來說,對于上面所說的單詞計數程序,如果確定了是 stage1 的 reduceByKey 算子導致了數據傾 斜,那么就應該看看進行 reduceByKey 操作的 RDD 中的 key 分布情況,在這個例子中指的就是 pairs RDD。如下示例,我們可以先對 pairs 采樣 10% 的樣本數據,然后使用 countByKey 算子統計出每個 key 出現的次數,最后在客戶端遍歷和打印樣本數據中各個 key 的出現次數。
val sampledPairs = pairs.sample(false, 0.1)
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println(_))
采樣!(離線處理:無放回采樣, 流式處理:魚塘采樣)