[Airflow] Jinja Template 써보기

Airflow는 flask에서 자주 사용되는 강력한 도구인 Jinja Template을 지원합니다.

templated_command = """

{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
    echo "{{ params.my_param }}"
{% endfor %}

"""

airflow tutorial을 설명하며 위와 같은 코드를 봤습니다.

굉장히 익숙치 않은 {% 기호입니다. 저 기호를 이용해서 jinja template 안의 if, for문 등을 제어할 수 있습니다. 템플릿은 dag파일에 있어도 되고, task가 실행할 파일(sh, scala, py, …) 안에 있어도 동작합니다.

다음 코드는 BashOperator를 이용해서 템플릿이 적용된 shell script를 실행하는 예제입니다.

# jinja.py
from datetime import timedelta
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'mskim',
    'start_date': airflow.utils.dates.days_ago(2),
}

dag = DAG(
    'test_jinja',
    default_args=default_args,
    schedule_interval='5 0 * * *',
    template_searchpath=['/home/mk/airflow/scripts'],
    # 위 경로에서 실행할 script를 찾습니다.
)

test1 = "It would be passed, but jinja does not work {{ ds }}"

t = BashOperator(
      task_id='test',
      bash_command='jinja.sh',
      params={'test':test1},
      dag=dag,
      )

test라는 Task에서 jinja.sh 파일을 실행시키고, 미리 설정해둔 test1 파라미터를 전달합니다.

다음은 위에서 전달한 파일 내용입니다.

# jinja.sh
#!/bin/bash

# dag 정보와 task 정보를 Jinja template으로 불러올 수 있습니다.
echo "{{ dag }}, {{ task }}"

# dag file에서 전달해 준 test 파라미터는 다음과 같이 쓸 수 있습니다.
echo "{{ params.test }}"

# execution_date를 표현합니다.
echo "execution_date : {{ execution_date }}"
echo "ds: {{ds}}"

# macros를 이용하여 format과 날짜 조작이 가능합니다.
echo "the day after tommorow with no dash format"
echo "{{ macros.ds_format(macros.ds_add(ds, days=2),'%Y-%m-%d', '%Y%m%d') }}"

# for, if 예제입니다.
{% for i in range(3) %}
	echo "{{ i }} {{ ds }}"
  {% if i%2 == 0  %}
		echo "{{ ds_nodash }}"
  {% endif %}
{% endfor %}

그럼 test 명령을 이용해 확인해 보겠습니다. airflow test test_jinja test 0

#result

[2019-03-16 14:30:12,525] {bash_operator.py:119} INFO - Output:
[2019-03-16 14:30:12,527] {bash_operator.py:123} INFO - <DAG: test_jinja>, <Task(BashOperator): test>
[2019-03-16 14:30:12,527] {bash_operator.py:123} INFO - It would be passed, but jinja does not work {{ ds }}
[2019-03-16 14:30:12,527] {bash_operator.py:123} INFO - execution_date : 2019-03-16T00:00:00+00:00
[2019-03-16 14:30:12,527] {bash_operator.py:123} INFO - ds: 2019-03-16
[2019-03-16 14:30:12,527] {bash_operator.py:123} INFO - the day after tommorow with no dash format
[2019-03-16 14:30:12,527] {bash_operator.py:123} INFO - 20190318
[2019-03-16 14:30:12,527] {bash_operator.py:123} INFO - 0 2019-03-16
[2019-03-16 14:30:12,527] {bash_operator.py:123} INFO - 20190316
[2019-03-16 14:30:12,527] {bash_operator.py:123} INFO - 1 2019-03-16
[2019-03-16 14:30:12,527] {bash_operator.py:123} INFO - 2 2019-03-16
[2019-03-16 14:30:12,528] {bash_operator.py:123} INFO - 20190316
[2019-03-16 14:30:12,528] {bash_operator.py:127} INFO - Command exited with return code 0

Jinja Template을 이용하면 반복적인 작업(대부분의 에어플로우 잡이겠죠)의 날짜 정보 전달, 조작이 간편해집니다. 특히 과거 데이터를 한꺼번에 작업해야할 경우 execution_date를 기준으로 작업을 하면 훨씬 관리가 편해집니다. 제 경우에는 쿼리에 작업연월, 전월, 차월 등을 입력해야 하는 부분이 몇 군데 있었는데, 많은 도움이 되었습니다

BashOperator와 쉘 스크립트로 예제를 만들었지만, Python, Oracle, SSH등 다른 Operator들도 동일한 방법이 가능합니다.

댓글남기기