Spark SQL - UPDATE 대체

spark sql에서는 update를 지원하지 않는다. (작성일 기준) RDD의 특성에 대해 생각해 보면 그 이유는 어렵지 않게 떠올릴 수 있지만, UPDATE를 주로 사용하는 오라클 쿼리를 spark버전으로 포팅하려는 입장(필자)에서는 굉장히 아쉬운 부분이다. 그래서 공부도 할 겸 포스팅.

그리고 SQL 성능을 극대화하기 위해서도 UPDATE문은 지양하는 것을 추천한다. (DBA의 길:star:)

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
-- 위 테이블을 업데이트하는 구문.
UPDATE DF
SET age = 25
WHERE age is null


CASE를 활용

spark.read.json("examples/src/main/resources/people.json").createOrReplaceTempView("df")

spark.sql("""
SELECT
  CASE WHEN age is null THEN 25
       ELSE age
  END AS age
  ,name
FROM
  df
""").show()
+---+-------+
|age|   name|
+---+-------+
| 25|Michael|
| 30|   Andy|
| 19| Justin|
+---+-------+


JOIN

spark.sql("select * from df").createOrReplaceTempView("base")

spark.sql("""
  select 25 as age, name
  from df where age is null
  """).createOrReplaceTempView("TO_BE")

spark.sql("""
    select base.*
    from base join TO_BE
    on base.name = TO_BE.name
    """).createOrReplaceTempView("AS_IS")

spark.sql("""
  select * from base
  except select * from AS_IS
  union select * from TO_BE
  """).show()

// OR
val base = spark.sql("select * from df")
val toBe = spark.sql("select * from to_be")
val asIs = base.join(toBe, $"df.name"===$"to_be.name", "leftsemi")
val after = base.union(toBe).except(asIs)

/*
val toBe = spark.sql("select 25 as age, name from df where age is null")
val asIs = base.join(toBe, Seq("name"), "leftsemi")
이렇게 하면 컬럼 순서가 바뀜.. 일이 늘어난다.
*/

after.write.mode(SaveMode.Overwrite).saveAsTable("updatedTable")
spark.read.parquet("./spark-warehouse/updatedtable/").show()
+---+-------+
|age|   name|
+---+-------+
| 25|Michael|
| 19| Justin|
| 30|   Andy|
+---+-------+

case가 간편한 것 같다. 성능도 뛰어날 것 같다.


댓글남기기