First Come First Served

First Come First Served Event

μ‹€μŠ΅μœΌλ‘œ λ°°μš°λŠ” μ„ μ°©μˆœ 이벀트 μ‹œμŠ€ν…œ κ°•μ˜λ₯Ό λ“£κ³  μš”μ•½ν•œ λ‚΄μš©μž…λ‹ˆλ‹€.

Intro

μš”κ΅¬μ‚¬ν•­.

  • μ„ μ°©μˆœ 100λͺ…μ—κ²Œ 할인쿠폰을 μ œκ³΅ν•˜λŠ” 이벀트

  • μ„ μ°©μˆœ 100λͺ…μ—κ²Œλ§Œ μ§€κΈ‰λ˜μ–΄μ•Όν•œλ‹€.

  • 101개 이상이 μ§€κΈ‰λ˜λ©΄ μ•ˆλœλ‹€.

  • μˆœκ°„μ μœΌλ‘œ λͺ°λ¦¬λŠ” νŠΈλž˜ν”½μ„ 버틸 수 μžˆμ–΄μ•Όν•œλ‹€.

μ„ μ°©μˆœ 이벀트 진행 μ‹œ λ°œμƒν•  수 μžˆλŠ” 문제점.

  • 쿠폰이 κ°œμˆ˜λ³΄λ‹€ 더 많이 λ°œκΈ‰λ˜μ—ˆμ„ 경우

  • 이벀트 νŽ˜μ΄μ§€ 접속이 μ•ˆ 될 경우

  • μ΄λ²€νŠΈλž‘ μƒκ΄€μ—†λŠ” νŽ˜μ΄μ§€λ„ 느렀질 경우

문제 ν•΄κ²°.

  • νŠΈλž˜ν”½μ΄ λͺ°λ Έμ„ λ•Œ λŒ€μ²˜ν•  방법

  • Redis λ₯Ό ν™œμš©ν•˜μ—¬ 쿠폰 λ°œκΈ‰ 개수 보μž₯ν•˜λŠ” 방법

  • Kafka λ₯Ό ν™œμš©ν•˜μ—¬ λ‹€λ₯Έ νŽ˜μ΄μ§€λ“€μ— λŒ€ν•œ 영ν–₯도λ₯Ό μ€„μ΄λŠ” 방법

Race Condition

@Service
@RequiredArgsConstructor
public class ApplyService {

    private final CouponRepository couponRepository;

    public void apply(Long userId) {
        final long count = couponRepository.count();

        if (count > 100) {
            return;
        }

        couponRepository.save(new Coupon(userId));
    }
}

μ•„λž˜ ν…ŒμŠ€νŠΈμ—μ„œ 100의 κ²°κ³Όλ₯Ό μ˜ˆμƒν–ˆμ§€λ§Œ, λ™μ‹œμ— λ“€μ–΄μ˜€λŠ” μš”μ²­λ“€μ΄ κ°±μ‹  μ „ 값을 읽고 데이터λ₯Ό μΆ”κ°€ν•˜λ©΄μ„œ μ˜ˆμƒν–ˆλ˜ 개수λ₯Ό μ΄ˆκ³Όν•˜λŠ” ν˜„μƒμ΄ λ°œμƒν•˜κ²Œ λ©λ‹ˆλ‹€.

@Test
void apply_multiple() throws InterruptedException {
    int threadCount = 1000;
    ExecutorService executorService = Executors.newFixedThreadPool(32);
    CountDownLatch latch = new CountDownLatch(threadCount);

    for (int i = 0; i < threadCount; i++) {
        long userId = i;
        executorService.submit(() -> {
            try {
                applyService.apply(userId);
            } finally {
                latch.countDown();
            }
        });
    }

    latch.await();

    final long count = couponRepository.count();

    assertThat(count).isEqualTo(100);
}

Concurrency issues μ—μ„œ 배운 것과 같이 Java Synchronized λ₯Ό μ μš©ν•΄λ³Ό 수 μžˆμ§€λ§Œ,

μ„œλ²„κ°€ μ—¬λŸ¬ λŒ€κ°€ λœλ‹€λ©΄ Race Condition 이 λ‹€μ‹œ λ°œμƒν•˜κ²Œ λ˜λ―€λ‘œ μ μ ˆν•˜μ§€ μ•ŠμŠ΅λ‹ˆλ‹€.

.

또 λ‹€λ₯Έ λ°©λ²•μœΌλ‘œ MySQL, Redis λ₯Ό ν™œμš©ν•œ 락을 κ΅¬ν˜„ν•΄μ„œ ν•΄κ²°ν•  μˆ˜λ„ μžˆμ„ 것 κ°™μ§€λ§Œ,

