ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Spring Kafka 프로젝트 <Producer>
    카테고리 없음 2022. 3. 13. 10:20

    아래 작성한 글은 Spring Kafka Reference 문서를 보며 스스로 학습한 내용을 토대로 작성한 글입니다. 따라서, 오류가 있을 수 있음을 먼저 말씀드립니다. 혹시, 오류를 발견하신다면 코멘트로 남겨주시면 수정하도록 하겠습니다. 

     

    • Kafka Client 2.6.0 버전 기준으로 작성되었습니다.

    1. Spring Kafka 프로젝트

    Spring Kafka프로젝트는 Spring에서 Apache Kafka를 쉽게 사용할 수 있도록 추상화하여 제공해주는 프로젝트다.

    2. Spring 프로젝트에서 Kafka를 사용하기 위해 필요한 대표적인 구성요소

    • ProducerFactory : Producer( Kafka Topic에 데이터를 전송하는 역할 )를 만드는 역할
    • ConsumerFactory : Consumer( Kafka Topic으로부터 데이터를 수신해오는 역할 )를 만드는 역할

    3. Producer : Kafka Topic에 데이터 전송

    Kafka Topic에 데이터를 전송하기 위해서는 Producer를 생성해주는 공장인 ProducerFactory가 필요하다. Spring Kafka에서는 이를 위해 ProducerFactory 인터페이스를 제공해주고 있고, default 구현체인 DefaultKafkaProducerFactory도 제공해주고 있다. 내가 하고 있는 프로젝트에서는 DefaultKafkaProducerFactory를 사용해도 무리가 없었기 때문에 DefaultKafkaProducerFactory를 사용했다.

    또한, Spring Kafka에서는 ProducerFactory로부터 생성된 Producer를 wrapping 한 KafkaTemplate을 제공해주고 있다. 따라서, 개발자는 KafkaTemplate를 사용하여 쉽게 Kafka Topic에 데이터를 전송할 수 있다. 

     

    • Spring 프로젝트에서 KafkaTemplate을 만드는 샘플 코드 (이를 기반으로 자신의 상황에 맞게 커스터마이징 하면 된다.)

     

    위 과정을 통해서 Spring 프로젝트에서 Kafka Topic으로 데이터 전송하는 것은 어렵지 않게 구현했다. 하지만, 도대체 어떤 메커니즘으로 데이터가 전송이 되는지 궁금했다. 아래는 이를 분석하고 학습한 내용이다.

    4. KafkaProducer Client 내부 구조

      • 크게 3가지 구성 요소
        • KafkaProducer
        • RecordAccumulator
        • Sender
      • 아래 이미지는 네이버 d2 블로그 KafkaProducer Client Internals 글에서 발췌했다.

    개인적으로 이 이미지가 말하고자 하는 핵심은 Kafka Topic으로 데이터를 전송하면, 내부적으로 바로 전송되는 것이 아니라 일정 시간 동안 배치(Buffer)에 담아뒀다가 Batch형태로 전송된다는 점이라고 생각한다.

     

    좀 더 자세한 flow를 알기 위해 코드를 분석해 봤다. 일단, 개발자가 Kafka Topic으로 데이터를 전송하기 위해 KafkaTemplate을 사용하여 send( ) 메소드를 호출하게 되면, 메소드 내부에서 KafkaProducer에게 send( ) 메소드 호출하여 작업을 위임하고 있었다.

     

    • KafkaTemplate# dosend( ) 메소드

    작업을 위임받은 KafkaProducer는 이를 또 한 번 RecordAccumulator에게 작업을 위임한다. 이렇게 하는 이유는 데이터를 Kafka로 바로 전송하는 것이 아니라 배치(Buffer)에 잠시 담아두기 위해서였다. 

     

    • KafkaProducer# dosend( ) 메소드

    RecordAccumulator로부터 결과를 받은 KafkaProducer는 RecordAccumulator가 반환해준 RecordAppendResult 객체를 통해 배치(Buffer)가 가득 찼는지 또는 기존 배치에 공간이 부족해서 새로운 배치(Buffer)가 만들어졌는지를 검사한다. 만약 두 가지 조건 중 하나라도 만족하면 Sender를 wakeup 한다.

     

    • KafkaProducer# dosend( ) 메소드

     

    Sender는 background 스레드 RecordAccumulator에 배치(Buffer)로 저장된 데이터를 drain( ) 메소드로 꺼내서 Kafka Cluser로 데이터를 전송하는 역할을 하고 있었다.

     

    • Sender# sendProducerData( ) 메소드

    • Sender# sendProduceRequest( ) 메소드

    5. 결론

    위 내용을 간단히 정리해 보면, 개발자가 KafkaTemplate의 send( ) 메소드를 호출했을 때 데이터가 바로 Kafka Cluster로 전송되는 것이 아니라 RecordAccumulator에 먼저 저장되고 저장된 결과가 특정 조건에 부합하면 그때, Sender Thread가 RecordAccumulator에서 데이터들을 읽어와서 전송을 하게 된다.

     

    Producer와 관련해서 새롭게 알게 되는 내용이 있다면, 계속해서 추가하겠습니다.

     

     

     

    댓글

Designed by Tistory.