Ch8. 스파크 최적화 및 디버깅

SparkConf 로 스파크 설정하기

  • 스파크의 주된 설정 메커니즘은 SparkConf 클래스를 통해 설정되며, SparkConf 객체는 재정의 가능한 파라메터의 키/값 페어를 갖음
  • set(String key, String value): SparkConf, get(String key): String 등의 메소드를 통해 설정 값을 추가나 가져올 수 있음

    • 그 외 setAppName)(String name): SparkConf, setMaster)(String master): SparkConf 등의 유용한 메소드가 존재함scala> val conf = sc.getConf
scala> val conf = sc.getConf
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@672549f3

scala> conf.get("spark.app.name")
res12: String = Spark shell

scala> conf.get("spark.master")
res13: String = local[*]
  • spark-submit 도구를 사용하여 실행 어플리케이션의 설정값을 동적으로 지정해 줄 수 있음
# Run application locally on 8 cores
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master local[8] \
  /path/to/examples.jar \
  100

# Run on a Spark standalone cluster in client deploy mode
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --executor-memory 20G \
  --total-executor-cores 100 \
  /path/to/examples.jar \
  1000

# Run on a Spark standalone cluster in cluster deploy mode with supervise
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --deploy-mode cluster \
  --supervise \
  --executor-memory 20G \
  --total-executor-cores 100 \
  /path/to/examples.jar \
  1000

# Run on a YARN cluster
export HADOOP_CONF_DIR=XXX
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn \
  --deploy-mode cluster \  # can be client for client mode
  --executor-memory 20G \
  --num-executors 50 \
  /path/to/examples.jar \
  1000

# Run a Python application on a Spark standalone cluster
./bin/spark-submit \
  --master spark://207.184.161.138:7077 \
  examples/src/main/python/pi.py \
  1000

# Run on a Mesos cluster in cluster deploy mode with supervise
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master mesos://207.184.161.138:7077 \
  --deploy-mode cluster \
  --supervise \
  --executor-memory 20G \
  --total-executor-cores 100 \
  http://path/to/examples.jar \
  1000
  • 실행하는 애플리케이션의 SparkConf 는 한 번 SparkContext의 생성자에 넘겨지고 나면 파라메터 수정이 불가능함
  • SparkConf 설정 파라메터 우선 순위
    • 사용자 코드 정의 > spark-submit 절달 플래그 > spark-submit 설정 파일 > default 설정 값
  • SparkConf Default Values
  • 데이터 셔플 시 사용할 로컬 저장 디레터리 지정 옵션 (단독모드 또는 메소스 모드)
    • conf/spark-env.sh 내 SPARK_LOCAL_DIRS 변수 - 쉼표로 구분하여 경로를 지정

실행을 구성하는 것: 작업, 태스크, 작업 단계

  • 스파크 어플리케이션 실행 코드의 실행 계획을 파악하여 최적화 작업이 가능함
scala> val sqlContext = new SQLContext(sc)
warning: there was one deprecation warning; re-run with -deprecation for details
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@6e956361
scala> sqlContext.read.json("hdfs://localhost:9000/source").createOrReplaceTempView("source")
scala> val fields = "os,store,country,channel,appid,ug1,ug2"
fields: String = os,store,country,channel,appid,ug1,ug2
scala> val arrFields = fields.split(",")
arrFields: Array[String] = Array(os, store, country, channel, appid, ug1, ug2)
scala> val sqlRDD = sqlContext.sql("SELECT %s FROM source".format(fields)).rdd
sqlRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[43] at rdd at <console>:34
scala> var resultMap = Map[String, Set[String]]()
resultMap: scala.collection.mutable.Map[String,scala.collection.mutable.Set[String]] = Map()

scala> sqlRDD.map { x => {
     | for (k:String <- arrFields) {
     | if (x.schema.exists { x => x.name == k } && x.getAs[String](k) != null) {
     | val v = x.getAs[String](k)
     | if (resultMap.contains(k)) {
     | val values = resultMap.getOrElse(k, "").asInstanceOf[Set[String]]
     | if (values != "" && !values.contains(v)) resultMap.updated(k, values += v)
     | } else {
     | resultMap += (k -> Set(v))
     | }
     | }
     | }
     | }
     | }
