📖
Aaron's TECH BOOK
  • Intro
    • About me
  • Lecture
    • Kubernetes
      • Begin Kubernetes
    • Kafka
      • Begin Kafka
    • Kotlin
      • TDD, Clean Code Preview
      • woowa Kotlin
    • Java
      • Multithread Concurrency
      • The Java
    • Toby
      • Toby Spring 6
      • Toby Spring Boot
    • MSA
      • 01.Micro Service
      • 02.DDD 설계
      • 03.DDD 구현
      • 04.EDA 구현
    • Spring Boot
    • Spring Batch
    • Spring Core Advanced
    • Spring DB Part II
    • Spring DB Part I
    • JPA API and Performance Optimization
    • JPA Web Application
    • JPA Programming Basic
    • Spring MVC Part 2
      • 01.Thymeleaf
      • 02.ETC
      • 03.Validation
      • 04.Login
      • 05.Exception
    • Spring MVC Part 1
      • 01.Servlet
      • 02.MVC
    • Http
      • 01.Basic
      • 02.Method
      • 03.Header
    • Spring Core
    • Study
      • Concurrency issues
      • First Come First Served
      • Performance Test
      • TDD
      • IntelliJ
  • Book
    • Kafka Streams in Action
      • 01.카프카 스트림즈
      • 02.카프카 스트림즈 개발
      • 03.카프카 스트림즈 관리
    • Effective Kotlin
      • 01.좋은 코드
      • 02.코드 설계
      • 03.효율성
    • 이벤트 소싱과 MSA
      • 01.도메인 주도 설계
      • 02.객체지향 설계 원칙
      • 03-04.이벤트 소싱
      • 05.마이크로서비스 협업
      • 06.결과적 일관성
      • 07.CQRS
      • 08.UI
      • 09.클라우드 환경
    • 몽고DB 완벽 가이드
      • I. 몽고DB 시작
      • II. 몽고DB 개발
    • Kotlin Cookbook
      • 코틀린 기초
      • 코틀린 기능
      • ETC
    • Kotlin in Action
      • 함수/클래스/객체/인터페이스
      • 람다와 타입
      • 오버로딩과 고차 함수
      • 제네릭스, 애노테이션, 리플렉션
    • Kent Beck Tidy First?
    • 대규모 시스템 설계 기초
      • 01.사용자 수에 따른 규모 확장성
      • 02.개략적인 규모 추정
      • 03.시스템 설계 공략법
      • 04.처리율 제한 장치 설계
      • 05.안정 해시 설계
      • 06.키-값 저장소 설계
      • 07.유일 ID 생성기 설계
      • 08.URL 단축기 설계
      • 09.웹 크롤러 설계
      • 10.알림 시스템 설계
      • 11.뉴스 피드 시스템 설계
      • 12.채팅 시스템 설계
      • 13.검색어 자동완성 시스템
      • 14.유튜브 설계
      • 15.구글 드라이브 설계
      • 16.배움은 계속된다
    • 실용주의 프로그래머📖
    • GoF Design Patterns
    • 도메인 주도 개발 시작하기
      • 01.도메인 모델 시작하기
      • 02.아키텍처 개요
      • 03.애그리거트
      • 04.리포지터리와 모델 구현
      • 05.Spring Data JPA를 이용한 조회 기능
      • 06.응용 서비스와 표현 영역
      • 07.도메인 서비스
      • 08.애그리거트 트랜잭션 관리
      • 09.도메인 모델과 바운디드 컨텍스트
      • 10.이벤트
      • 11.CQRS
    • Effective Java 3/E
      • 객체, 공통 메서드
      • 클래스, 인터페이스, 제네릭
    • 소프트웨어 장인
    • 함께 자라기
    • Modern Java In Action
      • 01.기초
      • 02.함수형 데이터 처리
      • 03.스트림과 람다를 이용한 효과적 프로그래밍
      • 04.매일 자바와 함께
    • Refactoring
      • 01.리펙터링 첫 번째 예시
      • 02.리펙터링 원칙
      • 03.코드에서 나는 악취
      • 06.기본적인 리펙터링
      • 07.캡슐화
      • 08.기능 이동
      • 09.데이터 조직화
      • 10.조건부 로직 간소화
      • 11.API 리팩터링
      • 12.상속 다루기
    • 객체지향의 사실과 오해
      • 01.협력하는 객체들의 공동체
      • 02.이상한 나라의 객체
      • 03.타입과 추상화
      • 04.역할, 책임, 협력
      • 05.책임과 메시지
      • 06.객체 지도
      • 07.함께 모으기
      • 부록.추상화 기법
    • Clean Code
    • 자바 ORM 표준 JPA 프로그래밍
