Weekly Java: 간단한 재고 시스템으로 학습하는 동시성 이슈

Synchronized vs Pessimistic Lock vs Optimistic Lock vs Distributed Lock

요구 사항

각종 링크

  1. 인프런 강의
  2. GitHub 저장소

테스트 환경

  • Apple Macbook Pro M1
  • Docker Compose-based MySQL
  • Docker Compose-based Redis
  • JDK 11 & Spring Data JPA
  • IntelliJ & JUnit 5

테스트 1: 동시성을 고려하지 않은 기본적인 로직

 // StockService.java
/**
 * Decreases the stock of a product without any concurrency control.
 *
 * <p>The entity is loaded and mutated in memory and no explicit save is issued, so the method
 * is declared {@code @Transactional}; presumably the change is written when the surrounding
 * transaction commits (verify against the persistence configuration).
 *
 * @param productId id of the product whose stock is decreased
 * @param quantity  amount to remove from the stock
 */
@Transactional
public void decreaseV1(final Long productId, final Long quantity) {
    final Stock stock = stockRepository.getByProductId(productId);
    stock.decrease(quantity);
}
 // StockServiceTest.java
// Number of concurrent decrease requests submitted by the multi-threaded tests.
private final int threadCount = 300;
// Product whose stock row is created before every test.
private final long productId = 10L;
// Amount removed by each individual decrease request.
private final long quantity = 1L;
// Starting stock; threadCount * quantity equals this value, so a correct run ends at zero.
private final long initQuantity = 300L;
// Pool that runs the concurrent requests; created in beforeEach().
private ExecutorService executorService;
// Lets the test thread wait until every submitted request has finished.
private CountDownLatch countDownLatch;
/**
 * Creates the stock row used by the test and prepares a thread pool and a latch,
 * both sized to {@code threadCount}.
 */
@BeforeEach
public void beforeEach() {
    final Stock initialStock = new Stock(productId, initQuantity);
    stockRepository.save(initialStock);

    this.executorService = Executors.newFixedThreadPool(threadCount);
    this.countDownLatch = new CountDownLatch(threadCount);
}
/**
 * Cleans up after each test: stops the worker pool and removes all stock rows.
 */
@AfterEach
public void afterEach() {
    // beforeEach() creates a new fixed pool for every test; without a shutdown each test
    // would leave its worker threads alive for the rest of the run.
    if (executorService != null) {
        executorService.shutdownNow();
    }
    stockRepository.deleteAll();
}
/**
 * Baseline check: one decrease request on a single thread removes exactly one unit.
 */
@DisplayName("단일 쓰레드일 때를 테스트한다")
@Test
void 단일_쓰레드로_재고를_감소시킨다() {
    // when: the stock is decreased once from the test thread
    stockService.decreaseV1(productId, quantity);

    // then: the stored quantity is one less than the initial quantity
    final Stock reloaded = stockRepository.getByProductId(productId);
    final long remaining = reloaded.getQuantity();
    final long expected = initQuantity - 1;
    assertThat(remaining).isEqualTo(expected);
}
// StockServiceTest.java
/**
 * Submits {@code threadCount} concurrent decrease requests to the service method that has
 * no concurrency control and checks whether the stock reaches zero.
 *
 * <p>NOTE(review): this scenario is presented as the "no concurrency handling" baseline, so
 * the final assertion is presumably expected to fail and expose lost updates - confirm.
 */
@Test
void 멀티_쓰레드를_사용한_재고_감소() throws InterruptedException {
    // given
    // when
    IntStream.range(0, threadCount).forEach(e -> executorService.submit(() -> {
        try {
            // Use the same unsynchronized method as the single-thread test of this scenario.
            stockService.decreaseV1(productId, quantity);
        } finally {
            // Always release the latch so the test thread cannot hang on a failed request.
            countDownLatch.countDown();
        }
    }));
    countDownLatch.await();

    // then
    final Long afterQuantity = stockRepository.getByProductId(productId).getQuantity();
    System.out.println("### 동시성 처리 이후 수량 ###" + afterQuantity);
    assertThat(afterQuantity).isZero();
}

