7장 스레드 한정, 액터 그리고 뮤텍스

코틀린 동시성 프로그래밍 7장을 요약한 내용입니다.

원자성 위반

원자성 위반은 동시성 오류 유형이다. 이 유형의 오류는 정확한 동기화 없이 객체의 상태를 동시에 수정할 때 발생한다. 원자성 위반은 코틀린에서도 발생할 수 있지만 오류를 피할 수 있도록 디자인하는 데 도움이 되는 기본형을 제공한다.

원자성의 의미

소프트웨어 실행의 관점에서, 연산(operation)이 단일하고 분할할 수 없을 때 이 연산을 원자적(atomic)이라 한다.

동시성 애플리케이션을 실행하게 되면 공유 상태를 수정하는 코드 블록이 다른 스레드의 변경 시도와 겹치면서 이런 문제가 발생한다.

private var counter = 0

fun increment() {
		counter ++
}

수차적으로 실행하면, counter 값에 대해 걱정할 필요 없이 incremental()을 원하는 만큼 호출할 수 있다. 그러나 여기에 동시성을 추가하면 내부의 많은 것들이 바뀐다.

var counter = 0

fun asyncIncrement(by: Int) = async(CommonPool[1]) {
		for (i in 0 until by) {
				counter++
		}
}

CommonPool을 CoroutineContext로 사용해 요청한 횟수만큼 counter를 늘리고 있다.

fun main(args: Array<String>) = runBlocking {
		val workerA = asyncIncrement(2000)
		val workerB = asyncIncrement(100)

		workerA.await()
		workerB.await()

		print("counter [$counter]")
}

실행 후 counter의 값이 가끔 2100보다 낮다는 것을 알 수 있다.

counter++를 수행하는 코드가 원자적이지 않아서 발생한다. 이것이 실제로 의미하는 바는 asyncIncrement 내부의 여러 for 루프 중 여러 사이클이 counter값을 오직 한 번만 바꿨다는 것이다.

코드 블록을 원자적으로 만들려면 블록 안에서 발생하는 어떤 메모리 엑세스도 동시에 실행되지 않도록 해야 한다.

스레드 한정

스레드 한정의 개요

스레드 한정은 이름에서 알 수 있듯이, 공유 상태에 접근하는 모든 코루틴을 단일 스레드에서 실행되도록 한정하는 것을 의미한다. 상태가 더 이상 스레드 간에 공유되지 않으며 하나의 스레드만 상태를 수정한다는 뜻이다. ( 이 방법은 애플리케이션의 성능에 부정적인 영향을 미치지 않는다는 점에 한해서만 유용하다.)

액터

스레드 한정은 앞에서 언급한 시나리오에서는 잘 작동하지만 앱의 여러 다른 부분에서 공유 상태를 수정해야 하거나, 원자 블록에 더 높은 유연성을 원하는 시나리오의 경우 이를 확장하는 방법이 필요하다.

액터의 역할

액터는 두 가지 강력한 도구의 조합이다. 상태 엑세스를 단일 스레드로 한정하고 다른 스레드가 채널로 상태 수정을 요청할 수 있다. 액터를 사용하면 값을 안전하게 업데이트할 수 있을 뿐만 아니라 이를 위한 강력한 커뮤니케이션 매커니즘도 갖추게 된다.

액터 생성

여러 다른 스레드에서 counter를 안전하게 수정해야 한다고 가정해보자.

private var counter = 0
private val context = newSingleThreadContext("counterActor")

fun getCounter() = counter

이제 카운터의 값을 캡슐화했으므로 수신된 각 메시지에 따라 값을 증가시키는 액터를 추가하기만 하면 된다.

val actorCounter = actor<Void?>(context) {
		for (msg in channel) {
				counter++
		}
}

전송된 메시지를 실제로는 사용하지 않기 때문에, 단순히 액터의 유형을 Void?로 설정하면 호출자가 null을 보낼 수 있다.

fun main(args: Array<String>) = runBlocking {
		val workerA = asyncIncrement(2000)
		val workerB = asyncIncrement(100)

		workerA.await()
		workerB.await()

		print("counter [$counter]")
}
fun asyncIncrement(by: int) = async(CommonPool) {
		for (i in 0 until by) {
				actorCounter.send(null)
		}
}

액터를 사용한 기능 확장

액터는 사용하면 채널이 전체 코루틴을 원자적으로 유지하면서 더 높은 유연성을 허용한다는 점이 가장 커다란 장점이다.

액터를 사용해 카운터를 늘리거나 줄일 수 있도록 만들어보자

enum class Action {
		INCREASE,
		DECREASE
}

그런 다음에 액터와 코루틴을 업데이트해 이런 액션을 연산에 매핑할 수 있다.

var actorCounter = actor<Action>(context) {
		for (msg in channel) {
				when(msg) {
						Action.INCREASE -> counter++
						Action.DECREASE -> counter--
				}
		}
}

fun asyncDecrement(by: Int) = async {
		for (i in 0 until by) {
				actorCounter.send(Action.DECREASE)
		}
}

