03.카프카 스트림즈 관리
Part 3. 카프카 스트림즈 관리
Table of contents
기본적인 카프카 모니터링
인터셉터
메트릭
카프카 스트림즈 디버깅 기술
7장. 모니터링과 성능
기본적인 카프카 모니터링
카프카 스트림즈 API는 카프카의 일부이므로 애플리케이션을 모니터링할 때 카프카도 일부 모니터링해야 한다.
.
👉🏻 컨슈머와 프로듀서 성능 측정
프로듀서와 컨슈머의 성능은 처리량과 관련이 있다.
프로듀서
의 경우 프로듀서가 얼마나 빠르게 브로커로 메시지를 보내느냐가 큰 관심사분명 처리량이 높으면 높을수록 더 낮다
컨슈머
의 경우 브로커에서 얼마나 빠르게 메시지를 읽을 수 있느냐가 성능에 영향컨슈머 성능을 측정하는 또 다른 방법은 컨슈머 지연
프로듀서가 브로커에 기록하는 속도와 컨슈머가 메시지를 읽는 속도 차이를
컨슈머 지연
이라고 부른다
.
👉🏻 컨슈머 지연 확인하기
컨슈머 지연을 확인하기 위해 카프카는 편리한 명령줄 도구를 제공
kafka-consumer-groups.sh
활성화 상태의 모든 컨슈머 그룹을 찾기 위해
list
명령을 사용
kafka-installed-path/bin/kafka-consumer-groups.sh
--bootstrap-server localhost:9092
--list
조회할 컨슈머 그룹 이름을 선택하고 다음 명령을 실행
kafka-installed-path/bin/kafka-consumer-groups.sh
--bootstrap-server localhost:9092
--group GROUP-NAME
--describe
작은 지연이나 일정한 지연은 문제가 안 되지만, 시간이 지남에 따라 계속 증가하는 지연은 컨슈머에게 더 많은 리소스를 제공해야 한다
.
👉🏻 프로듀서와 컨슈머 가로채기
인터셉터
는 디버깅을 위한 일반적인 제일선 도구는 아니지만 카프카 스트리밍 애플리케이션의 동작을 관찰하는 데 유용할 수 있으며, 자신만의 도구 세트에 추가할 만한 유용한 도구이다.인터셉터
를 사용하는 좋은 예제는 카프카 스트림즈 애플리케이션이 카프카 토픽으로 다시 생산하는 메시지 오프셋을 추적하는 데 사용
컨슈머 인터셉터
컨슈머 인터셉터는 가로채기를 위해 두 가지 접근점을 제공
1️⃣ ConsumerInterceptor.onConsume()
브로커에서 조회한 시점과
Consumer.poll()
메소드가 메시지를 반환하기 전ConsumerRecords
에서 읽는다.ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG
를 통해 하나 이상의ConsumerInterceptor
구현자 클래스의 컬렉션으로 지정
ConsumerRecords<String, String> poll(long timeout) {
ConsumerRecords<String, String> consumerRecords =
...consuming records
// 인터셉터 체인을 통해 레코드를 실행하고 결과를 반헌
return interceptors.onConsume(consumerRecords);
}
2️⃣ ConsumerInterceptor.onCommit()
컨슈머가 브로커에게 오프셋을 커밋하면 브로커는 토픽, 파티션 및 커밋된 오프셋과 관련된 메타데이터와 함께 정보가 포함된
Map<TopicPartition, OffsetAndMetadata>
를 반환
로깅 목적으로 사용되는 간단한 ConsumerInterceptor 예시
// StockTransactionConsumerInterceptor.java
public class StockTransactionConsumerInterceptor implements ConsumerInterceptor<Object, Object> {
//...
/**
* 레코드가 처리되기 전에 컨슈머 레코드와 메타데이터를 로깅
*/
@Override
public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> consumerRecords) {
LOG.info("Intercepted ConsumerRecords {}", buildMessage(consumerRecords.iterator()));
return consumerRecords;
}
/**
* 카프카 스트림즈 컨슈머가 브로커에 오프셋을 커밋할 때 커밋 정보를 로깅
*/
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
LOG.info("Commit information {}", map);
}
//...
}
프로듀서 인터셉터
ProducerInterceptor.onSend()
및ProducerInterceptor.onAcknowledgement()
두 가지 접근 지점이 존재체인상에 있는 각 프로듀서 인터셉터는 이전 인터셉터에서 반환된 객체를 받는다.
간단히 로깅하는 ProducerInterceptor 예제
// ZMartProducerInterceptor.java
public class ZMartProducerInterceptor implements ProducerInterceptor<Object, Object> {
/**
* 메시지를 브로커에 전송하기 바로 전에 로깅
*/
@Override
public ProducerRecord<Object, Object> onSend(ProducerRecord<Object, Object> record) {
LOG.info("ProducerRecord being sent out {} ", record);
return record;
}
/**
* 브로커 수신 확인 또는 생산 단계 동안 브로커 측에서 오류가 발생했는지 로깅
*/
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (exception != null) {
LOG.warn("Exception encountered producing record {}", exception);
} else {
LOG.info("record has been acknowledged {} ", metadata);
}
}
}
ProducerInterceptor는 ProducerConfig.INTERCEPTOR_CLASSES_CONFIG에 지정하고, 하나 이상의 ProducerInterceptor 클래스의 컬렉션으로 설정
✅ 참고
카프카 스트림즈 애플리케이션에서 인터셉터를 구성할 때 컨슈머와 프로듀서 인터셉터의 속성 이름의 프리픽스를
StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG)
와
StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG)
로 지정해야 한다.
부수적으로 인터셉터는 카프카 스트림즈 애플리케이션의 모든 레코드에서 작동하므로 로깅 인터셉터의 출력이 중요하다.
인터셉터 결과는 소스 코드를 설치한 로그 디렉토리에 있는 consumer_interceptor.log 및 producer_interceptor.log 로 출력
애플리케이션 메트릭
메트릭 카테고리 살펴보기
스레드 메트릭
평균 커밋, 폴링, 처리 작업 시간
초당 생성한 태스크 수, 초당 종료된 태스크 수
태스크 메트릭
초당 평균 커밋 횟수
평균 커밋 시간
프로세서 노드 메트릭
평균 및 최대 처리 시간
초당 평균 처리 작업 수
포워드 레이트
상태 저장소 메트릭
put, get, flush 작업의 평균 실행 시간
put, get, flush 작업의 초당 평균 실행 횟수
Monitor Kafka Streams Applications in Confluent
.
👉🏻 메트릭 구성
카프카 스트림즈는 이미 성능 메트릭을 수집하는 메커니즘을 제공
설정한 레벨에 따른 간으한 메트릭
메트릭 수집의 기본 레벨은 INFO
스레드
O
O
태스크
O
프로세서 노드
O
상태 저장소
O
레코드 캐시
O
메트릭을 위한 구성 변경
private static Properties getProperties() {
Properties props = new Properties();
props.put(StreamsConfig.CLIENT_ID_CONFIG, "zmart-metrics-client-id"); // 클라이언트 아이디
props.put(ConsumerConfig.GROUP_ID_CONFIG, "zmart-metrics-group-id"); // 그룹 아이디
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "zmart-metrics-application-id"); // 애플리케이션 아이디
props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG"); // 메트릭 로그 레벨
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // 브로커 접속 설정
props.put(StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG), Collections.singletonList(ZMartProducerInterceptor.class));
return props;
}
카프카 스트림즈 애플리케이션의 전체 범위를 측정하는 기본 메트릭이 있으며, DEBUG 레벨에서 메트릭 수집을 설정하려면 그 전에 성능 영향을 신중하게 고려해야 함
.
👉🏻 수집한 메트릭 확인 방법
카프카 스트림즈 애플리케이션의 메트릭을 수집하면 메트릭 리포터에게 배포
기본 메트릭 리포터를 JMX(Java Management Extensions)를 통해 제공
.
👉🏻 JMX(Java Management Extensions) 사용
자바 VM에서 실행되는 프로그램의 동작을 보는 표준 방법
JMX를 사용해 자바 VM 성능도 확인 가능
즉, JMX는 실행 중인 프로그램의 일부를 노출하는 인프라를 제공
추가적인 카프카 스트림즈 디버깅 기술
👉🏻 애플리케이션 구조 조회
Topology.describe()
메소드는 애플리케이션 구조에 관한 일반적인 정보를 제공

