본문 바로가기

D.S/DE

220226토 - 기본 ELT pipeline 완료

728x90

 

spotify api에서 artist, track 정보를 이용해서 관련 테이블 내용을 보충한 다음 저장하려 했지만 spotify db는 NoSQL로 유사데이터들이 다량으로 검색되어서 이 부분은 제외하고 elt 작업을 진행했다.

 

get_rawdata에서 raw data를 가져와서 적절히 정제한 다음 artist, user, track, user_history DB테이블에 집어넣는 작업이다. 각 테이블의 키 참조 조건을 고려하다보니 단순히 get_data → transform → store에서 먼저 저장해야 하는 테이블의 순서가 생기면서 밑과 같은 구조가 나왔다.

 

여기에서 spotify 데이터를 어떻게 처리할지 결정한 다음 album, track, artist 데이터를 업데이트하는 작업을 추가할 수 있겠다.

 

 

 

dag를 작성할 때 어떻게 깔끔하게 dag을 작성할까 예제를 찾아보다 다이나믹 dag 생성 방법을 찾았다. dag 파일 하나로 여러 개의 dag을 만들어낼 수 있다.

밑의 예제는 하나의 dag 파일로 3개의 dag를 생성하는 예제이다. (1타3피..) 지금 당장은 쓸 일이 없지만 알아두면 언제가 유용하게 쓸 수 있을 듯하다. [참조2페이지] [참조git]

 


rom airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime


def create_dag(dag_id,
               schedule,
               dag_number,
               default_args):

    def hello_world_py(*args):
        print('Hello World')
        print('This is DAG: {}'.format(str(dag_number)))

    dag = DAG(dag_id,
              schedule_interval=schedule,
              default_args=default_args)

    with dag:
        t1 = PythonOperator(
            task_id='hello_world',
            python_callable=hello_world_py)

    return dag


# build a dag for each number in range(10)
for n in range(1, 4):
    dag_id = 'loop_hello_world_{}'.format(str(n))

    default_args = {'owner': 'airflow',
                    'start_date': datetime(2018, 1, 1)
                    }

    schedule = '@daily'

    dag_number = n

    globals()[dag_id] = create_dag(dag_id,
                                  schedule,
                                  dag_number,
                                  default_args)

 

 

 

참조

 

반응형