카프카 커넥트

  • 카프카 오픈소스에 포함된 툴 중 하나
  • 데이터 파이프라인 생성 시 반복 작업을 줄이고 효율적인 전송을 이루기 위한 애플리케이션
    • 파이프라인 생성 시 자주 반복되는 값들 (토픽 이름, 파일 이름, 테이블 이름 등)을 파라미터로 받는 커넥터를 코드로 작성시 이후에 파이프라인을 실행할 때 코드 작성할 필요 없음

image

참고: https://bagbokman.tistory.com/8

카프카 커넥트 : Connector를 동작하게 하는 서버

카프카 커넥터 : jar파일

소스, 싱크 커넥터

  • 소스 커넥터 : 커넥트에서 프로듀서 역할을 하는 커넥터
  • 싱크 커넥터 : 커넥트에서 컨슈머 역할을 하는 커넥터
  • 파일을 주고 받는 파일 소스 커넥터 예시
    • 파일 소스 커넥터는 파일의 데이터를 카프카 토픽으로 전송하는 프로듀서 역할
    • 파일 싱크 커넥터는 토픽의 데이터를 파일로 저장하는 컨슈머 역할

커넥터 종류

  • 카프카 2.6에 포함된 커넥트는 미러메이커2, 파일 싱크 커넥터, 파일 소스 커넥터를 기본 플로그인으로 제공
  • 오픈소스 : JDBC, 엘라스틱서치, HDFS 커넥터 등 100가지가 넘는 오픈소스 커넥터가 공개 되어있음

image

커넥터 태스크

  • 커넥터는 태스크들을 관리
  • 사용자가 커넥트에 커넥터 생성 명령을 내리면 커넥트는 내부에 커넥터와 태스크를 생성
  • 태스크는 커넥터에 종속되는 개념으로 실질적인 데이터 처리를 수행
  • 데이터 처리를 정상적으로 하는지 확인하기 위해서 각 태스크의 상태를 확인해야함

image

컨버터, 트랜스폼

  • 사용자가 파이프라인을 생성할 때 컨버터와 트랜스폼 기능을 옵션으로 추가 가능
  • 컨버터 : 데이터를 처리 하기 전에 스키마를 변경하도록 도외줌 : JsonConverter, StringConverter, ByteArrayConverter, 커스텀
  • 트랜스폼 : 데이터 처리 시 각 메시지 단위로 데이터를 간단하게 변환하기 위한 용도로 사용
    • Json 데이터를 커넥터에 사용할 때, 트랜스폼을 사용하면 특정 키를 삭제하거나 추가할 수 있음 : 기본 제공 트랜스폼 Cast, Drop 등 존재

커넥트 실행 방법

  • 카프카 커넥트를 실행하는 방법은 단일 모드 커넥트분산 모드 커넥트로 2가지가 존재

단일 모드 커넥트

  • 1개의 프로세스만으로 실행
  • 고가용성이 구성되지 않아 단일 장애점이 될 수 있음
  • 중요도가 낮은 파이프라인을 운영할 때 사용

단일 장애점 : https://ko.wikipedia.org/wiki/%EB%8B%A8%EC%9D%BC_%EC%9E%A5%EC%95%A0%EC%A0%90

image

분산 모드 커넥트

  • 2대 이상의 서버에서 클러스터 형태로 운영
    • 중단이 되더라도 남은 1개의 커넥트가 파이프라인을 지속적으로 처리 가능
  • 데이터 처리량의 변화에서도 유연하게 대응 가능 -> 스케일 아웃하여 처리량 증가 가능

image

REST API

  • REST API를 통해서 현재 실행 중인 커넥트의 커넥터 플러그인 종류, 태스크 상태, 커넥터 상태 등 조회 가능
