본문 바로가기

D.S/DE

220218금 - airflow dag file에서 내 module file 사용하기

728x90

 

 

dag py file을 작성하면서 나의 source 파일을 import해오는데 import가 안 된다고 에러가 나서 찾아보다 공식문서가 너무 친절하게 가이드를 해두어서 정리해본다.

 

 

→ 의역있고, 적당히 정리해서 표현하기도 합니다. airflow 사용하시는데 도움이 되었으면 좋겠습니다. :))

 


 

Module management

 

Airflow는 airflow의 configuration과 DAG에서 개인 파이썬 모듈(즉 나의 파이썬 코드)을 사용할 수 있도록 해두었습니다. 이 문서는 어떻게 커스텀 모듈을 생성하고 에어플로우가 그걸 제대로 로드할 수 있도록 가이드할 것입니다.

 

보통 에어플로우 배포에 보통 소스코드, 라이브러리 형태의 개인코드를 함께 사용해서 DAG를 생성하길 원할 것입니다.

 

이 때, 할 수 있는 방법은 세 가지가 있습니다:

  • 내 모듈을 Airflow가 자동으로 PATHONPATH에 추가하는 폴더 중의 하나에 넣기 (ex. dags 폴더)
  • 내 소스코드가 들어가 있는 폴더들을 PYTHONPATH에 추가하기
  • 내 코드를 파이썬 패키지로 패키징을 한 다음에 airflow와 함께 설치하기

 

다음 챕터에서는 어떻게 파이썬이 패키지와 모듈을 로드하는지 일반적인 설명을 하고, 위 세 가지 방법에 대해 더 자세하게 파볼 것입니다.

 

 


 

How package/modules loading in Python works

 

파이썬이 모듈을 로딩하는 디렉토리 리스트는 sys.path 값에 저장되어 있습니다. 파이썬은 이 변수에 저장된 path들을 오퍼레이팅 시스템에 따라서, 혹은 어디에 파이썬이 설치되어 있고 어떤 파이썬 버전을 사용하는지를 고려하여 매우 똑똑하게 선택합니다.

 

한번 본인의 파이썬 환경의 sys.path를 다음 코드를 사용해 체크할 수 있습니다.


>>> import sys
>>> from pprint import pprint
>>> pprint(sys.path)
['',
 '/home/arch/.pyenv/versions/3.7.4/lib/python37.zip',
 '/home/arch/.pyenv/versions/3.7.4/lib/python3.7',
 '/home/arch/.pyenv/versions/3.7.4/lib/python3.7/lib-dynload',
 '/home/arch/venvs/airflow/lib/python3.7/site-packages']

 

sys.path 는 프로그램 setup중에 초기화됩니다. 위의 배열의 첫번째값은 불러오려는 현재 스크립트(the current script that was used to invoke)를 포함하는 디렉토리이거나 interactive shell인 경우는 빈 문자열이 오게 됩니다.

두번째 값은 PYTHONPATH 에 저장되어 있던 값으로, site모듈에서 관리하는 installation-dependent default에 딸려있던 값입니다.

 

sys.path 는 파이썬 세션에서 append해서 값을 수정할 수 있습니다.

(ex. sys.path.append(”/path/to/custom/package”)) 파이썬은 새로운 경로가 추가되면 그 경로에서 패키지들을 찾기 시작합니다. airflow는 이러한 파이썬의 특징을 이용합니다. (후의 adding directories to the PYTHONPATH에서 설명함.)

 

sys.path 값 안에는 site-packages 디렉토리가 있는데, 이 디렉토리는 설치된 external 패키지들을 가지고 있습니다. 이 말은 즉 당신이 pip 나 conda를 통해 패키지를 설치할 수 있고 이 설치된 패키지들을 airflow에서 사용할 수 있다는 말입니다. 다음 섹션에서는 개인의 만든 간단한 설치가능 패키지를 어떻게 만들고, PYTHONPATH 환경변수를 이용해 어떻게 sys.path 에 커스텀 디렉토리를 추가하는지 배울 것입니다.

 

또한 다음도 숙지하세요. Add init file to your folders