Powered by GitBook
On this page
  • Part 3. 카프카 스트림즈 관리
  • 7장. 모니터링과 성능
  • 기본적인 카프카 모니터링
  • 애플리케이션 메트릭
  • 추가적인 카프카 스트림즈 디버깅 기술
  • 8장. 카프카 스트림즈 애플리케이션 테스트
  • 토폴로지 테스트
  • 통합 테스트
  1. Book
  2. Kafka Streams in Action

03.카프카 스트림즈 관리

Last updated 26 days ago

Part 3. 카프카 스트림즈 관리

Table of contents

  • 기본적인 카프카 모니터링

    • 인터셉터

  • 메트릭

  • 카프카 스트림즈 디버깅 기술

7장. 모니터링과 성능

기본적인 카프카 모니터링

카프카 스트림즈 API는 카프카의 일부이므로 애플리케이션을 모니터링할 때 카프카도 일부 모니터링해야 한다.

.

👉🏻 컨슈머와 프로듀서 성능 측정

  • 프로듀서와 컨슈머의 성능은 처리량과 관련이 있다.

  • 프로듀서의 경우 프로듀서가 얼마나 빠르게 브로커로 메시지를 보내느냐가 큰 관심사

    • 분명 처리량이 높으면 높을수록 더 낮다

  • 컨슈머의 경우 브로커에서 얼마나 빠르게 메시지를 읽을 수 있느냐가 성능에 영향

    • 컨슈머 성능을 측정하는 또 다른 방법은 컨슈머 지연

  • 프로듀서가 브로커에 기록하는 속도와 컨슈머가 메시지를 읽는 속도 차이를 컨슈머 지연이라고 부른다

.

👉🏻 컨슈머 지연 확인하기

  • 컨슈머 지연을 확인하기 위해 카프카는 편리한 명령줄 도구를 제공

    • kafka-consumer-groups.sh

  • 활성화 상태의 모든 컨슈머 그룹을 찾기 위해 list 명령을 사용

kafka-installed-path/bin/kafka-consumer-groups.sh
                --bootstrap-server localhost:9092
                --list
  • 조회할 컨슈머 그룹 이름을 선택하고 다음 명령을 실행

kafka-installed-path/bin/kafka-consumer-groups.sh
            --bootstrap-server localhost:9092
            --group GROUP-NAME
            --describe
  • 작은 지연이나 일정한 지연은 문제가 안 되지만, 시간이 지남에 따라 계속 증가하는 지연은 컨슈머에게 더 많은 리소스를 제공해야 한다

.

👉🏻 프로듀서와 컨슈머 가로채기

  • 인터셉터는 디버깅을 위한 일반적인 제일선 도구는 아니지만 카프카 스트리밍 애플리케이션의 동작을 관찰하는 데 유용할 수 있으며, 자신만의 도구 세트에 추가할 만한 유용한 도구이다.

  • 인터셉터를 사용하는 좋은 예제는 카프카 스트림즈 애플리케이션이 카프카 토픽으로 다시 생산하는 메시지 오프셋을 추적하는 데 사용

