욱 연구소

슬기로운 개발생활

프로그래밍/Spark

[pyspark] 스파크 완벽 가이드 - 다양한 데이터 타입 다루기

wook-lab 2022. 3. 16. 14:47
반응형
표현식을 만드는 방법과 다양한 데이터 타입을 다루는 방법

 

DataFrame을 먼저 하나 생성하자

df = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load("/data/retail-data/by-day/2010-12-01.csv")
df.printSchema()
df.show(5, False)
df.createOrReplaceTempView("dfTable")

 

 

스파크 데이터 아비을 변환하기 위한 lit 함수

스파크 데이터 타입으로 변환하는 방법은 반드시 알아두어야 한다. 바로 lit 함수

from pyspark.sql.functions import lit
 
df.select(lit(5), lit("five"), lit(5.0))
 
 
# 결과
# DataFrame[5: int, five: string, 5.0: double]

 

 

불리언(Boolean) 데이터 타입

불리언 구문은 and, or, true, false로 구성되며 구문을 사용해 true 또는 false로 평가되는 논리 문법을 만듦.
또한 데이터 로우를 필터링 할 때도 사용

from pyspark.sql.functions import col
 
df.where(col("InvoiceNo") != 536365)\
  .select("InvoiceNo", "Description")\
  .show(5, False)
   
# 가장 명확한 방법, 문자열 표현식에 조건절 명시
df.where("InvoiceNo = 536365")\
  .show(5, False)
   
df.where("InvoiceNo <> 536365")\ # <>: 일치하지 않음
  .show(5, False)

불리언 표현식을 사용하는 경우 항상 모든 표현식을 and 메서드로 묶어 차례대로 필터를 적용해야 함.
스파크는 내부적으로 모든 필터를 하나의 문장으로 변환하고 그 다음 동시에 모든 필터를 처리
반면 or 구문을 사용할 때는 반드시 동일한 구문에 조건을 정의해야 함

from pyspark.sql.functions import instr
 
priceFilter = col("UnitPrice") > 600
descriptFilter = instr(df.Description, "POSTAGE") >= 1
 
df.where(df.StockCode.isin("DOT"))\
  .where(priceFilter | descriptFilter)\
  .show()

불리언 컬럼을 사용해 DataFrame을 필터링

from pyspark.sql.functions import instr
 
# Boolean컬럼을 사용해 필터링
DOTCodeFilter = col("StockCode") == "DOT"
priceFilter = col("UnitPrice") > 600
descriptFilter = instr(col("Description"), "POSTAGE") >= 1
df.withColumn("isExpensive", DOTCodeFilter & (priceFilter | descriptFilter))\         # withColumn을 이용해 불리언 컬럼 생성
  .where("isExpensive")\          # 불리언 컬럼으로 필터링
  .select("unitPrice", "isExpensive").show(5)

eqNullSafe(): null 값에 안전한(null-safe) 동치(equivalence)

 

수치형 데이터 타입 다루기

카운트(Count) - 지수승(pow), 반올림(round), 내림(bround)

카운트는 빅데이터 처리에서 필터링 다음으로 많이 수행하는 작업
대부분은 수치형 데이터 타입을 사용해 연산 방식을 정의

from pyspark.sql.functions import expr, pow
 
# pow 지수승 e.g. pow(값, 지수)
fabricatedQuantity = pow(col("Quantity") * col("UnitPrice"), 2) + 5
df.select(expr("CustomerId"), fabricatedQuantity.alias("realQuantity")).show(2)
 
# 곱셈
df.selectExpr(
    "CustomerId",
    "(POWER((Quantity * UnitPrice), 2.0) + 5) as realQuantity"
).show(2)
 
# 소수점 첫째자리에서 반올림
from pyspark.sql.functions import round
 
df.select(expr("CustomerId"), round(fabricatedQuantity, 4).alias("realQuantity")).show(2)
 
# 반올림 round, 내림 bround
from pyspark.sql.functions import bround
df.select(round(lit("2.5")), bround(lit("2.5"))).show(2)

 

