What is Kafka and Strimzi Operator?

Kafka의 주요한 이론을 정리하고 Strimizi Operator를 실습해본다

Sigrid Jin
36 min readNov 18, 2023
사진 출처: https://blog.devgenius.io/kafka-on-kubernetes-using-strimzi-part-1-83d74564135e

Topic 1: Kafka의 주요한 개념을 정리해본다.

기본적으로 Pub/Sub 형태의 메시징 플랫폼은 스트리밍해야 할 이벤트가 여러 가지가 될 때 유용하다. 만약 이벤트를 emit하는 서버가 7개가 있고 이를 각각 subscriber가 4개가 있다고 가정하면 7 x 4 = 28개의 커넥션이 필요하다. 하지만 중앙의 pub/sub 서버가 모아서 뿌려준다면 7 + 4 = 11개의 커넥션만 있으면 된다.

메시징 플랫폼은 이벤트 브로커와 메시지 브로커로 나눌 수 있다. RabbitMQ로 대표되는 Message Broker는 Publisher가 생산한 메시지를 저장했다가, Consumer가 가져갈 수 있도록 하고 이후에는 데이터가 빠르게 삭제된다. 이를 스트리밍 형태라고 부른다. 반대로 Kafka로 대표되는 Event Broker는 Message Broker와 Publisher가 생산한 이벤트를 저장해서 가져가고 난 이후에도 Data & Event를 계속 hold하고 있다. 장애 상황이 발생하였을 때 장애 발생시점부터 다시 처리를 시작할 수 있다는 장점이 있다. 따라서 최근에는 메시징 플랫폼의 사실상 표준으로 Kafka가 사용되고 있다.

주로 언급되는 Kafka의 장점으로는 Topic 하나를 여러 가지로 나누어 병렬 처리가 가능하다는 것이다. 나누어진 파티션 수만큼 컨슈머를 연결할 수 있어 쓰기 작업에 일반적으로 특화되어 있다고 볼 수 있다. 이 외에도 브로커의 스케일 아웃이 용이하다는 점에서 확장성에 유리하며, 다른 메시징 플랫폼과 다르게 전송받은 데이터를 메모리가 아니라 파일 시스템에 반영구적으로 저장한다.

Kafka에서 사용되는 용어를 정리해보면, 크게 애플리케이션 단위와 클라이언트 단위로 나누어볼 수 있다. 먼저 애플리케이션 단위에서는 ZooKeeper가 있는데, 이는 메타데이터를 관리하는 측면에서 의의가 있다. 메타데이터를 관리하는 코디네이터의 역할을 하는 애플리케이션이고, 최근에는 Kraft (RAFT) 로 갈아타려고 하고 있다. 클라이언트 단위에서는 Producer와 Consumer가 있다. Producer는 카프카로 메시지를 보내는 역할의 클라이언트이며, Consumers는 카프카에서 메시지를 받는 역할의 클라이언트이다. 서로가 주고받는 메시지를 정의하는 Topic 이라는 개념이 있고, RDBMS를 기준으로 하면 테이블과 비슷하게 이해할 수 있다. Producer는 메시지에 키를 추가해서 토픽 안의 여러 파티션에 값을 넣을 때 사용할 수 있다. 예를 들어서 Key 값이 `null` 이라면, 데이터는 Round-Robin 방식으로 메시지를 보내게 된다. 만약 `null` 값이 아니라면, 프로듀서는 항상 같은 파티션에 메시지를 보내게 되므로 특정한 필드에 대한 메시지 순서의 설정을 해야 한다.

Topics

Topic은 선입선출 형태의 Queue 구조를 가지고 있으며, 토픽의 이벤트는 필요한 만큼 읽을 수 있다. 앞서 말했듯이 기존의 메시징 시스템과 달리 이벤트는 소비 후 삭제되지 않고 파일 시스템에 남아있기 때문에, Kafka가 주제별 구성 설정을 통해 이벤트를 유지해야 하는 기간을 정의해야 한다. Kafka의 성능은 데이터 크기와 관련없이 실질적으로 일정하기 때문에 장기간 데이터를 저장하더라도 지장없다. Topic은 여러 데이터 포맷을 지원하므로 JSON, Avro, TXT 모두 보내도 상관없다. Topic 내부에 있는 메시지들의 순서를 데이터의 스트림이라고 부른다.