res80: org.apache.spark.rdd.RDD[Unit] = MapPartitionsRDD[44] at map at <console>:41

scala> sqlRDD.toDebugString
res81: String =
(1) MapPartitionsRDD[43] at rdd at <console>:34 []
 |  MapPartitionsRDD[42] at rdd at <console>:34 []
 |  MapPartitionsRDD[41] at rdd at <console>:34 []
 |  FileScanRDD[40] at rdd at <console>:34 []
  • 스파크 스케쥴러는 액션을 수행할 때 필요한 RDD 연산의 물리적 실행 계획을 만듬
  • 스파크 스케쥴러는 연산되는 마지막 RDD 에서 시작하여, 연산해야 할 것을 역으로 추적 함

  • 스파크의 내부 스케줄러는 RDD가 이미 메모리나 디스크에 캐싱 되어있을 때, 건너뛰기를 시도하여, 해당 작업을 생략함

    • 재연산이 필요한 작업은 persist() 등을 통해 캐싱 처리하면 성능을 높일 수 있음
  • 스파크의 대부분의 로깅이나 조작은 stage, task, shuffle 등의 단어로 표현 됨

  • 스파크의 실행 중에는 다음의 단계 들이 발생함

    • 사용자 코드가 RDD의 DAG를 정의
    • DAG가 액션의 실행 계획으로 변환
    • 테스크들이 스케줄링 되고, 클러스터에서 실행

정보 찾기 - 스파크 웹 UI

  • YARN 클러스터 모드인 경우 어플리케이션 드라이버가 클러스터 내부에서 실행 되어, ResourceManager 를 통해 UI에 접근해야함

  • spark Jobs 페이지 - 실행 중이거나 최근에 완료 된 작업들에 대한 세부적인 실행 정보를 보유  

  • spark Task - 성능 비대칭 등 성능 이슈를 파악 할 수 있음

  • Executors

  • Stages DAG

정보 찾기 -드라이버와 익스큐터로그

  • 내부적인 경고 메시지나 사용자 코드에서 발생되는 예외 같은 비정상 이벤트의 상세 정보를 보유
    • 단독 모드 - 애플리케이션는 마스터 웹 UI에 경로 표시 (각 작업 노드의 스파크 설치 위치의 work/)
    • 메소스 - 메소스 슬레이브 노드의 work/ 경로 (마스터 UI로 확인 가능)
    • 얀모드 - yarn logs -applicationId <application ID> 로 조회가능 (log4j. 설정으로 로그 레벨 수정 가능)

성능에 관한 핵심 고려 사항

  • 병렬화 수준

    • RDD는 여러개의 파티션으로 나뉘고, 각 파티션은 전체 데이터의 일부를 갖게 됨
    • 스파크는 테스크를 스케줄링하여 실행 시 각 파티션당 저장된 데이터 처리할 테스크를 하나씩 만듬 (테스크당 하나의 코어 할당)
    • 사용자 개입 없이 RDD의 병렬화 수준이 적당한지 추론 (하부의 저장 시스템에 기반하여 병렬화 수준 선택, HDFS 파일의 각 블럭당 하나의 파티션)

    • 병렬화 개수가 너무 적으면 스파크 자원의 유휴 발생, 너무 크면 파티션의 오버헤드로 성능 이슈 발생

  • 병렬화 수준 조정

    • 데이터 셔플이 필요한 연산 간에 생성되는 RDD를 위한 병렬화 정도를 파라메터로 처리

    • 이미 존재하는 RDD를 더 적거나 많게 파티션을 재배치

  • 직렬화 포맷

    • spark.serializer -> org.apache.spark.serializer.KyroSerializer

    • spark.kyro.registrationRequired 로 엄격하게 설정가능

    • 반드시 Serializable 인터페이스를 구현한 클래스만 가능함 (NotSerializableException 발생)

  • 메모리 관리

  • 하드웨어 프로비저닝

results matching ""

    No results matching ""