메인 스레드는 CommonPool을 사용해 카운터를 동시에 증가 및 감소시키는 코루틴을 생성하며, 코루틴은 액션과 함께 메시지를 액터에 보내서 이를 수행한다.

액터 상호 작용에 대한 추가 정보

클라이언트 관점에서 액터는 단순히 송신 채널이다. 그러나 구현 관점에서 액터의 수행방식을 고민해 볼 필요가 있다.

버퍼드 액터

액터는 다른 송신 채널과 동일하게 버퍼링될 수 있다. 기본적으로 모든 액터는 버퍼링되지 않는다. 메시지가 수신될 때까지 발신자를 send()에서 일시 중단한다. 버퍼링된 액터(Buffered actors)를 생성하려면 capacity 매개변수를 빌더에 전달해야 한다.

fun main(args: Array<String>) {
		val bufferedPrinter = actor<String>(capacity = 10) {
				for (msg in channel) {
						println(msg)
				}
		}

		bufferedPrinter.send("hello")
		bufferedPrinter.send("hello")

		bufferedPrinter.close()
}

CoroutineContext를 갖는 액터

counter 예제를 해결한 것처럼 액터를 생성할 때 CoroutineContext를 전달할 수 있다.

val dispatcher = newFixedThreadPoolContext(3, "pool")
val actor = actor<String>(dispatcher) {
		for (msg in channel) {
				println("Running in ${Thread.currentThread().name}")
		}
}

for (i in 1..10) {
		actor.send("a")
}

상호 배제

지금까지는 코드 블록의 모든 메모리 액세스가 단일 스레드에서 발생하도록 보장함으로써 원자성 위반을 회피했다. 두 개의 코드 블록이 동시에 실행되는 것을 피할 수 있는 또 다른 방법이 있다. 바로 상호 배제다.

상호 배제의 이해

상호 배제란 한 번에 하나의 코루틴만 코드 블록을 실행할 수 있도록 하는 동기화 매커니즘을 말한다.

코틀린 뮤텍스의 가장 중요한 특징은 블록되지 않는다는 점이다. 실행 대기 중인 코루틴은 잠금을 획득하고 코드 블록을 실행할 수 있을 때까지 일시 중단된다. 코루틴은 일시 중단되지만 일시 중단 함수를 사용하지 않고 뮤텍스를 잠글 수 있다.

뮤텍스 생성

뮤텍스를 만들려면 Mutex 클래스의 인스턴스만 생성하면 된다.

var mutext = Mutext()

fun asyncIncrement(by: Int) = async {
		for (i in 0 until by) {
				mutex.withLock {
						counter++
				}
		}
}

한 번에 하나의 코루틴만 잠금을 보유하고, 잠금을 시도하는 다른 코루틴을 일시 중단 함으로써 카운터에 대한 모든 증분이 동기화 되도록 한다. 따라서 다음과 같이 몇 번을 호출하더라도 counter의 증분 중 어느 것도 유실되지 않는다.

상호 배제와 상호 작용

대개 withLock()을 사용하는 것만으로도 충분하다. 잠금 및 잠금 해제에 대한 상세 제어가 필요하면 lock, unlock()을 사용할 수 있다.

val mutext = Mutext()

mutext.lock() // 잠금이 이미 설정된 경우 일시 중단된다. 
print("I am now an atomic block")
mutext.unlock() // 이것은 중단되지 않는다. 

isLocked 속성을 사용해 뮤텍스가 현재 잠겨 있는지 확인할 수 있다.

뮤텍스를 잠글 수 있는지 여부를 나타내는 불리언(Boolean)을 반환하는 tryLock()을 사용하기도 한다.

val mutext = Mutext()

mutext.lock() 
val lockedByMe = mutext.tryLock() // false
mutext.unlock() 

휘발성 변수

휘발성 변수는 구현하려는 스레드 안전(thread-safe) 카운터와 같은 문제를 해결하지 못한다. 그러나 휘발성 변수는 일부 시나리오에서 스레드 간에 정보를 공유해야 할 때 간단한 솔루션으로 사용될 수 있다.

스레드 캐시

JVM에서 각 스레드는 비휘발성 변수의 캐시된 사본을 가질 수 있다. 이 캐시는 항상 변수의 실제 값과 동기화되지는 않는다. 한 스레드에서 공유 상태를 변경하면 캐시가 업데이트될 때까지 다른 스레드에서는 볼 수 없다.

@Volatile

변수의 변경사항을 다른 스레드에 즉시 표시하기 위해 다음 예제에서 @Volatile 주석을 사용할 수 있다.

@Volatile
var shutdownRequested = false

@Volatile은 Kotlin/JVM에서만 사용할 수 있다. 휘발성(volatility)을 보장하는 기능을 JVM의 기능을 사용하기 때문에, 다른 플랫폼에서는 사용할 수 없다.

@Volatile이 스레드 안전 카운터 문제를 해결하지 못하는 이유