(다 챙겨주는 혜자스타일..)

 

Typical structure of packages (부제: .airflowignore 사용법)

 

밑의 exampled은 당신의 dag 폴더가 가지고 있을만한 구조이죠.



| .airflowignore  -- only needed in ``dags`` folder, see below
| -- my_company
              | __init__.py
              | common_package
              |              |  __init__.py
              |              | common_module.py
              |              | subpackage
              |                         | __init__.py
              |                         | subpackaged_util_module.py
              |
              | my_custom_dags
                              | __init__.py
                              | my_dag_1.py
                              | my_dag_2.py
                              | base_dag.py


 

위의 경우 당신이 파이썬 파일에 import하는 방법은 다음과 같습니다.



from my_company.common_package.common_module import SomeClass
from my_company.common_package.subpackge.subpackaged_util_module import AnotherClass
from my_company.my_custom_dags.base_dag import BaseDag

 

폴더의 root 위치에서 .airflowignore 파일을 확인할 수 있습니다. 이 파일을 dag폴더에 넣으면 이 파일은 airflow 스케줄러가 DAGs를 살펴볼 때 폴더의 어떤 파일을 무시할지 알려줍니다. 이 파일은 무시해야 하는 경로를 표현하기 위한 정규식을 포함할 수 있습니다. 이 파일이 PYTHONPATH 안에 있는 폴더들 안에 포함되어있을 필욘 없습니다. (and also you can only keep shared code in the other folders, not the actual DAGs. 갑자기?).

 

위의 예에서 dag들은 my_custom_dags 폴더에만 있고, common_package 폴더는 airflow 스케줄러가 DAG를 스캔할 때 제외되어야 합니다. 또한 base_dag 도 제외되길 바라는데 이것은 my_dag1.py와 my_dag2.py의 기반 dag으로 이용된 dag이어서 실제로 사용하지 않기 때문이죠. 따라서 이럴 땐 .airflowignore 는 다음처럼 정의되어야 합니다.

 


my_company/common_package/.*
my_company/my_custom_dags/base_dag\.py


 

 

Built-in PYTHONPATH entries in Airflow

 

airflow는 사용중에 sys.path에 다음 세 디렉토리를 추가합니다.

  • The dags folder: It is configured with option dags_folder in section [core].
  • The config folder: It is configured by setting AIRFLOW_HOME variable ({AIRFLOW_HOME}/config) by default.
  • The plugins Folder: It is configured with option plugins_folder in section [core].

 

airflow1 사용하는 분들 참고. 2는 webserver랑은 dags폴더를 공유하지 않습니다. 이유→ 보안문제상 자세한 내용은 ...읽어보시죠 ㅎ


The DAGS folder in Airflow 2 should not be shared with the webserver. While you can do it, unlike in Airflow 1.10, Airflow has no expectations that the DAGS folder is present in the webserver. In fact it’s a bit of security risk to share the dags  folder with the webserver, because it means that people who write DAGS can write code that the webserver will be able to execute (ideally the webserver should never run code which can be modified by users who write DAGs). Therefore if you need to share some code with the webserver, it is highly recommended that you share it via config  or plugins  folder or via installed airflow packages (see below). Those folders are usually managed and accessible by different users (Admins/DevOps) than DAG folders (those are usually data-scientists), so they are considered as safe because they are part of configuration of the Airflow installation and controlled by the people managing the installation.


 

Best practices for module loading

 

당신의 코드를 import할 때 신경써야 하는 부분들입니다.

 

Use unique top package name

항상 당신의 dag/common 파일들을 deployment 폴더(ex. 위의 예에서는 my_company폴더) 아래 유니크한 subpackage에 두세요. generic한 이름을 사용하면 시스템에 이미 존재한는 패키지들과 충돌할 수 있습니다. 예를 들어서 당신이 airflow/operators 라는 하위 폴더를 만들면 읽히질 않습니다. 왜냐하면 airflow는 이미 airflow.operator라는 이름의 패키지를 가지고 있기 때문에 당신의 하위폴더는 from airflow.operators를 불러오는 것으로 간주됩니다.

 

Don’t use relative imports