쿠폰 κ°œμˆ˜μ— λŒ€ν•œ 정합성을 μ›ν•˜λŠ”λ° 락을 ν™œμš©ν•˜μ—¬ κ΅¬ν˜„ν•˜λ©΄, λ°œκΈ‰λœ 쿠폰의 개수λ₯Ό κ°€μ Έμ˜€λŠ” 것뢀터 쿠폰을 생성할 λ•ŒκΉŒμ§€ 락을 κ±Έμ–΄μ•Ό ν•©λ‹ˆλ‹€.

μ΄λ ‡κ²Œ 되면 락을 κ±°λŠ” ꡬ간이 κΈΈμ–΄μ§€λ‹€λ³΄λ‹ˆ μ„±λŠ₯에 뢈이읡(락이 풀릴 λ•ŒκΉŒμ§€ 쿠폰 λ°œκΈ‰μ„ κΈ°λ‹€λ €μ•Ό ν•˜λŠ”)이 λ°œμƒν•  수 μžˆμŠ΅λ‹ˆλ‹€.

Redis

Redis incr λͺ…λ Ήμ–΄λŠ” 킀에 λŒ€ν•œ 값을 1μ”© μ¦κ°€μ‹œν‚€λŠ” λͺ…λ Ήμ–΄μž…λ‹ˆλ‹€.

Redis λŠ” μ‹±κΈ€μŠ€λ ˆλ“œ 기반으둜 λ™μž‘ν•˜μ—¬ 레이슀 μ»¨λ””μ…˜μ„ ν•΄κ²°ν•  수 μžˆμ„ 뿐 μ•„λ‹ˆλΌ incr λͺ…λ Ήμ–΄λŠ” μ„±λŠ₯도 ꡉμž₯히 λΉ λ₯Έ λͺ…λ Ήμ–΄μž…λ‹ˆλ‹€.

incr λͺ…λ Ήμ–΄λ₯Ό μ‚¬μš©ν•˜μ—¬ λ°œκΈ‰λœ 쿠폰 개수λ₯Ό μ œμ–΄ν•œλ‹€λ©΄ μ„±λŠ₯도 λΉ λ₯΄λ©° 데이터 정합성도 지킬 수 μžˆμŠ΅λ‹ˆλ‹€.

> incr coupon_count
(integer) 1

> incr coupon_count
(integer) 2

commit

Problems

1) 쿠폰의 개수

  • λ°œκΈ‰ν•˜λŠ” 쿠폰의 κ°œμˆ˜κ°€ λ§Žμ•„μ§ˆμˆ˜λ‘ λ°μ΄ν„°λ² μ΄μŠ€μ— λΆ€ν•˜λ₯Ό 주게 λ©λ‹ˆλ‹€.

  • ν•΄λ‹Ή λ°μ΄ν„°λ² μ΄μŠ€κ°€ λ‹€λ₯Έ κ³³μ—μ„œλ„ μ‚¬μš©λ˜κ³  μžˆλ‹€λ©΄ μ„œλΉ„μŠ€ μž₯μ• κΉŒμ§€ λ°œμƒν•  수 μžˆμŠ΅λ‹ˆλ‹€.

2) 짧은 μ‹œκ°„μ— λ§Žμ€ μš”μ²­

  • 짧은 μ‹œκ°„ 내에 λ§Žμ€ μš”μ²­μ΄ λ“€μ–΄μ˜€κ²Œ 될 경우 DB μ„œλ²„μ˜ λ¦¬μ†ŒμŠ€λ₯Ό 많이 μ‚¬μš©ν•˜κ²Œ λ˜λ―€λ‘œ λΆ€ν•˜κ°€ λ°œμƒν•˜κ²Œ λ©λ‹ˆλ‹€.

  • μ„œλΉ„μŠ€ 지연 ν˜Ήμ€ 였λ₯˜κ°€ λ°œμƒν•  수 μžˆμŠ΅λ‹ˆλ‹€.

Kafka

λΆ„μ‚° 이벀트 슀트리밍 ν”Œλž«νΌ

  • 이벀트 슀트리밍: μ†ŒμŠ€μ—μ„œ λͺ©μ μ§€κΉŒμ§€ 이벀트λ₯Ό μ‹€μ‹œκ°„μœΌλ‘œ 슀트리밍 ν•˜λŠ” 것

Producer ---> Topic <--- Consumer

Start Kafka

docker-compose.yml

