욱 연구소

슬기로운 개발생활

프로그래밍/Spark

[pyspark] 스파크 완벽 가이드 - 구조적 API 기본 연산

wook-lab 2022. 3. 16. 14:24
반응형

기본 용어 개념

레코드(record) = row
컬럼(column) = 스프레드시트의 column과 유사
스키마(schema) = 컬럼과 데이터 타입 정의
파티셔닝(partitioning) = 물리적으로 배치되는 형태
파티셔닝 스키마 = 파티션을 배치하는 방법 정의

 

스키마

DataFrame의 컬럼명과 데이터 타입을 정의
데이터를 읽기 전에 스키마를 정의해야 하는지 여부는 상황에 따라 달라짐
스키마는 여러개의 StructField 타입 필드로 구성된 StructType 객체

df.printSchema()
# 실행결과
# root
# |-- DEST_COUNTRY_NAME: string (nullable = true)
# |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
# |-- count: long (nullable = true)


df.schema
# 실행결과
# StructType(List(StructField(DEST_COUNTRY_NAME,StringType,true),StructField(ORIGIN_COUNTRY_NAME,StringType,true),StructField(count,LongType,true)))


# 스키마 정의
myManualSchema = StructType([
  StructField("DEST_COUNTRY_NAME", StringType(), True),
  StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
  StructField("count", LongType(), False)
])

 

컬럼과 표현식

사용자는 표현식으로 DataFrame의 컬럼을 선택, 조작, 제거할 수 있음
스파크의 컬럼은 표현식을 사용해 레코드 단위로 계산한 값을 단순하게 나타내는 논리적인 구조

컬럼

컬럼을 생성하는 간단한 방법: col(), column()

DataFrame의 컬럼은 col 메서드로 참조

  • e.g. df.col("count")  → 데이터 프레임의 count 컬럼을 명시적으로 참조

DataFrame 컬럼에 접근하기

  • df.columns

 

표현식

DataFrame 레코드의 여러 값에 대한 트랜스포메이션 집합을 의미
여러 컬럼명을 입력으로 받아 식별하고, '단일 값'을 만들기 위해 다양한 표현식을 각 레코드에 적용하는 함수

표현식을 사용하는 방법: expr()

DataFrame의 컬럼 참조

  • expr("someCol") → col("someCol")과 동일하게 동작

스파크가 연산 순서를 지정하는 논리적 트리로 컴파일 하기 때문에 아래는 같은 트랜스포메이션 과정을 거침

  • expr("someCol - 5")
  • col("someCol") - 5
  • expr("someCol") -5

컬럼은 단지 표현식일 뿐이며, 논리적 실행 계획으로 컴파일 된다.

 

레코드와 로우

DataFrame에서 각 로우는 하나의 레코드 

# 첫번째 로우 출력
df.first()


# 로우 생성하기
from pyspark.sql import Row
myRow = Row("Hello', None, 1, False)


# 로우 데이터 접근하기
myRow[0]

 

DataFrame의 트랜스포메이션

DataFrame은 트랜스포메이션으로 아래와 같은 작업을 수행할 수 있음

  • 로우나 컬럼 추가
  • 로우나 컬럼 제거
  • 로우를 컬럼으로 변환하건, 그 반대
  • 컬럼값을 기준으로 로우 순서 변경

 

DataFrame생성하기

# 파일 읽어서 DataFrame 생성하기
df = spark.read.format("json").load("{file_path}") # 파일을 읽어 DataFrame으로 변환하여 반환



# row와 createDataFrame으로 DataFrame 생성하기
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType


mySchema = StructType([
  StructField("some", StringType(), True),
  StructField("col", StringType(), True),
  StructField("names", LongType(), False)
])


myRow = Row("Hello", None, 1)
myDf = spark.createDataFrame([myRow], mySchema)
myDf.show()

 

select와 selectExpr

select는 문자열 컬럼을 인수로받는 메서드

# 여러 컬럼 선택하여 출력하기
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)
# 컬럼 참조 방식 혼합하여 사용하기
df.select(
	expr("DEST_COUNTRY_NAME"),
	col("DEST_COUNTRY_NAME"),
	column("DEST_COUNTRY_NAME")
).show(2)


# 컬럼 참조 방식과 문자열 참조 방식을 섞어 쓸순 없다 !!
df.select(
	col("DEST_COUNTRY_NAME"),
	"DEST_COUNTRY_NAME"
)


