728x90
반응형

🛠️ 장비에 적합한 Pub/Sub 프로토콜 추천 (MQTT 포함)

장비와 같은 IoT 장치경량화, 저전력, 실시간 통신이 요구되므로, 아래의 메시징 프로토콜들이 많이 사용됩니다:

1. MQTT (Message Queuing Telemetry Transport)

  • 장점:
    • 경량하고 저전력: IoT 환경에 적합하며, 네트워크 대역폭이 제한된 상황에서도 효율적.
    • QoS 레벨을 지원하여 메시지 전송 신뢰성을 높임.
    • 보안 (TLS/SSL)을 지원하고, 단방향/양방향 통신 모두 가능.
    • 배터리 효율적: 장비가 대기 상태일 때 최소한의 네트워크 자원을 사용.
    • 주제 기반: 주제(topic) 기반의 메시징으로, 유연한 구독/발행 구조를 제공.
  • 추천 사용 사례: IoT 장치 (예: 온도 센서, 카메라, 냉장고, 스마트 전기계량기 등), 홈 자동화 시스템, 실시간 모니터링 시스템.
  • 단점: 브로커가 시스템의 병목점이 될 수 있으며, 대규모 시스템에서는 스케일링에 주의가 필요.

2. CoAP (Constrained Application Protocol)

  • 장점:
    • 경량하고, HTTP와 유사한 API를 제공하면서도 UDP를 사용하여 속도 및 에너지 효율성을 극대화.
    • RESTful 구조로 HTTP와 유사하지만 IoT에 최적화된 프로토콜.
    • 멱등성 지원: 동일한 요청에 대해 동일한 응답을 보장.
  • 추천 사용 사례: 네트워크 자원이 제한적인 장치들, 실시간성이 요구되지만 HTTP보다 경량한 프로토콜을 필요로 하는 경우.
  • 단점: 일부 클라우드 환경에서는 지원이 적고, TCP 기반의 프로토콜보다 확장성이나 신뢰성에서 떨어질 수 있음.

3. AMQP (Advanced Message Queuing Protocol)

  • 장점:
    • 강력한 메시징 기능을 제공: 큐 기반 메시징을 통해 대규모 분산 시스템에서 메시지의 보장 전송순서 보장을 지원.
    • 상호 운용성: 다양한 클라이언트 및 브로커가 지원됨.
    • 보안트랜잭션 지원.
  • 추천 사용 사례: 고급 메시징 시스템이 필요한 경우, 예를 들어 상태 모니터링 시스템, 트랜잭션 처리 등에서 활용.
  • 단점: 리소스가 많이 소모되고, IoT 환경에서는 과도한 오버헤드를 발생시킬 수 있음.

4. XMPP (Extensible Messaging and Presence Protocol)

  • 장점:
    • 확장 가능성: XMPP는 IM (Instant Messaging) 프로토콜로 알려져 있지만, 다양한 IoT 애플리케이션에서도 사용할 수 있습니다.
    • 양방향 통신 지원: 실시간으로 장비와 서버 간 메시지 전송 가능.
    • 보안: TLS, 인증 기능을 지원.
  • 추천 사용 사례: 메시지 기반의 실시간 응답 시스템, 모바일 IoT 애플리케이션.
  • 단점: HTTPMQTT에 비해 상대적으로 설정이 복잡하고, 실시간성 측면에서 MQTT보다 부하가 높을 수 있음.

5. Kafka (Apache Kafka)

  • 장점:
    • 고속 데이터 스트리밍실시간 이벤트 처리에 최적화됨.
    • 내결함성: 파티션 기반으로 메시지를 관리하여 장애 복구가 용이.
    • 확장성: 대규모 분산 시스템에서도 수백만 개의 메시지를 처리할 수 있는 성능.
    • 장기적인 메시지 저장 가능: Kafka는 메시지를 파일 시스템에 저장하고, 필요에 따라 오랫동안 보관할 수 있습니다.
  • 추천 사용 사례: 대규모 분산 시스템, 실시간 로그 분석, 대용량 데이터 스트리밍.
  • 단점: IoT 장비에서는 상대적으로 오버헤드가 크며, 설정 및 관리가 복잡할 수 있음.

🧩 중계서버를 위한 Pub/Sub 시스템 추천 (Kafka 포함)

중계서버의 경우, 대규모 처리 능력고속 스트리밍을 고려할 때, Kafka와 같은 시스템이 매우 유용합니다. 아래는 중계서버에 적합한 Pub/Sub 시스템 추천 목록입니다:

1. Apache Kafka

  • 장점:
    • 대규모 메시징 시스템을 지원하며, 실시간 데이터 스트리밍 및 내결함성을 제공합니다.
    • 수평 확장성을 통해 수백만 건의 메시지를 처리할 수 있는 능력을 가짐.
    • 지속성: 데이터를 일정 기간 저장하면서도 빠른 속도로 처리할 수 있음.
    • 실시간 처리: 실시간으로 데이터 흐름을 처리하고, 소비자에게 전달.
  • 추천 사용 사례: 실시간 데이터 스트리밍, 대규모 분산 시스템, 로그 및 이벤트 처리, IoT 데이터 수집 및 전송.
  • 단점: 설정과 운영이 복잡하고, 작은 IoT 프로젝트에서는 오버헤드가 클 수 있음.

2. RabbitMQ

  • 장점:
    • AMQP 기반으로 메시지 큐 시스템을 제공.
    • 내결함성지속성을 지원하고, 다양한 클라이언트 지원.
    • 매우 직관적인 설정관리 UI 제공.
  • 추천 사용 사례: 중소 규모의 비동기 메시지 처리, 메시지 큐를 통한 백엔드 시스템과의 통신.
  • 단점: 높은 처리량을 요구하는 시스템에서는 스케일링이 어려울 수 있음.