version: '3' # Docker Compose 파일 버전 지정
services: # μ—¬λŸ¬κ°œμ˜ Docker μ»¨ν…Œμ΄λ„ˆ μ„œλΉ„μŠ€ μ •μ˜
  zookeeper: # Zookeeper μ„œλΉ„μŠ€ μ •μ˜
    image: wurstmeister/zookeeper:3.4.6
    container_name: zookeeper
    ports:
      - "2181:2181" # 호슀트의 2181 포트λ₯Ό μ»¨ν…Œμ΄λ„ˆμ˜ 2181 ν¬νŠΈμ™€ 바인딩
  kafka: # kafka μ„œλΉ„μŠ€ μ •μ˜
    image: wurstmeister/kafka:2.12-2.5.0
    container_name: kafka
    ports:
      - "9092:9092" # 호슀트의 9092 포트λ₯Ό μ»¨ν…Œμ΄λ„ˆμ˜ 9092 ν¬νŠΈμ™€ 바인딩
    environment: # kafka μ»¨ν…Œμ΄λ„ˆμ˜ ν™˜κ²½ λ³€μˆ˜ μ„€μ •
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:29092,OUTSIDE://localhost:9092 # λ‚΄/μ™ΈλΆ€μ—μ„œ μ ‘κ·Όν•  수 μžˆλŠ” λ¦¬μŠ€λ„ˆ μ£Όμ†Œ μ„€μ •
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT # λ¦¬μŠ€λ„ˆμ˜ λ³΄μ•ˆ ν”„λ‘œν† μ½œ 맀핑
      KAFKA_LISTENERS: INSIDE://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 # μ»¨ν…Œμ΄λ„ˆ λ‚΄λΆ€μ—μ„œ μ‚¬μš©ν•  λ¦¬μŠ€λ„ˆ μ£Όμ†Œ μ„€μ •
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE # 브둜컀 κ°„ 톡신에 μ‚¬μš©ν•  λ¦¬μŠ€λ„ˆ 이름
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 # Kafkaκ°€ Zookeeper에 μ—°κ²°ν•˜κΈ° μœ„ν•œ μ£Όμ†Œ
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock # Docker μ†ŒμΌ“μ„ μ»¨ν…Œμ΄λ„ˆμ™€ κ³΅μœ ν•˜μ—¬ Docker 이벀트λ₯Ό 관리할 수 μžˆλ„λ‘ μ„€μ •

example

# μΉ΄ν”„μΉ΄ μ‹€ν–‰
$ docker-compose up -d

# 토픽생성
$ docker exec -it kafka kafka-topics.sh --bootstrap-server localhost:9092 --create --topic testTopic
Created topic testTopic.

# ν”„λ‘œλ“€μ„œ μ‹€ν–‰
$ docker exec -it kafka kafka-console-producer.sh --topic testTopic --broker-list 0.0.0.0:9092
>Hello

# 컨슈머 μ‹€ν–‰
$ docker exec -it kafka kafka-console-consumer.sh --topic testTopic --bootstrap-server localhost:9092
Hello

# μΉ΄ν”„μΉ΄ μ’…λ£Œ
$ docker-compose down

Producer

api/KafkaProducerConfig.java

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Long> producerFactory() {
        final Map<String, Object> config = new HashMap<>();

        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);

        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Long> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

api/CouponCreateProducer.java

@Component
@RequiredArgsConstructor
public class CouponCreateProducer {

    private final KafkaTemplate<String, Long> kafkaTemplate;

    public void create(Long userId) {
        kafkaTemplate.send("coupon_create", userId);
    }
}

KafkaProducerConfig CouponCreateProducer

Consumer

consumer/KafkaConsumerConfig.java

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, Long> consumerFactory() {
        final Map<String, Object> config = new HashMap<>();

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(config);
    }

    /**
     * ν† ν”½μœΌλ‘œλΆ€ν„° λ©”μ‹œμ§€λ₯Ό 전달받기 μœ„ν•œ kafka-listener λ₯Ό λ§Œλ“œλŠ” kafka-listener-container-factory 생성
      */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Long> kafkaListenerContainerFactory() {
        final ConcurrentKafkaListenerContainerFactory<String, Long> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }
}

consumer/CouponCreatedConsumer.java

@Component
@RequiredArgsConstructor
public class CouponCreatedConsumer {

    private final CouponRepository couponRepository;

    @KafkaListener(topics = "coupon_create", groupId = "group_1")
    public void listener(Long userId) {
        couponRepository.save(new Coupon(userId));
    }
}

using Consumer

Limit the number of coupons

λ°œκΈ‰ κ°€λŠ₯ν•œ 쿠폰 개수 1인당 1개둜 μ œν•œν•˜κΈ°.

