카프카 브로커

  • 카프카 브로커 : 데이터를 주고받기 위해 사용하는 주체, 데이터를 분산 저장하여 장애 대응 애플리케이션
  • 안전한 데이터 보관 및 처리를 위해 3대 이상의 브로커를 1개의 클러스터로 운영 권장
  • 클러스터는 프로듀서에서 전송된 데이터를 안전하게 분산, 복제를 수행

image

데이터 저장, 전송

image

  • server.properties : log.dirs : /data
  • 브로커는 프로듀서로부터 전달받은 데이터를 토픽의 파티션에 데이터를 저장
  • 토픽의 갯수 = 디렉토리의 갯수

페이지 캐시

카프카는 메모리, 데이터베이스에 데이터를 저장하지 않는다. - 파일 시스템 이용
지속적인 입출력이 발생할 때, 파일 시스템은 처리 속도가 늦어지는 현상 발생
카프카에서는 페이지 캐시를 이용하여서 디스크 입출력 속도를 높여 문제를 해결
  • 메인 메모리의 남는 공간을 활용해 페이지 캐시라는 메모리 공간을 마련
  • 파일 I/O의 성능을 향상시키기 위해 사용되는 것
  • 사용된 파일의 내용을 저장해 놓았다가 읽을 때 page cache를 활용하여 효율 증가
  • ~클린 이름의 프로그램

데이터 복제, 싱크

image

  • 데이터 복제 : 장애 허용 시스템으로 동작하도록 하는 원동력 -> 장애, 데이터 유실 대응
  • 토픽 생성시 복제의 개수 설정 가능 (옵션 미선택 시 브로커 설정 옵션)
  • 복제의 개수 증가 -> 저장 용량 증가 -> 데이터를 안전하게 사용 가능
  • 복제의 개수 감소 -> 저장 용량 감소 -> 데이터를 불안전하게 사용 가능

image

  • 장애 발생시 해당 브로커의 리더 파티션은 사용 불가능
  • 새로운 리더를 선출

컨트롤러

  • 브로커 중 한대가 컨트롤러 역할
  • 다른 브로커들의 상태 체크, 리더파티션 재분배
  • 컨트롤러 브로커 장애시 다른 브로커 컨트롤러 역할
  • 죽은 브로커가 담당한 파티션에 새로운 리더 선출

데이터 삭제

  • 오직 브로커만이 데이터를 삭제 가능
  • 데이터 삭제는 파일 단위로 이루어짐
  • 로그 세그먼트 단위로 삭제 - 다수의 데이터가 존재하여 일반 DB처럼 데이터를 선별해서 삭제 불가능
    • https://developpaper.com/kafka-source-reading-log-segment-kafka-log-logsegment/
  • log.segment.bytes, log.seement.ms 옵션 값에 따라 세그먼트 파일이 닫힘 - 기본값 : 1GB
  • log.retention.bytes, log.retention.ms 옵션에 설정 값이 넘으면 삭제
  • log.retention.check.interval.ms 닫힌 세그먼트 파일 체크 간격

코디네이터

  • 컨트롤러와 마찬가지로 브로커 중 한대가 역할
  • 컨슈머 그룹의 상태 체크, 파티션을 컨슈머와 매칭 분배
  • 컨슈머가 컨슈머 그룹에서 빠지면 매칭되지 않은 파티션을 정상 동작하는 컨슈머로 할당 -파티션을 컨슈머로 재할당하는 과정 - 리밸런스

카프카 주키퍼

  • 카프카의 메타데이터를 관리하는데 사용
  • ./zkCli.sh 명령어로 실행
[zk: localhost:2181(CONNECTED) 1] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
[zk: localhost:2181(CONNECTED) 3] get /brokers/ids/1 
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://172.17.19.25:19092"],"jmx_port":-1,"port":19092,"host":"172.17.19.25","version":5,"timestamp":"1650935071371"}
[zk: localhost:2181(CONNECTED) 4] get /controller
{"version":1,"brokerid":1,"timestamp":"1650935071544"}
[zk: localhost:2181(CONNECTED) 5] ls /brokers/topics
[__consumer_offsets, a_Topic1]
[zk: localhost:2181(CONNECTED) 6] 

