Search

Kafka 개념

가. 기본 동작 흐름

1) 기본 개념
이벤트: data in the form of key, value, timestamp, and optional metadata headers
Event key: "Alice" Event value: "Made a payment of $200 to Bob" Event timestamp: "Jun. 25, 2020 at 2:06 p.m."
Plain Text
복사
example from https://kafka.apache.org/intro
프로듀서: 이벤트 발생 시스템
컨슈머: 이벤트 조회 및 처리 시스템
토픽: 이벤트의 저장소, 토픽은 파일 시스템의 폴더에 해당하고, 이벤트는 앞의 폴더에 저장되는 파일로 볼 수 있습니다.
파티션: 토픽의 세부 구성 요소로 실제 이벤트가 저장되는 공간입니다.
주키퍼: 분산 처리되는 메시지의 메타정보를 관리하는 시스템입니다.
2) 전체 과정 요약
메시지 생성: 프로듀서는 메시지를 생성하고 카프카 토픽의 파티션으로 전송합니다.
메시지 저장 및 관리: 카프카 브로커는 이 메시지를 받아 파티션에 저장하고, 오프셋을 관리합니다.
메시지 소비: 컨슈머는 브로커로부터 메시지를 받아 처리하고, 처리한 메시지의 오프셋을 카프카에 커밋합니다.
3) Create Messages
1.
메시지 생성: 프로듀서는 메시지(또는 이벤트)를 생성합니다.
2.
메시지 전송: 생성된 메시지는 특정 카프카 토픽의 파티션으로 전송됩니다. 프로듀서는 메시지를 어느 파티션에 배치할지 결정할 수 있습니다.
3.
신뢰성과 효율성: 프로듀서는 메시지 전송의 신뢰성(예: 메시지의 손실 방지)과 효율성(예: 높은 처리량)을 보장하기 위해 다양한 설정을 할 수 있습니다.
4) Write Events into the Topic
1.
메시지 저장: 카프카 브로커는 프로듀서로부터 받은 메시지를 토픽의 파티션에 저장합니다. 각 파티션은 메시지를 순서대로 저장하며, 각 메시지는 고유한 오프셋으로 식별됩니다.
2.
메시지 전달 관리: 브로커는 컨슈머가 요청한 메시지를 제공하고, 컨슈머 그룹의 상태와 오프셋을 관리합니다.
5) Read the Events
1.
메시지 소비: 컨슈머는 지정된 토픽의 파티션으로부터 메시지를 읽습니다. 이때, consumer.poll() 메소드를 통해 새로운 메시지를 주기적으로 가져옵니다.
2.
오프셋 업데이트: 컨슈머는 처리한 메시지의 오프셋을 카프카에 커밋함으로써 메시지가 처리된 위치를 업데이트할 수 있습니다.

나. 개념 상세