요청 메서드 경로 설명
GET / 실행 중인 커넥트 정보 확인
GET /connectors 실행 중인 커넥트 이름 확인
POST /connectors 새로운 커넥터 생성 요청
GET /connectors/(커넥터 이름) 실행 중인 커넥터 정보 확인
GET /connectors/(커넥터 이름)/config 실행중인 커넥터의 설정값 확인
PUT /connectors/(커넥터 이름)/config 실행중인 커넥터의 설정값 변경 요청
GET /connectors/(커넥터 이름)/status 실행중인 커넥터 상태 확인
POST /connectors/(커넥터 이름)/restart 실행 중인 커넥터 재시작 요청
PUT /connectors/(커넥터 이름)/pause 커넥터 일시 중지 요청
PUT /connectors/(커넥터 이름)/resume 일시 중지된 커넥터 실행 요청
DELETE /connectors/(커넥터 이름)/ 실행 중인 커넥터 종료
GET /connectors/(커넥터 이름)/tasks 실행 중인 커넥터의 태스크 정보 확인
GET /connectors/(커넥터 이름)/tasks/(태스크 아이디)/status 실행 중인 커넥터의 태스크 상태 확인
POST /connectors/(커넥터 이름)/tasks/(태스크 아이디)/restart 실행 중인 커넥터의 태스크 재시작 요청
GET /connectors/(커넥터 이름)/topics 커넥터별 연동된 토픽 정보 확인
GET /connector-plugins/ 커넥트에 존재하는 커넥터 플러그인 확인
PUT /connector-plugins/(커넥터 플러그인 이름)/config/validate 커넥터 생성 시 설정값 유효 여부 확인

단일 모드 커넥트 설정

  • 단일 모드 커넥트는 커넥트 설정파일과 함께 커넥터 설정파일도 정의하여 실행해야 함

connect-standalone.properties


bootstrap.servers=IP_ADDRESS:9092                                # 카프카와 연동할 카프카 클러스터의 주소
key.converter=org.apache.kafka.connect.json.JsonConverter        # 데이터를 카프카에 저장할 때, 가져올 때 변환하는데 사용 String, ByteArray Converter 기본 제공
value.converter=org.apache.kafka.connect.json.JsonConverter      
key.converter.schemas.enables=false
value.converter.schemas.enables=false

offset.storage.file.filename=/tmp/connect.offsets                # 단일 모드 커넥트는 로컬 파일에 오프셋 정보를 저장
# 오프셋 정보는 소스, 싱크 커넥터가 데이터 처리 시점을 저장하기 위해 사용

offset.flush.interval.ms=10000       # 태스크가 처리 완료한 오프셋을 커밋하는 주기 설정
plugin.path=                         # 플러그인 형태로 추가할 커넥터의 디렉토리 주소 - 디렉토리 (,)로 구분 가능

file-source.properties

name=local-file-source              # 커넥터의 이름 지정
connector.class=FileStreamSource    # 사용할 커넥터의 클래스 이름 지정
tasks.max=1                         # 커넥터로 실행할 태스크 개수를 지정
file=test.txt                  # 읽을 파일의 위치를 지정
topic=connect-text                  # 읽은 파일의 데이터를 저장할 토픽의 이름 지정
$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties

file-sink.properties

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-sink.properties
  • test.txt에 내용 입력

image

  • ./kafka-console-consumer.sh 에서

image

  • test.sink.txt 내용 출력

image

분산 모드 커넥트 설정

connect-distributed.properties


bootstrap.servers=IP_ADDRESS:9092                               

group.id=cluster.name

key.converter=org.apache.kafka.connect.json.JsonConverter       
value.converter=org.apache.kafka.connect.json.JsonConverter      
key.converter.schemas.enables=false
value.converter.schemas.enables=false

offset.storage.topic=connect-offsets
offset.storage.replication.factor=1

offset.storage.topic=connect-configs
offset.storage.replication.factor=1

offset.storage.topic=connect-status
offset.storage.replication.factor=1

offset.flush.interval.ms=10000                                  
plugin.path=                                                   

$ bin/connect-distributed.sh config/connect-distributed.properties

소스 커넥터

  • 소스 애플리케이션 또는 소스 파일로부터 데이터를 가져와 토픽으로 넣는 역할
  • 직접 구현한 소스 커넥터를 빌드 해서 jar파일로 만들어 플러그인으로 사용 가능
dependencies{
  compile `org.apache.kafka:connect-api:2.5.0`
}
  • 소스 커넥터를 만들 때 필요한 클래스
  1. SourceConnector
    • 태스크를 실행하기 전에 커넥터 설정파일을 초기화 및 어떤 태스크 클래스를 사용할 것인지 정의
  2. SourceTask
    • 실제로 데이터를 다루는 클래스로 소스 애플리케이션, 소스 파일로부터 데이터를 가져와 토픽으로 데이터를 보내는 역할 수행
    • 자체적으로 사용하는 오프셋을 사용 - 소스 애플리케이션, 소스 파일을 어디까지 읽었는지 저장하는 역할 (중복 방지)

SourceConnector