토픽, 파티션

토픽

  • 데이터를 구분하기 위해 사용하는 단위
  • 1개 이상의 파티션을 소유
  • 프로듀서가 보낸 데이터들 저장 - 레코드

파티션

  • 카프카에서 병렬처리의 핵심 - 그룹으로 묶인 컨슈머들이 레코드를 병렬로 처리 가능
  • 큐와 비슷한 구조

https://needjarvis.tistory.com/603

레코드

  • 레코드는 타임스탬프, 메시지 키, 메시지 값, 오프셋 헤더로 구성
  • 적재된 레코드는 수정 불가능 - 리텐션 기간, 용량 설정에 따라 삭제
  • 타임 스탬프 : 레코드가 생성된 시점의 유닉스 타임
  • 메시지 키 : 메시지 값 순차처리, 종류 표현
  • 메시지 값 : 실질적으로 처리할 데이터 존재
    • 메시지 값은 직렬화 되어 브로커에 전송
    • 컨슈머가 이용할 때는 역직렬화를 수행 해야함
  • 오프셋 : 0 이상의 숫자로 형성
  • 헤더 : 레코드의 추가적인 정보를 담는 메타이터 저장소 용도

image

카프카 클라이언트

  • 카프카 클러스터에 명령을 내리거나 데이터를 송수신하기 위해 카프카 클라이언트 라이브러리 사용
  • Producer API
  • Consumer API
  • Streams API
  • Connect API
  • AdminClient API

출처: https://springboot.cloud/36 [갓.바.조.아]

프로듀서 API

  • 데이터 전송시 리터 파티션을 가지고 있는 브로커와 직접 통신
  • 데이터를 직렬화 하여 브로커에 전송
    • 직렬화 : 자바 또는 외부 시스템에서 사용 가능하도록 바이트 형태로 데이터를 변환하는 기술

dependencies


dependencies {
...
implementation 'org.apache.kafka:kafka-clients:3.1.0'
...
}

producer.java

package com.example.demo;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleKafkaProducer {
    private final static Logger logger = LoggerFactory.getLogger(SimpleKafkaProducer.class);
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVER = "172.17.19.25:9092";

    public static void main(String[] args){
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);    

        String messageValue = "testMessage";
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);

        producer.send(record);

        logger.info("{}", record);
        producer.flush();
        producer.close();

    }

}

image

image

토픽 생성

./kafka-topics.sh --bootstrap-server 172.17.19.25:19092 --create --topic test --partitions 3

카프카 클라이언트 프로듀서 실행