3. Google Cloud Pub/Sub

  • 장점:
    • 관리형 서비스로, 자동 스케일링, 고가용성무제한 메시지 처리 제공.
    • Google Cloud Platform과 통합되어 데이터 파이프라인 구축에 용이.
    • 실시간 스트리밍이벤트 기반 처리에 최적화.
  • 추천 사용 사례: 클라우드 기반 시스템, 실시간 분석 시스템이벤트 처리.
  • 단점: Google Cloud에 종속적이며, 비용이 상대적으로 높을 수 있음.

4. Amazon SNS (Simple Notification Service)

  • 장점:
    • AWS 기반의 관리형 Pub/Sub 서비스로 확장성내구성을 제공.
    • 다양한 프로토콜 지원 (HTTP, HTTPS, Email, SMS 등).
    • 시스템과 쉽게 통합할 수 있는 유연성.
  • 추천 사용 사례: AWS 기반 시스템에서 이벤트 알림, 간단한 Pub/Sub 통합.
  • 단점: 메시지 용량에 제한이 있을 수 있고, 특정 AWS 서비스와의 의존성이 있을 수 있음.

5. Redis Pub/Sub

  • 장점:
    • 초고속 메시징: 메모리 기반 데이터 저장소로 빠른 Pub/Sub 메시징.
    • 간단한 사용: Redis는 간단히 설정할 수 있으며, 성능이 뛰어남.
    • 비동기 메시징분산 캐시 시스템과 잘 통합됨.
  • 추천 사용 사례: 고속 메시징이 필요한 실시간 시스템, 캐시 기반 메시징.
  • 단점: 지속성이 부족하며, 메시지 손실이 발생할 수 있음. 큰 규모의 시스템에서 확장성에 제약이 있을 수 있음.

결론

  • 장비 측 (MQTT, CoAP, XMPP): MQTT는 장비 간 통신에 가장 적합하며, CoAPXMPP도 유용할 수 있습니다. AMQPKafka는 상대적으로 무겁고 고급 시스템에서 사용됩니다.
  • 중계서버 측 (Kafka, RabbitMQ, SNS): Kafka는 대규모 시스템에서 실시간 이벤트 처리데이터 스트리밍에 적합합니다. RabbitMQ는 중소 규모 시스템에 적합하며, SNSRedis는 간단하고 빠른 구현을 제공하지만, 고급 기능을 제공하지는 않습니다.
728x90
반응형
728x90
반응형

MQTT QoS (Quality of Service) 0, 1, 2는 각기 다른 메시지 전달 품질과 보장 수준을 제공하는 방식입니다. 각 수준은 전송 보장 여부재전송 방식에 따라 달라지며, 실시간 통신에서 중요한 선택 요소입니다.

이제 각 QoS 수준의 성격적합한 사용 예시를 들어가며 설명하겠습니다.


1. QoS 0: "At most once" (최대 1회 전달)

  • 전송 보장 없음: 메시지가 한 번 전송되면 더 이상 확인하지 않으며, 실패하면 재시도하지 않습니다.
  • 속도 빠름: 오버헤드가 적고, 빠르게 처리됩니다.
  • 성격: 메시지가 중복되거나 손실되어도 큰 문제가 없는 경우에 사용합니다.

💡 사용 예시

  • 온도 센서 데이터 전송: 온도 센서가 10초마다 데이터를 발송할 때, 메시지 손실이 있어도 크게 중요한 정보가 아니고, 업데이트가 자주 이루어지는 상황에 적합합니다.
    • 예: 온도 센서의 값이 "22도"라고 발송되고, 중간에 그 값이 손실되더라도 시스템은 다음 값(예: "23도")으로 업데이트하므로 큰 문제 없음.
  • 이벤트 로그: 사용자 인터페이스(UI)에서 발생하는 비-핵심적인 이벤트를 전송할 때.
    • 예: 버튼 클릭, UI 변경, 비상 알림 등. 이 이벤트들이 실패하더라도 큰 영향이 없고, 빠른 전송이 중요할 경우 사용.

특징

  • 빠르지만, 신뢰성 없음.
  • 네트워크 부담 적고, 자원 소비가 적음.

2. QoS 1: "At least once" (최소 1회 전달)

  • 전송 보장: 메시지가 최소 1회는 전달되도록 보장합니다. 만약 수신자가 메시지를 받지 못했으면, 재전송됩니다.
  • 중복 가능: 메시지가 중복될 수 있으므로, 수신자가 여러 번 동일한 메시지를 받을 수 있습니다.
  • 성격: 메시지가 반드시 도착해야 하지만, 중복 수신이 허용되는 경우에 적합합니다.

💡 사용 예시

  • 제어 명령 전송: 스마트 홈 시스템에서 기기를 제어하는 명령을 전송할 때. 예를 들어, 스마트 전등을 켜는 명령을 전송하는데, 명령이 중복되더라도 전등이 켜지는 것만 중요할 경우.
    • 예: "전등을 켜라"라는 명령이 중복되어 전송되더라도 상관없고, 명령이 도달하는 것이 우선입니다.
  • 알림 시스템: 중요한 알림(예: 비상 상황 발생)을 사용자에게 전송할 때. 메시지가 한 번 못 받았더라도 최소 1회는 전달되어야 하므로, 중복을 감수하더라도 보장된 전송이 필요합니다.

특징

  • 메시지 보장이 필요하며, 중복 가능.
  • 메시지가 도달할 때까지 재전송됨.
  • 중복된 메시지 처리를 애플리케이션에서 처리해야 함.

