본문 바로가기

D.S/DE

220125화 - 에어플로우 설치와 기본 개념 찾아보기 #1

728x90

 

드디어 써보는구나.

 

 


 

설치

  • Docker 사용
  • Airflow용 DB: postgre (*default: sqlite → 이 경우 executor를 SequentialExecutor밖에 못 쓴다고.)
  • yml 설정에서 webserver 포트 9002:8080으로 바꿈.
  • 누군가 만들어놓은 docker-compose (v1.10.9 )를 사용했다가 2 설치 다시 함.

 

위의 스크립트를 띄우면

처음에 airflow-init이 실행하면서 메모리와 CPU를 체크한 후 문제가 없으면

airflow-webserver , flower, scheduler, triggerer 등등이 설치된다. (yml 파일 확인)

저것들의 기능은 이름이 말해줄거고.. 저렇게 기능별로 에어플로우를 여러 개 띄우고 사용하는구나.

 

 

triggerer는 계속 재실행이 되서 일단 끄고 init은 상태 체크한 후 종료되는 듯.

airflow-init에서 airflow image를 체크

(base) airflow$ sudo docker ps -a
CONTAINER ID   IMAGE                                                  COMMAND                  CREATED         STATUS                        PORTS                                                 NAMES
5c0fcc34308e   apache/airflow:2.2.3                                   "/usr/bin/dumb-init …"   8 minutes ago   Exited (2) 57 seconds ago                                                           airflow-airflow-triggerer-1
26093156cb94   apache/airflow:2.2.3                                   "/usr/bin/dumb-init …"   8 minutes ago   Up 8 minutes (healthy)        8080/tcp                                              airflow-airflow-scheduler-1
215b1ee6a659   apache/airflow:2.2.3                                   "/usr/bin/dumb-init …"   8 minutes ago   Up 8 minutes (healthy)        8080/tcp                                              airflow-airflow-worker-1
f39c84d0a521   apache/airflow:2.2.3                                   "/usr/bin/dumb-init …"   8 minutes ago   Up 8 minutes (healthy)        0.0.0.0:9002->8080/tcp, :::9002->8080/tcp             airflow-airflow-webserver-1
7ed8fb608ad2   apache/airflow:2.2.3                                   "/usr/bin/dumb-init …"   8 minutes ago   Up 8 minutes (healthy)        0.0.0.0:5555->5555/tcp, :::5555->5555/tcp, 8080/tcp   airflow-flower-1
08ee61aad576   apache/airflow:2.2.3                                   "/bin/bash -c 'funct…"   8 minutes ago   Exited (0) 8 minutes ago                                                            airflow-airflow-init-1

 

UI 가 버튼이 많아서 좀 들어야 봐야할 듯. 

스케줄러 설정이 예약한 시간에 못 가져온 데이터도 가져올 수 있는 듯하고, 내용을 좀 정확히 알아야 이해가 될 듯.

 

 

webserver 컨테이너에 접속해 사용자 계정 (Admin) 생성

 airflow users create \  
 -u admin \  
 -f FIRST_NAME \  
 -l LAST_NAME \  
 -r Admin \  
 -p pasword \  
 -e ym@gmail.com

 

728x90

 

그래프 순서를 어떻게 정하는지 코드 확인.

순차실행은 >>

로그는 언제나 소중하다. 코드보면 다 echo같은 task인데 아웃풋 보려면 로그 확인.

저 분홍색 스킵작업은 왜 일어났는지도 로그에 나와있다. 코드보면 따로 skip 명시가 없어서.

 

로그를 보면

마지막task run_this_last는 앞의 this_will_skip 의존성때문에 같이 skip된 듯.

트리거룰도 있고. 트리거룰은 앞에 것이 다 성공해야 현재 것이 실행되게 규칙을 정했는데 skip task때문에 skip된 거라 추측해봄.

 



"""Example DAG demonstrating the usage of the BashOperator."""

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator

with DAG(
    dag_id='example_bash_operator',
    schedule_interval='0 0 * * *',
    start_date=datetime(2021, 1, 1),
    catchup=False,
    dagrun_timeout=timedelta(minutes=60),
    tags=['example', 'example2'],
    params={"example_key": "example_value"},
) as dag:
    run_this_last = DummyOperator(
        task_id='run_this_last',
    )

    # [START howto_operator_bash]
    run_this = BashOperator(
        task_id='run_after_loop',
        bash_command='echo 1',
    )
    # [END howto_operator_bash]

    run_this >> run_this_last

    for i in range(3):
        task = BashOperator(
            task_id='runme_' + str(i),
            bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
        )
        task >> run_this

    # [START howto_operator_bash_template]
    also_run_this = BashOperator(
        task_id='also_run_this',
        bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
    )
    # [END howto_operator_bash_template]
    also_run_this >> run_this_last

# [START howto_operator_bash_skip]
# 따로 skip 명령이 없어서 로그를 보면
# [2022-01-25, 14:39:33 UTC] {taskinstance.py:1356} INFO - 
# Bash command returned exit code 99. Skipping.
this_will_skip = BashOperator(
    task_id='this_will_skip',
    bash_command='echo "hello world"; exit 99;',
    dag=dag,
)
# [END howto_operator_bash_skip]
this_will_skip >> run_this_last

if __name__ == "__main__":
    dag.cli()

 

 


기본 개념?

DAG(Directed Acyclic Graph)

  • 유향비순환 그래프
  • 하나의 DAG 안에는 한 개 이상의 Task가 있으며, Task는 실제 실행시키는 작업이다.

 

객체 종류

웹서핑으로 대충 알아낸 건데 이건 공식 가이드 문서 를 보는 게 나을 듯:

 

  • DAG 객체 생성 : 실행시간, 성공/실패시 작업 등등 큰 설정
  • Task 객체: 실제 실행할 작업. (ex. sample.py 파일 실행)
    • Operator: bash나 python 등 미리 정의된 작업 템플릿
      • ex1. bashOperator
      • ex2. SlackAPIPostOperator
    • Sensor: 이벤트로 인해 트리거되는 작업.
    • TaskFlow: @task 데코레이터를 사용하여 DAG에 알아서 연결해주는 기능. 이건 순차적으로 작성해야하는건가?
  • Executor: 작업 실행을 시켜주는 실행기.
    • SequentialExecutor
    • CeleryExecutor
    • LocalExecutor

 

이런 것들에 대한 인지와 실습이 필요할 듯.

이미지 출처

 

참조

 

반응형