Ch4. Working with Key/Value Pairs

Pair RDD : an RDD whose elements are key/value pairs

References

Creating pair RDDs

Creating a pair RDD using the first word of each line as the key

scala> val lines = sc.textFile("/Users/getto/usr/app/spark/README.md")
lines: org.apache.spark.rdd.RDD[String] = /Users/getto/usr/app/spark/README.md MapPartitionsRDD[1] at textFile at <console>:27

scala> val pairs = lines.map(x => (x.split(" ")(0), x))
pairs: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[2] at map at <console>:29

scala> pairs.collect()
res0: Array[(String, String)] = Array((#,# Apache Spark), ("",""), (Spark,Spark is a fast and general cluster computing system for Big Data. It provides), (high-level,high-level APIs in Scala, Java, Python, and R, and an optimized engine that), (supports,supports general computation graphs for data analysis. It also supports a), (rich,rich set of higher-level tools including Spark SQL for SQL and DataFrames,), (MLlib,MLlib for machine learning, GraphX for graph processing,), (and,and Spark Streaming for stream processing.), ("",""), (<http://spark.apache.org/>,<http://spark.apache.org/>), ("",""), ("",""), (##,## Online Documentation), ("",""), (You,You can find the latest Spark documentation, including a programming), (guide,,guide, on the [project web page](http://spark.apache.org/doc...

Transformations on pair RDDs

http://spark.apache.org/docs/1.6.3/programming-guide.html#working-with-key-value-pairs

Transformations on one pair RDD

# reduceByKey(func) : combines the values that share the same key.
scala> val nums = sc.parallelize(List((1,2),(3,4),(3,6)))
nums: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[3] at parallelize at <console>:27

scala> nums.reduceByKey((x,y) => x+y).collect()
res4: Array[(Int, Int)] = Array((1,2), (3,10))

# mapValues(func) : applies a function to each value without changing the key (frequently used)
scala> nums.mapValues(x=>x+1).collect
res0: Array[(Int, Int)] = Array((1,3), (3,5), (3,7))

# keys : returns only the keys
scala> nums.keys.collect()
res15: Array[Int] = Array(1, 3, 3)

Transformations on two pair RDDs

# join : inner join on matching keys
scala> val n1 = sc.parallelize(List((1,2),(3,4),(3,6)))
n1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[17] at parallelize at <console>:27

scala> val n2 = sc.parallelize(List((3,9)))
n2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[18] at parallelize at <console>:27

scala> n1.join(n2).collect
res16: Array[(Int, (Int, Int))] = Array((3,(4,9)), (3,(6,9)))

Applying a simple filter on the second element (the value)

scala> pairs.filter{case(key,value) => value.length < 20}.collect
res18: Array[(String, String)] = Array((#,# Apache Spark), ("",""), ("",""), ("",""), ("",""), ("",""), ("",""), (##,## Building Spark), ("",""), ("",""), ("",""), ("",""), ("",""), ("",""), ("",""), ("",""), ("",""), ("",""), ("",""), ("","    ./bin/pyspark"), ("",""), ("",""), ("",""), (##,## Example Programs), ("",""), ("",""), ("",""), ("",""), ("",""), ("",""), ("",""), (##,## Running Tests), ("",""), (can,can be run using:), ("",""), ("","    ./dev/run-tests"), ("",""), ("",""), ("",""), ("",""), ("",""), (##,## Configuration), ("",""))

Aggregations

Computing the average per key

scala> val rdd = sc.parallelize(List(("a",1),("b",3),("c",2),("a",3)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[3] at parallelize at <console>:27

scala> rdd.mapValues(x => (x,1)).collect()
res4: Array[(String, (Int, Int))] = Array((a,(1,1)), (b,(3,1)), (c,(2,1)), (a,(3,1)))

# sum a (value, count) pair per key with reduceByKey, then divide to get the average
scala> rdd.mapValues(x => (x,1)).reduceByKey((x,y) => (x._1 + y._1, x._2 + y._2)).map{ case (key,value) => (key, value._1 / value._2.toFloat) }.collect()
res13: Array[(String, Float)] = Array((a,2.0), (b,3.0), (c,2.0))

Source : http://www.slideshare.net/HyeonSeokChoi/pair-rdd-spark

Word count

scala> val input = sc.textFile("/Users/getto/usr/app/spark/README.md")
input: org.apache.spark.rdd.RDD[String] = /Users/getto/usr/app/spark/README.md MapPartitionsRDD[10] at textFile at <console>:27

scala> val words = input.flatMap(x => x.split(" "))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at flatMap at <console>:29

scala> val result = words.map(x => (x,1)).reduceByKey((x,y) => x + y)
result: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[13] at reduceByKey at <console>:31

scala> result.collect()
res14: Array[(String, Int)] = Array((package,1), (this,1), (Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),1), (Because,1), (Python,2), (cluster.,1), (its,1), ([run,1), (general,2), (have,1), (pre-built,1), (YARN,,1), (locally,2), (changed,1), (locally.,1), (sc.parallelize(1,1), (only,1), (several,1), (This,2), (basic,1), (Configuration,1), (learning,,1), (documentation,3), (first,1), (graph,1), (Hive,2), (["Specifying,1), ("yarn",1), (page](http://spark.apache.org/documentation.html),1), ([params]`.,1), ([project,2), (prefer,1), (SparkPi,2), (<http://spark.apache.org/>,1), (engine,1), (version,1), (file,1), (documentation,,1), (MASTER,1), (example,3), (are,1), (systems.,1), (params,1), (scala>,1), (DataFrames,,1), (provides,1), (refer,2...

# the same word count in a single line
scala> input.flatMap(x => x.split(" ")).countByValue()
res16: scala.collection.Map[String,Long] = Map(site, -> 1, Please -> 3, GraphX -> 1, "" -> 67, for -> 11, find -> 1, Apache -> 1, package -> 1, Hadoop, -> 2, Once -> 1, For -> 2, name -> 1, this -> 1, protocols -> 1, Hive -> 2, in -> 5, "local[N]" -> 1, MASTER=spark://host:7077 -> 1, have -> 1, your -> 1, are -> 1, is -> 6, HDFS -> 1, Data. -> 1, built -> 1, thread, -> 1, examples -> 2, using -> 2, system -> 1, Shell -> 2, mesos:// -> 1, easiest -> 1, This -> 2, [Apache -> 1, N -> 1, <class> -> 1, different -> 1, "local" -> 1, README -> 1, online -> 1, spark:// -> 1, return -> 2, Note -> 1, if -> 4, project -> 1, Scala -> 2, You -> 3, running -> 1, usage -> 1, versions -> 1, uses -> 1, must -> 1, do -> 2, programming -> 1, runs. -> 1, R, -> 1, distribution -> 1, print -> 1, About -> 1, ...

combineByKey()

  • The most commonly used of the per-key aggregation functions
  • Processes the data within each partition one element at a time
  • createCombiner() --> mergeValue() --> mergeCombiners()

scala> val rdd = sc.parallelize(List(("a",1),("b",3),("c",2),("a",3)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> val result = rdd.combineByKey(
     | (v) => (v,1),
     | (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
     | (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)).map{ case (key, value) => (key, value._1 / value._2.toFloat) }
result: org.apache.spark.rdd.RDD[(String, Float)] = MapPartitionsRDD[2] at map at <console>:32

scala> result.collectAsMap().map(print(_))
(b,3.0)(a,2.0)(c,2.0)res0: Iterable[Unit] = ArrayBuffer((), (), ())

Tuning the level of parallelism

Specifying the number of partitions

scala> val data = Seq(("a",3), ("b",4), ("a",1))
data: Seq[(String, Int)] = List((a,3), (b,4), (a,1))

scala> val p1 = sc.parallelize(data).reduceByKey((x,y) => x+y)
p1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:29

scala> p1.partitions.size
res4: Int = 4

scala> val p2 = sc.parallelize(data).reduceByKey((x,y) => x+y, 10)
p2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at reduceByKey at <console>:29

scala> p2.partitions.size
res5: Int = 10

Grouping data

  • groupByKey() : groups the data using its keys
  • groupBy() : used for data that is not in pairs, or for grouping by a criterion other than the current key; takes a function and uses its result as the grouping key
  • cogroup() : groups data that shares the same key across several RDDs (see the sketch below)
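
A minimal sketch of the three grouping operations (the sample data and variable names below are illustrative, not from the chapter):

val scores = sc.parallelize(List(("a", 1), ("b", 3), ("a", 5)))
val labels = sc.parallelize(List(("a", "x"), ("c", "y")))

// groupByKey(): all values sharing a key are collected into one Iterable
scores.groupByKey().collect()
// e.g. Array((a,CompactBuffer(1, 5)), (b,CompactBuffer(3)))

// groupBy(): derive the grouping key by applying a function to each element
sc.parallelize(1 to 6).groupBy(n => n % 2).collect()
// e.g. Array((0,CompactBuffer(2, 4, 6)), (1,CompactBuffer(1, 3, 5)))

// cogroup(): group the values from both RDDs under their shared keys
scores.cogroup(labels).collect()
// e.g. Array((a,(CompactBuffer(1, 5),CompactBuffer(x))),
//            (b,(CompactBuffer(3),CompactBuffer())),
//            (c,(CompactBuffer(),CompactBuffer(y))))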

Joins

  • join (inner), leftOuterJoin, rightOuterJoin (see the outer-join sketch below)
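
The inner join was demonstrated above; here is a small sketch of the outer joins (the data below is illustrative):

val left  = sc.parallelize(List((1, "a"), (2, "b")))
val right = sc.parallelize(List((2, "x"), (3, "y")))

// leftOuterJoin: keeps every key from the left RDD; the right-side value is an Option
left.leftOuterJoin(right).collect()
// e.g. Array((1,(a,None)), (2,(b,Some(x))))

// rightOuterJoin: keeps every key from the right RDD; the left-side value is an Option
left.rightOuterJoin(right).collect()
// e.g. Array((2,(Some(b),x)), (3,(None,y)))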

Sorting data

scala> val data = sc.parallelize(Seq((1,"a"), (4,"b"), (2,"c")))
data: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[11] at parallelize at <console>:29

scala> data.sortByKey().collect()
res10: Array[(Int, String)] = Array((1,a), (2,c), (4,b))
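
sortByKey() also accepts an ascending flag, and a custom sort order can be supplied as an implicit Ordering. A sketch continuing from the data RDD above (the implicit's name is arbitrary):

// Sort the keys in descending order
data.sortByKey(ascending = false).collect()
// e.g. Array((4,b), (2,c), (1,a))

// Custom ordering: compare the integer keys as strings
implicit val sortIntegersByString = new Ordering[Int] {
  override def compare(a: Int, b: Int): Int = a.toString.compare(b.toString)
}
data.sortByKey().collect()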

Actions available on pair RDDs

Example: {(1,2), (3,4), (3,6)}

  • countByKey() : counts the number of values for each key.
    • rdd.countByKey() --> {(1,1),(3,2)}
  • collectAsMap() : collects the result as a map, for easy lookup.
    • rdd.collectAsMap() --> Map{(1,2), (3,6)} (a map keeps only one value per duplicate key)
  • lookup(key) : returns all values associated with the given key (see the sketch below).
    • rdd.lookup(3) --> [4,6]
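
A sketch of these actions in the shell, assuming rdd holds the example pairs {(1,2), (3,4), (3,6)}:

val rdd = sc.parallelize(List((1, 2), (3, 4), (3, 6)))

rdd.countByKey()     // e.g. Map(1 -> 1, 3 -> 2)
rdd.collectAsMap()   // e.g. Map(1 -> 2, 3 -> 6): only one value survives per duplicate key
rdd.lookup(3)        // e.g. Seq(4, 6)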

Data partitioning (advanced)

  • Minimizes network traffic
  • Spark does not give explicit control over which node each key is sent to (it is designed to keep working even when a particular node fails), but it does guarantee that a given set of keys will appear together on some node
  • hash-partitioning
  • range-partitioning (see the sketch below)
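
Hash-partitioning is demonstrated with HashPartitioner in the next section; range-partitioning goes through partitionBy in the same way. A sketch (the data below is illustrative):

import org.apache.spark.RangePartitioner

val nums = sc.parallelize(List((1, 1), (5, 5), (9, 9), (13, 13)))

// RangePartitioner samples the keys and assigns contiguous key ranges to partitions,
// so the keys end up roughly ordered across partitions
val ranged = nums.partitionBy(new RangePartitioner(2, nums))
ranged.partitioner   // e.g. Some(org.apache.spark.RangePartitioner@...)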

Determining an RDD's partitioner

scala> val pairs = sc.parallelize(List((1,1),(2,2),(3,3)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> pairs.partitioner
res0: Option[org.apache.spark.Partitioner] = None

scala> import org.apache.spark.HashPartitioner
import org.apache.spark.HashPartitioner

scala> val partitioned = pairs.partitionBy(new HashPartitioner(2))
partitioned: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[1] at partitionBy at <console>:30

scala> partitioned.partitioner
res1: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)

Operations that affect partitioning

cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), partitionBy(), sort(), mapValues(), flatMapValues(), filter() (the last three keep the parent's partitioner only if the parent had one)

--> Other operations, such as map(), do not set a partitioner on their result.
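
A quick check of that rule, continuing from the partitioned RDD above (the exact partitioner printout may differ):

// mapValues cannot change the keys, so the parent's partitioner is kept
partitioned.mapValues(v => v + 1).partitioner
// e.g. Some(org.apache.spark.HashPartitioner@2)

// map may change the keys, so Spark drops the partitioner
partitioned.map { case (k, v) => (k, v + 1) }.partitioner
// e.g. None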

The end.
