Airflow에 기반한 Slack 지표 알림 봇 구축기

김정태

업데이트:

배경

띵스플로우에는 여러 앱을 운영하고 있습니다. 각 서비스에서 쌓인 데이터는 빅쿼리로 모이고 있고, 빅쿼리에 쿼리를 날려 내가 원하는 데이터를 추출할 수 있습니다. 하지만 SQL에 익숙하지 않은 비 개발자 직군의 경우에는 보고싶거나, 필요한 데이터가 생기는 경우에는 개발자에게 요청해야 할 수 밖에 없는 상황에 놓이곤 했습니다. 개발자가 당장 쿼리문을 작성할 수 없는 상황인 경우, 어쩔 수 없이 오랜 시간을 기다려야 합니다. 이러한 현상을 개선하기 위해 Airflow를 사용해 매일 특정 시간에 필요한 데이터를 보여주는 Slack 봇 서비스를 먼저 만들어 보았습니다. Slack 을 이용하는 경우, 전 팀원이 데이터를 굳이 찾아보지 않아도 날마다 확인할 수 있어, 데이터 기반 커뮤니케이션을 촉진할 수 있다는 장점이 있습니다.

image

기본 개념

1. ETL

image

ETL 이란 (Extract, Transform, Load)의 약자 입니다. 데이터를 소스에서 추출(Extract)하고, 변환(Transform)하고, 적재하는(Load) 과정을 의미합니다.

데이터의 소스가 한 군데가 아니기 때문에 여러 곳에 퍼져있는 데이터를 추출해, 용도에 맞게 변환한 뒤, 정제된 데이터를 적재하는 것 입니다. 데이터를 추출하고, 적재한 뒤, 용도에 맞게 변환하는 ELT도 있지만, 두 개념은 비슷하게 혼용됩니다.

2. Airflow

image

2014년 에어비앤비에서 개발한 에어플로우는 최근 ETL 작업을 위한 도구로 가장 많이 사용되는 기술입니다. 파이썬을 이용해, 오랜 시간을 들이지 않고 때문에 코드 작성이 가능하고, 오픈소스 이기 때문에 무료로 사용이 가능합니다. 또 최근 가장 많이 사용되는 기술이기 때문에 문제가 생긴 경우 관련 자료를 찾기가 매우 쉽습니다.

비슷한 기술로는 Luigi, Argo Workflow, Nifi 등이 있습니다.

2. GCP Composer

image

GCP Composer는 구글에서 제공하는 Airflow 관리 서비스 입니다. AWS 에서는 MWAA(Amazon Managed Workflow for Apache Airflow) 라는 이름으로 비슷한 서비스를 제공합니다. MWAA Link

gce나 aws ec2에 도커, 쿠버네티스를 이용해 Airflow 환경을 구성하고 사용할 수 있지만, 직접 구축하게 되는 경우 Airflow 환경을 관리하는데에도 시간이 소요되기 때문에, 최대한 빠르게 작업을 처리하고 관리에 드는 시간을 최소화 하기 위해 GCP Composer를 사용하였습니다.

간단하게 살펴보는 Airflow 사용법

1. DAG 작성법

  • 간단한 Airflow 코드예시
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python import PythonOperator


