카프카 찍먹하기 3부 (feat. 컨슈머)

컨슈머의 역할

  • 컨슈머는 파티션 리더에서 메시지를 가져와 소비한다

  • 컨슈머에서 메시지를 가져오는 기준은 다양하다

  • 특정 파티션을 지정할수도 있고 오프셋을 지정해서 가져올 수도 있다

토픽과 파티션

  • 토픽의 파티션은 컨슈머 그룹 단위로 할당된다

  • 컨슈머 그룹이 다를 경우 동일한 토픽의 파티션의 메시지를 소비할 수 있다

파티션 vs 컨슈머

컨슈머가 파티션보다 많을 경우

  • 하나의 파티션은 각기 다른 하나의 컨슈머에 연결된다

  • 그러므로 4개의 파티션과 5개의 컨슈머가 있을 경우, 1개의 컨슈머는 사용하지 않는다

컨슈머가 파티션보다 적을 경우

  • 파티션에 할당된 메시지를 소비해야 하기 때문에 하나의 컨슈머가 2개 이상의 파티션을 소비하게 된다

  • 생상되는 속도가 소비되는 속도에 비해 빠를경우에는 랙(Lag)이 커지게 되며, 이는 성능저하로 이어지게 된다

컨슈머가 메시지를 가져오는 기준은?

  • 만약 해당 토픽에 컨슈머가 소비한 오프셋이 존재하지 않을 경우에는 메시지를 읽어들일 기준을 정해야 한다

  • 이는 auto.offset.reset 옵션으로 설정할 수 있다

    • earliest : 가장 처음 오프셋 부터 읽는다

    • latest : 커밋되지 않은 가장 마지막 오프셋 이후 부터 읽음 (기본값)

컨슈머 커밋 기준은 어떻게 될까?

  • 커밋 유형은 크게 두 가지로 분류할 수 있다

  • enable.auto.commit

    • 커밋을 어떻게 할지 설정한다(기본값은 true)

    • true일 경우 일정 주기로 컨슈머가 읽은 오프셋을 커밋한다

자동 커밋

  • 만약 enable.auto.commit이 true라고 한다면 자동 커밋 주기를 설정해야 한다

  • auto.commit.interval.ms : 자동 커밋 주기 (기본값 : 5초)

수동 커밋

  • 컨슈머의 ackMode에 따라서 수동 커밋하는 방법은 다르다 (기본값 : BATCH)

  • ackMode가 MANUAL이거나 MANUAL_IMMEDIATE일 경우엔, AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener 인터페이스를 구현해서 수동으로 커밋을 해주어야 한다

  • 그외 모드에서는 메시지를 소비하고 특정 기준에 따라 커밋이 이루어지게 된다

AckMode 종류

  • RECORD : 레코드 단위로 프로세싱 이후 커밋

  • BATCH : poll() 메서드로 호출된 레코드가 모두 처리된 이후 커밋

  • TIME : 설정된 AckTime 이후에 커밋

  • COUNT : 특정 개수만큼 레코드가 처리된 이후 커밋

  • COUNT_TIME : TIME, COUNT 옵션 중 맞는 조건이 하나라도 나올 경우 커밋

  • MANUAL : AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 리스너를 사용해서 커밋 메서드를 수행할 경우, 다음 메시지를 poll할 경우에 이전에 poll 한 메시지의 오프셋까지 커밋된다

  • MANUAL_IMMEDIATE : AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 리스너를 사용해서 커밋 메서드를 수행할 경우, 즉시 읽은 오프셋까지 커밋된다

MANUAL_IMMEDIATE, MANUAL 차이는 무엇인가??

  • 우선 둘다 수동으로 커밋하는 방식이다

  • 차이는 MANUAL의 경우에는 우선 메시지를 폴링하고 커밋을 한 이후에 다음 폴링을 하게되면 이전에 폴링한 오프셋까지 커밋이 된다. 그러므로 BATCH와 로직은 동일하다

  • MANUAL_IMMEDIATE 은 다음 폴링까지 확인하지 않고 현재까지 폴링한 오프셋까지 바로 커밋된다

컨슈머 조회에 영향을 주는 주요 설정

  • fetch.min.bytes : 조회시 브로커가 전송할 최소 데이터 크기를 지정한다 (기본값 : 1)

    • 최소 데이터 크기가 클수록 대기 시간은 늘지만 처리량은 증가한다

  • fetch.max.wait.ms : 데이터가 최소 크기를 충족할 때까지 대기하는 시간 (기본값 : 500)

  • max.partition.fetch.bytes : 파티션 당 서버가 리턴할 수 있는 최대 크기 (기본값 : 1MB)

중복 메시지를 소비하는 경우