그러나 이전의 예제를 실행하면 실제 일부 변경사항이 유실되는 것을 확인할 수 있다. 이런 결과가 나온데에는 두 가지 이유가 있다.

  • 다른 스레드가 읽거나 수정하는 동안 스레드의 읽기가 발생할 때 : 두 스레드는 모두 같은 데이터로 시작해 동일한 증분을 만든다. 둘 다 카운터를 X에서 Y로 변경하므로 한 증분씩 유실된다.

  • 다른 스레드가 수정한 후 스레드의 읽기가 발생하지만, 스레드의 로컬 캐시가 업데이트되지 않았을 때: 스레드는 로컬 캐시가 제때 업데이트되지 않아서 다른 스레드가 Y로 설정한 후에도 카운터의 값을 X로 읽을 수 있다. 두 번째 스레드는 카운터의 값을 증가시키지만 오래된 값으로 시작했기 때문에 이미 변경한 내용을 덮어 쓴다.

@Volatile을 사용하는 경우

휘발성 변수를 사용해서 더 나은 코드를 작성하려면 두 가지 시나리오 전제가 참이어야 한다.

  • 변수 값의 변경은 현재 상태에 의존하지 않는다.

  • 휘발성 변수는 다른 변수에 의존하지 않으며, 다른 변수도 휘발성 변수에 의존하지 않는다.

첫 번째 전제는 스레드 안전 카운터와 같은 시나리오를 배제하는 데 도움이 된다. 반면에 두 번째 전제는 휘발성 변수의 의존성 때문에 일관성 없는 상태가 생성되는 것을 피할 수 있도록 도와준다. 이전 사례는 두 번째 전제를 따르지 않기 때문에 안전하지 않다.

class DataProcessor {
		@Volatile
		private var shutdownRequested = false

		fun shutdown() {
				shutdownRequested = true
		}

		fun process() {
				while (!shutdownRequested) {
						// process away
				}
		}
}

해당 예제에서는 두 전제 모두 유효하다.

  • shutdown()에서 작성된 shutdownRequested의 수정은 변수 자체의 현재 상태에 의존하지 않는다. 항상 true로 설정된다.

  • 다른 변수는 shutdownRequested에 의존하지 않으며, 다른 변수에도 의존하지 않는다.

원자적 데이터 구조

원자성은 원자적 데이터 구조로서 기본적으로 원자적 연산을 제공하는 데이터 구조다.

val counter = AtomicInteger()
counter.incrementAndGet() // 원자적이므로 스레드 안전 카운터를 쉽게 구현할 수 있다. 

요약

  • 공유 상태를 가지면 동시성 코드에서 문제가 될 수 있다. 스레드의 캐시와 메모리의 액세스의 원자성으로 인해 다른 스레드에서 수행한 수정 사항이 유실될 수 있다. 상태의 일관성을 해치는 원인이 된다.

  • 스레드 동기화를 할 수 있는 두 가지 방법이 있다. 하나의 스레드만 상태와 상호 작용하도록 보장해서 쓰기가 아닌 읽기 전용으로만 공유할 수 있게 하는것과, 코드 블록을 원자적으로 만들기 위해서 잠금을 사용해 코드 블록을 실행하려는 모든 스레드의 동기화를 강제하는 것이다.

  • CoroutineContext를 하나의 스레드로 된 디스패처와 함께 사용해 코루틴의 실행을 단일 스레드에서 실행되도록 강제한다.

  • 액터는 송신 채널과 코루틴의 쌍이다. 액터를 단일 스레드로 한정해 메시지를 기반으로 하기 보다는 강력한 동기화 매커니즘을 구축할 수 있다. 원하는 스레드에서 메시지를 보내 변경을 요청할 수 있지만, 변경은 특정 스레드에서 실행될 것이다.

  • 액터는 특히 코루틴의 스레드 제한과 쌍을 이뤄 이용하면 좋다. 예를 들어 액터가 스레드 풀에서 실행하도록 액터가 사용할 CoroutineContext를 지정할 수 있다.

  • 액터는 코루틴이기 때문에 여러 방식으로 시작할 수 있다.

  • 잠금을 사용해 코루틴을 동기화하기 위해 뮤텍스를 사용할 수 있다. 이렇게 하면 코루틴이 동기화된 작업을 수행할 수 있도록 기다리는 동안 코루틴을 일시중단할 수 있다.

  • JVM은 스레드의 캐시에 저장되지 않는 변수인 휘발성 변수를 제공한다. 휘발성 변수는 수정될 때 새 값은 이전 값에 의존하지 않으며 휘발성 변수의 상태는 다른 속성에 의존하지 않거나 영향을 미치지 않는 경우다.

  • 원자적 변수는 단순한 경우에 유용하지만 공유되는 상태가 하나 이상의 여러 변수인 경우 확장하기가 어려울 것이다.

  • 우리는 중요한 원칙인 정보 은닉을 실천에 옮겼다. 카운터의 구현을 숨겨서 향후에 뮤텍스, 원자적 변수, 또는 액터 없는 스레드 제한을 사용하도록 그것을 바꿀 수 있다.

Last updated