본문 바로가기

빅데이터

[Airflow] Airflow란 무엇인가?

728x90
반응형

 

Apache Airflow는 워크플로우 관리 플랫폼으로, 데이터 엔지니어링 작업에서 자주 사용됩니다. Airflow는 워크플로우를 작성, 스케줄링, 모니터링할 수 있는 기능을 제공하여, 복잡한 데이터 파이프라인을 효율적으로 관리할 수 있도록 돕습니다. 

주요 특징

  1. DAG (Directed Acyclic Graph):
    • Airflow의 워크플로우는 DAG로 정의됩니다. DAG는 작업(Task)들 간의 의존성을 명확히 나타내는 유향 비순환 그래프입니다. 이를 통해 작업의 실행 순서를 제어하고 모니터링할 수 있습니다.
  2. 모듈화 및 확장성:
    • Airflow는 Python 코드로 작성되어 매우 유연하고 확장 가능합니다. 사용자는 다양한 연산자(Operators)와 센서(Sensors)를 활용해 복잡한 워크플로우를 정의할 수 있습니다.
  3. 스케줄링:
    • 크론 표현식(Cron Expression) 또는 다른 시간 기반 표현식을 사용해 워크플로우의 실행을 스케줄링할 수 있습니다.
  4. UI 및 모니터링:
    • Airflow는 웹 기반 UI를 제공하여 DAG의 상태를 시각적으로 확인하고, 개별 작업의 로그를 확인할 수 있습니다. 이를 통해 워크플로우의 상태를 실시간으로 모니터링하고 문제를 빠르게 해결할 수 있습니다.
  5. 다양한 연산자 지원:
    • BashOperator, PythonOperator, MySqlOperator, S3Operator 등 다양한 연산자를 제공하여, 다양한 시스템과의 통합을 쉽게 할 수 있습니다.

Airflow 장점

구분 내용
유연성
  • Airflow는 Python 코드로 작성되므로, 복잡한 비즈니스 로직을 쉽게 구현할 수 있습니다. 또한, 사용자 정의 연산자를 만들어 사용할 수 있습니다.
확장성
  • 모듈화된 구조로 인해 새로운 기능을 쉽게 추가할 수 있으며, 여러 워커 노드를 사용하여 작업을 분산 처리할 수 있습니다.
가시성
  • 웹 기반 UI를 통해 작업의 진행 상황을 쉽게 모니터링하고, 실패한 작업을 재시도하거나 수동으로 재실행할 수 있습니다.
표준화
  • DAG와 연산자를 사용하여 워크플로우를 표준화함으로써, 코드의 재사용성을 높이고 유지보수를 쉽게 할 수 있습니다.

Airflow 기본 구성 

https://velog.io/@bbkyoo/Airflow%EB%9E%80

Apache Airflow의 기본 구성 요소와 아키텍처를 이해하는 것은 이를 효과적으로 사용하는 데 필수적입니다.

1. Scheduler

Scheduler는 DAG의 트리거를 감시하고, 트리거된 DAG를 실행할 작업으로 배치합니다. 이는 작업의 스케줄을 관리하며, DAG 정의에 따라 작업을 순서대로 실행합니다.

2. Executor

Executor는 실제로 작업을 실행하는 역할을 합니다. 여러 종류의 Executor가 있으며, 대표적으로 LocalExecutor, CeleryExecutor, KubernetesExecutor 등이 있습니다. 선택한 Executor에 따라 작업이 단일 서버에서 실행되거나, 여러 서버에 분산되어 실행될 수 있습니다.

3. Worker

Worker는 Scheduler에 의해 배치된 작업을 실제로 수행하는 프로세스입니다. CeleryExecutor나 KubernetesExecutor를 사용할 경우, 여러 Worker가 분산된 환경에서 병렬로 작업을 처리할 수 있습니다.

4. Web Server

Web Server는 Airflow의 웹 UI를 제공하여, DAG와 작업의 상태를 모니터링하고, 로그를 확인하며, DAG의 트리거 및 관리 작업을 수행할 수 있게 합니다. Flask 기반으로 동작하며, 사용자는 이를 통해 Airflow의 모든 기능을 시각적으로 관리할 수 있습니다.

5. Metadata Database

Metadata Database는 Airflow의 핵심 정보를 저장하는 데이터베이스입니다. 여기에는 DAG 정의, 작업 상태, 스케줄러 상태, 로그 등이 저장됩니다. 일반적으로 MySQL, PostgreSQL 등을 사용합니다.

6. DAG (Directed Acyclic Graph)

https://velog.io/@sophi_e/Airflow-%EA%B8%B0%EC%B4%88-%EA%B0%9C%EB%85%90-%EB%B0%8F-%EC%9E%A5%EB%8B%A8%EC%A0%90

DAG는 작업의 논리적 흐름을 정의하는 구조입니다. 각 DAG는 하나의 워크플로우를 나타내며, 여러 개의 Task로 구성됩니다. Task 간의 의존성을 정의하여 실행 순서를 지정합니다.

7. Task

Task는 실제로 실행되는 작업의 단위입니다. Python 코드, Bash 스크립트, SQL 쿼리 등 다양한 형태로 작성될 수 있으며, Operator를 사용해 정의합니다.

 

간단한 예시로 훑어보기

간단한 ETL 파이프라인 예시

  1. 목표:
    • S3에 저장된 CSV 파일을 읽고, 데이터를 변환한 후, 변환된 데이터를 MySQL 데이터베이스에 로드합니다.
  2. DAG 정의
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.mysql_operator import MySqlOperator
from airflow.contrib.operators.s3_to_gcs_operator import S3ToGCSOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 6, 1),
    'retries': 1,
}

def extract():
    # S3에서 데이터를 추출하는 로직
    pass

def transform():
    # 데이터를 변환하는 로직
    pass

def load():
    # 데이터를 MySQL로 로드하는 로직
    pass

with DAG('simple_etl', default_args=default_args, schedule_interval='@daily') as dag:
    extract_task = PythonOperator(task_id='extract', python_callable=extract)
    transform_task = PythonOperator(task_id='transform', python_callable=transform)
    load_task = PythonOperator(task_id='load', python_callable=load)

    extract_task >> transform_task >> load_task

설명:

  • extract_task: S3에서 데이터를 추출하는 작업입니다.
  • transform_task: 추출한 데이터를 변환하는 작업입니다.
  • load_task: 변환된 데이터를 MySQL에 로드하는 작업입니다.
  • extract_task, transform_task, load_task는 의존 관계에 따라 순차적으로 실행됩니다.
728x90
반응형