acks = -1
batch.size = 16384
bootstrap.servers = [172.17.19.25:19092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = true
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
...

2022-04-26 10:53:03.345  INFO 46112 --- [           main] com.example.demo.SimpleKafkaProducer     : ProducerRecord(topic=test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=testMessage, timestamp=null)

컨슈머 테스트

./kafka-console-consumer.sh --bootstrap-server 172.17.19.25:19092 --topic test --from-beginning
testMessage

브로커 정상 전송 여부를 확인하는 프로듀서

  • send() 메서드는 Future 객체를 반환
    • RecordMetadata의 비동기 결과를 표한하는 것
    • ProducerRecord가 카프카 브로커에 정상적으로 적재 되었는지에 대한 데이터 포함
  • get() 메서드를 사용하면 프로듀서로 보낸 데이터의 결과를 동기적으로 가져옴
RecordMetadata metadata = producer.send(record).get();
logger.info(metadata.toString());
  • 레코드 정상 적재시 토픽 이름, 파티션 번호, 오프셋 번호 출력
  • 동기로 전송하는 것은 빠른 전송에 허들이 될 수 있음
  • 따라서 비동기 적으로 결과를 확인할 수 있도록 Callback인터페이스를 제공
public clas ProducerCallback implements Callback{
  private final static Logger logger = LoggerFactory.getLogger(ProdudcerCallback.class);
  
  @Override
  public void onCompletion(RecordMetadata recordMetadata, Exception e){
    if (e != null){
      logger.error(e.getMessage(), e);
    }
    else{
      logger.info(recordMetadata.toString());
    }
}

  • onCompletion qlehdrldml rufrhkfmf qkerl dnlgka
  • 데이터의 순서가 중요한 경우 사용을 하면 안됨
producer.send(record, new ProducerCallback());

프로듀서 중요 개념

image

  1. send 호출시 ProducerRecord 파티셔너에서 토픽의 어느 파티션으로 전송될 것인지 결정
  2. 파티셔너를 따로 설정하지 않으면 defaultPartitioner로 설정
  3. 파티셔너에 의해 구분된 레코드는 데이터를 전송하기 전에 어큐물레이터에 데이터를 버퍼로 쌓아놓고 발송
  4. 배치로 묶어 전송함으로써 카프카의 프로듀서 처리량 향상

https://dol9.tistory.com/277

컨슈머 API

  • 프로듀서에 의해 적재된 데이터를 사용하기 위해 데이터를 가져와서 필요한 처리
  • 마케팅 문자를 고객에게 보내는 기능이 있다면 컨슈머는 토픽으로부터 고객 데이터를 가져와서 문자 발송 처리
package com.example.demo;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleConsumer {
    private final static Logger logger = LoggerFactory.getLogger(SimpleConsumer.class);
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVER = "172.17.19.25:19092";
    private final static String GROUP_ID = "test-group";

    public void consumer(){
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);    
        consumer.subscribe(Arrays.asList(TOPIC_NAME)); // 컨슈머 토픽 할당

        while(true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records){
                logger.info("{}", record);

            }
        }
    }
}

프로듀서 메시지 전송

./kafka-console-producer.sh --bootstrap-server 172.17.19.25:19092 -topic test 
>hello
>test
>sanghyun

카프카 클라이언트 컨슈머에서 전송된 메시지 확인

2022-04-26 11:32:23.022  INFO 58839 --- [           main] com.example.demo.SimpleConsumer          : ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1650987141761, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello)
2022-04-26 11:32:27.411  INFO 58839 --- [           main] com.example.demo.SimpleConsumer          : ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1650987146182, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = test)
2022-04-26 11:32:32.321  INFO 58839 --- [           main] com.example.demo.SimpleConsumer          : ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 3, CreateTime = 1650987151005, serialized key size = -1, serialized value size = 8, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = sanghyun)

컨슈머 중요 개념

  • 컨슈머의 운영 방법 2가지
    1. 1개 이상의 컨슈머로 이루어진 컨슈머 그룹을 운영
    2. 토픽의 특정 파티션만 구독하는 컨슈머를 운영
  • 파티션은 하나의 토픽에만 연결이 가능
  • 컨슈머 그룹의 컨슈머 개수는 토픽의 파티션 개수보다 작거나 같아야함

image

image

유휴 상태

image

리밸런싱

  • 장애가 발생하면 장애가 발생한 컨슈머에 할당된 파티션은 장애가 발생하지 않은 컨슈머에게 소유권이 넘어감
  • 소유권을 재할당하는 과정에서 컨슈머 그룹의 컨슈머들이 토픽의 데이터를 읽을 수 없음

image

  • 컨슈머 그룹은 다른 컨슈머 그룹과 격리되는 특징을 가짐
  • 프로듀서가 보낸 데이터를 각기 다른 역할을 하는 컨슈머 그룹끼리 영향을 받지않게 처리 가능

image

  • 컨슈머는 데이터를 어디까지 가져갔는지 커밋을 통해 기록 (__consumer_offsers)
  • 오프셋 커밋은 명시적, 비명시적으로 수행이 가능
  • Default enable.auto.commit=true

컨슈머 내부 구조

  • 컨슈머는 poll() 메서드를 통해 레코드를 반환 받지만 호출하는 시점에 클러스터에서 데이터를 가져오는 것이 아님
  • 내부에서 Fether 인스턴스가 생성되어 poll() 메서드를 호출하기 전에 미리 레코드들을 내부 큐로

