카프카 transaction 처리는 어떻게 해야할까?

카프카를 사용하면서 트랜잭션 처리가 가능할까?

  • 메시지큐를 사용하는 이유는 애플리케이션의 성능의 부하를 줄이기 위해서가 가장 큰 장점일 것이다

  • 그런데 비즈니스 로직 중간에 메시지를 프로듀서하고 그 이후에 exception이 발생하면 어떻게 될까?

  • 이미 전송한 메시지는 컨슈밍되어 무조건 수행되어야만 할까?

  • 코드로 살펴보자

테스트 코드

  • 테스트할 코드는 단순하다

  • 메시지를 전송하고 전달받은 메시지를 컨슈밍한다

프로듀서

  • 파라미터로 넘어온 메시지를 TestMessage라는 객체로 전달하였다

public void sendMessage(String message) {
    ProducerRecord<String, Object> record = new ProducerRecord<>("TEST", TestMessage.builder().message(message).build());
    multiTypeKafkaTemplate.send(record);
}

컨슈머

  • 전달받은 메시지를 로그로 남겨서 컨슈밍 되는지 확인한다

비즈니스 로직

  • 임시 메서드를 생성하여 프로듀서에게 메시지를 전달한다

잘 컨슈밍 되는지 로그를 확인해보자

그럼 exception이 발생하면 어떻게 될까?

서비스로직에 exception을 추가해보자

메시지 컨슈밍되는지 확인해보자

  • exception을 발생했지만 커슈밍된것을 확인할 수 있다

그럼 트랜잭션이 실패하면 메시지 컨슈밍도 롤백하려면 어떻게 해야 할까?

  • 해결방법은 심플하다!

  • 컨슈머를 설정하는 Bean의 옵션중에 “isolation.level”을 “read_committed”으로 설정하면 된다

오잉? 그럼 기본 설정은 어떻게 되어있지?

  • 기본값은 “read_uncommitted”로 커밋이 되지 않아도 컨슈밍할수 있도록 되어있다

그렇다면 트랜잭션 처리가 되어있지 않은 경우엔 어떻게 될까?

  • 트랜잭션을 주석처리하자

  • 그리고 컨슈밍되는지 확인해보자

  • 동일하게 동작하는것을 확인할수 있다

컨슈머는 어떻게 트랜잭션 처리가 되었는지 알수 있을까?

  • 실마리는 도먼큐트에 있다

  • 우선 트랜잭션 모드가 아닌 상황에서는 컨슈머는 unread_committed과 동일하게 간주한다고 한다

  • 그리고 트랜잭션 모드이고 read_committed 일 경우에는 트랜잭션이 시작하면서 보낸 메시지는 보류되고 트랜잭션이 완료되면 오프셋을 확인하여 트랜잭션이 처리되었는지 확인후 받은 메시지를 처리한다고 되어있다

Last updated

Was this helpful?