public class TestSourceConnector extends SourceConnector {           // source 커넥터 구현

    @Override
    public String version() {}                                       
    // 커넥터의 버전을 리턴, 커넥트에 포함된 커넥터 플러그인 조회시 이 버전 노출

    @Override
    public void start(Map<String, String> props) {}                  
    // 사용자가 json, config로 작성한 설정값을 초기화 
    // 만약 올바른 값이 아닐 때 ConnectException()을 호출하여 커넥터를 종료 가능
    // JDBC의 URL

    @Override
    public Class<? extends Task> taskClass() {}
    // 커넥터가 사용할 태스크 클래스 지정

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {}
    // 태스크 수가 2개 이상인 경우 태스크 마다 각기 다른 옵션 설정

    @Override
    public ConfigDef config() {}
    // 커넥터가 사용할 설정값에 대한 정보 받기 ( 이름, 기본값, 중요도, 설명 정의 가능 )

    @Override
    public void stop() {}
    // 커넥트가 종료될 때 로직 작성
}

SourceTask

public class TestSourceTask extends SourceTask {


    @Override
    public String version() {}
    // 버전 명시
    
    @Override
    public void start(Map<String, String> props) {}
    // 태스크가 시작할 때 필요한 로직 작성
    // 데이터 처리에 필요한 모든 리소스를 여기서 초기화하는 것이 좋음
    // JDBC 소스 커넥터를 구현한다면 JDBC 커넥션을 여기서 맺는다

    @Override
    public List<SourceRecord> poll() {}
    // 소스 애플리케이션 또는 소스 파일로부터 데이터를 읽어오는 로직 작성
    // 데이터를 읽어오면 토픽으로 보낼 데이터를 SourceRecord로 정의
    // SourceRecord클래스는 토픽으로 데이터를 정의하기 위해 사용

    @Override
    public void stop() {}
}

싱크 커넥터

  • 토픽의 데이터를 타깃 애플리케이션 또는 타깃 파일로 저장하는 역할

  • 소스 커넥터를 만들 때 필요한 클래스

    1. SinkConnector
    • 태스크를 실행하기 전에 커넥터 설정파일을 초기화 및 어떤 태스크 클래스를 사용할 것인지 정의
  1. SinkTask
    • 실제 데이터 처리, 컨슈머 역할을 하고 데이터를 저장하는 코드를 가지게

SinkConnector

public class TestSinkConnector extends SinkConnector {  

    @Override
    public String version() {}                                       
    // 커넥터의 버전을 리턴, 커넥트에 포함된 커넥터 플러그인 조회시 이 버전 노출

    @Override
    public void start(Map<String, String> props) {}                  
    // 사용자가 json, config로 작성한 설정값을 초기화 
    // 만약 올바른 값이 아닐 때 ConnectException()을 호출하여 커넥터를 종료 가능
    // JDBC의 URL

    @Override
    public Class<? extends Task> taskClass() {}
    // 커넥터가 사용할 태스크 클래스 지정

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {}
    // 태스크 수가 2개 이상인 경우 태스크 마다 각기 다른 옵션 설정

    @Override
    public ConfigDef config() {}
    // 커넥터가 사용할 설정값에 대한 정보 받기 ( 이름, 기본값, 중요도, 설명 정의 가능 )

    @Override
    public void stop() {}
    // 커넥트가 종료될 때 로직 작성
}

SinkTask

public class TestSourceTask extends SinkTask {


    @Override
    public String version() {}
    
    @Override
    public void start(Map<String, String> props) {}

    @Override
    public void put(Collection<SinkRecord> records) {}
    // 싱크 애플리케이션 또는 싱크 파일에 저장할 데이터를 토픽에서 주기적으로 가져오는 메서드
    // 토픽의 데이터들은 여러개의 SinkRecord를 묶어 파라미터로 사용할 수 있음
    // SinkRecord는 토픽의 한 개 레코드 이며 토픽, 파티션, 타임스탬프 등의 정보를 담고 있음
    
    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {}
    // put 메서드를 통해서 가져온 데이터를 일정 주기로 싱크 애플리케이션 또는 싱크 파일에 저장할 때 사용하는 로직
    // mysql 에서 put -> insert(), flush -> commit()

    @Override
    public void stop() {}
}

