03.카프카 스트림즈 관리
Part 3. 카프카 스트림즈 관리
Table of contents
기본적인 카프카 모니터링
인터셉터
메트릭
카프카 스트림즈 디버깅 기술
7장. 모니터링과 성능
기본적인 카프카 모니터링
카프카 스트림즈 API는 카프카의 일부이므로 애플리케이션을 모니터링할 때 카프카도 일부 모니터링해야 한다.
.
👉🏻 컨슈머와 프로듀서 성능 측정
프로듀서와 컨슈머의 성능은 처리량과 관련이 있다.
프로듀서의 경우 프로듀서가 얼마나 빠르게 브로커로 메시지를 보내느냐가 큰 관심사분명 처리량이 높으면 높을수록 더 낮다
컨슈머의 경우 브로커에서 얼마나 빠르게 메시지를 읽을 수 있느냐가 성능에 영향컨슈머 성능을 측정하는 또 다른 방법은 컨슈머 지연
프로듀서가 브로커에 기록하는 속도와 컨슈머가 메시지를 읽는 속도 차이를
컨슈머 지연이라고 부른다
.
👉🏻 컨슈머 지연 확인하기
컨슈머 지연을 확인하기 위해 카프카는 편리한 명령줄 도구를 제공
kafka-consumer-groups.sh
활성화 상태의 모든 컨슈머 그룹을 찾기 위해
list명령을 사용
조회할 컨슈머 그룹 이름을 선택하고 다음 명령을 실행
작은 지연이나 일정한 지연은 문제가 안 되지만, 시간이 지남에 따라 계속 증가하는 지연은 컨슈머에게 더 많은 리소스를 제공해야 한다
.
👉🏻 프로듀서와 컨슈머 가로채기
인터셉터는 디버깅을 위한 일반적인 제일선 도구는 아니지만 카프카 스트리밍 애플리케이션의 동작을 관찰하는 데 유용할 수 있으며, 자신만의 도구 세트에 추가할 만한 유용한 도구이다.인터셉터를 사용하는 좋은 예제는 카프카 스트림즈 애플리케이션이 카프카 토픽으로 다시 생산하는 메시지 오프셋을 추적하는 데 사용
컨슈머 인터셉터
컨슈머 인터셉터는 가로채기를 위해 두 가지 접근점을 제공
1️⃣ ConsumerInterceptor.onConsume()
브로커에서 조회한 시점과
Consumer.poll()메소드가 메시지를 반환하기 전ConsumerRecords에서 읽는다.ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG를 통해 하나 이상의ConsumerInterceptor구현자 클래스의 컬렉션으로 지정
2️⃣ ConsumerInterceptor.onCommit()
컨슈머가 브로커에게 오프셋을 커밋하면 브로커는 토픽, 파티션 및 커밋된 오프셋과 관련된 메타데이터와 함께 정보가 포함된
Map<TopicPartition, OffsetAndMetadata>를 반환
로깅 목적으로 사용되는 간단한 ConsumerInterceptor 예시
프로듀서 인터셉터
ProducerInterceptor.onSend()및ProducerInterceptor.onAcknowledgement()두 가지 접근 지점이 존재체인상에 있는 각 프로듀서 인터셉터는 이전 인터셉터에서 반환된 객체를 받는다.
간단히 로깅하는 ProducerInterceptor 예제
ProducerInterceptor는 ProducerConfig.INTERCEPTOR_CLASSES_CONFIG에 지정하고, 하나 이상의 ProducerInterceptor 클래스의 컬렉션으로 설정
✅ 참고
카프카 스트림즈 애플리케이션에서 인터셉터를 구성할 때 컨슈머와 프로듀서 인터셉터의 속성 이름의 프리픽스를
StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG)와
StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG)로 지정해야 한다.
부수적으로 인터셉터는 카프카 스트림즈 애플리케이션의 모든 레코드에서 작동하므로 로깅 인터셉터의 출력이 중요하다.
인터셉터 결과는 소스 코드를 설치한 로그 디렉토리에 있는 consumer_interceptor.log 및 producer_interceptor.log 로 출력
애플리케이션 메트릭
메트릭 카테고리 살펴보기
스레드 메트릭
평균 커밋, 폴링, 처리 작업 시간
초당 생성한 태스크 수, 초당 종료된 태스크 수
태스크 메트릭
초당 평균 커밋 횟수
평균 커밋 시간
프로세서 노드 메트릭
평균 및 최대 처리 시간
초당 평균 처리 작업 수
포워드 레이트
상태 저장소 메트릭
put, get, flush 작업의 평균 실행 시간
put, get, flush 작업의 초당 평균 실행 횟수
Monitor Kafka Streams Applications in Confluent
.
👉🏻 메트릭 구성
카프카 스트림즈는 이미 성능 메트릭을 수집하는 메커니즘을 제공
설정한 레벨에 따른 간으한 메트릭
메트릭 수집의 기본 레벨은 INFO
스레드
O
O
태스크
O
프로세서 노드
O
상태 저장소
O
레코드 캐시
O
메트릭을 위한 구성 변경
카프카 스트림즈 애플리케이션의 전체 범위를 측정하는 기본 메트릭이 있으며, DEBUG 레벨에서 메트릭 수집을 설정하려면 그 전에 성능 영향을 신중하게 고려해야 함
.
👉🏻 수집한 메트릭 확인 방법
카프카 스트림즈 애플리케이션의 메트릭을 수집하면 메트릭 리포터에게 배포
기본 메트릭 리포터를 JMX(Java Management Extensions)를 통해 제공
.
👉🏻 JMX(Java Management Extensions) 사용
자바 VM에서 실행되는 프로그램의 동작을 보는 표준 방법
JMX를 사용해 자바 VM 성능도 확인 가능
즉, JMX는 실행 중인 프로그램의 일부를 노출하는 인프라를 제공
추가적인 카프카 스트림즈 디버깅 기술
👉🏻 애플리케이션 구조 조회
Topology.describe()메소드는 애플리케이션 구조에 관한 일반적인 정보를 제공

