Data Collection and Storage System - (2)

Objectives

  1. Centralized Workflow Management: Utilize Airflow to manage all stages of the data pipeline, from data collection to storage.

  2. Sequential Task Execution: Demonstrate how to define multiple tasks in Airflow and ensure they are executed in sequence.

  3. Improved Monitoring and Maintenance: Simplify the monitoring and maintenance of the pipeline by using Airflow's built-in features.

Architecture

The enhanced architecture involves the following components:

  • External API: Provides blood sugar data.

  • Airflow: Manages the entire workflow, including data collection, Kafka interaction, and data storage.

  • Kafka: Acts as the message broker to handle real-time data streaming.

  • PostgreSQL: Stores the processed blood sugar data.

Airflow DAG Example

In this enhanced setup, we will define three tasks in Airflow:

  1. Fetch Data: Collects data from the external API.

  2. Send to Kafka: Sends the collected data to Kafka.

  3. Store in PostgreSQL: Consumes the data from Kafka and stores it in PostgreSQL.

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
import requests
from kafka import KafkaProducer
import json

def fetch_data():
    response = requests.get("http://external_api:5001/data")
    data = response.json()
    # Save data to file for simplicity
    with open('/tmp/data.json', 'w') as f:
        json.dump(data, f)

def send_to_kafka():
    with open('/tmp/data.json', 'r') as f:
        data = json.load(f)
    producer = KafkaProducer(
        bootstrap_servers='kafka:9092',
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    producer.send('blood_sugar', value=data)
    producer.close()

def store_in_postgres():
    from kafka import KafkaConsumer
    import psycopg2

    consumer = KafkaConsumer(
        'blood_sugar',
        bootstrap_servers='kafka:9092',
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        group_id='my-group',
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )

    conn = psycopg2.connect(
        dbname="airflow", user="airflow", password="airflow", host="postgres"
    )
    cur = conn.cursor()

    for message in consumer:
        data = message.value
        cur.execute("INSERT INTO blood_sugar (timestamp, blood_sugar_level) VALUES (%s, %s)", 
                    (data['timestamp'], data['blood_sugar_level']))
        conn.commit()
        break  # For simplicity, we'll break after one message for this example

    cur.close()
    conn.close()

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
}

dag = DAG(
    'data_pipeline',
    default_args=default_args,
    description='A simple data pipeline',
    schedule_interval='@daily',
    start_date=days_ago(2),
)

t1 = PythonOperator(
    task_id='fetch_data_task',
    python_callable=fetch_data,
    dag=dag,
)

t2 = PythonOperator(
    task_id='send_to_kafka_task',
    python_callable=send_to_kafka,
    dag=dag,
)

t3 = PythonOperator(
    task_id='store_in_postgres_task',
    python_callable=store_in_postgres,
    dag=dag,
)

t1 >> t2 >> t3

Benefits of Using Airflow for Entire Pipeline

  1. Centralized Management: All tasks are defined and managed within Airflow, providing a single point of control.

  2. Error Handling: Airflow provides built-in mechanisms for retrying failed tasks and alerting on failures.

  3. Scheduling: Airflow's scheduling capabilities allow for complex scheduling scenarios, ensuring that tasks run at the appropriate times.

  4. Visualization: The Airflow UI offers a clear visual representation of the workflow, making it easier to monitor and debug.

비교

첫 번째 방식: 컨슈머 서버를 따로 두는 방식

장점

  1. 모듈성: 각 구성 요소가 독립적으로 실행되므로, 시스템을 더 쉽게 확장하고 유지 관리할 수 있습니다.

  2. 유연성: 각 구성 요소를 독립적으로 배포, 확장, 관리할 수 있습니다. 예를 들어, 컨슈머의 부하가 증가하면 컨슈머 서버만 확장할 수 있습니다.

  3. 장애 격리: 하나의 구성 요소가 실패하더라도 다른 구성 요소에 영향을 미치지 않습니다. 예를 들어, 컨슈머 서버가 다운되더라도 Airflow는 계속해서 데이터를 Kafka로 전송할 수 있습니다.

단점

  1. 복잡성: 여러 구성 요소가 서로 독립적으로 실행되므로, 전체 시스템의 설정 및 관리가 복잡해질 수 있습니다.

  2. 추가 관리 오버헤드: 각 구성 요소에 대한 로그 관리, 모니터링, 유지 보수 등을 별도로 수행해야 합니다.

두 번째 방식: 모든 작업을 Airflow에서 처리

장점

  1. 단순화된 관리: 모든 작업이 Airflow 내에서 실행되므로, 설정, 배포, 모니터링이 단순해집니다. 모든 작업의 상태를 한 곳에서 확인할 수 있습니다.

  2. 중앙 집중화된 워크플로우 관리: 전체 데이터 파이프라인이 Airflow DAG 내에서 정의되므로, 작업 간의 종속성을 쉽게 관리할 수 있습니다.

  3. 재시도 및 실패 관리: Airflow는 자동으로 작업 실패 시 재시도를 지원하며, 작업의 상태를 쉽게 추적하고 관리할 수 있습니다.

  4. 시각화: Airflow UI를 통해 워크플로우를 시각적으로 확인하고 모니터링할 수 있습니다.

단점

  1. 확장성 제한: Airflow가 모든 작업을 처리하게 되면, Airflow 자체의 리소스 사용량이 증가하여 확장성에 제한이 있을 수 있습니다. 특히 데이터 소비 및 처리량이 많아질 경우 문제가 될 수 있습니다.

  2. 단일 장애점: 모든 작업이 Airflow 내에서 실행되므로, Airflow 자체에 문제가 발생하면 전체 파이프라인이 중단될 수 있습니다.

  3. 유연성 감소: 각 구성 요소를 개별적으로 확장하거나 최적화하는 데 제약이 있을 수 있습니다.

Last updated