카프카 미러메이커2

  • 미러메이커2는 서로 다른 두 개의 카프카 클러스터 간에 토픽을 복제하는 애플리케이션
  • 토픽의 모든 것을 복제할 필요성이 있기 때문에 사용
    • 프로듀서와 컨슈머로 미러링하는 애플리케이션을 만들었을 때의 어려움
    • 토픽에 있는 레코드는 고유한 메시지 키, 값, 파티션과 같은 모든 데이터를 완전히 동일하게 옮기는 것은 어려운 작업
    • 동일한 파티션에 동일한 레코드가 들어가도록 하는 작업은 복제 전 클러스터에서 사용하던 파티셔너에 대한 정보 없이는 불가능
  • 토픽 데이터, 설정, 파티션의 변화, 토픽 설정값의 변화 모두 동기화 가능

미러메이커1의 한계

  • 미러메이커1은 토픽 복제에 충분한 기능을 가지고 있지 않았음
  • 토픽의 데이터를 복제할 때 기본 파티셔너를 사용하여 복제 전 후로 파티션 정보가 달랐음
  • 정확히 한번 전달을 보장하지 못함
  • 양방향 토픽 복제 지원하지 못함
clusters = A, B  # 복제할 클러스터 닉네임 설정

A.bootstrap.servers = A_host1:9092, A_host2:9092, A_host3:9092
B.bootstrap.servers = B_host1:9092, B_host2:9092, B_host3:9092


A->B.enabled = true
A->B.topics = test

B->A.enabled = false
B->A.topics = .*

replication.factor=1  # 복제되어 신규 생성된 토픽의 복제 개수를 설정

# 토픽 복제에 필요한 데이터를 저장하는 내부 토픽의 복제 개수 설정
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1

image

$ bin/connect-mirror-maker.sh config/connect-mirror-maker.properties
$ ./kafka-console-producer.sh --bootstrap-server a-kafka:19092 --topic test
>asdf
>hello
>sanghyun
>test
>
$ ./kafka-console-consumer.sh --bootstrap-server a-kafka:29092 --topic A.test --from-beginning
asdf
hello
sanghyun
test

미러메이커2를 활용한 지리적 복제

  • 미러메이커2가 제공하는 단방향, 양방향 복제 기능, ACL 복제, 새 토픽 자동 감지 등의 기능은 여러개의 클러스타가 있을 때 좋음

액티브-스탠바이 클러스터 운영

  • 재해 복구를 위해 임시 카프카 클러스터를 하나 더 구성하는 경우
  • 액티브 클러스터의 모든 토픽을 스탠바이 클러스터에 복제하여 액티브 클러스터의 예상치 못한 장애 대응 가능

image

  • 클러스터에 재해가 생기는 경우
    1. 자연 재해 (홍수, 지진, 허리케인 등)
    2. 기술적 재해 (EMP공격, 데이터센터의 중단 등)
    3. 인간에 의한 재해 (사이버 공격, 사보타주 등)

image

  • 위와같이 일본에 위치한 스탠바이 클러스터로 복제하고 있어 시스템 대채 작동하여 서비스의 완전 중단을 막을 수 있음
  • 하지만 replication lag이 발생할 수 있어 모든 정보가 복제되지 않을 가능성이 있음
  • 이로 인해 대응 방안을 사전에 정하고 운영하는 것이 중요

액티브-액티브 클러스터 운영

  • 글로벌 서비스를 운영할 경우 서비스 애플리케이션의 통신 지연을 최소화하기 위해 2개 이상의 클러스터를 두고 서로 데이터를 미러링하며 사용할 때 사용

image

  • 글로벌 소셜 네트워크 서비스에서 한국에 있는 유저와 영국에 있는 유저 커뮤니케이션 데이터를 클러스터에 저장하는 것을 가정
  • 물리적으로 멀리 떨어진 곳에 있는 두 명의 유저 데이터를 저장하고 사용하는 방법으로 각 지역마다 클러스터를 두고 필요한 데이터를 복제하여 사용하는 방법을 사용 가능
  • 해당 지역의 유저가 관련 데이터를 조회할 때 데이터의 지연을 줄일 수 있음

image

허브 앤 스포크 클러스터 운영

  • 각 팀에서 소규모 카프카 클러스터를 사용하고 있을 때
  • 각 팀의 카프카 클러스터의 데이터를 한 개의 카프카 클러스터에 모아 데이터 레이크로 사용하고 싶을 때 사용

image

  • 데이터를 수집하고 데이터 레이크용 카프카 클러스터에서 가공, 분석하여 가치 있는 데이터를 찾아낼 수 있음