Spark核心程式設計RDD分割槽器以及檔案讀取與儲存

RDD 分割槽器

Spark 目前支援 Hash 分割槽和 Range 分割槽,和使用者自定義分割槽。Hash 分割槽為當前的預設

分割槽。分割槽器直接決定了 RDD 中分割槽的個數、RDD 中每條資料經過 Shuffle 後進入哪個分

區,進而決定了 Reduce 的個數。

➢ 只有 Key-Value 型別的 RDD 才有分割槽器,非 Key-Value 型別的 RDD 分割槽的值是 None

➢ 每個 RDD 的分割槽 ID 範圍:0 ~ (numPartitions - 1),決定這個值是屬於

那個

分割槽的。

1)

Hash 分割槽

:對於給定的 key,

計算其

hashCode,併除以分割槽個數取餘

2)

Range 分割槽

:將一定範圍內的資料對映到一個分割槽中,儘量保證每個分割槽資料均勻,而

且分割槽間有序

3)

自定義分割槽:

rdd1。partitionBy(new MyPartiton)

/**

* 自定義分割槽器

* 1。繼承Partitioner

* 2。重寫方法

*/

class MyPartiton extends Partitioner {

//重寫分割槽數量

override def numPartitions: Int = 3

//返回資料的分割槽索引,從零開始,根據資料的key值返回

override def getPartition(key: Any): Int = {

key match {

case “nba” => 0

case “wnba” => 1

case _ => 2

}

}

}

二 RDD 檔案讀取與儲存

Spark 的資料讀取及資料儲存可以從兩個維度來作區分:檔案格式以及檔案系統。

檔案格式分為:text 檔案、csv 檔案、sequence 檔案以及 Object 檔案;

檔案系統分為:本地檔案系統、HDFS、HBASE 以及資料庫。

text 檔案

// 讀取輸入檔案

val inputRDD: RDD[String] = sc。

textFile

(“input/1。txt”)

// 儲存資料

inputRDD。saveAsTextFile(“output”)

sequence 檔案

SequenceFile 檔案是 Hadoop 用來儲存二進位制形式的 key-value 對而設計的一種平面檔案(FlatFile)。在 SparkContext 中,可以呼叫 sequenceFile[keyClass, valueClass](path)。

// 儲存資料為 SequenceFile

dataRDD。saveAsSequenceFile(“output”)

// 讀取 SequenceFile 檔案

sc。sequenceFile[Int,Int](“output”)。collect()。foreach(println)

object 物件檔案

物件檔案是將物件序列化後儲存的檔案,採用 Java 的序列化機制。可以透過 objectFile[T:

ClassTag](path)函式接收一個路徑,讀取物件檔案,返回對應的 RDD,也可以透過呼叫

saveAsObjectFile()實現對物件檔案的輸出。因為是序列化所以要指定型別。

// 儲存資料

dataRDD。saveAsObjectFile(“output”)

// 讀取資料

sc。objectFile[Int](“output”)。collect()。foreach(println)

相關文章