피어슨 상관계수: 두 컬럼 사이의 상관관계를 계산

from pyspark.sql.functions import corr
 
# 피어슨 상관계수
df.stat.corr("Quantity", "UnitPrice")  # 피어슨 상관계수 값
df.select(corr("Quantity", "UnitPrice")).show()

 

describe메서드: 요약 통계

관련 컬럼에 대한 집계, 평균, 표준편차, 최솟값, 최댓값을 계산
그러나 통계 스키마는 변경 될 수 있으므로 콘솔 확인용으로만 사용

# 요약 통계 콘솔 확인용
df.describe().show()
 
 
# 결과
+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+
|summary|        InvoiceNo|         StockCode|         Description|          Quantity|         UnitPrice|        CustomerID|       Country|
+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+
|  count|             3108|              3108|                3098|              3108|              3108|              1968|          3108|
|   mean| 536516.684944841|27834.304044117645|                null| 8.627413127413128| 4.151946589446603|15661.388719512195|          null|
| stddev|72.89447869788873|17407.897548583845|                null|26.371821677029203|15.638659854603892|1854.4496996893627|          null|
|    min|           536365|             10002| 4 PURPLE FLOCK D...|               -24|               0.0|           12431.0|     Australia|
|    max|          C536548|              POST|ZINC WILLIE WINKI...|               600|            607.49|           18229.0|United Kingdom|
+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+

정확한 집계가 필요하다면 함수를 직접 임포트하고 해당 컬럼에 적용할 수 있다.

from pyspark.sql.functions import count, mean, stddev_pop, min, max

 

StatFunction패키지

다양한 통계 함수를 제공하며, stat 속성을 사용해 접근하며 다양한 통계값을 계산할 때 사용하는 DataFrame 메서드.
approxQuantile: 백분위수 정확하게 계산하거나 근사치를 계산
crosstab: 교차표(cross-tabulation)
freqItems: 자주 사용하는 항목 쌍을 확인
monotonically_increasing_id: 모든 로우에 고유 ID값을 추가

olName = "UnitPrice"
quantileProbs = [0.5]
relError = 0.05
 
# approxQuantile():
df.stat.approxQuantile("UnitPrice", quantileProbs, relError) # 2.51
 
# crosstab: 교차표(cross-tabulation) 확인
df.stat.crosstab("StockCode", "Quantity").show()
 
# freqItems: 자주사용하는 항목 쌍 확인
df.stat.freqItems(["StockCode", "Quantity"]).show();
 
 
# monotonically_increasing_id
from pyspark.sql.functions import monotonically_increasing_id
df.select(monotonically_increasing_id()).show(2)

 

 

문자열 데이터 타입 다루기

대/소문자 변환

  • initcap: 공백으로 나눠 모든 단어의 첫글자를 대문자로 변환
  • lower: 문자열 전체를 소문자로 변환
  • upper: 문자열 전체를 대문자로 변환
  • ltrim: 왼쪽의 공백 문자열 자름
  • rtrim:오른쪽의 공백 문자열 자름
  • trim: 양쪽의 공백 문자열 자름
  • lpad: 왼쪽에 입력 된 수 만큼 문자열 값으로 채움
  • rpad: 오른쪽에 입력 된 수 만큼 문자열 값으로 채움
    • 단 pad의 경우 기준 문자열보다 작은 수를 입력하면 문자열의 우측부터 제거
# initcap() 공백으로 나눠 모든 단어의 첫글자 대문자
from pyspark.sql.functions import initcap
df.select(initcap(col("Description"))).show(2)
 
 
# lower: 문자열 전체를 소문자
# upper: 문자열 전체를 대문자
from pyspark.sql.functions import lower, upper
df.select(col("Description"),
          lower(col("Description")),
          upper(lower(col("Description")))).show(2, False)
 
 