3. QoS 2: "Exactly once" (정확히 1회 전달)

  • 최고의 보장: 메시지가 정확히 1회만 전달되도록 보장합니다. 메시지가 중복되거나 손실되지 않도록 4단계 핸드셰이크를 통해 처리됩니다.
  • 성격: 메시지가 중복되거나 손실되지 않도록 보장되어야 할 경우에 사용합니다. 전송 과정에서 약간의 지연이 있을 수 있습니다.

💡 사용 예시

  • 금융 거래 시스템: 주식 거래나 결제 시스템과 같이 데이터 손실이 치명적인 경우. 예를 들어, 주식 거래 시 "주식 10주 매도"라는 메시지가 정확히 한 번만 전달되어야 합니다.
    • 예: 결제 정보계좌 이체 명령과 같은 중요한 거래 데이터에서 정확한 1회만 전달되도록 보장해야 합니다.
  • 센서 데이터 수집: 의료 모니터링 시스템에서의 데이터 수집. 예를 들어, 심장 박동 수, 혈압 값 등은 정확히 한번만 수신되어야 하며, 중복되거나 손실될 경우 큰 문제가 발생할 수 있습니다.

특징

  • 메시지 중복 및 손실이 없음.
  • 핸드셰이크를 통한 신뢰성 보장.
  • 전송 시간이 오래 걸리고, 네트워크 부담이 큼.

QoS 사용법 예시

1. QoS 0 사용 예시 (온도 센서)

client.publish("sensor/temperature", "22", qos=0)
  • 온도 센서 데이터는 자주 갱신되므로 메시지 손실이 있더라도 시스템이 정상적으로 동작하는 경우에 사용합니다.
  • 실시간으로 갱신되기 때문에 중복된 값이 나와도 크게 문제가 없습니다.

2. QoS 1 사용 예시 (스마트 홈 제어)

client.publish("home/lights/on", "1", qos=1)
  • 스마트 홈에서 전등을 켜는 명령을 보낼 때 사용합니다.
  • 전등을 켜라는 명령이 반드시 도달해야 하며, 명령이 중복될 수 있지만, 여전히 전등을 켤 수 있으면 되므로 중복 전송은 허용됩니다.

3. QoS 2 사용 예시 (금융 거래)

client.publish("bank/transaction", '{"account": "1234", "amount": 500}', qos=2)
  • 금융 거래에서 정확한 1회 전송이 보장되어야 하므로 QoS 2를 사용합니다.
  • 같은 거래 명령이 여러 번 도달하는 것보다 한 번만 전송되도록 보장하는 것이 중요합니다.

결론

 

QoS 레벨 설명 사용 예시 적합한 사용 사례
QoS 0 "최대 1회 전달", 전송 보장 없음 온도 센서 데이터, 비핵심 이벤트 메시지 손실이 있어도 큰 문제가 없는 경우
QoS 1 "최소 1회 전달", 중복 가능 스마트 홈 제어 명령, 알림 시스템 메시지가 도달해야 하지만 중복 가능
QoS 2 "정확히 1회 전달", 중복 & 손실 없음 금융 거래, 중요한 센서 데이터 데이터 손실이 치명적이거나 중복을 허용할 수 없는 경우
 

요약:

  • QoS 0은 속도가 중요하고 메시지 손실을 감수할 수 있을 때 사용합니다.
  • QoS 1은 메시지가 적어도 한 번 전달되도록 보장하고, 중복을 허용할 수 있을 때 사용합니다.
  • QoS 2는 메시지가 정확히 한 번만 전달되도록 보장하고, 중요한 데이터 전송에 적합합니다.

따라서 각 상황에 맞게 QoS 수준을 선택하여 네트워크 효율성전송 신뢰성을 최적화할 수 있습니다.

728x90
반응형
728x90
반응형

✅ Selenium 기반 화면 테스트 코드 (예시)

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
import time

# 설정
URL = "https://example.com"  # ← 실제 서비스 주소로 교체
USER_ID = "testuser"
USER_PW = "testpass123"

# 크롬 드라이버 경로
chrome_options = Options()
chrome_options.add_argument("--start-maximized")
service = Service("/path/to/chromedriver")  # ← chromedriver 경로 지정

# 드라이버 실행
driver = webdriver.Chrome(service=service, options=chrome_options)
driver.get(URL)

# 로그인 테스트
try:
    driver.find_element(By.NAME, "id").send_keys(USER_ID)
    driver.find_element(By.NAME, "password").send_keys(USER_PW)
    driver.find_element(By.TAG_NAME, "form").submit()  # 또는 로그인 버튼 클릭

    time.sleep(2)  # 로그인 후 대기

    assert "dashboard" in driver.current_url.lower()
    print("✅ 로그인 성공")

    # 메뉴 테스트: 재고관리 > AI 냉장고 관리
    menu = driver.find_element(By.XPATH, "//span[contains(text(), '재고관리')]")
    menu.click()
    time.sleep(1)
    sub_menu = driver.find_element(By.XPATH, "//span[contains(text(), 'AI 냉장고 관리')]")
    sub_menu.click()
    time.sleep(2)

    assert "냉장고" in driver.page_source
    print("✅ AI 냉장고 관리 페이지 확인됨")

except Exception as e:
    print("❌ 테스트 실패:", e)

finally:
    driver.quit()

🧰 사용 방법 요약

  1. pip install selenium
  2. 크롬 버전에 맞는 chromedriver 다운로드 후 경로 지정
  3. 실제 URL, 로그인 필드의 name/id/class, 메뉴 텍스트 등을 환경에 맞게 수정
  4. 터미널에서 실행

📋 주요 테스트 항목


 

