스프링은 예외에 따라 HTTP 응답코드를 반환하는 @RestControllerAdvice 어노테이션을 제공
전역 예외처리를 위해 @RestControllerAdvice를 이용한 ExceptionAdvisor 구현
@RestControllerAdviceclassExceptionAdvisor : ResponseEntityExceptionHandler() {@ExceptionHandler(NoSuchElementException::class)funhandleNoSuchElementException(exception: NoSuchElementException, webRequest: WebRequest): ResponseEntity<Any> {val body =mapOf("timestamp" to LocalDateTime.now(),"status" to HttpStatus.NOT_FOUND.value(),"error" to exception::class.simpleName,"message" to exception.message,"path" to webRequest.getDescription(false) )returnResponseEntity(body, HttpStatus.NOT_FOUND) }@ExceptionHandler(IllegalArgumentException::class)funhandleIllegalArgumentException(exception: IllegalArgumentException, webRequest: WebRequest): ResponseEntity<Any> {val body =mapOf("timestamp" to LocalDateTime.now(),"status" to HttpStatus.BAD_REQUEST.value(),"error" to exception::class.simpleName,"message" to exception.message,"path" to webRequest.getDescription(false) )returnResponseEntity(body, HttpStatus.BAD_REQUEST) }@ExceptionHandler(RuntimeException::class)funhandleRuntimeException(exception: RuntimeException, webRequest: WebRequest): ResponseEntity<Any> {val body =mapOf("timestamp" to LocalDateTime.now(),"status" to HttpStatus.INTERNAL_SERVER_ERROR.value(),"error" to exception::class.simpleName,"message" to exception.message,"path" to webRequest.getDescription(false) )returnResponseEntity(body, HttpStatus.INTERNAL_SERVER_ERROR) }}
마이크로서비스 모듈
RESTful API는 핵사고날 아키텍처에서 인바운드 어댑터
어댑터는 비즈니스 로직을 포함하면 안되고
외부 요청을 받아 애플리케이션 서비스에 요청을 위임
비즈니스와 관련있는 모듈인 service 패키지와 분리
RESTful API를 적용한 마이크로서비스의 모듈은 endpoint
핵사고날 아키텍처에서 외부 요청을 받는 인바운드 어댑터인 RESTful API를 endpoint 패키지에 두면서 애그리게이트 단위로 분리
아웃바운드 어댑터와 RESTful API
아웃바운드 어댑터는 아파치 HttpClient나 스프링 WebClient와 같이 HTTP를 지원하는 다양한 라이브러리를 사용할 수 있지만 중복 코드를 작성해야 하는 단점이 존재
대안으로 스프링이 제공하는 FeignClient 사용
FeignClient는 인터페이스 선언만으로 다른 마이크로서비스를 사용할 수 있는 개발 편의성을 제공
spring-cloud-starter-openfeign 의존성
@FeignClient는 오퍼레이션에 선언한 @xMapping에 따라 RESTful API를 호출하고 결과를 반환
@FeignClient(value = "cart")
interface CartClient {
@GetMapping("/cart/{itemId}")
fun queryItem(@PathVariable itemId: String): Item
}
data class Item(
var cartId: String? = null,
var productNo: String? = null,
var productName: String? = null,
var price: Int = 0,
var quantity: Int = 0
)
사용하지 않는 속성을 알고 있는 것보다는 코드 중복이 있더라도 개발자가 유지하는 소스 코드 단위로 필요한 속성만 선언하면 독립성을 높일 수 있다.
이벤트 브로커
브로커는 생산자와 소비자간 메시지를 주고 받는 가교 역학을 하는데, 일반적으로 서로 다른 시스템 간 데이터를 교환하면서 비동기 방식으로 처리하기 위한 목적으로 사용
메시지와 이벤트는 서로 다른 생명주기를 갖음
메시지는 등록되어 있는 소비자가 읽어가면서 삭제
이벤트는 소비자가 다시 읽을 수 있도록 저장소에 보관
이벤트 브로커는 생산자와 소비자간 메시지를 주고 받는 세 가지 패턴 존재
단일 생산자 : 단일 소비자
단일 생산자 : 다중 소비자
다중 생산자 : 다중 소비자
도커와 카프카
카프카는 주키퍼 기반으로 동작하므로 두 개의 컨테이너를 한 번에 선언해 실행하는 docker compose를 사용
version:'2'services:zookeeper:# (1) 컨테이너 정의container_name:zookeeperimage:wurstmeister/zookeeper:3.4.6expose: - "2181"ports: - "2181:2181"kafka:# (2) 컨테이너 정의container_name:kafkaimage:wurstmeister/kafka:2.12-2.4.1depends_on:# (3) zookeeper 컨테이너를 먼저 실행 후 kafka 컨테이너를 실행 - zookeeperexpose: - "9092"ports: - "9092:9092"environment:KAFKA_ADVERTISED_HOST_NAME:172.30.1.90# (4) docker를 실행하고 있는 서버 또는 개발자 PCKAFKA_ADVERTISED_PORT:9092KAFKA_ZOOKEEPER_CONNECT:zookeeper:2181volumes: - /var/run/docker.sock:/var/run/docker.sock
docker-compose 명령어로 주키퍼와 카프카를 실행
$docker-composeup-d
웹 UI를 제공하는 kafdrop, Offset Explorer, kafka-ui 등 다양한 도구 사용
MessageMapper는 1차로 발행한 이벤트가 변환 대상이면 2차 메시지를 생성해 반환
MessageMapper
/* MessageMapper.kt */interfaceMessageMapper {funmap(event: Event): Message?}.../* NotificationMessageMapper.kt */@ComponentclassNotificationMessageMapper(privateval kafkaTemplate: KafkaTemplate<String, Message<*>>) : MessageMapper(kafkaTemplate) {companionobject {privateconstval defaultOutput ="notification" }funmap(event: OrderCompleted) {val id = UUID.randomUUID().toString().split("-")[0]val transformedEvent =PutAlert( id,"A new order has been placed.","/order/${event.orderNo}" )val kafkaMessage =KafkaMessage( eventId = transformedEvent.identifier, type = transformedEvent::class.java.typeName, payload = JsonUtil.toJson(transformedEvent), time = transformedEvent.time() )val message: Message<String> = MessageBuilder .withPayload(JsonUtil.toJson(kafkaMessage)) .setHeader(KafkaHeaders.TOPIC, defaultOutput) .build() kafkaTemplate.send(message) }}.../* KafkaMessageRelay.kt */@ComponentclassKafkaMessageRelay(privateval context: ApplicationContext,privateval eventStore: EventStore,privateval kafkaTemplate: KafkaTemplate<String, Message<*>>) {privateval messageMappers: List<MessageMapper>init { messageMappers = context.getBeanNamesForType(MessageMapper::class.java) .map { context.getBean(it, MessageMapper::class.java) } }@Scheduled(fixedDelay =500)funpublish() {val events = eventStore.retrieve() events.forEach { event ->// Kafka 기본 메시지 전송 kafkaTemplate.send(domainMessage)// MessageMapper를 사용해 변환된 메시지 처리 messageMappers.forEach { mapper ->val transformedEvent = mapper.map(event) transformedEvent?.let {val message =KafkaMessage( eventId = event.eventId(), type = it::class.java.typeName, payload = JsonUtil.toJson(it) )val transformedMessage: Message<String> = MessageBuilder .withPayload(message.toJson()) .setHeader(KafkaHeader.TOPIC, message.topicName) .build() kafkaTemplate.send(transformedMessage) } } event.relayed =true eventStore.update(event) } }}
서비스 내부 이벤트와 외부 이벤트
Event 클래스에 outbox 속성을 추가해 내부 이벤트와 외부 발행 이벤트를 구분
메시지 릴레이는 outbox=true 인 경우에만 카프카와 같은 이벤트 브로커로 이벤트를 발행하도록 변경
@Scheduled(fixedDelay =500)funpublish() {val events = eventStore.retrieve() events.forEach { event ->// 스프링 컨텍스트에 이벤트 발행if (event.isOutbox()) {// 브로커로 이벤트 발행 } }}
인바운드 어댑터와 이벤트 소비
카프카에 발행한 메시지를 수신해 이벤트에 반응하기 위해 스프링 카프카가 제공하는 MessageListener를 사용
카프카도 메시지 전달을 위해 최소 한 번 이상 전달됨(At-Least-Once)을 보장
최소 한 번 이상이므로 동일한 이벤트를 여러 번 수신하더라도 서비스는 동일 이벤트에 한 번만 반응해야 함
이벤트 핸들러가 이벤트를 정확하게 한 번 처리하기 위해 메시지 릴레이와 같은 방식으로 이벤트를 수신해 저장소에 저장하는 책임과
이벤트를 스프링 컨텍스트에 발행하는 책임을 가진 **리버스 릴레이(ReverseRelay)**를 사용 가능
리버스 릴레이(ReverseRelay)는 주기적으로 수신한 이벤트의 저장소에서 메시지를 조회해 처리한 후 플레그 값을 변경하거나 이벤트를 삭제
이를 트랜잭셔널 아웃박스 패턴의 반대의 의미로 트랜잭셔널 인박스 패턴(Transactional Inbox)이라고 한다.
수신한 이벤트를 테이블에 저장
@ComponentclassCartStreamListener(privateval messageStore: MessageStore) {privateval logger = LoggerFactory.getLogger(CartStreamListener::class.java)@KafkaListener( topics = ["${broker.topic}"], groupId = ["${spring.application.name}"] )funon(message: String) {// JSON으로 직렬화한 KakfaMessage를 수신val kafkaMessage = JsonUtil.fromJson(message, KafkaMessage::class.java)try {val clazz = Class.forName(kafkaMessage.typeName) as Class<Message>// 기술에 중립적인 Message 객체로 역지렬화val msg = JsonUtil.fromJson(kafkaMessage.payload, clazz)// MessageStore를 이용해 수신 메시지를 테이블에 저장 messageStore.save(msg) } catch (e: ClassNotFoundException) { logger.warn("Could not find class ${kafkaMessage.typeName}") } }}
수신한 이벤트를 스프링 컨텍스트에 발행
ComponentclassReverseRelay(privateval messageStore: MessageStore,privateval eventPublisher: ApplicationEventPublisher) {@Scheduled(fixedDelay =100)funpublish() {// 메시지가 도착한 순서로 조회val messages = messageStore.retrieveUnexecutedMessages() messages.forEach { message -> message.ifPresent { msg ->// 스프링 컨텍스트에 메시지를 발행 eventPublisher.publishEvent(msg)// 메시지를 처리한 결과가 정상이면 발행한 메시지는 처리된 것으로 플래그를 변경 messageStore.update(msg) } } }}
이벤트 어댑터와 마이크로서비스 모듈
이벤트 어댑터와 추상화된 핵심
EventStore는 메시지 릴레이에서 이벤트 브로커로 도메인 이벤트를 발행하기 위해 사용하는 전용 인터페이스