기본 용어 개념
레코드(record) = row
컬럼(column) = 스프레드시트의 column과 유사
스키마(schema) = 컬럼과 데이터 타입 정의
파티셔닝(partitioning) = 물리적으로 배치되는 형태
파티셔닝 스키마 = 파티션을 배치하는 방법 정의
스키마
DataFrame의 컬럼명과 데이터 타입을 정의
데이터를 읽기 전에 스키마를 정의해야 하는지 여부는 상황에 따라 달라짐
스키마는 여러개의 StructField 타입 필드로 구성된 StructType 객체
df.printSchema()
# 실행결과
# root
# |-- DEST_COUNTRY_NAME: string (nullable = true)
# |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
# |-- count: long (nullable = true)
df.schema
# 실행결과
# StructType(List(StructField(DEST_COUNTRY_NAME,StringType,true),StructField(ORIGIN_COUNTRY_NAME,StringType,true),StructField(count,LongType,true)))
# 스키마 정의
myManualSchema = StructType([
StructField("DEST_COUNTRY_NAME", StringType(), True),
StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
StructField("count", LongType(), False)
])
컬럼과 표현식
사용자는 표현식으로 DataFrame의 컬럼을 선택, 조작, 제거할 수 있음
스파크의 컬럼은 표현식을 사용해 레코드 단위로 계산한 값을 단순하게 나타내는 논리적인 구조
컬럼
컬럼을 생성하는 간단한 방법: col(), column()
DataFrame의 컬럼은 col 메서드로 참조
- e.g. df.col("count") → 데이터 프레임의 count 컬럼을 명시적으로 참조
DataFrame 컬럼에 접근하기
- df.columns
표현식
DataFrame 레코드의 여러 값에 대한 트랜스포메이션 집합을 의미
여러 컬럼명을 입력으로 받아 식별하고, '단일 값'을 만들기 위해 다양한 표현식을 각 레코드에 적용하는 함수
표현식을 사용하는 방법: expr()
DataFrame의 컬럼 참조
- expr("someCol") → col("someCol")과 동일하게 동작
스파크가 연산 순서를 지정하는 논리적 트리로 컴파일 하기 때문에 아래는 같은 트랜스포메이션 과정을 거침
- expr("someCol - 5")
- col("someCol") - 5
- expr("someCol") -5
컬럼은 단지 표현식일 뿐이며, 논리적 실행 계획으로 컴파일 된다.
레코드와 로우
DataFrame에서 각 로우는 하나의 레코드
# 첫번째 로우 출력
df.first()
# 로우 생성하기
from pyspark.sql import Row
myRow = Row("Hello', None, 1, False)
# 로우 데이터 접근하기
myRow[0]
DataFrame의 트랜스포메이션
DataFrame은 트랜스포메이션으로 아래와 같은 작업을 수행할 수 있음
- 로우나 컬럼 추가
- 로우나 컬럼 제거
- 로우를 컬럼으로 변환하건, 그 반대
- 컬럼값을 기준으로 로우 순서 변경
DataFrame생성하기
# 파일 읽어서 DataFrame 생성하기
df = spark.read.format("json").load("{file_path}") # 파일을 읽어 DataFrame으로 변환하여 반환
# row와 createDataFrame으로 DataFrame 생성하기
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType
mySchema = StructType([
StructField("some", StringType(), True),
StructField("col", StringType(), True),
StructField("names", LongType(), False)
])
myRow = Row("Hello", None, 1)
myDf = spark.createDataFrame([myRow], mySchema)
myDf.show()
select와 selectExpr
select는 문자열 컬럼을 인수로받는 메서드
# 여러 컬럼 선택하여 출력하기
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)
# 컬럼 참조 방식 혼합하여 사용하기
df.select(
expr("DEST_COUNTRY_NAME"),
col("DEST_COUNTRY_NAME"),
column("DEST_COUNTRY_NAME")
).show(2)
# 컬럼 참조 방식과 문자열 참조 방식을 섞어 쓸순 없다 !!
df.select(
col("DEST_COUNTRY_NAME"),
"DEST_COUNTRY_NAME"
)
# 컬럼명 변경하기
df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2) # 컬럼명: destination
df.select(expr("DEST_COUNTRY_NAME AS destination").alias("DEST_COUNTRY_NAME").show(2) # 컬럼명: DEST_COUNTRY_NAME
selectExpr 메서드는 새로운 DataFrame을 생성하는 복잡한 표현식을 간단하게 만드는 도구
# 새로운 컬럼(withinCountry) 추가하기
df.selectExpr(
"*", # 모든 원본 컬럼 포함
"(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry"
).show(2)
# DataFrame 컬럼에 대한 누적집계
df.selectExpr(
"avg(count)",
"count(distinct(DEST_COUTNRY_NAME))"
).show(2)
스파크 데이터 타입으로 변환하기
리터럴(Literal)은 프로그래밍 언어의 리터럴값을 스파크가 이해할 수 있는 값으로 변환한다.
# 리터럴 메서드 lit()
from pyspark.sql.functions import lit
df.selectExpr(
expr("*"),
lit(1).alias("One") # One이라는 컬럼에 모든 값이 1로 채워짐
).show(2)
컬럼 추가하기
withColumn 메서드를 사용하여 DataFrame에 신규 컬럼을 추가하기
# 신규 컬럼 추가 메서드 withColumn()
df.withColumn("numberOne", lit(1)).show(2) # DataFrame의 기존 컬럼과 함께 추가된 "numberOne" 컬럼이 보이고, numberOne 컬럼에는 1로 채워짐
# withColumn({컬럼명}, {값 생성 표현식})
df.withColumn(
"withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME") # 해당 표현식에 의해 두 컬럼의 값이 일치하는지 여부에 따라 true/false 리턴
).show(2)
# 결과
# +-----------------+-------------------+-----+-------------+
# |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
# +-----------------+-------------------+-----+-------------+
# | United States| Romania| 15| false|
# | United States| Croatia| 1| false|
# +-----------------+-------------------+-----+-------------+
컬럼명 변경하기
withColumnRenamed 메서드로 컬럼명 변경
# withColumnRenamed()
df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns
예약 문자와 키워드
예약 문자를 컬럼명에 사용하려면 백틱(`) 문자를 이용해 이스케이핑(escaping)해야 한다.
(문자열 참조의 경우 문자열로 지정이 되어 있어 이스케이핑 처리 불필요)
# 이스케이핑 불필요
dfWithLongColName = df.withColumn(
"This Long Column-Name",
expr("ORIGIN_COUNTRY_NAME")
) # 기존 DataFrame에 "This Long Column-Name" 컬럼을 추가하고 해당 컬럼의 값으로 기존 DataFrame의 "ORIGIN_COUNTRY_NAME" 필드 값을 대입한다.
# show(2) 결과
# +-----------------+-------------------+-----+---------------------+
# |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|This Long Column-Name|
# +-----------------+-------------------+-----+---------------------+
#| United States| Romania| 15| Romania|
# | United States| Croatia| 1| Croatia|
# +-----------------+-------------------+-----+---------------------+
# only showing top 2 rows
# 이스케이핑 필요
dfWithLongColName.selectExpr(
"`This Long Column-Name`", # "This Long Column-Name" 필드 출력
"`This Long Column-Name` as `new col`" # "This Long Column-Name" 필드를 "new col" 컬럼 변경하여 출력
).show(2)
대소문자 구분
기본적으로 스파크는 대소문자를 가리지 않음. 따라서 설정을 통해 대소문자를 구분하게 할 수 있음
set spark.sql.caseSensitive true
컬럼 제거하기
select로 컬럼을 제거할 수 있지만, 컬럼을 제거하는 메서드 drop도 사용
# drop 메서드를 통해 여러 컬럼을 한꺼번에 제거할 수 있음
df.drop("ORIGIN_COUNTRY_NAME", "count").columns
컬럼의 데이터 타입 변경하기
특정 데이터 타입을 다른 데이터 타입으로 형변환이 필요할 경우 cast 메서드 사용
# Integer to String
df.withColumn("count2", col("count").cast("string"))
로우 필터링
row를 필터링하려면 참과 거짓을 판별하는 표현식을 만들어야 함.
DataFrame의 가장 일반적인 방법은 문자열 표현식이나 컬럼을 다루는 기능을 이용하는 것.
DataFrame의 필터링 메서드: where, filter
#filter, where 둘다 표현식과 문자열을 인수로 받을 수 있음
# filter
df.filter(col("count") < 2).show(2)
# where
df.where("count < 2").show(2)
#여러 필터를 적용할 경우
df.where(col("count") < 2)\
.where(col("ORIGIN_COUNTRY_NAME") =!= "Croatia")\
.show()
고유한 로우 얻기
DataFrame에서 고유한 값이나 중복되지 않는 값을 얻는 연산 distinct 메서드
# distinct()
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()
무작위 샘플 만들기
sample 메서드를 통해 DataFrame에서 표본 데이터 추출 비율을 지정하거나 복원 추출이나 비복원 추출의 사용 여부를 지정할 수 있음.
- 복원 추출(sample with replacement)
- 비복원 추출(sample without replacement)
seed = 5
withReplacement = False
fraction = 0.5
df.sample(withReplacement, fraction, seed).count()
임의 분할하기
원본 DataFrame을 임의 크기로 '분할'할 때 유용하게 사용(머신러닝 알고리즘 학습셋, 검증셋, 텍스트셋을 만들 때 주로 사용)
dataFrames = df.randomSplit([0.25, 0.75], seed)
# dataFrames[1].show()
dataFrames[0].count() > dataFrames[1].count()
로우 합치기와 추가하기
DataFrame은 불변성을 가지기 때문에 변경하는 작업은 불가능
따라서 원본 DataFrame을 새로운 DataFrame과 통합(union)해야 함.
from pyspark.sql import Row
from pyspark.sql.functions import col
schema = df.schema
newRows = [
Row("New Country", "Other Country", 5L),
Row("New Country 2", "Other Country 3", 1L)
]
parallelizedRows = spark.sparkContext.parallelize(newRows)
newDf = spark.createDataFrame(parallelizedRows, schema) # createDataFrame: 새로운 DataFrame 생성
df.union(newDf)\ # union 메서드로 통합
.where("count = 1")\
.where(col("ORIGIN_COUNTRY_NAME") != "United States")\
.show()
로우 정렬하기
sort와 orderBy 메서드를 사용해 정렬할 수 있음
두 메서드 모두 컬럼 표현식과 문자열을 사용할 수 있으며 다수의 컬럼을 지정할 수 있음
# sort(), orderyBy
df.sort("count").show(5)
df.orderBy("count", "DEST_COUNTRY_NAME").show(5)
df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(5)
# asc, desc
from pyspark.sql.functions import expr
from pyspark.sql.functions import desc, asc
df.orderBy(expr("count desc")).show(2)
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)
# 파티션별 정렬 sortWithinPartitions 함수 사용
spark.read.format("json").load("/data/flight-data/json/*-summary.json").sortWithinPartitions("count").show(5)
로우 수 제한하기
DataFrame에서 추출할 row 수를 제한해야 할 때 limit 메서드 사용
df.limit(5).show()
df.orderBy(expr("count desc")).limit(6).show()
repartition과 coalesce(콜레스)
최적화 기법 중 하나로 자주 필터링하는 컬럼을 기준으로 데이터를 분할하는 방법
- repartition메서드
- 무조건 전체 데이터 셔플
- 향후 사용할 파티션 수가 현재보다 많을 경우 사용
- 컬럼을 기준으로 파티션을 만들 경우 사용
- coalesce메서드
- 파티션을 병합하려는 경우에 사용
# repartition
df.rdd.getNumPartitions()
df.repartition(5)
df.repartition(col("DEST_COUNTRY_NAME"))
df.repartition(5, col("DEST_COUNTRY_NAME"))
# coalesce
df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)
드라이버로 로우 데이터 수집하기
스파크는 드라이버에서 클러스터 상태 정보를 유지
드라이버로 데이터를 수집하는 연산
- collect: 전체 DataFrame의 모든 데이터를 수집
- take: 상위 N개의 로우를 반환 (정수만 인수로 받음)
- show: 여러 로우를 보기좋게 출력
collectDF = df.limit(10)
collectDF.take(5) # take는 정수형 값만 인수로 사용
collectDF.show()
collectDF.show(5, False)
collectDF.collect()
드라이버로 모든 데이터 컬렉션을 수집하는 작업은 매우 큰 비용(CPU, 메모리, 네트워크 등)이 발생
'프로그래밍 > Spark' 카테고리의 다른 글
[pyspark] 스파크 완벽 가이드 - 파케이(parquet)파일 (0) | 2022.03.16 |
---|---|
[pyspark] 스파크 완벽 가이드 - 다양한 데이터 타입 다루기 (0) | 2022.03.16 |
[pyspark] 스파크 완벽 가이드 - 구조적 API 기본개념과 용어 (0) | 2022.03.16 |