# lpad, ltrim, rpad, rtrim, trim
from pyspark.sql.functions import lit, ltrim, rtrim, rpad, lpad, trim
df.select(
    ltrim(lit("   HELLO   ")).alias("ltrim"),
    rtrim(lit("   HELLO   ")).alias("rtrim"),
    trim(lit("   HELLO   ")).alias("trim"),
    lpad(lit("HELLO"), 3, " ").alias("lpad"),  # 문자열 길이보다 작은 숫자를 넣으면 문자열의 오른쪽부터 제거됨.
    lpad(lit("HELLO"), 10, " ").alias("lpad"),
    rpad(lit("HELLO"), 3, " ").alias("rpad"),
    rpad(lit("HELLO"), 10, " ").alias("rpad")
).show(1)

 

정규표현식

스파크는 자바 정규 표현식이 가진 강력한 기능을 활용.

  • regexp_extract: 해당 패턴을 만족하는 문자열을 추출
  • regexp_replace: 입력된 문자열을 치환 문자열로 변환
  • translate: 기준 문자열과 치환 문자열을 각각 변환
  • instr: 문자열의 시작위치를 알려줌
    • 단순히 문자열이 존재하는지 확인할 때 씀 (위치는 1부터시작, 때문에 0보다 크면 존재하는 것으로 간주할 수 있다)
  • locate: 문자열의 위치를 정수로 반환
    • instr과 동일하게 위치는 1부터 시작
    • instr과 다른점은 세번째 인수로 시작 위치를 지정할 수 있음
# regexp_replace
from pyspark.sql.functions import regexp_replace
 
regex_string = "BLACK|WHITE|RED|GREEN|BLUE"
df.select(regexp_replace(col("Description"), regex_string, "COLOR").alias("color_clean"),
          col("Description"))\
  .where(instr(col("color_clean"), "COLOR") == 1)\
  .show(2, False)
 
 
# translate 문자열 각각을 치환
from pyspark.sql.functions import translate
 
df.select(translate(col("Description"), "LET", "137"), col("Description")).show(2, False) # L->1, E->3, T->7
 
 
# instr 단순히 값 존재 여부 확인
from pyspark.sql.functions import instr
 
containsBlack = instr(col("Description"), "BLACK") >= 1
containsWhite = instr(col("Description"), "WHITE") >= 1
df.withColumn("hasSimpleColor", containsBlack | containsWhite)\
  .where("hasSimpleColor")\
  .select("Description").show(3, False)
 
   
# locate 문자열의 위치를 정수로 반환하는 함수(1부터 시작)
from pyspark.sql.functions import expr, locate
 
simpleColors = ["black", "white", "red", "green", "blue"]
 
def color_locator(column, color_string):
    return locate(color_string.upper(), column)\
        .cast("boolean")\
        .alias("is_" + color_string)
 
selectedColumns = [color_locator(df.Description, c) for c in simpleColors]
selectedColumns.append(expr("*")) # Column 타입이어야 함
 
df.select(*selectedColumns).where(expr("is_white OR is_red"))\
  .select("Description").show(3, False)

 

 

날짜와 타임스탬프 데이터 타입 다루기

스파크는 두 가지 종류의 시간 관련 정보만 집중적으로 관리합니다.

  • date: 달력 형태의 날짜
  • timestamp: 날짜와 시간 정보를 모두 가짐

스파크는 특정 날짜 포맷을 명시하지 않아도 자체적으로 식별해 데이터를 읽을 수 있음.

날짜나 시간을 문자열로 저장하고, 런타임에 날짜 타입으로 변경하는 경우가 많아 문자열을 다루는 작업에 많이 사용됨.

시간대 설정이 필요하다면 스파크 SQL 설정의 spark.conf.se ssionLocalTimeZone 속성을 로컬 시간대로 지정해 적용

TimestampType 클래스는 초 단위 정밀도까지만 지원 (밀리세컨드나 마이크로세컨드 단위를 다룬다면 Long 데이터 타입으로 데이터를 변환해 처리하는 우회 정책 사용)

