Hello

05 Spark RDD

|

1. RDD에 관한 배경 지식

1.1. 스파크 클러스터

  • 클러스터 : 여러대의 서버가 한대의 서버처럼 동작하는 것
  • 클러스터 환경에서 동작하는 프로그램의 데이터는 여러 서버에 나눠져 병렬로 처리

1.2. 분산 데이터로서의 RDD

  • RDD는 회복력을 가진 분산 데이터 집합

1.3. RDD의 불변성

  • 한번 만들어진 RDD는 그 내용이 변경되지 않음

1.4. 파티션

  • RDD 데이터는 클러스터를 구성하는 여러 서버에 나누어 저장
  • 분할된 데이터를 파티션이라는 단위로 관리
  • 파티션의 크기 조정이 애플리케이션 성능에 큰 영향을 줌

1.5. HDFS(Hadoop Distributed File System)

1.6. Job과 Executor

  • 스파크 프로그램을 실행하는 것을 스파크 잡(Job)을 실행한다고 한다.
  • Job이 클러스터에서 처리될때 Executor라는 프로세스가 생성

1.7. 드라이버 프로그램

  • Driver Program : Job을 실행하는 프로그램. 즉, 메인함수를 가지고 있는 프로그램
  • 드라이버 프로그램은 자신을 실행한 서버에서 동작하면서 스파크 컨텍스트를 생성해 클러스터의 각 워커 노드들에게 작업을 지시하고 결과를 취합하는 역할 수행

1.8. Transformation과 Action

  • RDD Opertaion
    • Transformation : RDD의 형태를 변형하는 연산
    • Action : RDD가 아닌 다른 타입의 결과를 반환하는 연산

1.9. 지연(lazy)동작과 최적화

  • 다른 Action 연산이 호출되기 전까지는 Transformation연산은 실제로 수행되지 않음

1.10. 함수의 전달

  • 함수를 다른 함수의 파라미터로 전달하는 방식 사용(Javascript처럼)

1.11. 데이터 타입에 따른 RDD 연산

  • PairRDDFunctions : Key-Value 구성된 테이블에 대한 연산을 제공
  • OrderedRDDFunctions : Key-Value 구성된 테이블 대상으로 정렬 연산 제공
  • DoubleRDDFunctions : double 타입의 데이터를 위한 것으로 sum(), mean()등의 연산 제공
  • SequenceFileRDDFunctions : 하둡의 시퀀스 파일을 다루기 위한 연산 제공
  1. RDD Process AltText

2. Spark Context 생성 방법

// Spark Context 설정정보
val conf = new SparkConf()
  .setMaster("local[*]")   // [필수]마스터 정보  
  .setAppName("Exam01")    // [필수]애플리케이션 이름

// Spark Context 생성
val sc = new SparkContext(conf)

3. RDD 생성 방법

  • 기본 RDD 생성
val rdd1 = sc.parallelize(List("a", "b","c","d","e"))
  • 외부데이터를 읽어서 RDD 생성
val rdd1 = sc.textFile("<spark_home_dir>/README.md")