Kafka Message Serializer라는 개념이 있는데 Kafka의 메시지를 생성시키는 주체이다. 메시지를 보낼 때, Producer에서 평상시 객체를 Byte로 데이터 직렬화하는 과정이 필요하다. 예를 들어, 실제 123이나 helloworld 따위의 문자나 숫자를 0101인 이진 데이터로 직렬화하고 보내야 한다. IntegerSerializer를 KeySerializer로 사용하고 StringSerializer를 ValueSerializer로 사용하는 등의 작업을 할 수 있다. Kafka는 이러한 Message Serializer를 통해 Byte 단위의 데이터 직렬화를 수행하여 Kafka에 보내기 때문에 어떤 데이터 포맷을 가진 값이 오더라도 토픽에 보낼 수 있게 된다.

Partitions

Partition이라는 개념은 DB의 샤딩과 같은 개념으로, 늘릴 수만 있고 줄일 수는 없다. Message/Record는 프로듀서에서 브로커, 브로커에서 컨슈머로 전달되는 어떤 데이터 조각의 단위라고 보면 된다. 하나의 클러스터 단위에서는 Broker들과 해당 Broker들을 대표하는 Controller가 있다. 클러스터는 Broker들의 모임이다. 각 Broker들은 Int 형식의 ID로 구분할 수 있는데, 예를 들면 Broker 101, Broker 102, Broker 103.. 처럼 늘어나는 식이다.

각 브로커는 고유한 토픽 파티션을 가지고 있으며, 프로듀서가 브로커에게 데이터를 전달하면 모든 브로커에 샤딩처럼 분산되어 저장하게 된다. 이러한 분산저장 메커니즘을 통해 Client Application이 여러 브로커에서 병렬 처리가 가능하도록 하여 속도 향상과 확장성에 도움을 준다. 이러한 Events를 Topic에 분산 저장할 수 있도록 Topic을 여러 개로 나눈 것이 인상적이다. 파티션 안의 메시지는 ID 값을 가지게 되며, 인덱스 0부터 1씩 increment하게 되는데 해당 값을 offset이라고 부른다. 또한 파티션이 리더 파티션을 계속 복제하고 있기 때문에 장애가 일어나더라도 지속적인 처리가 가능하다.

Partition의 중요한 특징은 메시지가 순차적으로 적재된다는 것이므로 순서가 보장된다. 따라서 Producer는 Partition의 맨 뒤에 적재 또는 write하고 Consumer는 맨 앞부터 메시지를 Read하게 된다. Producer와 Consumer가 서로 읽은 메시지 번호의 차이를 Consumer Lag라고 부른다. 예를 들어서 Producer가 메시지를 7까지 적재했고 Consumer는 메시지를 5까지 읽었으면 Consumer LAG는 2이다.

참고로 Kafka는 데이터 복제가 파티션 단위로 이루어지며, 팔로워 파티션은 리더 파티션의 offset과 자신의 offset을 비교하여 차이가 나면 복제하도록 설계되어 있다. 최종적으로 ISR (In-Sync Replicas) 형태를 지향하는데, 이는 리더와 팔로워 파티션이 모두 싱크된 상태를 의미한다. 만약 ISR 목록에 포함되지 않는 복제본이라면 이를 OSR (Out-of-sync replicas) 라고 부르게 된다. 리더와 충분히 동기화되지 않은 복제본이기 때문에 데이터 상태를 정확하게 표현하지 않을 수 있는데, 이는 네트워크 문제나 디스크 문제와 같은 특정한 이슈 때문에 OSR 상태에 빠질 수 있다. 문제가 해결되어 해당 복제본이 다시 리더와 동기화되면, ISR 목록에 추가된다.

앞서 언급한 바와 같이 각 파티션은 Replica를 갖기에 Leader가 있어야 하며, 오직 하나의 브로커만이 특정한 파티션의 리더가 될 수 있다. 프로듀서는 리더 파티션에게 데이터를 보낼 수 있고, 컨슈머도 리더 파티션에서만 데이터를 받을 수 있다. 다시 말해서, 프로듀서와 컨슈머가 아닌 어떤 브로커의 파티션에게 데이터를 보내고 받아야 하는지 사전에 알고 있어야 한다는 뜻이다. 만약 리더가 죽게 되면 다른 ISR이 리더가 된다. ISR의 복제본 수가 줄고 OSR의 복제본 수가 늘고 있다면 무언가 클러스터에 문제가 있다는 것을 암시하는 것이다.

Producers

Kafka의 Producing Process는 어떻게 될까? Producer가 Kafka로 Record를 발행할 때 거쳐야 하는 절차를 한 번 알아보자.