절대로 (python3부터 추가된) 상대경로로 import하지마세요.

예를 들어, my_dag1.py 에서 다음처럼 상대경로를 이용해 import하고자 하는 유혹을 느낄 수 있습니다.


from .base_dag import BaseDag  # NEVER DO THAT!!!!


 

위처럼 사용하지 말고 전체 경로를 사용해서 공유dag를 import하시길 바랍니다.

(PYTHONPATH 에 추가된 디렉토리로부터 시작되는 full path 사용)



from my_company.my_custom_dags.base_dag import BaseDag  # This is cool

 

상대경로를 사용한 import는 직관에 반대되고(counter-intuitive) 당신의 파이썬 코드를 어떻게 시작하는지에 좌우되기 때문에 상황에 따라 다르게 행동합니다. (→ 어쩔 땐 import되고 어쩔 땐 안 되고). airflow에서 같은 DAG 파일은 (테스트 중에 스케줄러나 워커에 의해) 다른 맥락으로 파싱될 수 있어서 상대경로 import는 스케줄러와 워커에서 각각 다르게 행동할 수 있습니다. (워커와 스케줄러 환경이 다를 수 있으니까.) airflow DAGs에선 뭐를 import하던지 항상 full path import를 사용하세요! 많은 에러와 트러블을 방지할 수 있습니다.

상대경로 import에 대해서는 다음 글을 읽어보세용 → this Stack Overflow thread.

 

Add __init__.py in package folders

폴더 만들 때 (아무 것도 안 적혀있는) __init__.py 을 포함하세요. python3에서는 implicit namespce라는 개념때문에 __init__.py 들은 폴더에 넣지 말하야 하는 반면, airflow에서는 이 파일이 당신이 추가하는 모든 패키지 안에 포함되어 있길 원합니다.

 

Inspecting your PYTHONPATH loading configuration

airflow info 커맨드를 통해서 path확인이 가능합니다.


Apache Airflow: 2.0.0b3

System info
OS              | Linux
architecture    | x86_64
uname           | uname_result(system='Linux', node='85cd7ab7018e', release='4.19.76-linuxkit', version='#1 SMP Tue May 26 11:42:35 UTC 2020', machine='x86_64', processor='')
locale          | ('en_US', 'UTF-8')
python_version  | 3.8.6 (default, Nov 25 2020, 02:47:44)  [GCC 8.3.0]
python_location | /usr/local/bin/python

Tools info
git             | git version 2.20.1
ssh             | OpenSSH_7.9p1 Debian-10+deb10u2, OpenSSL 1.1.1d  10 Sep 2019
kubectl         | NOT AVAILABLE
gcloud          | NOT AVAILABLE
cloud_sql_proxy | NOT AVAILABLE
mysql           | mysql  Ver 8.0.22 for Linux on x86_64 (MySQL Community Server - GPL)
sqlite3         | 3.27.2 2019-02-25 16:06:06 bd49a8271d650fa89e446b42e513b595a717b9212c91dd384aab871fc1d0alt1
psql            | psql (PostgreSQL) 11.9 (Debian 11.9-0+deb10u1)

Paths info
airflow_home    | /root/airflow
system_path     | /opt/bats/bin:/usr/local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
python_path     | /usr/local/bin:/opt/airflow:/files/plugins:/usr/local/lib/python38.zip:/usr/local/lib/python3.8:/usr/
                | local/lib/python3.8/lib-dynload:/usr/local/lib/python3.8/site-packages:/files/dags:/root/airflow/conf
                | ig:/root/airflow/plugins
airflow_on_path | True

Config info
executor             | LocalExecutor
task_logging_handler | airflow.utils.log.file_task_handler.FileTaskHandler
sql_alchemy_conn     | postgresql+psycopg2://postgres:airflow@postgres/airflow
dags_folder          | /files/dags
plugins_folder       | /root/airflow/plugins
base_log_folder      | /root/airflow/logs

Providers info
apache-airflow-providers-amazon           | 1.0.0b2
apache-airflow-providers-apache-cassandra | 1.0.0b2
apache-airflow-providers-apache-druid     | 1.0.0b2
apache-airflow-providers-apache-hdfs      | 1.0.0b2
apache-airflow-providers-apache-hive      | 1.0.0b2

 