# 컬럼명 변경하기
df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2) # 컬럼명: destination
df.select(expr("DEST_COUNTRY_NAME AS destination").alias("DEST_COUNTRY_NAME").show(2) # 컬럼명: DEST_COUNTRY_NAME

 

selectExpr 메서드는 새로운 DataFrame을 생성하는 복잡한 표현식을 간단하게 만드는 도구

# 새로운 컬럼(withinCountry) 추가하기
df.selectExpr(
	"*",	# 모든 원본 컬럼 포함
	"(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry"
).show(2)

# DataFrame 컬럼에 대한 누적집계
df.selectExpr(
	"avg(count)",
	"count(distinct(DEST_COUTNRY_NAME))"
).show(2)

 

 

스파크 데이터 타입으로 변환하기

리터럴(Literal)은 프로그래밍 언어의 리터럴값을 스파크가 이해할 수 있는 값으로 변환한다.

# 리터럴 메서드 lit()
from pyspark.sql.functions import lit
df.selectExpr(
	expr("*"),
	lit(1).alias("One") 	# One이라는 컬럼에 모든 값이 1로 채워짐
).show(2)

 

컬럼 추가하기

withColumn 메서드를 사용하여 DataFrame에 신규 컬럼을 추가하기

# 신규 컬럼 추가 메서드 withColumn()
df.withColumn("numberOne", lit(1)).show(2) 	# DataFrame의 기존 컬럼과 함께 추가된 "numberOne" 컬럼이 보이고, numberOne 컬럼에는 1로 채워짐
# withColumn({컬럼명}, {값 생성 표현식})
df.withColumn(
	"withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME") # 해당 표현식에 의해 두 컬럼의 값이 일치하는지 여부에 따라 true/false 리턴
).show(2)
# 결과
# +-----------------+-------------------+-----+-------------+
# |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
# +-----------------+-------------------+-----+-------------+
# |    United States|            Romania|   15|        false|
# |    United States|            Croatia|    1|        false|
# +-----------------+-------------------+-----+-------------+

 

컬럼명 변경하기

withColumnRenamed 메서드로 컬럼명 변경

# withColumnRenamed()
df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns

 

예약 문자와 키워드

