레디스 분산락은 서로 다른 프로세스가 서로 베타적인 방식으로 공유 리소스와 함께 작동해야 하는 많은 환경에서 매우 유용한 기본 기능이다
자바 언어 이외에 ruby, python, php 등 다양한 클라이언트 라이브러리가 존재한다
그렇다면 분산락을 구현하는 로직은 어떻게 될까?
구현 자체는 간단하다
획득하고자 하는 이름의 락을 정의하고 유효 시간까지 락 획득을 시도하게 된다
만약 유효 시간이 지나면 락 획득은 실패하게 된다
단 여기서 주의할 점은 unlock할 경우에, 해당 세션에서 잠금을 생성한 락인지 확인해야 한다
그렇지 않으면 다른 세션에서 수행중인 잠금이 해제될수도 있다
isLocked() : 잠금이 되었는지 확인
isHeldByCurrentThread() : 해당 세션에서 잠금을 생성했는지 확인
// 특정 이름으로 락 정의 RLock lock =redissonClient.getLock(key.toString());try {// 락 획득을 시도한다(20초동안 시도를 할 예정이며 획득할 경우 1초안에 해제할 예정이다) boolean available =lock.tryLock(20,1,TimeUnit.SECONDS);if (!available) {System.out.println("lock 획득 실패");return; }// 트랜잭션 로직(ex. orderService.createOrder(), stockService.increase())} catch (InterruptedException e) {thrownewRuntimeException(e);} finally {if (lock.isLocked() &&lock.isHeldByCurrentThread()) {lock.unlock(); }}
락을 점유를 시도하는 tryRock은 어떻게 구현되어 있나?
전체 코드
전체 코드를 확인하고 아래는 각 단계별 수행하는 내용에 대해서 추가적으로 설명하려고 한다
redisson은 일반적으로 lettuce로 동시성을 해결하기 위한 차선책으로 많이 제시된다
그 이유는 재시도가 필요한 경우에 lettuce는 직접 스핀락으로 구현해서 락 점유를 시도하기 때문이다
redisson은 스핀락으로 락을 점유하지 않고 pub/sub 구조로 레디스에 부하를 줄인다고 이야기하지만 실제 내부 로직을 살펴보면 스핀락 개념이 아예 없지는 않다
redisson 구현의 특징은 lua script와 세마포어의 사용이라고 할 수 있다
@Overridepublic boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time =unit.toMillis(waitTime); long current =System.currentTimeMillis(); long threadId =Thread.currentThread().getId(); Long ttl =tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl ==null) {returntrue; } time -=System.currentTimeMillis() - current;if (time <=0) {acquireFailed(waitTime, unit, threadId);returnfalse; } current =System.currentTimeMillis(); CompletableFuture<RedissonLockEntry> subscribeFuture =subscribe(threadId);try {subscribeFuture.get(time,TimeUnit.MILLISECONDS); } catch (ExecutionException | TimeoutException e) {if (!subscribeFuture.cancel(false)) {subscribeFuture.whenComplete((res, ex) -> {if (ex == null) {unsubscribe(res, threadId); } }); }acquireFailed(waitTime, unit, threadId);returnfalse; }try { time -=System.currentTimeMillis() - current;if (time <=0) {acquireFailed(waitTime, unit, threadId);returnfalse; }while (true) { long currentTime =System.currentTimeMillis(); ttl =tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl ==null) {returntrue; } time -=System.currentTimeMillis() - currentTime;if (time <=0) {acquireFailed(waitTime, unit, threadId);returnfalse; }// waiting for message currentTime =System.currentTimeMillis();if (ttl >=0&& ttl < time) {commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl,TimeUnit.MILLISECONDS); } else {commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time,TimeUnit.MILLISECONDS); } time -=System.currentTimeMillis() - currentTime;if (time <=0) {acquireFailed(waitTime, unit, threadId);returnfalse; } } } finally {unsubscribe(commandExecutor.getNow(subscribeFuture), threadId); }// return get(tryLockAsync(waitTime, leaseTime, unit));}
1. lock 획득을 시도한다
획득을 시도하려는 lock의 유지시간을 확인한다
아래에서 tryAcuiqre 로직을 더 살펴보겠지만 우선 ttl값이 null이면 lock을 점유했다고 간주한다
// 1. 락 획득을 시도한다// (락 점유시간이 null이라면 획득이 가능하다고 판단하여 true를 리턴한다)Long ttl =tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl ==null) {returntrue;}
tryAcquire 내부 로직을 살펴보면 lua script를 사용해서 setnx를 실행하는 것을 확인할 수 있다
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
// LOCK KEY가 존재하는지 확인한다(없으면 0, 있으면 1)
// LOCK KEY가 존재하지 않으면 LOCK KEY와 현재 쓰레드 아이디를 기반으로 값을 1 증가시켜준다
// LOCK KEY에 유효시간을 설정한다
// null 값을 리턴한다
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
// 해시맵 기반으로 LOCK KEY와 쓰레드 아이디로 존재하면 0이고, 존재하지 않으면 저장하고 1을 리턴한다
// LOCK KEY가 존재하지 않으면 LOCK KEY와 현재 쓰레드 아이디를 기반으로 값을 1 증가시켜준다
// LOCK KEY에 유효시간을 설정한다
// null 값을 리턴한다
return redis.call('pttl', KEYS[1]);
// 위의 조건들이 모두 false 이면 현재 존재하는 LOCK KEY의 TTL 시간을 리턴한다
2. waitTime이 초과되었는지 확인한다
lock 에 대한 점유시간이 아직 남아있다면 다시 lock에 대한 획득을 시도하기 이전에 waitTime(lock 획득 시간)이 초과되지는 않았는지 확인한다
만약 이미 초과되었다면 lock 획득은 실패로 리턴한다
// 2. 락 획득 대기 시간보다 초과되었는지 확인하고 초과되었으면 false를 리턴한다time -=System.currentTimeMillis() - current;if (time <=0) {acquireFailed(waitTime, unit, threadId);returnfalse;}
3. 고유 Thread Id를 채널로 구독하여 lock이 available할때까지 대기한다
아래는 CompletableFuture.get 메소드에서 exception이 발생하는 케이스를 확인한 내용이다
/** * Waits if necessary for at most the given time for this future * to complete, and then returns its result, if available. * * @param timeout the maximum time to wait * @param unit the time unit of the timeout argument * @return the result value * @throws CancellationException if this future was cancelled * @throws ExecutionException if this future completed exceptionally * @throws InterruptedException if the current thread was interrupted * while waiting * @throws TimeoutException if the wait timed out */@SuppressWarnings("unchecked")public Tget(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { long nanos =unit.toNanos(timeout); Object r;if ((r = result) ==null) r =timedGet(nanos);return (T) reportGet(r);}
redisson은 왜 세마포어를 사용했는가?
공식문서를 살펴 보면 세마포어를 사용한 이유를 파악할 수 있다. 레디스는 싱글 쓰레드로 동작하기 때문에 공유 자원에 대해서 쓰레드 세이프하게 동작하기 위해 동기화 매커니즘을 수행하기 위한 용도로 사용되었다고 설명하고 있다