Table of Contents
Spring Boot Kafka 연동하기
의존성 추가
dependencies {
// ......
implementation 'org.springframework.kafka:spring-kafka'
// ......
}
application.yml 수정
spring:
kafka:
consumer:
bootstrap-servers: 192.168.98.51:9092
group-id: foo
# auto-offset-reset: earliest
# key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
bootstrap-servers: 192.168.98.51:9092
# key-serializer: org.apache.kafka.common.serialization.StringSerializer
# value-serializer: org.apache.kafka.common.serialization.StringSerializer
Producer 생성
@Service
public class KafkaProducer {
private static final String TOPIC = "exam";
private final KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public KafkaProducer(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String message) {
System.out.printf("Produce message : %s%n", message);
this.kafkaTemplate.send(TOPIC, message);
}
}
Consumer 생성
@Service
public class KafkaConsumer {
@KafkaListener(topics = "exam", groupId = "foo")
public void consume(String message) throws IOException {
System.out.printf("Consumed message : %s%n", message);
}
}
Controller 생성
@Api(tags = { "00. Kafka" })
@RestController
@RequestMapping(value = "/v1/kafka")
public class KafkaController {
private final KafkaProducer producer;
@Autowired
KafkaController(KafkaProducer producer) {
this.producer = producer;
}
@ApiOperation(value = "메시지 전송", notes = "Kafka 메시지를 전송합니다.")
@PostMapping("/")
public String sendMessage(@RequestParam("message") String message) {
this.producer.sendMessage(message);
return "success";
}
}