동기 오프셋 커밋

  • poll() 메서드로 리턴된 가장 마지막 레코드를 기준으로 오프셋을 커밋
  • 브로커로부터 오프셋 커밋이 완료되었음을 받기까지 컨슈머는 데이터를 처리하지 않고 기다린다.
    • 자동 커밋, 비동기 오프셋 커밋보다 동일 시간당 데이터 처리량이 작다
package com.example.demo;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class SimpleConsumer {
    private final static Logger logger = LoggerFactory.getLogger(SimpleConsumer.class);
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVER = "172.17.19.25:19092";
    private final static String GROUP_ID = "test-group";

    public void consumer(){
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 추가

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);    
        consumer.subscribe(Arrays.asList(TOPIC_NAME)); // 컨슈머 토픽 할당
        
        while(true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>(); // 처리한 오프셋을 매번 커밋하기 위해 사용 
            // 키 : 토픽과 파티션의 정보
            // 값 : 오프셋 정보

            for (ConsumerRecord<String, String> record : records){
                logger.info("{}", record);
                currentOffset.put(new TopicPartition(record.topic(), record.partition()),
                                  new OffsetAndMetadata(record.offset() + 1, null));      // 현재 처리한 오프셋 + 1
                                  // 이후에 컨슈머가 poll할 때 마지막으로 커밋한 오프셋 부터 레코드를 리턴하기 때문 
                consumer.commitSync(currentOffset);
            }

        }
    }
}

비동기 오프셋 커밋

  • 동기 오프셋 커밋과 마찬가지로 poll() 메서드로 리턴된 가장 마지막 레코드를 기준으로 오프셋을 커밋
  • 커밋이 완료될 때까지 응답을 기다리지 않음
  • 동일 시간당 데이터 처리량이 더 많음
for (ConsumerRecord<String, String> record : records){
                logger.info("{}", record);
                currentOffset.put(new TopicPartition(record.topic(), record.partition()),
                                  new OffsetAndMetadata(record.offset() + 1, null));      // 현재 처리한 오프셋 + 1
                                  // 이후에 컨슈머가 poll할 때 마지막으로 커밋한 오프셋 부터 레코드를 리턴하기 때문 
                consumer.commitAsync(new OffsetCommitCallback() {
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e){
                        if(e != null){
                            System.err.println("Commit failed");
                        }else{
                            System.out.println("Commit succeeded!");
                        }
                        if (e != null)
                            logger.error("Commit failed for offsets {}", offsets, e);
                    }
                });
            }
  • OffsetCommitCallback 함수는 commitAsync()의 응답을 받을 수 있도록 도와주는 콜백 인터페이스
  • 커밋 응답은 onComplete() 메서드를 통해 확인

리밸런스 리스터를 가진 컨슈머

  • 리밸런스 발생 시 데이터를 중복 처리하지 않게 하기 위해서 리밸런스 발생 시 처리한 데이터를 기준으로 커밋을 시도해야 함
  • 리밸런스 발생 감지를 위한 라이브러리는 ConsumerRebalanceListener 인터페이스를 지원
    • onPartitionAssined() : 리밸런스가 끝난 뒤에 파티션이 할당 완료되면 호출되는 메서드
    • onPartitionRevoked() : 리밸런스가 시작되기 직전에 호출되는 메서드

컨슈머의 안전한 종료

  • poll() 메서드가 동작중에 wakeup() 메서드가 호출되면 다음 poll() 에서 WakeupException 예외가 발생
try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>(); // 처리한 오프셋을 매번 커밋하기 위해 사용
                // 키 : 토픽과 파티션의 정보
                // 값 : 오프셋 정보

                for (ConsumerRecord<String, String> record : records) {
                    logger.info("{}", record);
                    currentOffset.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1, null)); // 현재 처리한 오프셋 + 1
                    // 이후에 컨슈머가 poll할 때 마지막으로 커밋한 오프셋 부터 레코드를 리턴하기 때문
                    consumer.commitAsync(new OffsetCommitCallback() {
                        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
                            if (e != null) {
                                System.err.println("Commit failed");
                            } else {
                                System.out.println("Commit succeeded!");
                            }
                            if (e != null)
                                logger.error("Commit failed for offsets {}", offsets, e);
                        }
                    });
                }

            }
        } catch (WakeupException e) {
            logger.warn("wakeup consumer");
        }
        finally{
            consumer.close();
        }