먼저 Producer의 역할은 다음과 같다.
1. 직렬화 (Serialziation) : Serializer는 지정된 설정을 통해 Message Key와 Value를 어떤 Byte Array로 변환할 지 결정한다.
2. 파티셔닝 (Partitioning) : 어떤 Partition으로 Record를 보낼 지 결정한다.
3. 압축 (Compression) : Record 압축 설정이 되어있을 경우 설정된 포맷으로 압축을 수행하는데, 예를 들어 snappy나 iz4와 같은 포맷으로 진행하게 된다.
4. 메시지 배치 (Message Accumulator) : 설정값에 의거하여 Record를 Queue에 저장했다가 한 번에 Broker에게 전달한다.
5. 전달 (Sender) : 실제로 Broker에게 보내야 하는 Record들은 Record Batch에 의해 전송된다.

Producer가 Record를 만들고 `send` 메소드를 통해 Record를 보내면, Producer Config에서 이미 지정해놓은 Serializer 설정을 통해 특정한 Byte Array로 변환된다. 해당 포맷은 JSON, Avro, Parquet 등 다양한 형태가 지원될 수 있다. Partitioner에 의해 어떤 파티션으로 보낼 지 결정할 수 있고, Kafka에서는 성공과 실패 여부를 metadata 정보로 응답을 보내주게 되며 Producer의 Retry 설정을 통해 재시도 여부를 결정하게 된다.

Consumers

그렇다면 Consumer는 어떨까? Consumer는 특정한 Topic을 구독하고 Topic 내부의 파티션에 저장된 Record를 가져오는 역할을 수행한다. 1개의 Topic에 서로 다른 Consumer Application이 동시에 구독할 수 있는데, APP_1이 Record를 가져갔다고 하더라도 Record가 삭제되지 않도록 디자인되어 있기 때문에 각 Consumer는 어느 Topic의 어떤 Partition의 어느 offset까지 읽었는지를 내부 topic에 저장하여 관리한다. 따라서 Consumer Application이 중단 후 다시 구동되어도 자신이 어디서부터 Record를 읽어야 하는 지 `_consumer_offset` 토픽으로부터 다시 확인하여 메시지를 읽어들이게 된다. 이를 통해 안정적인 메시지 구독을 보장할 수 있다.

이러한 Consumer들이 많은데 어떻게 관리하고 있을까. Kafka Consumer는 1개 이상의 Consumer가 Consumer Group을 구성하여 1개의 Topic을 구독하고 있다. Consumer Group 내부의 Consumer는 Topic Partition의 소유권을 나누어 갖게 된다. 예를 들어 Consumer 0, Consumer 1이 있고 3개의 Partition으로 이루어진 Topic A가 있다고 가정해보자. Consumer 0은 Partition 0을, Consumer 1은 Partition 1과 2에 대한 소유권을 가지고 구독할 수 있다. 이 때 Consumer Group에 새로운 Consumer가 추가되거나 이탈할 경우 Consumer Group 내 Partition에 대한 소유권이 재조정될 수 있는데, 이를 Rebalancing 이라고 부른다.

Kafka의 브로커는 Bootstrap Broker 메커니즘이 있는데, 이는 Client와 Producer 그리고 Consumer가 Kafka Cluster에 존재하는 임의의 Broker에게만 연결하더라도 모든 Broker에게 자동으로 연결된다는 것이다. 따라서 Broker의 수평 확장이 용이한데, Broker의 개수가 늘어나더라도 Client와 Producer, Consumer의 모든 Broker에 대해 인지할 필요가 없다는 것을 의미한다. Kafka의 Broker들은 Kafka Cluster에 존재하는 모든 Broker의 메타데이터를 공유하기 때문에 가능한 일이다. Kafka의 Consumer들은 토픽 파티션 내부에 있는 데이터를 읽어오는 주체라고 이야기했는데, 이 때 Consumer는 Kafka의 토픽 파티션에 있는 데이터를 Pull 모델을 통해 받아오고, 파티션에 있는 데이터를 받아오는 순서는 increment하게 순차적으로 받아오게 된다. 특정 토픽의 파티션을 어떤 컨슈머 그룹이 몇 번째에 가져갔는지 브로커 내부에서 사용되는 내부 토픽이 있는데 이를 `__consumer_offsets` 라고 부르며 해당 토픽에 기록된다.

Acknowledgements