애플리케이션에서 실행 시간 정보를 보여주는
StreamThread객체에 관한 정보를 얻는 것도 유용KafkaStreams.localThreadsMetadata() 메소드 사용
.
👉🏻 다양한 애플리케이션 상태 알림 받기
StateListener 사용
카프카 스트림즈 애플리케이션에서 가능한 여섯 가지 유효한 상태를 보여준다.
카프카 스트림즈 애플리케이션의 상태

상태 리스토어 리스너
카프카 스트림즈에서는 상태 저장소의 백업으로
변경로그토픽을 사용변경로그는 변경이 발생한 상태 저장소의 업데이트를 기록
카프카 스트림즈 애플리케이션이 실패하거나 재시작할 때, 상태 저장소는 로컬 상태 파일에서 복구 가능
StateListener와 흡사한
StateRestoreListener인터페이스는 애플리케이션 내부에서 일어나는 일들에 대한 알림을 허용onRestoreStart,onBatchRestored,onRestoreEnd세 가지 메소드 존재
내부 컨슈머를 사용해 번경로그 토픽을 읽으므로 애플리케이션이 각 consumer.poll() 메소드 호출에서 레코드를 일괄적으로 복원
복원 프로세스가 최근 배치를 상태 저장소에 로드한 후
onBatchRestored메소드가 호출애플리케이션이 복구 프로세스를 완료하면 복원된 마지막 리스너를 총 레코드 수와 함께 호출
uncaught 예외 핸들러
예기치 않은 오류 처리를 위해
KafkaStreams.setUncaughtExceptionHandler제공
📖 요약
카프카 스트림즈를 모니터링하려면 카프카 브로커도 살펴봐야 한다.
애플리케이션의 성능이 어떻게 되는지 보고 싶다면 메트릭 리포팅을 수시로 활성화해야 한다.
내부를 살펴볼 필요가 있으며 가끔
jstack(스레드 덤프)과jmap/jhat(힙 덤프) 같은 자바에 포함된 명령줄 도구를 사용해 좀 더 저 수준에서 애플리케이션 동작을 이해해야 한다.
8장. 카프카 스트림즈 애플리케이션 테스트
토폴로지 테스트
ProcessorTopologyTestDriver를 사용하면 테스트 실행을 위해 카프카 없이도 테스트 작성이 가능
.
👉🏻 테스트 만들기
.
👉🏻 토폴로지에서 상태 저장소 테스트
.
👉🏻 프로세서와 트랜스포머 테스트
Processor나Transformer에 대한 단위 테스트 작성은 어렵지 않지만, 두 클래스 모두 상태 저장소를 얻고 펑추에이션 액션을 스케줄링하기 위해ProcessorContext에 의존해야 한다.Mockito같은 모의 객체 프레임워크를 사용해 테스트에서 모의 객체를 생성하는 방법이 있다.또 다른 옵션은
ProcessorTopologyTestDriver와 동일한 테스트 라이브러리에 있는MockProcessorContext객체를 사용하는 것
통합 테스트
카프카 테스트 라이브러리와 함께 사용 가능한 내장 카프카 클러스터를 사용할 수 있다.
내장 카프카 클러스터를 사용하면 언제든 개별 테스트 또는 전체 테스트의 일부분이 되었든 사용자 머신에서 카프카 클러스터가 필요한 통합 테스트 실행이 가능하다.
👉🏻 통합 테스트 구축
내장 카프카 서버를 사용하는 첫 단계는 3개의 추가적인 테스트 의존성을 추가하는 것
scala-library-2.12.4.jar
kafka_2.12-1.0.0-test.jar
kafka_2.12-1.0.0.jar
.
내장 카프카 클러스터 추가
내장 카프카 클러스터 추가
다음 단계에 따라 통합 테스트 실행
(1) 카프카 스트림즈 애플리케이션 시작
(2) 소스 토픽에 레코드를 쓰고 정확한 결과인지 검증
(3) 패턴과 일치하는 새 토픽 생성
(4) 추가적인 레코드를 새로 생성된 토픽에 쓰고 정확한 결과인지 검증
Last updated