First Come First Served Event
μ€μ΅μΌλ‘ λ°°μ°λ μ μ°©μ μ΄λ²€νΈ μμ€ν
κ°μλ₯Ό λ£κ³ μμ½ν λ΄μ©μ
λλ€.
Intro
μꡬμ¬ν.
μ μ°©μ 100λͺ
μκ² ν μΈμΏ ν°μ μ 곡νλ μ΄λ²€νΈ
μ μ°©μ 100λͺ
μκ²λ§ μ§κΈλμ΄μΌνλ€.
101κ° μ΄μμ΄ μ§κΈλλ©΄ μλλ€.
μκ°μ μΌλ‘ λͺ°λ¦¬λ νΈλν½μ λ²νΈ μ μμ΄μΌνλ€.
μ μ°©μ μ΄λ²€νΈ μ§ν μ λ°μν μ μλ λ¬Έμ μ .
μΏ ν°μ΄ κ°μλ³΄λ€ λ λ§μ΄ λ°κΈλμμ κ²½μ°
μ΄λ²€νΈ νμ΄μ§ μ μμ΄ μ λ κ²½μ°
μ΄λ²€νΈλ μκ΄μλ νμ΄μ§λ λλ €μ§ κ²½μ°
λ¬Έμ ν΄κ²°.
νΈλν½μ΄ λͺ°λ Έμ λ λμ²ν λ°©λ²
Redis λ₯Ό νμ©νμ¬ μΏ ν° λ°κΈ κ°μ 보μ₯νλ λ°©λ²
Kafka λ₯Ό νμ©νμ¬ λ€λ₯Έ νμ΄μ§λ€μ λν μν₯λλ₯Ό μ€μ΄λ λ°©λ²
Race Condition
Copy @Service
@RequiredArgsConstructor
public class ApplyService {
private final CouponRepository couponRepository;
public void apply(Long userId) {
final long count = couponRepository.count();
if (count > 100) {
return;
}
couponRepository.save(new Coupon(userId));
}
}
μλ ν
μ€νΈμμ 100μ κ²°κ³Όλ₯Ό μμνμ§λ§, λμμ λ€μ΄μ€λ μμ²λ€μ΄ κ°±μ μ κ°μ μ½κ³ λ°μ΄ν°λ₯Ό μΆκ°
νλ©΄μ μμνλ κ°μλ₯Ό μ΄κ³Όνλ νμμ΄ λ°μνκ² λ©λλ€.
Copy @Test
void apply_multiple() throws InterruptedException {
int threadCount = 1000;
ExecutorService executorService = Executors.newFixedThreadPool(32);
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
long userId = i;
executorService.submit(() -> {
try {
applyService.apply(userId);
} finally {
latch.countDown();
}
});
}
latch.await();
final long count = couponRepository.count();
assertThat(count).isEqualTo(100);
}
Concurrency issues μμ λ°°μ΄ κ²κ³Ό κ°μ΄ Java Synchronized λ₯Ό μ μ©ν΄λ³Ό μ μμ§λ§,
μλ²κ° μ¬λ¬ λκ° λλ€λ©΄ Race Condition μ΄ λ€μ λ°μνκ² λλ―λ‘ μ μ νμ§ μμ΅λλ€.
.
λ λ€λ₯Έ λ°©λ²μΌλ‘ MySQL, Redis λ₯Ό νμ©ν λ½μ ꡬνν΄μ ν΄κ²°ν μλ μμ κ² κ°μ§λ§,
μΏ ν° κ°μμ λν μ ν©μ±μ μνλλ° λ½μ νμ©νμ¬ κ΅¬ννλ©΄, λ°κΈλ μΏ ν°μ κ°μλ₯Ό κ°μ Έμ€λ κ²λΆν° μΏ ν°μ μμ±ν λκΉμ§ λ½μ κ±Έμ΄μΌ ν©λλ€.
μ΄λ κ² λλ©΄ λ½μ κ±°λ ꡬκ°μ΄ κΈΈμ΄μ§λ€λ³΄λ μ±λ₯μ λΆμ΄μ΅(λ½μ΄ ν릴 λκΉμ§ μΏ ν° λ°κΈμ κΈ°λ€λ €μΌ νλ)μ΄ λ°μν μ μμ΅λλ€.
Redis
Redis incr
λͺ
λ Ήμ΄λ ν€μ λν κ°μ 1μ© μ¦κ°μν€λ λͺ
λ Ήμ΄μ
λλ€.
Redis λ μ±κΈμ€λ λ κΈ°λ°
μΌλ‘ λμνμ¬ λ μ΄μ€ 컨λμ
μ ν΄κ²°
ν μ μμ λΏ μλλΌ incr λͺ
λ Ήμ΄λ μ±λ₯λ κ΅μ₯ν λΉ λ₯Έ
λͺ
λ Ήμ΄μ
λλ€.
incr
λͺ
λ Ήμ΄λ₯Ό μ¬μ©νμ¬ λ°κΈλ μΏ ν° κ°μλ₯Ό μ μ΄νλ€λ©΄ μ±λ₯λ λΉ λ₯΄λ©° λ°μ΄ν° μ ν©μ±λ μ§ν¬ μ μμ΅λλ€.
Copy > incr coupon_count
(integer) 1
> incr coupon_count
(integer) 2
commit
Problems
1) μΏ ν°μ κ°μ
λ°κΈνλ μΏ ν°μ κ°μκ° λ§μμ§μλ‘ λ°μ΄ν°λ² μ΄μ€μ λΆν λ₯Ό μ£Όκ² λ©λλ€.
ν΄λΉ λ°μ΄ν°λ² μ΄μ€κ° λ€λ₯Έ κ³³μμλ μ¬μ©λκ³ μλ€λ©΄ μλΉμ€ μ₯μ κΉμ§ λ°μν μ μμ΅λλ€.
2) 짧μ μκ°μ λ§μ μμ²
짧μ μκ° λ΄μ λ§μ μμ²μ΄ λ€μ΄μ€κ² λ κ²½μ° DB μλ²μ 리μμ€ λ₯Ό λ§μ΄ μ¬μ©νκ² λλ―λ‘ λΆν κ° λ°μνκ² λ©λλ€.
μλΉμ€ μ§μ° νΉμ μ€λ₯κ° λ°μν μ μμ΅λλ€.
Kafka
λΆμ° μ΄λ²€νΈ μ€νΈλ¦¬λ° νλ«νΌ
μ΄λ²€νΈ μ€νΈλ¦¬λ°
: μμ€μμ λͺ©μ μ§κΉμ§ μ΄λ²€νΈλ₯Ό μ€μκ°μΌλ‘ μ€νΈλ¦¬λ° νλ κ²
Copy Producer ---> Topic <--- Consumer
Start Kafka
docker-compose.yml
Copy version: '3' # Docker Compose νμΌ λ²μ μ§μ
services: # μ¬λ¬κ°μ Docker 컨ν
μ΄λ μλΉμ€ μ μ
zookeeper: # Zookeeper μλΉμ€ μ μ
image: wurstmeister/zookeeper:3.4.6
container_name: zookeeper
ports:
- "2181:2181" # νΈμ€νΈμ 2181 ν¬νΈλ₯Ό 컨ν
μ΄λμ 2181 ν¬νΈμ λ°μΈλ©
kafka: # kafka μλΉμ€ μ μ
image: wurstmeister/kafka:2.12-2.5.0
container_name: kafka
ports:
- "9092:9092" # νΈμ€νΈμ 9092 ν¬νΈλ₯Ό 컨ν
μ΄λμ 9092 ν¬νΈμ λ°μΈλ©
environment: # kafka 컨ν
μ΄λμ νκ²½ λ³μ μ€μ
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:29092,OUTSIDE://localhost:9092 # λ΄/μΈλΆμμ μ κ·Όν μ μλ 리μ€λ μ£Όμ μ€μ
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT # 리μ€λμ 보μ νλ‘ν μ½ λ§€ν
KAFKA_LISTENERS: INSIDE://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 # 컨ν
μ΄λ λ΄λΆμμ μ¬μ©ν 리μ€λ μ£Όμ μ€μ
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE # λΈλ‘컀 κ° ν΅μ μ μ¬μ©ν 리μ€λ μ΄λ¦
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 # Kafkaκ° Zookeeperμ μ°κ²°νκΈ° μν μ£Όμ
volumes:
- /var/run/docker.sock:/var/run/docker.sock # Docker μμΌμ 컨ν
μ΄λμ 곡μ νμ¬ Docker μ΄λ²€νΈλ₯Ό κ΄λ¦¬ν μ μλλ‘ μ€μ
example
Copy # μΉ΄νμΉ΄ μ€ν
$ docker-compose up -d
# ν ν½μμ±
$ docker exec -it kafka kafka-topics.sh --bootstrap-server localhost:9092 --create --topic testTopic
Created topic testTopic.
# νλ‘λμ μ€ν
$ docker exec -it kafka kafka-console-producer.sh --topic testTopic --broker-list 0.0.0.0:9092
>Hello
# 컨μλ¨Έ μ€ν
$ docker exec -it kafka kafka-console-consumer.sh --topic testTopic --bootstrap-server localhost:9092
Hello
# μΉ΄νμΉ΄ μ’
λ£
$ docker-compose down
Producer
api/KafkaProducerConfig.java
Copy @Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, Long> producerFactory() {
final Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, Long> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
api/CouponCreateProducer.java
Copy @Component
@RequiredArgsConstructor
public class CouponCreateProducer {
private final KafkaTemplate<String, Long> kafkaTemplate;
public void create(Long userId) {
kafkaTemplate.send("coupon_create", userId);
}
}
KafkaProducerConfig CouponCreateProducer
Consumer
consumer/KafkaConsumerConfig.java
Copy @Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, Long> consumerFactory() {
final Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
/**
* ν ν½μΌλ‘λΆν° λ©μμ§λ₯Ό μ λ¬λ°κΈ° μν kafka-listener λ₯Ό λ§λλ kafka-listener-container-factory μμ±
*/
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Long> kafkaListenerContainerFactory() {
final ConcurrentKafkaListenerContainerFactory<String, Long> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
consumer/CouponCreatedConsumer.java
Copy @Component
@RequiredArgsConstructor
public class CouponCreatedConsumer {
private final CouponRepository couponRepository;
@KafkaListener(topics = "coupon_create", groupId = "group_1")
public void listener(Long userId) {
couponRepository.save(new Coupon(userId));
}
}
using Consumer
Limit the number of coupons
λ°κΈ κ°λ₯ν μΏ ν° κ°μ 1μΈλΉ 1κ°λ‘ μ ννκΈ°.
(1) λ°μ΄ν°λ² μ΄μ€ μ λν¬ν€ μ¬μ©νκΈ°
userId, couponType μ μ λν¬ ν€λ₯Ό μ μ©νλ κ°μ₯ κ°λ¨ν λ°©λ².
λ³΄ν΅ μλΉμ€λ ν μ μ κ° κ°μ νμ
μ μΏ ν°μ μ¬λ¬κ° κ°μ§ μ μμΌλ―λ‘ μ€μ©μ μΈ λ°©λ²μ μλ.
(2) λ²μλ‘ λ½μ μ‘κ³ μ²μμ μΏ ν° λ°κΈ μ¬λΆλ₯Ό κ°μ Έμμ νλ¨νλ λ°©μ
μΏ ν° λ°κΈ κ°λ₯ μ¬λΆλ§ νλ¨νκ³ μ€μ μΏ ν° μμ±μ 컨μλ¨Έμμ μννκ³ μμΌλ―λ‘ μκ°μ°¨λ‘ 2κ° μ΄μμ μΏ ν°μ΄ λ°κΈλ μ μμ.
μ§μ μΏ ν°μ λ°κΈνλλΌλ λ½μ λ²μκ° λμ΄μ λ€λ₯Έ μμ²λ€μ λ½μ΄ λλ λκΉμ§ λκΈ°μνκ° λλ―λ‘ μ±λ₯μ΄ μ μ’μμ§ μ μμ.
(3) Rest μμ Set νμ©νκΈ°
Copy # add set in redis
> sadd test 1
(integer) 1 # μΆκ°λ value κ°μ
> sadd test 1
(integer) 0 # μ΄λ―Έ ν€(test)μ κ°(1)μ΄ μ‘΄μ¬νλ―λ‘ 0μ 리ν΄
AppliedUserRepository.java
Copy @Repository
@RequiredArgsConstructor
public class AppliedUserRepository {
private final RedisTemplate<String, String> redisTemplate;
public Long add(Long userId) {
return redisTemplate
.opsForSet()
.add("applied_user", userId.toString());
}
}
...
/* Service */
final Long apply = appliedUserRepository.add(userId);
if (apply != 1) {
return;
}
commit
Error issuing coupon
(1) μ€ν¨ μ΄λ²€νΈ κ΄λ¦¬ ν
μ΄λΈ νμ©νκΈ°
컨μλ¨Έμμ μΏ ν° λ°κΈ μ€ μλ¬κ° λ°μν κ²½μ° FailedEvent ν
μ΄λΈμ μ€ν¨ν μ΄λ²€νΈλ₯Ό μ μ₯ν©λλ€.
μ΄ν λ°°μΉλ₯Ό ν΅ν΄ FailedEvent ν
μ΄λΈμ μμΈ λ°μ΄ν°λ₯Ό μ£ΌκΈ°μ μΌλ‘ μ½μ΄μ μΏ ν°μ λ°κΈν΄ μ€λ€λ©΄ κ²°κ³Όμ μΌλ‘ νΉμ μλμ μΏ ν°μ΄ λͺ¨λ λ°κΈλ μ μμ΅λλ€.
(3) Saga Pattern μ μ©νκΈ°
λ§μ΄ν¬λ‘μλΉμ€λ€λΌλ¦¬ μ΄λ²€νΈλ₯Ό μ£Όκ³ λ°μ νΉμ λ§μ΄ν¬λ‘μλΉμ€μμμ μμ
μ΄ μ€ν¨νλ©΄ μ΄μ κΉμ§μ μμ
μ΄ μλ£λ λ§μ΄ν¬μλΉμ€λ€μκ² λ³΄μ(complemetary) μ΄λ²€νΈλ₯Ό μμ±ν¨μΌλ‘μ¨ λΆμ° νκ²½μμ μμμ±(atomicity)μ 보μ₯νλ ν¨ν΄
commit
Last updated 8 months ago