컨슈머 인터셉터

  • 컨슈머 인터셉터는 가로채기를 위해 두 가지 접근점을 제공

1️⃣ ConsumerInterceptor.onConsume()

  • 브로커에서 조회한 시점과 Consumer.poll() 메소드가 메시지를 반환하기 전 ConsumerRecords에서 읽는다.

  • ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG를 통해 하나 이상의 ConsumerInterceptor 구현자 클래스의 컬렉션으로 지정

ConsumerRecords<String, String> poll(long timeout) {
  ConsumerRecords<String, String> consumerRecords = 
    ...consuming records
  // 인터셉터 체인을 통해 레코드를 실행하고 결과를 반헌
  return interceptors.onConsume(consumerRecords);
}

2️⃣ ConsumerInterceptor.onCommit()

  • 컨슈머가 브로커에게 오프셋을 커밋하면 브로커는 토픽, 파티션 및 커밋된 오프셋과 관련된 메타데이터와 함께 정보가 포함된 Map<TopicPartition, OffsetAndMetadata>를 반환

로깅 목적으로 사용되는 간단한 ConsumerInterceptor 예시

// StockTransactionConsumerInterceptor.java

public class StockTransactionConsumerInterceptor implements ConsumerInterceptor<Object, Object> {
    //...

    /**
     * 레코드가 처리되기 전에 컨슈머 레코드와 메타데이터를 로깅
     */
    @Override
    public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> consumerRecords) {
        LOG.info("Intercepted ConsumerRecords {}", buildMessage(consumerRecords.iterator()));
        return consumerRecords;
    }

    /**
     * 카프카 스트림즈 컨슈머가 브로커에 오프셋을 커밋할 때 커밋 정보를 로깅
     */
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
        LOG.info("Commit information {}", map);
    }
    //...
}

프로듀서 인터셉터

  • ProducerInterceptor.onSend() 및 ProducerInterceptor.onAcknowledgement() 두 가지 접근 지점이 존재

  • 체인상에 있는 각 프로듀서 인터셉터는 이전 인터셉터에서 반환된 객체를 받는다.

간단히 로깅하는 ProducerInterceptor 예제

// ZMartProducerInterceptor.java

public class ZMartProducerInterceptor implements ProducerInterceptor<Object, Object> {
    /**
     * 메시지를 브로커에 전송하기 바로 전에 로깅
     */
    @Override
    public ProducerRecord<Object, Object> onSend(ProducerRecord<Object, Object> record) {
        LOG.info("ProducerRecord being sent out {} ", record);
        return record;
    }