(1) λ°μ΄ν„°λ² μ΄μŠ€ μœ λ‹ˆν¬ν‚€ μ‚¬μš©ν•˜κΈ°

  • userId, couponType 에 μœ λ‹ˆν¬ ν‚€λ₯Ό μ μš©ν•˜λŠ” κ°€μž₯ κ°„λ‹¨ν•œ 방법.

  • 보톡 μ„œλΉ„μŠ€λŠ” ν•œ μœ μ €κ°€ 같은 νƒ€μž…μ˜ 쿠폰을 μ—¬λŸ¬κ°œ κ°€μ§ˆ 수 μžˆμœΌλ―€λ‘œ μ‹€μš©μ μΈ 방법은 μ•„λ‹˜.

(2) λ²”μœ„λ‘œ 락을 작고 μ²˜μŒμ— 쿠폰 λ°œκΈ‰ μ—¬λΆ€λ₯Ό κ°€μ Έμ™€μ„œ νŒλ‹¨ν•˜λŠ” 방식

  • 쿠폰 λ°œκΈ‰ κ°€λŠ₯ μ—¬λΆ€λ§Œ νŒλ‹¨ν•˜κ³  μ‹€μ œ 쿠폰 생성은 μ»¨μŠˆλ¨Έμ—μ„œ μˆ˜ν–‰ν•˜κ³  μžˆμœΌλ―€λ‘œ μ‹œκ°„μ°¨λ‘œ 2개 μ΄μƒμ˜ 쿠폰이 λ°œκΈ‰λ  수 있음.

  • 직접 쿠폰을 λ°œκΈ‰ν•˜λ”λΌλ„ 락의 λ²”μœ„κ°€ λ„“μ–΄μ„œ λ‹€λ₯Έ μš”μ²­λ“€μ€ 락이 끝날 λ•ŒκΉŒμ§€ λŒ€κΈ°μƒνƒœκ°€ λ˜λ―€λ‘œ μ„±λŠ₯이 μ•ˆ μ’‹μ•„μ§ˆ 수 있음.

(3) Rest μ—μ„œ Set ν™œμš©ν•˜κΈ°

# add set in redis
> sadd test 1
(integer) 1 # μΆ”κ°€λœ value 개수

> sadd test 1
(integer) 0 # 이미 ν‚€(test)에 κ°’(1)이 μ‘΄μž¬ν•˜λ―€λ‘œ 0을 리턴

AppliedUserRepository.java

@Repository
@RequiredArgsConstructor
public class AppliedUserRepository {

    private final RedisTemplate<String, String> redisTemplate;

    public Long add(Long userId) {
        return redisTemplate
                .opsForSet()
                .add("applied_user", userId.toString());
    }
}

...

/* Service */
final Long apply = appliedUserRepository.add(userId);
if (apply != 1) {
    return;
}

commit

Error issuing coupon

(1) μ‹€νŒ¨ 이벀트 관리 ν…Œμ΄λΈ” ν™œμš©ν•˜κΈ°

  • μ»¨μŠˆλ¨Έμ—μ„œ 쿠폰 λ°œκΈ‰ 쀑 μ—λŸ¬κ°€ λ°œμƒν•  경우 FailedEvent ν…Œμ΄λΈ”μ— μ‹€νŒ¨ν•œ 이벀트λ₯Ό μ €μž₯ν•©λ‹ˆλ‹€.

  • 이후 배치λ₯Ό 톡해 FailedEvent ν…Œμ΄λΈ”μ— μŒ“μΈ 데이터λ₯Ό 주기적으둜 μ½μ–΄μ„œ 쿠폰을 λ°œκΈ‰ν•΄ μ€€λ‹€λ©΄ 결과적으둜 νŠΉμ • μˆ˜λŸ‰μ˜ 쿠폰이 λͺ¨λ‘ λ°œκΈ‰λ  수 μžˆμŠ΅λ‹ˆλ‹€.

(3) Saga Pattern μ μš©ν•˜κΈ°

  • λ§ˆμ΄ν¬λ‘œμ„œλΉ„μŠ€λ“€λΌλ¦¬ 이벀트λ₯Ό μ£Όκ³  λ°›μ•„ νŠΉμ • λ§ˆμ΄ν¬λ‘œμ„œλΉ„μŠ€μ—μ„œμ˜ μž‘μ—…μ΄ μ‹€νŒ¨ν•˜λ©΄ μ΄μ „κΉŒμ§€μ˜ μž‘μ—…μ΄ μ™„λ£Œλœ λ§ˆμ΄ν¬μ„œλΉ„μŠ€λ“€μ—κ²Œ 보상(complemetary) 이벀트λ₯Ό μ†Œμ‹±ν•¨μœΌλ‘œμ¨ λΆ„μ‚° ν™˜κ²½μ—μ„œ μ›μžμ„±(atomicity)을 보μž₯ν•˜λŠ” νŒ¨ν„΄

commit

Last updated