드디어 써보는구나.
설치
- Docker 사용
- 공식사이트의 docker-compose.yml 이용. (postgre + redit)
- Airflow용 DB: postgre (*default: sqlite → 이 경우 executor를 SequentialExecutor밖에 못 쓴다고.)
- yml 설정에서 webserver 포트 9002:8080으로 바꿈.
- docker-compose
v2.2.3
으로 업그레이드.
- 누군가 만들어놓은 docker-compose (v1.10.9 )를 사용했다가 2 설치 다시 함.
위의 스크립트를 띄우면
처음에 airflow-init이 실행하면서 메모리와 CPU를 체크한 후 문제가 없으면
airflow-webserver , flower, scheduler, triggerer 등등이 설치된다. (yml 파일 확인)
저것들의 기능은 이름이 말해줄거고.. 저렇게 기능별로 에어플로우를 여러 개 띄우고 사용하는구나.
triggerer는 계속 재실행이 되서 일단 끄고 init은 상태 체크한 후 종료되는 듯.
airflow-init에서 airflow image를 체크
(base) airflow$ sudo docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
5c0fcc34308e apache/airflow:2.2.3 "/usr/bin/dumb-init …" 8 minutes ago Exited (2) 57 seconds ago airflow-airflow-triggerer-1
26093156cb94 apache/airflow:2.2.3 "/usr/bin/dumb-init …" 8 minutes ago Up 8 minutes (healthy) 8080/tcp airflow-airflow-scheduler-1
215b1ee6a659 apache/airflow:2.2.3 "/usr/bin/dumb-init …" 8 minutes ago Up 8 minutes (healthy) 8080/tcp airflow-airflow-worker-1
f39c84d0a521 apache/airflow:2.2.3 "/usr/bin/dumb-init …" 8 minutes ago Up 8 minutes (healthy) 0.0.0.0:9002->8080/tcp, :::9002->8080/tcp airflow-airflow-webserver-1
7ed8fb608ad2 apache/airflow:2.2.3 "/usr/bin/dumb-init …" 8 minutes ago Up 8 minutes (healthy) 0.0.0.0:5555->5555/tcp, :::5555->5555/tcp, 8080/tcp airflow-flower-1
08ee61aad576 apache/airflow:2.2.3 "/bin/bash -c 'funct…" 8 minutes ago Exited (0) 8 minutes ago airflow-airflow-init-1
UI 가 버튼이 많아서 좀 들어야 봐야할 듯.
스케줄러 설정이 예약한 시간에 못 가져온 데이터도 가져올 수 있는 듯하고, 내용을 좀 정확히 알아야 이해가 될 듯.
webserver 컨테이너에 접속해 사용자 계정 (Admin) 생성
airflow users create \
-u admin \
-f FIRST_NAME \
-l LAST_NAME \
-r Admin \
-p pasword \
-e ym@gmail.com
그래프 순서를 어떻게 정하는지 코드 확인.
순차실행은 >>
로
로그는 언제나 소중하다. 코드보면 다 echo같은 task인데 아웃풋 보려면 로그 확인.
저 분홍색 스킵작업은 왜 일어났는지도 로그에 나와있다. 코드보면 따로 skip 명시가 없어서.
로그를 보면
마지막task run_this_last
는 앞의 this_will_skip 의존성때문에 같이 skip된 듯.
트리거룰도 있고. 트리거룰은 앞에 것이 다 성공해야 현재 것이 실행되게 규칙을 정했는데 skip task때문에 skip된 거라 추측해봄.
"""Example DAG demonstrating the usage of the BashOperator."""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
with DAG(
dag_id='example_bash_operator',
schedule_interval='0 0 * * *',
start_date=datetime(2021, 1, 1),
catchup=False,
dagrun_timeout=timedelta(minutes=60),
tags=['example', 'example2'],
params={"example_key": "example_value"},
) as dag:
run_this_last = DummyOperator(
task_id='run_this_last',
)
# [START howto_operator_bash]
run_this = BashOperator(
task_id='run_after_loop',
bash_command='echo 1',
)
# [END howto_operator_bash]
run_this >> run_this_last
for i in range(3):
task = BashOperator(
task_id='runme_' + str(i),
bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
)
task >> run_this
# [START howto_operator_bash_template]
also_run_this = BashOperator(
task_id='also_run_this',
bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
)
# [END howto_operator_bash_template]
also_run_this >> run_this_last
# [START howto_operator_bash_skip]
# 따로 skip 명령이 없어서 로그를 보면
# [2022-01-25, 14:39:33 UTC] {taskinstance.py:1356} INFO -
# Bash command returned exit code 99. Skipping.
this_will_skip = BashOperator(
task_id='this_will_skip',
bash_command='echo "hello world"; exit 99;',
dag=dag,
)
# [END howto_operator_bash_skip]
this_will_skip >> run_this_last
if __name__ == "__main__":
dag.cli()
기본 개념?
DAG(Directed Acyclic Graph)
- 유향비순환 그래프
- 하나의 DAG 안에는 한 개 이상의 Task가 있으며, Task는 실제 실행시키는 작업이다.
객체 종류
웹서핑으로 대충 알아낸 건데 이건 공식 가이드 문서 를 보는 게 나을 듯:
- DAG 객체 생성 : 실행시간, 성공/실패시 작업 등등 큰 설정
- Task 객체: 실제 실행할 작업. (ex. sample.py 파일 실행)
Operator:
bash나 python 등 미리 정의된 작업 템플릿- ex1. bashOperator
- ex2. SlackAPIPostOperator
Sensor:
이벤트로 인해 트리거되는 작업.TaskFlow:
@task 데코레이터를 사용하여 DAG에 알아서 연결해주는 기능. 이건 순차적으로 작성해야하는건가?
- Executor: 작업 실행을 시켜주는 실행기.
- SequentialExecutor
- CeleryExecutor
- LocalExecutor
이런 것들에 대한 인지와 실습이 필요할 듯.
참조