03.카프카 스트림즈 관리

Part 3. 카프카 스트림즈 관리

Table of contents

  • 기본적인 카프카 모니터링

    • 인터셉터

  • 메트릭

  • 카프카 스트림즈 디버깅 기술

7장. 모니터링과 성능

기본적인 카프카 모니터링

카프카 스트림즈 API는 카프카의 일부이므로 애플리케이션을 모니터링할 때 카프카도 일부 모니터링해야 한다.

Monitoring

.

👉🏻 컨슈머와 프로듀서 성능 측정

  • 프로듀서와 컨슈머의 성능은 처리량과 관련이 있다.

  • 프로듀서의 경우 프로듀서가 얼마나 빠르게 브로커로 메시지를 보내느냐가 큰 관심사

    • 분명 처리량이 높으면 높을수록 더 낮다

  • 컨슈머의 경우 브로커에서 얼마나 빠르게 메시지를 읽을 수 있느냐가 성능에 영향

    • 컨슈머 성능을 측정하는 또 다른 방법은 컨슈머 지연

  • 프로듀서가 브로커에 기록하는 속도와 컨슈머가 메시지를 읽는 속도 차이를 컨슈머 지연이라고 부른다

.

👉🏻 컨슈머 지연 확인하기

  • 컨슈머 지연을 확인하기 위해 카프카는 편리한 명령줄 도구를 제공

    • kafka-consumer-groups.sh

  • 활성화 상태의 모든 컨슈머 그룹을 찾기 위해 list 명령을 사용

  • 조회할 컨슈머 그룹 이름을 선택하고 다음 명령을 실행

  • 작은 지연이나 일정한 지연은 문제가 안 되지만, 시간이 지남에 따라 계속 증가하는 지연은 컨슈머에게 더 많은 리소스를 제공해야 한다

.

👉🏻 프로듀서와 컨슈머 가로채기

  • 인터셉터는 디버깅을 위한 일반적인 제일선 도구는 아니지만 카프카 스트리밍 애플리케이션의 동작을 관찰하는 데 유용할 수 있으며, 자신만의 도구 세트에 추가할 만한 유용한 도구이다.

  • 인터셉터를 사용하는 좋은 예제는 카프카 스트림즈 애플리케이션이 카프카 토픽으로 다시 생산하는 메시지 오프셋을 추적하는 데 사용

컨슈머 인터셉터

  • 컨슈머 인터셉터는 가로채기를 위해 두 가지 접근점을 제공

1️⃣ ConsumerInterceptor.onConsume()

  • 브로커에서 조회한 시점과 Consumer.poll() 메소드가 메시지를 반환하기 전 ConsumerRecords에서 읽는다.

  • ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG를 통해 하나 이상의 ConsumerInterceptor 구현자 클래스의 컬렉션으로 지정

2️⃣ ConsumerInterceptor.onCommit()

  • 컨슈머가 브로커에게 오프셋을 커밋하면 브로커는 토픽, 파티션 및 커밋된 오프셋과 관련된 메타데이터와 함께 정보가 포함된 Map<TopicPartition, OffsetAndMetadata>를 반환

로깅 목적으로 사용되는 간단한 ConsumerInterceptor 예시

프로듀서 인터셉터

  • ProducerInterceptor.onSend()ProducerInterceptor.onAcknowledgement() 두 가지 접근 지점이 존재

  • 체인상에 있는 각 프로듀서 인터셉터는 이전 인터셉터에서 반환된 객체를 받는다.

간단히 로깅하는 ProducerInterceptor 예제

  • ProducerInterceptor는 ProducerConfig.INTERCEPTOR_CLASSES_CONFIG에 지정하고, 하나 이상의 ProducerInterceptor 클래스의 컬렉션으로 설정

✅ 참고

카프카 스트림즈 애플리케이션에서 인터셉터를 구성할 때 컨슈머와 프로듀서 인터셉터의 속성 이름의 프리픽스를

StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG)

StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG) 로 지정해야 한다.

부수적으로 인터셉터는 카프카 스트림즈 애플리케이션의 모든 레코드에서 작동하므로 로깅 인터셉터의 출력이 중요하다.

  • 인터셉터 결과는 소스 코드를 설치한 로그 디렉토리에 있는 consumer_interceptor.log 및 producer_interceptor.log 로 출력

애플리케이션 메트릭

메트릭 카테고리 살펴보기

  • 스레드 메트릭

    • 평균 커밋, 폴링, 처리 작업 시간

    • 초당 생성한 태스크 수, 초당 종료된 태스크 수

  • 태스크 메트릭

    • 초당 평균 커밋 횟수

    • 평균 커밋 시간

  • 프로세서 노드 메트릭

    • 평균 및 최대 처리 시간

    • 초당 평균 처리 작업 수

    • 포워드 레이트

  • 상태 저장소 메트릭

    • put, get, flush 작업의 평균 실행 시간

    • put, get, flush 작업의 초당 평균 실행 횟수

Monitor Kafka Streams Applications in Confluent

.

👉🏻 메트릭 구성

  • 카프카 스트림즈는 이미 성능 메트릭을 수집하는 메커니즘을 제공

설정한 레벨에 따른 간으한 메트릭

  • 메트릭 수집의 기본 레벨은 INFO

매트릭 카테고리
DEBUG
INFO

스레드

O

O

태스크