테스트 2: Synchronized 키워드를 적용한 동기화 로직

/**
 * Decreases the stock while holding this service instance's monitor, so only one thread
 * at a time runs the load / decrease / save sequence on this instance.
 *
 * @param id       product id used to look up the stock
 * @param quantity amount to remove from the stock
 */
public void decrease(final Long id, final Long quantity) {
    synchronized (this) {
        // 1. get stock
        final Stock target = stockRepository.getByProductId(id);
        // 2. decrease stock
        target.decrease(quantity);
        // 3. save stock
        stockRepository.saveAndFlush(target);
    }
}
// StockServiceTest.java
/**
 * Submits {@code threadCount} concurrent requests to the synchronized service method and
 * expects the stock to end at zero.
 */
@DisplayName("SYNCHRONIZED를 사용한 재고 감소 - 동시 1000개 테스트 | 16.994s 소요")
@Test
void SYNCHRONIZED를_사용한_재고_감소() throws InterruptedException {
    // given
    // when
    IntStream.range(0, threadCount).forEach(e -> executorService.submit(() -> {
        try {
            stockService.decrease(productId, quantity);
        } finally {
            // Always release the latch so the test thread cannot hang on a failed request.
            countDownLatch.countDown();
        }
    }));
    countDownLatch.await();

    // then
    final Long afterQuantity = stockRepository.getByProductId(productId).getQuantity();
    System.out.println("### SYNCHRONIZED 동시성 처리 이후 수량 ###" + afterQuantity);
    assertThat(afterQuantity).isZero();
}

테스트 3: Pessimistic Lock의 사용

// PessimisticStockRepository.javapublic interface PessimisticStockRepository extends JpaRepository<Stock, Long> {

@Lock(LockModeType.PESSIMISTIC_WRITE)
Stock getByProductId(Long productId);
}
// PessimisticLockStockService.java
/**
 * Decreases stock through {@link PessimisticStockRepository}, whose lookup is annotated with
 * a pessimistic write lock.
 */
@Service
public class PessimisticLockStockService implements StockBusinessInterface {

    // Final like the repository fields of the other stock services; assigned once in the constructor.
    private final PessimisticStockRepository stockRepository;

    public PessimisticLockStockService(final PessimisticStockRepository stockRepository) {
        this.stockRepository = stockRepository;
    }

    /**
     * Loads the stock with the locking lookup and decreases it inside one transaction.
     *
     * @param id       product id used to look up the stock
     * @param quantity amount to remove from the stock
     */
    @Transactional
    public void decrease(final Long id, final Long quantity) {
        Stock stock = stockRepository.getByProductId(id);
        stock.decrease(quantity);
    }
}
/**
 * Submits {@code threadCount} concurrent requests to the pessimistic-lock service and
 * expects the stock to end at zero.
 */
@DisplayName("pessimistic lock을 사용한 재고 감소 - 동시에 1000개 테스트 | 12.415s 소요")
@Test
void PESSIMISTIC_LOCK을_사용한_재고_감소() throws InterruptedException {
    // given: a task that decreases the stock once and always releases the latch
    final Runnable decreaseOnce = () -> {
        try {
            pessimisticLockStockService.decrease(productId, quantity);
        } finally {
            countDownLatch.countDown();
        }
    };

    // when: the task is submitted threadCount times and the test waits for all of them
    for (int i = 0; i < threadCount; i++) {
        executorService.submit(decreaseOnce);
    }
    countDownLatch.await();

    // then
    final Stock reloaded = stockRepository.getByProductId(productId);
    final Long remaining = reloaded.getQuantity();
    System.out.println("### PESSIMISTIC LOCK 동시성 처리 이후 수량 ###" + remaining);
    assertThat(remaining).isZero();
}

테스트 4. Optimistic Lock의 사용

/**
 * Stock repository whose product lookup requests an optimistic lock.
 */
public interface OptimisticStockRepository extends JpaRepository<Stock, Long> {

    /**
     * Loads the stock of a product using {@link LockModeType#OPTIMISTIC}.
     *
     * @param productId id of the product to look up
     * @return the stock entity for that product
     */
    @Lock(value = LockModeType.OPTIMISTIC)
    Stock getByProductId(Long productId);
}
// Stock.java
package com.example.stock.domain;

