Kafka

  • 파이썬 서버 안띄우고 스프링부트에서 설정

  • 내 생각으로는 뭔가 스프링부트 서버의 역할이 분리가 되지 않는 느낌도 들어서 반감이 듬

Kafka 사용의 주요 장점

  1. 높은 처리량 및 확장성:

    • Kafka는 높은 처리량을 자랑하며, 초당 수백만 개의 메시지를 처리할 수 있습니다.

    • 클러스터 형태로 구성되어 있으며, 필요에 따라 노드를 추가하여 쉽게 확장할 수 있습니다.

  2. 내구성 및 신뢰성:

    • Kafka는 메시지를 디스크에 저장하여 내구성을 보장합니다.

    • 데이터 복제를 통해 장애 발생 시에도 데이터 손실을 최소화합니다.

  3. 실시간 데이터 처리:

    • 실시간 스트리밍 데이터를 처리할 수 있어, 실시간 분석 및 모니터링 시스템에 적합합니다.

    • 스트림 처리 프레임워크인 Kafka Streams와 함께 사용하여 복잡한 스트림 처리 작업을 수행할 수 있습니다.

  4. 유연한 데이터 소비:

    • 동일한 데이터를 여러 소비자가 읽을 수 있어, 다양한 애플리케이션에서 데이터를 재사용할 수 있습니다.

    • 데이터 소비자는 메시지를 읽는 속도와 방식(실시간, 배치 등)을 자유롭게 조절할 수 있습니다.

  5. 고가용성:

    • Kafka는 분산 시스템으로, 노드 간의 데이터 복제를 통해 고가용성을 보장합니다.

    • 자동 장애 복구 기능을 통해 노드 장애 시에도 지속적으로 작동할 수 있습니다.

  6. 배압 관리:

    • Kafka는 소비자가 데이터 소비 속도를 조절할 수 있도록 설계되어 있어, 소비자가 처리할 수 있는 양만큼만 데이터를 제공하여 배압을 관리합니다.

  7. 유연한 데이터 통합:

    • Kafka는 다양한 데이터 소스와 싱크를 연결할 수 있는 커넥터를 제공하여 데이터 통합을 쉽게 수행할 수 있습니다.

    • 데이터베이스, 로그 파일, 스트리밍 플랫폼 등 다양한 데이터 소스와 통합하여 데이터 파이프라인을 구축할 수 있습니다.

예제 프로젝트에서 Kafka를 사용하는 구체적인 장점

  1. 실시간 데이터 스트리밍:

    • 센서 데이터를 실시간으로 수집하고 처리하여 MySQL 및 AWS S3에 저장함으로써, 실시간 모니터링과 분석이 가능합니다.

  2. 데이터 일관성 유지:

    • 여러 소비자가 동일한 데이터 스트림을 사용할 수 있어, 데이터 일관성을 유지하면서 다양한 분석 및 처리 작업을 병렬로 수행할 수 있습니다.

  3. 확장성:

    • 데이터 양이 증가해도 Kafka 클러스터를 확장하여 쉽게 처리할 수 있습니다.

    • 추가적인 데이터 소스나 소비자를 쉽게 추가할 수 있어, 시스템 확장에 유리합니다.

  4. 유연한 처리 구조:

    • 배치 처리와 실시간 처리를 동시에 지원할 수 있어, 다양한 요구사항에 맞는 데이터 처리 구조를 설계할 수 있습니다.

    • 데이터 저장소로의 전송뿐만 아니라, 실시간 알림, 대시보드 업데이트 등 다양한 실시간 반응형 기능을 구현할 수 있습니다.

  5. 분산 처리:

    • Kafka의 분산 아키텍처를 활용하여 데이터 처리 작업을 분산시킴으로써, 시스템 부하를 균형 있게 분산할 수 있습니다.

Kafka는 이러한 다양한 장점을 통해 데이터 파이프라인의 성능과 유연성을 크게 향상시킬 수 있습니다. 따라서 실시간 데이터 처리 및 분석이 필요한 다양한 애플리케이션에서 Kafka를 활용하는 것은 매우 유리합니다.


Tutorial

  • sample code

1. install

pip install fastapi uvicorn sqlalchemy aiokafka pymysql boto3

2. setting

## kafka_producer.py: 센서 데이터를 Kafka 토픽에 게시하는 프로듀서 예제입니다.

from aiokafka import AIOKafkaProducer
import asyncio
import json

async def send_one():
    producer = AIOKafkaProducer(
        bootstrap_servers='localhost:9092',
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    await producer.start()
    try:
        value = {"sensor_id": 1, "temperature": 22.5}
        await producer.send_and_wait("sensor_data", value)
    finally:
        await producer.stop()

asyncio.run(send_one())

## kafka_consumer.py: Kafka에서 데이터를 소비하고 MySQL과 AWS S3에 저장하는 컨슈머 예제입니다.

from aiokafka import AIOKafkaConsumer
import asyncio
import json
import sqlalchemy
from sqlalchemy.orm import sessionmaker
import boto3

# MySQL 설정
DATABASE_URL = "mysql+pymysql://username:password@localhost/dbname"
engine = sqlalchemy.create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# AWS S3 설정
s3_client = boto3.client('s3')
S3_BUCKET = "your-bucket-name"

# 모델 정의 (예시)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, Float

Base = declarative_base()

class SensorData(Base):
    __tablename__ = 'sensor_data'
    id = Column(Integer, primary_key=True, index=True)
    sensor_id = Column(Integer, index=True)
    temperature = Column(Float)

# 데이터베이스 초기화
Base.metadata.create_all(bind=engine)

async def consume():
    consumer = AIOKafkaConsumer(
        'sensor_data',
        bootstrap_servers='localhost:9092',
        group_id="sensor-group",
        value_deserializer=lambda v: json.loads(v.decode('utf-8'))
    )
    await consumer.start()
    try:
        async for msg in consumer:
            data = msg.value

            # MySQL에 데이터 저장
            db = SessionLocal()
            db_data = SensorData(sensor_id=data['sensor_id'], temperature=data['temperature'])
            db.add(db_data)
            db.commit()
            db.refresh(db_data)
            db.close()

            # AWS S3에 데이터 저장
            s3_client.put_object(
                Bucket=S3_BUCKET,
                Key=f"sensor_data/{data['sensor_id']}.json",
                Body=json.dumps(data)
            )
    finally:
        await consumer.stop()

asyncio.run(consume())

3. FastAPI application

## main.py

from fastapi import FastAPI, BackgroundTasks
from kafka_producer import send_one

app = FastAPI()

@app.post("/send_data/")
async def send_data(background_tasks: BackgroundTasks):
    background_tasks.add_task(send_one)
    return {"message": "Data is being sent to Kafka"}

4. Docker-compose

version: '3.7'

services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka:2.12-2.2.1
    ports:
      - "9092:9092"
    expose:
      - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

  web:
    build: .
    command: uvicorn main:app --host 0.0.0.0 --port 8000
    ports:
      - "8000:8000"
    depends_on:
      - kafka

Last updated