그렇다면 이제 Producer는 Broker에게 데이터가 정상적으로 전송되었는지 어떻게 acknowledgements를 하게 될까? 이 때 ACKS를 활용하여 확인할 수 있다. 다음의 케이스에 대하여 살펴보자.

  1. 만약 `acks=0` 이라면 : 프로듀서는 데이터 송신 확인을 기다리지 않는다. 프로듀서가 메시지를 보낸 순간 메시지 쓰기에 성공했다고 간주한다. Broker가 down 되거나 에러가 발생하더라도 무시하게 되며 데이터는 손실된다. 예를 들어 브로커가 다운되었을 경우 데이터 유실 가능성이 있고 메시지를 유실해도 괜찮을 때 사용하기 유용하다. 해당 옵션을 사용하는 이유는 네트워크의 오버헤드가 최소화되고 속도가 빠르며 처리량이 많을 때이다.
  2. 만약 `acks=1` 이라면 : Producer가 Leader 파티션에게만 송신 확인을 받는다. 리더 브로커에게 메시지 전송을 완료되었다 라고 하는 응답을 받을 때, 메시지 쓰기에 성공했다고 간주한다. 그래서 리더 파티션에게 전송은 되었으나 그 데이터가 ISR로까지 복제가 정상적으로 이루어졌는지는 보장할 수 없다. 만약 리더 파티션을 가지고 있는 브로커가 다운되었을 경우 데이터가 유실될 가능성이 있다. ACKS=0 옵션보다는 요청 헤드에 따라 네트워크의 오버헤드가 조금 커지지만, 비교적 안정적이다.
  3. 만약 `acks=all or acks = -1` 이라면 : Leader와 ISR 파티션 모두에게 송신 확인을 받게 된다. ISR 파티션에서 메시지를 수신했을 때 메시지가 정상적으로 쓰여졌다고 간주한다. 3.0 버전 이상의 기본 옵션은 해당 옵션이며, 데이터가 유실될 가능성이 없다. 이와 연관이 되어 있는 옵션이 바로 `min.insync.replicas` 인데, 몇 개의 ISR에 대하여 확인 응답을 받을 것이냐에 대한 설정이다. Leader Replica가 현재 클러스터 내의 동기화된 Replica 개수 만으로 안전한 쓰기가 가능한 지 확인하는 옵션인데, 만약 값이 1이라면 리더 브로커만 성공적으로 확인 응답을 보내면 되고 값이 2라면 리더 브로커와 함께 적어도 1개 이상의 ISR에게 확인 응답을 받아야 한다. 여기서는 값이 2여야지 해당 옵션을 사용하는 의미가 있다.

만약 브로커가 fail 상태라서 재해 복구를 해야 한다면 컨슈머는 Topic Replication Factor 값을 참고하게 된다. 각각의 Topic은 Replication Factor를 가지게 되는데, 만약 replication factor가 1개라고 한다면, 다른 Broker에게 나라는 Broker가 가진 파티션을 다른 Broker에게 파티션 1개를 복제시킨다. 따라서 특정한 Broker가 죽더라도 데이터를 잃지 않고 계속 제공할 수 있도록 도움을 준다.

Topic 2. Kafka 생태계 정리

Kafka를 둘러싼 여러 Utility 성격의 프로젝트들이 정말 많은데 한 번 정리해보도록 한다. 크게 3가지로 분류할 수 있다.

1. Kafka Streams

  • Kafka Streams는 Data Stream을 읽고 처리하여 다른 Topic으로 전송하거나 데이터의 집계를 도와주는 등의 역할을 수행하도록 하는 라이브러리이다. 라이브러리로 제공되므로 복잡한 비즈니스 로직을 처리할 수 있다는 장점이 있고, Kafka 특정 Topic으로부터 데이터를 읽고 실시간으로 데이터를 발행할 수 있도록 도와준다. 1개 이상의 Topic에서 데이터를 가져오는 프로세서를 Source Processor라고 부르며, 다른 프로세서가 변환한 데이터를 처리하는 분기 역할을 수행하는 프로세서를 Stream Processor라고 부른다. 마지막으로 데이터를 특정한 Topic으로 저장하는 역할을 Sink Processor라고 부른다.

2. Kafka Connect

  • 여러 가지의 데이터 소스와 Kafka 간의 데이터 이동을 간단하게 관리하고 설정하는데 도움을 주는 플랫폼이다. 여기서 데이터 소스라고 하면 MySQL, S3, ElasticSearch 등을 의미할 수 있겠다. 여기서 데이터 저장소에 있는 데이터로부터 Kafka에 데이터를 저장하기 위한 애플리케이션으로 Source Connector라고 부르며, Kafka에 저장된 Record를 읽고 S3 Bucket 등 대상 저장소로 저장하기 위한 앱을 Sink Connector라고 부른다. 여기서 Connectors는 Tasks를 관리하여 데이터 스트리밍을 조정하는 높은 수준의 추상화를 이룩한 애플리케이션을 의미하며, Tasks는 Kafka에 데이터를 적재하거나 가져올 때 어떤 방식으로 처리할 지에 대한 구현체를 의미한다. Workers는 Connector 및 Tasks를 실행하는 프로세스를 의미한다.

