본문 바로가기
개발 관련 이야기

Aparche Spark :: pyspark RDD로 파일 열기 및 기본 데이터 처리

by 꿈틀쓰 2022. 8. 30.

 

Boilerplate Code Snippet

 

파이썬에서 스파크 RDD를 사용할 때 항상 사용하게 되는 bolierplate code는 다음과 같다.

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("AppName")		# (1)
sc = SparkContext(conf = conf)						# (2)

lines = sc.textFile("file:///Dataset/my_data.csv")			# (3)
parsed = lines.map(lambda x: x.split(",")[2])                           # (4)

 

 

(1)

스파크 환경설정을 하는 부분이다.

setMaster는 connect할 master URL을 지정하는 메소드이다. 지금은 혼자 연습하는 것이므로 "local"로 지정하였다. "local[4]"로 지정하면 로컬에서 4개의 core로 실행한다는 의미이다. 클러스터 "spark://master:7077"로 지정하면 스파크 standalone cluster로 연결하여 실행된다.

 

setAppName은 앱 이름을 지정하는 부분이다. 원하는 이름을 입력한다.

 

 

(2)

스파크 클러스터가 생성되고 이 클러스터에 접근할 수 있는 객체가 바로 SparkContext이다. 즉, SparkContext는 스파크의 엔트리포인트이다. 여기에 (1)에서 지정한 설정을 넘겨준다.

 

 

(3)

textFile로 데이터베이스의 위치를 지정한다.

 

 

(4)

여기서부터는 문제해결에 필요한 logic이다.

이 예시에는 그냥 공백으로 데이터를 split하고 그 중 세번째 값을 취한다는 의미이다.

 

 

 

 

기본적인 데이터 처리 방법

 

map

각각의 요소에 transformation을 적용하는 메소드

transformed = rdd.map(lambda x: x + 1)

위 예제는 기존의 리스트 각 요소에 1을 더한 값으로 새로운 리스트를 만들어내는 코드이다.

 

꼭 lambda를 사용할 필요는 없다. 아래처럼 트랜스포머 함수를 정의해주어도 된다.

def plus_one(x):
    return x + 1
    
transformed = rdd.map(plus_one)

# rdd = [1, 2, 3]
# transformed = [2, 3, 4]

 

 

mapValues

map 과는 다르게 key, value 페어 중 value만 넘겨주어 transform을 행하는 메소드. key를 변형시키지 않는다는 것이 가장 큰 차이점이다.

transformed = rdd.mapValues(lambda x: x + 1)
# rdd = [('a', 1), ('b', 2), ('c': 1)]
# transformed = [('a', 2), ('b', 3), ('c': 2)]

 

countByValue

각 value가 데이터셋에서 나타나는 빈도수를 dictionary 타입으로 리턴한다.

count = rdd.countByValue()  
# rdd = [1, 2, 1, 5, 6, 5, 5] 일 때
# count는 다음과 같다
# [(1, 2), (2, 1), (5, 3), (6, 1)]
# 순서는 다를 수 있음. 정렬하려면 sorted(count.items()) 적용해야 함

 

reduceByKey

key가 같은 데이터에 대해서 value를 병합(merge)하는 메소드. 그룹핑이라고 이해하면 쉽다.

trasformed = rdd.reduceByKey(lambda x, y: x + y)
# x, y 현재와 다음 데이터 레코드이다.
# 이 예제에서는 그룹별 합계를 구한다.
# rdd = [('a', 1), ('b', 2), ('a', 3)]
# transformed = [('a', 4), ('b', 2)]

 

filter

말 그대로 필터링을 해주는 메소드이다. 단독으로 쓰이기 보다는 주로 다른 데이터 처리를 하기 전에 전처리로 제외할 데이터를 걸러내는 용도로 사용된다.

filtered = rdd.filter(lambda x: x > 0)
# rdd = [1, -2, 3, 4]
# filtered = [1, 3, 4]

 

flatMap

데이터 레코드 자체가 쌍이나 리스트 등으로 되어 있을 때 다 풀어서 flat하게 만들어주는 메소드이다.

transformed = rdd.flatMap(lambda x: x.split())
# rdd = ["I am", "happy and sad", "at the same time"]
# transformed = ["I", "am", "happy", "and", "sad", "at", "the", "same", "time"]
# 그냥 map을 사용했다면 다음과 같은 결과가 나왔을 것이다.
# justMap = [["I", "am"], ["happy", "and", "sad"], ["at", "the", "same", "time"]]

 

댓글