항목 확인 방식
로그인 성공 여부 URL 이동 여부 및 페이지 요소 검출
메뉴 클릭 가능 여부 XPath 클릭 → 이동 후 해당 텍스트 포함 여부
페이지 렌더링 확인 page_source 내 키워드 존재 여부 확인
 

📌 추가로 가능할 것들

  • 다른 메뉴 순회 (반복문으로 메뉴 리스트 자동 순회)
  • 버튼 클릭/입력값 테스트
  • 스크린샷 저장 (driver.save_screenshot("ai_fridge.png"))
  • 실패 시 Slack이나 이메일 알림 연동
728x90
반응형
728x90
반응형

✅ 1. Idempotency (중복 처리 방지)

💡 목적

MQTT는 QoS 1/2 사용 시 메시지가 중복 전송될 수 있으므로
Kafka Consumer나 DB 처리 단계에서 중복 방지가 필요합니다.

✅ 전략

  • MQTT 메시지에 messageId 또는 eventId 필드 포함 (UUID 등)
  • Kafka Consumer가 해당 ID를 기준으로 처리 여부 조회
  • 이미 처리된 경우 skip (DB 또는 Redis 등에 처리 이력 저장)

✅ 예시

{
  "messageId": "c2e2a8dd-52aa-403f-bb6d-1a7a2eeac341",
  "deviceId": "abc123",
  "timestamp": "2025-06-27T10:30:00Z",
  "data": { "temp": 25.3 }
}

✅ 처리 예 (Kafka Consumer):

if (messageRepository.existsByMessageId(message.getMessageId())) {
    log.info("Duplicate message: {}", message.getMessageId());
    return; // 중복 skip
}

messageRepository.save(message); // 최초 처리

※ 또는 Redis에 messageId를 TTL과 함께 저장하여 빠르게 검사


✅ 2. Topic Routing (MQTT → Kafka 토픽 매핑)

💡 목적

MQTT의 토픽 구조(/device/abc/temp)를 Kafka에 맞는 구조(device.abc.temp)로 변환

✅ 기본 전략

  • / → . 치환 또는
  • 패턴 기반 변환: 정규식 or prefix 매핑

✅ 예시 구현

public String mapMqttToKafkaTopic(String mqttTopic) {
    return mqttTopic.replace("/", ".");
}

또는 더 복잡한 경우:

if (mqttTopic.matches("/device/[^/]+/temp")) {
    return "device.temp";
} else if (mqttTopic.matches("/sensor/.+")) {
    return "sensor.data";
} else {
    return "unknown.topic";
}

✅ 3. DLQ (Dead Letter Queue) 처리

💡 목적

Kafka publish나 Consumer 처리 실패 시 메시지를 별도 토픽(DLQ)으로 보내고
문제 분석 또는 재처리 대상으로 격리

✅ 구현 전략 (Kafka publish 실패 시):

try {
    kafkaTemplate.send(topic, message).get(); // 동기 전송 (예외 캐치용)
} catch (Exception e) {
    log.error("Kafka publish failed: {}", e.getMessage());
    kafkaTemplate.send("deadletter.mqtt", message);
}

deadletter.mqtt는 운영 Kafka 클러스터에서 모니터링하거나 Alert 대상으로 사용 가능

✅ Kafka Consumer → DLQ 처리 예시도 가능:

Spring Kafka에서 ErrorHandler를 통해 DLQ 자동 라우팅 설정 가능


✅ 4. JWT / TLS 기반 MQTT 보안 설정

💡 목적

MQTT Broker(예: EMQX, Mosquitto 등)와의 통신 시 기기 인증과 전송 보안을 확보


✅ TLS 인증 (Broker 측 설정)

EMQX 예시:

listener.ssl.external {
  enable: true
  bind: 8883
  keyfile: etc/certs/server.key
  certfile: etc/certs/server.crt
  cacertfile: etc/certs/ca.crt
}

Spring MQTT Client 설정:

java
복사편집
MqttConnectOptions options = new MqttConnectOptions(); options.setSocketFactory(SSLSocketFactoryFactory.get("ca.crt", "client.crt", "client.key"));

직접 인증서 기반 인증(TLS mutual authentication)


✅ JWT 인증 (EMQX)

  1. JWT를 username 또는 password 필드에 담아 전송
  2. EMQX에서 JWT 인증 플러그인 활성화
  3. PEM 키 또는 JWKS URL 기반 서명 검증

Spring MQTT 설정:

MqttConnectOptions options = new MqttConnectOptions();
options.setSocketFactory(SSLSocketFactoryFactory.get("ca.crt", "client.crt", "client.key"));

📌 종합 구조 예시

options.setUserName("jwt");
options.setPassword("<jwt_token>".toCharArray());

📝 요약


 

항목 구현 포인트 도구/구현 위치
Idempotency messageId → Redis/DB로 중복검사 Kafka Consumer
Topic Routing MQTT topic → Kafka topic 규칙 변환 Spring App 또는 EMQX
DLQ 처리 Kafka publish 실패 시 fallback KafkaTemplate try-catch
보안 연동 TLS 인증서, JWT token 사용 MQTT Client + Broker 설정

 

728x90
반응형
728x90
반응형

Spring에서 MQTT Topic 필터별로 Kafka Topic으로 라우팅하는 전략은 매우 유연하게 구현할 수 있습니다.
기본적으로 다음과 같은 두 가지 방식이 존재합니다:


✅ 1. Prefix/Suffix 기반 라우팅

MQTT Topic 구조: /device/{deviceId}/{type}
Kafka Topic 구조: kafka.device.{deviceId}.{type}

구현 예시 (Spring Boot Service)