...
Runtime.getRuntime().addShutdownHook(new ShutdownThread());
...

static class ShutdownThread extends Thread {
        public void run(){
            consumer.wakeup();
        }
    }

어드민 API

  • 내부 옵션을 설정하고 확인하는 것은 실제 운영환경에서 중요
  • 카프카 클라이언트에서는 내부 옵션들을 설정하거나 조회를 위해 AdminClient클래스를 제공
  • 클러스터의 옵션과 관련된 부분을 자동화 가능

    • 카프카 클러스터를 멀티 스레드로 생성할 때, 구독하는 토픽의 파티션 개수만큼 생성하고 싶을 때, 스레드 생성 전에 해당 토픽의 파티션 개수를 어드민 API를 통해 가져올 수 있다.
    • AdminClient 클래스로 구현한 웹 대시보드를 통해 ACL이 적용된클러스터의 리소스 접근 권한 규칙을 추가할 수 있다.
    • 트정 토픽의 데이터양이 늘어남을 감지하고 AdminClient 클래스로 해당 토픽의 파티션을 늘릴 수 있다.
package com.example.demo;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;


public class SimpleAdmin {
    public void test() {
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.17.19.25:19092");

        AdminClient admin = AdminClient.create(configs);

        System.out.println("== Get broker information");

        try {
            for( Node node : admin.describeCluster().nodes().get()){  // describeCluster() 브로커 정보 
                System.out.println("node : " + node);
                ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, node.idString());
                DescribeConfigsResult describeConfigs = admin.describeConfigs(Collections.singleton(cr));
                describeConfigs.all().get().forEach((broker, config) -> {
                    config.entries().forEach(configEntry -> System.out.println(configEntry.name() + " = " + configEntry.value()));
                } );
            }
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (ExecutionException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }

}

== Get broker information
node : 172.17.19.25:19092 (id: 1 rack: null)
log.cleaner.min.compaction.lag.ms = 0
offsets.topic.num.partitions = 50
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
log.flush.interval.messages = 9223372036854775807
controller.socket.timeout.ms = 30000
principal.builder.class = org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
log.flush.interval.ms = null
controller.quorum.request.timeout.ms = 2000
sasl.oauthbearer.expected.audience = null
min.insync.replicas = 1
num.recovery.threads.per.data.dir = 1
ssl.keystore.type = JKS
zookeeper.ssl.protocol = TLSv1.2
sasl.mechanism.inter.broker.protocol = GSSAPI
metadata.log.segment.bytes = 1073741824
fetch.purgatory.purge.interval.requests = 1000
ssl.endpoint.identification.algorithm = https
zookeeper.ssl.keystore.location = null
replica.socket.timeout.ms = 30000
message.max.bytes = 1048588
max.connection.creation.rate = 2147483647
connections.max.reauth.ms = 0
log.flush.offset.checkpoint.interval.ms = 60000
zookeeper.clientCnxnSocket = null
zookeeper.ssl.client.enable = false
quota.window.num = 11
sasl.oauthbearer.clock.skew.seconds = 30
zookeeper.connect = zookeeper:2181
authorizer.class.name = 
password.encoder.secret = null
num.replica.fetchers = 1
alter.log.dirs.replication.quota.window.size.seconds = 1
sasl.oauthbearer.jwks.endpoint.url = null
log.roll.jitter.hours = 0
password.encoder.old.secret = null
log.cleaner.delete.retention.ms = 86400000
sasl.login.retry.backoff.ms = 100
queued.max.requests = 500
log.cleaner.threads = 1
sasl.kerberos.service.name = null
socket.request.max.bytes = 104857600
log.message.timestamp.type = CreateTime
zookeeper.ssl.keystore.type = null
connections.max.idle.ms = 600000
zookeeper.set.acl = false
delegation.token.expiry.time.ms = 86400000
max.connections = 2147483647
transaction.state.log.num.partitions = 50
controller.quorum.election.timeout.ms = 1000
listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
log.retention.hours = 168
client.quota.callback.class = null
ssl.provider = null
delete.records.purgatory.purge.interval.requests = 1
log.roll.ms = null
ssl.cipher.suites = 
controller.quorum.retry.backoff.ms = 20
zookeeper.ssl.keystore.password = null
broker.session.timeout.ms = 9000
security.inter.broker.protocol = PLAINTEXT
delegation.token.secret.key = null
node.id = 1
replica.high.watermark.checkpoint.interval.ms = 5000
replication.quota.window.size.seconds = 1
sasl.kerberos.ticket.renew.window.factor = 0.8
zookeeper.connection.timeout.ms = 18000
metrics.recording.level = INFO
password.encoder.cipher.algorithm = AES/CBC/PKCS5Padding
ssl.principal.mapping.rules = DEFAULT
replica.selector.class = null
max.connections.per.ip = 2147483647
background.threads = 10
request.timeout.ms = 30000
log.message.format.version = 3.0-IV1
sasl.login.class = null
log.dir = /tmp/kafka-logs
log.segment.bytes = 1073741824
replica.fetch.response.max.bytes = 10485760
group.max.session.timeout.ms = 1800000
controller.listener.names = null
controller.quorum.append.linger.ms = 25
log.segment.delete.delay.ms = 60000
log.retention.minutes = null
log.dirs = /data
controlled.shutdown.enable = true
socket.connection.setup.timeout.max.ms = 30000
log.message.timestamp.difference.max.ms = 9223372036854775807
sasl.oauthbearer.scope.claim.name = scope
password.encoder.key.length = 128
sasl.login.refresh.min.period.seconds = 60
sasl.oauthbearer.expected.issuer = null
sasl.login.read.timeout.ms = null
transaction.abort.timed.out.transaction.cleanup.interval.ms = 10000
sasl.kerberos.kinit.cmd = /usr/bin/kinit
log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
auto.leader.rebalance.enable = true
leader.imbalance.check.interval.seconds = 300
log.cleaner.min.cleanable.ratio = 0.5
replica.lag.time.max.ms = 30000
num.network.threads = 3
ssl.keystore.key = null
sasl.client.callback.handler.class = null
metrics.num.samples = 2
socket.send.buffer.bytes = 102400
password.encoder.keyfactory.algorithm = null
socket.receive.buffer.bytes = 102400
sasl.oauthbearer.sub.claim.name = sub
replica.fetch.min.bytes = 1
broker.rack = null
zookeeper.ssl.truststore.password = null
unclean.leader.election.enable = false
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
offsets.retention.check.interval.ms = 600000
producer.purgatory.purge.interval.requests = 1000
controller.quorum.voters = 
metrics.sample.window.ms = 30000
log.retention.check.interval.ms = 300000
sasl.login.refresh.window.jitter = 0.05
leader.imbalance.per.broker.percentage = 10
controller.quota.window.num = 11
metric.reporters = 
sasl.oauthbearer.token.endpoint.url = null
auto.create.topics.enable = true
ssl.engine.factory.class = null
replica.socket.receive.buffer.bytes = 65536
zookeeper.ssl.truststore.location = null
replica.fetch.wait.max.ms = 500
password.encoder.iterations = 4096
default.replication.factor = 1
ssl.truststore.password = null
sasl.kerberos.principal.to.local.rules = DEFAULT
log.preallocate = false
transactional.id.expiration.ms = 604800000
control.plane.listener.name = null
transaction.state.log.replication.factor = 1
num.io.threads = 8
sasl.login.refresh.buffer.seconds = 300
offsets.commit.required.acks = -1
connection.failed.authentication.delay.ms = 100
delete.topic.enable = true
quota.window.size.seconds = 1
ssl.truststore.type = JKS
offsets.commit.timeout.ms = 5000
zookeeper.ssl.ocsp.enable = false
broker.heartbeat.interval.ms = 2000
sasl.mechanism.controller.protocol = GSSAPI
log.cleaner.max.compaction.lag.ms = 9223372036854775807
zookeeper.ssl.enabled.protocols = null
log.retention.ms = null
alter.log.dirs.replication.quota.window.num = 11
log.cleaner.enable = true
controller.quorum.fetch.timeout.ms = 2000
offsets.load.buffer.size = 5242880
ssl.client.auth = none
controlled.shutdown.max.retries = 3
offsets.topic.replication.factor = 1
zookeeper.ssl.truststore.type = null
transaction.state.log.min.isr = 1
ssl.secure.random.implementation = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
zookeeper.session.timeout.ms = 18000
log.retention.bytes = -1
controller.quota.window.size.seconds = 1
sasl.jaas.config = null
sasl.kerberos.min.time.before.relogin = 60000
offsets.retention.minutes = 10080
replica.fetch.backoff.ms = 1000
inter.broker.protocol.version = 3.1-IV0
kafka.metrics.reporters = 
num.partitions = 1
ssl.keystore.certificate.chain = null
socket.connection.setup.timeout.ms = 10000
broker.id.generation.enable = true
listeners = PLAINTEXT://:9092
ssl.enabled.protocols = TLSv1.2,TLSv1.3
inter.broker.listener.name = null
alter.config.policy.class.name = null
delegation.token.expiry.check.interval.ms = 3600000
log.flush.scheduler.interval.ms = 9223372036854775807
zookeeper.max.in.flight.requests = 10
log.index.size.max.bytes = 10485760
ssl.keymanager.algorithm = SunX509
sasl.login.callback.handler.class = null
replica.fetch.max.bytes = 1048576
zookeeper.ssl.crl.enable = false
sasl.server.callback.handler.class = null
log.cleaner.dedupe.buffer.size = 134217728
log.cleaner.io.buffer.size = 524288
create.topic.policy.class.name = null
ssl.truststore.certificates = null
controlled.shutdown.retry.backoff.ms = 5000
security.providers = null
log.roll.hours = 168
log.cleanup.policy = delete
initial.broker.registration.timeout.ms = 60000
log.flush.start.offset.checkpoint.interval.ms = 60000
log.roll.jitter.ms = null
transaction.state.log.segment.bytes = 104857600
offsets.topic.segment.bytes = 104857600
sasl.login.retry.backoff.max.ms = 10000
group.initial.rebalance.delay.ms = 0
log.index.interval.bytes = 4096
log.cleaner.backoff.ms = 15000
ssl.truststore.location = null
offset.metadata.max.bytes = 4096
ssl.keystore.password = null
zookeeper.sync.time.ms = 2000
fetch.max.bytes = 57671680
metadata.max.retention.bytes = -1
compression.type = producer
sasl.login.connect.timeout.ms = null
max.connections.per.ip.overrides = 
sasl.login.refresh.window.factor = 0.8
kafka.metrics.polling.interval.secs = 10
metadata.log.max.record.bytes.between.snapshots = 20971520
metadata.max.retention.ms = 604800000
controller.quorum.election.backoff.max.ms = 1000
max.incremental.fetch.session.cache.slots = 1000
delegation.token.master.key = null
ssl.key.password = null
reserved.broker.max.id = 100000
transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
log.message.downconversion.enable = true
ssl.protocol = TLSv1.3
metadata.log.segment.ms = 604800000
transaction.state.log.load.buffer.size = 5242880
ssl.keystore.location = null
sasl.enabled.mechanisms = GSSAPI
num.replica.alter.log.dirs.threads = null
zookeeper.ssl.cipher.suites = null
group.min.session.timeout.ms = 6000
log.cleaner.io.buffer.load.factor = 0.9
transaction.max.timeout.ms = 900000
metadata.log.dir = null
process.roles = 
group.max.size = 2147483647
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
delegation.token.max.lifetime.ms = 604800000
broker.id = 1
offsets.topic.compression.codec = 0
zookeeper.ssl.endpoint.identification.algorithm = HTTPS
replication.quota.window.num = 11
advertised.listeners = PLAINTEXT://172.17.19.25:19092
queued.max.request.bytes = -1