카프카 스트림즈

  • 카프카에서 공식적으로 지원
  • 토픽에 적재된 데이터를 상태기방 또는 비상태기반으로 실시간 변환하여 다른 토픽에 적재하는 라이브러리
  • (아파치 스파크, 플링크, 스톰, 플루언트디)
  • 신규 토픽 생성, 상태 저장, 데이터 조인을 제공
  • 스트림즈 애플리케이션 또는 카프카 브로커의 장애가 발생해도 정확히 한번할 수 있도록 장애 허용 시스템 제공 - 안정성이 매우 뛰어남
  • 내부적으로 스레드를 1개 이상 생성할 수 있으며 1개이상의 태스크를 가짐
    • 태스크 : 스트림즈 애플리케이션을 실행하는 데이터 처리 최소 단위 ( 컨슈머의 병렬처리를 위해 컨슈머 그룹으로 이루어진 컨슈머 스레드를 여러 개 실행하는 것과 비슷
  • 안정적 운영을 위해서 2개이상의 서버로 구성하여 스트림즈 애플리케이션을 사용하는 것을 권장

토폴로지와 카프카 스트림즈

토폴로지란?

  • 토폴로지란 2개 이상의 노드들과 선으로 이루어진 집합을 뜻
  • 링형, 트리형, 성형 -> 스트림즈는 트리 형태와 유사

카프카 스트림즈에서 토폴로지란?

  • 토폴로지를 이루는 노드를 하나의 프로세서, 노드와 노드를 이은 선을 스트림 이라고 함
  • 소스 프로세서 : 데이터를 처리하기 위해 최초로 선언해야하는 노드로, 하나 이상의 토픽에서 데이터를 가져오는 역할
  • 스트림 프로세서 : 다른 프로세서가 반환한 데이터를 처리하는 역할. 변환, 분기처리와 같은 로직
  • 싱크 프로세서 : 데이터를 특정 카프카 토픽으로 저장하는 역할로 데이터의 최종 종착지

스트림즈DSL

  • 레코드의 흐름을 추상화한 3가지 개념인 KStream, KTable, GlobalKTable 오직 스트림즈DSL에서만 사용되는 개념

KStream

  • 레코드의 흐름을 표현한 것으로 메시지 키메시지 값으로 구성됨
  • 데이터 조회시 토픽에 존재하는 모든 레코드가 출력됨
  • 컨슈머로 토픽을 구독하는 것과 동일한 선상에서 사용하는 것이라고 볼 수 있음

KTable

  • 메시지 키를 기준으로 묶어 사용
  • KStream은 토픽의 모든 레코드를 조회 가능 하지만 KTable은 유니크한 메시지 키를 기준으로 가장 최신 레코드를 사용

GlobalKTable

  • 메시지 키를 기준으로 묶어 사용
    • KTable로 선언된 토픽은 1개 파티션이 1개 태스크에 할당되어 사용되지만,
    • GlobalKTable로 선언된 토픽은 모든 파티션 데이터가 각 태스크에 할당되어 사용 됨

코파티셔닝

  • GlobalKTable을 설명하기 좋은 예는 KStream과 KTable이 데이터 조인을 수행할 때
  • 조인을 위해서는 코파티셔닝이 되어 있어야함
    • 코파티셔닝 : 조인을 하는 2개 데이터의 파티션 개수가 동일하고 파티셔닝 전략을 동일하게 맞추는 작업
    • 이 경우에만 조인을 수행할 수 있음
    • 파티션의 개수가 다를 때 TopologyException 발생

리파티셔닝

  • 코파티셔닝이 되어있지 않을 때 리파티셔닝을 함.
    • 리파티셔닝 : 새로운 토픽에 새로운 메시지 키를 가지도록 재배열하는 과정
  • 리파티셔닝을 통해 KStream 토픽과 KTable로 사용하는 토픽이 코파티셔닝되도록 할 수 있음

GlobalKTable과 코파티셔닝, 리파티셔닝

  • 코파티셔닝 되지 않은 KStream, KTable을 조인해서 사용하기 위해서 GlobalKTable을 사용할 수 있음
    • GlobalKTable은 KTable과 달리 스트림즈 애플리케이션의 모든 태스크에 동일하게 공유되어 사용되기 때문
  • 로컬 스토리지의 사용량이 증가하고 네트워크, 브로커에 부하 발생 가능성이 있을 수 있으므로 되도록 작은 용량의 데이터일 경우에만 사용을 권장
  • 많은 양의 데이터를 처리할 경우 리파티셔닝을 통해 KTable을 사용하는 것을

스트림즈DSL 주요 옵션

필수 옵션

  • bootstrap.server
  • application.id : 스트림즈 애플리케이션을 구분하기 위한 고유한 아이디 설정, 다른 스트림즈 애플리케이션에서는 다른 값을 설정해야함

선택 옵션

  • default.key.serde : 레코드의 메시지 키를 직렬화, 역직렬화하는 클래스를 지정 (기본값 : Serdes.ByteArray().gerClass().getName())
  • default.value.serde : 레코드의 메시지 값을 직렬화, 역직렬화하는 클래스를 지정 (기본값 : Serdes.ByteArray().gerClass().getName())
  • num.stream.threads : 스트림 프로세싱 실행 시 실행될 스레드 개수를 지정 (기본값 : 1)
  • state.dir : rocksDB 저장소가 위치할 디렉토리
    • rocksDB : 페이스북이 개발한 Key-Value DB
    • 카프카 스트림즈가 상태기반 데이터를 처리할 때 로컬 저장소로 사용
    • (기본값 : /tmp/kafka-streams)

프로세서API

  • 토폴로지를 기준으로 데이터를 처리하는 관점에서 동일한 역할
  • 스트림즈DSL에 대해 추가적인 상세 로직의 구현이 필요하면 프로세서API를 활용 가능
  • KStream, KTable, GlobalKTable의 개념이 없음