본문 바로가기
개발 관련 이야기

Aparche Spark :: pyspark RDD를 이용하여 간단한 예제 풀어보기

by 꿈틀쓰 2022. 9. 12.

이전 포스트에서 RDD에서 자주 사용되는 데이터 연산 함수를 정리해보았다.

Spark에서 데이터를 다룰 때 3가지 메커니즘이 있는데, 바로 RDD / DataFrame / Dataset 이다.

 

 

DataFrame과 Dataset은 앞으로 작성할 포스트에서 순서대로 정리해볼 것이다.

이 포스트를 참고하면 세 개념의 차이점을 자세히 알 수 있다.

 

 

 

RDD

RDD란 Resilient Distributed Dataset의 줄임말로 말그대로 fault tolerant 한 분산 데이터셋으로 여러 클러스터에 분포해 있다.

 

 

RDD의 특징을 보면 다음과 같다.

 

 

  • 2011년도에 나온 개념
  • 분산된 element의 collection
  • unstructed & structed data 취급 가능
  • Immutable 하다. (참고)
  • schema 추론 X, 필요하다면 user가 명시해야 함
  • partition으로 나눠져 있음
  • low-level transformation / action 함수를 사용할 때 권장

 

 

 

 

나이별 평균 친구 수 구하기

컬럼이 

 

이름, 나이, 친구 수

 

로 이루어진 csv 데이터가 있을 때, 나이별 평균 친구 수를 구하고자 한다면 어떻게 해야 할까.

 

 

우선 기본적인 spark boilerplate code를 작성한다.

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("FriendsByAge")
sc = SparkContext(conf = conf)

lines = sc.textFile(FILE_PATH)

 

 

 

 

그리고 여기서 필요한 컬럼만 빼 올 것이다.

우리는 나이, 친구 수 데이터만 필요하다. 또한 csv는 쉼표(,)로 구분되므로 쉼표가 있을 때마다 데이터가 구분된다.

그렇다면 각 row 마다 3개의 컬럼이 쉼표로 구분되어 있는데 그 중 1, 2 index의 컬럼을 추출 할 것이다. 

 

map 을 사용하여 어떻게 데이터를 추출할 것인가를 알려주면 된다. 이때는 lambda 표현식 보다는 함수로 따로 빼는 것이 수월할 것이다. 한 줄로 표현하기에는 아무래도 좀 복잡하고 보기에도 한눈에 들어오지 않을 것이니 말이다.

def parseData(line):
    fields = line.split(',')    # 쉼표로 컬럼을 구분한다.
    age = int(fields[1])        # 2번째 컬럼인 나이 추출
    numFriends = int(fields[2]) # 3번째 컬럼인 친구 수 추출
    return (age, numFriends)    # 튜플로 리턴

rdd = lines.map(parseData)

 

 

 

이제 문제를 해결하기 위한 로직을 생각해보자. 우선 여기서 key는 나이이다.

우선 같은 key를 가진 row 수를 counting해야 한다. 그리고 같은 key를 가진 row의 친구 수의 합계를 구해야 한다.

 

그렇다면 count를 위한 컬럼을 하나 더 만들어서 1로 세팅해 놓으면 각 row의 친구 수를 합할 때마다 해당 컬럼도 합해 주면, 마지막에는 key에 해당하는 나이가 전체 데이터에서 몇개였는지 count 컬럼을 통해 알 수 있다.

 

그리하여

 

친구 수 합 / count 합

 

연산을 취하면 평균 친구 수를 구할 수 있다.

 

위 로직을 코드로 나타내고자 한다면

 

1. count 컬럼 추가

rdd.mapValues(lambda x: (x, 1))

value에 원래 친구 수 컬럼만 있었는데 (친구 수, 1)로 변환 한다. 1이 count 컬럼 역할을 하게 된다.

 

 

 

2. key 별로 그룹핑하여 sum을 구하기

rdd.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

x, y가 두 row의 value에 해당하는 값이다. 현재 value는 2개(친구 수, count)로 이루어져 있으므로 각각의 값을 따로 합한다. 친구 수 끼리 합하고 (x[0] + y[1]), count 끼리 합한다(x[1] + y[1]).

 

 

3. 친구 수 합 / count 합으로 평균 친구 수 계산하기

rdd.mapValues(lambda x: x[0] / x[1])

친구 수 (x[0])를 count 합(x[1])으로 나눠준다(/ 연산).

 

 

 

전체 코드

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("FriendsByAge")
sc = SparkContext(conf = conf)

def parseData(line):
    fields = line.split(',')
    age = int(fields[2])
    numFriends = int(fields[3])
    return (age, numFriends)

friendsDataset = sc.textFile(FILE_PATH)
rdd = friendsDataset.map(parseData)
sumByAge = rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
averageByAge = sumByAge.mapValues(lambda x: x[0] / x[1])

# 출력 부분
results = averageByAge.collect()
for result in sorted(results):
    print(result)

 

 

 

 

 

오늘은 나이별 평균 친구 수를 구하는 예제를 살펴보았다. 다음 포스트는 또 다른 기본 예제 문제 풀이를 할 것이다.

 

문제는 Udemy에서 나온 실습 문제를 기본으로 하고 있다. 그렇기에 오픈데이터셋이 아닌 경우는 데이터 셋을 올리지 않겠다. 궁금하신 분은 Udemy 강의를 참고하시길.

 

스파크 어린이의 오늘의 복습도 끝

댓글