First Come First Served Event
์ค์ต์ผ๋ก ๋ฐฐ์ฐ๋ ์ ์ฐฉ์ ์ด๋ฒคํธ ์์คํ
๊ฐ์๋ฅผ ๋ฃ๊ณ ์์ฝํ ๋ด์ฉ์
๋๋ค.
Intro
์๊ตฌ์ฌํญ.
์ ์ฐฉ์ 100๋ช
์๊ฒ ํ ์ธ์ฟ ํฐ์ ์ ๊ณตํ๋ ์ด๋ฒคํธ
์ ์ฐฉ์ 100๋ช
์๊ฒ๋ง ์ง๊ธ๋์ด์ผํ๋ค.
101๊ฐ ์ด์์ด ์ง๊ธ๋๋ฉด ์๋๋ค.
์๊ฐ์ ์ผ๋ก ๋ชฐ๋ฆฌ๋ ํธ๋ํฝ์ ๋ฒํธ ์ ์์ด์ผํ๋ค.
์ ์ฐฉ์ ์ด๋ฒคํธ ์งํ ์ ๋ฐ์ํ ์ ์๋ ๋ฌธ์ ์ .
์ฟ ํฐ์ด ๊ฐ์๋ณด๋ค ๋ ๋ง์ด ๋ฐ๊ธ๋์์ ๊ฒฝ์ฐ
์ด๋ฒคํธ ํ์ด์ง ์ ์์ด ์ ๋ ๊ฒฝ์ฐ
์ด๋ฒคํธ๋ ์๊ด์๋ ํ์ด์ง๋ ๋๋ ค์ง ๊ฒฝ์ฐ
๋ฌธ์ ํด๊ฒฐ.
ํธ๋ํฝ์ด ๋ชฐ๋ ธ์ ๋ ๋์ฒํ ๋ฐฉ๋ฒ
Redis ๋ฅผ ํ์ฉํ์ฌ ์ฟ ํฐ ๋ฐ๊ธ ๊ฐ์ ๋ณด์ฅํ๋ ๋ฐฉ๋ฒ
Kafka ๋ฅผ ํ์ฉํ์ฌ ๋ค๋ฅธ ํ์ด์ง๋ค์ ๋ํ ์ํฅ๋๋ฅผ ์ค์ด๋ ๋ฐฉ๋ฒ
Race Condition
@Service
@RequiredArgsConstructor
public class ApplyService {
private final CouponRepository couponRepository;
public void apply(Long userId) {
final long count = couponRepository.count();
if (count > 100) {
return;
}
couponRepository.save(new Coupon(userId));
}
}
์๋ ํ
์คํธ์์ 100์ ๊ฒฐ๊ณผ๋ฅผ ์์ํ์ง๋ง, ๋์์ ๋ค์ด์ค๋ ์์ฒญ๋ค์ด ๊ฐฑ์ ์ ๊ฐ์ ์ฝ๊ณ ๋ฐ์ดํฐ๋ฅผ ์ถ๊ฐ
ํ๋ฉด์ ์์ํ๋ ๊ฐ์๋ฅผ ์ด๊ณผํ๋ ํ์์ด ๋ฐ์ํ๊ฒ ๋ฉ๋๋ค.
@Test
void apply_multiple() throws InterruptedException {
int threadCount = 1000;
ExecutorService executorService = Executors.newFixedThreadPool(32);
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
long userId = i;
executorService.submit(() -> {
try {
applyService.apply(userId);
} finally {
latch.countDown();
}
});
}
latch.await();
final long count = couponRepository.count();
assertThat(count).isEqualTo(100);
}
Concurrency issues ์์ ๋ฐฐ์ด ๊ฒ๊ณผ ๊ฐ์ด Java Synchronized ๋ฅผ ์ ์ฉํด๋ณผ ์ ์์ง๋ง,
์๋ฒ๊ฐ ์ฌ๋ฌ ๋๊ฐ ๋๋ค๋ฉด Race Condition ์ด ๋ค์ ๋ฐ์ํ๊ฒ ๋๋ฏ๋ก ์ ์ ํ์ง ์์ต๋๋ค.
.
๋ ๋ค๋ฅธ ๋ฐฉ๋ฒ์ผ๋ก MySQL, Redis ๋ฅผ ํ์ฉํ ๋ฝ์ ๊ตฌํํด์ ํด๊ฒฐํ ์๋ ์์ ๊ฒ ๊ฐ์ง๋ง,
์ฟ ํฐ ๊ฐ์์ ๋ํ ์ ํฉ์ฑ์ ์ํ๋๋ฐ ๋ฝ์ ํ์ฉํ์ฌ ๊ตฌํํ๋ฉด, ๋ฐ๊ธ๋ ์ฟ ํฐ์ ๊ฐ์๋ฅผ ๊ฐ์ ธ์ค๋ ๊ฒ๋ถํฐ ์ฟ ํฐ์ ์์ฑํ ๋๊น์ง ๋ฝ์ ๊ฑธ์ด์ผ ํฉ๋๋ค.
์ด๋ ๊ฒ ๋๋ฉด ๋ฝ์ ๊ฑฐ๋ ๊ตฌ๊ฐ์ด ๊ธธ์ด์ง๋ค๋ณด๋ ์ฑ๋ฅ์ ๋ถ์ด์ต(๋ฝ์ด ํ๋ฆด ๋๊น์ง ์ฟ ํฐ ๋ฐ๊ธ์ ๊ธฐ๋ค๋ ค์ผ ํ๋)์ด ๋ฐ์ํ ์ ์์ต๋๋ค.
Redis
Redis incr
๋ช
๋ น์ด๋ ํค์ ๋ํ ๊ฐ์ 1์ฉ ์ฆ๊ฐ์ํค๋ ๋ช
๋ น์ด์
๋๋ค.
Redis ๋ ์ฑ๊ธ์ค๋ ๋ ๊ธฐ๋ฐ
์ผ๋ก ๋์ํ์ฌ ๋ ์ด์ค ์ปจ๋์
์ ํด๊ฒฐ
ํ ์ ์์ ๋ฟ ์๋๋ผ incr ๋ช
๋ น์ด๋ ์ฑ๋ฅ๋ ๊ต์ฅํ ๋น ๋ฅธ
๋ช
๋ น์ด์
๋๋ค.
incr
๋ช
๋ น์ด๋ฅผ ์ฌ์ฉํ์ฌ ๋ฐ๊ธ๋ ์ฟ ํฐ ๊ฐ์๋ฅผ ์ ์ดํ๋ค๋ฉด ์ฑ๋ฅ๋ ๋น ๋ฅด๋ฉฐ ๋ฐ์ดํฐ ์ ํฉ์ฑ๋ ์งํฌ ์ ์์ต๋๋ค.
> incr coupon_count
(integer) 1
> incr coupon_count
(integer) 2
commit
Problems
1) ์ฟ ํฐ์ ๊ฐ์
๋ฐ๊ธํ๋ ์ฟ ํฐ์ ๊ฐ์๊ฐ ๋ง์์ง์๋ก ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ๋ถํ๋ฅผ ์ฃผ๊ฒ ๋ฉ๋๋ค.
ํด๋น ๋ฐ์ดํฐ๋ฒ ์ด์ค๊ฐ ๋ค๋ฅธ ๊ณณ์์๋ ์ฌ์ฉ๋๊ณ ์๋ค๋ฉด ์๋น์ค ์ฅ์ ๊น์ง ๋ฐ์ํ ์ ์์ต๋๋ค.
2) ์งง์ ์๊ฐ์ ๋ง์ ์์ฒญ
์งง์ ์๊ฐ ๋ด์ ๋ง์ ์์ฒญ์ด ๋ค์ด์ค๊ฒ ๋ ๊ฒฝ์ฐ DB ์๋ฒ์ ๋ฆฌ์์ค๋ฅผ ๋ง์ด ์ฌ์ฉํ๊ฒ ๋๋ฏ๋ก ๋ถํ๊ฐ ๋ฐ์ํ๊ฒ ๋ฉ๋๋ค.
์๋น์ค ์ง์ฐ ํน์ ์ค๋ฅ๊ฐ ๋ฐ์ํ ์ ์์ต๋๋ค.
Kafka
๋ถ์ฐ ์ด๋ฒคํธ ์คํธ๋ฆฌ๋ฐ ํ๋ซํผ
์ด๋ฒคํธ ์คํธ๋ฆฌ๋ฐ
: ์์ค์์ ๋ชฉ์ ์ง๊น์ง ์ด๋ฒคํธ๋ฅผ ์ค์๊ฐ์ผ๋ก ์คํธ๋ฆฌ๋ฐ ํ๋ ๊ฒ
Producer ---> Topic <--- Consumer
Start Kafka
docker-compose.yml
version: '3' # Docker Compose ํ์ผ ๋ฒ์ ์ง์
services: # ์ฌ๋ฌ๊ฐ์ Docker ์ปจํ
์ด๋ ์๋น์ค ์ ์
zookeeper: # Zookeeper ์๋น์ค ์ ์
image: wurstmeister/zookeeper:3.4.6
container_name: zookeeper
ports:
- "2181:2181" # ํธ์คํธ์ 2181 ํฌํธ๋ฅผ ์ปจํ
์ด๋์ 2181 ํฌํธ์ ๋ฐ์ธ๋ฉ
kafka: # kafka ์๋น์ค ์ ์
image: wurstmeister/kafka:2.12-2.5.0
container_name: kafka
ports:
- "9092:9092" # ํธ์คํธ์ 9092 ํฌํธ๋ฅผ ์ปจํ
์ด๋์ 9092 ํฌํธ์ ๋ฐ์ธ๋ฉ
environment: # kafka ์ปจํ
์ด๋์ ํ๊ฒฝ ๋ณ์ ์ค์
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:29092,OUTSIDE://localhost:9092 # ๋ด/์ธ๋ถ์์ ์ ๊ทผํ ์ ์๋ ๋ฆฌ์ค๋ ์ฃผ์ ์ค์
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT # ๋ฆฌ์ค๋์ ๋ณด์ ํ๋กํ ์ฝ ๋งคํ
KAFKA_LISTENERS: INSIDE://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 # ์ปจํ
์ด๋ ๋ด๋ถ์์ ์ฌ์ฉํ ๋ฆฌ์ค๋ ์ฃผ์ ์ค์
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE # ๋ธ๋ก์ปค ๊ฐ ํต์ ์ ์ฌ์ฉํ ๋ฆฌ์ค๋ ์ด๋ฆ
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 # Kafka๊ฐ Zookeeper์ ์ฐ๊ฒฐํ๊ธฐ ์ํ ์ฃผ์
volumes:
- /var/run/docker.sock:/var/run/docker.sock # Docker ์์ผ์ ์ปจํ
์ด๋์ ๊ณต์ ํ์ฌ Docker ์ด๋ฒคํธ๋ฅผ ๊ด๋ฆฌํ ์ ์๋๋ก ์ค์
example
# ์นดํ์นด ์คํ
$ docker-compose up -d
# ํ ํฝ์์ฑ
$ docker exec -it kafka kafka-topics.sh --bootstrap-server localhost:9092 --create --topic testTopic
Created topic testTopic.
# ํ๋ก๋์ ์คํ
$ docker exec -it kafka kafka-console-producer.sh --topic testTopic --broker-list 0.0.0.0:9092
>Hello
# ์ปจ์๋จธ ์คํ
$ docker exec -it kafka kafka-console-consumer.sh --topic testTopic --bootstrap-server localhost:9092
Hello
# ์นดํ์นด ์ข
๋ฃ
$ docker-compose down
Producer
api/KafkaProducerConfig.java
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, Long> producerFactory() {
final Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, Long> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
api/CouponCreateProducer.java
@Component
@RequiredArgsConstructor
public class CouponCreateProducer {
private final KafkaTemplate<String, Long> kafkaTemplate;
public void create(Long userId) {
kafkaTemplate.send("coupon_create", userId);
}
}
KafkaProducerConfig CouponCreateProducer
Consumer
consumer/KafkaConsumerConfig.java
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, Long> consumerFactory() {
final Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
/**
* ํ ํฝ์ผ๋ก๋ถํฐ ๋ฉ์์ง๋ฅผ ์ ๋ฌ๋ฐ๊ธฐ ์ํ kafka-listener ๋ฅผ ๋ง๋๋ kafka-listener-container-factory ์์ฑ
*/
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Long> kafkaListenerContainerFactory() {
final ConcurrentKafkaListenerContainerFactory<String, Long> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
consumer/CouponCreatedConsumer.java
@Component
@RequiredArgsConstructor
public class CouponCreatedConsumer {
private final CouponRepository couponRepository;
@KafkaListener(topics = "coupon_create", groupId = "group_1")
public void listener(Long userId) {
couponRepository.save(new Coupon(userId));
}
}
using Consumer
Limit the number of coupons
๋ฐ๊ธ ๊ฐ๋ฅํ ์ฟ ํฐ ๊ฐ์ 1์ธ๋น 1๊ฐ๋ก ์ ํํ๊ธฐ.
(1) ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ ๋ํฌํค ์ฌ์ฉํ๊ธฐ
userId, couponType ์ ์ ๋ํฌ ํค๋ฅผ ์ ์ฉํ๋ ๊ฐ์ฅ ๊ฐ๋จํ ๋ฐฉ๋ฒ.
๋ณดํต ์๋น์ค๋ ํ ์ ์ ๊ฐ ๊ฐ์ ํ์
์ ์ฟ ํฐ์ ์ฌ๋ฌ๊ฐ ๊ฐ์ง ์ ์์ผ๋ฏ๋ก ์ค์ฉ์ ์ธ ๋ฐฉ๋ฒ์ ์๋.
(2) ๋ฒ์๋ก ๋ฝ์ ์ก๊ณ ์ฒ์์ ์ฟ ํฐ ๋ฐ๊ธ ์ฌ๋ถ๋ฅผ ๊ฐ์ ธ์์ ํ๋จํ๋ ๋ฐฉ์
์ฟ ํฐ ๋ฐ๊ธ ๊ฐ๋ฅ ์ฌ๋ถ๋ง ํ๋จํ๊ณ ์ค์ ์ฟ ํฐ ์์ฑ์ ์ปจ์๋จธ์์ ์ํํ๊ณ ์์ผ๋ฏ๋ก ์๊ฐ์ฐจ๋ก 2๊ฐ ์ด์์ ์ฟ ํฐ์ด ๋ฐ๊ธ๋ ์ ์์.
์ง์ ์ฟ ํฐ์ ๋ฐ๊ธํ๋๋ผ๋ ๋ฝ์ ๋ฒ์๊ฐ ๋์ด์ ๋ค๋ฅธ ์์ฒญ๋ค์ ๋ฝ์ด ๋๋ ๋๊น์ง ๋๊ธฐ์ํ๊ฐ ๋๋ฏ๋ก ์ฑ๋ฅ์ด ์ ์ข์์ง ์ ์์.
(3) Rest ์์ Set ํ์ฉํ๊ธฐ
# add set in redis
> sadd test 1
(integer) 1 # ์ถ๊ฐ๋ value ๊ฐ์
> sadd test 1
(integer) 0 # ์ด๋ฏธ ํค(test)์ ๊ฐ(1)์ด ์กด์ฌํ๋ฏ๋ก 0์ ๋ฆฌํด
AppliedUserRepository.java
@Repository
@RequiredArgsConstructor
public class AppliedUserRepository {
private final RedisTemplate<String, String> redisTemplate;
public Long add(Long userId) {
return redisTemplate
.opsForSet()
.add("applied_user", userId.toString());
}
}
...
/* Service */
final Long apply = appliedUserRepository.add(userId);
if (apply != 1) {
return;
}
commit
Error issuing coupon
(1) ์คํจ ์ด๋ฒคํธ ๊ด๋ฆฌ ํ
์ด๋ธ ํ์ฉํ๊ธฐ
์ปจ์๋จธ์์ ์ฟ ํฐ ๋ฐ๊ธ ์ค ์๋ฌ๊ฐ ๋ฐ์ํ ๊ฒฝ์ฐ FailedEvent ํ
์ด๋ธ์ ์คํจํ ์ด๋ฒคํธ๋ฅผ ์ ์ฅํฉ๋๋ค.
์ดํ ๋ฐฐ์น๋ฅผ ํตํด FailedEvent ํ
์ด๋ธ์ ์์ธ ๋ฐ์ดํฐ๋ฅผ ์ฃผ๊ธฐ์ ์ผ๋ก ์ฝ์ด์ ์ฟ ํฐ์ ๋ฐ๊ธํด ์ค๋ค๋ฉด ๊ฒฐ๊ณผ์ ์ผ๋ก ํน์ ์๋์ ์ฟ ํฐ์ด ๋ชจ๋ ๋ฐ๊ธ๋ ์ ์์ต๋๋ค.
(3) Saga Pattern ์ ์ฉํ๊ธฐ
๋ง์ดํฌ๋ก์๋น์ค๋ค๋ผ๋ฆฌ ์ด๋ฒคํธ๋ฅผ ์ฃผ๊ณ ๋ฐ์ ํน์ ๋ง์ดํฌ๋ก์๋น์ค์์์ ์์
์ด ์คํจํ๋ฉด ์ด์ ๊น์ง์ ์์
์ด ์๋ฃ๋ ๋ง์ดํฌ์๋น์ค๋ค์๊ฒ ๋ณด์(complemetary) ์ด๋ฒคํธ๋ฅผ ์์ฑํจ์ผ๋ก์จ ๋ถ์ฐ ํ๊ฒฝ์์ ์์์ฑ(atomicity)์ ๋ณด์ฅํ๋ ํจํด
commit
Last updated