    /**
     * 브로커 수신 확인 또는 생산 단계 동안 브로커 측에서 오류가 발생했는지 로깅
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            LOG.warn("Exception encountered producing record {}", exception);
        } else {
            LOG.info("record has been acknowledged {} ", metadata);
        }
    }
}
  • ProducerInterceptor는 ProducerConfig.INTERCEPTOR_CLASSES_CONFIG에 지정하고, 하나 이상의 ProducerInterceptor 클래스의 컬렉션으로 설정

✅ 참고

카프카 스트림즈 애플리케이션에서 인터셉터를 구성할 때 컨슈머와 프로듀서 인터셉터의 속성 이름의 프리픽스를

StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG) 와

StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG) 로 지정해야 한다.

부수적으로 인터셉터는 카프카 스트림즈 애플리케이션의 모든 레코드에서 작동하므로 로깅 인터셉터의 출력이 중요하다.

  • 인터셉터 결과는 소스 코드를 설치한 로그 디렉토리에 있는 consumer_interceptor.log 및 producer_interceptor.log 로 출력

애플리케이션 메트릭

메트릭 카테고리 살펴보기

  • 스레드 메트릭

    • 평균 커밋, 폴링, 처리 작업 시간

    • 초당 생성한 태스크 수, 초당 종료된 태스크 수

  • 태스크 메트릭

    • 초당 평균 커밋 횟수

    • 평균 커밋 시간

  • 프로세서 노드 메트릭

    • 평균 및 최대 처리 시간

    • 초당 평균 처리 작업 수

    • 포워드 레이트

  • 상태 저장소 메트릭

    • put, get, flush 작업의 평균 실행 시간

    • put, get, flush 작업의 초당 평균 실행 횟수

.

👉🏻 메트릭 구성

  • 카프카 스트림즈는 이미 성능 메트릭을 수집하는 메커니즘을 제공

설정한 레벨에 따른 간으한 메트릭

  • 메트릭 수집의 기본 레벨은 INFO

매트릭 카테고리
DEBUG
INFO

스레드

O

O

태스크

O

프로세서 노드

O

상태 저장소

O

레코드 캐시

O

메트릭을 위한 구성 변경

private static Properties getProperties() {
    Properties props = new Properties();
    props.put(StreamsConfig.CLIENT_ID_CONFIG, "zmart-metrics-client-id"); // 클라이언트 아이디
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "zmart-metrics-group-id"); // 그룹 아이디
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "zmart-metrics-application-id"); // 애플리케이션 아이디
    props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG"); // 메트릭 로그 레벨
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // 브로커 접속 설정
    props.put(StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG), Collections.singletonList(ZMartProducerInterceptor.class));
    return props;
}
  • 카프카 스트림즈 애플리케이션의 전체 범위를 측정하는 기본 메트릭이 있으며, DEBUG 레벨에서 메트릭 수집을 설정하려면 그 전에 성능 영향을 신중하게 고려해야 함

.

👉🏻 수집한 메트릭 확인 방법

  • 카프카 스트림즈 애플리케이션의 메트릭을 수집하면 메트릭 리포터에게 배포

  • 기본 메트릭 리포터를 JMX(Java Management Extensions)를 통해 제공

  • 메트릭 접근 예제

.

👉🏻 JMX(Java Management Extensions) 사용

  • 자바 VM에서 실행되는 프로그램의 동작을 보는 표준 방법

  • JMX를 사용해 자바 VM 성능도 확인 가능

  • 즉, JMX는 실행 중인 프로그램의 일부를 노출하는 인프라를 제공

추가적인 카프카 스트림즈 디버깅 기술

👉🏻 애플리케이션 구조 조회

  • Topology.describe() 메소드는 애플리케이션 구조에 관한 일반적인 정보를 제공

  • 애플리케이션에서 실행 시간 정보를 보여주는 StreamThread 객체에 관한 정보를 얻는 것도 유용

    • KafkaStreams.localThreadsMetadata() 메소드 사용

.

👉🏻 다양한 애플리케이션 상태 알림 받기

StateListener 사용

  • 카프카 스트림즈 애플리케이션에서 가능한 여섯 가지 유효한 상태를 보여준다.

카프카 스트림즈 애플리케이션의 상태

// ZMartKafkaStreamsAdvancedReqsMetricsApp.java

KafkaStreams.StateListener stateListener = (newState, oldState) -> {
    // REBALANCING 에서 RUNNING 으로 상태 전환
    if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
        LOG.info("Application has gone from REBALANCING to RUNNING ");
        LOG.info("Topology Layout {}", streamsBuilder.build().describe());
    }

    // REBALANCING 단계 진입 시 액션
    if (newState == KafkaStreams.State.REBALANCING) {
        LOG.info("Application is entering REBALANCING phase");
    }
};

상태 리스토어 리스너

  • 카프카 스트림즈에서는 상태 저장소의 백업으로 변경로그 토픽을 사용

  • 변경로그는 변경이 발생한 상태 저장소의 업데이트를 기록

  • 카프카 스트림즈 애플리케이션이 실패하거나 재시작할 때, 상태 저장소는 로컬 상태 파일에서 복구 가능

  • StateListener와 흡사한 StateRestoreListener 인터페이스는 애플리케이션 내부에서 일어나는 일들에 대한 알림을 허용

    • onRestoreStart, onBatchRestored, onRestoreEnd 세 가지 메소드 존재

// LoggingStateRestoreListener.java

public class LoggingStateRestoreListener implements StateRestoreListener {
    private static final Logger LOG = LoggerFactory.getLogger(LoggingStateRestoreListener.class);
    private final Map<TopicPartition, Long> totalToRestore = new ConcurrentHashMap<>();
    private final Map<TopicPartition, Long> restoredSoFar = new ConcurrentHashMap<>();

    /**
     * 리스토어 리스너 로깅
     */
    @Override
    public void onRestoreStart(TopicPartition topicPartition, String store, long start, long end) {
        long toRestore = end - start;
        totalToRestore.put(topicPartition, toRestore); // 복원할 주어진 topicPartition 총량 저장
        LOG.info("Starting restoration for {} on topic-partition {} total to restore {}", store, topicPartition, toRestore);

    }
    /**
     * 복원된 각 배치를 처리
     */
    @Override
    public void onBatchRestored(TopicPartition topicPartition, String store, long start, long batchCompleted) {
        NumberFormat formatter = new DecimalFormat("#.##");

        long currentProgress = batchCompleted + restoredSoFar.getOrDefault(topicPartition, 0L); // 복원된 전체 레코드 개수 계산
        double percentComplete = (double) currentProgress / totalToRestore.get(topicPartition); // 복원이 완료된 백분율을 결정

        LOG.info("Completed {} for {}% of total restoration for {} on {}",
                batchCompleted, formatter.format(percentComplete * 100.00), store, topicPartition); // 복원된 백분율을 출력
        restoredSoFar.put(topicPartition, currentProgress); // 지금까지 복원된 레코드 개수 저장
    }
    /**
     * 복원 프로세스가 완료될 때
     */
    @Override
    public void onRestoreEnd(TopicPartition topicPartition, String store, long totalRestored) {
        LOG.info("Restoration completed for {} on topic-partition {}", store, topicPartition);
        restoredSoFar.put(topicPartition, 0L);
    }
}