3. KsqlDB

내부 Topic을 구독하여 분석하거나, 구독한 Topic을 정제하여 다른 Topic으로 발행할 때 SQL과 유사한 문법을 가진 KSQL을 이용하여 로직을 구현하는 데 도움을 주는 플랫폼이다. 이를 활용하기 위해서는 KSQL Engine이 있는 KSQL Server가 필요한데, Kafka Streams 없이 KSQL만으로 간단하게 처리할 때 용이하다. 여기서 유의할 점은 Stream은 구조화된 데이터의 연속적이면서도 불변한 데이터를 의미하는데, 해당 데이터는 Kafka Topic에서 만들거나 아니면 이미 존재하는 Stream을 기존의 데이터로 하여 만들 수 있다. 이와 달리 Table은 변경되는 데이터의 현재 상태를 의미하며 Mutable한데, 다시 말해서 `group by` 또는 `count` 같은 상태 기반의 Stateful한 연산을 수행할 수 있다는 의미이겠다.

4. Schema Regisry

  • Kafka의 클라이언트 간의 소통에서 필요한 Record Schema를 지정하는 데 도움을 준다. 스키마는 데이터의 형식을 정의하는데 사용되므로 이러한 Schema를 중앙에서 관리할 수 있도록 다양한 형식을 지원한다.

쿠버네티스 환경에서 Kafka의 운영 관리에 도움을 주는 오퍼레이터로는 Kafka Strimizi가 있다. 카프카 클러스터와 구성요소를 배포하고 관리하며, 카프카의 접속설정과 업그레이드, 브로커 관리, 토픽과 유저의 생성 관리를 모두 매니징한다는 점이 인상적이다. 하기와 같은 component를 CRD로 배포하고 관리한다.

  1. cluster operator: Kafka cluster, zookeeper cluster 등 컴퍼넌트를 배포하고 관리
  2. entity operator: user operator와 topic operator를 관리
  3. topic operator: topic 생성, 삭제 등 topic 관리
  4. user operator: Kafka 유저 관리
  5. zookeeper cluster : 카프카의 메타데이터 관리 및 브로커의 정상 상태 점검
  6. kafka cluster : Kafka 클러스터(여러 대 브로커 구성) 구성

5. MirrorMaker2 (MM2)
다중 클러스터 환경에서 클러스터 간의 미러링 또는 Replication을 구성한다. 서로 다른 Region에 있는 카프카 클러스터를 기준으로 미러링할 클러스터를 Source Kafka Cluster로 두고, 미러링을 받을 클러스터를 Target으로 지정하여 데이터 동기화가 가능하다. 특히 클러스터 간의 Region이 다를 때 사용하기 용이하다.

사진 출처: https://strimzi.io/docs/operators/latest/deploying#bidirectional_replication_activeactive

예를 들어 기존에는 Cluster 1번에 데이터 소스 1번이 연결되어 있고, 브로커 내부에서는 Topic-1 로 지정되어 있다고 하자. 해당 토픽의 파티션은 그러면 2개가 된다. 또한, Cluster 2번에 데이터 소스 2번이 연결되어 있고, 브로커 내부에는 Topic-1로 역시 지정되어 있다고 하자. 그러면 해당 토픽의 파티션도 2개가 된다. 이 때 미러메이커를 진행하면 어떻게 될까? Cluster 1의 Topic 1에 대하여 미러링을 하고, 이 때 Target은 Cluster 2이므로 Cluster 2의 브로커에 저장한다. 저장될 때 출발지의 클러스터 이름이 들어가서 Cluster1-Topic1 이런 형태로 이름이 지어진다.

그러면 실습해보자. 먼저 Strimzi Operator를 Helm 차트로 설치한다.

kubectl create namespace kafka
helm repo add strimzi https://strimzi.io/charts/
helm show values strimzi/strimzi-kafka-operator
helm install kafka-operator strimzi/strimzi-kafka-operator - version 0.38.0 - namespace kafka
kubectl get deploy,pod -n kafka
kubectl get-all -n kafka
kubectl describe deploy -n kafka | grep KAFKA_IMAGES: -A3
kubectl get crd | grep strimzi
kubectl describe crd kafkas.kafka.strimzi.io
kubectl describe crd kafkatopics.kafka.strimzi.io

그러면 카프카 클러스터를 배포해보자. 배포가 마무리되면 마스터 노드에 배포된 파드들이 워커 노드에 배치되는데, 이 때 주키퍼와 브로커 파드가 배치되는 노드의 순서와 토픽의 리더 파티션의 배치는 다를 수 있음을 주지하자. 기존의 StatefulSet (STS) 대신에 StrimziPodSets (SPS) 를 기본 설정으로 사용하게 되는데, 그 이유는 SPS의 경우 STS와 달리 중간 파드를 삭제할 수 없고 파드 Spec이 동일하도록 강제할 수 있기 때문이다.

