표현식을 만드는 방법과 다양한 데이터 타입을 다루는 방법
DataFrame을 먼저 하나 생성하자
df = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("/data/retail-data/by-day/2010-12-01.csv")
df.printSchema()
df.show(5, False)
df.createOrReplaceTempView("dfTable")
스파크 데이터 아비을 변환하기 위한 lit 함수
스파크 데이터 타입으로 변환하는 방법은 반드시 알아두어야 한다. 바로 lit 함수
from pyspark.sql.functions import lit
df.select(lit(5), lit("five"), lit(5.0))
# 결과
# DataFrame[5: int, five: string, 5.0: double]
불리언(Boolean) 데이터 타입
불리언 구문은 and, or, true, false로 구성되며 구문을 사용해 true 또는 false로 평가되는 논리 문법을 만듦.
또한 데이터 로우를 필터링 할 때도 사용
from pyspark.sql.functions import col
df.where(col("InvoiceNo") != 536365)\
.select("InvoiceNo", "Description")\
.show(5, False)
# 가장 명확한 방법, 문자열 표현식에 조건절 명시
df.where("InvoiceNo = 536365")\
.show(5, False)
df.where("InvoiceNo <> 536365")\ # <>: 일치하지 않음
.show(5, False)
불리언 표현식을 사용하는 경우 항상 모든 표현식을 and 메서드로 묶어 차례대로 필터를 적용해야 함.
스파크는 내부적으로 모든 필터를 하나의 문장으로 변환하고 그 다음 동시에 모든 필터를 처리
반면 or 구문을 사용할 때는 반드시 동일한 구문에 조건을 정의해야 함
from pyspark.sql.functions import instr
priceFilter = col("UnitPrice") > 600
descriptFilter = instr(df.Description, "POSTAGE") >= 1
df.where(df.StockCode.isin("DOT"))\
.where(priceFilter | descriptFilter)\
.show()
불리언 컬럼을 사용해 DataFrame을 필터링
from pyspark.sql.functions import instr
# Boolean컬럼을 사용해 필터링
DOTCodeFilter = col("StockCode") == "DOT"
priceFilter = col("UnitPrice") > 600
descriptFilter = instr(col("Description"), "POSTAGE") >= 1
df.withColumn("isExpensive", DOTCodeFilter & (priceFilter | descriptFilter))\ # withColumn을 이용해 불리언 컬럼 생성
.where("isExpensive")\ # 불리언 컬럼으로 필터링
.select("unitPrice", "isExpensive").show(5)
eqNullSafe(): null 값에 안전한(null-safe) 동치(equivalence)
수치형 데이터 타입 다루기
카운트(Count) - 지수승(pow), 반올림(round), 내림(bround)
카운트는 빅데이터 처리에서 필터링 다음으로 많이 수행하는 작업
대부분은 수치형 데이터 타입을 사용해 연산 방식을 정의
from pyspark.sql.functions import expr, pow
# pow 지수승 e.g. pow(값, 지수)
fabricatedQuantity = pow(col("Quantity") * col("UnitPrice"), 2) + 5
df.select(expr("CustomerId"), fabricatedQuantity.alias("realQuantity")).show(2)
# 곱셈
df.selectExpr(
"CustomerId",
"(POWER((Quantity * UnitPrice), 2.0) + 5) as realQuantity"
).show(2)
# 소수점 첫째자리에서 반올림
from pyspark.sql.functions import round
df.select(expr("CustomerId"), round(fabricatedQuantity, 4).alias("realQuantity")).show(2)
# 반올림 round, 내림 bround
from pyspark.sql.functions import bround
df.select(round(lit("2.5")), bround(lit("2.5"))).show(2)
피어슨 상관계수: 두 컬럼 사이의 상관관계를 계산
from pyspark.sql.functions import corr
# 피어슨 상관계수
df.stat.corr("Quantity", "UnitPrice") # 피어슨 상관계수 값
df.select(corr("Quantity", "UnitPrice")).show()
describe메서드: 요약 통계
관련 컬럼에 대한 집계, 평균, 표준편차, 최솟값, 최댓값을 계산
그러나 통계 스키마는 변경 될 수 있으므로 콘솔 확인용으로만 사용
# 요약 통계 콘솔 확인용
df.describe().show()
# 결과
+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+
|summary| InvoiceNo| StockCode| Description| Quantity| UnitPrice| CustomerID| Country|
+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+
| count| 3108| 3108| 3098| 3108| 3108| 1968| 3108|
| mean| 536516.684944841|27834.304044117645| null| 8.627413127413128| 4.151946589446603|15661.388719512195| null|
| stddev|72.89447869788873|17407.897548583845| null|26.371821677029203|15.638659854603892|1854.4496996893627| null|
| min| 536365| 10002| 4 PURPLE FLOCK D...| -24| 0.0| 12431.0| Australia|
| max| C536548| POST|ZINC WILLIE WINKI...| 600| 607.49| 18229.0|United Kingdom|
+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+
정확한 집계가 필요하다면 함수를 직접 임포트하고 해당 컬럼에 적용할 수 있다.
from pyspark.sql.functions import count, mean, stddev_pop, min, max
StatFunction패키지
다양한 통계 함수를 제공하며, stat 속성을 사용해 접근하며 다양한 통계값을 계산할 때 사용하는 DataFrame 메서드.
approxQuantile: 백분위수 정확하게 계산하거나 근사치를 계산
crosstab: 교차표(cross-tabulation)
freqItems: 자주 사용하는 항목 쌍을 확인
monotonically_increasing_id: 모든 로우에 고유 ID값을 추가
olName = "UnitPrice"
quantileProbs = [0.5]
relError = 0.05
# approxQuantile():
df.stat.approxQuantile("UnitPrice", quantileProbs, relError) # 2.51
# crosstab: 교차표(cross-tabulation) 확인
df.stat.crosstab("StockCode", "Quantity").show()
# freqItems: 자주사용하는 항목 쌍 확인
df.stat.freqItems(["StockCode", "Quantity"]).show();
# monotonically_increasing_id
from pyspark.sql.functions import monotonically_increasing_id
df.select(monotonically_increasing_id()).show(2)
문자열 데이터 타입 다루기
대/소문자 변환
- initcap: 공백으로 나눠 모든 단어의 첫글자를 대문자로 변환
- lower: 문자열 전체를 소문자로 변환
- upper: 문자열 전체를 대문자로 변환
- ltrim: 왼쪽의 공백 문자열 자름
- rtrim:오른쪽의 공백 문자열 자름
- trim: 양쪽의 공백 문자열 자름
- lpad: 왼쪽에 입력 된 수 만큼 문자열 값으로 채움
- rpad: 오른쪽에 입력 된 수 만큼 문자열 값으로 채움
- 단 pad의 경우 기준 문자열보다 작은 수를 입력하면 문자열의 우측부터 제거됨
# initcap() 공백으로 나눠 모든 단어의 첫글자 대문자
from pyspark.sql.functions import initcap
df.select(initcap(col("Description"))).show(2)
# lower: 문자열 전체를 소문자
# upper: 문자열 전체를 대문자
from pyspark.sql.functions import lower, upper
df.select(col("Description"),
lower(col("Description")),
upper(lower(col("Description")))).show(2, False)
# lpad, ltrim, rpad, rtrim, trim
from pyspark.sql.functions import lit, ltrim, rtrim, rpad, lpad, trim
df.select(
ltrim(lit(" HELLO ")).alias("ltrim"),
rtrim(lit(" HELLO ")).alias("rtrim"),
trim(lit(" HELLO ")).alias("trim"),
lpad(lit("HELLO"), 3, " ").alias("lpad"), # 문자열 길이보다 작은 숫자를 넣으면 문자열의 오른쪽부터 제거됨.
lpad(lit("HELLO"), 10, " ").alias("lpad"),
rpad(lit("HELLO"), 3, " ").alias("rpad"),
rpad(lit("HELLO"), 10, " ").alias("rpad")
).show(1)
정규표현식
스파크는 자바 정규 표현식이 가진 강력한 기능을 활용.
- regexp_extract: 해당 패턴을 만족하는 문자열을 추출
- regexp_replace: 입력된 문자열을 치환 문자열로 변환
- translate: 기준 문자열과 치환 문자열을 각각 변환
- instr: 문자열의 시작위치를 알려줌
- 단순히 문자열이 존재하는지 확인할 때 씀 (위치는 1부터시작, 때문에 0보다 크면 존재하는 것으로 간주할 수 있다)
- locate: 문자열의 위치를 정수로 반환
- instr과 동일하게 위치는 1부터 시작
- instr과 다른점은 세번째 인수로 시작 위치를 지정할 수 있음
# regexp_replace
from pyspark.sql.functions import regexp_replace
regex_string = "BLACK|WHITE|RED|GREEN|BLUE"
df.select(regexp_replace(col("Description"), regex_string, "COLOR").alias("color_clean"),
col("Description"))\
.where(instr(col("color_clean"), "COLOR") == 1)\
.show(2, False)
# translate 문자열 각각을 치환
from pyspark.sql.functions import translate
df.select(translate(col("Description"), "LET", "137"), col("Description")).show(2, False) # L->1, E->3, T->7
# instr 단순히 값 존재 여부 확인
from pyspark.sql.functions import instr
containsBlack = instr(col("Description"), "BLACK") >= 1
containsWhite = instr(col("Description"), "WHITE") >= 1
df.withColumn("hasSimpleColor", containsBlack | containsWhite)\
.where("hasSimpleColor")\
.select("Description").show(3, False)
# locate 문자열의 위치를 정수로 반환하는 함수(1부터 시작)
from pyspark.sql.functions import expr, locate
simpleColors = ["black", "white", "red", "green", "blue"]
def color_locator(column, color_string):
return locate(color_string.upper(), column)\
.cast("boolean")\
.alias("is_" + color_string)
selectedColumns = [color_locator(df.Description, c) for c in simpleColors]
selectedColumns.append(expr("*")) # Column 타입이어야 함
df.select(*selectedColumns).where(expr("is_white OR is_red"))\
.select("Description").show(3, False)
날짜와 타임스탬프 데이터 타입 다루기
스파크는 두 가지 종류의 시간 관련 정보만 집중적으로 관리합니다.
- date: 달력 형태의 날짜
- timestamp: 날짜와 시간 정보를 모두 가짐
스파크는 특정 날짜 포맷을 명시하지 않아도 자체적으로 식별해 데이터를 읽을 수 있음.
날짜나 시간을 문자열로 저장하고, 런타임에 날짜 타입으로 변경하는 경우가 많아 문자열을 다루는 작업에 많이 사용됨.
시간대 설정이 필요하다면 스파크 SQL 설정의 spark.conf.se ssionLocalTimeZone 속성을 로컬 시간대로 지정해 적용
TimestampType 클래스는 초 단위 정밀도까지만 지원 (밀리세컨드나 마이크로세컨드 단위를 다룬다면 Long 데이터 타입으로 데이터를 변환해 처리하는 우회 정책 사용)
스파크는 자바의 날짜와 타임스탬프를 사용해서 표준 체계를 따름.
#오늘 날짜와 현재 타임스탬프값 구하기
from pyspark.sql.functions import current_date, current_timestamp
dateDF = spark.range(10)\
.withColumn("today", current_date())\
.withColumn("now", current_timestamp())
dateDF.createOrReplaceTempView("dateTable")
dateDF.printSchema()
오늘을 기준으로 5일 전후의 날짜 구하기
- date_sub: 첫번째 인자로 입력된 날짜로부터 두번째 인자로 입력된 일수 만큼 뺌
- date_add: 첫번째 인자로 입력된 날짜로부터 두번째 인자로 입력된 일수 만큼 더함
from pyspark.sql.functions import date_add, date_sub, col
dateDF.select(date_sub(col("today"), 5), date_add(col("today"), 5)).show(1)
두 날짜의 차이를 구하기
- datediff: 입력된 두 인자 사이의 날짜를 구함
- months_between: 입력된 두 인자 사이의 월수를 계산
from pyspark.sql.functions import datediff, months_between, to_date, lit
dateDF.withColumn("week_ago", date_sub(col("today"), 7))\
.select(datediff(col("week_ago"), col("today"))).show(1)
# 출력결과
+-------------------------+
|datediff(week_ago, today)|
+-------------------------+
| -7|
+-------------------------+
only showing top 1 row
dateDF.select(
to_date(lit("2016-01-01")).alias("start"),
to_date(lit("2017-05-22")).alias("end")
).select(months_between(col("start"), col("end"))).show(1)
# 출력 결과
+--------------------------+
|months_between(start, end)|
+--------------------------+
| -16.67741935|
+--------------------------+
only showing top 1 row
문자열을 날짜로 변환하기
- to_date: 문자열을 날짜로 변환, 필요에 따라 날짜 포맷을 지정
- to_timestamp: 문자열을 날짜로 변환, 반드시 날짜 포맷을 지정
# to_date
from pyspark.sql.functions import to_date
dateFormat = "yyy-dd-MM"
cleanDateDF = spark.range(1).select(
to_date(lit("2017-12-11"), dateFormat).alias("date"),
to_date(lit("2017-20-12"), dateFormat).alias("date2"))
cleanDateDF.createOrReplaceTempView("dateTable2")
cleanDateDF.show()
# to_timestamp
from pyspark.sql.functions import to_timestamp
cleanDateDF.select(to_timestamp(col("date"), dateFormat)).show()
# 올바른 포맷과 타입의 날짜나 타임스탬프를 사용한다면 매우 쉽게 비교할 수 있음.
cleanDateDF.filter(col("date2") > lit("2017-12-12")).show() # lit 함수를 통해 스파크 값으로 변경 후 비교
cleanDateDF.filter(col("date2") > "'2017-12-12'").show() # 문자열 자체로 비교
null 값 다루기
DataFrame에서 빠져 있거나 비어 있는 데이터를 표현할 때는 항상 null 값을 사용.
스파크에서는 빈 문자열이나 대체 값 대신 null 값을 사용해야 최적화를 수행할 수 있음.
coalesce
스파크의 coalesce 함수는 인수로 지정한 여러 컬럼 중 null이 아닌 첫번 째 값을 반환
from pyspark.sql.functions import coalesce
df.select(coalesce(col("Description"), col("CustomerId"))).show()
ifnull, nullif, nvl, nvl2(coalesce와 유사한 결과를 얻을 수 있는 SQL 함수)
- ifnull: 첫 번째 값이 null 이면 두 번째 값을 반환
- nullif: 두 값이 같으면 null을 반환, 두 값이 다르면 첫번 째 값을 반환
- nvl: 첫 번째 값이 null 이면 두 번째 값을 반환, 첫번째 값이 null 이 아니면 두번째 값을 반환
- nvl2: 첫 번째 값이 null이 아니면 두번 째 값을 반환, 첫 번째 값이 null이면 세 번째 인수로 지정된 값을 반환
drop
기본적으로 null 값을 가진 모든 로우를 제거
- drop: null 값을 가진 모든 로우를 제거
- drop("any"): 로우의 컬럼값 중 하나라도 null을 가지면 해당 로우를 제거
- drop("all"): 모든 컬럼의 값이 null이거나 NaN인 경우 해당 로우를 제거
df.na.drop()
df.na.drop("any")
df.na.drop("all")
df.na.drop("all", subset=["StockCode", "InvoiceNo"]) # drop 메서드에 배열 형태의 컬럼을 인수로 전달해 적용할 수 있음
fill
하나 이상의 컬럼을 특정 값으로 채울 수 있음.
채워 넣을 값과 컬럼 집합으로 구성된 맵을 인수로 사용
# String 데이터 타입의 컬럼에 존재하는 null 값을 입력 문자열로 채움
df.na.fill("All Null values become this string")
# subset에 지정한 컬럼에 존재하는 null 값을 입력 문자열로 채움
df.na.fill("all", subset=["StockCode", "InvoiceNo"])
# key-value 형태로 key에는 컬럼, value에는 대체한 값을 넣어 해당 컬럼에 존재하는 null 값을 입력한 값으로 채움
fill_cols_vals = {"StockCode": 5, "Description": "No Value"}
df.na.fill(fill_cols_vals).show()
replace
null 값을 유연하게 대처할 방법 중의 하나로 조건에 따라 다른 값으로 대체 할 수 있음.
replace 메서드를 사용하려면 변경하고자 하는 값과 원래 값의 데이터 타입이 같아야 함.
# replace({변경할 문자}, {대체할 문자}, {대상 컬럼})
df.na.replace([""], ["UNKNOWN"], "Description").show()
복합 데이터 타입 다루기
복합 데이터 타입
- 구조체(Struct)
- 배열(Array)
- 맵(Map)
구조체
구조체는 DataFrame 내부의 DataFrame으로 보면 됨. (쿼리문에서 다수의 컬럼을 괄호로 묶어 구조체를 만들 수 있음)
from pyspark.sql.functions import struct
# Description 컬럼과 InvoiceNo 컬럼이 묶여서 하나의 구조체가 됨
complexDF = df.select(struct("Description", "InvoiceNo").alias("complex"))
complexDF.createOrReplaceTempView("complexDF")
complexDF.show(5, False)
# 출력결과
+---------------------------------------------+
|complex |
+---------------------------------------------+
|[WHITE HANGING HEART T-LIGHT HOLDER, 536365] |
|[WHITE METAL LANTERN, 536365] |
|[CREAM CUPID HEARTS COAT HANGER, 536365] |
|[KNITTED UNION FLAG HOT WATER BOTTLE, 536365]|
|[RED WOOLLY HOTTIE WHITE HEART., 536365] |
+---------------------------------------------+
only showing top 5 rows
복합 데이터 타입을 가진 DataFrame의 경우 기존 DataFrame과 다르게 점(.)을 사용하거나 getField 메서드를 사용할 수 있음.
complexDF.select("complex.Description").show(1)
complexDF.select(col("complex").getField("Description")).show(1)
# 출력결과
+--------------------+
| Description|
+--------------------+
|WHITE HANGING HEA...|
+--------------------+
only showing top 1 row
+--------------------+
| complex.Description|
+--------------------+
|WHITE HANGING HEA...|
+--------------------+
only showing top 1 row
배열
split함수에 구분자(delimiter)를 인수로 전달해 배열로 변환
from pyspark.sql.functions import split
# split을 통해 공백 문자열을 구분자로 배열 생성
df.select(col("Description"), split(col("Description"), " ")).show(2, False)
# 출력 결과
+----------------------------------+----------------------------------------+
|Description |split(Description, ) |
+----------------------------------+----------------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER|[WHITE, HANGING, HEART, T-LIGHT, HOLDER]|
|WHITE METAL LANTERN |[WHITE, METAL, LANTERN] |
+----------------------------------+----------------------------------------+
only showing top 2 rows
# 배열 참조하기
df.select(split(col("Description"), " ").alias("array_col"))\
.selectExpr("array_col[0]").show(2, False)
# 출력 결과
+------------+
|array_col[0]|
+------------+
|WHITE |
|WHITE |
+------------+
only showing top 2 rows
배열의 길이
배열의 크기인 size로 조회
from pyspark.sql.functions import size
df.select(size(split("Description", " "))).show(2)
array_contains
배열에 특정 값이 존재하는지 확인
from pyspark.sql.functions import array_contains
df.select(array_contains(split(col("Description"), " "), "WHITE")).show(2)
explode
explode함수는 배열 타입의 컬럼을 입력으로 받음
입력된 컬럼의 배열 값에 포함된 모든 값을 로우로 변환
from pyspark.sql.functions import split, explode
df.withColumn("splitted", split(col("Description"), " "))\
.withColumn("exploded", explode(col("splitted")))\
.select("Description", "InvoiceNo", "exploded", "splitted").show(5, False)
맵
map 함수와 컬럼의 키-값 쌍을 이용하여 생성
배열과 동일한 방법으로 값을 선택할 수 도 있음
from pyspark.sql.functions import create_map
complexMapDF = df.select(create_map(col("Description"), col("InvoiceNo")).alias("complex_map"))
complexMapDF.show(5, False)
# 키로 데이터 조회가 가능(일치하는 데이터가 없다면 null 값을 반환)
complexMapDF.selectExpr("complex_map['WHITE METAL LANTERN']").show()
# explode를 이용하여 map 타입을 분해하여 컬럼으로 변환
complexMapDF.selectExpr("explode(complex_map)").show(5)
JSON 다루기
JSON 데이터를 다루기 위한 고유 기능 지원
- 문자열 형태의 JSON을 직접 조작
- JSON 파싱
- JSON 객체로 생성
get_json_object, json_tuple
- get_json_object: JSON 객체(딕셔너리나 배열)를 인라인 쿼리로 조회할 수 있음
- json_tuple: 중첩이 없는 단일 수준의 JSON 객체 조회가능
# JSON 컬럼 생성
jsonDF = spark.range(1).selectExpr("""
'{"myJSONKey" : {"myJSONValue": [1, 2, 3]}}' as jsonString
""")
jsonDF.show()
jsonDF.show(1, False)
#출력결과
+------------------------------------------+
|jsonString |
+------------------------------------------+
|{"myJSONKey" : {"myJSONValue": [1, 2, 3]}}|
+------------------------------------------+
from pyspark.sql.functions import get_json_object, json_tuple
jsonDF.select(
get_json_object(col("jsonString"), "$.myJSONKey.myJSONValue[1]").alias("column"), # get_json_object 사용하면 인라인 쿼리로 조회 가능
json_tuple(col("jsonString"), "myJSONKey") # json_tuple
).show(2)
to_json
StructType을 JSON 문자열로 변경
JSON 데이터소스와 동일한 형태의 딕셔너리(맵)를 파라미터로 사용
from pyspark.sql.functions import to_json
df.selectExpr("(InvoiceNo, Description) as myStruct")\
.select(to_json(col("myStruct"))).show(2, False)
#출력결과
+-------------------------------------------------------------------------+
|structstojson(myStruct) |
+-------------------------------------------------------------------------+
|{"InvoiceNo":"536365","Description":"WHITE HANGING HEART T-LIGHT HOLDER"}|
|{"InvoiceNo":"536365","Description":"WHITE METAL LANTERN"} |
+-------------------------------------------------------------------------+
only showing top 2 rows
from_json
JSON문자열을 객체로 변환
단, 반드시 스키마를 지정해야 함.
from pyspark.sql.functions import from_json
from pyspark.sql.types import *
parseSchema = StructType((
StructField("InvoiceNo", StringType(), True),
StructField("Description", StringType(), True)
))
df.selectExpr("(InvoiceNo, Description) as myStruct")\
.select(to_json(col("myStruct")).alias("newJSON"))\
.select(from_json(col("newJSON"), parseSchema), col("newJSON")).show(2, False) # parseSchema 스키마 지정 필수!
사용자 정의 함수(UDF)
사용자 정의 함수(UDF; User Defined Function)는 하나 이상의 컬럼을 입력으로 받고, 반환할 수 있음.
여러가지 프로그래밍 언어로 개발 가능(스칼라, 파이썬, 자바), 그러나 언어별로 성능에 영향이 다름
레코드별로 데이터를 처리하는 함수
기본적으로 특정 SparkSession이나 Context에서 사용할 수 있도록 임시 함수 형태로 등록
함수 정의하기
def power3(double_value):
return double_value ** 3
power3(2.0) # 테스트하기
스파크에 등록하기
스파크는 드라이버에서 함수를 직렬화하고 네트워크를 통해 모든 익스큐터 프로세스로 전달(언어와 상관없이 발생)
단, 언어에 따라 근본적으로 동작하는 방식은 다른데
- 스칼라, 자바:
- JVM 환경에서만 사용
- 따라서 스파크 내장 함수가 제공하는 코드 생성 기능의 장점을 활용할 수 없어 성능저하발생
- 많은 객체를 생성하거나 사용해도 성능 문제 발생
- 파이썬:
- 스파크는 워커 노드에 파이썬 프로세스를 실행하고 파이썬이 이해할 수 있는 포맷으로 모든 데이터를 직렬화함(앞서 JVM 사용 언어에도 존재했던 부분)
- 파이썬 프로세스에 있는 데이터의 로우마다 함수를 실행하고 마지막으로 JVM과 스파크에 처리 결과를 반환
- 스파크에서 워커 메모리를 관리할 수 없으므로 JVM과 파이썬이 동일한 머신에서 메모리 경합을 하면 자원에 제약이 생겨 우커가 비정상 종료가 될 수 있음
- 자바나 스칼라로 사용자 정의 함수로 개발하는 것을 권장
- 파이썬에서도 사용 가능
사용자 정의 함수 등록이 완료 되면 DataFrame에서 사용 가능(문자열 표현식에서는 아직 사용 불가)
# 등록
from pyspark.sql.functions import udf
power3udf = udf(power3)
# 사용
udfExampleDF = spark.range(5).toDF("num")
udfExampleDF.select(col("num"), power3udf(col("num"))).show(3)
사용자 정의 함수를 스파크 SQL 함수로 등록하면 모든 프로그래밍 언어와 SQL에서 사용자 정의 함수를 사용 할 수 있음.
스칼라로 등록한 함수를 파이썬에서도 사용 할 수 있게됨.
함수가 올바르게 동작할 수 있도록 반환되는 데이터 타입을 지정하는 것이 좋음.
# 스칼라로 사용자정의 함수(UDF)를 SQL 함수로 등록
import org.apache.spark.sql.functions.udf
def power3(number:Double):Double = number * number * number
spark.udf.register("power3", power3(_:Double):Double)
# 파이썬으로 사용자정의 함수(UDF)를 SQL함수로 등록
def power3Python(double_value):
return double_value ** 3
spark.udf.register("power3Python", power3Python, "Double")
'프로그래밍 > Spark' 카테고리의 다른 글
[pyspark] 스파크 완벽 가이드 - 파케이(parquet)파일 (0) | 2022.03.16 |
---|---|
[pyspark] 스파크 완벽 가이드 - 구조적 API 기본 연산 (0) | 2022.03.16 |
[pyspark] 스파크 완벽 가이드 - 구조적 API 기본개념과 용어 (0) | 2022.03.16 |