// CoGroupingListeningExampleApplication.java
KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsConfig);
kafkaStreams.setGlobalStateRestoreListener(new LoggingStateRestoreListener()); // 글로벌 리스토어 리스너 정의
  • 내부 컨슈머를 사용해 번경로그 토픽을 읽으므로 애플리케이션이 각 consumer.poll() 메소드 호출에서 레코드를 일괄적으로 복원

  • 복원 프로세스가 최근 배치를 상태 저장소에 로드한 후 onBatchRestored 메소드가 호출

  • 애플리케이션이 복구 프로세스를 완료하면 복원된 마지막 리스너를 총 레코드 수와 함께 호출

uncaught 예외 핸들러

  • 예기치 않은 오류 처리를 위해 KafkaStreams.setUncaughtExceptionHandler 제공

// CoGroupingListeningExampleApplication.java

kafkaStreams.setUncaughtExceptionHandler((thread, exception) ->
    LOG.error("Thread [{}] encountered [{}]", thread.getName(), exception.getMessage())
);

📖 요약

카프카 스트림즈를 모니터링하려면 카프카 브로커도 살펴봐야 한다.

애플리케이션의 성능이 어떻게 되는지 보고 싶다면 메트릭 리포팅을 수시로 활성화해야 한다.

내부를 살펴볼 필요가 있으며 가끔 jstack(스레드 덤프)과 jmap/jhat(힙 덤프) 같은 자바에 포함된 명령줄 도구를 사용해 좀 더 저 수준에서 애플리케이션 동작을 이해해야 한다.

8장. 카프카 스트림즈 애플리케이션 테스트

토폴로지 테스트

ProcessorTopologyTestDriver를 사용하면 테스트 실행을 위해 카프카 없이도 테스트 작성이 가능