curl -s -O https://raw.githubusercontent.com/gasida/DOIK/main/strimzi/kafka-1.yaml
kubectl apply -f kafka-1.yaml -n kafka
kubectl get kafka -n kafka
kubectl get strimzipodsets -n kafka
kubectl get pod -n kafka -l app.kubernetes.io/instance=my-cluster
kubectl get kafka -n kafka my-cluster -o jsonpath={.status.listeners} | jq

먼저 Zookeeper 가 배포된 이후에, ZooKeeper 배포 완료 이후에야 Kafka(Broker)가 배포되는 모습을 볼 수 있다.

이제 테스트용 파드를 생성하고 카프카의 클러스터 정보를 확인해보자.

curl -s -O https://raw.githubusercontent.com/gasida/DOIK/main/strimzi/myclient.yaml
# DaemonSet으로 MyClient라는 파드가 배포된다
VERSION=3.6 envsubst < myclient.yaml | kubectl apply -f -
kubectl get pod -l name=kafkaclient -owide
kubectl exec -it ds/myclient - ls /opt/bitnami/kafka/bin
SVCDNS=my-cluster-kafka-bootstrap.kafka.svc:9092
echo "export SVCDNS=my-cluster-kafka-bootstrap.kafka.svc:9092" >> /etc/profile
kubectl exec -it ds/myclient - kafka-broker-api-versions.sh - bootstrap-server $SVCDNS
kubectl exec -it ds/myclient - kafka-configs.sh - bootstrap-server $SVCDNS - broker 0 - all - describe
kubectl exec -it ds/myclient - kafka-topics.sh - bootstrap-server $SVCDNS - list
kubectl get kafkatopics -n kafka

Kafka Topics를 확인해보자. 기본 토픽을 확인할 수 있다.

이제 Kafka UI를 배포해보자.

helm repo add kafka-ui https://provectus.github.io/kafka-ui-charts cat <<EOF > kafkaui-values.yml yamlApplicationConfig: kafka: clusters: - name: yaml bootstrapServers: my-cluster-kafka-bootstrap.kafka.svc:9092 auth: type: disabled management: health: ldap: enabled: false EOF
helm install kafka-ui kafka-ui/kafka-ui -f kafkaui-values.yml
kubectl patch svc kafka-ui -p '{"spec":{"type":"LoadBalancer"}}'
kubectl annotate service kafka-ui "external-dns.alpha.kubernetes.io/hostname=kafka-ui.$MyDomain"
echo -e "kafka-ui Web URL = http://kafka-ui.$MyDomain"

토픽을 생성하고 관리해본다. 여기서는 MyTopic1, MyTopic2를 만들어보겠다.

curl -s -O https://raw.githubusercontent.com/gasida/DOIK/main/3/mytopic.yaml
cat mytopic.yaml | yh
TOPICNAME=mytopic1 envsubst < mytopic.yaml | kubectl apply -f - -n kafka
kubectl get kafkatopics -n kafka

MyTopic2이라는 신규 토픽을 만든다. 파티션은 1개이고 리플리케이션은 3개이다.

kubectl exec -it ds/myclient - kafka-topics.sh - create - bootstrap-server $SVCDNS - topic mytopic2 - partitions 1 - replication-factor 3 - config retention.ms=172800000
kubectl exec -it ds/myclient - kafka-topics.sh - bootstrap-server $SVCDNS - list | grep mytopic
kubectl exec -it ds/myclient - kafka-topics.sh - bootstrap-server $SVCDNS - topic mytopic2 - alter - partitions 2
kubectl exec -it ds/myclient - kafka-topics.sh - bootstrap-server $SVCDNS - topic mytopic2 - describe

토픽의 파티션 개수를 늘릴 수도 있다. 여기에서는 1개에서 2개로 늘렸다.

UI를 통해서 확인해볼 수 있다.

토픽의 파티션을 줄이는 것은 허용되지 않는다.

kubectl exec -it ds/myclient - kafka-topics.sh - bootstrap-server $SVCDNS - topic mytopic2 - alter - partitions 1

토픽에 메시지를 보내고 받을 수 있다.

