스파크가 제공하는 RDD는 일종의 "분산 데이터"에 대한 모델이라고 할 수 있다.
RDD란 스파크가 사용하는 핵심 데이터 모델로서 다수의 서버에 걸쳐 분산 방식으로 저장된 데이터 요소들의 집합을 의미하며, 병렬 처리가 가능하고 장애가 발생할 경우에도 스스로 복구될 수 있는 내성(tolerance)를 가지고 있다
즉 RDD란 스파크에서 정의한 분산 데이터 모델인데 내부에는 단위 데이터를 포함하고 있고 저장할때는 여러 서버에 나누어 저장되며, 처리할 때는 각 서버에 저장된 데이터를 동시에 병렬로 처리 할 수 있다는 의미입니다.
또한 데이터를 여러 서버에 나누어 저장하고, 처리하는 과정에서 일부 서버 혹은 데이터에 문제가 발생하더라도 스스로 에러를 복구할 수 있는 능력을 가지고 있는 데이터 모델이라는 의미도 가지고 있습니다.
하나의 RDD에 속한 요소들은 파티션이라고 하는 더 작은 단위로 나눠질 수 있는데, 스파크는 작업을 수행할 때 바로 이 파티션 단위로 나눠서 병렬로 처리를 수행합니다.
이렇게 만들어진 파티션은 작업이 진행되는 과정에서 재구성되거나 네트워크를 통해 다른 서버로 이동하는 이른바 셔플링이 발생할 수 있습니다.
이런 셔플링은 전체 작업 성능에 큰 영향을 주기 때문에 주의해서 다뤄야 하며, 스파크에서는 셔플링이 발생할 수 있는 주요 연산마다 파티션의 개수를 직접 지정할 수 있는 옵션을 제공합니다.
하나의 RDD가 이렇게 여러 파티션으로 나눠져 다수의 서버에서 처리되다 보니 작업 도중 이부 파티션에 장애가 발생해서 데이터가 유실될 수 있는데, 스파크는 손상된 RDD를 원래 상태로 다시 복원하기 위해 RDD의 생성 과정을 기록해 뒀다가 다시 복구해 주는 기능을 가지고 있습니다.
RDD의 첫 글자인 resilient라는 단어가 바로 이런 복구 능력을 의미하는데, 좀 더 정확하게 말하면 RDD에 포함된 데이터를 저장해 두는 것이 아니고 RDD를 생성하는 데 사용했던 작업 내용을 기억하고 있는 것입니다.
그래서 문제가 발생하면 전체 작업을 처음부터 다시 실행하는 대신 문제가 발생한 RDD를 생성했던 작업만 다시 수행해서 복구를 수행합니다.
단 이렇게 복구를 수행하기 위해서는 한번 생성된 RDD가 바뀌지 않아야 한다는 조건이 필요합니다. 만약 RDD가 생성된 이후 변경이 가능하다면 단순히 RDD가 생성되는 데 관여한 작업 뿐만 아니라 RDD를 변경시킨 모든 작업에 대한 기록을 다 저장하고 있어야 하기 때문입니다.
정리하면, 스파크는 RDD가 생성되어 변경되는 모든 과정을 일일이 기억하는 대신 RDD를 한번생성되면 변경되지 않는 읽기 전용 모델로 만든 후 RDD 생성과 관련된 내용만 기억하고 있다가 장애가 발생하면 이전에 RDD를 만들 때 수행했던 작업을 똑같이 실행해 (똑같은 데이터를 가진 새로운 RDD를 만들어) 데이터를 복구하는 방식을 사용하는 것입니다.
이처럼 스파크에서 RDD 생성 작업을 기록해 두는 것을 리니지(lineage)라고 합니다.
RDD의 이런 읽기 전용 특성은 흔히 프로그래밍 모델이라고도 하는 프로그래밍 방식에도 영향을 주어 일단 데이터를 RDD로 만들 훈 데이터 변형이 필요하면 그 RDD로 부터 변형된 새로운 RDD를 만들고 그것으로부터 또 다른 RDD를 생성해서 최종적인 모습의 RDD를 만들어 가는 형태로 데이터를 처리합니다.
이때 기존 RDD는 변형되지 않고 매번 새로운 RDD가 재생되기 때문에 장애가 발생하면 문제가 발생했던 구간의 작업만 수행해서 RDD를 재빨리 복원할 수 있는 것입니다.
RDD는 크게 세 가지 방법으로 생성할 수 있습니다.
List나 Set과 같은 프로그램의 메모리에 생성된 데이터를 이용하는 것으로 특별한 데이터 소스를 준비할 필요 없이 RDD의 기능을 즉시 테스트해 볼 수 있어 테스트 코드 작성 등에 유용하게 사용할 수 있습니다.
두 번째 방법은 로컬 파일시스템이나 하둡의 HDFS 같은 외부 저장소에 저장된 데이터를 읽어서 생성하는 방법인데, 스파크는 다양한 유형의 데이터 소스로부터 데이터를 읽고 RDD를 생성할 수 있는 유용한 함수를 제공합니다.
마지막 세 번째는 기존에 생성돼 있는 RDD로 부터 또 다른 RDD를 생성하는 방법인데, 실제로 RDD로 부터 새로운 RDD를 만들어주는 "create RDD"와 같은 함수가 제공되는 것은 아니고 기존 RDD의 모든 요소에 1을 더하는 등의 연산을 적용하면 "한번 만들어지면 수정되지 않는다"는 RDD의 특성으로 인해 새로운 RDD가 만들어지는 것을 의미합니다.
1. Collection 이용
[자바]
JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a","b","c","d","e"));
[스칼라]
val rdd = sc.parallelize(List("a","b","c","d","e"))
[파이썬]
rdd = sc.parallelize(["a","b","c","d","e"])
2. 파일로부터 생성
[자바]
JavaRDD<String> rdd = sc.textFile("<path_to_file>");
[스칼라]
val rdd = sc.textFile("<path_to_file>")
[파이썬]
rdd = sc.textFile("<path_to_file>")
3. 기존 RDD로 부터 새로운 RDD 생성
[자바]
JavaRDD<String> rdd1 = rdd.map(v->v.toUpperCase());
[스칼라]
val rdd1 = rdd.map(_.toUpperCase())
[파이썬]
rdd1 = rdd.map(lambda s: s.upper())
RDD를 생성하고 나면 RDD가 제공하는 다양한 연산을 이용해 데이터를 처리하면 되는데, 이때 RDD에서 제공하는 연산은 크게 "트랜스포메이션(Transformation)"과 액션 (Action)"이라는 두종류로 나눌 수 있습니다.
이 가운데 프랜스포메이션 연산이란 어떤 RDD에 변형을 가해 새로운 RDD를 생성하는 연산으로 기존 RDD는 바뀌지 않은 채 변형된 값을 가진 새로운 RDD가 생성됩니다. 예를 들어, 위 예제에서 rdd로 부터 rdd1을 생성할 때 사용한 map연산은 rdd에 있는 요소 각각에 소문자를 대문자로 바꾸는 함수를 적용해 그 결괏값으로 구성된 새로운 RDD를 만들어 냅니다.
한편 스파크는 이러한 rdd와 rdd1의 생성 과정을 따로 기록해뒀다가 메모리를 이용한 데이터 처리 과정에서 일부 데이터 유실이 발생하면 앞서 기록해둔 생성 과정을 다시 수행해서 데이터를 복구합니다.
이러한 변환 연산이 가진 또 다른 중요한 특징은 연산이 호출되는 시점에 바로 실행되는 것이 아니고 RDD에 대한 생성 계보만 만들면서 쌓여있다가 "액션"에 해당하는 연산이 호출될 때에 비로서 한꺼번에 실행된다는 것입니다.
변환 연산이 가진 이러한 지연 실행 방식의 장점은 본격적인 작업 실행에 들어가기 전에 데이터가 어떤 방법가 절차에 따라 변형돼야 하는지 알 수 있다는 점입니다. 따라서 최종 실행이 필요한 시점에 누적된 변환 연산을 분석하고 그중에서 가장 최적의 방법을 찾아 변환 연산을 실행할 수 있습니다.
예를 들어, 어떤 온라인 쇼필몰에서 발생한 1TB 규모의 구매 정보 로그가 총 10대의 서버에 무작위로 흩어져 저장돼 있고 이 로그 파일을 분석해서 각 상품별 총 판매건수를 구한다고 할 때 다음과 같은 두 가지 방법을 생각해 볼 수 있를 것입니다.
(방법 1)
1. 10대 서버에 저장된 모든 로그를 상품에 따라 재분류한다.
2. 분류된 각 그룹별로 총 판매건수를 계산한다.
(방법 2)
1. 전체 로그를 분류하기 전에 각 서버마다 상품별 총 판매건수를 계산한다.
2. 각 서버별로 계산된 상품별 판매건수를 모두 더한다.
"방법 1"의 경우 10대의 서버에 저장된 모든 로그 파일을 상품번호별로 재분류하는 것으로 작업을 시작합니다.
그런데 이때 동일한 상품번호를 가진 로그 파일들이 10대의 서버에 무작위로 흩어져 있기 때문에 같은 상품번호에 해당하는 로그가 분류 로직에 따라 한 서버에서 다른 서버로 네트워크를 통해 이동하는 현상, 즉 "셔플(shuffle)"이 대량으로 일어납니다.
이에 반해 "방법 2"의 경우 셔플을 수행하기 전에 각 서버에 먼저 상품별 부분 집계를 수행한 후 집계된 결과 파일만을 대상으로 네트워크를 통한 2차 합계 연산을 수행하기 때문에 "방법 1"에 비해 훨씬 적은 양의 데이터를 대상으로 셔플을 수행할 수 있습니다. 따라서 전체 처리 성능 면에서 볼 때 고비용 연산(네트워크를 통한 데이터 이동)을 더 효율적으로 수행한 "방법 2"가 더 우세합니다. 스파크는 위 예제의 방법 2처럼 최종 결과를 계산하는 데 필요한 변환 연산을 전체적으로 분석하고 효율적인 실행 방법을 찾기 위해 "액션" 연산이 필요할 때 까지 변환을 수행하지 않습니다.
이때 액션 연산이란 그 연산의 결과로 RDD가 아닌 다른 값을 반환하거나 아예 반환하지 않는 연산을 의미합니다.
예를 들면, RDD의 크기를 게산해서 Long값을 반환한다거나 RDD를 화면에 출력 또는 외부 저장소에 저장하는 등의 동작이 이에 해당한다고 할 수 있습니다.
RDD는 스파크에서 사용하는 데이터 모델이면서 동싱에 스파크가 제공하는 실제 클래스의 이름이기도 합니다. 앞에서 RDD의 연산 종류를 변환 연산과 액션 연산으로 나눈다고 했지만 실제로 스파크 RDD API 문서에 그런 구분이 따로 표시돼 있는 것은 아닙니다.
이 때문에 RDD 클래스가 제공하는 메서드 가운데 어떤 것이 변혼에 속하는 메서드고 어떤 것이 액션에 속하는 메서드인지 알아보려면 API 문서에서 메서드의 반환 타입을 확인해 보면 됩니다. 이때 연산의 수행 결과가 RDD이면 변환 메서드고 RDD가 아닌 다른 타입의 값을 반환하면 액션에 해당하는 연산이라고 할 수 있습니다.
지금까지 RDD가 제공하는 메서드를 크게 액션과 트랜스포메이션 유형으로 구분할 수 있다는 것을 알아봤습니다.
그런데 RDD가 제공하는 이런 메서드 중에는 RDD를 구성하는 요소가 특정 타입일때만 사용 가능한 것들이 있습니다.
예를 들어, RDD에 포함된 모든 요소의 합계를 구하는 sum()메서드나 표준편차를 구하는 stddev() 같은 메서드는 RDD를 구성하는 요소들이 모두 숫자 타입일 경우에만 사용할 수 있습니다.
또한 RDD의 요소 중에서 같은 키(key)를 가진 것들끼리 모아주는 groupByKey()라는 메서드는 키(Key)와 값(Value) 쌍으로 구성된 RDD에만 사용할 수 있습니다.
이처럼 요소의 타입에 따라 서로 다른 다양한 메서드를 제공하는 것이 처음 시작하는 입장에서는 통일성이 없고 복잡하게 느껴질 수도 잇습니다. 하지만 데이터 유형에 상관없이 정해진 몇 가지 방법만을 조합해서 모든 문제를 해결하는 것은 초기 학습은 빠를지 몰라도 현실에서 마주치는 다양하고 복잡한 문제를 해결하는 데는 오히려 더 많은 고민과 노하우가 필요해지는 경우가 많습니다. 따라서 RDD가 제공하는 메서드들을 한번에 다 습득하기는 만만치 않지만 수시로 API를 참고해서 스파크가 제공하는 리치(Rich) API의 장점을 십분 활용하기 바랍니다.
'Big Data > Spark' 카테고리의 다른 글
[Spark]Spark SQL (0) | 2020.06.09 |
---|---|
[Spark] Java WordCount 예제를 통한 Spark 처리 흐름 파악 (1) | 2020.06.04 |
[Spark]개요 (0) | 2020.06.01 |
댓글