스파크는 자바의 날짜와 타임스탬프를 사용해서 표준 체계를 따름.

#오늘 날짜와 현재 타임스탬프값 구하기
from pyspark.sql.functions import current_date, current_timestamp
 
dateDF = spark.range(10)\
    .withColumn("today", current_date())\
    .withColumn("now", current_timestamp())
dateDF.createOrReplaceTempView("dateTable")
dateDF.printSchema()

 

오늘을 기준으로 5일 전후의 날짜 구하기

  • date_sub: 첫번째 인자로 입력된 날짜로부터 두번째 인자로 입력된 일수 만큼 뺌
  • date_add: 첫번째 인자로 입력된 날짜로부터 두번째 인자로 입력된 일수 만큼 더함
from pyspark.sql.functions import date_add, date_sub, col
 
dateDF.select(date_sub(col("today"), 5), date_add(col("today"), 5)).show(1)

 

두 날짜의 차이를 구하기

  • datediff: 입력된 두 인자 사이의 날짜를 구함
  • months_between: 입력된 두 인자 사이의 월수를 계산
from pyspark.sql.functions import datediff, months_between, to_date, lit
 
dateDF.withColumn("week_ago", date_sub(col("today"), 7))\
      .select(datediff(col("week_ago"), col("today"))).show(1)
 
# 출력결과
+-------------------------+
|datediff(week_ago, today)|
+-------------------------+
|                       -7|
+-------------------------+
only showing top 1 row
 
 
dateDF.select(
    to_date(lit("2016-01-01")).alias("start"),
    to_date(lit("2017-05-22")).alias("end")
).select(months_between(col("start"), col("end"))).show(1)
 
# 출력 결과
+--------------------------+
|months_between(start, end)|
+--------------------------+
|              -16.67741935|
+--------------------------+
only showing top 1 row

 

문자열을 날짜로 변환하기

  • to_date: 문자열을 날짜로 변환, 필요에 따라 날짜 포맷을 지정
  • to_timestamp: 문자열을 날짜로 변환, 반드시 날짜 포맷을 지정
# to_date
from pyspark.sql.functions import to_date
 
dateFormat = "yyy-dd-MM"
cleanDateDF = spark.range(1).select(
    to_date(lit("2017-12-11"), dateFormat).alias("date"),
    to_date(lit("2017-20-12"), dateFormat).alias("date2"))
 
cleanDateDF.createOrReplaceTempView("dateTable2")
cleanDateDF.show()
 
 
 
 
# to_timestamp
from pyspark.sql.functions import to_timestamp
 
cleanDateDF.select(to_timestamp(col("date"), dateFormat)).show()
 
 
 
 
# 올바른 포맷과 타입의 날짜나 타임스탬프를 사용한다면 매우 쉽게 비교할 수 있음.
cleanDateDF.filter(col("date2") > lit("2017-12-12")).show()   # lit 함수를 통해 스파크 값으로 변경 후 비교
cleanDateDF.filter(col("date2") > "'2017-12-12'").show()       # 문자열 자체로 비교

 

 

null 값 다루기

DataFrame에서 빠져 있거나 비어 있는 데이터를 표현할 때는 항상 null 값을 사용.

스파크에서는 빈 문자열이나 대체 값 대신 null 값을 사용해야 최적화를 수행할 수 있음.

 

coalesce

스파크의 coalesce 함수는 인수로 지정한 여러 컬럼 중 null이 아닌 첫번 째 값을 반환

from pyspark.sql.functions import coalesce
 
df.select(coalesce(col("Description"), col("CustomerId"))).show()

 

ifnull, nullif, nvl, nvl2(coalesce와 유사한 결과를 얻을 수 있는 SQL 함수)

  • ifnull: 첫 번째 값이 null 이면 두 번째 값을 반환
  • nullif: 두 값이 같으면 null을 반환, 두 값이 다르면 첫번 째 값을 반환
  • nvl: 첫 번째 값이 null 이면 두 번째 값을 반환, 첫번째 값이 null 이 아니면 두번째 값을 반환
  • nvl2: 첫 번째 값이 null이 아니면 두번 째 값을 반환, 첫 번째 값이 null이면 세 번째 인수로 지정된 값을 반환

 