kubectl exec -it ds/myclient - kafka-console-producer.sh - bootstrap-server $SVCDNS - topic mytopic1
> something
kubectl exec -it ds/myclient - kafka-console-consumer.sh - bootstrap-server $SVCDNS - topic mytopic1 - from-beginning
// something
kubectl exec -it ds/myclient - kafka-console-producer.sh - bootstrap-server $SVCDNS - topic mytopic1 - property "parse.key=true" - property "key.separator=:"
>key1:sigridjin
kubectl exec -it ds/myclient - kafka-console-consumer.sh - bootstrap-server $SVCDNS - topic mytopic1 - property print.key=true - property key.separator="-" - from-beginning
> null-something
> key1-sigridjin
kubectl exec -it ds/myclient - kafka-console-consumer.sh - bootstrap-server $SVCDNS - topic mytopic1 - max-messages 2 - from-beginning
kubectl exec -it ds/myclient - kafka-console-consumer.sh - bootstrap-server $SVCDNS - topic mytopic1 - partition 0 - from-beginning

컨슈머 그룹을 확인해보자.

kubectl exec -it ds/myclient - kafka-console-producer.sh - bootstrap-server $SVCDNS - topic mytopic2 <<EOF
kubectl exec -it ds/myclient - kafka-console-producer.sh - bootstrap-server $SVCDNS - topic mytopic2 <<EOF
kubectl exec -it ds/myclient - kafka-consumer-groups.sh - bootstrap-server $SVCDNS - list

이제 Consumer LAG를 터미널에서 확인해보자.

kubectl exec -it ds/myclient - kafka-consumer-groups.sh - bootstrap-server $SVCDNS - group mygroup - describe
kubectl exec -it ds/myclient - kafka-consumer-groups.sh - bootstrap-server $SVCDNS - group mygroup - topic mytopic2 - reset-offsets - to-earliest - execute
kubectl exec -it ds/myclient - kafka-consumer-groups.sh - bootstrap-server $SVCDNS - group mygroup - describe

Consumer LAG를 없애기 위해 컨슈머 그룹 메시지를 소비해보자.

kubectl exec -it ds/myclient - kafka-console-consumer.sh - bootstrap-server $SVCDNS - topic mytopic2 - group mygroup
kubectl exec -it ds/myclient - kafka-consumer-groups.sh - bootstrap-server $SVCDNS - group mygroup - describe

메시지 키를 통해서 정해진 파티션을 통해서 메시지 전달 순서를 보장 및 분산 확인해보자.

kubectl exec -it ds/myclient - kafka-console-consumer.sh - bootstrap-server $SVCDNS - topic mytopic2 - partition 0 - property print.key=true - property key.separator=":"
kubectl exec -it ds/myclient - kafka-console-consumer.sh - bootstrap-server $SVCDNS - topic mytopic2 - partition 1 - property print.key=true - property key.separator=":"
# 모니터링 파티션 번호, 현재까지 가져간 레코드의 오프셋, 파티션 마지막 레코드의 오프셋, 컨슈머 랙 LAG, 컨슈머 ID, 호스트 정보 확인
kubectl exec -it ds/myclient - kafka-console-producer.sh - bootstrap-server $SVCDNS - topic mytopic2 - property "parse.key=true" - property "key.separator=:" <<EOF
kubectl exec -it ds/myclient - kafka-consumer-groups.sh - bootstrap-server $SVCDNS - group mygroup - describe

Topic 3. Kubernetes Event-Driven Autoscaling이란?

KEDA, 줄여서 Kubernetes Event-Driven Autoscaling은 쿠버네티스에서 지원하는 Pod 오토 스케일러는 CPU와 Memory 사용률을 기반으로 Pod의 개수를 동적으로 스케일링하는 기능이다. 또한 job queue size, http request rate 등과 같은 여러 외부 지표에 기반하여 Pod 개수를 스케일링하기 용이하도록 한다. 예를 들어 지표를 Prometheus 등의 메트릭스 서버에 수집하고, HPA에서 외부 메트릭스 서버의 값을 참조하도록 하는 부담을 덜어주는 프로젝트라고 생각하면 된다. 이 역시 CNCF의 인큐베이팅 프로젝트 중 하나이다.

KEDA는 다양한 소스로부터 이벤트를 받아 애플리케이션 Pod를 오토 스케일링한다. KEDA는 크게 1) Agent의 역할 2) Metrics 서버의 역할 3) Admission Webhooks 역할을 진행한다.

  1. Agent: 이벤트가 유무에 따라 Application Deployment를 `activate/deactivate` 시키는 역할을 수행하고, 이는 Keda-operator라고 하는 Pod에 의해서 수행된다.
  2. Metrics: 쿠버네티스의 Metrics 서버처럼 다양한 이벤트를 제공하는 메트릭스 서버의 역할을 수행한다. 쿠버네티스의 Horizontal Pod Autoscaler에 의해 참조되는 지표들이 제공되며, 이는 `keda-operator-metrics-apiserver` Pod에 의해서 수행된다.
  3. Admission Webhooks: KEDA 관련 자원들을 변경할 때 `validate` 자동으로 수행하는 역할을 진행해주는데, 예를 들어 하나의 scaled target에 대해서 복수 개의 scaled object 자원을 허용할 수 없다는 등의 옵션을 수행할 수 있다. 이는 `keda-admission-webhooks` Pod에 의해서 수행된다.