public String routeToKafkaTopic(String mqttTopic) {
    // 예: /device/abc123/temp → kafka.device.abc123.temp
    String[] tokens = mqttTopic.split("/");
    if (tokens.length >= 3 && "device".equals(tokens[1])) {
        return String.format("kafka.device.%s.%s", tokens[2], tokens[3]);
    }
    return "kafka.unknown"; // fallback
}

사용:

@Override
public void messageArrived(String mqttTopic, MqttMessage mqttMessage) {
    String payload = new String(mqttMessage.getPayload(), StandardCharsets.UTF_8);
    String kafkaTopic = routeToKafkaTopic(mqttTopic);

    kafkaTemplate.send(kafkaTopic, payload);
}

✅ 2. 정규표현식 기반 고급 라우팅

private final Pattern sensorPattern = Pattern.compile("/sensor/(?<location>[^/]+)/(?<type>[^/]+)");
private final Pattern devicePattern = Pattern.compile("/device/(?<deviceId>[^/]+)/(?<type>[^/]+)");

public String routeToKafkaTopic(String topic) {
    Matcher m1 = sensorPattern.matcher(topic);
    if (m1.matches()) {
        String loc = m1.group("location");
        String type = m1.group("type");
        return "sensor." + loc + "." + type;
    }

    Matcher m2 = devicePattern.matcher(topic);
    if (m2.matches()) {
        return "device." + m2.group("deviceId") + "." + m2.group("type");
    }

    return "unknown.topic";
}

✅ 3. YAML 또는 DB 기반의 동적 매핑 전략

  • application.yml 또는 별도 JSON 설정 파일을 통해 매핑 규칙을 정의하고,
  • 브릿지 앱에서 이를 기준으로 라우팅

application.yml 예시:

mqtt-routing:
  "/device/+/temp": "kafka.device.temp"
  "/sensor/factory1/#": "kafka.sensor.factory1"

Java 코드:

@ConfigurationProperties(prefix = "mqtt-routing")
public class MqttRoutingProperties {
    private Map<String, String> routes = new HashMap<>();
}
public String resolveKafkaTopic(String mqttTopic) {
    return routingProperties.getRoutes().entrySet().stream()
        .filter(entry -> TopicMatcher.match(entry.getKey(), mqttTopic))
        .map(Map.Entry::getValue)
        .findFirst()
        .orElse("default.topic");
}

