Spring Kafka 를 이용한 메시징(send/receive message)

By | 2021년 7월 17일
Table of Contents

Spring Kafka 를 이용한 메시징(send/receive message)

목표

Kafka 를 이용해, 메시징 시스템을 구성합니다.

Kafka 설치

Install Kafka on Windows, Install Kafka with docker 를 참조해서 Kafka 를 설치합니다.

프로젝트 생성

신규 프로젝트를 생성합니다.

의존성은 DevTools, Lombok, Web, Spring for Apache Kafka 를 선택합니다.

application.yml 수정

spring:
  kafka:
    consumer:
      bootstrap-servers: 127.0.0.1:9092
      group-id: foo
      # auto-offset-reset: earliest
      # key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: 127.0.0.1:9092
      # key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # value-serializer: org.apache.kafka.common.serialization.StringSerializer

Producer 생성

@Service
public class KafkaProducer {

    private static final String TOPIC = "exam";
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public KafkaProducer(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message) {
        System.out.printf("Produce message : %s%n", message);
        this.kafkaTemplate.send(TOPIC, message);
    }
}

Consumer 생성

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "exam", groupId = "foo")
    public void consume(String message) throws IOException {
        System.out.printf("Consumed message : %s%n", message);
    }
}

Controller 생성

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {

    private final KafkaProducer producer;

    @Autowired
    KafkaController(KafkaProducer producer) {
        this.producer = producer;
    }

    @GetMapping("/")
    public String sendMessage(@RequestParam("message") String message) {
        this.producer.sendMessage(message);

        return "success";
    }
}

프로젝트 실행

프로젝트 실행 후 아래 링크에 접속합니다.

http://localhost:8080/kafka/?message=하이

로그에 메시지가 출력되는 것을 확인할 수 있습니다.

json 데이타 송수신하기

build.gradle

......
    implementation 'org.json:json:20220320'
    implementation 'com.googlecode.json-simple:json-simple:1.1.1'
......
    public void sendMessage(String message) {
        System.out.printf("Produce message : %s%n", message);

        Date now = new Date();
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //yyyy-MM-dd HH:mm:ss
        System.out.println(df.format(now));

        JSONObject data = new JSONObject();
        data.put("dt", df.format(now));
        data.put("msg", message);

        System.out.printf("Produce json message : %s%n", data.toJSONString());
        this.kafkaTemplate.send(TOPIC, data.toJSONString());
    }
    @KafkaListener(topics = "json_topic", groupId = "foo")
    public void consume(String message) {
        System.out.printf("Consumed json message : %s%n", message);

        try {
            JSONParser parser = new JSONParser();
            Object obj = parser.parse(message);
            JSONObject jsonObj = (JSONObject) obj;

            String msg = (String) jsonObj.get("msg");
            System.out.printf("Consumed message : %s%n", msg);
        } catch (ParseException ignored) {
            //
        }
    }

One thought on “Spring Kafka 를 이용한 메시징(send/receive message)

  1. Pingback: Spring Boot 시작하기 – 상구리의 기술 블로그

답글 남기기