Kafka - 4!
kafka 책 읽기
카프카 커넥트
- 카프카 오픈소스에 포함된 툴 중 하나
- 데이터 파이프라인 생성 시 반복 작업을 줄이고 효율적인 전송을 이루기 위한 애플리케이션
- 파이프라인 생성 시 자주 반복되는 값들 (토픽 이름, 파일 이름, 테이블 이름 등)을 파라미터로 받는 커넥터를 코드로 작성시 이후에 파이프라인을 실행할 때 코드 작성할 필요 없음
참고: https://bagbokman.tistory.com/8
카프카 커넥트 : Connector를 동작하게 하는 서버
카프카 커넥터 : jar파일
소스, 싱크 커넥터
소스 커넥터
: 커넥트에서프로듀서 역할
을 하는 커넥터싱크 커넥터
: 커넥트에서컨슈머 역할
을 하는 커넥터- 파일을 주고 받는 파일 소스 커넥터 예시
- 파일 소스 커넥터는 파일의 데이터를 카프카 토픽으로 전송하는 프로듀서 역할
- 파일 싱크 커넥터는 토픽의 데이터를 파일로 저장하는 컨슈머 역할
커넥터 종류
- 카프카 2.6에 포함된 커넥트는 미러메이커2, 파일 싱크 커넥터, 파일 소스 커넥터를 기본 플로그인으로 제공
- 오픈소스 : JDBC, 엘라스틱서치, HDFS 커넥터 등 100가지가 넘는 오픈소스 커넥터가 공개 되어있음
커넥터 태스크
- 커넥터는 태스크들을 관리
- 사용자가 커넥트에 커넥터 생성 명령을 내리면 커넥트는 내부에 커넥터와 태스크를 생성
- 태스크는 커넥터에 종속되는 개념으로 실질적인 데이터 처리를 수행
- 데이터 처리를 정상적으로 하는지 확인하기 위해서 각 태스크의 상태를 확인해야함
컨버터, 트랜스폼
- 사용자가 파이프라인을 생성할 때 컨버터와 트랜스폼 기능을 옵션으로 추가 가능
컨버터
: 데이터를 처리 하기 전에 스키마를 변경하도록 도외줌 : JsonConverter, StringConverter, ByteArrayConverter, 커스텀트랜스폼
: 데이터 처리 시 각 메시지 단위로 데이터를 간단하게 변환하기 위한 용도로 사용- Json 데이터를 커넥터에 사용할 때, 트랜스폼을 사용하면 특정 키를 삭제하거나 추가할 수 있음 : 기본 제공 트랜스폼 Cast, Drop 등 존재
커넥트 실행 방법
- 카프카 커넥트를 실행하는 방법은
단일 모드 커넥트
와분산 모드 커넥트
로 2가지가 존재
단일 모드 커넥트
- 1개의 프로세스만으로 실행
- 고가용성이 구성되지 않아 단일 장애점이 될 수 있음
- 중요도가 낮은 파이프라인을 운영할 때 사용
단일 장애점 : https://ko.wikipedia.org/wiki/%EB%8B%A8%EC%9D%BC_%EC%9E%A5%EC%95%A0%EC%A0%90
분산 모드 커넥트
- 2대 이상의 서버에서 클러스터 형태로 운영
- 중단이 되더라도 남은 1개의 커넥트가 파이프라인을 지속적으로 처리 가능
- 데이터 처리량의 변화에서도 유연하게 대응 가능 -> 스케일 아웃하여 처리량 증가 가능
REST API
- REST API를 통해서 현재 실행 중인 커넥트의 커넥터 플러그인 종류, 태스크 상태, 커넥터 상태 등 조회 가능
요청 메서드 | 경로 | 설명 |
---|---|---|
GET | / | 실행 중인 커넥트 정보 확인 |
GET | /connectors | 실행 중인 커넥트 이름 확인 |
POST | /connectors | 새로운 커넥터 생성 요청 |
GET | /connectors/(커넥터 이름) | 실행 중인 커넥터 정보 확인 |
GET | /connectors/(커넥터 이름)/config | 실행중인 커넥터의 설정값 확인 |
PUT | /connectors/(커넥터 이름)/config | 실행중인 커넥터의 설정값 변경 요청 |
GET | /connectors/(커넥터 이름)/status | 실행중인 커넥터 상태 확인 |
POST | /connectors/(커넥터 이름)/restart | 실행 중인 커넥터 재시작 요청 |
PUT | /connectors/(커넥터 이름)/pause | 커넥터 일시 중지 요청 |
PUT | /connectors/(커넥터 이름)/resume | 일시 중지된 커넥터 실행 요청 |
DELETE | /connectors/(커넥터 이름)/ | 실행 중인 커넥터 종료 |
GET | /connectors/(커넥터 이름)/tasks | 실행 중인 커넥터의 태스크 정보 확인 |
GET | /connectors/(커넥터 이름)/tasks/(태스크 아이디)/status | 실행 중인 커넥터의 태스크 상태 확인 |
POST | /connectors/(커넥터 이름)/tasks/(태스크 아이디)/restart | 실행 중인 커넥터의 태스크 재시작 요청 |
GET | /connectors/(커넥터 이름)/topics | 커넥터별 연동된 토픽 정보 확인 |
GET | /connector-plugins/ | 커넥트에 존재하는 커넥터 플러그인 확인 |
PUT | /connector-plugins/(커넥터 플러그인 이름)/config/validate | 커넥터 생성 시 설정값 유효 여부 확인 |
단일 모드 커넥트 설정
- 단일 모드 커넥트는
커넥트 설정파일과 함께 커넥터 설정파일
도 정의하여 실행해야 함
connect-standalone.properties
bootstrap.servers=IP_ADDRESS:9092 # 카프카와 연동할 카프카 클러스터의 주소
key.converter=org.apache.kafka.connect.json.JsonConverter # 데이터를 카프카에 저장할 때, 가져올 때 변환하는데 사용 String, ByteArray Converter 기본 제공
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enables=false
value.converter.schemas.enables=false
offset.storage.file.filename=/tmp/connect.offsets # 단일 모드 커넥트는 로컬 파일에 오프셋 정보를 저장
# 오프셋 정보는 소스, 싱크 커넥터가 데이터 처리 시점을 저장하기 위해 사용
offset.flush.interval.ms=10000 # 태스크가 처리 완료한 오프셋을 커밋하는 주기 설정
plugin.path= # 플러그인 형태로 추가할 커넥터의 디렉토리 주소 - 디렉토리 (,)로 구분 가능
file-source.properties
name=local-file-source # 커넥터의 이름 지정
connector.class=FileStreamSource # 사용할 커넥터의 클래스 이름 지정
tasks.max=1 # 커넥터로 실행할 태스크 개수를 지정
file=test.txt # 읽을 파일의 위치를 지정
topic=connect-text # 읽은 파일의 데이터를 저장할 토픽의 이름 지정
$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties
file-sink.properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-sink.properties
- test.txt에 내용 입력
- ./kafka-console-consumer.sh 에서
- test.sink.txt 내용 출력
분산 모드 커넥트 설정
connect-distributed.properties
bootstrap.servers=IP_ADDRESS:9092
group.id=cluster.name
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enables=false
value.converter.schemas.enables=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
offset.storage.topic=connect-configs
offset.storage.replication.factor=1
offset.storage.topic=connect-status
offset.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=
$ bin/connect-distributed.sh config/connect-distributed.properties
소스 커넥터
- 소스 애플리케이션 또는 소스 파일로부터 데이터를 가져와 토픽으로 넣는 역할
- 직접 구현한 소스 커넥터를 빌드 해서 jar파일로 만들어 플러그인으로 사용 가능
dependencies{
compile `org.apache.kafka:connect-api:2.5.0`
}
- 소스 커넥터를 만들 때 필요한 클래스
- SourceConnector
- 태스크를 실행하기 전에 커넥터 설정파일을 초기화 및 어떤 태스크 클래스를 사용할 것인지 정의
- SourceTask
- 실제로 데이터를 다루는 클래스로 소스 애플리케이션, 소스 파일로부터 데이터를 가져와 토픽으로 데이터를 보내는 역할 수행
- 자체적으로 사용하는 오프셋을 사용 - 소스 애플리케이션, 소스 파일을 어디까지 읽었는지 저장하는 역할 (중복 방지)
SourceConnector
public class TestSourceConnector extends SourceConnector { // source 커넥터 구현
@Override
public String version() {}
// 커넥터의 버전을 리턴, 커넥트에 포함된 커넥터 플러그인 조회시 이 버전 노출
@Override
public void start(Map<String, String> props) {}
// 사용자가 json, config로 작성한 설정값을 초기화
// 만약 올바른 값이 아닐 때 ConnectException()을 호출하여 커넥터를 종료 가능
// JDBC의 URL
@Override
public Class<? extends Task> taskClass() {}
// 커넥터가 사용할 태스크 클래스 지정
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {}
// 태스크 수가 2개 이상인 경우 태스크 마다 각기 다른 옵션 설정
@Override
public ConfigDef config() {}
// 커넥터가 사용할 설정값에 대한 정보 받기 ( 이름, 기본값, 중요도, 설명 정의 가능 )
@Override
public void stop() {}
// 커넥트가 종료될 때 로직 작성
}
SourceTask
public class TestSourceTask extends SourceTask {
@Override
public String version() {}
// 버전 명시
@Override
public void start(Map<String, String> props) {}
// 태스크가 시작할 때 필요한 로직 작성
// 데이터 처리에 필요한 모든 리소스를 여기서 초기화하는 것이 좋음
// JDBC 소스 커넥터를 구현한다면 JDBC 커넥션을 여기서 맺는다
@Override
public List<SourceRecord> poll() {}
// 소스 애플리케이션 또는 소스 파일로부터 데이터를 읽어오는 로직 작성
// 데이터를 읽어오면 토픽으로 보낼 데이터를 SourceRecord로 정의
// SourceRecord클래스는 토픽으로 데이터를 정의하기 위해 사용
@Override
public void stop() {}
}
싱크 커넥터
-
토픽의 데이터를 타깃 애플리케이션 또는 타깃 파일로 저장하는 역할
-
소스 커넥터를 만들 때 필요한 클래스
- SinkConnector
- 태스크를 실행하기 전에 커넥터 설정파일을 초기화 및 어떤 태스크 클래스를 사용할 것인지 정의
- SinkTask
- 실제 데이터 처리, 컨슈머 역할을 하고 데이터를 저장하는 코드를 가지게
SinkConnector
public class TestSinkConnector extends SinkConnector {
@Override
public String version() {}
// 커넥터의 버전을 리턴, 커넥트에 포함된 커넥터 플러그인 조회시 이 버전 노출
@Override
public void start(Map<String, String> props) {}
// 사용자가 json, config로 작성한 설정값을 초기화
// 만약 올바른 값이 아닐 때 ConnectException()을 호출하여 커넥터를 종료 가능
// JDBC의 URL
@Override
public Class<? extends Task> taskClass() {}
// 커넥터가 사용할 태스크 클래스 지정
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {}
// 태스크 수가 2개 이상인 경우 태스크 마다 각기 다른 옵션 설정
@Override
public ConfigDef config() {}
// 커넥터가 사용할 설정값에 대한 정보 받기 ( 이름, 기본값, 중요도, 설명 정의 가능 )
@Override
public void stop() {}
// 커넥트가 종료될 때 로직 작성
}
SinkTask
public class TestSourceTask extends SinkTask {
@Override
public String version() {}
@Override
public void start(Map<String, String> props) {}
@Override
public void put(Collection<SinkRecord> records) {}
// 싱크 애플리케이션 또는 싱크 파일에 저장할 데이터를 토픽에서 주기적으로 가져오는 메서드
// 토픽의 데이터들은 여러개의 SinkRecord를 묶어 파라미터로 사용할 수 있음
// SinkRecord는 토픽의 한 개 레코드 이며 토픽, 파티션, 타임스탬프 등의 정보를 담고 있음
@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {}
// put 메서드를 통해서 가져온 데이터를 일정 주기로 싱크 애플리케이션 또는 싱크 파일에 저장할 때 사용하는 로직
// mysql 에서 put -> insert(), flush -> commit()
@Override
public void stop() {}
}
카프카 미러메이커2
- 미러메이커2는 서로 다른 두 개의
카프카 클러스터 간
에 토픽을 복제하는 애플리케이션 - 토픽의 모든 것을 복제할 필요성이 있기 때문에 사용
- 프로듀서와 컨슈머로 미러링하는 애플리케이션을 만들었을 때의 어려움
- 토픽에 있는 레코드는 고유한 메시지 키, 값, 파티션과 같은 모든 데이터를 완전히 동일하게 옮기는 것은 어려운 작업
- 동일한 파티션에 동일한 레코드가 들어가도록 하는 작업은 복제 전 클러스터에서 사용하던 파티셔너에 대한 정보 없이는 불가능
- 토픽 데이터, 설정, 파티션의 변화, 토픽 설정값의 변화 모두 동기화 가능
미러메이커1의 한계
- 미러메이커1은 토픽 복제에 충분한 기능을 가지고 있지 않았음
- 토픽의 데이터를 복제할 때 기본 파티셔너를 사용하여 복제 전 후로 파티션 정보가 달랐음
- 정확히 한번 전달을 보장하지 못함
- 양방향 토픽 복제 지원하지 못함
clusters = A, B # 복제할 클러스터 닉네임 설정
A.bootstrap.servers = A_host1:9092, A_host2:9092, A_host3:9092
B.bootstrap.servers = B_host1:9092, B_host2:9092, B_host3:9092
A->B.enabled = true
A->B.topics = test
B->A.enabled = false
B->A.topics = .*
replication.factor=1 # 복제되어 신규 생성된 토픽의 복제 개수를 설정
# 토픽 복제에 필요한 데이터를 저장하는 내부 토픽의 복제 개수 설정
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1
$ bin/connect-mirror-maker.sh config/connect-mirror-maker.properties
$ ./kafka-console-producer.sh --bootstrap-server a-kafka:19092 --topic test
>asdf
>hello
>sanghyun
>test
>
$ ./kafka-console-consumer.sh --bootstrap-server a-kafka:29092 --topic A.test --from-beginning
asdf
hello
sanghyun
test
미러메이커2를 활용한 지리적 복제
- 미러메이커2가 제공하는 단방향, 양방향 복제 기능, ACL 복제, 새 토픽 자동 감지 등의 기능은 여러개의 클러스타가 있을 때 좋음
액티브-스탠바이 클러스터 운영
- 재해 복구를 위해 임시 카프카 클러스터를 하나 더 구성하는 경우
- 액티브 클러스터의 모든 토픽을 스탠바이 클러스터에 복제하여 액티브 클러스터의 예상치 못한 장애 대응 가능
- 클러스터에 재해가 생기는 경우
- 자연 재해 (홍수, 지진, 허리케인 등)
- 기술적 재해 (EMP공격, 데이터센터의 중단 등)
- 인간에 의한 재해 (사이버 공격, 사보타주 등)
- 위와같이 일본에 위치한 스탠바이 클러스터로 복제하고 있어 시스템 대채 작동하여 서비스의 완전 중단을 막을 수 있음
- 하지만 replication lag이 발생할 수 있어 모든 정보가 복제되지 않을 가능성이 있음
- 이로 인해 대응 방안을 사전에 정하고 운영하는 것이 중요
액티브-액티브 클러스터 운영
- 글로벌 서비스를 운영할 경우 서비스 애플리케이션의 통신 지연을 최소화하기 위해 2개 이상의 클러스터를 두고 서로 데이터를 미러링하며 사용할 때 사용
- 글로벌 소셜 네트워크 서비스에서 한국에 있는 유저와 영국에 있는 유저 커뮤니케이션 데이터를 클러스터에 저장하는 것을 가정
- 물리적으로 멀리 떨어진 곳에 있는 두 명의 유저 데이터를 저장하고 사용하는 방법으로 각 지역마다 클러스터를 두고 필요한 데이터를 복제하여 사용하는 방법을 사용 가능
- 해당 지역의 유저가 관련 데이터를 조회할 때 데이터의 지연을 줄일 수 있음
허브 앤 스포크 클러스터 운영
- 각 팀에서 소규모 카프카 클러스터를 사용하고 있을 때
- 각 팀의 카프카 클러스터의 데이터를 한 개의 카프카 클러스터에 모아 데이터 레이크로 사용하고 싶을 때 사용
- 데이터를 수집하고 데이터 레이크용 카프카 클러스터에서 가공, 분석하여 가치 있는 데이터를 찾아낼 수 있음