4. RDD Transformation

  • RDD를 가공해서 RDD로 반환
    1. Map Operation(map 관련 연산)
    • map
      • 함수를 파라미터로 전달받아 내부 모든 요소에 함수를 적용 시킨 새로운 RDD 생성
    • flatmap
      • map과 비슷하지만 처리 결과값의 타입이 다양할 경우 지정해준 타입으로만 RDD를 생성
    • mapPartitions
      • RDD의 파티션 단위로 map과 같이 처리
    • mapPartitionsWithIndex
      • RDD의 파티션 단위로 map과 같이 처리, 파티션의 index를 파라미터로 받아서 index별로 다른 처리를 할 수 있음
    • mapValues
      • RDD의 요소가 Key, Value쌍으로 이루고 있을 경우 Value에 map()연산을 적용한것과 같음
    • flatMapValues
      • RDD의 요소가 Key, Value쌍으로 이루고 있을 경우 Value에 flatmap()연산을 적용한것과 같음
        1. Group Operation(그룹 관련 연산)
    • zip
      • 두 개의 서로 다른 RDD를 각 요소의 인덱스에 따라 하나의 (Key, Value) 쌍으로 묶어줌
      • 두 RDD는 같은 개수의 파티션, 요소의 개수를 가져야 함
    • zipPartitions
      • 파티션 단위로 zip()연산을 수행하고, 병합 과정에서 특정 함수를 적용해 새로운 RDD생성 가능
      • 파이썬에서는 사용 불가
    • groupBy
      • 요소를 일정 기준에 따라 여러 개의 그룹으로 나누어진 RDD 생성
      • 그룹의 Key와 그 안에 Value Sequence로 구성
    • groupByKey
      • 키를 기준으로 같은 Key를 가진 요소들로 그룹을 만들어 RDD 생성
      • RDD의 구성요소가 Key-Value 쌍으로 이뤄진 경우 사용가능
    • cogroup
      • 여러 RDD에서 같은 Key를 갖는 Value 요소를 찾아와 Key와 그 Key에 속하는 Value Sequence로 구성
      • RDD의 구성요소가 Key-Value 쌍으로 이뤄진 경우 사용가 능
        1. Set Opertaion(집합 관련 연산)
    • distinct
      • RDD의 요소에서 중복을 제외한 요소로만 구성된 새로운 RDD를 생성
    • cartesian(곱집합)
      • 두 RDD 요소의 카테시안곱을 구하고 그 결과를 요소로 하는 새로운 RDD를 생성
      • [1, 2, 3] X [a, b] = [{1,a}, {1,b}, {2,a}, {2,b}, {3,a}, {3,b}]
    • subtract(차집합)
      • 두 RDD중 한쪽 RDD에는 속하고 다른쪽 RDD에는 속하지 않는 요소로 구성된 새로운 RDD 생성
    • union(합집합)
      • 두 RDD중 하나라도 속하는 요소들로 구성된 새로운 RDD 생성
    • intersection(교집합)
      • 두 RDD에 모두 속하는 요소들로 구성된 새로운 RDD 생성
      • 중복값은 제거
    • join
      • RDD의 구성요소가 KV 쌍으로 구성된 경우, 두 RDD에서 서로 같은 키를 가지고 있는 요소만을 모아 그룹을 형성하는 새로운 RDD 생성
      • 같은키가 없는 요소들은 제외
    • leftOuterJoin, rightOuterJoin
      • RDD의 구성요소가 KV 쌍으로 구성된 경우, 두 RDD에서 한쪽에 외부 조인을 수행한 결과를 새로운 RDD로 생성
      • 같은키가 없는 요소들도 포함
    • subtractByKey
      • RDD의 구성요소가 KV 쌍으로 구성된 경우, 두 RDD에서 서로 같은 키를 가지고 있는 요소를 제외한 나머지로 구성된 새로운 RDD생성
        1. [HARD] Compile Opertaion(집계 관련 연산)
    • reduceByKey
      • RDD의 구성요소가 KV쌍으로 구성된 경우, 키를 가진 값을 하나로 병합해 KV쌍으로 구성된 새로운 RDD 생성
    • foldByKey
      • RDD의 구성요소가 KV쌍으로 구성된 경우, 키를 가진 값을 하나로 병합해 KV쌍으로 구성된 새로운 RDD 생성
      • 병합 연산의 초기값을 메서드의 인자로 전달해서 병합 시 사용 할수 있음
    • combineByKey
      • RDD의 구성요소가 KV쌍으로 구성된 경우, 키를 가진 값을 하나로 병합해 KV쌍으로 구성된 새로운 RDD 생성
      • 병합을 수행하는 과정에서 값의 타입이 바뀔 수 있음
      • createCombiner
      • mergeValue
      • mergeCombiners
    • aggregateByKey
      • RDD의 구성요소가 KV쌍으로 구성된 경우, 키를 가진 값을 하나로 병합해 KV쌍으로 구성된 새로운 RDD 생성
  1. Pipe & Partition Operation
    • pipe
      • 데이터 처리 과정에서 외부 프로세스 활용 가능
    • coalesce
      • RDD의 파티션 개수를 줄일때, 성능이 repartition보다 좋음
    • repartition
      • RDD의 파티션 개수를 조정할때, 성능이 coalesce보다 나쁨
    • repartitionAndSortWithinPartitions
      • 데이터를 특정 기준에 따라 여러 개의 파티션으로 분리하고 각 파티션 단위로 정렬을 수행한 뒤 이결과로 새로운 RDD를 생성해 주는 메서드
    • partitionBy
      • KV쌍으로 구성된 RDD를 해당 인스턴스 인자에 따라 파티션으로 나누어진 RDD 생성
      • HashPartitioner
      • RangePartitioner
  2. Filter & Sort Opertaion
    • filter
      • 원하는 요소만 걸러내는 메소드
    • sortByKey
      • Key값을 기준으로 정렬하는 메소드
    • keys
      • RDD에서 Key값을 요소로 하는 RDD를 만드는 메소드
    • values
      • RDD에서 Value값을 요소로 하는 RDD를 만드는 메소드
    • sample
      • RDD에서 샘플을 추출하게 해주는 메소드
      • withReplacement : 복원/비복원 추출 여부
      • fraction :
      • seed : 반복 시행시 일정한 결과값을 나오도록 제어하는 인자

