IT

Airflow + Redis + Kafka를 포함한 통합 docker-compose 구성

마시멜로를찾아서 2025. 4. 2. 14:44
반응형

🧱 전체 아키텍처 구성 개요

┌────────────┐       ┌──────────────┐
│  Producer  │ ───▶ │   Kafka       │ ◀──┐
└────────────┘       └──────────────┘    │
                                         ▼
 ┌─────────────┐    ┌────────────┐   ┌────────────┐
 │   Airflow   │ ◀▶ │ Redis Queue│ ◀▶│ Celery      │
 └─────────────┘    └────────────┘   └────────────┘
 
  • Kafka: 실시간 메시징/스트리밍 처리
  • Redis: Celery 브로커 or 캐시 저장소
  • Airflow: DAG 기반 워크플로 자동화
  • Celery Worker: Redis로부터 작업 수신 후 실행
  • Webserver, Scheduler, Flower: Airflow UI + 작업 모니터링

📁 폴더 구조 예시

data-pipeline/
├── dags/                         # Airflow DAG 코드
│   └── example_dag.py
├── plugins/                      # Custom Plugin (옵션)
├── Dockerfile.airflow
├── requirements.txt
├── docker-compose.yml
└── .env

📄 docker-compose.yml 예제

version: '3.8'
services:

  ### REDIS (Celery 브로커)
  redis:
    image: redis:7
    ports:
      - "6379:6379"

  ### KAFKA + ZOOKEEPER
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  ### AIRFLOW
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data

  airflow-webserver:
    build:
      context: .
      dockerfile: Dockerfile.airflow
    environment:
      AIRFLOW__CORE__EXECUTOR: CeleryExecutor
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
      AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0
      AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    volumes:
      - ./dags:/opt/airflow/dags
    ports:
      - "8080:8080"
    depends_on:
      - postgres
      - redis

  airflow-scheduler:
    build:
      context: .
      dockerfile: Dockerfile.airflow
    environment:
      <<: *airflow_common
    depends_on:
      - airflow-webserver

  airflow-worker:
    build:
      context: .
      dockerfile: Dockerfile.airflow
    depends_on:
      - redis
      - postgres

  airflow-init:
    build:
      context: .
      dockerfile: Dockerfile.airflow
    entrypoint: /bin/bash -c "airflow db init && airflow users create --username admin --firstname Air --lastname Flow --role Admin --email admin@example.com --password admin"

volumes:
  postgres-db-volume:
 

🐳 Dockerfile.airflow

FROM apache/airflow:2.7.1-python3.10

USER root
RUN apt-get update && apt-get install -y gcc g++ libpq-dev

USER airflow
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

📦 requirements.txt 예시

apache-airflow[celery,postgres,redis]==2.7.1
kafka-python

✅ 실행 방법

# 최초 설정
docker-compose up airflow-init

# 전체 서비스 실행
docker-compose up -d

📘 DAG 예시 (dags/example_dag.py)

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def sample_task():
    print("Kafka + Redis + Airflow 연동 성공!")

with DAG(dag_id='sample_pipeline',
         start_date=datetime(2024, 1, 1),
         schedule_interval='@daily',
         catchup=False) as dag:

    t1 = PythonOperator(
        task_id='print_message',
        python_callable=sample_task
    )

💡 확장 팁

 

기능설정  예시
Kafka Topic 생성 docker exec -it kafka kafka-topics ...
Airflow + Kafka 연동 KafkaProducer를 DAG 내에서 활용 가능
Celery 모니터링 (옵션) flower 도커 추가 (port 5555)
프로덕션 구성 보안 강화 dotenv, .env.prod, AIRFLOW__SECRETS__BACKEND
Git 연동 DAG 관리 DAG 폴더를 Git 서브모듈/외부 mount로 관리 가능

🔚 마무리 정리

구성  요소설명
Redis Celery 브로커 (실시간 작업 분산 처리용)
Kafka 데이터 스트리밍, ETL 이벤트 트리거, 로그 전달용
Airflow DAG 기반 스케줄링/워크플로 관리
Celery Worker DAG 작업 실행 담당
PostgreSQL Airflow metadata 저장
반응형