drop

기본적으로 null 값을 가진 모든 로우를 제거

  • drop: null 값을 가진 모든 로우를 제거
  • drop("any"): 로우의 컬럼값 중 하나라도 null을 가지면 해당 로우를 제거
  • drop("all"): 모든 컬럼의 값이 null이거나 NaN인 경우 해당 로우를 제거
df.na.drop()
df.na.drop("any")
df.na.drop("all")
df.na.drop("all", subset=["StockCode", "InvoiceNo"]) # drop 메서드에 배열 형태의 컬럼을 인수로 전달해 적용할 수 있음

 

fill

하나 이상의 컬럼을 특정 값으로 채울 수 있음.

채워 넣을 값과 컬럼 집합으로 구성된 맵을 인수로 사용

# String 데이터 타입의 컬럼에 존재하는 null 값을 입력 문자열로 채움
df.na.fill("All Null values become this string")
 
# subset에 지정한 컬럼에 존재하는 null 값을 입력 문자열로 채움
df.na.fill("all", subset=["StockCode", "InvoiceNo"])
 
# key-value 형태로 key에는 컬럼, value에는 대체한 값을 넣어 해당 컬럼에 존재하는 null 값을 입력한 값으로 채움
fill_cols_vals = {"StockCode": 5, "Description": "No Value"}
df.na.fill(fill_cols_vals).show()

 

replace

null 값을 유연하게 대처할 방법 중의 하나로 조건에 따라 다른 값으로 대체 할 수 있음.

replace 메서드를 사용하려면 변경하고자 하는 값과 원래 값의 데이터 타입이 같아야 함.

# replace({변경할 문자}, {대체할 문자}, {대상 컬럼})
df.na.replace([""], ["UNKNOWN"], "Description").show()

 

 

복합 데이터 타입 다루기

복합 데이터 타입

  • 구조체(Struct)
  • 배열(Array)
  • 맵(Map)

 

구조체

구조체는 DataFrame 내부의 DataFrame으로 보면 됨. (쿼리문에서 다수의 컬럼을 괄호로 묶어 구조체를 만들 수 있음)

from pyspark.sql.functions import struct
 
# Description 컬럼과 InvoiceNo 컬럼이 묶여서 하나의 구조체가 됨
complexDF = df.select(struct("Description", "InvoiceNo").alias("complex"))
complexDF.createOrReplaceTempView("complexDF")
complexDF.show(5, False)
 
 
# 출력결과
+---------------------------------------------+
|complex                                      |
+---------------------------------------------+
|[WHITE HANGING HEART T-LIGHT HOLDER, 536365] |
|[WHITE METAL LANTERN, 536365]                |
|[CREAM CUPID HEARTS COAT HANGER, 536365]     |
|[KNITTED UNION FLAG HOT WATER BOTTLE, 536365]|
|[RED WOOLLY HOTTIE WHITE HEART., 536365]     |
+---------------------------------------------+
only showing top 5 rows

 

복합 데이터 타입을 가진  DataFrame의 경우 기존 DataFrame과 다르게 점(.)을 사용하거나 getField 메서드를 사용할 수 있음.

complexDF.select("complex.Description").show(1)
complexDF.select(col("complex").getField("Description")).show(1)
 
 
# 출력결과
+--------------------+
|         Description|
+--------------------+
|WHITE HANGING HEA...|
+--------------------+
only showing top 1 row
 
+--------------------+
| complex.Description|
+--------------------+
|WHITE HANGING HEA...|
+--------------------+
only showing top 1 row

 

배열

split함수에 구분자(delimiter)를 인수로 전달해 배열로 변환