애플리케이션에서 실행 시간 정보를 보여주는
StreamThread
객체에 관한 정보를 얻는 것도 유용KafkaStreams.localThreadsMetadata() 메소드 사용
.
👉🏻 다양한 애플리케이션 상태 알림 받기
StateListener
사용
카프카 스트림즈 애플리케이션에서 가능한 여섯 가지 유효한 상태를 보여준다.
카프카 스트림즈 애플리케이션의 상태

// ZMartKafkaStreamsAdvancedReqsMetricsApp.java
KafkaStreams.StateListener stateListener = (newState, oldState) -> {
// REBALANCING 에서 RUNNING 으로 상태 전환
if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
LOG.info("Application has gone from REBALANCING to RUNNING ");
LOG.info("Topology Layout {}", streamsBuilder.build().describe());
}
// REBALANCING 단계 진입 시 액션
if (newState == KafkaStreams.State.REBALANCING) {
LOG.info("Application is entering REBALANCING phase");
}
};
상태 리스토어 리스너
카프카 스트림즈에서는 상태 저장소의 백업으로
변경로그
토픽을 사용변경로그는 변경이 발생한 상태 저장소의 업데이트를 기록
카프카 스트림즈 애플리케이션이 실패하거나 재시작할 때, 상태 저장소는 로컬 상태 파일에서 복구 가능
StateListener와 흡사한
StateRestoreListener
인터페이스는 애플리케이션 내부에서 일어나는 일들에 대한 알림을 허용onRestoreStart
,onBatchRestored
,onRestoreEnd
세 가지 메소드 존재
// LoggingStateRestoreListener.java
public class LoggingStateRestoreListener implements StateRestoreListener {
private static final Logger LOG = LoggerFactory.getLogger(LoggingStateRestoreListener.class);
private final Map<TopicPartition, Long> totalToRestore = new ConcurrentHashMap<>();
private final Map<TopicPartition, Long> restoredSoFar = new ConcurrentHashMap<>();
/**
* 리스토어 리스너 로깅
*/
@Override
public void onRestoreStart(TopicPartition topicPartition, String store, long start, long end) {
long toRestore = end - start;
totalToRestore.put(topicPartition, toRestore); // 복원할 주어진 topicPartition 총량 저장
LOG.info("Starting restoration for {} on topic-partition {} total to restore {}", store, topicPartition, toRestore);
}
/**
* 복원된 각 배치를 처리
*/
@Override
public void onBatchRestored(TopicPartition topicPartition, String store, long start, long batchCompleted) {
NumberFormat formatter = new DecimalFormat("#.##");
long currentProgress = batchCompleted + restoredSoFar.getOrDefault(topicPartition, 0L); // 복원된 전체 레코드 개수 계산
double percentComplete = (double) currentProgress / totalToRestore.get(topicPartition); // 복원이 완료된 백분율을 결정
LOG.info("Completed {} for {}% of total restoration for {} on {}",
batchCompleted, formatter.format(percentComplete * 100.00), store, topicPartition); // 복원된 백분율을 출력
restoredSoFar.put(topicPartition, currentProgress); // 지금까지 복원된 레코드 개수 저장
}
/**
* 복원 프로세스가 완료될 때
*/
@Override
public void onRestoreEnd(TopicPartition topicPartition, String store, long totalRestored) {
LOG.info("Restoration completed for {} on topic-partition {}", store, topicPartition);
restoredSoFar.put(topicPartition, 0L);
}
}
// CoGroupingListeningExampleApplication.java
KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsConfig);
kafkaStreams.setGlobalStateRestoreListener(new LoggingStateRestoreListener()); // 글로벌 리스토어 리스너 정의
내부 컨슈머를 사용해 번경로그 토픽을 읽으므로 애플리케이션이 각 consumer.poll() 메소드 호출에서 레코드를 일괄적으로 복원
복원 프로세스가 최근 배치를 상태 저장소에 로드한 후
onBatchRestored
메소드가 호출애플리케이션이 복구 프로세스를 완료하면 복원된 마지막 리스너를 총 레코드 수와 함께 호출
uncaught 예외 핸들러
예기치 않은 오류 처리를 위해
KafkaStreams.setUncaughtExceptionHandler
제공
// CoGroupingListeningExampleApplication.java
kafkaStreams.setUncaughtExceptionHandler((thread, exception) ->
LOG.error("Thread [{}] encountered [{}]", thread.getName(), exception.getMessage())
);
📖 요약
카프카 스트림즈를 모니터링하려면 카프카 브로커도 살펴봐야 한다.
애플리케이션의 성능이 어떻게 되는지 보고 싶다면 메트릭 리포팅을 수시로 활성화해야 한다.
내부를 살펴볼 필요가 있으며 가끔
jstack
(스레드 덤프)과jmap/jhat
(힙 덤프) 같은 자바에 포함된 명령줄 도구를 사용해 좀 더 저 수준에서 애플리케이션 동작을 이해해야 한다.
8장. 카프카 스트림즈 애플리케이션 테스트
토폴로지 테스트
ProcessorTopologyTestDriver
를 사용하면 테스트 실행을 위해 카프카 없이도 테스트 작성이 가능
testImplementation("org.apache.kafka:kafka-streams:1.0.0")
testImplementation("org.apache.kafka:kafka-clients:1.0.0")
// ZMartTopology.java
public class ZMartTopology {
public static Topology build() {
Serde<Purchase> purchaseSerde = StreamsSerdes.PurchaseSerde();
Serde<PurchasePattern> purchasePatternSerde = StreamsSerdes.PurchasePatternSerde();
Serde<RewardAccumulator> rewardAccumulatorSerde = StreamsSerdes.RewardAccumulatorSerde();
Serde<String> stringSerde = Serdes.String();
StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<String, Purchase> purchaseKStream = streamsBuilder.stream("transactions", Consumed.with(stringSerde, purchaseSerde))
.mapValues(p -> Purchase.builder(p).maskCreditCard().build());
KStream<String, PurchasePattern> patternKStream = purchaseKStream.mapValues(purchase -> PurchasePattern.builder(purchase).build());
patternKStream.to("patterns", Produced.with(stringSerde, purchasePatternSerde));
KStream<String, RewardAccumulator> rewardsKStream = purchaseKStream.mapValues(purchase -> RewardAccumulator.builder(purchase).build());
rewardsKStream.to("rewards", Produced.with(stringSerde, rewardAccumulatorSerde));
purchaseKStream.to("purchases", Produced.with(Serdes.String(), purchaseSerde));
return streamsBuilder.build();
}
}
.
👉🏻 테스트 만들기
// ZMartTopologyTest.java
public class ZMartTopologyTest {
private ProcessorTopologyTestDriver topologyTestDriver;
@BeforeEach
public void setUp() {
Properties props = new Properties();
props.put(StreamsConfig.CLIENT_ID_CONFIG, "FirstZmart-Kafka-Streams-Client");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "zmart-purchases");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "FirstZmart-Kafka-Streams-App");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
StreamsConfig streamsConfig = new StreamsConfig(props);
Topology topology = ZMartTopology.build(); // 토폴로지 획득
// ProcessorTopologyTestDriver 생성
topologyTestDriver = new ProcessorTopologyTestDriver(streamsConfig, topology);
}
@Test
@DisplayName("Testing the ZMart Topology Flow")
public void testZMartTopology() {
Serde<Purchase> purchaseSerde = StreamsSerdes.PurchaseSerde();
Serde<PurchasePattern> purchasePatternSerde = StreamsSerdes.PurchasePatternSerde();
Serde<RewardAccumulator> rewardAccumulatorSerde = StreamsSerdes.RewardAccumulatorSerde();
Serde<String> stringSerde = Serdes.String();
// 테스트 객체 생성
Purchase purchase = DataGenerator.generatePurchase();
// 토폴로지에 초기 레코드 전송
topologyTestDriver.process("transactions",
null,
purchase,
stringSerde.serializer(),
purchaseSerde.serializer());
// 레코드 읽기
ProducerRecord<String, Purchase> record = topologyTestDriver.readOutput("purchases",
stringSerde.deserializer(),
purchaseSerde.deserializer());
// 테스트 객체를 기대하는 형식으로 변환
Purchase expectedPurchase = Purchase.builder(purchase).maskCreditCard().build();
// 토폴로지로부터 레코드가 기대하는 레코드와 일치하는지 검사
assertThat(record.value(), equalTo(expectedPurchase));
RewardAccumulator expectedRewardAccumulator = RewardAccumulator.builder(expectedPurchase).build();
ProducerRecord<String, RewardAccumulator> accumulatorProducerRecord =
topologyTestDriver.readOutput("rewards", // rewards 토픽에서 레코드 읽기
stringSerde.deserializer(),
rewardAccumulatorSerde.deserializer());
assertThat(accumulatorProducerRecord.value(), equalTo(expectedRewardAccumulator)); // rewards 토픽 출력과 기댓값을 비교
PurchasePattern expectedPurchasePattern = PurchasePattern.builder(expectedPurchase).build();
ProducerRecord<String, PurchasePattern> purchasePatternProducerRecord =
topologyTestDriver.readOutput("patterns", // patterns 토픽에서 레코드 읽기
stringSerde.deserializer(),
purchasePatternSerde.deserializer());
assertThat(purchasePatternProducerRecord.value(), equalTo(expectedPurchasePattern)); // patterns 토픽 출력과 기댓값 비교
}
}
.
👉🏻 토폴로지에서 상태 저장소 테스트
// StockPerformanceStreamsProcessorTopologyTest.java
StockTransaction stockTransaction = DataGenerator.generateStockTransaction(); // 테스트 레코드 생성
topologyTestDriver.process("stock-transactions", // 테스트 드라이버로 레코드 처리
stockTransaction.getSymbol(),
stockTransaction,
stringSerde.serializer(),
stockTransactionSerde.serializer());
KeyValueStore<String, StockPerformance> store = topologyTestDriver.getKeyValueStore("stock-performance-store"); // 테스트 토폴로지로부터 상태 저장소 조회
assertThat(store.get(stockTransaction.getSymbol()), notNullValue());
.
👉🏻 프로세서와 트랜스포머 테스트
Processor
나Transformer
에 대한 단위 테스트 작성은 어렵지 않지만, 두 클래스 모두 상태 저장소를 얻고 펑추에이션 액션을 스케줄링하기 위해ProcessorContext
에 의존해야 한다.Mockito
같은 모의 객체 프레임워크를 사용해 테스트에서 모의 객체를 생성하는 방법이 있다.또 다른 옵션은
ProcessorTopologyTestDriver
와 동일한 테스트 라이브러리에 있는MockProcessorContext
객체를 사용하는 것
// CogroupingMethodHandleProcessorTest.java
private ProcessorContext processorContext = mock(ProcessorContext.class);
private MockKeyValueStore<String, Tuple<List<ClickEvent>, List<StockTransaction>>> keyValueStore = new MockKeyValueStore<>();
private CogroupingMethodHandleProcessor processor = new CogroupingMethodHandleProcessor(); // 테스트할 클래스
@Test
@DisplayName("Processor should initialize correctly")
public void testInitializeCorrectly() {
processor.init(processorContext); // ProcessorContext 에서 메소드 호출을 트리거하는 프로세서의 init 메서드 호출
verify(processorContext).schedule(eq(15000L), eq(STREAM_TIME), isA(Punctuator.class)); // 매개변수 검증
verify(processorContext).getStateStore(TUPLE_STORE_NAME); // 상태 저장소에서 받은 값 검증
}
@Test
@DisplayName("Punctuate should forward records")
public void testPunctuateProcess() {
when(processorContext.getStateStore(TUPLE_STORE_NAME)).thenReturn(keyValueStore);
processor.init(processorContext); // 프로세서의 init 메서드 호출
processor.process("ABC", Tuple.of(clickEvent, null));
processor.process("ABC", Tuple.of(null, transaction));
Tuple<List<ClickEvent>, List<StockTransaction>> tuple = keyValueStore.innerStore().get("ABC"); // process 메서드에서 상태 저장소에 있던 항목 꺼내기
List<ClickEvent> clickEvents = new ArrayList<>(tuple._1);
List<StockTransaction> stockTransactions = new ArrayList<>(tuple._2);
processor.cogroup(124722348947L); // punctuate 스케줄에 사용된 코그룹 메서드 호출
verify(processorContext).forward("ABC", Tuple.of(clickEvents, stockTransactions));
assertThat(tuple._1.size(), equalTo(0));
assertThat(tuple._2.size(), equalTo(0));
}
통합 테스트
카프카 테스트 라이브러리와 함께 사용 가능한 내장 카프카 클러스터를 사용할 수 있다.
내장 카프카 클러스터를 사용하면 언제든 개별 테스트 또는 전체 테스트의 일부분이 되었든 사용자 머신에서 카프카 클러스터가 필요한 통합 테스트 실행이 가능하다.
👉🏻 통합 테스트 구축
내장 카프카 서버를 사용하는 첫 단계는 3개의 추가적인 테스트 의존성을 추가하는 것
scala-library-2.12.4.jar
kafka_2.12-1.0.0-test.jar
kafka_2.12-1.0.0.jar
.
내장 카프카 클러스터 추가
// KafkaStreamsYellingIntegrationTest.java
/**
* 테스트를 위한 내장 카프카 브로커 추가
*/
private static final int NUM_BROKERS = 1;
@ClassRule
public static final EmbeddedKafkaCluster EMBEDDED_KAFKA = new EmbeddedKafkaCluster(NUM_BROKERS);
/**
* 토픽 만들기
*/
@BeforeClass
public static void setUpAll() throws Exception {
EMBEDDED_KAFKA.createTopic(YELL_A_TOPIC);
EMBEDDED_KAFKA.createTopic(OUT_TOPIC);
}
내장 카프카 클러스터 추가
다음 단계에 따라 통합 테스트 실행
(1) 카프카 스트림즈 애플리케이션 시작
(2) 소스 토픽에 레코드를 쓰고 정확한 결과인지 검증
(3) 패턴과 일치하는 새 토픽 생성
(4) 추가적인 레코드를 새로 생성된 토픽에 쓰고 정확한 결과인지 검증
// KafkaStreamsYellingIntegrationTest.java
@Test
public void shouldYellFromMultipleTopics() throws Exception {
StreamsBuilder streamsBuilder = new StreamsBuilder();
streamsBuilder.<String, String>stream(Pattern.compile("yell.*"))
.mapValues(String::toUpperCase)
.to(OUT_TOPIC);
kafkaStreams = new KafkaStreams(streamsBuilder.build(), streamsConfig);
kafkaStreams.start(); // 카프카 스트림즈 애플리케이션 시작
List<String> valuesToSendList = Arrays.asList("this", "should", "yell", "at", "you"); // 전송할 값 목록
List<String> expectedValuesList = valuesToSendList.stream()
.map(String::toUpperCase)
.collect(Collectors.toList()); // 기댓값 목록
IntegrationTestUtils.produceValuesSynchronously(YELL_A_TOPIC, // 내장 카프카로 값 생산
valuesToSendList,
producerConfig,
mockTime);
int expectedNumberOfRecords = 5;
List<String> actualValues = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfig, // 카프카에서 레코드를 소비
OUT_TOPIC,
expectedNumberOfRecords);
assertThat(actualValues, equalTo(expectedValuesList)); // 읽은 값과 기댓값이 같은지 검증
EMBEDDED_KAFKA.createTopic(YELL_B_TOPIC);
valuesToSendList = Arrays.asList("yell", "at", "you", "too");
IntegrationTestUtils.produceValuesSynchronously(YELL_B_TOPIC,
valuesToSendList,
producerConfig,
mockTime);
expectedValuesList = valuesToSendList.stream().map(String::toUpperCase).collect(Collectors.toList());
expectedNumberOfRecords = 4;
actualValues = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfig,
OUT_TOPIC,
expectedNumberOfRecords);
assertThat(actualValues, equalTo(expectedValuesList));
}
Last updated