티스토리 뷰
Kafka SpringBoot Quick Start with Manual Consume
- 지금까지 메시지를 프로듀스 하면, 컨슈머 리스너가 브로커의 상태를 검사하고, 메시지가 들어오면 리스너가 메시지를 가져가는 방식을 알아 보았다.
- 일반적으로 Listener가 메시지를 가져가겠지만, 필요할때 직접 메시지를 가져가게 하고 싶다면 Manual로 메시지를 수신할 수 있다.
기본사항
- 기본적으로 Kafka 클러스터가 구축되어 있어야한다.
- Kafka SpringBoot Quick Start 를 참조하여 가장 기본적인 Kafka 어플리케이션을 구축하자.
Manual Consumer 작성하기.
- 기본적인 설정은 이미 작성한 Kafka SpringBoot Quick Start 의 내용에 다음과 같이 추가하자.
- KafkaConsumerConfig.java 파일에 다음과 같이 수동 컨슈머를 등록한다.
/**
* 수동 컨슈머를 작성한다.
* 기존과 다른것은 Consumer 객체를 반환하는 것이다.
* @return 컨슈머를 반환합니다.
*/
@Bean
public Consumer<String, Object> manualConsumer() {
return consumerFactory("manualConsumerGroup").createConsumer();
}
- 수동 컨슈머는 consumerFactory 에서 createConsumer() 로 생성할 수 있다.
- 참고로 아래 consumerFactory는 다음과 같이 이전 예제로 작성되었음을 확인하자.
private ConsumerFactory<String, Object> consumerFactory(String groupId) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>();
// Deserialize에 대해서 신뢰하는 패키지를 지정한다. "*"를 지정하면 모두 신뢰하게 된다.
jsonDeserializer.addTrustedPackages("*");
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), jsonDeserializer);
}
ManualConsumerService 작성하기.
- 메뉴얼 컨슈머를 사용하기 위해서는 ManualConsumerService.java 파일을 아래와 같이 작성하자.
package com.schooldevops.kafkatutorials.consumers;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@Slf4j
@Service
public class ManualConsumerService {
private final Consumer<String, Object> manualConsumer;
public ManualConsumerService(Consumer<String, Object> manualConsumer) {
this.manualConsumer = manualConsumer;
}
public List<Object> receiveMessages(String topicName, int partition, int offset) {
TopicPartition topicPartition = new TopicPartition(topicName, partition);
manualConsumer.assign(Arrays.asList(topicPartition));
manualConsumer.seek(topicPartition, offset);
ConsumerRecords<String, Object> records = manualConsumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, Object> record: records) {
log.info("Receive Manually: {}", record);
}
manualConsumer.unsubscribe();
return StreamSupport.stream(records.spliterator(), false)
.map(r -> r.value())
.collect(Collectors.toList());
}
}
- 서비스에서는 receiveMessages 메소드를 작성한다.
- 토픽 이름과, 파티션을 지정, 그리고 오프셋을 파라미터로 전달하면, 해당 토픽의 특정 파티션, 오프셋에 해당하는 레코드를 가져올 수 있다.
- TopicPartition 을 이용하면 토픽 이름과 파티션 번호를 통해서 특정 파티션을 가져올 수 있다.
- seek 메소드의 경우 특정 파티션의 특정 오프셋에 해당하는 값을 혹득할 수 있다.
- poll 메소드는 실제 파티션에서 레코드를 획득한다.
컨트롤러 작성하기.
- 이제 테스트를 위해서 컨트롤러를 생성하고, 메시지 수신을 위한 요청을 수행하자.
- 메시지를 수신받기 위해서 파티션 번호, 오프셋 번소를 지정해서 레코드를 가져올 수 있다.
package com.schooldevops.kafkatutorials.controllers;
import com.schooldevops.kafkatutorials.configs.KafkaTopicConfig;
import com.schooldevops.kafkatutorials.consumers.ManualConsumerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@Slf4j
@RequestMapping("api/consumer")
public class ConsumerController {
private final ManualConsumerService manualConsumerService;
public ConsumerController(ManualConsumerService manualConsumerService) {
this.manualConsumerService = manualConsumerService;
}
@GetMapping("consume")
public ResponseEntity<?> getMessage(
@RequestParam(value = "partition", required = false, defaultValue = "0") Integer partition,
@RequestParam(value = "offset", required = false, defaultValue = "0") Integer offset
) {
return ResponseEntity.ok(manualConsumerService.receiveMessages(KafkaTopicConfig.DEFAULT_TOPIC, partition, offset));
}
}
- 컨슈머 컨틀롤러를 지정할때, 토픽이름, 파티션번호, 오프셋 번호를 전달하는 것을 확인하루 수 있다.
테스트해보기.
- 이제 테스틀 수행해 보자.
브로커를 실행한다.
docker-compose -f docker-compose.yaml up -d
[+] Running 7/7
⠿ Network kafka-handson_default Created 0.0s
⠿ Container kafka-handson-zookeeper-2-1 Started 0.8s
⠿ Container kafka-handson-zookeeper-3-1 Started 0.8s
⠿ Container kafka-handson-zookeeper-1-1 Started 0.8s
⠿ Container kafka-handson-kafka-1-1 Started 2.1s
⠿ Container kafka-handson-kafka-2-1 Started 2.0s
⠿ Container kafka-handson-kafka-3-1 Started
- dicker-compose는 로컬에 각각 3개이 zookeeper와 kafka 를 각각 실행된다.
메시지 요청하기.
curl -X POST localhost:8080/api/produce -H "Content-Type: application/json" -d '{"title": "TestSend2", "contents": "contents"}'
{"title":"TestSend2","contents":"contents","time":"2022-02-22T17:01:15.569961"}
- 메시지를 POST로 전달하였다. 메시지 전송 결과를 확인할 수 있다.
메뉴얼 메시지 수신하기.
Receive Message from DEF_TOPIC, values ConsumerRecord(topic = DEF_TOPIC, partition = 1, leaderEpoch = 3, offset = 0, CreateTime = 1645516875578, serialized key size = -1, serialized value size = 80, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = TestEntity(title=TestSend2, contents=contents, time=2022-02-22T17:01:15.569961))
- 이는 이전에 만든 consumer listener 가 동작한 것이다.
- 우리는 수동으로 메시지를 요청하기 위해서 터미널에 다음과 같이 내용을 입력하자.
curl -X GET 'localhost:8080/api/consumer/consume?partition=0&offset=0'
[{"title":"TestSend2","contents":"contents","time":"2022-02-22T17:01:15.569961"}]
- GET으로 정상적으로 메시지를 컨슘했음을 확인할 수 있다.
- 보낸 시간과 받은 시간이 동일함을 확인하자.
수신 결과 다시보기.
- 우리는 이전에 생성한 컨슈머 Listener를 통해서 메시지를 받았고, 또한 curl을 이용하여 REST API 를 통해서 동일한 메시지를 받았다.
- 이 이유는 ConsumerListener의 경우 그룹 아이디가 'defaultGroup' 이었고, ManualConsumer의 경우 그룹 아이디를 'manualConsumerGroup' 으로 작업했음을 확인하자.
- 그룹 아이디가 다르다면, 토픽내 동일 메시지를 서로다른 그룹아이디 내부 컨슈머는 각각 메시지를 수신하게 된다.
- 우리의 예제는 파티션을 2개 생성했다.
- 예제에서 동일하게 수동으로 메시지를 수신하기 위해서는 파티션 번호를 1로 지정해서 메시지를 수신하면 메시지를 확인할 수 있을 것이다.
- 또한 curl 내용에서 offset값을 0으로 매번 호출하면 파티션의 데이터중 첫번째 메시지부터 전달된 메시지 모두를 읽게 된다.
WrapUp
- 지금까지 수동으로 메시지를 컨슘하는 방법을 알아 보았다.
- 수동 컨슘을 위해서 Consumer 객체를 이용하며, 이를 이용하여 수신하기 위해서는 Topic, Partition, offset을 지정하여 해당 메시지를 수신할 수 있다.
- 컨슈머 그룹 아이디가 다르게 지정됨으로 해서 동일한 데이터를 컨슈머 그룹마다 각각 메시지를 읽을 수 있게 된다.
- 파티션을 2개 이상 기동하였다면, 메시지는 라운드 로빈으로 분할되어서 파티션에 들어가게 된다. 그러므로 수동으로 메시지를 수신할때 파티션을 지정해야한다.
- 오프셋은 메시지를 수신받는 위치를 나타낸다. 오프셋 0으로 지정하면 메시지는 항상 처음부터 읽게 된다.
'Kafka' 카테고리의 다른 글
[Kafka] 스프링부트 Kafka 4 - 우선순위 Queue (0) | 2022.04.11 |
---|---|
[Kafka] 스프링부트 Kafka 3 - Partition Key (0) | 2022.04.11 |
[Kafka] 스프링부트 Kafka 1 - 기본편 (0) | 2022.04.11 |
[Kafka] Docker Compose로 멀티 클러스터 설치하기 (0) | 2022.04.11 |
[Kafka] Docker Compose로 설치하기 (0) | 2022.04.11 |
공지사항
최근에 올라온 글
최근에 달린 댓글
- Total
- Today
- Yesterday
링크
TAG
- kubectl
- java
- Database
- mongo
- Spring
- go
- argocd
- Golang
- kubernetes
- NodeSelector
- jenkins
- CD
- tfsec
- deploy
- jpa
- docker-compose
- kafka-springboot
- MySQL
- docker
- D3
- declative
- Kafka
- CI
- mapping
- Gorilla
- AWS
- Terraform
- gitops
- MongoDB
- springboot
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 |
글 보관함