O

프로세서 노드

O

상태 저장소

O

레코드 캐시

O

메트릭을 위한 구성 변경

  • 카프카 스트림즈 애플리케이션의 전체 범위를 측정하는 기본 메트릭이 있으며, DEBUG 레벨에서 메트릭 수집을 설정하려면 그 전에 성능 영향을 신중하게 고려해야 함

.

👉🏻 수집한 메트릭 확인 방법

.

👉🏻 JMX(Java Management Extensions) 사용

  • 자바 VM에서 실행되는 프로그램의 동작을 보는 표준 방법

  • JMX를 사용해 자바 VM 성능도 확인 가능

  • 즉, JMX는 실행 중인 프로그램의 일부를 노출하는 인프라를 제공

  • 모니터링 수행을 위해 VisualVM, JConsole, JMC 사용

추가적인 카프카 스트림즈 디버깅 기술

👉🏻 애플리케이션 구조 조회

  • Topology.describe() 메소드는 애플리케이션 구조에 관한 일반적인 정보를 제공

Result
  • 애플리케이션에서 실행 시간 정보를 보여주는 StreamThread 객체에 관한 정보를 얻는 것도 유용

    • KafkaStreams.localThreadsMetadata() 메소드 사용

.

👉🏻 다양한 애플리케이션 상태 알림 받기

StateListener 사용

  • 카프카 스트림즈 애플리케이션에서 가능한 여섯 가지 유효한 상태를 보여준다.

카프카 스트림즈 애플리케이션의 상태

Result

상태 리스토어 리스너

  • 카프카 스트림즈에서는 상태 저장소의 백업으로 변경로그 토픽을 사용

  • 변경로그는 변경이 발생한 상태 저장소의 업데이트를 기록

  • 카프카 스트림즈 애플리케이션이 실패하거나 재시작할 때, 상태 저장소는 로컬 상태 파일에서 복구 가능

  • StateListener와 흡사한 StateRestoreListener 인터페이스는 애플리케이션 내부에서 일어나는 일들에 대한 알림을 허용

    • onRestoreStart, onBatchRestored, onRestoreEnd 세 가지 메소드 존재

  • 내부 컨슈머를 사용해 번경로그 토픽을 읽으므로 애플리케이션이 각 consumer.poll() 메소드 호출에서 레코드를 일괄적으로 복원

  • 복원 프로세스가 최근 배치를 상태 저장소에 로드한 후 onBatchRestored 메소드가 호출

  • 애플리케이션이 복구 프로세스를 완료하면 복원된 마지막 리스너를 총 레코드 수와 함께 호출

uncaught 예외 핸들러

  • 예기치 않은 오류 처리를 위해 KafkaStreams.setUncaughtExceptionHandler 제공

📖 요약

카프카 스트림즈를 모니터링하려면 카프카 브로커도 살펴봐야 한다.

애플리케이션의 성능이 어떻게 되는지 보고 싶다면 메트릭 리포팅을 수시로 활성화해야 한다.

내부를 살펴볼 필요가 있으며 가끔 jstack(스레드 덤프)과 jmap/jhat(힙 덤프) 같은 자바에 포함된 명령줄 도구를 사용해 좀 더 저 수준에서 애플리케이션 동작을 이해해야 한다.

8장. 카프카 스트림즈 애플리케이션 테스트

토폴로지 테스트

ProcessorTopologyTestDriver를 사용하면 테스트 실행을 위해 카프카 없이도 테스트 작성이 가능

.

👉🏻 테스트 만들기

.

👉🏻 토폴로지에서 상태 저장소 테스트

.

👉🏻 프로세서와 트랜스포머 테스트

  • ProcessorTransformer에 대한 단위 테스트 작성은 어렵지 않지만, 두 클래스 모두 상태 저장소를 얻고 펑추에이션 액션을 스케줄링하기 위해 ProcessorContext에 의존해야 한다.

  • Mockito 같은 모의 객체 프레임워크를 사용해 테스트에서 모의 객체를 생성하는 방법이 있다.

  • 또 다른 옵션은 ProcessorTopologyTestDriver와 동일한 테스트 라이브러리에 있는 MockProcessorContext 객체를 사용하는 것

통합 테스트

카프카 테스트 라이브러리와 함께 사용 가능한 내장 카프카 클러스터를 사용할 수 있다.

  • 내장 카프카 클러스터를 사용하면 언제든 개별 테스트 또는 전체 테스트의 일부분이 되었든 사용자 머신에서 카프카 클러스터가 필요한 통합 테스트 실행이 가능하다.

👉🏻 통합 테스트 구축

  • 내장 카프카 서버를 사용하는 첫 단계는 3개의 추가적인 테스트 의존성을 추가하는 것

    • scala-library-2.12.4.jar

    • kafka_2.12-1.0.0-test.jar

    • kafka_2.12-1.0.0.jar

.

내장 카프카 클러스터 추가

내장 카프카 클러스터 추가

  • 다음 단계에 따라 통합 테스트 실행

    • (1) 카프카 스트림즈 애플리케이션 시작

    • (2) 소스 토픽에 레코드를 쓰고 정확한 결과인지 검증

    • (3) 패턴과 일치하는 새 토픽 생성

    • (4) 추가적인 레코드를 새로 생성된 토픽에 쓰고 정확한 결과인지 검증

Last updated