728x90
spotify api에서 artist, track 정보를 이용해서 관련 테이블 내용을 보충한 다음 저장하려 했지만 spotify db는 NoSQL로 유사데이터들이 다량으로 검색되어서 이 부분은 제외하고 elt 작업을 진행했다.
get_rawdata에서 raw data를 가져와서 적절히 정제한 다음 artist, user, track, user_history DB테이블에 집어넣는 작업이다. 각 테이블의 키 참조 조건을 고려하다보니 단순히 get_data → transform → store에서 먼저 저장해야 하는 테이블의 순서가 생기면서 밑과 같은 구조가 나왔다.
여기에서 spotify 데이터를 어떻게 처리할지 결정한 다음 album, track, artist 데이터를 업데이트하는 작업을 추가할 수 있겠다.
dag를 작성할 때 어떻게 깔끔하게 dag을 작성할까 예제를 찾아보다 다이나믹 dag 생성 방법을 찾았다. dag 파일 하나로 여러 개의 dag을 만들어낼 수 있다.
밑의 예제는 하나의 dag 파일로 3개의 dag를 생성하는 예제이다. (1타3피..) 지금 당장은 쓸 일이 없지만 알아두면 언제가 유용하게 쓸 수 있을 듯하다. [참조2페이지] [참조git]
rom airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def create_dag(dag_id,
schedule,
dag_number,
default_args):
def hello_world_py(*args):
print('Hello World')
print('This is DAG: {}'.format(str(dag_number)))
dag = DAG(dag_id,
schedule_interval=schedule,
default_args=default_args)
with dag:
t1 = PythonOperator(
task_id='hello_world',
python_callable=hello_world_py)
return dag
# build a dag for each number in range(10)
for n in range(1, 4):
dag_id = 'loop_hello_world_{}'.format(str(n))
default_args = {'owner': 'airflow',
'start_date': datetime(2018, 1, 1)
}
schedule = '@daily'
dag_number = n
globals()[dag_id] = create_dag(dag_id,
schedule,
dag_number,
default_args)
참조
- MariaDB 엔진 종류 (범용)
- MariaDB 적합한 스토리지 엔진 선택 (스토리지 엔진 분류..오우 신기하네. 검색용 엔진도 있네)
반응형