Kafka - 2!
kafka 책 읽기
카프카 브로커
- 카프카 브로커 : 데이터를 주고받기 위해 사용하는 주체, 데이터를 분산 저장하여 장애 대응 애플리케이션
- 안전한 데이터 보관 및 처리를 위해 3대 이상의 브로커를 1개의 클러스터로 운영 권장
- 클러스터는 프로듀서에서 전송된 데이터를 안전하게
분산, 복제
를 수행
데이터 저장, 전송
- server.properties : log.dirs : /data
- 브로커는 프로듀서로부터 전달받은 데이터를 토픽의 파티션에 데이터를 저장
- 토픽의 갯수 = 디렉토리의 갯수
페이지 캐시
카프카는 메모리, 데이터베이스에 데이터를 저장하지 않는다. - 파일 시스템 이용
지속적인 입출력이 발생할 때, 파일 시스템은 처리 속도가 늦어지는 현상 발생
카프카에서는 페이지 캐시를 이용하여서 디스크 입출력 속도를 높여 문제를 해결
- 메인 메모리의 남는 공간을 활용해 페이지 캐시라는 메모리 공간을 마련
- 파일 I/O의 성능을 향상시키기 위해 사용되는 것
- 사용된 파일의 내용을 저장해 놓았다가 읽을 때 page cache를 활용하여 효율 증가
- ~클린 이름의 프로그램
데이터 복제, 싱크
- 데이터 복제 : 장애 허용 시스템으로 동작하도록 하는 원동력 -> 장애, 데이터 유실 대응
- 토픽 생성시 복제의 개수 설정 가능 (옵션 미선택 시 브로커 설정 옵션)
복제의 개수 증가 -> 저장 용량 증가 -> 데이터를 안전하게 사용 가능
복제의 개수 감소 -> 저장 용량 감소 -> 데이터를 불안전하게 사용 가능
- 장애 발생시 해당 브로커의 리더 파티션은 사용 불가능
- 새로운 리더를 선출
컨트롤러
- 브로커 중 한대가 컨트롤러 역할
- 다른 브로커들의 상태 체크, 리더파티션 재분배
- 컨트롤러 브로커 장애시 다른 브로커 컨트롤러 역할
- 죽은 브로커가 담당한 파티션에 새로운 리더 선출
데이터 삭제
- 오직 브로커만이 데이터를 삭제 가능
- 데이터 삭제는 파일 단위로 이루어짐
- 로그 세그먼트 단위로 삭제 - 다수의 데이터가 존재하여 일반 DB처럼 데이터를 선별해서 삭제 불가능
- https://developpaper.com/kafka-source-reading-log-segment-kafka-log-logsegment/
log.segment.bytes
,log.seement.ms
옵션 값에 따라 세그먼트 파일이닫힘
- 기본값 : 1GBlog.retention.bytes
,log.retention.ms
옵션에 설정 값이 넘으면삭제
log.retention.check.interval.ms
닫힌 세그먼트파일 체크 간격
코디네이터
- 컨트롤러와 마찬가지로 브로커 중 한대가 역할
- 컨슈머 그룹의 상태 체크, 파티션을 컨슈머와 매칭 분배
- 컨슈머가 컨슈머 그룹에서 빠지면 매칭되지 않은 파티션을 정상 동작하는 컨슈머로 할당
-파티션을 컨슈머로 재할당하는 과정 -
리밸런스
카프카 주키퍼
- 카프카의 메타데이터를 관리하는데 사용
- ./zkCli.sh 명령어로 실행
[zk: localhost:2181(CONNECTED) 1] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
[zk: localhost:2181(CONNECTED) 3] get /brokers/ids/1
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://172.17.19.25:19092"],"jmx_port":-1,"port":19092,"host":"172.17.19.25","version":5,"timestamp":"1650935071371"}
[zk: localhost:2181(CONNECTED) 4] get /controller
{"version":1,"brokerid":1,"timestamp":"1650935071544"}
[zk: localhost:2181(CONNECTED) 5] ls /brokers/topics
[__consumer_offsets, a_Topic1]
[zk: localhost:2181(CONNECTED) 6]
토픽, 파티션
토픽
- 데이터를 구분하기 위해 사용하는
단위
- 1개 이상의 파티션을 소유
- 프로듀서가 보낸 데이터들 저장 -
레코드
파티션
- 카프카에서 병렬처리의 핵심 - 그룹으로 묶인 컨슈머들이 레코드를 병렬로 처리 가능
- 큐와 비슷한 구조
https://needjarvis.tistory.com/603
레코드
- 레코드는 타임스탬프, 메시지 키, 메시지 값, 오프셋 헤더로 구성
- 적재된 레코드는 수정 불가능 - 리텐션 기간, 용량 설정에 따라 삭제
타임 스탬프
: 레코드가 생성된 시점의 유닉스 타임메시지 키
: 메시지 값 순차처리, 종류 표현메시지 값
: 실질적으로 처리할 데이터 존재- 메시지 값은
직렬화
되어 브로커에 전송 - 컨슈머가 이용할 때는
역직렬화
를 수행 해야함
- 메시지 값은
- 오프셋 : 0 이상의 숫자로 형성
- 헤더 : 레코드의 추가적인 정보를 담는 메타이터 저장소 용도
카프카 클라이언트
- 카프카 클러스터에 명령을 내리거나 데이터를 송수신하기 위해 카프카 클라이언트 라이브러리 사용
- Producer API
- Consumer API
- Streams API
- Connect API
- AdminClient API
출처: https://springboot.cloud/36 [갓.바.조.아]
프로듀서 API
- 데이터 전송시 리터 파티션을 가지고 있는 브로커와 직접 통신
- 데이터를 직렬화 하여 브로커에 전송
- 직렬화 : 자바 또는 외부 시스템에서 사용 가능하도록 바이트 형태로 데이터를 변환하는 기술
dependencies
dependencies {
...
implementation 'org.apache.kafka:kafka-clients:3.1.0'
...
}
producer.java
package com.example.demo;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SimpleKafkaProducer {
private final static Logger logger = LoggerFactory.getLogger(SimpleKafkaProducer.class);
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVER = "172.17.19.25:9092";
public static void main(String[] args){
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
String messageValue = "testMessage";
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);
producer.send(record);
logger.info("{}", record);
producer.flush();
producer.close();
}
}
토픽 생성
./kafka-topics.sh --bootstrap-server 172.17.19.25:19092 --create --topic test --partitions 3
카프카 클라이언트 프로듀서 실행
acks = -1
batch.size = 16384
bootstrap.servers = [172.17.19.25:19092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = true
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
...
2022-04-26 10:53:03.345 INFO 46112 --- [ main] com.example.demo.SimpleKafkaProducer : ProducerRecord(topic=test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=testMessage, timestamp=null)
컨슈머 테스트
./kafka-console-consumer.sh --bootstrap-server 172.17.19.25:19092 --topic test --from-beginning
testMessage
브로커 정상 전송 여부를 확인하는 프로듀서
- send() 메서드는 Future 객체를 반환
- RecordMetadata의 비동기 결과를 표한하는 것
- ProducerRecord가 카프카 브로커에 정상적으로 적재 되었는지에 대한 데이터 포함
- get() 메서드를 사용하면 프로듀서로 보낸 데이터의 결과를 동기적으로 가져옴
RecordMetadata metadata = producer.send(record).get();
logger.info(metadata.toString());
- 레코드 정상 적재시 토픽 이름, 파티션 번호, 오프셋 번호 출력
- 동기로 전송하는 것은 빠른 전송에 허들이 될 수 있음
- 따라서 비동기 적으로 결과를 확인할 수 있도록 Callback인터페이스를 제공
public clas ProducerCallback implements Callback{
private final static Logger logger = LoggerFactory.getLogger(ProdudcerCallback.class);
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e){
if (e != null){
logger.error(e.getMessage(), e);
}
else{
logger.info(recordMetadata.toString());
}
}
- onCompletion qlehdrldml rufrhkfmf qkerl dnlgka
- 데이터의 순서가 중요한 경우 사용을 하면 안됨
producer.send(record, new ProducerCallback());
프로듀서 중요 개념
- send 호출시 ProducerRecord 파티셔너에서 토픽의 어느 파티션으로 전송될 것인지 결정
- 파티셔너를 따로 설정하지 않으면 defaultPartitioner로 설정
- 파티셔너에 의해 구분된 레코드는 데이터를 전송하기 전에 어큐물레이터에 데이터를 버퍼로 쌓아놓고 발송
- 배치로 묶어 전송함으로써 카프카의 프로듀서 처리량 향상
https://dol9.tistory.com/277
컨슈머 API
- 프로듀서에 의해 적재된 데이터를 사용하기 위해 데이터를 가져와서 필요한 처리
- 마케팅 문자를 고객에게 보내는 기능이 있다면 컨슈머는 토픽으로부터 고객 데이터를 가져와서 문자 발송 처리
package com.example.demo;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SimpleConsumer {
private final static Logger logger = LoggerFactory.getLogger(SimpleConsumer.class);
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVER = "172.17.19.25:19092";
private final static String GROUP_ID = "test-group";
public void consumer(){
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(TOPIC_NAME)); // 컨슈머 토픽 할당
while(true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records){
logger.info("{}", record);
}
}
}
}
프로듀서 메시지 전송
./kafka-console-producer.sh --bootstrap-server 172.17.19.25:19092 -topic test
>hello
>test
>sanghyun
카프카 클라이언트 컨슈머에서 전송된 메시지 확인
2022-04-26 11:32:23.022 INFO 58839 --- [ main] com.example.demo.SimpleConsumer : ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1650987141761, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello)
2022-04-26 11:32:27.411 INFO 58839 --- [ main] com.example.demo.SimpleConsumer : ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1650987146182, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = test)
2022-04-26 11:32:32.321 INFO 58839 --- [ main] com.example.demo.SimpleConsumer : ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 3, CreateTime = 1650987151005, serialized key size = -1, serialized value size = 8, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = sanghyun)
컨슈머 중요 개념
- 컨슈머의 운영 방법 2가지
- 1개 이상의 컨슈머로 이루어진 컨슈머 그룹을 운영
- 토픽의 특정 파티션만 구독하는 컨슈머를 운영
- 파티션은 하나의 토픽에만 연결이 가능
- 컨슈머 그룹의 컨슈머 개수는 토픽의 파티션 개수보다 작거나 같아야함
유휴 상태
리밸런싱
- 장애가 발생하면 장애가 발생한 컨슈머에 할당된 파티션은 장애가 발생하지 않은 컨슈머에게 소유권이 넘어감
- 소유권을 재할당하는 과정에서 컨슈머 그룹의 컨슈머들이 토픽의 데이터를 읽을 수 없음
- 컨슈머 그룹은 다른 컨슈머 그룹과 격리되는 특징을 가짐
- 프로듀서가 보낸 데이터를 각기 다른 역할을 하는 컨슈머 그룹끼리 영향을 받지않게 처리 가능
- 컨슈머는 데이터를 어디까지 가져갔는지 커밋을 통해 기록 (__consumer_offsers)
- 오프셋 커밋은 명시적, 비명시적으로 수행이 가능
- Default
enable.auto.commit=true
컨슈머 내부 구조
- 컨슈머는 poll() 메서드를 통해 레코드를 반환 받지만 호출하는 시점에 클러스터에서 데이터를 가져오는 것이 아님
- 내부에서 Fether 인스턴스가 생성되어 poll() 메서드를 호출하기 전에 미리 레코드들을 내부 큐로
동기 오프셋 커밋
- poll() 메서드로 리턴된 가장 마지막 레코드를 기준으로 오프셋을 커밋
- 브로커로부터 오프셋 커밋이 완료되었음을 받기까지 컨슈머는 데이터를 처리하지 않고 기다린다.
- 자동 커밋, 비동기 오프셋 커밋보다 동일 시간당 데이터 처리량이 작다
package com.example.demo;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SimpleConsumer {
private final static Logger logger = LoggerFactory.getLogger(SimpleConsumer.class);
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVER = "172.17.19.25:19092";
private final static String GROUP_ID = "test-group";
public void consumer(){
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 추가
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(TOPIC_NAME)); // 컨슈머 토픽 할당
while(true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>(); // 처리한 오프셋을 매번 커밋하기 위해 사용
// 키 : 토픽과 파티션의 정보
// 값 : 오프셋 정보
for (ConsumerRecord<String, String> record : records){
logger.info("{}", record);
currentOffset.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1, null)); // 현재 처리한 오프셋 + 1
// 이후에 컨슈머가 poll할 때 마지막으로 커밋한 오프셋 부터 레코드를 리턴하기 때문
consumer.commitSync(currentOffset);
}
}
}
}
비동기 오프셋 커밋
- 동기 오프셋 커밋과 마찬가지로 poll() 메서드로 리턴된 가장 마지막 레코드를 기준으로 오프셋을 커밋
- 커밋이 완료될 때까지 응답을 기다리지 않음
- 동일 시간당 데이터 처리량이 더 많음
for (ConsumerRecord<String, String> record : records){
logger.info("{}", record);
currentOffset.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1, null)); // 현재 처리한 오프셋 + 1
// 이후에 컨슈머가 poll할 때 마지막으로 커밋한 오프셋 부터 레코드를 리턴하기 때문
consumer.commitAsync(new OffsetCommitCallback() {
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e){
if(e != null){
System.err.println("Commit failed");
}else{
System.out.println("Commit succeeded!");
}
if (e != null)
logger.error("Commit failed for offsets {}", offsets, e);
}
});
}
- OffsetCommitCallback 함수는 commitAsync()의 응답을 받을 수 있도록 도와주는 콜백 인터페이스
- 커밋 응답은 onComplete() 메서드를 통해 확인
리밸런스 리스터를 가진 컨슈머
- 리밸런스 발생 시 데이터를 중복 처리하지 않게 하기 위해서 리밸런스 발생 시 처리한 데이터를 기준으로 커밋을 시도해야 함
- 리밸런스 발생 감지를 위한 라이브러리는 ConsumerRebalanceListener 인터페이스를 지원
- onPartitionAssined() : 리밸런스가 끝난 뒤에 파티션이 할당 완료되면 호출되는 메서드
- onPartitionRevoked() : 리밸런스가 시작되기 직전에 호출되는 메서드
컨슈머의 안전한 종료
- poll() 메서드가 동작중에 wakeup() 메서드가 호출되면 다음 poll() 에서 WakeupException 예외가 발생
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>(); // 처리한 오프셋을 매번 커밋하기 위해 사용
// 키 : 토픽과 파티션의 정보
// 값 : 오프셋 정보
for (ConsumerRecord<String, String> record : records) {
logger.info("{}", record);
currentOffset.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1, null)); // 현재 처리한 오프셋 + 1
// 이후에 컨슈머가 poll할 때 마지막으로 커밋한 오프셋 부터 레코드를 리턴하기 때문
consumer.commitAsync(new OffsetCommitCallback() {
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
if (e != null) {
System.err.println("Commit failed");
} else {
System.out.println("Commit succeeded!");
}
if (e != null)
logger.error("Commit failed for offsets {}", offsets, e);
}
});
}
}
} catch (WakeupException e) {
logger.warn("wakeup consumer");
}
finally{
consumer.close();
}
...
Runtime.getRuntime().addShutdownHook(new ShutdownThread());
...
static class ShutdownThread extends Thread {
public void run(){
consumer.wakeup();
}
}
어드민 API
- 내부 옵션을 설정하고 확인하는 것은 실제 운영환경에서 중요
- 카프카 클라이언트에서는 내부 옵션들을 설정하거나 조회를 위해 AdminClient클래스를 제공
-
클러스터의 옵션과 관련된 부분을 자동화 가능
- 카프카 클러스터를 멀티 스레드로 생성할 때, 구독하는 토픽의 파티션 개수만큼 생성하고 싶을 때, 스레드 생성 전에 해당 토픽의 파티션 개수를 어드민 API를 통해 가져올 수 있다.
- AdminClient 클래스로 구현한 웹 대시보드를 통해 ACL이 적용된클러스터의 리소스 접근 권한 규칙을 추가할 수 있다.
- 트정 토픽의 데이터양이 늘어남을 감지하고 AdminClient 클래스로 해당 토픽의 파티션을 늘릴 수 있다.
package com.example.demo;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;
public class SimpleAdmin {
public void test() {
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.17.19.25:19092");
AdminClient admin = AdminClient.create(configs);
System.out.println("== Get broker information");
try {
for( Node node : admin.describeCluster().nodes().get()){ // describeCluster() 브로커 정보
System.out.println("node : " + node);
ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, node.idString());
DescribeConfigsResult describeConfigs = admin.describeConfigs(Collections.singleton(cr));
describeConfigs.all().get().forEach((broker, config) -> {
config.entries().forEach(configEntry -> System.out.println(configEntry.name() + " = " + configEntry.value()));
} );
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
== Get broker information
node : 172.17.19.25:19092 (id: 1 rack: null)
log.cleaner.min.compaction.lag.ms = 0
offsets.topic.num.partitions = 50
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
log.flush.interval.messages = 9223372036854775807
controller.socket.timeout.ms = 30000
principal.builder.class = org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
log.flush.interval.ms = null
controller.quorum.request.timeout.ms = 2000
sasl.oauthbearer.expected.audience = null
min.insync.replicas = 1
num.recovery.threads.per.data.dir = 1
ssl.keystore.type = JKS
zookeeper.ssl.protocol = TLSv1.2
sasl.mechanism.inter.broker.protocol = GSSAPI
metadata.log.segment.bytes = 1073741824
fetch.purgatory.purge.interval.requests = 1000
ssl.endpoint.identification.algorithm = https
zookeeper.ssl.keystore.location = null
replica.socket.timeout.ms = 30000
message.max.bytes = 1048588
max.connection.creation.rate = 2147483647
connections.max.reauth.ms = 0
log.flush.offset.checkpoint.interval.ms = 60000
zookeeper.clientCnxnSocket = null
zookeeper.ssl.client.enable = false
quota.window.num = 11
sasl.oauthbearer.clock.skew.seconds = 30
zookeeper.connect = zookeeper:2181
authorizer.class.name =
password.encoder.secret = null
num.replica.fetchers = 1
alter.log.dirs.replication.quota.window.size.seconds = 1
sasl.oauthbearer.jwks.endpoint.url = null
log.roll.jitter.hours = 0
password.encoder.old.secret = null
log.cleaner.delete.retention.ms = 86400000
sasl.login.retry.backoff.ms = 100
queued.max.requests = 500
log.cleaner.threads = 1
sasl.kerberos.service.name = null
socket.request.max.bytes = 104857600
log.message.timestamp.type = CreateTime
zookeeper.ssl.keystore.type = null
connections.max.idle.ms = 600000
zookeeper.set.acl = false
delegation.token.expiry.time.ms = 86400000
max.connections = 2147483647
transaction.state.log.num.partitions = 50
controller.quorum.election.timeout.ms = 1000
listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
log.retention.hours = 168
client.quota.callback.class = null
ssl.provider = null
delete.records.purgatory.purge.interval.requests = 1
log.roll.ms = null
ssl.cipher.suites =
controller.quorum.retry.backoff.ms = 20
zookeeper.ssl.keystore.password = null
broker.session.timeout.ms = 9000
security.inter.broker.protocol = PLAINTEXT
delegation.token.secret.key = null
node.id = 1
replica.high.watermark.checkpoint.interval.ms = 5000
replication.quota.window.size.seconds = 1
sasl.kerberos.ticket.renew.window.factor = 0.8
zookeeper.connection.timeout.ms = 18000
metrics.recording.level = INFO
password.encoder.cipher.algorithm = AES/CBC/PKCS5Padding
ssl.principal.mapping.rules = DEFAULT
replica.selector.class = null
max.connections.per.ip = 2147483647
background.threads = 10
request.timeout.ms = 30000
log.message.format.version = 3.0-IV1
sasl.login.class = null
log.dir = /tmp/kafka-logs
log.segment.bytes = 1073741824
replica.fetch.response.max.bytes = 10485760
group.max.session.timeout.ms = 1800000
controller.listener.names = null
controller.quorum.append.linger.ms = 25
log.segment.delete.delay.ms = 60000
log.retention.minutes = null
log.dirs = /data
controlled.shutdown.enable = true
socket.connection.setup.timeout.max.ms = 30000
log.message.timestamp.difference.max.ms = 9223372036854775807
sasl.oauthbearer.scope.claim.name = scope
password.encoder.key.length = 128
sasl.login.refresh.min.period.seconds = 60
sasl.oauthbearer.expected.issuer = null
sasl.login.read.timeout.ms = null
transaction.abort.timed.out.transaction.cleanup.interval.ms = 10000
sasl.kerberos.kinit.cmd = /usr/bin/kinit
log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
auto.leader.rebalance.enable = true
leader.imbalance.check.interval.seconds = 300
log.cleaner.min.cleanable.ratio = 0.5
replica.lag.time.max.ms = 30000
num.network.threads = 3
ssl.keystore.key = null
sasl.client.callback.handler.class = null
metrics.num.samples = 2
socket.send.buffer.bytes = 102400
password.encoder.keyfactory.algorithm = null
socket.receive.buffer.bytes = 102400
sasl.oauthbearer.sub.claim.name = sub
replica.fetch.min.bytes = 1
broker.rack = null
zookeeper.ssl.truststore.password = null
unclean.leader.election.enable = false
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
offsets.retention.check.interval.ms = 600000
producer.purgatory.purge.interval.requests = 1000
controller.quorum.voters =
metrics.sample.window.ms = 30000
log.retention.check.interval.ms = 300000
sasl.login.refresh.window.jitter = 0.05
leader.imbalance.per.broker.percentage = 10
controller.quota.window.num = 11
metric.reporters =
sasl.oauthbearer.token.endpoint.url = null
auto.create.topics.enable = true
ssl.engine.factory.class = null
replica.socket.receive.buffer.bytes = 65536
zookeeper.ssl.truststore.location = null
replica.fetch.wait.max.ms = 500
password.encoder.iterations = 4096
default.replication.factor = 1
ssl.truststore.password = null
sasl.kerberos.principal.to.local.rules = DEFAULT
log.preallocate = false
transactional.id.expiration.ms = 604800000
control.plane.listener.name = null
transaction.state.log.replication.factor = 1
num.io.threads = 8
sasl.login.refresh.buffer.seconds = 300
offsets.commit.required.acks = -1
connection.failed.authentication.delay.ms = 100
delete.topic.enable = true
quota.window.size.seconds = 1
ssl.truststore.type = JKS
offsets.commit.timeout.ms = 5000
zookeeper.ssl.ocsp.enable = false
broker.heartbeat.interval.ms = 2000
sasl.mechanism.controller.protocol = GSSAPI
log.cleaner.max.compaction.lag.ms = 9223372036854775807
zookeeper.ssl.enabled.protocols = null
log.retention.ms = null
alter.log.dirs.replication.quota.window.num = 11
log.cleaner.enable = true
controller.quorum.fetch.timeout.ms = 2000
offsets.load.buffer.size = 5242880
ssl.client.auth = none
controlled.shutdown.max.retries = 3
offsets.topic.replication.factor = 1
zookeeper.ssl.truststore.type = null
transaction.state.log.min.isr = 1
ssl.secure.random.implementation = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
zookeeper.session.timeout.ms = 18000
log.retention.bytes = -1
controller.quota.window.size.seconds = 1
sasl.jaas.config = null
sasl.kerberos.min.time.before.relogin = 60000
offsets.retention.minutes = 10080
replica.fetch.backoff.ms = 1000
inter.broker.protocol.version = 3.1-IV0
kafka.metrics.reporters =
num.partitions = 1
ssl.keystore.certificate.chain = null
socket.connection.setup.timeout.ms = 10000
broker.id.generation.enable = true
listeners = PLAINTEXT://:9092
ssl.enabled.protocols = TLSv1.2,TLSv1.3
inter.broker.listener.name = null
alter.config.policy.class.name = null
delegation.token.expiry.check.interval.ms = 3600000
log.flush.scheduler.interval.ms = 9223372036854775807
zookeeper.max.in.flight.requests = 10
log.index.size.max.bytes = 10485760
ssl.keymanager.algorithm = SunX509
sasl.login.callback.handler.class = null
replica.fetch.max.bytes = 1048576
zookeeper.ssl.crl.enable = false
sasl.server.callback.handler.class = null
log.cleaner.dedupe.buffer.size = 134217728
log.cleaner.io.buffer.size = 524288
create.topic.policy.class.name = null
ssl.truststore.certificates = null
controlled.shutdown.retry.backoff.ms = 5000
security.providers = null
log.roll.hours = 168
log.cleanup.policy = delete
initial.broker.registration.timeout.ms = 60000
log.flush.start.offset.checkpoint.interval.ms = 60000
log.roll.jitter.ms = null
transaction.state.log.segment.bytes = 104857600
offsets.topic.segment.bytes = 104857600
sasl.login.retry.backoff.max.ms = 10000
group.initial.rebalance.delay.ms = 0
log.index.interval.bytes = 4096
log.cleaner.backoff.ms = 15000
ssl.truststore.location = null
offset.metadata.max.bytes = 4096
ssl.keystore.password = null
zookeeper.sync.time.ms = 2000
fetch.max.bytes = 57671680
metadata.max.retention.bytes = -1
compression.type = producer
sasl.login.connect.timeout.ms = null
max.connections.per.ip.overrides =
sasl.login.refresh.window.factor = 0.8
kafka.metrics.polling.interval.secs = 10
metadata.log.max.record.bytes.between.snapshots = 20971520
metadata.max.retention.ms = 604800000
controller.quorum.election.backoff.max.ms = 1000
max.incremental.fetch.session.cache.slots = 1000
delegation.token.master.key = null
ssl.key.password = null
reserved.broker.max.id = 100000
transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
log.message.downconversion.enable = true
ssl.protocol = TLSv1.3
metadata.log.segment.ms = 604800000
transaction.state.log.load.buffer.size = 5242880
ssl.keystore.location = null
sasl.enabled.mechanisms = GSSAPI
num.replica.alter.log.dirs.threads = null
zookeeper.ssl.cipher.suites = null
group.min.session.timeout.ms = 6000
log.cleaner.io.buffer.load.factor = 0.9
transaction.max.timeout.ms = 900000
metadata.log.dir = null
process.roles =
group.max.size = 2147483647
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
delegation.token.max.lifetime.ms = 604800000
broker.id = 1
offsets.topic.compression.codec = 0
zookeeper.ssl.endpoint.identification.algorithm = HTTPS
replication.quota.window.num = 11
advertised.listeners = PLAINTEXT://172.17.19.25:19092
queued.max.request.bytes = -1