Data Collection and Storage System - (1)

Overview

This project establishes a system that periodically collects blood sugar data from external sensors and stores and manages it in a database. The primary components are Apache Airflow, Apache Kafka, and PostgreSQL, which together enable efficient data processing and storage.

Architecture Components

  1. External API: Blood Sugar Data Provider

    • This is an external API server that provides blood sugar data. The API returns the sensor readings in JSON format (a minimal mock of this API is sketched after this list).

  2. Apache Airflow: Workflow Management

    • Airflow manages workflows using DAGs (Directed Acyclic Graphs) to periodically call the external API and collect data. The collected data is then sent to the message broker via a Kafka producer.

  3. Apache Kafka: Message Broker

    • Kafka is a real-time data streaming platform that receives the data sent by Airflow on a topic and makes it available to Kafka consumers.

  4. Kafka Consumer: Data Processing

    • The Kafka consumer subscribes to the Kafka topic and processes the incoming data, preparing it for storage.

  5. PostgreSQL: Database

    • PostgreSQL serves as the database where the processed blood sugar data is stored for further analysis and visualization.
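
The external API in this project is only a small test service (external_api/app.py in the code section below), and its source is not reproduced here. The following is a minimal sketch of what such a mock might look like; Flask is an assumption, while the /data route, port 5001, and the timestamp / blood_sugar_level fields are inferred from the Airflow DAG and the consumer shown later.

from datetime import datetime, timezone
import random

from flask import Flask, jsonify

app = Flask(__name__)

@app.route("/data")
def data():
    # One reading in the shape the consumer later inserts into PostgreSQL:
    # an ISO-8601 timestamp and a blood sugar level (values are random here).
    return jsonify({
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "blood_sugar_level": random.randint(70, 180),
    })

if __name__ == "__main__":
    # Bind to 0.0.0.0 so the docker-compose port mapping 5001:5001 is reachable.
    app.run(host="0.0.0.0", port=5001)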

Data Flow

  1. Data Collection: Airflow periodically sends HTTP requests to the External API to fetch the latest blood sugar data.

  2. Data Transmission: The fetched data is serialized into JSON format and sent to Kafka as a message.

  3. Data Consumption: Kafka consumers subscribe to the topic and receive the data, which they then process.

  4. Data Storage: The processed data is inserted into the PostgreSQL database, where it can be queried and analyzed.
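
As a concrete example of step 4, the stored readings can be pulled back out with a few lines of psycopg2. This is a minimal sketch assuming the table layout used by the consumer and the database credentials defined in docker-compose.yml (both shown in the Code section below):

import psycopg2

# Connect from the host through the published 5432 port; credentials match the
# postgres service in docker-compose.yml.
conn = psycopg2.connect(
    dbname="airflow", user="airflow", password="airflow", host="localhost", port=5432
)
with conn, conn.cursor() as cur:
    # Fetch the ten most recent readings from the table the consumer writes to.
    cur.execute(
        "SELECT timestamp, blood_sugar_level "
        "FROM blood_sugar ORDER BY timestamp DESC LIMIT 10"
    )
    for ts, level in cur.fetchall():
        print(ts, level)
conn.close()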

Benefits

  • Scalability: Using Kafka allows the system to handle a large volume of data with ease.

  • Reliability: Airflow ensures that data collection tasks are executed as per the schedule, with retry mechanisms in case of failures.

  • Efficiency: PostgreSQL provides a robust storage solution for managing and querying blood sugar data.

  • Modularity: Each component of the architecture can be scaled and managed independently, allowing for flexible system management.

Conclusion

This system effectively demonstrates how modern data engineering tools can be leveraged to build a reliable and scalable data collection and storage pipeline. By integrating Apache Airflow, Apache Kafka, and PostgreSQL, the system ensures efficient data handling, from collection to storage, making it an ideal solution for real-time data processing needs.

Code

directory structure

.
├── airflow
│   ├── Dockerfile
│   └── dags
│       ├── __pycache__
│       └── fetch_and_send_data.py
├── consumer
│   ├── Dockerfile
│   ├── consumer.py
│   └── wait-for-it.sh
├── docker-compose-logging.yml
├── docker-compose.yml
├── external_api
│   ├── Dockerfile
│   └── app.py
└── logging
    ├── filebeat
    │   ├── Dockerfile
    │   └── filebeat.yml
    └── logstash
        └── pipeline

fetch_and_send_data.py

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
import requests
from kafka import KafkaProducer
import json

def fetch_and_send_data():
    # Fetch the latest blood sugar readings from the external API (JSON).
    response = requests.get("http://external_api:5001/data")
    data = response.json()

    # Publish the payload to the 'blood_sugar' topic as UTF-8 encoded JSON.
    producer = KafkaProducer(
        bootstrap_servers='kafka:9092',
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    producer.send('blood_sugar', value=data)
    producer.close()

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
}

dag = DAG(
    'fetch_and_send_data',
    default_args=default_args,
    description='Fetch data from API and send to Kafka',
    schedule_interval='*/15 * * * *',
    start_date=days_ago(2),
)

t1 = PythonOperator(
    task_id='fetch_and_send_data_task',
    python_callable=fetch_and_send_data,
    dag=dag,
)

t1  # single task; no dependencies to chain
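
One note on the reliability point from the Benefits section: as written, default_args does not enable retries, so a failed API call simply fails that task run. A minimal sketch of how retries could be switched on (the retry count and delay are illustrative assumptions, not project settings):

from datetime import timedelta

# Hypothetical extension of default_args enabling Airflow's built-in retries.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,                          # re-run a failed fetch up to 3 times
    'retry_delay': timedelta(minutes=5),   # wait 5 minutes between attempts
}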

consumer.py

import logging
from kafka import KafkaConsumer
import psycopg2
import json
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

logger.info("Starting Kafka Consumer...")

consumer = None
conn = None
cur = None

for _ in range(5):
    try:
        consumer = KafkaConsumer(
            'blood_sugar',
            bootstrap_servers='kafka:9092',  
            auto_offset_reset='earliest',
            enable_auto_commit=True,
            group_id='my-group',
            value_deserializer=lambda x: json.loads(x.decode('utf-8'))
        )
        logger.info("Connected to Kafka")
        break
    except Exception as e:
        logger.error("Failed to connect to Kafka, retrying in 5 seconds...", exc_info=True)
        time.sleep(5)

if consumer is None:
    logger.error("Could not connect to Kafka after several attempts, exiting...")
    exit(1)

# Attempt to connect to PostgreSQL
try:
    conn = psycopg2.connect(
        dbname="airflow", user="airflow", password="airflow", host="postgres"  
    )
    cur = conn.cursor()
    logger.info("Connected to PostgreSQL")
except Exception as e:
    logger.error("Failed to connect to PostgreSQL", exc_info=True)
    exit(1)

if consumer and cur:
    try:
        for message in consumer:
            try:
                data = message.value
                logger.info(f"Received message: {data}")
                cur.execute("INSERT INTO blood_sugar (timestamp, blood_sugar_level) VALUES (%s, %s)", 
                            (data['timestamp'], data['blood_sugar_level']))
                conn.commit()
                logger.info("Inserted data into PostgreSQL")
            except json.JSONDecodeError as e:
                logger.error("Failed to decode JSON message", exc_info=True)
            except Exception as e:
                logger.error("Failed to process message", exc_info=True)
    except Exception as e:
        logger.error("Failed to consume messages from Kafka", exc_info=True)
    finally:
        cur.close()
        conn.close()
        logger.info("Kafka Consumer stopped")
else:
    logger.error("Kafka Consumer or PostgreSQL cursor not initialized")

docker-compose.yml

version: '3.7'

services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
      - "2181:2181"
    environment:
      ZOO_MAX_BUFFER_SIZE: 200000000
    logging:
      driver: "json-file"
      options:
        max-size: "200k"
        max-file: "10"

  kafka:
    image: wurstmeister/kafka:latest
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_MESSAGE_MAX_BYTES: 200000000
      KAFKA_REPLICA_FETCH_MAX_BYTES: 200000000
    depends_on:
      - zookeeper
    logging:
      driver: "json-file"
      options:
        max-size: "200k"
        max-file: "10"

  postgres:
    image: postgres:latest
    environment:
      POSTGRES_DB: airflow
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
    ports:
      - "5432:5432"
    logging:
      driver: "json-file"
      options:
        max-size: "200k"
        max-file: "10"

  airflow:
    build: ./airflow
    depends_on:
      - postgres
      - kafka
    environment:
      - LOAD_EX=n
      - EXECUTOR=Local
    volumes:
      - ./airflow/dags:/usr/local/airflow/dags
    ports:
      - "8080:8080"
    logging:
      driver: "json-file"
      options:
        max-size: "200k"
        max-file: "10"

  consumer:
    build: ./consumer
    depends_on:
      - kafka
      - postgres
    entrypoint: ["./wait-for-it.sh", "kafka:9092", "--", "./wait-for-it.sh", "postgres:5432", "--", "python", "consumer.py"]
    logging:
      driver: "json-file"
      options:
        max-size: "200k"
        max-file: "10"

  external_api:
    build: ./external_api
    ports:
      - "5001:5001"
    logging:
      driver: "json-file"
      options:
        max-size: "200k"
        max-file: "10"

  filebeat:
    image: docker.elastic.co/beats/filebeat:7.13.2
    user: root
    volumes:
      - ./logging/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml:ro
      - /var/lib/docker/containers:/var/lib/docker/containers:ro
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - zookeeper
      - kafka
      - postgres
      - airflow
      - consumer
      - external_api

networks:
  default:
    name: kafka-net
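
The blood_sugar topic itself is never created explicitly, so the setup presumably relies on the broker's topic auto-creation, which is enabled by default in Kafka. If that is turned off, the topic can be created up front with kafka-python's admin client; a sketch meant to run inside the compose network (kafka-net), with single-broker partition and replication settings:

from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

# Hypothetical one-off script to pre-create the topic instead of relying on
# broker-side auto-creation; values suit the single-broker compose setup.
admin = KafkaAdminClient(bootstrap_servers="kafka:9092")
try:
    admin.create_topics([NewTopic(name="blood_sugar", num_partitions=1, replication_factor=1)])
except TopicAlreadyExistsError:
    pass  # topic already exists from a previous run
finally:
    admin.close()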

Screen

Screenshots (images not included here): Docker Desktop, Apache Airflow, external API (for test), Kafka, consumer, PostgreSQL.

Troubleshooting

  • JSON encoding/decoding (see the round-trip sketch after this list)

  • Checking the logs for errors raised in Airflow

    • This took the longest.

    • Spent a long time assuming the problem was in Kafka or the consumer.

  • Checking the logs one by one is inconvenient.
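
The JSON issue above comes down to keeping the producer's value_serializer and the consumer's value_deserializer symmetric. A quick round-trip check using the same calls as the code above (the sample payload is illustrative):

import json

# Mirror the producer's value_serializer and the consumer's value_deserializer;
# both sides must agree on UTF-8-encoded JSON.
sample = {"timestamp": "2024-01-01T00:00:00Z", "blood_sugar_level": 105}
encoded = json.dumps(sample).encode("utf-8")   # producer side
decoded = json.loads(encoded.decode("utf-8"))  # consumer side
assert decoded == sample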
