Table of Contents
Spring Kafka 를 이용한 메시징(send/receive message)
목표
Kafka 를 이용해, 메시징 시스템을 구성합니다.
Kafka 설치
Install Kafka on Windows, Install Kafka with docker 를 참조해서 Kafka 를 설치합니다.
프로젝트 생성
신규 프로젝트를 생성합니다.
의존성은 DevTools, Lombok, Web, Spring for Apache Kafka 를 선택합니다.
application.yml 수정
spring:
kafka:
consumer:
bootstrap-servers: 127.0.0.1:9092
group-id: foo
# auto-offset-reset: earliest
# key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
bootstrap-servers: 127.0.0.1:9092
# key-serializer: org.apache.kafka.common.serialization.StringSerializer
# value-serializer: org.apache.kafka.common.serialization.StringSerializer
Producer 생성
@Service
public class KafkaProducer {
private static final String TOPIC = "exam";
private final KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public KafkaProducer(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String message) {
System.out.printf("Produce message : %s%n", message);
this.kafkaTemplate.send(TOPIC, message);
}
}
Consumer 생성
@Service
public class KafkaConsumer {
@KafkaListener(topics = "exam", groupId = "foo")
public void consume(String message) throws IOException {
System.out.printf("Consumed message : %s%n", message);
}
}
Controller 생성
@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {
private final KafkaProducer producer;
@Autowired
KafkaController(KafkaProducer producer) {
this.producer = producer;
}
@GetMapping("/")
public String sendMessage(@RequestParam("message") String message) {
this.producer.sendMessage(message);
return "success";
}
}
프로젝트 실행
프로젝트 실행 후 아래 링크에 접속합니다.
http://localhost:8080/kafka/?message=하이
로그에 메시지가 출력되는 것을 확인할 수 있습니다.
json 데이타 송수신하기
build.gradle
......
implementation 'org.json:json:20220320'
implementation 'com.googlecode.json-simple:json-simple:1.1.1'
......
public void sendMessage(String message) {
System.out.printf("Produce message : %s%n", message);
Date now = new Date();
DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //yyyy-MM-dd HH:mm:ss
System.out.println(df.format(now));
JSONObject data = new JSONObject();
data.put("dt", df.format(now));
data.put("msg", message);
System.out.printf("Produce json message : %s%n", data.toJSONString());
this.kafkaTemplate.send(TOPIC, data.toJSONString());
}
@KafkaListener(topics = "json_topic", groupId = "foo")
public void consume(String message) {
System.out.printf("Consumed json message : %s%n", message);
try {
JSONParser parser = new JSONParser();
Object obj = parser.parse(message);
JSONObject jsonObj = (JSONObject) obj;
String msg = (String) jsonObj.get("msg");
System.out.printf("Consumed message : %s%n", msg);
} catch (ParseException ignored) {
//
}
}
Pingback: Spring Boot 시작하기 – 상구리의 기술 블로그