-
Spring Kafka 적용하며 발생했었던 이슈 정리실무에서 알게된 내용 2022. 6. 2. 18:02
1. Consumer exception
- 문제 상황 : Consumer 동작을 테스트하기 위해 Kafka Cluster로 메시지를 수동으로 발행하고 있었다. 이때, 오타가 발생했고 이로 인해 사전에 약속된 형태(포맷)의 메시지가 아닌 다른 형태(포맷)의 메시지를 발행했다.
- 현상 : 계속해서 아래 에러 메시지가 올라왔다. (무한 반복)
- 에러 메시지 : This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer (아래 참고)
- 에러 메시지 : This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer (아래 참고)
- 현상 이유
KafkaMessageListenerContainer 클래스 내부에 있는 ListenerConsumer 클래스의 run( ) 메소드를 보면 아래와 같이 while 형태로 구현돼 있었고 pollAndInvoke( ) 메소드를 따라가다 보면 Fetcher#parseRecord(...) 메소드를 호출하게 되는데 여기서 value를 deserialize 할 때 실패가 나면서 계속해서 에러 메시지가 올라오고 있었다.
while (isRunning()) { try { pollAndInvoke(); } catch (...) { ... } ... catch(Exception e) { handleConsumerException(e); } }
- 해결 (spring-kafka 공식 문서 참고)
When a deserializer fails to deserialize a message, Spring has no way to handle the problem, because it occurs before the poll() returns. To solve this problem, the ErrorHandlingDeserializer has been introduced. This deserializer delegates to a real deserializer (key or value). If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer returns a null value and a DeserializationException in a header that contains the cause and the raw bytes.
spring: kafka: consumer: key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer properties: spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.LongDeserializer
- why deserializer로 ErrorHandlingDeserializer 클래스를 사용하면 해결이 되는 것일까?
ErrorHandlingDeserializer 클래스의 deserialize(...) 메소드를 보면 deserialize 하는 도중 Exception이 발생하면 이를 catch 해서 null을 리턴하는 로직이 있었기 때문이다.
public class ErrorHandlingDeserializer<T> implements Deserializer<T> { ... @Override public T deserialize(String topic, Headers headers, byte[] data) { try { return this.delegate.deserialize(topic, headers, data); } catch (Exception e) { // deserialize 도중 예외가 발생하면 recoverFromSupplier(...)에서 null을 리턴하고 있었음 deserializationException(headers, data, e); return recoverFromSupplier(topic, headers, data, e); } } ... }
'실무에서 알게된 내용' 카테고리의 다른 글
컬럼 값을 Collection으로 받을 때 Querydsl 이슈 (0) 2022.06.21 Grafana loki에서 에러 로그 하나가 검색이 안 됐던 이슈 (0) 2022.06.14 시스템 별로 Feign Client readTimeout 설정 때문에 내부 동작 코드 분석 (0) 2022.06.14 현재 운영하고 있는 DB(MySQL) 커넥션 수 확인하기 (0) 2022.06.03 Redis를 이용한 동시성 제어 (0) 2022.05.19