1) 토픽의 주요 특징
이벤트 저장: 토픽 내 파티션은 선입선출 방식으로 이벤트를 저장하는데, 컨슈머가 이벤트를 읽더라도 파티션 내부의 이벤트는 삭제되지 않습니다. 이와 같이 이벤트가 삭제되지 않는다면 동일한 컨슈머 그룹에 속해있고, offset.reset 설정이 earliest일 경우, 새로운 컨슈머가 파티션 내부의 이벤트를 읽을 수 있습니다. 만약 동일한 데이터셋에 대해 검색엔진으로 업데이트하고 다른 DB에도 반영한다고 하면 파티션 내부의 데이터를 읽는 작업을 독립된 시스템에서 유연하게 처리할 수 있습니다.
복수 파티션의 이벤트 저장 방법: 키의 유무에 따라 이벤트의 저장 방법이 달라집니다. 만약 키가 없다면 라운드 로빈 방식에 따라 여러개의 파티션에 골고루 분산되어 저장됩니다. 만약 키가 있다면 키의 해시값에 따라 특정 파티션에 할당됩니다.
파티션 설정 주의사항: 파티션은 늘릴 수 있지만 줄일 수 없습니다. 파티션이 늘어남에 따라 컨슈머의 개수를 늘려서 사용할 수 있는 장점이 있지만 앞의 단점을 고려하여 파티션을 설정해야 합니다.
파티션 내 레코드의 최대 보존 시간(log.retention.ms)과 최대 보존 크기(log.retention.byte)를 지정할 수 있습니다.
파티션 개수를 어떻게 정할 수 있을까? bootstrap server 설정과 별개로 하나? 이거 AWS에서 보통 어떻게들 설정하나? 특히 카프카 클러스터 구성은 어떻게 하나?
2) Broker, 레플리케이션, In Sync Replication
파티션은 리더 파티션과 리더가 복제한 팔로워 파티션이 있습니다. 리더 파티션에 대한 정보는 주키퍼가 관리합니다.
Broker: 독립적인 카프카 서버로 동작하는 컴포넌트
Replication: 파티션의 복제본으로, 카프카 브로커의 개수까지 늘릴 수 있습니다. 브로커가 3대 있고 레플리케이션이 1로 설정되어 있다면 실제적으로 파티션은 원본 파티션(리더 파티션) 하나만 존재합니다. 동일한 브로커의 숫자에서 레플리케이션이 3으로 설정되면 리더 파티션 하나에 두 개의 팔로워 파티션이 존재합니다. 팔로워 파티션은 각각의 나머지 브로커에 생성됩니다.
ISR(In Sync Replication): 리더 파티션과 팔로워 파티션을 묶어서 ISR이라고 합니다. 프로듀서의 메시지는 먼저 리더 파티션에 전달됩니다. 리더 파티션은 팔로워 파티션에 해당 메시지를 동기화합니다.
Producer의 ACK 설정: 프로듀서의 ACK 설정 옵션은 세 가지 입니다. 0, 1, all인데, 첫번째는 프로듀서가 리더 파티션에 메시지를 전달만 하는 것입니다. 두번째는 프로듀서가 동일하게 메시지를 전달하고 잘 받았는지 응답 값을 받는 것입니다. 마지막 옵션은 두번째 옵션에 더해서 팔로워 파티션에 해당 메시지가 정상적으로 동기화되었는지 확인합니다.
replication 개수 정하는 방법: 레플리케이션이 많으면 팔로워 파티션에 데이터를 동기화하는 오버헤드가 증가한다. 따라서 하나의 메시지가 저장되는 시간을 고려해서 레플리케이션의 개수를 조정할 필요가 있다.
3) Partitioner
프로듀서 내 파티셔너가 토픽 내 어떤 파티션에 데이터를 전달할지 결정합니다. 파티셔너는 메시지의 키 또는 값을 참고하여 파티션을 선택합니다.
파티셔너가 별도로 지정되지 않았다면 메시지의 키의 유무에 따라 파티션을 결정합니다. 메시지의 키가 있다면 해시 함수를 이용해서 해싱된 값에 따라 파티션을 결정합니다. 메시지의 키가 없다면 라운드 로빈 방식으로 균등하게 파티션에 메시지를 배분합니다.
커스텀 파티셔너를 정의하면 유명인 이슈를 해결할 수 있습니다. 예를 들어, 유명인에 대한 이벤트가 압도적으로 많이 발생한다면 파티션의 배분을 유명인 8:일반인 2로 처리되게 설정할 수 있습니다.
4) Consumer Lag
Consumer Lag이란 프로듀서가 마지막으로 넣은 메시지의 오프셋과 컨슈머가 마지막으로 읽은 메시지의 오프셋의 차이입니다.
하나의 컨슈머와 연결된 파티션이 두 개 이상이라면 당연히 Consumer Lag도 두 개 이상입니다.
5) Burrow
Consumer Lag 모니터링 오픈소스 툴입니다.
6) Kafka vs traditional MQ
event(or message)의 저장 위치가 다릅니다. kafka의 디스크의 파일 시스템에 저장되지만 traditional MQ는 메모리에 저장됩니다.
event(or message) 저장 방식이 다릅니다. kafka는 consumer에서 이벤트 사용(consume) 후 기본적으로 저장하지만, traditional MQ는 backend를 설정해야만 이벤트 사용 후 저장하며 기본적으로 메시지의 저장을 지원하지 않습니다.
event(or message) 저장과 성능 간 관계에도 차이가 있습니다. kafka는 디스크에 저장되기 때문에 데이터 저장 용량과 성능 간 관계 없습니다. 반면 traditional MQ의 경우 메모리를 사용하기 때문에 데이터 저장 용량에 따라 성능에 악영향을 줍니다.
프로듀서의 메시지를 보장하는 방법도 다릅니다. kafka의 경우 Producer의 ACK 설정 옵션을 1 또는 all로 설정할 경우, 브로커에서 정상적으로 메시지가 전달되었다는 응답을 확인합니다. traditional MQ의 경우, AMQP의 기능 중에 confirm mode를 설정하면 브로커는 메시지가 브로커에 정상적으로 도착했다는 메시지를 전달합니다.

다. Kafka Connect

1) 개념
정의: 카프카 내외부 시스템(RDBMS 등)의 데이터를 지속적으로 수집하는 서비스
활용: 카프카 내외부 시스템의 데이터 통합에 활용

라. Kafka Stream

1) 개념
정의: Topic에 저장된 event를 처리하는 클라이언트 라이브러리
활용: Topic에 저장되는 in/out data를 활용하여 MSA나 real-time or mission-critical 어플리케이션에 활용

마. 이슈

1) java 설치
java version 8 이상 설치 필요
설치 명령 및 버전 확인
brew update brew tap adoptopenjdk/openjdk brew install --cask adoptopenjdk11 java --version
Plain Text
복사
2) kafka broker connection fails
원인: 동일한 포트에서 이미 실행 중인 broker process가 존재
해결: 해당 프로세스 아이디 제거
lsof -i :2181 kill -9 <PID>
Shell
복사

Reference

kafka 기초 개념, https://kafka.apache.org/intro