Adding directories to the PYTHONPATH

당신은 환경변수 PYTHONPATH를 이용해서 디렉토리들을 sys.path에 추가할 수 있습니다. shell을 스타트할 때 당신의 프로젝트 루트경로를 추가하세요.



PYTHONPATH=/home/arch/projects/airflow_operators python

 

 

그러면 sys.path 값은 다음처럼 보일 것입니다.

우리가 넘겨준 디렉토리가 sys.path에 추가되어 있는 것을 확인할 수 있습니다.



>>> import sys
>>> from pprint import pprint
>>> pprint(sys.path)
['',
 '/home/arch/projects/airflow_operators'
 '/home/arch/.pyenv/versions/3.7.4/lib/python37.zip',
 '/home/arch/.pyenv/versions/3.7.4/lib/python3.7',
 '/home/arch/.pyenv/versions/3.7.4/lib/python3.7/lib-dynload',
 '/home/arch/venvs/airflow/lib/python3.7/site-packages']

 

그 다음 패키지를 import해보죠.



>>> import airflow_operators
Hello from airflow_operators
>>>

 

우린 또한 PYTHONPATH 값을 에어플로우 커멘드와 함께 이용할 수 있습니다.


PYTHONPATH=/home/arch/projects/airflow_operators airflow info

 

Creating a package in Python

이 방법은 당신의 커스텀 코드를 추가하는 가장 정리된 방법입니다. 패키징을 사용함으로써 어떤 버전의 코드가 컨테이너나 인스턴스에 설치되고 배포되었는지 관리할 수 있습니다. 이 방법은 여러 팀이 이 공유코드를 운영할 때 적합하지만, 개인코드도 이런 방식으로 배포할 수 있습니다. 또한 당신의 Plugins과  Provider packages 를 파이썬 패키지로 설치할 수 있습니다.

 

당신의 패키지를 만드는 방법:

 

  1. 시작하기 전에 다음의 패키지를 설치합니다.


pip install --upgrade pip setuptools wheel

setuptools: 파이썬 패키지를 만들고 배포하기 위해 디자인된 패키지 배포 프로세스 라이브러리

wheel: 이 패키지는 setuptools를 위한 bdist_wheel 커맨드를 제공합니다. 이것은 .whl 파일을 생성하는데, 이 파일은 pip install을 통해서 바로 설치가 가능합니다. 우리는 이 파일을 PyPI에 업로드하는 것이죠.

 

  1. 패키지 디랙토리를 만듭니다. 우리의 경우  airflow_operators 라는 디렉토리를 만들겁니다.


mkdir airflow_operators

 

  1. __init__.py 을 패키지(위의 디렉토리) 안에 다가 생성한 다음에 파일에 다음 코드를 추가합니다.


print("Hello from airflow_operators")

 

패키지를 import할 때 위의 메세지를 프린트해야 합니다.

 

  1. setup.py 를 만듭니다.


import setuptools

setuptools.setup(
    name="airflow_operators",
)

 

  1. wheel을 빌드합니다.


python setup.py bdist_wheel

 

위의 빌드로 인해 프로젝트 안에 몇 개의 디렉토리가 생성될 것입니다. 전체 구조는 다음과 같습니다.


.
├── airflow_operators
│   ├── __init__.py
├── airflow_operators.egg-info
│   ├── PKG-INFO
│   ├── SOURCES.txt
│   ├── dependency_links.txt
│   └── top_level.txt
├── build
│   └── bdist.macosx-10.15-x86_64
├── dist
│   └── airflow_operators-0.0.0-py3-none-any.whl
└── setup.py

 

  1. .whl 파일을 pip를 이용해 설치합니다.


pip install dist/airflow_operators-0.0.0-py3-none-any.whl

 

  1. 패키지를 이제 사용할 수 있습니다.


>>> import airflow_operators
Hello from airflow_operators
>>>

 

패키지 삭제:


pip uninstall airflow_operators

 

반응형