티스토리 뷰
Kafka SpringBoot Quick Start with Error Handling
- kafka 는 기본적으로 메시지를 수신하고, 오류가 발생하면 메시지를 버린다.
- 이때 메시지를 재 처리하거나, 특정 작업을 수행하기 위해서 에러 핸들러를 두고 있다.
- kafka 에서는 recovery, error handling 등의 방법을 제공한다.
기본사항
- 기본적으로 Kafka 클러스터가 구축되어 있어야한다.
- Kafka SpringBoot Quick Start 를 참조하여 가장 기본적인 Kafka 어플리케이션을 구축하자.
- 작업 순서
-
- 기본 에러 핸들링에 대해서 간단하게 알아본다.
-
- 에러가 발생한경우 핸들링을 수행하는 예제를 살펴본다.
-
- 리커버리를 통해 메시지 정보를 수정하거나, 별도 처리하는 방법을 알아본다.
-
에 핸들링을 위해서 properties 생성하기.
# topic for error handle
kafka.topic-default-error=default-error-topic
kafka.topic-error-handle=error-handle-topic
kafka.topic-retry-handle=retry-handle-topic
- 위와 같이 프로퍼티를 설정했다.
- kafka.topic-default-error: 기본 에러 핸들링을 위한 토픽
- kafka.topic-error-handle: 에러 핸들링을 별도 구성하기 위한 토픽
- kafka.topic-retry-handle: retry/recovery 구성을 위한 토픽
토픽 등록하기.
- 토픽 설정을 정의했으므로 이제는 토픽을 자동생성할 수 있도록 등록하자.
- KafkaTopicConfig.java 파일을 열고 다음과 같이 코드를 추가한다.
... 생략
@Value("${kafka.topic-default-error}")
public String TOPIC_COMMON_ERROR;
@Value("${kafka.topic-error-handle}")
public String TOPIC_ERROR_HANDLER;
@Value("${kafka.topic-retry-handle}")
public String TOPIC_RETRY_HANDLER;
... 생략
private NewTopic topicCommonError() {
return TopicBuilder.name(TOPIC_COMMON_ERROR)
.partitions(3)
.replicas(2)
.build();
}
private NewTopic topicErrorHandle() {
return TopicBuilder.name(TOPIC_ERROR_HANDLER)
.partitions(3)
.replicas(2)
.build();
}
private NewTopic topicRetryHandle() {
return TopicBuilder.name(TOPIC_RETRY_HANDLER)
.partitions(3)
.replicas(2)
.build();
}
@PostConstruct
public void init() {
kafkaAdmin.createOrModifyTopics(defaultTopic());
kafkaAdmin.createOrModifyTopics(topicWithKey());
kafkaAdmin.createOrModifyTopics(topicWithPriority());
kafkaAdmin.createOrModifyTopics(topicCommonError());
kafkaAdmin.createOrModifyTopics(topicErrorHandle());
kafkaAdmin.createOrModifyTopics(topicRetryHandle());
}
- topicCommonError(): 기본 공통 에러를 위한 토픽을 설정한다.
- topicErrorHandle(): 에러 핸들링을 위한 토픽 설정
- topicRetryHandle(): Retry/Recovery 를 위한 토픽 설정
- init() 메소드에 위 생성한 3개의 토픽을 생성하도록 설정했다.
CommonError 테스트 수행하기.
KafkaConsumerConfig 수정하기
- 이제 CommonError를 등록하기 위해서 KafkaConsumerConfig.java 파일을 다음과 같이 수정한다.
... 생략
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> errorCommonHandlingKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory("errorHandlingConsumerGroup"));
// SeekToCurrentErrorHandler 의 경우, 현재 읽은 오프셋에서 에러가 발생하면 FixedBackOff 등으로 설정한 backoff 만큼 기다리다가 다시 메시지를 읽는다.
// FixedBackOff(주기(밀리세컨), 최대재시도횟수) 로 백오프를 지정했다.
factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(100, 2)));
factory.setConcurrency(1);
factory.setAutoStartup(true);
return factory;
}
... 생략
- setCommonErrorHandler를 이용하여 기본 에러핸들러를 등록하고 있다.
- DefaultErrorHandler 는 기본 에러 핸들러이며, 다음과 같은 기능을 제공할 수 있다.
- BackOff 제공: BackOff는 오류가 발생한경우 특정 시간동은 대기하다가, 특정 횟수만큼 재시도를 하게 된다.
- ConsumerRecordRecover: 컨슈머 레코드 리커버를 구현하여 오류 발생시 레코드를 수정 처리할 수 있다.
- FixedBackOff(대기시간, 재시도횟수): 100ms 대기했다가 재시도를 2번 수행한다. BackOff는 다양하게 지정할 수 있다.
- FixedBackOff: 고정시간동안 대기후 재시도
- ExponentialBackOff: 대기시간을 재시도 할때마다 승수로 대기하고, 재시도한다.
Listener 등록하기.
- 이제는 컨슈머가 동작하도록 리스너를 걸어주자.
- 생성된 토픽에 대해서, 컨슈머 컨테이너 팩토리를 걸어준다.
- MessageListener.java 파일에 아래와 같이 추가한다.
...생략
@KafkaListener(topics = "${kafka.topic-default-error}", containerFactory = "errorCommonHandlingKafkaListenerContainerFactory")
public void listenForDefaultErrorHandle(Object record) {
log.info("Receive Message for Default Error Handler, It will occur error: {}", record);
throw new RuntimeException("Consumer Error and Exception Occurs.");
}
...생략
- 토픽은 kafka.topic-default-error 으로 지정된 토픽에서 컨슘 한다.
- containerFactory는 방금 생성한 errorCommonHandlingKafkaListenerContainerFactory 를 이용한다.
- throw new RuntimeException("Consumer Error and Exception Occurs.") 을 이용하여 메시지가 수신하면 그대로 RuntimeException을 발생시켰다. 즉, 레코드를 수신받으면 바로 예외를 발생 시켰다.
메시지 프로듀싱 하기.
- 이제 메시지와 에러핸들링, 리커버리를 수행하기 위해서 다음과 같이 하나의 코드를 작성하고 재사용할 것이다.
- ProducerController.java 를 다음과 코드를 작성하자.
...생략
@PostMapping("produce_error/{category}")
public ResponseEntity<?> produceMessage(@PathVariable("category") String category, @RequestBody TestEntity testEntity) {
testEntity.setTime(LocalDateTime.now());
String topic = "";
if ("default-error".equals(category)) {
topic = TOPIC_COMMON_ERROR;
} else if ("error-handler".equals(category)) {
topic = TOPIC_ERROR_HANDLER;
} else if ("retry-handler".equals(category)) {
topic = TOPIC_RETRY_HANDLER;
} else {
topic = KafkaTopicConfig.DEFAULT_TOPIC;
}
ListenableFuture<SendResult<String, Object>> future = kafkaProducerTemplate.send(topic, testEntity);
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable ex) {
log.error("Fail to send message to broker: {}", ex.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> result) {
log.info("Send message with offset: {}, partition: {}", result.getRecordMetadata().offset(), result.getRecordMetadata().partition());
}
});
return ResponseEntity.ok(testEntity);
}
- 위 코드는 /api/produce_error/{category}: URI로 요청을 한다.
- 카테고리는 [default-error|error-handler|retry-handler] 중에서 하나가 올 수 있다. 이에 따라 토픽을 결정하게 된다.
- 메시지는 이전 예지외 같이 title, contents 를 입력할 수 있다.
- 메시지 전송은 기존과 동일하게, 토픽을 지정하여 전달하고, 키가 없으므로 파티션은 RoundRobin 방식으로 결정된다.
테스트 수행하기.
- 이제 자바 어플리케이션을 실행하고 메시지를 전송해 보자.
- category 파라미터를 default-error 로 전송한다.
curl -X POST localhost:8080/api/produce_error/default-error -H "Content-Type: application/json" -d '{"title": "default-error", "contents": "contents default-error"}'
- 최초 시도
2022-03-02 13:20:56.174 INFO 3647 --- [ad | producer-1] c.s.k.controllers.ProducerController : Send message with offset: 0, partition: 1
2022-03-02 13:20:56.172 INFO 3647 --- [ntainer#0-0-C-1] c.s.k.consumers.MessageListener : Receive Message for Default Error Handler, It will occur error: ConsumerRecord(topic = default-error-topic, partition = 1, leaderEpoch = 0, offset = 0, CreateTime = 1646194856073, serialized key size = -1, serialized value size = 97, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = TestEntity(title=default-error, contents=contents default-error, time=2022-03-02T13:20:56.045359))
2022-03-02 13:20:56.283 INFO 3647 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-errorHandlingConsumerGroup-7, groupId=errorHandlingConsumerGroup] Seeking to offset 0 for partition default-error-topic-1
2022-03-02 13:20:56.287 ERROR 3647 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Error handler threw an exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.schooldevops.kafkatutorials.consumers.MessageListener.listenForDefaultErrorHandle(java.lang.Object)' threw exception; nested exception is java.lang.RuntimeException: Consumer Error and Exception Occurs.; nested exception is java.lang.RuntimeException: Consumer Error and Exception Occurs.
- 첫번째 재시도
2022-03-02 13:20:56.675 INFO 3647 --- [ntainer#0-0-C-1] c.s.k.consumers.MessageListener : Receive Message for Default Error Handler, It will occur error: ConsumerRecord(topic = default-error-topic, partition = 1, leaderEpoch = 0, offset = 0, CreateTime = 1646194856073, serialized key size = -1, serialized value size = 97, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = TestEntity(title=default-error, contents=contents default-error, time=2022-03-02T13:20:56.045359))
2022-03-02 13:20:56.787 INFO 3647 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-errorHandlingConsumerGroup-7, groupId=errorHandlingConsumerGroup] Seeking to offset 0 for partition default-error-topic-1
2022-03-02 13:20:56.787 ERROR 3647 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Error handler threw an exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.schooldevops.kafkatutorials.consumers.MessageListener.listenForDefaultErrorHandle(java.lang.Object)' threw exception; nested exception is java.lang.RuntimeException: Consumer Error and Exception Occurs.; nested exception is java.lang.RuntimeException: Consumer Error and Exception Occurs.
- 두번째 재시도
- 결과적으로 최초 시도 이후 우리가 지정한 2번의 재시도가 발생했다.
- 기본 에러 핸들러의 경우 BackOff 를 지정하면, 지정된 시간 이후 지정된 수 만큼 재시도를 수행한다.
- 보통 이런 경우는, 메시지 처리가 다른 시스템에 데이터를 전송하거나 할때 커넥션이 비정상 적이어서 컨슘 처리가 실패난 경우 유용하다.
ErrorHandler 처리 수행하기.
- 이번에는 ErrorHandler를 지정해보자.
- 이미 프로퍼티와 토픽을 등록해 두었으므로, 컨슈머 팩토리와, 리스너만 추가로 등록하면 된다.
KafkaConsumerConfig 수정하기
- 이제 ErrorHandler 등록하기 위해서 KafkaConsumerConfig.java 파일을 다음과 같이 수정한다.
... 생략
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> errorHandlingKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory("errorHandlingConsumerGroup"));
// SeekToCurrentErrorHandler 의 경우, 현재 읽은 오프셋에서 에러가 발생하면 FixedBackOff 등으로 설정한 backoff 만큼 기다리다가 다시 메시지를 읽는다.
// FixedBackOff(주기(밀리세컨), 최대재시도횟수) 로 백오프를 지정했다.
factory.setErrorHandler(new ErrorHandler() {
@Override
public void handle(Exception thrownException, ConsumerRecord<?, ?> data) {
log.error("Error is {} : data {}", thrownException.getMessage(), data);
}
});
factory.setConcurrency(1);
factory.setAutoStartup(true);
return factory;
}
... 생략
- 위와 같이 컨슈머 팩토리를 생성하고 에러 핸들러를 등록했다.
- ErrorHandler의 경우 인터페이스이며, 이 인터페이스를 우 코드와 같이 구현했다.
- 위 내용은 handle라는 메소드에서 처리를 수행하며, 단순하게 로그만 처리했다. 필요한경우 이 핸들러 메시지에서 다양한 작업을 할 수 있다.
- 오류를 확인하고, 메시지(레코드)를 수정하거나, 데드레터로 처리하고 재사용하거나, 오류 로그만 별도로 남겨 후처리를 수행할 수도 있다.
Listener 등록하기.
- 이전 작업과 동일하게 Listener를 등록해보자.
- MessageListener.java 코드에 다음 내용을 추가하자.
...생략
@KafkaListener(topics = "${kafka.topic-error-handle}", containerFactory = "errorHandlingKafkaListenerContainerFactory")
public void listenForErrorHandle(Object record) {
log.info("Receive Message for Error Handler, It will occur error: {}", record);
throw new RuntimeException("Consumer Error and Exception Occurs.");
}
...생략
- listenForErrorHandle메소드에서 들어온 메시지를 로깅하고 바로 런타임 오류를 발생 시켰다.
테스트 수행하기.
- 이제 에러 핸들러를 테스트해보자.
- category 파라미터를 error-handler 로 전송한다.
curl -X POST localhost:8080/api/produce_error/error-handler -H "Content-Type: application/json" -d '{"title": "error-handler", "contents": "contents error-handler"}'
2022-03-02 13:34:48.543 INFO 3647 --- [ad | producer-1] c.s.k.controllers.ProducerController : Send message with offset: 0, partition: 0
2022-03-02 13:34:48.542 INFO 3647 --- [ntainer#5-0-C-1] c.s.k.consumers.MessageListener : Receive Message for Error Handler, It will occur error: ConsumerRecord(topic = error-handle-topic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1646195688485, serialized key size = -1, serialized value size = 98, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = TestEntity(title=error-handler, contents=contents error-handler, time=2022-03-02T13:34:48.480896))
2022-03-02 13:34:48.543 ERROR 3647 --- [ntainer#5-0-C-1] c.s.k.configs.KafakConsumerConfig : Error is Listener method 'public void com.schooldevops.kafkatutorials.consumers.MessageListener.listenForErrorHandle(java.lang.Object)' threw exception; nested exception is java.lang.RuntimeException: Consumer Error and Exception Occurs.; nested exception is java.lang.RuntimeException: Consumer Error and Exception Occurs. : data ConsumerRecord(topic = error-handle-topic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1646195688485, serialized key size = -1, serialized value size = 98, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = TestEntity(title=error-handler, contents=contents error-handler, time=2022-03-02T13:34:48.480896))
- 메시지를 전송하였고, 이후에 Listener가 메시지를 수신하였다.
- Listener는 RuntimeException 을 생성했다.
- 이후 에러 핸들러가 에러를 캐치하고, 메시지를 출력하고 그대로 종료하고 있다.
- 우히는 이 부분에서 메시지를 다양한 방법으로 처리할 수 있을 것이다.
- 데드레터 처리하기
- 에러 로그 남기기
- 에러 내용과 에러 사항을 DB에 남기고 재처리하기
Retry/Recovery 처리하기.
- 이제는 리커버리를 수행해 보자.
- 리커버리는 예외를 확인하고, 레코드를 수정하는 작업이다.
- 사전에 프로퍼티와 토픽을 생성해 두었으므로 카프카 컨슈머 팩토리와 리스너만 등록하도록 하겠다.
Kafka Consumer Factory 작성하기.
- 카프카 컨슈머 팩토리를 등록하기 위해서 KafkaConsumerConfig.java 파일에 다음 내용에 추가하자.
...생략
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> recoveryHandlingKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory("retryHandlingConsumerGroup"));
factory.setRetryTemplate(retryTemplate());
factory.setRecoveryCallback(new RecoveryCallback<Object>() {
@Override
public Object recover(RetryContext retryContext) throws Exception {
ConsumerRecord consumerRecord = (ConsumerRecord) retryContext.getAttribute("record");
log.info("Recovery is called for message {} ", consumerRecord.value());
return null;
}
});
factory.setConcurrency(1);
factory.setAutoStartup(true);
return factory;
}
private RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(getSimpleRetryPolicy());
return retryTemplate;
}
private SimpleRetryPolicy getSimpleRetryPolicy() {
Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();
exceptionMap.put(RetryTestException.class, true);
return new SimpleRetryPolicy(5,exceptionMap,true);
}
...생략
- recoveryHandlingKafkaListenerContainerFactory 는 커넥션 팩토리를 생성하는 메소드이다.
- setRetryTemplate를 보면 재시도에 대한 처리를 어떤 방식으로 수행할지 지정하는 것이다. retryTemplate() 의 결과인 RetryTemplate를 전달한다.
- setRecoveryCallback를 보면 리커버리를 어떻게 수행할지에 대한 콜백 처리를 수행하도록 한다.
- RecoveryCallback 인터페이스의 구현체가 setRecoveryCallback에 전달되며, 이 작업은 메시지를 읽고 단순하게 로깅하고 있다.
- retryTemplate() 메소드드를 구현한다.
- getSimpleRetryPolicy() 메소드의 경우 재시도를 어떤 방식으로 할지 재시도시 어떤 예외일때 재시도를 할지, 재시도 횟수는 얼마일지 지정한다.
- 여기서는 RetryTestException 클래스인경우 재시도를 수행한다.
- 재시도 횟수는 5번을 수행하게 된다.
- SimpleRetryPolicy의 마지막 파라미터는 오류 원인을 탐색할지 알려준다.
- 즉 위 내용은 RetryTestException.class 인경우 예외처리를 재시도 하며, 재시도 횟수는 5번 수행한다.
- 그리고 리커버의 경우 로깅을 출력하고 작업을 마치는 것이다.
Listener 등록하기.
- 이전 작업과 동일하게 Listener를 등록해보자.
- MessageListener.java 코드에 다음 내용을 추가하자.
@KafkaListener(topics = "${kafka.topic-retry-handle}", containerFactory = "recoveryHandlingKafkaListenerContainerFactory")
public void listenForRetryHandle(Object record) {
log.info("Receive Message for Retry Handler, It will occur error: {}", record);
throw new RetryTestException("Consumer Error and Exception Occurs.");
}
- 위와같이 listenForRetryHandle 를 확인하면 메시지를 받고나서 RetryTestException을 던진다.
- 이 경우에는 재시도 정책에 해당하는 예외임을 확인할 수 있다.
테스트 수행하기.
- 이제 에러 핸들러를 테스트해보자.
- category 파라미터를 retry-handler 로 전송한다.
curl -X POST localhost:8080/api/produce_error/retry-handler -H "Content-Type: application/json" -d '{"title": "retry-handler", "contents": "contents retry-handler"}'
2022-03-02 14:09:45.857 INFO 3647 --- [ad | producer-1] c.s.k.controllers.ProducerController : Send message with offset: 0, partition: 0
2022-03-02 14:09:45.892 INFO 3647 --- [ntainer#6-0-C-1] c.s.k.consumers.MessageListener : Receive Message for Retry Handler, It will occur error: ConsumerRecord(topic = retry-handle-topic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1646197785564, serialized key size = -1, serialized value size = 97, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = TestEntity(title=retry-handler, contents=contents retry-handler, time=2022-03-02T14:09:45.542093))
2022-03-02 14:09:45.894 INFO 3647 --- [ntainer#6-0-C-1] c.s.k.consumers.MessageListener : Receive Message for Retry Handler, It will occur error: ConsumerRecord(topic = retry-handle-topic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1646197785564, serialized key size = -1, serialized value size = 97, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = TestEntity(title=retry-handler, contents=contents retry-handler, time=2022-03-02T14:09:45.542093))
2022-03-02 14:09:45.895 INFO 3647 --- [ntainer#6-0-C-1] c.s.k.consumers.MessageListener : Receive Message for Retry Handler, It will occur error: ConsumerRecord(topic = retry-handle-topic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1646197785564, serialized key size = -1, serialized value size = 97, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = TestEntity(title=retry-handler, contents=contents retry-handler, time=2022-03-02T14:09:45.542093))
2022-03-02 14:09:45.895 INFO 3647 --- [ntainer#6-0-C-1] c.s.k.consumers.MessageListener : Receive Message for Retry Handler, It will occur error: ConsumerRecord(topic = retry-handle-topic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1646197785564, serialized key size = -1, serialized value size = 97, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = TestEntity(title=retry-handler, contents=contents retry-handler, time=2022-03-02T14:09:45.542093))
2022-03-02 14:09:45.895 INFO 3647 --- [ntainer#6-0-C-1] c.s.k.consumers.MessageListener : Receive Message for Retry Handler, It will occur error: ConsumerRecord(topic = retry-handle-topic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1646197785564, serialized key size = -1, serialized value size = 97, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = TestEntity(title=retry-handler, contents=contents retry-handler, time=2022-03-02T14:09:45.542093))
2022-03-02 14:09:45.895 INFO 3647 --- [ntainer#6-0-C-1] c.s.k.configs.KafakConsumerConfig : Recovery is called for message TestEntity(title=retry-handler, contents=contents retry-handler, time=2022-03-02T14:09:45.542093)
- 위 내용은 한번의 메시지를 수신받고, 이후 5번의 재시도를 수행하는 것을 확인할 수 있다.
- 5번 이후 최종적으로 Recovery 작업이 수행되었다. 'Recovery is called for message' 으로 확인할 수 있다.
- 이 경우 외부 시스템과 통신할대 오류가 발생했다고 가정하면 이 문제가 해결되기 까지 5번 정도의 재시도를 수행하며, 최종적으로 동일한 오류가 발생하는경우 Recovery를 수행하게 된다.
WrapUP
- 지금까지 몇가지 예외 상황을 처리하는 방법을 알아 보았다.
- CommonErrorHandler를 수행하는 방법, 에러 핸들러로 필요한 작업을 수행하는 방법, 그리고 Retry/Recovery를 수행하는 방법또한 알아보았다.
- 시스템의 요구사항에 따라 선택을 할 수 있으며, 배치 에러 핸들링등 다양한 에러 핸들링 도구와 방법들이 있으므로 찾아보면서 구현하면 좋을듯 하다.
'Kafka' 카테고리의 다른 글
[Kafka] AWS MSK Install guide (0) | 2022.04.11 |
---|---|
[Kafka] 스프링부트 Kafka 6 - Transaction (0) | 2022.04.11 |
[Kafka] 스프링부트 Kafka 4 - 우선순위 Queue (0) | 2022.04.11 |
[Kafka] 스프링부트 Kafka 3 - Partition Key (0) | 2022.04.11 |
[Kafka] 스프링부트 Kafka 2 - 수동 컨슈밍하기 (0) | 2022.04.11 |
공지사항
최근에 올라온 글
최근에 달린 댓글
- Total
- Today
- Yesterday
링크
TAG
- docker
- MongoDB
- jenkins
- mongo
- Golang
- Gorilla
- NodeSelector
- jpa
- gitops
- CD
- docker-compose
- Terraform
- CI
- argocd
- deploy
- kafka-springboot
- AWS
- springboot
- mapping
- Database
- Kafka
- D3
- go
- kubectl
- java
- MySQL
- Spring
- kubernetes
- declative
- tfsec
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 |
글 보관함