import javax.persistence.Version;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
/**
 * Stock of a single product, with a version column for optimistic locking.
 */
@Entity
public class Stock {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private Long productId;

    private Long quantity;

    @Version
    private Long version;

    /** No-argument constructor. */
    public Stock() {}

    /**
     * @param productId id of the product this stock belongs to
     * @param quantity  initial quantity on hand
     */
    public Stock(Long productId, Long quantity) {
        this.productId = productId;
        this.quantity = quantity;
    }

    /** @return the current quantity on hand */
    public Long getQuantity() {
        return quantity;
    }

    /**
     * Removes {@code quantity} units from the stock.
     *
     * @param quantity amount to remove; must be non-null and not negative
     * @return the quantity remaining after the decrease
     * @throws IllegalArgumentException if {@code quantity} is null or negative, or if it
     *                                  exceeds the current stock
     */
    public Long decrease(Long quantity) {
        // A negative amount would silently increase the stock, and null would fail with a
        // bare NullPointerException during unboxing; reject both explicitly.
        if (quantity == null || quantity < 0) {
            throw new IllegalArgumentException("Quantity to decrease must be non-negative: " + quantity);
        }
        if (this.quantity < quantity) {
            throw new IllegalArgumentException("Not enough stock");
        }
        this.quantity -= quantity;
        return this.quantity;
    }
}
package com.example.stock.service;import com.example.stock.domain.Stock;
import com.example.stock.repository.OptimisticStockRepository;
import com.example.stock.repository.StockRepository;
import org.springframework.stereotype.Service;
import javax.transaction.Transactional;@Service
/**
 * Decreases stock through {@link OptimisticStockRepository}. It performs a single attempt
 * and contains no retry logic of its own.
 */
public class OptimisticLockStockService implements StockBusinessInterface {

    private final OptimisticStockRepository stockRepository;

    public OptimisticLockStockService(final OptimisticStockRepository stockRepository) {
        this.stockRepository = stockRepository;
    }

    /**
     * Loads the stock of the product and decreases it inside one transaction.
     *
     * @param id       product id used to look up the stock
     * @param quantity amount to remove from the stock
     */
    @Transactional
    public void decrease(final Long id, final Long quantity) {
        stockRepository.getByProductId(id).decrease(quantity);
    }
}
// OptimisticLockStockFacade.java
package com.example.stock.facade;

import com.example.stock.service.OptimisticLockStockService;
import org.springframework.stereotype.Service;
/**
 * Retries {@link OptimisticLockStockService#decrease} until it succeeds.
 */
@Service
public class OptimisticLockStockFacade {

    private final OptimisticLockStockService optimisticLockStockService;

    public OptimisticLockStockFacade(OptimisticLockStockService optimisticLockStockService) {
        this.optimisticLockStockService = optimisticLockStockService;
    }

    /**
     * Decreases the stock, retrying after a 1 ms pause whenever the attempt fails with an
     * exception other than {@link IllegalArgumentException}.
     *
     * @param id       product id used to look up the stock
     * @param quantity amount to remove from the stock
     * @throws IllegalArgumentException if the service rejects the request (for example
     *                                  "Not enough stock"); such failures are not retried
     * @throws InterruptedException     if the thread is interrupted while pausing between retries
     */
    public void decrease(final Long id, final Long quantity) throws InterruptedException {
        while (true) {
            try {
                optimisticLockStockService.decrease(id, quantity);
                return;
            } catch (IllegalArgumentException e) {
                // Stock.decrease throws this when the stock is insufficient. Retrying can
                // never succeed, so propagate instead of looping forever.
                throw e;
            } catch (Exception e) {
                // retry
                System.out.println("OPTIMISTIC LOCK VERSION CONFLICT !!!");
                System.out.println(e.getMessage());
                Thread.sleep(1);
            }
        }
    }
}
/**
 * Submits {@code threadCount} concurrent requests to the optimistic-lock facade and expects
 * the stock to end at zero.
 */