testImplementation("org.apache.kafka:kafka-streams:1.0.0")
testImplementation("org.apache.kafka:kafka-clients:1.0.0")
// ZMartTopology.java

public class ZMartTopology {
    public static Topology build() {
        Serde<Purchase> purchaseSerde = StreamsSerdes.PurchaseSerde();
        Serde<PurchasePattern> purchasePatternSerde = StreamsSerdes.PurchasePatternSerde();
        Serde<RewardAccumulator> rewardAccumulatorSerde = StreamsSerdes.RewardAccumulatorSerde();
        Serde<String> stringSerde = Serdes.String();

        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream<String, Purchase> purchaseKStream = streamsBuilder.stream("transactions", Consumed.with(stringSerde, purchaseSerde))
                .mapValues(p -> Purchase.builder(p).maskCreditCard().build());
        KStream<String, PurchasePattern> patternKStream = purchaseKStream.mapValues(purchase -> PurchasePattern.builder(purchase).build());
        patternKStream.to("patterns", Produced.with(stringSerde, purchasePatternSerde));
        KStream<String, RewardAccumulator> rewardsKStream = purchaseKStream.mapValues(purchase -> RewardAccumulator.builder(purchase).build());

        rewardsKStream.to("rewards", Produced.with(stringSerde, rewardAccumulatorSerde));
        purchaseKStream.to("purchases", Produced.with(Serdes.String(), purchaseSerde));

        return streamsBuilder.build();
    }
}

.

👉🏻 테스트 만들기

// ZMartTopologyTest.java

public class ZMartTopologyTest {
    private ProcessorTopologyTestDriver topologyTestDriver;

    @BeforeEach
    public void setUp() {
        Properties props = new Properties();
        props.put(StreamsConfig.CLIENT_ID_CONFIG, "FirstZmart-Kafka-Streams-Client");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "zmart-purchases");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "FirstZmart-Kafka-Streams-App");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

        StreamsConfig streamsConfig = new StreamsConfig(props);
        Topology topology = ZMartTopology.build(); // 토폴로지 획득

        // ProcessorTopologyTestDriver 생성
        topologyTestDriver = new ProcessorTopologyTestDriver(streamsConfig, topology); 
    }


    @Test
    @DisplayName("Testing the ZMart Topology Flow")
    public void testZMartTopology() {
        Serde<Purchase> purchaseSerde = StreamsSerdes.PurchaseSerde();
        Serde<PurchasePattern> purchasePatternSerde = StreamsSerdes.PurchasePatternSerde();
        Serde<RewardAccumulator> rewardAccumulatorSerde = StreamsSerdes.RewardAccumulatorSerde();
        Serde<String> stringSerde = Serdes.String();

        // 테스트 객체 생성
        Purchase purchase = DataGenerator.generatePurchase();
        // 토폴로지에 초기 레코드 전송
        topologyTestDriver.process("transactions",
                null,
                purchase,
                stringSerde.serializer(),
                purchaseSerde.serializer());
        // 레코드 읽기
        ProducerRecord<String, Purchase> record = topologyTestDriver.readOutput("purchases",
                stringSerde.deserializer(),
                purchaseSerde.deserializer());
        // 테스트 객체를 기대하는 형식으로 변환
        Purchase expectedPurchase = Purchase.builder(purchase).maskCreditCard().build();
        // 토폴로지로부터 레코드가 기대하는 레코드와 일치하는지 검사
        assertThat(record.value(), equalTo(expectedPurchase));

        RewardAccumulator expectedRewardAccumulator = RewardAccumulator.builder(expectedPurchase).build();

        ProducerRecord<String, RewardAccumulator> accumulatorProducerRecord = 
            topologyTestDriver.readOutput("rewards", // rewards 토픽에서 레코드 읽기
                stringSerde.deserializer(),
                rewardAccumulatorSerde.deserializer());

        assertThat(accumulatorProducerRecord.value(), equalTo(expectedRewardAccumulator)); // rewards 토픽 출력과 기댓값을 비교

        PurchasePattern expectedPurchasePattern = PurchasePattern.builder(expectedPurchase).build();

        ProducerRecord<String, PurchasePattern> purchasePatternProducerRecord = 
              topologyTestDriver.readOutput("patterns", // patterns 토픽에서 레코드 읽기
                stringSerde.deserializer(),
                purchasePatternSerde.deserializer());

        assertThat(purchasePatternProducerRecord.value(), equalTo(expectedPurchasePattern)); // patterns 토픽 출력과 기댓값 비교
    }
}