from pyspark.sql.functions import split
# split을 통해 공백 문자열을 구분자로 배열 생성
df.select(col("Description"), split(col("Description"), " ")).show(2, False)
 
 
# 출력 결과
+----------------------------------+----------------------------------------+
|Description                       |split(Description,  )                   |
+----------------------------------+----------------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER|[WHITE, HANGING, HEART, T-LIGHT, HOLDER]|
|WHITE METAL LANTERN               |[WHITE, METAL, LANTERN]                 |
+----------------------------------+----------------------------------------+
only showing top 2 rows
 
 
 
# 배열 참조하기
df.select(split(col("Description"), " ").alias("array_col"))\
  .selectExpr("array_col[0]").show(2, False)
 
 
# 출력 결과
+------------+
|array_col[0]|
+------------+
|WHITE       |
|WHITE       |
+------------+
only showing top 2 rows

 

배열의 길이

배열의 크기인 size로 조회

from pyspark.sql.functions import size
 
df.select(size(split("Description", " "))).show(2)

 

array_contains

배열에 특정 값이 존재하는지 확인

from pyspark.sql.functions import array_contains
 
df.select(array_contains(split(col("Description"), " "), "WHITE")).show(2)

 

explode

explode함수는 배열 타입의 컬럼을 입력으로 받음

입력된 컬럼의 배열 값에 포함된 모든 값을 로우로 변환

from pyspark.sql.functions import split, explode
 
df.withColumn("splitted", split(col("Description"), " "))\
  .withColumn("exploded", explode(col("splitted")))\
  .select("Description", "InvoiceNo", "exploded", "splitted").show(5, False)

 

map 함수와 컬럼의 키-값 쌍을 이용하여 생성

배열과 동일한 방법으로 값을 선택할 수 도 있음

from pyspark.sql.functions import create_map
 
complexMapDF = df.select(create_map(col("Description"), col("InvoiceNo")).alias("complex_map"))
complexMapDF.show(5, False)
 
# 키로 데이터 조회가 가능(일치하는 데이터가 없다면 null 값을 반환)
complexMapDF.selectExpr("complex_map['WHITE METAL LANTERN']").show()
 
 
# explode를 이용하여 map 타입을 분해하여 컬럼으로 변환
complexMapDF.selectExpr("explode(complex_map)").show(5)

 

 

JSON 다루기

JSON 데이터를 다루기 위한 고유 기능 지원

  • 문자열 형태의 JSON을 직접 조작
  • JSON 파싱
  • JSON 객체로 생성

 

get_json_object, json_tuple

  • get_json_object: JSON 객체(딕셔너리나 배열)를 인라인 쿼리로 조회할 수 있음
  • json_tuple: 중첩이 없는 단일 수준의 JSON 객체 조회가능
# JSON 컬럼 생성
jsonDF = spark.range(1).selectExpr("""
    '{"myJSONKey" : {"myJSONValue": [1, 2, 3]}}' as jsonString
""")
jsonDF.show()
 
jsonDF.show(1, False)
 
 
#출력결과
+------------------------------------------+
|jsonString                                |
+------------------------------------------+
|{"myJSONKey" : {"myJSONValue": [1, 2, 3]}}|
+------------------------------------------+
 
 
 
 
 
 
from pyspark.sql.functions import get_json_object, json_tuple
 
 
jsonDF.select(
    get_json_object(col("jsonString"), "$.myJSONKey.myJSONValue[1]").alias("column"),   # get_json_object 사용하면 인라인 쿼리로 조회 가능
    json_tuple(col("jsonString"), "myJSONKey")  # json_tuple
).show(2)

 

to_json

StructType을 JSON 문자열로 변경

JSON 데이터소스와 동일한 형태의 딕셔너리(맵)를 파라미터로 사용

from pyspark.sql.functions import to_json
 
df.selectExpr("(InvoiceNo, Description) as myStruct")\
  .select(to_json(col("myStruct"))).show(2, False)
 
 
