ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Spring Kafka 적용하며 발생했었던 이슈 정리
    실무에서 알게된 내용 2022. 6. 2. 18:02

    1. Consumer exception

    • 문제 상황 : Consumer 동작을 테스트하기 위해 Kafka Cluster로 메시지를 수동으로 발행하고 있었다. 이때, 오타가 발생했고 이로 인해 사전에 약속된 형태(포맷)의 메시지가 아닌 다른 형태(포맷)의 메시지를 발행했다.
    • 현상 : 계속해서 아래 에러 메시지가 올라왔다. (무한 반복)
      • 에러 메시지 : This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer (아래 참고) 

    • 현상 이유

    KafkaMessageListenerContainer 클래스 내부에 있는 ListenerConsumer 클래스의 run( ) 메소드를 보면 아래와 같이 while 형태로 구현돼 있었고 pollAndInvoke( ) 메소드를 따라가다 보면 Fetcher#parseRecord(...) 메소드를 호출하게 되는데 여기서 value를 deserialize 할 때 실패가 나면서 계속해서 에러 메시지가 올라오고 있었다. 

    while (isRunning()) {
        try {
            pollAndInvoke();
        } catch (...) {
            ...
        }
        ...
        catch(Exception e) {
            handleConsumerException(e);
        }
    }
    • 해결 (spring-kafka 공식 문서 참고)

    When a deserializer fails to deserialize a message, Spring has no way to handle the problem, because it occurs before the poll() returns. To solve this problem, the ErrorHandlingDeserializer has been introduced. This deserializer delegates to a real deserializer (key or value). If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer returns a null value and a DeserializationException in a header that contains the cause and the raw bytes. 

    spring:
      kafka:
        consumer:
          key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
          value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
          properties:
            spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
            spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.LongDeserializer
    • why deserializer로 ErrorHandlingDeserializer 클래스를 사용하면 해결이 되는 것일까? 

    ErrorHandlingDeserializer 클래스의 deserialize(...) 메소드를 보면 deserialize 하는 도중 Exception이 발생하면 이를 catch 해서 null을 리턴하는 로직이 있었기 때문이다.

    public class ErrorHandlingDeserializer<T> implements Deserializer<T> {    
    	...
        @Override
    	public T deserialize(String topic, Headers headers, byte[] data) {
    		try {
    			return this.delegate.deserialize(topic, headers, data);
    		}
    		catch (Exception e) {
                 // deserialize 도중 예외가 발생하면 recoverFromSupplier(...)에서 null을 리턴하고 있었음
    			deserializationException(headers, data, e);
    			return recoverFromSupplier(topic, headers, data, e);
    		}
    	}
        ...
    }

    댓글

Designed by Tistory.