※ TopicMatcher.match()는 MQTT 와일드카드(+, #) 비교용 유틸로 직접 구현 또는 라이브러리 사용


✅ 4. 와일드카드 매칭 유틸 참고 구현

public class TopicMatcher {
    public static boolean match(String pattern, String topic) {
        String regex = pattern
            .replace(".", "\\.")
            .replace("+", "[^/]+")
            .replace("#", ".+");
        return topic.matches(regex);
    }
}

📌 결론 및 추천


전략 추천 상황
Prefix/Suffix 매핑 주제 구조가 고정되어 있을 때
정규표현식 매핑 복잡하거나 다단계 주제 구조일 때
YAML 기반 동적 매핑 주제-토픽 대응을 외부 설정으로 통제하고 싶을 때
DB 기반 매핑 대규모 매핑 관리 필요 시 (관리 콘솔과 연동 등)
728x90
반응형
728x90
반응형

"MQTT Topic → Kafka Topic 변환" (Topic Routing) 로직은 보통 중계 서버(Bridge App 혹은 MQTT Broker의 Kafka Connector) 측면에서 처리하는 것이 표준적이고 안전한 방법입니다. 기계(디바이스) 측면에서 처리하는 것은 유연성이 낮고 관리가 어렵기 때문입니다.


✅ 1. 중계 서버 측에서 처리하는 것이 권장되는 이유


 

기준 기기 측 처리 브릿지 서버 측 처리
유연성 기기마다 하드코딩됨 서버 측 rule만 바꾸면 전체 반영
운영 편의성 수백~수천 대 기기 업데이트 필요 서버 설정만 변경하면 OK
보안/접근 제어 Topic이 외부에 노출 내부 topic 구조만 사용 가능
표준화 어려움 (벤더별 다름) 공통 매핑 전략 적용 가능
 

✅ 2. 실전 예시

예: MQTT 주제 /device/abc123/temp

이런 MQTT 주제를 다음과 같이 Kafka topic으로 라우팅:

 

MQTT Topic Kafka Topic
/device/abc123/temp device.abc123.temp
/sensor/factory1/line2/humidity sensor.factory1.line2.humidity
 

 


🔧 3. Spring Boot 브리지 앱에서 Routing 처리 방식 예시

@Override
public void messageArrived(String topic, MqttMessage message) {
    String payload = new String(message.getPayload(), StandardCharsets.UTF_8);

    // Topic Routing: MQTT topic → Kafka topic 변환
    String kafkaTopic = topic.replace("/", ".");

    log.info("Routing MQTT[{}] → Kafka[{}]: {}", topic, kafkaTopic, payload);

    kafkaTemplate.send(kafkaTopic, payload);
}

 


📦 4. Broker 자체에서 처리 (EMQX 예시)

EMQX Kafka Connector에서는 다음과 같은 설정으로 topic 매핑이 가능합니다:

routes:
  - topic: "/device/+/data"
    kafka_topic: "device.${clientid}.data"

또는:

kafka_topic: "${topic}"  # mqtt topic 그대로 사용
  • ${topic}: MQTT topic 그대로
  • ${clientid}: MQTT client ID
  • ${username} 등 동적 변수도 사용 가능

✅ 결론

Topic Routing은 “중계 서버 측”에서 처리하는 것이 일반적이며 권장됩니다.

  • 브릿지 앱(Spring Boot) 또는 MQTT Broker(EMQX, HiveMQ)의 Kafka connector 설정에서 라우팅 처리
  • 기기에서는 단순 주제 구조만 신경 쓰게 하고, 서버에서 Kafka 구조에 맞게 변환
  • Topic name rule은 중앙 통제 가능 → 운영 효율성 ↑
728x90
반응형
728x90
반응형

✅ MQTT → Kafka 아키텍처 개요

🧱 구조 개요

[Device/Sensor] 
   ↓  (MQTT)
[MQTT Broker]  (e.g., EMQX, Mosquitto, HiveMQ)
   ↓  (MQTT → Kafka Bridge)
[Kafka Producer] 
   ↓
[Kafka Cluster] 
   ↓
[Consumer Group]
   ↓
[Processing Services / DB / Alerting]

⚙️ 구성 요소별 설명 및 고려사항

1. Device Layer: MQTT 사용

  • ✅ 경량 통신, 배터리/연결 상태 최적화
  • ✅ QoS (1 or 2)로 안정성 조절
  • ✅ Retained message, LWT(Last Will and Testament) 등 기기 상태 관리 가능

EMQX, HiveMQ는 Kafka connector 내장


2. MQTT Broker → Kafka 연동 방식

✅ 옵션 1: MQTT Broker 자체 Kafka Connector 사용

  • EMQX, HiveMQ, VerneMQ는 공식 Kafka Plugin 제공
  • 주제 기반 → Kafka topic 매핑
MQTT topic: /device/abc123/data  
→ Kafka topic: device.abc123.data

✅ 옵션 2: 중간 Bridge 애플리케이션 구성

  • 직접 MQTT Subscribe → Kafka에 Publish하는 Bridge 앱
  • Spring Boot, Node.js 등으로 쉽게 구성 가능
  • 유연한 변환/필터/보안 처리 가능

🟡 적절한 옵션은 시스템 복잡성과 유연성 요구에 따라 선택


3. Kafka Layer: 강력한 수신/처리 백본

  • ✅ 메시지 유실 방지 (Durable log)
  • ✅ Offset 기반 재처리/리커버리 용이
  • ✅ Consumer Group을 통해 병렬 처리 확장 가능
  • ✅ Kafka Connect or Stream으로 다른 시스템 연동 (DB, S3, Elasticsearch 등)

✅ 장점 요약: MQTT + Kafka


 

항목 장점
확장성 Kafka가 고속 다량 메시지 수신 처리에 특화
안정성 MQTT QoS + Kafka Durable Log 조합으로 유실 거의 없음
리커버리 Kafka의 Offset, DLT로 재처리/복구 쉬움
유지보수성 WebSocket보다 표준화된 프레임워크 많음
통신 효율 MQTT는 배터리/대역폭에 효율적
비동기 처리 Kafka 기반 비동기 아키텍처와 잘 맞음
 

🔻 보완 고려 사항


 

이슈 해결 전략
MQTT 재연결 시 중복 메시지 MQTT Client ID + QoS 1 or 2 + Idempotency 처리
브로커 장애 시 대응 MQTT 클러스터 (HA 구성), Kafka Replication 구성
Kafka 처리 지연 Backpressure 처리, Consumer Buffering
포맷 불일치 MQTT ↔ Kafka 브릿지에서 JSON 표준화 처리
 

✅ 추천 아키텍처 스택


계층 구성
Device MQTT Client (QoS1/2, KeepAlive 설정)
Broker EMQX (Kafka Plugin 내장), Mosquitto+Bridge 앱
Bridge/Adapter (옵션) Spring Boot 또는 Node.js MQTT→Kafka Publisher
Kafka Kafka + Zookeeper 또는 Confluent Platform
Consumer Spring Kafka Consumer, Kafka Streams, Spark/Flink
Monitoring Prometheus + Grafana, Kafka Manager, MQTT Dashboard
 

🧩 확장 아이디어

  • 📦 Kafka → Redis, MongoDB, PostgreSQL 로 실시간 데이터 저장
  • 📊 Kafka Streams → 실시간 분석/경보 시스템
  • 🔁 Kafka Connect → Elasticsearch 연동 (검색/대시보드)

📝 결론

WebSocket 대신 MQTT → Kafka 기반 구조는 매우 효과적인 선택입니다.

  • IoT/센서 → 브로커 → 대용량 수신 처리 → DB저장/분석 구조에 적합
  • 확장성, 내결함성, 복구성, 유지보수성 모두 우수
  • WebSocket보다 성능-안정성의 균형이 좋음
728x90
반응형
728x90
반응형

✅ 1. 리커버리 요구 유형 구분


 

유형 예시 필요한 리커버리 방식
메시지 유실 브로커 다운, 연결 중단 재전송, ACK 기반 재요청
메시지 중복 재전송 중 중복 처리 Idempotency 처리
메시지 순서 꼬임 연결 재설정 시 순서 역전 시퀀스 기반 정렬
장시간 단절 후 데이터 폭주 오프라인 동안 누적 → 복구 시 대량 처리 Backpressure 처리, Batch 처리
수신 누락 특정 Subscriber가 누락 메시지 큐 또는 로그 기반 재구성
 

✅ 2. 아키텍처 구성: 공통 복원 패턴

🧱 기본 구성 요소

[Publisher]
   │
[Message Broker (MQTT / Kafka / WebSocket Gateway)]
   │
[Message Buffer (Optional)]
   │
[Consumer Service]
   │
[Recovery Layer] ← 핵심
   │
[DB / Processing System]

✅ 3. 통신 방식별 리커버리 적용 모델

🔸 A. MQTT

MQTT는 기본적으로 경량 메시지 전송에 특화되었으며, QoS(Quality of Service) 수준을 통해 일부 리커버리를 지원합니다.


 

QoS Level 설명 리커버리 특징
0 At most once 손실 가능성 있음
1 At least once 중복 가능성 있음 (→ idempotent 처리 필요)
2 Exactly once 성능 저하 but 정확성 높음
 

MQTT 기반 아키텍처 리커버리

  • QoS=1 또는 2 설정
  • Persistent Session 사용
  • Redis/DB 기반 Last Processed Message ID 저장
  • 중복 방지를 위한 Idempotent Consumer Layer 구현
  • 장애 시: Client 재접속 → 마지막 처리된 ID 기준 재구성 요청

🔸 B. Kafka

Kafka는 기본적으로 Replayable, Durable, Partitioned Ordered Log를 제공하므로 리커버리 관점에서 매우 강력합니다.

Kafka 기반 리커버리 전략

  • Consumer Offset 관리 (commit 또는 external 저장)
  • 장애 시 마지막 커밋된 offset부터 재소비 가능
  • Dead Letter Topic(DLT) 활용 → 실패 메시지 격리 후 재처리
  • Exactly Once 처리 (EOS): Kafka Streams, Transaction 기반

Kafka 리커버리 패턴 구조

[Producer]
   ↓
[Kafka Broker]
   ↓
[Consumer Group]
   ↓                ↘
[Processing Service]  [DLT Processor]
   ↓
[DB (with Idempotency)]

🔸 C. WebSocket

WebSocket은 상태 유지 연결로 처리되지만, 연결이 끊기면 메시지 손실 가능성이 존재.

WebSocket 리커버리 전략

  • Client ↔ Server 양방향 Ping/Pong + Heartbeat
  • 연결 종료 시점 기준으로 “마지막 수신 시퀀스” or “이벤트 타임스탬프” 저장
  • 복구 시점에 Request Replay API or Snapshot Sync API 제공
  • 보조 메시지 채널 (예: Kafka 또는 REST Sync API)와 병행 운영

✅ 4. 보편적으로 적용 가능한 리커버리 패턴

⛓️ 1) Idempotent Consumer

  • 중복 메시지가 와도 처리 결과가 동일하게 유지되도록 설계
  • 기준: messageId, eventId, timestamp+source 등

