Spark SQL - 기초

SparkSQL은 이름처럼 스파크 내에서 SQL문법을 사용 가능하게 하는 awesome한 도구다. 따라서 스파크를 코딩에 친숙하지 않은 구성원들이 마치 oracle을 사용하는 것 마냥 이용이 가능하다. (물론 다소간의 노력은 필요하다.)

여기에서는 SparkSQL 관련된 기본적인 사용법을 다루고자 한다. Spark Session 등에 대한 설정은 다루지 않는다. spark는 scala, java, python(pyspark) 등으로 이용이 가능하다. 아래 코드는 scala로 작성되었다.

Basic

read

Spark SQL은 여러 구조화된 데이터소스(Hive, JSON, parquet 등)을 기본적으로 지원한다.

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

val df = spark.read.json("examples/src/main/resources/people.json")
val df2 = spark.read.parquet("examples/src/main/resources/users.parquet")
val df3 = spark.read.
         option("delimiter", ","). //or sep
         option("encoding", "utf-8").
         option("inferSchema", "true").
         option("header", "false").
         csv("examples/src/main/resources/people.txt").
         toDF("name", "age")

다양한 옵션을 활용할 수 있다. 다음처럼 .이 앞에 있는 것이 개인적으로 보기가 더 좋지만,

val df3 = spark.read
         .option("delimiter", ",")
         .option("encoding", "utf-8")
         ...

이런 식으로 쓰면 spark-shell에서는 제대로 입력이 되지 않는다. 제플린에서는 사용이 가능하다.

function

sql에서 사용하던 show, filter, groupby 등의 명령어를 메소드처럼 사용이 가능하다

function
show() df.show()
filter() df.filter(df(“age”) > 19)
groupby() df.groupBy(df(“name”)).min()
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

:star: 위와 같이 사용할 수도 있고, 아래처럼 spark.sql에서 쿼리로 가져올 때 걸러서 가져올 수도 있다. 여기서 사용하는 createOrReplaceTempView는 sql의 view와 같은 역할을 한다.

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

spark.sql("""
  SELECT *
  FROM people
  WHERE age > 21
""").createOrReplaceTempView("result")

spark.sql("SELECT * FROM result").show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

위의 spark.sql로 가져온 쿼리는 실제 action(show() 라던지)이 일어날 때 수행되는 lazy evaluation(execution)이다. Tensorflow의 그것과 비슷한 맥락이다.

save

쿼리를 만들어서 작업을 한 뒤에는 저장을 통해 일을 마무리 해야한다.

any Language 의미
“error” or
“errorifexists”
저장 하려는 테이블명을 이미 쓰고 있다면 에러
(default)
“append” 이미 테이블이 존재할 경우, 기존 데이터에 append
“overwrite” 기존 데이터를 지우고 새로 쓴다
“ignore” 테이블이 존재할 경우, 저장하지 않음
CREATE TABLE IF NOT EXISTS 와 비슷하다고 함
spark.sql("SELECT * FROM result")
.write.mode("overwrite")
.parquet("PATH")

df.write.mode("append")
.format("parquet")
.saveAsTable("df") //  이러면 spark-warehouse 폴더의 해당 테이블명으로 저장이 된다.


  • atom에서 scala를 사용할 때는 setting에서 language-scala를 설치하자.

댓글남기기