티스토리 뷰
Kafka SpringBoot Quick Start
- 지금까지 Kafka 클러스터 구성을 하였으니 이제는 Kafka 샘플을 작성하자.
- Kafka 샘플을 구성하고, 간단하게 메시지를 생산하고, 소비해볼 것이다.
작업순서
- Sample Code 템플릿 생성
- SpringBoot Kafka 환경 설정
- Kafka Topic 생성
- Kafka Producer 생성
- Kafka Consumer 생성
- Kafka 메시지 Key로 라우팅 결정하기.
1. Sample Code 템플릿 새성
- https://start.spring.io/ 에서 아래와 같이 프로젝트를 생성하자.
- Project: Maven
- Language: Java
- Spring Boot: 2.6.3
- Project Metadata: 프로젝트 메타데이터를 화면과 같이 지정
- Dependencies
- Spring Web: 스프링부트 웹 모듈 추가
- Lombok: 보일러 플레이트 코드를 제거해주는 유용한 도구 추가
- Spring for Apache Kafka: 카프카를 통해 메시징 처리를 위한 모듈
- 위와 같이 설정한 후 "GENERATE" 버튼을 클릭하고 템플릿 코들르 다운로드 받는다.
Kafka 환경설정
- 이제 우리가 생성한 Kafka Cluster에 접근하기 위해서 환경설정을 수행하자.
- application.properties 에 다음과 같이 kafka 접속을 위한 설정을 수해한다.
# Kafka endpoint for endpoint
kafka.bootstrap-servers=localhost:29092,localhost:39092,localhost:49092
- 위 내용은 kafkaAdmin, kafka producer, kafka consumer 엔드포인트를 위한 설정이다.
- bootstrap-servers 는 kafka 브로커를 위한 엔드포인트 경로이다.
- docker-compose를 통해 29092, 39093, 49094 포트를 노출하도록 설계 하였다.
Kafka 설정을 위한 Configs 생성하기.
- yaml 을 설정했다면 이제는 커뮤니테이션을 위한 Topic을 설정하자.
- 어플리케이션에서 토픽을 생성하기 위해서는 KafkaAdmin 을 이용한다.
- 토픽이 생성된 후 Producer를 위한 Config를 작성할 것이다.
KafkaAdmin 설정 작성하기.
- KafkaAdmin 은 기본적으로 localhost:9092 에 접속하도록 설정이 되어 있다.
- 그러므로 bootstrapServer로 접근할 수 있도록 다음과 같이 KafkaAdminConfig.java 파일을 만들자.
package com.schooldevops.kafkatutorials.configs;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaAdminConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServer;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
return new KafkaAdmin(configs);
}
}
- 위 내용을 보면 우선 application.properties에 정의한 kafka.bootstrap-servers 의 값을 조회하고 있다.
- KafkaAdmin을 생성할때 config를 만들고, bootstrapServers 를 지정했다.
- AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG 설정값은 bootstrap_server_config 서버를 지정해 줄 수 있다.
kafkaTopic 생성하기.
- KafkaAdmin 빈을 등록하였다면 이제는 Topic을 생성할 차례이다.
- TOPIC의 경우 필요한경우 어플리케이션에서 생성하기 위해서는 KafkaAdmin을 이용하며, 고정된 TOPIC을 이용한다면 Kafka cli 툴을 이용하여 토픽을 생성할 수 있다.
- 우리는 여기서 어플리케이션에서 생상할 수 있도록 할 것이다.
package com.schooldevops.kafkatutorials.configs;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;
import javax.annotation.PostConstruct;
@Configuration
public class KafkaTopicConfig {
public final static String DEFAULT_TOPIC = "DEF_TOPIC";
@Autowired
private KafkaAdmin kafkaAdmin;
private NewTopic defaultTopic() {
return TopicBuilder.name(DEFAULT_TOPIC)
.partitions(2)
.replicas(2)
.build();
}
@PostConstruct
public void init() {
kafkaAdmin.createOrModifyTopics(defaultTopic());
}
}
- 토픽 이름을 지정했다. 우리의 토픽은 DEF_TOPIC 으로 지정했음을 확인하자.
- KafkaAdmin 빈을 이용할 것이므로 Autowired 했다.
- NewTopic 객체를 생성하도록 함수를 하나 만들었다. 토픽 이름은 DEF_TOPIC 이며, 파티션은 2개, 복제 계수는 2개로 잡았다.
- 우리의 kafka cluster가 3개의 브로커로 구성하였으므로 위와 같이 파티션 2개, 복제계수 2개로 지정했다.
- @PostConstruct는 빈이 생성되고 나면 마지막으로 수행되는 어노테이션이다.
- 이 어노테이션에서 createOrModifyTopics 메소드를 이용하여 토픽을 생성하거나, 변경하도록 한다.
Topic 테스트하기.
- 이제 우리는 topic을 생성했다.
- 이를 확인하기 위해서 다음 커맨드로 확인을 하자. multi-broker docker-compose 을 이용했다고 가장하고 다음과 같이 docker-compose.yaml 파일이 존재하는 디렉토리에서 실행해야한다.
docker-compose exec kafka-1 kafka-topics --describe DEF_TOPIC --bootstrap-server kafka-1:9092
Topic: DEF_TOPIC TopicId: CXOleS0LScWBLSUUUODjAg PartitionCount: 2 ReplicationFactor: 2 Configs:
Topic: DEF_TOPIC Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: DEF_TOPIC Partition: 1 Leader: 3 Replicas: 3,1 Isr: 3,1
- 커맨드 실행결과 정상적으로 파티션 2개, 복제계수 2개를 수행하고 있음을 알 수 있다.
Kafka Producer 설정하기
- 토픽을 생성했다면 이제는 Producer를 작성해 보자.
- Producer는 ProducerFactory를 이용하며, 메시지를 전송하기 위해서는 KafkaProducerTemplate 를 이용한다.
- KafkaProducerConfig.java 파일을 생성하고 다음과 같이 추가하자.
package com.schooldevops.kafkatutorials.configs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServer;
private ProducerFactory<String, Object> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Object> kafkaProducerTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
- 위 내용은 bootstrapServer 를 application.properties에서 가져오도록 @Value 를 이용했다.
- 그리고 ProducerFactory를 지정하여, 키는 String, 값은 Object 를 갖도록 팩토리를 지정했다.
- 이후 ProducerFactory를 위한 설정값을 다음과 같이 지정한다.
- ProducerConfig.BOOTSTRAP_SERVERS_CONFIG: kafka broker 의 엔드포인트이다.
- ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG: kafka에 전송될 키는 스트링으로 지정했다.
- ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG: kafka에 전송될 값을 직렬화 하는 방법을 지정한다. 여기서는 JsonSerializer를 이용한다. 보통 JSON이 가장 많이 쓰인다고 볼 수 있다.
- KafkaTemplate은 Kafka와 오퍼레이션을 수행하기 위한 편의 기능을 가진 도구이다. 우리는 여기서 producerFactory를 추가하여 kafka와 오퍼레이션을 수행할 수 있는 설정값을 전달하고 있다.
메시지 객체 작성하기.
- 우리가 kafka에 produce하고 consume할 메시지 객체를 만들자.
- 파일이름은 TestEntity로 작성했다. TestEntity.java 를 다음과 같이 작성하자.
package com.schooldevops.kafkatutorials.entities;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TestEntity {
private String title;
private String contents;
private LocalDateTime time;
}
- 단순하게 title, contents, time 을 갖는 메시지 객체를 만들었다.
ProducerController 작성하기.
- 이제는 메시지를 전송할 차례이다.
- 메시지를 전송하기 위해서 ProducerController를 작성하자.
package com.schooldevops.kafkatutorials.controllers;
import com.schooldevops.kafkatutorials.configs.KafkaTopicConfig;
import com.schooldevops.kafkatutorials.entities.TestEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
@Slf4j
@RestController
@RequestMapping("/api")
public class ProducerController {
// kafka producer를 위한 KafkaTemplate를 지정한다.
private final KafkaTemplate<String, Object> kafkaProducerTemplate;
public ProducerController(KafkaTemplate<String, Object> kafkaProducerTemplate) {
this.kafkaProducerTemplate = kafkaProducerTemplate;
}
@PostMapping("produce")
public ResponseEntity<?> produceMessage(@RequestBody TestEntity testEntity) {
testEntity.setTime(LocalDateTime.now());
// kafkaProducerTemplate.send를 이용하여 메시지를 전송한다.
// 이때 토픽을 지정하고, 메시지를 전송하면 된다.
// ListenableFuture 를 이용하여 전송 결과를 확인할 수 있다.
ListenableFuture<SendResult<String, Object>> future = kafkaProducerTemplate.send(KafkaTopicConfig.DEFAULT_TOPIC, testEntity);
// 메시지 처리는 비동기로 처리한다. 그러므로 callback을 지정했다.
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable ex) {
log.error("Fail to send message to broker: {}", ex.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> result) {
log.info("Send message with offset: {}, partition: {}", result.getRecordMetadata().offset(), result.getRecordMetadata().partition());
}
});
return ResponseEntity.ok(testEntity);
}
}
- 메시지 요청을 POST로 받아서, 메시지를 kafka로 전달한다.
- ListenableFutureCallback을 이용하여 메시지 전송후 응답이 오면 처리를 한다.
- 정상적인경우 onSuccess 메소드가 처리하고, 비정상 케이스에서는 onFailure 메소드가 처리된다.
Consumer 설정하기.
- 이제 Consumer 를 설정할 차례이다.
- 전송된 메시지를 수신할 수 있도록 ConsumerConfig를 지정하면 된다.
- 설정을 위해서 KafkaConsumerConfig.java 파일을 다음과 같이 작성하자.
package com.schooldevops.kafkatutorials.configs;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
public class KafakConsumerConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServer;
private ConsumerFactory<String, Object> consumerFactory(String groupId) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>();
// Deserialize에 대해서 신뢰하는 패키지를 지정한다. "*"를 지정하면 모두 신뢰하게 된다.
jsonDeserializer.addTrustedPackages("*");
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), jsonDeserializer);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> defaultKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory("defaultGroup"));
factory.setConcurrency(1);
factory.setAutoStartup(true);
return factory;
}
}
- @EnableKafka: 는 KafkaListener를 검사하는 설정이다. 이 어노테이션을 지정해야 @KafkaListener 를 활성화 된다.
- kafka.bootstrap-servers 에 지정된 kafka cluster 엔드포인트를 조회한다.
- ConsumerFactory 는 kafka consumer를 생성하기 위한 설정을 가지며, consumer를 생성하는 역할을 한다.
- ConcurrentKafkaListenerContainerFactory 를 이용하여 동시에 kafka cluster로 부터 메시지를 읽을 수 있도록 지정한다.
- 여기서 consumerFactory를 파라미터로 전달하여, 컨슈머를 생성할 수 있도록 한다.
- setConcurrency 값은 동시에 읽을 컨슈머의 개수를 지정한다 여기서는 1개만 지정했다.
- setAutoStartup 값을 통해서 메시지 리스너가 자동으로 실행할지 지정한다. (true이므로 서버가 부트업 되면서 자동 실행된다. )
Kafka Listener 생성하기.
- ConsumerFactory를 작성했다면 이제는 Listener를 등록할 차례이다.
- Listener를 등록하면 kafka cluster로 부터 메시지를 리슨할 수 있게 된다.
- MessageListener.java 파일을 생성하여 다음과 같이 작성한다.
package com.schooldevops.kafkatutorials.consumers;
import com.schooldevops.kafkatutorials.configs.KafkaTopicConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MessageListener {
@KafkaListener(topics = KafkaTopicConfig.DEFAULT_TOPIC, containerFactory = "defaultKafkaListenerContainerFactory")
public void listenDefaultTopic(Object record) {
log.info("Receive Message from {}, values {}", KafkaTopicConfig.DEFAULT_TOPIC, record);
}
}
- 메시지 리스너를 스프링에서 이용하기 위해서 @Component 를 지정한다.
- @KafkaListener 를 지정한다. 이때 전달되는 파라미터는 다음과 같다.
- topics: 메시지를 송신/수신하기 위한 토픽을 지정한다.
- groupId: 컨슈머 그룹 아이디이다. 그룹에 속한 컨슈머는 할당된 파티션만 조회한다.
- containerFactory: 이전에 지정한 컨테이너 팩토리 이름을 지정해 준다. 우리는 KafkaConsumerConfig에서 지정한 defaultKafkaListenerContainerFactory 을 지정 하였다.
- 메시지를 수신하면 토픽 이름과 레코드를 콘솔로 출력한다.
결과 테스트하기.
- 이제 결과를 테스트해보자.
- 서버를 다시 실행하고 메시지를 전송하면 다음과 같은 결과를 확인할 수 있다.
2022-02-21 17:57:01.796 INFO 34273 --- [ad | producer-1] c.s.k.controllers.ProducerController : Send message with offset: 1, partition: 1
- 위 내용은 메시지를 송신 결과를 나타낸다.
2022-02-21 17:57:01.818 INFO 34273 --- [ntainer#0-0-C-1] c.s.k.consumers.MessageListener : Receive Message from DEF_TOPIC, values ConsumerRecord(topic = DEF_TOPIC, partition = 1, leaderEpoch = 2, offset = 1, CreateTime = 1645433821733, serialized key size = -1, serialized value size = 80, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = TestEntity(title=TestSend2, contents=contents, time=2022-02-21T17:57:01.699981))
- 메시지 리스너가 조회한 메시지를 출력했다.
- 수신된 메시지는 DEF_TOPIC에서 조회했음을 알 수 있다.
- values는 수신한 레코드 내용과, 파티션, 오프셋등을 확인할 수 있다.
WrapUp
- 지금까지 우리는 멀티 kafka broker cluster를 구성했다.
- 그리고 java를 이용하여 메시지를 송신하고, 수신하는 예제를 작성했다.
- 위 예제는 가장 일반적으로 사용되는 단순 메시지 생산/소비를 구성한 예제이다.
- 이후 예제에서는 직접 메시지를 뽑아오는 방식, 그룹 아이디를 다르게 지정하여 메시지를 수신하는 방법, 메시지 우선순위 지정, 메시지 에러 핸들링, 메시지 송/수신 트랜잭션 처리하기 등에 대해서 알아 볼 것이다.
'Kafka' 카테고리의 다른 글
[Kafka] 스프링부트 Kafka 3 - Partition Key (0) | 2022.04.11 |
---|---|
[Kafka] 스프링부트 Kafka 2 - 수동 컨슈밍하기 (0) | 2022.04.11 |
[Kafka] Docker Compose로 멀티 클러스터 설치하기 (0) | 2022.04.11 |
[Kafka] Docker Compose로 설치하기 (0) | 2022.04.11 |
[Kafka] 그림으로 알아보는 Kafka 설치 (0) | 2022.04.11 |
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
- Total
- Today
- Yesterday
링크
TAG
- NodeSelector
- kubernetes
- springboot
- docker
- CD
- Database
- Terraform
- D3
- deploy
- go
- declative
- MySQL
- jpa
- docker-compose
- gitops
- MongoDB
- CI
- mapping
- Kafka
- argocd
- Golang
- Gorilla
- tfsec
- kubectl
- kafka-springboot
- jenkins
- java
- mongo
- AWS
- Spring
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | ||||
4 | 5 | 6 | 7 | 8 | 9 | 10 |
11 | 12 | 13 | 14 | 15 | 16 | 17 |
18 | 19 | 20 | 21 | 22 | 23 | 24 |
25 | 26 | 27 | 28 | 29 | 30 | 31 |
글 보관함