예약 문자를 컬럼명에 사용하려면 백틱(`) 문자를 이용해 이스케이핑(escaping)해야 한다.
(문자열 참조의 경우 문자열로 지정이 되어 있어 이스케이핑 처리 불필요)

# 이스케이핑 불필요
dfWithLongColName = df.withColumn(
	"This Long Column-Name",
	expr("ORIGIN_COUNTRY_NAME")
) # 기존 DataFrame에 "This Long Column-Name" 컬럼을 추가하고 해당 컬럼의 값으로 기존 DataFrame의 "ORIGIN_COUNTRY_NAME" 필드 값을 대입한다.


# show(2) 결과
# +-----------------+-------------------+-----+---------------------+
# |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|This Long Column-Name|
# +-----------------+-------------------+-----+---------------------+
#|    United States|            Romania|   15|              Romania|
# |    United States|            Croatia|    1|              Croatia|
# +-----------------+-------------------+-----+---------------------+
# only showing top 2 rows




# 이스케이핑 필요
dfWithLongColName.selectExpr(
	"`This Long Column-Name`",                      # "This Long Column-Name" 필드 출력
	"`This Long Column-Name` as `new col`"   # "This Long Column-Name" 필드를 "new col" 컬럼 변경하여 출력
).show(2)

 

대소문자 구분

기본적으로 스파크는 대소문자를 가리지 않음. 따라서 설정을 통해 대소문자를 구분하게 할 수 있음

set spark.sql.caseSensitive true

 

컬럼 제거하기

select로 컬럼을 제거할 수 있지만, 컬럼을 제거하는 메서드 drop도 사용

# drop 메서드를 통해 여러 컬럼을 한꺼번에 제거할 수 있음
df.drop("ORIGIN_COUNTRY_NAME", "count").columns

 

컬럼의 데이터 타입 변경하기

특정 데이터 타입을 다른 데이터 타입으로 형변환이 필요할 경우 cast 메서드 사용

# Integer to String
df.withColumn("count2", col("count").cast("string"))

 

로우 필터링

row를 필터링하려면 참과 거짓을 판별하는 표현식을 만들어야 함.
DataFrame의 가장 일반적인 방법은 문자열 표현식이나 컬럼을 다루는 기능을 이용하는 것.
DataFrame의 필터링 메서드: where, filter

#filter, where 둘다 표현식과 문자열을 인수로 받을 수 있음
# filter
df.filter(col("count") < 2).show(2)


# where
df.where("count < 2").show(2)




#여러 필터를 적용할 경우
df.where(col("count") < 2)\ 
  .where(col("ORIGIN_COUNTRY_NAME") =!= "Croatia")\
  .show()

 

고유한 로우 얻기

DataFrame에서 고유한 값이나 중복되지 않는 값을 얻는 연산 distinct 메서드

# distinct()
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()

 

무작위 샘플 만들기

sample 메서드를 통해 DataFrame에서 표본 데이터 추출 비율을 지정하거나 복원 추출이나 비복원 추출의 사용 여부를 지정할 수 있음.

  • 복원 추출(sample with replacement)
  • 비복원 추출(sample without replacement)
seed = 5
withReplacement = False
fraction = 0.5
df.sample(withReplacement, fraction, seed).count()

 

임의 분할하기

원본 DataFrame을 임의 크기로 '분할'할 때 유용하게 사용(머신러닝 알고리즘 학습셋, 검증셋, 텍스트셋을 만들 때 주로 사용)

dataFrames = df.randomSplit([0.25, 0.75], seed)
# dataFrames[1].show()
dataFrames[0].count() > dataFrames[1].count()

 

로우 합치기와 추가하기

DataFrame은 불변성을 가지기 때문에 변경하는 작업은 불가능
따라서 원본 DataFrame을 새로운 DataFrame과 통합(union)해야 함.

from pyspark.sql import Row
from pyspark.sql.functions import col

schema = df.schema
newRows = [
    Row("New Country", "Other Country", 5L),
    Row("New Country 2", "Other Country 3", 1L)
]
parallelizedRows = spark.sparkContext.parallelize(newRows)
newDf = spark.createDataFrame(parallelizedRows, schema)    # createDataFrame: 새로운 DataFrame 생성

df.union(newDf)\        # union 메서드로 통합
  .where("count = 1")\
  .where(col("ORIGIN_COUNTRY_NAME") != "United States")\
  .show()

 

로우 정렬하기

sort와 orderBy 메서드를 사용해 정렬할 수 있음
두 메서드 모두 컬럼 표현식과 문자열을 사용할 수 있으며 다수의 컬럼을 지정할 수 있음

# sort(), orderyBy
df.sort("count").show(5)
df.orderBy("count", "DEST_COUNTRY_NAME").show(5)
df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(5)


# asc, desc
from pyspark.sql.functions import expr
from pyspark.sql.functions import desc, asc

df.orderBy(expr("count desc")).show(2)
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)

# 파티션별 정렬 sortWithinPartitions 함수 사용
spark.read.format("json").load("/data/flight-data/json/*-summary.json").sortWithinPartitions("count").show(5)

 

로우 수 제한하기

DataFrame에서 추출할 row 수를 제한해야 할 때 limit 메서드 사용

df.limit(5).show()
df.orderBy(expr("count desc")).limit(6).show()

 

repartition과 coalesce(콜레스)

최적화 기법 중 하나로 자주 필터링하는 컬럼을 기준으로 데이터를 분할하는 방법

  • repartition메서드
    • 무조건 전체 데이터 셔플
    • 향후 사용할 파티션 수가 현재보다 많을 경우 사용
    • 컬럼을 기준으로 파티션을 만들 경우 사용
  • coalesce메서드
    • 파티션을 병합하려는 경우에 사용
# repartition
df.rdd.getNumPartitions()
df.repartition(5)
df.repartition(col("DEST_COUNTRY_NAME"))
df.repartition(5, col("DEST_COUNTRY_NAME"))

# coalesce
df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)

 

드라이버로 로우 데이터 수집하기

스파크는 드라이버에서 클러스터 상태 정보를 유지

드라이버로 데이터를 수집하는 연산

  • collect: 전체 DataFrame의 모든 데이터를 수집
  • take: 상위 N개의 로우를 반환 (정수만 인수로 받음)
  • show: 여러 로우를 보기좋게 출력
collectDF = df.limit(10)
collectDF.take(5) # take는 정수형 값만 인수로 사용
collectDF.show()
collectDF.show(5, False)
collectDF.collect()

드라이버로 모든 데이터 컬렉션을 수집하는 작업은 매우 큰 비용(CPU, 메모리, 네트워크 등)이 발생

반응형