📦 2) Message Replay Buffer (Persistent Queue)

  • 장애 발생 시 재요청 가능한 로그 저장소
  • 예: Kafka, Redis Stream, Durable RabbitMQ Queue 등

🔁 3) Backpressure / Retry Queue

  • 장애 발생 시 메시지를 별도 Queue로 보내고,
  • 일정 시간 후 또는 수동 트리거로 재처리

🧾 4) Audit Log or Event Sourcing

  • 모든 입력 메시지와 상태 변화를 별도로 저장
  • 리커버리 시: 이벤트 재적용

🧪 5. 부가적으로 고려할 리커버리 기법

  • Circuit Breaker + Retry Policy (Spring Retry, Resilience4j)
  • Snapshot + Delta Sync (주기적으로 전체 상태 저장)
  • Prometheus + Alertmanager: 통신 이상 감지 자동화
  • Chaos Monkey 등 장애 테스트 도구로 사전 검증

🔧 아키텍처 예시 (Kafka + WebSocket 혼합)

[Sensor Client (MQTT/WebSocket)]
   │
   ├─> [WebSocket API Gateway]
   │       └─> [Snapshot Sync API]
   │
   └─> [Kafka Producer]
           ↓
      [Kafka Broker] ─────> [DLT Topic]
           ↓
      [Processing App (Consumer)]
           ↓
      [PostgreSQL + Offset Table]
  • 이 구조는 실시간 전달 실패 시 Kafka로 우회,
    또는 WebSocket 통신 실패 후 Snapshot Sync로 복원하는 구조입니다.

✅ 요약

 

목적 적용 기술/패턴
메시지 중복 방지 Idempotent 처리
유실 방지 Kafka, MQTT QoS, Persistent Queue
순서 보장 Kafka Partition, Sequence ID 사용
오프라인 누적 대응 Backpressure, Rate Control
중단 시 복원 Snapshot + Delta Sync, Replay Buffer
728x90
반응형
728x90
반응형

JWT(Json Web Token) 방식이 **고보안(high security)**으로 간주되는 이유는 단순히 "토큰"이기 때문이 아니라, 위변조 방지, 무상태(stateless) 인증, 만료 시간 설정, 그리고 암호화된 서명 덕분입니다. 아래에 상세히 설명드릴게요.


🔐 JWT 방식이 고보안인 이유

✅ 1. 서명(Signature) 기반 위변조 방지

JWT는 다음 3가지 구조로 되어 있습니다:

[Header].[Payload].[Signature]

예:

eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.
eyJzZXJpYWwiOiJJVExKLUFQUEFSQVRVUyIsImV4cCI6MTcxOTg0NTYwMH0.
VJFJjJSkcUzddzy3yO9xK3....
 

이중 Signature는 다음과 같이 생성됩니다:

signature = HMACSHA256(base64(header) + "." + base64(payload), secret)
  • 즉, 누가 Payload나 Header를 조작하더라도, Signature가 검증에 실패합니다.
  • 이 방식은 서버 측의 비밀 키(secret or private key) 없이는 생성 불가능하기 때문에 위변조 방지에 강력합니다.

✅ 2. Token 자체에 유효 시간(exp) 포함 가능