#출력결과
+-------------------------------------------------------------------------+
|structstojson(myStruct)                                                  |
+-------------------------------------------------------------------------+
|{"InvoiceNo":"536365","Description":"WHITE HANGING HEART T-LIGHT HOLDER"}|
|{"InvoiceNo":"536365","Description":"WHITE METAL LANTERN"}               |
+-------------------------------------------------------------------------+
only showing top 2 rows
 

from_json

JSON문자열을 객체로 변환

단, 반드시 스키마를 지정해야 함.

from pyspark.sql.functions import from_json
from pyspark.sql.types import *
 
parseSchema = StructType((
    StructField("InvoiceNo", StringType(), True),
    StructField("Description", StringType(), True)
))
df.selectExpr("(InvoiceNo, Description) as myStruct")\
  .select(to_json(col("myStruct")).alias("newJSON"))\
  .select(from_json(col("newJSON"), parseSchema), col("newJSON")).show(2, False) # parseSchema 스키마 지정 필수!

 

 

사용자 정의 함수(UDF)

사용자 정의 함수(UDF; User Defined Function)는 하나 이상의 컬럼을 입력으로 받고, 반환할 수 있음.

여러가지 프로그래밍 언어로 개발 가능(스칼라, 파이썬, 자바), 그러나 언어별로 성능에 영향이 다름

레코드별로 데이터를 처리하는 함수

기본적으로 특정 SparkSession이나 Context에서 사용할 수 있도록 임시 함수 형태로 등록

 

함수 정의하기

def power3(double_value):
    return double_value ** 3
 
power3(2.0) # 테스트하기

 

스파크에 등록하기

스파크는 드라이버에서 함수를 직렬화하고 네트워크를 통해 모든 익스큐터 프로세스로 전달(언어와 상관없이 발생)

단, 언어에 따라 근본적으로 동작하는 방식은 다른데

  • 스칼라, 자바: 
    • JVM 환경에서만 사용
    • 따라서 스파크 내장 함수가 제공하는 코드 생성 기능의 장점을 활용할 수 없어 성능저하발생
    • 많은 객체를 생성하거나 사용해도 성능 문제 발생
  • 파이썬: 
    • 스파크는 워커 노드에 파이썬 프로세스를 실행하고 파이썬이 이해할 수 있는 포맷으로 모든 데이터를 직렬화함(앞서 JVM 사용 언어에도 존재했던 부분)
    • 파이썬 프로세스에 있는 데이터의 로우마다 함수를 실행하고 마지막으로 JVM과 스파크에 처리 결과를 반환
    • 스파크에서 워커 메모리를 관리할 수 없으므로 JVM과 파이썬이 동일한 머신에서 메모리 경합을 하면 자원에 제약이 생겨 우커가 비정상 종료가 될 수 있음
  • 자바나 스칼라로 사용자 정의 함수로 개발하는 것을 권장
    • 파이썬에서도 사용 가능

사용자 정의 함수 등록이 완료 되면 DataFrame에서 사용 가능(문자열 표현식에서는 아직 사용 불가)

# 등록
from pyspark.sql.functions import udf
 
power3udf = udf(power3)
 
 
# 사용
udfExampleDF = spark.range(5).toDF("num")
udfExampleDF.select(col("num"), power3udf(col("num"))).show(3)

사용자 정의 함수를 스파크 SQL 함수로 등록하면 모든 프로그래밍 언어와 SQL에서 사용자 정의 함수를 사용 할 수 있음.

스칼라로 등록한 함수를 파이썬에서도 사용 할 수 있게됨.

함수가 올바르게 동작할 수 있도록 반환되는 데이터 타입을 지정하는 것이 좋음.

# 스칼라로 사용자정의 함수(UDF)를 SQL 함수로 등록
import org.apache.spark.sql.functions.udf
 
def power3(number:Double):Double = number * number * number
spark.udf.register("power3", power3(_:Double):Double)
 
 
 
 
# 파이썬으로 사용자정의 함수(UDF)를 SQL함수로 등록
def power3Python(double_value):
    return double_value ** 3
spark.udf.register("power3Python", power3Python, "Double")

 

반응형