.

👉🏻 토폴로지에서 상태 저장소 테스트

// StockPerformanceStreamsProcessorTopologyTest.java

StockTransaction stockTransaction = DataGenerator.generateStockTransaction(); // 테스트 레코드 생성

topologyTestDriver.process("stock-transactions", // 테스트 드라이버로 레코드 처리
        stockTransaction.getSymbol(),
        stockTransaction,
        stringSerde.serializer(),
        stockTransactionSerde.serializer());

KeyValueStore<String, StockPerformance> store = topologyTestDriver.getKeyValueStore("stock-performance-store"); // 테스트 토폴로지로부터 상태 저장소 조회

assertThat(store.get(stockTransaction.getSymbol()), notNullValue());

.

👉🏻 프로세서와 트랜스포머 테스트

  • Processor나 Transformer에 대한 단위 테스트 작성은 어렵지 않지만, 두 클래스 모두 상태 저장소를 얻고 펑추에이션 액션을 스케줄링하기 위해 ProcessorContext에 의존해야 한다.

  • Mockito 같은 모의 객체 프레임워크를 사용해 테스트에서 모의 객체를 생성하는 방법이 있다.

  • 또 다른 옵션은 ProcessorTopologyTestDriver와 동일한 테스트 라이브러리에 있는 MockProcessorContext 객체를 사용하는 것

// CogroupingMethodHandleProcessorTest.java

private ProcessorContext processorContext = mock(ProcessorContext.class);
private MockKeyValueStore<String, Tuple<List<ClickEvent>, List<StockTransaction>>> keyValueStore = new MockKeyValueStore<>();

private CogroupingMethodHandleProcessor processor = new CogroupingMethodHandleProcessor(); // 테스트할 클래스

@Test
@DisplayName("Processor should initialize correctly")
public void testInitializeCorrectly() {
    processor.init(processorContext); // ProcessorContext 에서 메소드 호출을 트리거하는 프로세서의 init 메서드 호출
    verify(processorContext).schedule(eq(15000L), eq(STREAM_TIME), isA(Punctuator.class)); // 매개변수 검증
    verify(processorContext).getStateStore(TUPLE_STORE_NAME); // 상태 저장소에서 받은 값 검증
}

@Test
@DisplayName("Punctuate should forward records")
public void testPunctuateProcess() {
    when(processorContext.getStateStore(TUPLE_STORE_NAME)).thenReturn(keyValueStore);

    processor.init(processorContext); // 프로세서의 init 메서드 호출
    processor.process("ABC", Tuple.of(clickEvent, null));
    processor.process("ABC", Tuple.of(null, transaction));

    Tuple<List<ClickEvent>, List<StockTransaction>> tuple = keyValueStore.innerStore().get("ABC"); // process 메서드에서 상태 저장소에 있던 항목 꺼내기
    List<ClickEvent> clickEvents = new ArrayList<>(tuple._1);
    List<StockTransaction> stockTransactions = new ArrayList<>(tuple._2);

    processor.cogroup(124722348947L); // punctuate 스케줄에 사용된 코그룹 메서드 호출

    verify(processorContext).forward("ABC", Tuple.of(clickEvents, stockTransactions));

    assertThat(tuple._1.size(), equalTo(0));
    assertThat(tuple._2.size(), equalTo(0));
}

통합 테스트

