if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
// LOCK KEY가 존재하는지 확인한다(없으면 0, 있으면 1)
// LOCK KEY가 존재하지 않으면 LOCK KEY와 현재 쓰레드 아이디를 기반으로 값을 1 증가시켜준다
// LOCK KEY에 유효시간을 설정한다
// null 값을 리턴한다
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
// 해시맵 기반으로 LOCK KEY와 쓰레드 아이디로 존재하면 0이고, 존재하지 않으면 저장하고 1을 리턴한다
// LOCK KEY가 존재하지 않으면 LOCK KEY와 현재 쓰레드 아이디를 기반으로 값을 1 증가시켜준다
// LOCK KEY에 유효시간을 설정한다
// null 값을 리턴한다
return redis.call('pttl', KEYS[1]);
// 위의 조건들이 모두 false 이면 현재 존재하는 LOCK KEY의 TTL 시간을 리턴한다
2. waitTime이 초과되었는지 확인한다
lock 에 대한 점유시간이 아직 남아있다면 다시 lock에 대한 획득을 시도하기 이전에 waitTime(lock 획득 시간)이 초과되지는 않았는지 확인한다
만약 이미 초과되었다면 lock 획득은 실패로 리턴한다
// 2. 락 획득 대기 시간보다 초과되었는지 확인하고 초과되었으면 false를 리턴한다
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
3. 고유 Thread Id를 채널로 구독하여 lock이 available할때까지 대기한다
만약에 사용자가 설정한 waitTime을 초과할 경우 TimeoutException이 발생하여 lock 획득에 실패한다
여기서 중요한 점은 subscribe 내부에는 세마포어를 사용해서 공유자원에 대한 점유를 수행한다는 것이다
세마포어를 사용하여 공유자원을 점유하기 때문에 스핀락 보다는 레디스 I/O에 대한 부하를 줄일 수 있다
// 3. threadId를 채널로 구독하여 waitTime까지 대기한다.(block 처리)
// (만약 설정한 waitTime 보다 구독에 대한 응답이 없을 경우엔 TimeoutException이 발생하여 락 획득 결과를 false로 리턴한다)
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
try {
subscribeFuture.get(time, TimeUnit.MILLISECONDS);
} catch (ExecutionException | TimeoutException e) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.whenComplete((res, ex) -> {
if (ex == null) {
unsubscribe(res, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
}
subscribe 내부 로직을 간단히 살펴보면 세마포어를 사용하는 것을 확인할 수 있다
public CompletableFuture<E> subscribe(String entryName, String channelName) {
AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
CompletableFuture<E> newPromise = new CompletableFuture<>();
semaphore.acquire(() -> {
if (newPromise.isDone()) {
semaphore.release();
return;
}
E entry = entries.get(entryName);
if (entry != null) {
entry.acquire();
semaphore.release();
entry.getPromise().whenComplete((r, e) -> {
if (e != null) {
newPromise.completeExceptionally(e);
return;
}
newPromise.complete(r);
});
return;
}
E value = createEntry(newPromise);
value.acquire();
...
}
아래는 CompletableFuture.get 메소드에서 exception이 발생하는 케이스를 확인한 내용이다
/**
* Waits if necessary for at most the given time for this future
* to complete, and then returns its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the result value
* @throws CancellationException if this future was cancelled
* @throws ExecutionException if this future completed exceptionally
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
*/
@SuppressWarnings("unchecked")
public T get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
long nanos = unit.toNanos(timeout);
Object r;
if ((r = result) == null)
r = timedGet(nanos);
return (T) reportGet(r);
}
redisson은 왜 세마포어를 사용했는가?
공식문서를 살펴 보면 세마포어를 사용한 이유를 파악할 수 있다. 레디스는 싱글 쓰레드로 동작하기 때문에 공유 자원에 대해서 쓰레드 세이프하게 동작하기 위해 동기화 매커니즘을 수행하기 위한 용도로 사용되었다고 설명하고 있다