@DisplayName("optimistic lock을 사용한 재고 감소 - 동시에 1000개 테스트")
// Use an optimistic lock when conflicts are expected to be infrequent.
@Test
void OPTIMISTIC_LOCK을_사용한_재고_감소() throws InterruptedException {
    // given
    // when
    IntStream.range(0, threadCount).forEach(e -> executorService.submit(() -> {
        try {
            stockOptimisticLockFacade.decrease(productId, quantity);
        } catch (final InterruptedException ex) {
            // Restore the interrupt flag before wrapping so the worker thread still sees it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(ex);
        } finally {
            // Always release the latch so the test thread cannot hang on a failed request.
            countDownLatch.countDown();
        }
    }));
    countDownLatch.await();

    // then
    final Long afterQuantity = stockRepository.getByProductId(productId).getQuantity();
    System.out.println("### OPTIMISTIC LOCK 동시성 처리 이후 수량 ###" + afterQuantity);
    assertThat(afterQuantity).isZero();
}

테스트 5. Named Lock의 사용

// NamedLockRepository.javapublic interface NamedLockRepository extends JpaRepository<Stock, Long> {@Query(value = "select get_lock(:key, 1000)", nativeQuery = true)
void getLock(String key);
@Query(value = "select release_lock(:key)", nativeQuery = true)
void releaseLock(String key);
}
// ------------------------------------------------------------------
// NamedLockStockService.java
package com.example.stock.service;

import com.example.stock.domain.Stock;
import com.example.stock.repository.StockRepository;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
/**
 * Decreases stock in its own transaction ({@code REQUIRES_NEW}) while holding this
 * instance's monitor.
 */
@Service
public class NamedLockStockService implements StockBusinessInterface {

    private final StockRepository stockRepository;

    public NamedLockStockService(final StockRepository stockRepository) {
        this.stockRepository = stockRepository;
    }

    /**
     * Loads the stock, decreases it and flushes the change.
     *
     * @param id       product id used to look up the stock
     * @param quantity amount to remove from the stock
     */
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void decrease(final Long id, final Long quantity) {
        synchronized (this) {
            final Stock target = stockRepository.getByProductId(id);
            target.decrease(quantity);
            stockRepository.saveAndFlush(target);
        }
    }
}
// -------------------------------------------------------------------
// NamedLockStockFacade.java
package com.example.stock.facade;

import com.example.stock.repository.NamedLockRepository;
import com.example.stock.service.NamedLockStockService;
import org.springframework.stereotype.Component;
@Component
public class NamedLockStockFacade {
private NamedLockRepository namedLockRepository;private NamedLockStockService namedLockStockService;public NamedLockStockFacade(final NamedLockRepository namedLockRepository, final NamedLockStockService namedLockStockService) {
this.namedLockRepository = namedLockRepository;
this.namedLockStockService = namedLockStockService;
}
public void decrease(Long id, Long quantity) {
try {
namedLockRepository.getLock(id.toString());
namedLockStockService.decrease(id, quantity);
} finally {
namedLockRepository.releaseLock(id.toString());
}
}
}
/**
 * Submits {@code threadCount} concurrent requests to the named-lock facade and expects the
 * stock to end at zero.
 */
@DisplayName("named lock 을 사용한 재고 감소 - 동시에 1000개 테스트 | 21.857s 소요")
// If a single data source is shared, the connection pool can run short, so separating the
// data sources is recommended.
@Test
void NAMED_LOCK을_사용한_재고_감소() throws InterruptedException {
    // given: a task that decreases the stock once and always releases the latch
    final Runnable decreaseOnce = () -> {
        try {
            namedLockStockFacade.decrease(productId, quantity);
        } finally {
            countDownLatch.countDown();
        }
    };

    // when: the task is submitted threadCount times and the test waits for all of them
    for (int i = 0; i < threadCount; i++) {
        executorService.submit(decreaseOnce);
    }
    countDownLatch.await();

    // then
    final Stock reloaded = stockRepository.getByProductId(productId);
    final Long remaining = reloaded.getQuantity();
    System.out.println("### NAMED LOCK 동시성 처리 이후 수량 ###" + remaining);
    assertThat(remaining).isZero();
}

