카프카 스트림즈
- 카프카에서 공식적으로 지원
- 토픽에 적재된 데이터를 상태기방 또는 비상태기반으로 실시간 변환하여 다른 토픽에 적재하는 라이브러리
- (아파치 스파크, 플링크, 스톰, 플루언트디)
- 신규 토픽 생성, 상태 저장, 데이터 조인을 제공
- 스트림즈 애플리케이션 또는 카프카 브로커의 장애가 발생해도 정확히 한번할 수 있도록 장애 허용 시스템 제공 - 안정성이 매우 뛰어남
- 내부적으로 스레드를 1개 이상 생성할 수 있으며 1개이상의 태스크를 가짐
- 태스크 : 스트림즈 애플리케이션을 실행하는 데이터 처리 최소 단위 ( 컨슈머의 병렬처리를 위해 컨슈머 그룹으로 이루어진 컨슈머 스레드를 여러 개 실행하는 것과 비슷
- 안정적 운영을 위해서 2개이상의 서버로 구성하여 스트림즈 애플리케이션을 사용하는 것을 권장
토폴로지와 카프카 스트림즈
토폴로지란?
- 토폴로지란 2개 이상의 노드들과 선으로 이루어진 집합을 뜻
- 링형, 트리형, 성형 -> 스트림즈는 트리 형태와 유사
카프카 스트림즈에서 토폴로지란?
- 토폴로지를 이루는 노드를 하나의 프로세서, 노드와 노드를 이은 선을 스트림 이라고 함
- 소스 프로세서 : 데이터를 처리하기 위해 최초로 선언해야하는 노드로, 하나 이상의 토픽에서 데이터를 가져오는 역할
- 스트림 프로세서 : 다른 프로세서가 반환한 데이터를 처리하는 역할. 변환, 분기처리와 같은 로직
- 싱크 프로세서 : 데이터를 특정 카프카 토픽으로 저장하는 역할로 데이터의 최종 종착지
스트림즈DSL
- 레코드의 흐름을 추상화한 3가지 개념인
KStream, KTable, GlobalKTable
오직 스트림즈DSL에서만 사용되는 개념
KStream
- 레코드의 흐름을 표현한 것으로
메시지 키
와 메시지 값
으로 구성됨
- 데이터 조회시 토픽에 존재하는 모든 레코드가 출력됨
- 컨슈머로 토픽을 구독하는 것과 동일한 선상에서 사용하는 것이라고 볼 수 있음
KTable
메시지 키
를 기준으로 묶어 사용
- KStream은 토픽의 모든 레코드를 조회 가능 하지만 KTable은
유니크한 메시지 키를 기준
으로 가장 최신 레코드를 사용
GlobalKTable
메시지 키
를 기준으로 묶어 사용
- KTable로 선언된 토픽은 1개 파티션이 1개 태스크에 할당되어 사용되지만,
- GlobalKTable로 선언된 토픽은 모든 파티션 데이터가 각 태스크에 할당되어 사용 됨
코파티셔닝
- GlobalKTable을 설명하기 좋은 예는 KStream과 KTable이 데이터 조인을 수행할 때
- 조인을 위해서는 코파티셔닝이 되어 있어야함
코파티셔닝
: 조인을 하는 2개 데이터의 파티션 개수가 동일하고 파티셔닝 전략을 동일하게 맞추는 작업
- 이 경우에만 조인을 수행할 수 있음
- 파티션의 개수가 다를 때
TopologyException
발생
리파티셔닝
- 코파티셔닝이 되어있지 않을 때 리파티셔닝을 함.
리파티셔닝
: 새로운 토픽에 새로운 메시지 키를 가지도록 재배열하는 과정
- 리파티셔닝을 통해 KStream 토픽과 KTable로 사용하는 토픽이 코파티셔닝되도록 할 수 있음
GlobalKTable과 코파티셔닝, 리파티셔닝
- 코파티셔닝 되지 않은 KStream, KTable을 조인해서 사용하기 위해서 GlobalKTable을 사용할 수 있음
- GlobalKTable은 KTable과 달리 스트림즈 애플리케이션의 모든 태스크에 동일하게 공유되어 사용되기 때문
- 로컬 스토리지의 사용량이 증가하고 네트워크, 브로커에 부하 발생 가능성이 있을 수 있으므로 되도록 작은 용량의 데이터일 경우에만 사용을 권장
- 많은 양의 데이터를 처리할 경우 리파티셔닝을 통해 KTable을 사용하는 것을
스트림즈DSL 주요 옵션
필수 옵션
- bootstrap.server
- application.id : 스트림즈 애플리케이션을 구분하기 위한 고유한 아이디 설정, 다른 스트림즈 애플리케이션에서는 다른 값을 설정해야함
선택 옵션
- default.key.serde : 레코드의 메시지 키를 직렬화, 역직렬화하는 클래스를 지정 (기본값 : Serdes.ByteArray().gerClass().getName())
- default.value.serde : 레코드의 메시지 값을 직렬화, 역직렬화하는 클래스를 지정 (기본값 : Serdes.ByteArray().gerClass().getName())
- num.stream.threads : 스트림 프로세싱 실행 시 실행될 스레드 개수를 지정 (기본값 : 1)
- state.dir : rocksDB 저장소가 위치할 디렉토리
- rocksDB : 페이스북이 개발한 Key-Value DB
- 카프카 스트림즈가 상태기반 데이터를 처리할 때 로컬 저장소로 사용
- (기본값 : /tmp/kafka-streams)
프로세서API
- 토폴로지를 기준으로 데이터를 처리하는 관점에서 동일한 역할
- 스트림즈DSL에 대해 추가적인 상세 로직의 구현이 필요하면 프로세서API를 활용 가능
- KStream, KTable, GlobalKTable의 개념이 없음