어떤 경우에 중복 메시지가 발생하게 될까?

  • 컨슈머가 메시지를 소비해야 하는 최대 시간(max.poll.interval.ms)을 초과할 경우에 발생할 수 있다 → 이는 여러가지 해결 방법이 있을 수 있다

    • 우선은 소비해야 하는 최대 시간을 증가시킬 수 있다

    • 소비해야하는 레코드 사이즈를 줄일수 있다 → max.poll.records

    • 파티션과 컨슈머를 추가하여 메시지 소비에 대한 가용성을 증가시킬 수 있다

  • 수동 커밋의 경우에 구현 로직에서 커밋을 누락될 경우에도 발생

    → 구현상 오류로 로직 수정이 필요하다

  • 자동 커밋의 경우에도 파티션이나 컨슈머가 증가하면서 리밸런싱되는 경우에도 발생

    → 이는 수동커밋으로 전환해야 하며, 운영 환경에서는 수동커밋을 권장한다

컨슈머의 상태는 어떻게 확인하지?

  • 컨슈머는 일정시간 동안 heartbeat를 전송해서 유지한다

  • 일정 시간 heartbeat가 없으면 컨슈머를 그룹에서 제외하고 리밸런스 진행하게 된다

그럼 heartbeat를 주기는 어떻게 정하는건가?

  • heartbeat 주기를 정할 수도 있고, 실제로 컨슈머가 다운되었다고 간주되는 기준도 정할 수 있다

  • session.timeout.ms : 세션 타임 아웃 시간으로 해당 시간까지 heartbeat가 없으면 컨슈머가 다운되었다고 간주한다 (기본값 10초)

  • heartbeat.interval.ms : heartbeat 전송 주기(기본값 3초)

    • session.timeout.ms의 1/3 이하를 추천한다

컨슈머의 메시지 소비 기준은 어떻게 되나?

  • 몇 가지 설정을 통해서 소비되는 성능을 설정할 수 있다

  • max.poll.interval.ms : poll() 메서드의 최대 호출 간격

    • 해당 시간이 지나도록 다음 poll()하지 않으면 컨슈머를 그룹에서 제외하고 리밸런스 진행

    • 실제로 컨슈머가 정상적인 상태가 아닐지도 모르니, HeartBeat 쓰레드가 현재 시간과 마지막으로 poll 메서드가 호출된 시간의 차이를 계산하여 해당 시간의 차가 max.poll.interval.ms 보다 큰 경우 컨슈머 그룹에서 탈퇴 시킨다

  • max.poll.records : poll() 메서드의 최대 사이즈

    • 만약 소비할 메시지 사이즈가 커져 max.poll.interval.ms를 초과할 경우, max.poll.records 사이즈를 축소하여 리밸런싱 되지 않도록 해야 한다

    • max.poll.records 사이즈를 줄일수록 처리할 수 있는 데이터는 적겠지만, 수행시간은 빠르다

그럼 정리를 해보자

메시지 랙(Lack)이 증가한다면?

  • 생성되는 비율보다 소비되는 비율이 낮기 때문에, 소비되는 가용성을 증가시켜 주어야 한다

  • 파티션을 증가시키고 컨슈머를 추가해주어야 한다

메시지 토픽에 새로운 기능을 추가하고 싶다면?

  • 컨슈머 그룹을 새로 추가하면 된다

  • 서로 다른 컨슈머는 같은 토픽의 메시지를 동시에 소비할 수 있다

커밋은 어떻게 하면 좋을까?

  • 운영환경에서는 수동 커밋을 권장한다

  • AckMode에 따라서 직접 커밋을 해주어야 할 경우도 있고, 메시지를 소비되면서 커밋이 될수 있다

  • 그래도 장애를 최대한 줄일수 있는 방법은 AckMode를 MANUAL러 설정하는 것을 권장한다

정상적으로 운영중에 컨슈머가 정상적으로 소비하지 못할 경우

  • 컨슈머가 소비할 수 있는 메시지량(max.poll.records)보다 소비할 수 있는 최대 시간(max.poll.interval.ms)이 적게 설정되어 있을 경우에 컨슈머가 장애가 있다고 판단하고 리밸런싱 되는 경우가 발생할 수 있다

  • 이때는 소비되는 최대 시간을 증가시켜 임시로 해소할 수 있고, 메시지량을 줄여서 소비량을 개선할 수 있다

  • 그러나 이 해결방법은 임시적인 해결방법이며, 파티션을 증가시키고 컨슈머를 추가하여 소비되는 가용성을 확장해볼 필요가 있다

카프카 찍먹을 3부까지 진행하면서 최범균님 영상을 참고를 많이 하였습니다

덕분에 기본적인 설정이나 고려사항 등을 인지할 수 있어서 감사하다는 내용을 글 말미에 작성하고 싶었습니다

실제로는 뵙지는 못했지만 올려주신 영상으로 도움이 많이 되고 있습니다

감사합니다!!!!

참고

Last updated