테스트 6. 분산 락 기반의 Lettuce 사용

// RedisRepository.java
package com.example.stock.repository;

import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.time.Duration;@Component
/**
 * Minimal Redis-backed lock: one key per id, set only if absent, expiring after 3 seconds.
 */
public class RedisRepository {

    private final RedisTemplate<String, String> redisTemplate;

    public RedisRepository(final RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * Tries to take the lock for {@code key} by setting the key only if it is absent, with a
     * 3000 ms expiry.
     *
     * @param key id to lock
     * @return {@code true} if the key was set by this call, otherwise {@code false}; never null
     */
    public Boolean lock(final Long key) {
        final Boolean acquired = redisTemplate
                .opsForValue()
                .setIfAbsent(generateKey(key), "lock", Duration.ofMillis(3_000));
        // The template may return null (for example in a pipeline or transaction); normalise
        // it so callers that unbox the result cannot hit a NullPointerException.
        return Boolean.TRUE.equals(acquired);
    }

    /**
     * Deletes the lock key for {@code key}.
     *
     * <p>NOTE(review): the key is deleted unconditionally, so a holder whose 3 s expiry has
     * already passed could remove a lock taken by another caller - confirm this is acceptable.
     *
     * @param key id to unlock
     * @return {@code true} if a key was deleted, otherwise {@code false}; never null
     */
    public Boolean unlock(final Long key) {
        return Boolean.TRUE.equals(redisTemplate.delete(generateKey(key)));
    }

    /**
     * @param key id to convert
     * @return the Redis key for the id, which is its decimal string form
     */
    public String generateKey(final Long key) {
        return key.toString();
    }
}
// LettuceLockStockFacade.java
package com.example.stock.facade;

import com.example.stock.repository.RedisRepository;
import com.example.stock.service.StockNonSynchronizedService;
import com.example.stock.service.StockService;
import org.springframework.stereotype.Component;
/**
 * Wraps the stock decrease in a Redis lock that is acquired by polling.
 */
@Component
public class LettuceLockStockFacade {

    private final RedisRepository redisRepository;
    private final StockNonSynchronizedService stockService;

    public LettuceLockStockFacade(final RedisRepository redisRepository, final StockNonSynchronizedService stockService) {
        this.redisRepository = redisRepository;
        this.stockService = stockService;
    }

    /**
     * Polls every 100 ms until the lock for {@code productId} is acquired, decreases the
     * stock, and releases the lock.
     *
     * @param productId product id; also used as the lock key
     * @param quantity  amount to remove from the stock
     * @throws InterruptedException if the thread is interrupted while waiting for the lock
     */
    public void decrease(final Long productId, final Long quantity) throws InterruptedException {
        // lock() returns a boxed Boolean; compare against TRUE instead of unboxing so that a
        // null result counts as "not acquired" rather than throwing a NullPointerException.
        while (!Boolean.TRUE.equals(redisRepository.lock(productId))) {
            Thread.sleep(100); // pause between attempts to reduce load
        }
        try {
            stockService.decrease(productId, quantity);
        } finally {
            redisRepository.unlock(productId);
        }
    }
}
// StockServiceTest.java
/**
 * Submits {@code threadCount} concurrent requests to the Lettuce-lock facade and expects
 * the stock to end at zero.
 */
@DisplayName("redis lettuce lock 을 사용한 재고 감소")
// With Redis there is no need to manage the session or connection pool tied to the current
// transaction, so the implementation is simpler.
// Because this is a spin lock it can put load on the server, so attempts should be spaced
// out with a pause between them.
@Test
void LETTUCE_LOCK을_사용한_재고_감소() throws InterruptedException {
    // given
    // when
    IntStream.range(0, threadCount).forEach(e -> executorService.submit(() -> {
        try {
            lettuceLockStockFacade.decrease(productId, quantity);
        } catch (InterruptedException ex) {
            // Restore the interrupt flag before wrapping so the worker thread still sees it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(ex);
        } finally {
            // Always release the latch so the test thread cannot hang on a failed request.
            countDownLatch.countDown();
        }
    }));
    countDownLatch.await();

    // then
    final Long afterQuantity = stockRepository.getByProductId(productId).getQuantity();
    System.out.println("### LETTUCE LOCK 동시성 처리 이후 수량 ###" + afterQuantity);
    assertThat(afterQuantity).isZero();
}

테스트 7. 분산 락 기반의 Redisson의 사용

(Session 1) $ docker exec -it 6c7c0a47dd34 redis-cli
(Session 2) $ docker exec -it 6c7c0a47dd34 redis-cli

(Session 1) $ subscribe ch1
// Reading messages... (press Ctrl-C to quit)
// 1) "subscribe"
// 2) "ch1"
// 3) (integer) 1

(Session 2) $ publish ch1 hello
// (integer) 1

(Session 1) $
// 1) "message"
// 2) "ch1"
// 3) "hello"
// RedissonLockStockFacade.java
package com.example.stock.facade;

import com.example.stock.service.StockNonSynchronizedService;
import com.example.stock.service.StockService;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;@Component
/**
 * Wraps the stock decrease in a Redisson lock named after the product id.
 */
public class RedissonLockStockFacade {

