Boilerplate Code Snippet
파이썬에서 스파크 RDD를 사용할 때 항상 사용하게 되는 bolierplate code는 다음과 같다.
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("AppName") # (1)
sc = SparkContext(conf = conf) # (2)
lines = sc.textFile("file:///Dataset/my_data.csv") # (3)
parsed = lines.map(lambda x: x.split(",")[2]) # (4)
(1)
스파크 환경설정을 하는 부분이다.
setMaster는 connect할 master URL을 지정하는 메소드이다. 지금은 혼자 연습하는 것이므로 "local"로 지정하였다. "local[4]"로 지정하면 로컬에서 4개의 core로 실행한다는 의미이다. 클러스터 "spark://master:7077"로 지정하면 스파크 standalone cluster로 연결하여 실행된다.
setAppName은 앱 이름을 지정하는 부분이다. 원하는 이름을 입력한다.
(2)
스파크 클러스터가 생성되고 이 클러스터에 접근할 수 있는 객체가 바로 SparkContext이다. 즉, SparkContext는 스파크의 엔트리포인트이다. 여기에 (1)에서 지정한 설정을 넘겨준다.
(3)
textFile로 데이터베이스의 위치를 지정한다.
(4)
여기서부터는 문제해결에 필요한 logic이다.
이 예시에는 그냥 공백으로 데이터를 split하고 그 중 세번째 값을 취한다는 의미이다.
기본적인 데이터 처리 방법
map
각각의 요소에 transformation을 적용하는 메소드
transformed = rdd.map(lambda x: x + 1)
위 예제는 기존의 리스트 각 요소에 1을 더한 값으로 새로운 리스트를 만들어내는 코드이다.
꼭 lambda를 사용할 필요는 없다. 아래처럼 트랜스포머 함수를 정의해주어도 된다.
def plus_one(x):
return x + 1
transformed = rdd.map(plus_one)
# rdd = [1, 2, 3]
# transformed = [2, 3, 4]
mapValues
map 과는 다르게 key, value 페어 중 value만 넘겨주어 transform을 행하는 메소드. key를 변형시키지 않는다는 것이 가장 큰 차이점이다.
transformed = rdd.mapValues(lambda x: x + 1)
# rdd = [('a', 1), ('b', 2), ('c': 1)]
# transformed = [('a', 2), ('b', 3), ('c': 2)]
countByValue
각 value가 데이터셋에서 나타나는 빈도수를 dictionary 타입으로 리턴한다.
count = rdd.countByValue()
# rdd = [1, 2, 1, 5, 6, 5, 5] 일 때
# count는 다음과 같다
# [(1, 2), (2, 1), (5, 3), (6, 1)]
# 순서는 다를 수 있음. 정렬하려면 sorted(count.items()) 적용해야 함
reduceByKey
key가 같은 데이터에 대해서 value를 병합(merge)하는 메소드. 그룹핑이라고 이해하면 쉽다.
trasformed = rdd.reduceByKey(lambda x, y: x + y)
# x, y 현재와 다음 데이터 레코드이다.
# 이 예제에서는 그룹별 합계를 구한다.
# rdd = [('a', 1), ('b', 2), ('a', 3)]
# transformed = [('a', 4), ('b', 2)]
filter
말 그대로 필터링을 해주는 메소드이다. 단독으로 쓰이기 보다는 주로 다른 데이터 처리를 하기 전에 전처리로 제외할 데이터를 걸러내는 용도로 사용된다.
filtered = rdd.filter(lambda x: x > 0)
# rdd = [1, -2, 3, 4]
# filtered = [1, 3, 4]
flatMap
데이터 레코드 자체가 쌍이나 리스트 등으로 되어 있을 때 다 풀어서 flat하게 만들어주는 메소드이다.
transformed = rdd.flatMap(lambda x: x.split())
# rdd = ["I am", "happy and sad", "at the same time"]
# transformed = ["I", "am", "happy", "and", "sad", "at", "the", "same", "time"]
# 그냥 map을 사용했다면 다음과 같은 결과가 나왔을 것이다.
# justMap = [["I", "am"], ["happy", "and", "sad"], ["at", "the", "same", "time"]]
'개발 관련 이야기' 카테고리의 다른 글
AWS Kinesis Family 비교 (0) | 2022.09.01 |
---|---|
AWS SAA-C02 시험용 간단 정리 (0) | 2022.08.31 |
vpn & transit gateway & direct connect 가격 (0) | 2022.08.30 |
내가 보려고 만든 Snowball Edge / Snowmobile 정리 (0) | 2022.08.29 |
내가 보려고 만든 AWS S3 정리 (0) | 2022.08.28 |
댓글