카프카 테스트 라이브러리와 함께 사용 가능한 내장 카프카 클러스터를 사용할 수 있다.

  • 내장 카프카 클러스터를 사용하면 언제든 개별 테스트 또는 전체 테스트의 일부분이 되었든 사용자 머신에서 카프카 클러스터가 필요한 통합 테스트 실행이 가능하다.

👉🏻 통합 테스트 구축

  • 내장 카프카 서버를 사용하는 첫 단계는 3개의 추가적인 테스트 의존성을 추가하는 것

    • scala-library-2.12.4.jar

    • kafka_2.12-1.0.0-test.jar

    • kafka_2.12-1.0.0.jar

.

내장 카프카 클러스터 추가

// KafkaStreamsYellingIntegrationTest.java

/**
 * 테스트를 위한 내장 카프카 브로커 추가
 */
private static final int NUM_BROKERS = 1;

@ClassRule
public static final EmbeddedKafkaCluster EMBEDDED_KAFKA = new EmbeddedKafkaCluster(NUM_BROKERS);

/**
 * 토픽 만들기
 */
@BeforeClass
public static void setUpAll() throws Exception {
    EMBEDDED_KAFKA.createTopic(YELL_A_TOPIC);
    EMBEDDED_KAFKA.createTopic(OUT_TOPIC);
}

내장 카프카 클러스터 추가

  • 다음 단계에 따라 통합 테스트 실행

    • (1) 카프카 스트림즈 애플리케이션 시작

    • (2) 소스 토픽에 레코드를 쓰고 정확한 결과인지 검증

    • (3) 패턴과 일치하는 새 토픽 생성

    • (4) 추가적인 레코드를 새로 생성된 토픽에 쓰고 정확한 결과인지 검증

// KafkaStreamsYellingIntegrationTest.java

@Test
public void shouldYellFromMultipleTopics() throws Exception {

    StreamsBuilder streamsBuilder = new StreamsBuilder();

    streamsBuilder.<String, String>stream(Pattern.compile("yell.*"))
            .mapValues(String::toUpperCase)
            .to(OUT_TOPIC);

    kafkaStreams = new KafkaStreams(streamsBuilder.build(), streamsConfig);
    kafkaStreams.start(); // 카프카 스트림즈 애플리케이션 시작

    List<String> valuesToSendList = Arrays.asList("this", "should", "yell", "at", "you"); // 전송할 값 목록
    List<String> expectedValuesList = valuesToSendList.stream()
            .map(String::toUpperCase)
            .collect(Collectors.toList()); // 기댓값 목록

    IntegrationTestUtils.produceValuesSynchronously(YELL_A_TOPIC, // 내장 카프카로 값 생산
            valuesToSendList,
            producerConfig,
            mockTime);

    int expectedNumberOfRecords = 5;
    List<String> actualValues = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfig, // 카프카에서 레코드를 소비
            OUT_TOPIC,
            expectedNumberOfRecords);
    assertThat(actualValues, equalTo(expectedValuesList)); // 읽은 값과 기댓값이 같은지 검증

    EMBEDDED_KAFKA.createTopic(YELL_B_TOPIC);

    valuesToSendList = Arrays.asList("yell", "at", "you", "too");
    IntegrationTestUtils.produceValuesSynchronously(YELL_B_TOPIC,
            valuesToSendList,
            producerConfig,
            mockTime);

    expectedValuesList = valuesToSendList.stream().map(String::toUpperCase).collect(Collectors.toList());

    expectedNumberOfRecords = 4;
    actualValues = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfig,
            OUT_TOPIC,
            expectedNumberOfRecords);

    assertThat(actualValues, equalTo(expectedValuesList));

}

모니터링 수행을 위해 , , 사용

Result
Result
Monitoring
Monitor Kafka Streams Applications in Confluent
StockPerformanceStreamsAndProcessorMetricsApplication.java
VisualVM
JConsole
JMC