{
  "serial": "K-001",
  "exp": 1719986471,     // 만료 시각 (UNIX timestamp)
  "iat": 1719982871      // 발급 시각
}
  • 기기가 제시한 JWT의 유효 시간(exp)이 현재 시간을 초과했다면 자동 만료로 간주 → 서버 인증 거절
  • 즉, 기기 도난/키 유출 시 피해 범위를 자동 제한할 수 있습니다.

✅ 3. 서버가 상태를 기억할 필요 없음 (Stateless)

  • 일반적인 accessKey 방식은 DB 또는 Redis에서 해당 key가 유효한지 확인해야 함.
  • 반면 JWT는 토큰 자체가 완전한 인증 정보를 담고 있어서 서버가 별도 조회 없이도 검증 가능.
  • 분산 환경, 서버 확장성, 로드 밸런싱에 유리.

✅ 4. 공개키 기반 비대칭 서명 (RS256 등) 지원

  • JWT는 대칭 서명(HMAC-SHA256)뿐 아니라 비대칭 키 서명(RS256, ES256 등) 도 지원합니다.
  • 서버는 private key로 서명하고, 기기/클라이언트는 public key로 검증 가능.
  • 서버-기기 간 인증서 기반 통신이 필요한 경우 활용 가능.

✅ 5. Scope, Role, Version 등 추가 정보 포함 가능

  • JWT의 Payload에는 역할(role), 허용된 명령(permission), 펌웨어 버전 등도 포함 가능
{
  "serial": "K-001",
  "role": "reader",
  "version": "v1.2.3",
  "exp": ...
}
  • 이를 통해 기능 제한, 접근 제어, 정책 분기 등을 서버에서 빠르게 수행 가능

⚠️ 단, JWT는 고보안 ≠ 만능

 

장점 (고보안) 보완 필요사항
위변조 방지, 서명 검증 유출 시 만료 전까지 사용 가능 (→ TTL 짧게)
서버 없이도 검증 가능 토큰 취소(revoke)가 복잡 (→ 블랙리스트 필요)
구조적으로 범용 인증 확장에 유리 발급/서명 방식은 반드시 안전하게 관리 필요
 

✨ 결론

JWT가 고보안인 이유는 "서명 기반 위변조 방지 + 만료 시간 포함 + 서버 상태 의존성 없음 + 공개키 암호화 지원" 때문입니다.

만약 장치가 전용 하드웨어이고, 초기 등록 이후 인증정보를 저장해두는 구조라면 JWT (짧은 TTL + 주기적 재발급) 구조가 매우 적합합니다.

728x90
반응형
728x90
반응형

🧭 요약: Kafka가 하는 역할

"실시간 수신 데이터를 분산 처리 가능한 구조로 중계해주는 중심 허브 역할"


🔍 Kafka의 역할 상세 정리

1. 비동기 처리 (Decoupling)

  • WebSocket 서버는 데이터를 즉시 Kafka에 넣고 종료 (빠름)
  • 이후 처리 로직(MariaDB 저장, 알림 발송, 로그 기록 등)은 Kafka consumer들이 비동기적으로 수행

✅ WebSocket 서버의 부담이 줄고, 처리 지연 없이 응답 가능
✅ 소비 로직은 느려도 상관없음


2. 데이터 버퍼/완충지 역할 (Buffering)

  • 순간적으로 데이터 폭주가 발생해도 Kafka가 데이터를 큐 형태로 저장해서 순차 처리 가능
  • 예: 1초에 수천 개 메시지가 들어와도 consumer가 순차 처리

✅ 시스템 안정성 증가
✅ 장애 시 메시지 유실 방지 (Kafka는 디스크에 저장)


3. 데이터 전달 허브 (Routing)

  • 여러 consumer들이 동일한 topic을 구독할 수 있음
  • 예:
    • consumer A: MariaDB 저장
    • consumer B: 실시간 대시보드 반영
    • consumer C: 이상 탐지 분석

✅ 하나의 데이터로 여러 기능 분기 가능
✅ 마이크로서비스 구조와 궁합 좋음


4. 재처리 가능성 확보 (Reprocessing)

  • Kafka는 데이터를 로그처럼 저장하기 때문에,
  • 일정 시간(예: 7일) 내에는 이전 메시지 재처리 가능

✅ 예전 데이터를 다시 분석하거나 저장 가능
✅ 소비자가 장애났다가 복구되어도 처리 재시도 가능


5. 확장성 (Scalability)

  • Kafka는 topic을 partition으로 나누고,
  • consumer group을 늘려서 병렬 소비 가능

✅ 대규모 IoT/임베디드 데이터에도 대응 가능
✅ 소비자가 여러 인스턴스로 scale-out 가능


🔁 비교: Kafka 없이 직접 DB 저장했을 경우

항목 Kafka 미사용 Kafka 사용
WebSocket 서버 처리 부담 높음 낮음
장애 전파 범위 큼 (DB/네트워크 장애 시 전체 영향) 작음 (consumer만 영향)
실시간 분석/대시보드 연동 구현 복잡 여러 consumer로 분리 가능
확장성 낮음 높음
재처리 어려움 가능 (offset 기반)
 

🧪 실무 예시

Kafka Topic 구독 Consumer
device.data MariaDB 저장 서비스
device.alert 알림 시스템 (FCM, Slack)
device.log 로그/감사 이력 저장 시스템
device.analytics Spark/Flink 스트리밍 분석 시스템
 

📌 결론

Kafka는 단순히 “전송해주는 애”가 아니라 다음과 같은 역할을 수행합니다:

✅ 빠르게 받고
✅ 큐잉하고
✅ 다양한 서비스로 나눠 보내고
✅ 장애를 막고
✅ 대용량을 감당하며
✅ 재처리까지 가능한 통합 메시지 허브

728x90
반응형

+ Recent posts