Spark Thrift Server와 IDL, RPC 정리

3 minute read

Spark는 Thrift server를 지원합니다. 그런데 Thrift가 뭔가요?

Thrift

Thrift는 IDL로서 수많은 언어를 위한 서비스의 정의 및 생성에 사용됩니다. RPC 프레임워크를 생성해서 스케일링이 가능한 언어간 서비스 개발을 위해 만들어졌습니다.

  • 무슨 말인지 잘 모르겠어요. IDL이 뭔가요?, RPC는 또 뭔가요?

IDL

IDL은 인터페이스 정의 언어(Interface Definition Language)의 약자로 말 그대로 소프트웨어 컴포넌트의 인터페이스를 정의하는 언어입니다. 언어에 국한되지 않아서 서로 다른 언어와 환경을 사용하는 컴포넌트 사이의 통신을 가능하게 합니다. 윈도우에서 동작하는 C++ 로 작성된 서버와 리눅스에서 동작하는 자바로 작성된 서버도 IDL을 이용해서 인터페이스를 이해할 수 있습니다. IDL은 RPC를 이용한 소프트웨어에서 사용됩니다.

쓰리프트에서는 아래처럼 씁니다. 쓰리프트 자체는 C++로 작성되었습니다.

// example.thrift
enum PhoneType {
  HOME,
  WORK,
  MOBILE,
  OTHER
}

struct Phone {
  1: i32 id,
  2: string number,
  3: PhoneType type
}

service PhoneSvc {
  Phone findById(1: i32 id),
  list<Phone> findAll()
}
# gen 예시
thrift --gen <language> <Thrift filename>

# thrift --gen python example.thrift
# thrift --gen go example.thrift

일반적으로 위처럼 IDL을 이용해서 각 언어에 호환되는 파일을 생성하고 각 언어에서 가져다 씁니다. IDL이라는 정의 언어를 통해 서로 다른 인터페이스(c, go, python, server, client, framework, 등등)에서 어떤 데이터 명세로 통신할 것인지 약속하는 겁니다.

RPC

IDL을 “어떻게” 쓰는지를 RPC에서 보여줍니다.

원격 프로시저 호출(remote procedure call, 리모트 프로시저 콜, RPC)은 별도의 원격 제어를 위한 코딩 없이 다른 주소 공간에서 함수나 프로시저를 실행할 수 있게 하는 프로세스 간 통신 기술입니다. 다시 말해, 요청자(클라이언트)가 일반 메서드를 호출하는 것처럼 원격지(서버)의 프로시저를 호출할 수 있습니다.

RPC의 대표적인 구현체는 본문의 Thrift (by facebook)가 있고, 이외에도 IDL로 protocol buffers(protobuf)를 사용하는 gRPC (by google)가 있습니다. 예시로 gRPC Python Quickstart가 짧아서 가져와봤습니다.

Thrift python example은 여기서

gRPC python example

순서는 다음과 같습니다.

  1. IDL (grpc 예제이니 여기서는 proto)
  2. IDL로 gRPC 코드 생성 (gen)
  3. 서버 코드
  4. 클라이언트 코드

1. define gRPC service in proto files

// https://github.com/grpc/grpc/blob/master/examples/protos/helloworld.proto
syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";

package helloworld;

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

IDL로 Greeter라는 서비스를 정의했습니다. GreeterHelloRequest라는 메세지 객체를 받아서 HelloReply 메세지 객체를 반환합니다.

  • HelloRequestname을 가지고 있어야합니다.
  • HelloReplymessage를 가지고 있어야합니다.

2. Generate gRPC code

python -m grpc_tools.protoc -I../../protos --python_out=. --grpc_python_out=. ../../protos/helloworld.proto

위 IDL 파트에서 thrift --gen 했던 것처럼 proto(IDL)를 gen하면 아래와 같은 파일이 생성됩니다.

# https://github.com/grpc/grpc/blob/master/examples/python/helloworld/helloworld_pb2_grpc.py
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
import grpc

import helloworld_pb2 as helloworld__pb2


class GreeterStub(object):
  """The greeting service definition.
  """

  def __init__(self, channel):
    """Constructor.
    Args:
      channel: A grpc.Channel.
    """
    self.SayHello = channel.unary_unary(
        '/helloworld.Greeter/SayHello',
        request_serializer=helloworld__pb2.HelloRequest.SerializeToString,
        response_deserializer=helloworld__pb2.HelloReply.FromString,
        )

class GreeterServicer(object):
  """The greeting service definition.
  """

  def SayHello(self, request, context):
    """Sends a greeting
    """
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details('Method not implemented!')
    raise NotImplementedError('Method not implemented!')
...

특별한 일이 없는 이상 gen이 되기만 하면 됩니다.

3. Server side

그러면 서버에서는 생성된 클래스에 맞추어서 필요한 로직(메서드)을 구현하면 됩니다. IDL에서 이미 명세를 정의했기 때문에 편안하게 request에 있는 name을 받아서 HellowReply 객체에 message 값을 담아 전달하면 됩니다.

class Greeter(helloworld_pb2_grpc.GreeterServicer):

  def SayHello(self, request, context):
    return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)
...

4. Client side

그리고 클라이언트도 역시 내부 구조는 알 필요가 없고, IDL에 정의된 대로 name을 담아 요청하고 response에서 message를 꺼내 쓰면 됩니다. 다만 메서드를 호출하기 전에 grpc를 통해 서버와 연결해야 합니다.

def run():
  channel = grpc.insecure_channel('localhost:50051')
  stub = helloworld_pb2_grpc.GreeterStub(channel)
  response = stub.SayHello(helloworld_pb2.HelloRequest(name='you'))
  print("Greeter client received: " + response.message)

Spark Thrift Server

위의 IDL, RPC, 등등을 이용해서 Spark Thrift 서버는 Spark 클러스터를 JDBC/ODBC 통신이 가능한 분산처리 SQL 엔진으로 동작하게 해줍니다. 그래서 다양한 서버(go, python, spring 등등), 혹은 Tablue나 superset 같은 대쉬보드에서 spark cluster에 연결할 수 있습니다. 물론 Spark SQL은 OLTP가 아닌 OLAP로 사용하는 것이 마땅합니다.

쓰리프트 서버를 실행하려면 스파크 디렉토리에서 아래처럼 하면 됩니다. spark thrift server는 HiveServer2에 맞추어 구현되어 있습니다.

./sbin/start-thriftserver.sh \
  --hiveconf hive.server2.thrift.port=<listening-port> \
  --hiveconf hive.server2.thrift.bind.host=<listening-host> \
  --master <master-uri>
  ...

PyHive connection example

PyHive로 spark thrift server와 연결하려면?

from pyhive import hive  # or import presto or import trino
cursor = hive.connect('localhost').cursor()
cursor.execute('SELECT * FROM my_awesome_data LIMIT 10')
print cursor.fetchone()
print cursor.fetchall()

이렇게 연결을 해주고

cursor.execute('SELECT * FROM my_awesome_data LIMIT 10', async_=True)

쿼리를 던지면 됩니다. DB-API 대신 SQLAlchemy도 지원합니다.

끝.


Leave a comment