사진 출처: https://devocean.sk.com/blog/techBoardDetail.do?ID=164800

따라서 사용자가 KEDA에 관리를 요청하기 위한 임의의 `ScaledObject` 자원을 생성하면, admission에 의해서 자동으로 검수되고, 유효한 자원일 경우 `ScaledObject` 와 `KEDA` 가 관리하는 HPA 자원이 자동으로 생성된다. 자동으로 생성된 HPA 자원을 참조하는 지표는 Kubernetes의 Metrics 서버를 참조하지 않고 위에서 언급한 `keda-metrics-apiserver` 서버를 참조하도록 되어 있다. 여기서 알 수 있는 사실은 KEDA가 애플리케이션 Pod의 개수를 조절하는 스케일링 작업을 직접 수행하지 않는다는 뜻이다.

그렇다면 우리의 주제와는 어떻게 연결될까? 만약 Producer와 Consumer 간 메시지 처리 지연 LAG가 발생할 경우, 스케일 대응이 없으면 Producer의 Traffice Rate를 Consumer가 처리할 수 없어서 문제가 발생한다. 이와 달리 KEDA를 활용하여 스케일 관리에 대응하면, Producer의 Traffic Rate가 기준 이상일 경우 Consumer Instances를 자동으로 증가시켜서 처리한다.

사진 출처: https://blog.devgenius.io/kafka-on-kubernetes-using-strimzi-part-4-scalability-59da50575fec
사진 출처: https://blog.devgenius.io/kafka-on-kubernetes-using-strimzi-part-4-scalability-59da50575fec

이제 KEDA를 Helm Charts로 설치해보자. 특정 이벤트, 즉 여기에서는 카프카의 특정 토픽 LAG Threshold가 1 이상일 때 파드의 오토 스케일링 하는 것이다.

kubectl create namespace keda
helm repo add kedacore https://kedacore.github.io/charts
helm install keda kedacore/keda - version 2.12.0 - namespace keda
curl -s -O https://raw.githubusercontent.com/gasida/DOIK/main/3/keda-deploy-svc.yaml
kubectl apply -f keda-deploy-svc.yaml
kubectl get pod -n keda -l app=consumer-service
kubectl get kafkatopics -n kafka
kubectl exec -it ds/myclient - kafka-topics.sh - bootstrap-server $SVCDNS - topic my-topic - describe
kubectl exec -it ds/myclient - kafka-consumer-groups.sh - bootstrap-server $SVCDNS - group keda-consumer - describe
kubectl logs -n keda -l app=consumer-service -f

이제 KEDA 스케일 관련 정책을 설정한다. LAG 1 기준을 달성할 경우 파드를 증가시킨다. Producer Traffic Rate가 기준 이상이 되면 Consumer Instances를 증가시킨다. 컨슈머 LAG는 프로듀서가 보낸 메시지 개수(카프카에 남아있는 메시지 개수) 에서 컨슈머가 가져간 메시지 개수를 뺀 값이다.

curl -s -O https://raw.githubusercontent.com/gasida/DOIK/main/3/keda-scale.yaml 
cat keda-scale.yaml | yh
kubectl apply -f keda-scale.yaml

터미널로 for 문 반복 메시지를 보내보겠다.

kubectl get ScaledObject,hpa -n keda
kubectl logs -n keda -l app=consumer-service
for ((i=1; i<=100; i++)); do echo "keda-scale-test-$i" ; kubectl exec -it ds/myclient - sh -c "echo test1-$i | kafka-console-producer.sh - bootstrap-server $SVCDNS - topic my-topic" ; date ; done

증가한 컨슈머의 개수를 확인할 수 있다. 메시지 보내기 취소 후 일정 시간이 지나면 자동으로 consumer 파드가 최소 개수 1개로 줄어드는 것을 확인해보기도 한다.

kubectl get pod -n keda -l app=consumer-service

--

--

Sigrid Jin
Sigrid Jin

Written by Sigrid Jin

Software Engineer at Sionic AI / Machine Learning Engineer, Kubernetes. twitter.com/@sigridjin_eth

No responses yet