    private final RedissonClient redissonClient;
    private final StockNonSynchronizedService stockService;

    public RedissonLockStockFacade(final RedissonClient redissonClient, final StockNonSynchronizedService stockService) {
        this.redissonClient = redissonClient;
        this.stockService = stockService;
    }

    /**
     * Waits up to 10 seconds for the lock (lease time 1 second), decreases the stock, and
     * releases the lock. If the lock cannot be acquired in time, a message is printed and the
     * stock is left unchanged.
     *
     * @param productId product id; also used as the lock name
     * @param quantity  amount to remove from the stock
     * @throws InterruptedException if the thread is interrupted while waiting for the lock
     */
    public void decrease(final Long productId, final Long quantity) throws InterruptedException {
        final RLock lock = redissonClient.getLock(productId.toString());

        // Acquire before entering try/finally: a failed or interrupted acquisition must not
        // reach unlock(), which throws when the current thread does not hold the lock.
        final boolean isAvailable = lock.tryLock(10, 1, TimeUnit.SECONDS);
        if (!isAvailable) {
            System.out.println("redisson getLock timeout");
            return;
        }

        try {
            stockService.decrease(productId, quantity);
        } finally {
            // The 1 s lease may already have expired if the decrease was slow; unlock only
            // while this thread still owns the lock.
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
}
// StockServiceTest.java
/**
 * Submits {@code threadCount} concurrent requests to the Redisson-lock facade and expects
 * the stock to end at zero.
 */
@DisplayName("redis reddison lock 을 사용한 재고 감소 - 동시에 1000개 테스트 | 17.23s 소요")
@Test
void REDISSON_LOCK을_사용한_재고_감소() throws InterruptedException {
    // given
    // when
    IntStream.range(0, threadCount).forEach(e -> executorService.submit(() -> {
        try {
            redissonLockStockFacade.decrease(productId, quantity);
        } catch (InterruptedException ex) {
            // Restore the interrupt flag before wrapping so the worker thread still sees it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(ex);
        } finally {
            // Always release the latch so the test thread cannot hang on a failed request.
            countDownLatch.countDown();
        }
    }));
    countDownLatch.await();

    // then
    final Long afterQuantity = stockRepository.getByProductId(productId).getQuantity();
    System.out.println("### REDDISON LOCK 동시성 처리 이후 수량 ###" + afterQuantity);
    assertThat(afterQuantity).isZero();
}

Q&A

Pessimistic Lock vs Optimistic Lock

Facade? Helper?

MySQL? Redis?

Version 주입할 시의 어노테이션

import javax.persistence.Version;

더 살펴보기

  1. What is the purpose of await() in CountDownLatch? https://stackoverflow.com/questions/41866691/what-is-the-purpose-of-await-in-countdownlatch
  2. MySQL에서 사용하는 Lock 이해 http://web.bluecomtech.com/saltfactory/blog.saltfactory.net/database/introduce-mysql-lock.html

참고 문헌

--

--

Software Engineer at DSRV. ZK Evangelist at Boom Labs. twitter.com/@sigridjin_eth

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store