5. RDD Action

  • RDD를 가공해서 RDD가 아닌 다른 값을 반환
  • lazy evaluation
  • RDD에 Action을 할때 Transform도 한꺼번에 발생하기 때문에, 반복되는 Action의 경우 성능향상을 위해 적절한 Caching이 필요
    1. 출력과 관련된 연산
    • first
      • RDD의 첫번째 요소 하나 반환
    • take
      • RDD의 첫번째 요소로부터 매개변수 개수 만큼 collection(Array, List)으로 반환
    • takeSample
      • RDD에서 지정된 크기의 샘플을 추출
      • sample과는 달리 샘플의 크기를 지정할 수 있고, 반환 타입이 컬렉션 타입
    • collect
      • RDD의 모든 요소를 컬렉션 타입으로 반환
      • 메모리를 많이 사용하기 때문에 적당한 크기의 RDD를 대상으로만 사용해야 함
    • count
      • RDD에 있는 모든 요소의 개수를 반환
    • countByValue
      • RDD에 속하는 각 Value값들이 나타나는 횟수를 구해서 맵 형태로 반환
    • reduce
      • RDD에 포함된 임의값 두개를 하나로 합치는 함수를 인자로 받아서 RDD에 포함된 모든 요소를 하나의 값으로 병합하고 그 결과값을 반환
    • reduceByKey
      • reduce와 비슷하지만 합산이 key별로 진행됨
    • fold
      • RDD 내의 모든 요소를 대상으로 교환법칙과 결합법칙이 성립되는 바이너리 함수를 순차 적용해 최종 결과를 구하는 메서드
      • reduce와 비슷하지만 병합연산의 초기값을 지정해 줄 수 있다.
    • aggregate
      • 타입 제약 사항떄문에 입력과 출력의 타입이 다른경우
      • 인자 : zeroValue(기본값), seqOp(함수), combOp(함수)
    • sum
      • RDD 내의 모든 요소가 숫자 타입일 경우에만 사용 가능하며 전체 요소의 합을 구해줌
    • foreach
      • RDD의 모든 요소에 특정 함수를 적용하는 메서드
    • foreachPartition
      • foreach를 파티션 단위로 적용
    • toDebugString
      • RDD의 세부정보를 알고 싶을때 사용
      • RDD의 파티션 개수, 의존성 정보등을 알 수 있음
    • cache
      • 반복되는 RDD 트랜스포메이션 연산이 비효율 적일때, 트랜스포메이션한 RDD를 메모리에 저장하여 사용 하게 해주는 메서드
      • 메모리가 부족하면 가용 메모리 만큼만 저장
    • persist
      • cache와 비슷하지만 StorageLevel이라는 옵션을 이용해 저장 위치(메모리, 디스크), 저장 방식(직렬화 여부) 지정 가능
      • 저장위치를 MEMORY_AND_DISK_SER로 설정하면 메모리에 저장하다가 부족하면 DISK를 사용하되 직렬화된 포맷을 이용해 저장
    • unpersist
      • 저장중인 데이터가 필요 없을때 캐시 설정을 취소하는 메서드
    • partitions
      • RDD의 파티션 정보가 담긴 배열을 반환
    • getNumPartitions
      • RDD의 파티션 수를 반환