default_dags_args = {
    "owner": "jeongtae",
    "start_date": datetime(2021, 10, 20),
    "email": ["jeongtae.kim@thingsflow.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
}

def print_hello_thingsflow():
    print('Hello Thingsflow!')

with DAG(
    dag_id="set_your_dag_id",
    schedule_interval="0 * * * *",
    catchup=False,
    default_args=default_dags_args,
    tags=["thingsflow", "airflow", "test"],
) as dag:
    
    print_hello_thingsflow_python = PythonOperator(
        task_id="print_hello_thingsflow_python",
        python_callable=print_hello_thingsflow,
    )
    
    print_hello_thingsflow_bash = BashOperator(
        task_id="print_hello_thingsflow_bash",
        bash_command='echo "hello thingsflow"'
    )
    
    print_hello_thingsflow_python >> print_hello_thingsflow_bash


dag 을 작성할 때 기본적으로 필요한 기능을 dictionary 형태로 저장합니다.

default_dags_args = {
    "owner": "jeongtae",
    "start_date": datetime(2021, 10, 20),
    "email": ["jeongtae.kim@thingsflow.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
}

위 default_dags_args에 있는 값들은 아래은 의미를 가집니다.

  1. owner : dag 작성자 이름을 기재한다.
  2. start_date : 작업이 시작할 날짜를 기재한다.
  3. email : 작업이 실패하거나, 재시도를 하는 경우 메일을 받을 주소를 입력한다.
  4. email_on_failure : 작업이 실패한 경우 이메일을 받을지 설정한다.
  5. email_on_retry : 작업이 실패해 재시도를 시행하는 경우 이메일을 받을지 설정한다.
  6. retries : 작업이 실패한 경우 몇 번의 재시도를 할지 설정한다.


본격적으로 dag을 작성하기 위해서는 아래와 같이 코드를 작성합니다. (Airflow 에는 여러 오퍼레이터가 있지만, 가장 범용성이 높은 Bash와 Python 오퍼레이터를 예시로 사용했습니다.)

with DAG(
    dag_id="set_your_dag_id",
    schedule_interval="0 * * * *",
    catchup=False,
    default_args=default_dags_args,
    tags=["thingsflow", "airflow", "test"],
) as dag:
  • DAG에 추가로 필요한 값들을 설정해주어야 합니다.
    1. dag_id : Airflow UI에 보이는 id를 기재한다.
    2. schedule_interval : cron expression 을 이용해 언제 DAG이 실행될지 설정한다.
    3. catchup : start_date부터 현재 시간까지 실행하지 못한 작업들을 실행할지 말지 설정한다.
    4. tags : DAG 관리를 위해 tag를 설정한다.


Python 함수를 사용하는 PythonOperator를 살펴보겠습니다.

print_hello_thingsflow_python = PythonOperator(
    task_id="print_hello_thingsflow_python",
    python_callable=print_hello_thingsflow,
)
  1. task_id : 작업 이름을 설정한다.
  2. python_callable : 작업이 실행될 때 사용할 파이썬 함수를 설정한다.

간단하게 위와 같이 설정할 수 있습니다.

print_hellow_thingsflow 는 단순히 Hello Thingsflow!를 출력하는 함수입니다.


Bash 를 사용하는 BashOperator를 살펴보겠습니다.

print_hello_thingsflow_bash = BashOperator(
    task_id="print_hello_thingsflow_bash",
    bash_command='echo "hello thingsflow"'
)

PythonOperator와 같이 task_id를 설정해주고, bash 쉘에서 작동시킬 명령어를 입력시켜주면 됩니다.

echo "hello thingsflow"는 쉘에서 hello thingsflow를 출력하는 명령어 입니다.


마지막으로 Airflow의 가장 중요한 기능 중 하나인 task 연결을 살펴보겠습니다.

print_hello_thingsflow_python >> print_hello_thingsflow_bash

단순히 >> 기능을 활용해 어떤 순서로 작업을 실행시킬지 설정할 수 있습니다.

앞선 작업이 마무리 되지 않으면 이후의 작업이 실행되지 않도록 직렬로 설정할 수 있고, 여러 작업을 동시에 실행시킬 수 있도록 병렬로도 설정할 수 있습니다. (병렬은 t1 >> [t2, t3] >> t4 와 같이 설정 가능합니다.) 더 복잡한 형태의 작업도 설정이 가능합니다.

위처럼 만든 파이썬 파일을 airflow의 .dags/ 폴더에 넣어주면 Airflow UI에 해당 DAG이 생성되게 됩니다.

2. Airflow UI 구경하기

image

에어플로우 홈 화면은 위와같이 깔끔하게 생겼습니다. 이 안에서도 여러가지 설정, 확인이 가능합니다.

우선 홈 화면에서는 현재 어떤 DAG가 존재하는지, 누가 owner인지, 몇번 작동하였는지, 언제 작동하는지 등 많은 정보들을 볼 수 있습니다.

아까 만든 dag을 보면 설정한 id(set_your_dag_id), owner(jeongtae), schedule(0 * * * *)를 확인할 수 있습니다.

dag에 들어가 상세 정보를 보면, 여러가지 형태로 작업을 볼 수 있도록 설정되어 있습니다.

image

이와같이 tree 형태로 볼 수 있고,

image

개인적으로 가장 보가 편하다고 생각되는 graph 형태로도 DAG을 볼 수 있습니다. 여러 색을 통해서 각 task가 어떤 상태인지 실시간으로 확인이 가능하다는 장점이 있습니다. 추가로, 디테일한 정보들은 details 나, code를 통해 확인할 수 있습니다.

코드에는 적으면 보안에 문제가 생길 수 있는 정보들을 Airflow 에 저장해서 사용할 수 있는데, Airflow UI 위쪽의 Admin/Variables 에서 변수들을 설정할 수 있고, 연결을 위한 정보들은 Admin/Connection 에서 설정이 가능합니다.

image

image

3. DAG 실행시켜보고 결과 확인해보기

아까 설정한 DAG을 실행시켜봅시다. 설정한 schedule 시간이 되면 알아서 실행되지만 직접 Trigger DAG 버튼을 눌러줘 실행시킬 수 도 있습니다.

DAG을 실행시키고난 뒤, task를 누르고 log 버튼을 눌러 로그를 확인할 수 있습니다.

  • print_hello_thingsflow_python 의 로그
    [2021-11-12 06:56:47,689] {logging_mixin.py:104} INFO - Hello Thingsflow!
    
  • print_hello_thingsflow_bash 의 로그
    [2021-11-12 06:56:52,737] {subprocess.py:63} INFO - Running command: ['bash', '-c', 'echo "hello thingsflow"']
    [2021-11-12 06:56:52,994] {subprocess.py:74} INFO - Output:
    [2021-11-12 06:56:52,999] {subprocess.py:78} INFO - hello thingsflow
    

두 task 모두 성공적으로 작업한 모습을 볼 수 있습니다.

4. Thingsflow 에서의 사용

위에서 작성한 예제 코드와 비슷하게 Dag을 구축하고 운영하였습니다.

Storyplay 에서 사용하는 DAG의 graph view는 아래와 같이 되어있습니다.

image

위처럼 task 작업 순서를 설정하기 위해 파이썬 파일에서는 아래와 같이 설정하였습니다.

check_data_exist >> [extract_storyplay_dau, extract_storyplay_new_user, extract_storyplay_inapp_ticket, extract_storyplay_inapp_purchase,] >> insert_data >> slack_alert

check_data_exist >> extract_au_and_insert_data >> slack_alert

위의 DAG이 매일 특정 시간이 되면 빅쿼리의 데이터를 이용해 필요한 데이터를 추출하고 그 데이터를 따로 저장하며, slack으로 보내줍니다.

image

  • 위 이미지의 내용은 이해를 돕기위한 이미지로, 실제 데이터가 아닙니다!

개발 환경 구축

Github Action

깃허브 액션은 개발 Workflow를 자동화할 수 있는 도구입니다. Jenkins와 비슷한 CI/CD 도구로 많이 활용합니다. (더 많은 정보는 여기 의 학습 자동화/Github Action이란? 을 참고 해 주세요.)

Composer 의 경우 dag 파일 등 airflow에서 구동하는데 필요한 파일들을 gcs bucket 에서 관리하는데, github의 repo와 gcs bucket의 자동 동기화를 하기 위해서 Github Action을 사용하였습니다. gcs-bucket-sync-action 이라는 action이 있었지만, 이 액션을 그대로 사용하는 경우 gcs bucket의 모든 파일을 지우고 업데이트를 하기 때문에, 로그처럼 삭제되면 안되는 파일까지 삭제되는 문제가 발생했습니다.

해당 문제를 해결하기 위해 위 링크의 코드를 적당히 수정하여 사용하였습니다.

name: Deploy

on:
  push:
    branches:
      - master

jobs:
  deploy:
    name: Deploy
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2

      - name: Sync
        uses: thingsflow/gcs-bucket-sync-action-for-composer@master
        with:
          secrets: $
          bucket: 'asia-northeast3-thingsflowm-c5b75734-bucket'
          exclude: '.*\.md$|.gitignore$|.git\/.*$|.github\/.*$'

위와 같이 설정하여 master에 push 되면 gcs bucket 전체가 아닌 gcs bucket dags 폴더와 깃허브 폴더를 동기화하도록 설정하였습니다.

image

마치며

이전에는 인공지능쪽에만 관련된 데이터들을 만지며 작업했었는데, 실제 서비스에 사용되는 데이터들을 만지며 작업을 해보니 색다른 경험이었습니다.

이후에는 비트윈, 헬로우봇의 데이터까지 함께 다루는 작업을 하게 될 텐데, 더욱 많은 사내 직원분들과, 앱 사용자 분들을 편리하게 만드는 기능들을 개발하는 데이터 엔지니어가 되도록 더욱 노력하겠습니다.

긴 글이었지만 읽어주셔서 감사하고 재미있게 읽으셨으면 좋겠습니다. 감사합니다!

띵스플로우 팀은 자기의 일을 좋아하고 잘하는 사람들 입니다. 사용자와 서비스를 중심으로 빠르게 실행하고 학습하며, 다양한 직무의 사람들이 협업을 통해 시너지를 내고 있습니다. 다양한 콘텐츠 혁신을 이루고 있는 띵스플로우 팀에 함께할 분을 찾습니다! 언제든 people@thingsflow.com로 이메일을 주시기 바랍니다!

태그:

카테고리:

업데이트:

댓글남기기