6. RDD 데이터 불러오기, 저장하기

  • 파일 : 텍스트, JSON, 하둡의 시퀀스파일, csv
  • 파일시스템 : 로컬 파일시스템, HDFS, AWS S3, 오픈스택의 Swift
  • RDBMS : MySQL
  • NoSQL : HBase, 카산드라, Hive
  • 경로는 모든 클러스터에서 동일하게 접근이 가능해야 한다.
  • RDD는 Data Set혹은 Data Frame과는 달리 데이터를 저장할 때 기존에 생성되있는 파일이 존재하면 오류를 발생시킨다.
  1. 텍스트 파일(Text File)
    • textFile : 불러오기
    • saveAsTextFile : 저장하기
    • 저장할때 압축 코덱을 사용할거면 두번째 인자로 압축코덱의 클래스 경로를 넣으면 됨
  2. 오브젝트 파일(Object File)
    • objectFile : 불러오기
    • saveAsObjectFile : 저장하기
  3. 시퀀스 파일(Sequence File)
    • sequenceFile : 불러오기
    • saveAsSequenceFile : 저장하기

7. 클러스터 공유변수

  1. 브로드캐스트 변수(Broadcast Variables)
    • 스파크 잡이 실행되는 동안 클러스터 내의 모든 서버에서 공유할 수 있는 읽기전용 자원을 설정할 수 있는 변수
    • 브로드캐스트 변수 설정 방법
      1. 공유하고자 하는 데이터를 포함한 오브젝트 생성
      2. 생성된 오브젝트를 broadcast() 메서드의 인자로 지정해 실행
      3. 생성된 broadcast 변수는 value() 메서드를 통해 접근 가능
        val broadcastUsers = sc.broadcast(Set("u1", "u2"))
        val rdd = sc.parallelize(List("u1", "u3", "u3", "u4", "u5", "u6"), 3)
        val result = rdd.filter(broadcastUsers.value.contains(_))
      
    • 동일한 스테이지 내에서 실행되는 테스크 간에는 동작에 필요한 변수를 자동으로 브로드캐스트 변수를 이용해 전달
  2. 어큐뮬레이터(Accumulators)
    • 브로드캐스트 변수와는 반대로 Accumulator는 쓰기 동작을 위한 것
    • 주로 에러 정보같은 로그를 한곳에 모아서 디버깅 하기 위해 사용
    • 기본 제공 Accumulator 종류
      1. longAccumulator
      2. DoubleAccumulator
      3. CollectionAccumulator
    • Accumulator 사용 방법
     val acc1 = sc.longAccumlatior("invalidFormat")
     val acc2 = sc.collectionAccumlator[String]("invalidFormat2")
     va data = List("U1:Addr1", "U2:Addr2", "U3", "U4:Addr4", "U5;Addr5", "U6:Addr6", "U7::Addr7")
     sc.parallelize(data, 3).foreach { v =>
         if (v.split(":").length != 2) {
             acc1.add(1L)
             acc2.add(v)
         }
     }
     println("잘못된 데이터 수 : " + acc1.value)
     println("잘못된 데이터 : " + acc2.value)
    
    • 사용자 정의 Accumulator ```scala import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.util.AccumulatorV2

    class RecordAccumulator extends AccumulatorV2[Record, Long] {

    private var _record = Record(0)

    // 초깃값 여부
    def isZero: Boolean = _record.amount == 0 && _record.number == 1

    // 동일한 값을 가진 새로운 어큐뮬레이터 생성 def copy(): AccumulatorV2[Record, Long] = { val newAcc = new RecordAccumulator newAcc._record = Record(_record.amount, _record.number) newAcc }

    // 어큐뮬레이터에 포함된 데이터의 값을 초기화 def reset(): Unit = { _record.amount = 0L _record.number = 1L }

    // 다른 데이터 병합 def add(other: Record): Unit = { _record.add(other) }

    // 다른 어큐뮬레이터 병합 def merge(other: AccumulatorV2[Record, Long]): Unit = other match { case o: RecordAccumulator => _record.add(o._record); case _ => throw new RuntimeException }

    // 최종 결과(AccumulatorV2의 출력 타입과 같아야 함) def value: Long = { _record.amount } } ```

References

Comments