Spring Boot Kafka 연동하기

By | 2021년 1월 1일
Table of Contents

Spring Boot Kafka 연동하기

의존성 추가

dependencies {
    // ......
    // Spring for Apache Kafka. No version is given here; presumably it is
    // resolved by the build's dependency management -- confirm in the full build file.
    implementation('org.springframework.kafka:spring-kafka')
    // ......
}

application.yml 수정

# Kafka client settings read by Spring Boot (spring.kafka.*).
spring:
  kafka:
    consumer:
      # Broker address (host:port) the consumer connects to.
      bootstrap-servers: 192.168.98.51:9092
      # Consumer group id; the same value is used by the listener's groupId in this post.
      group-id: foo
      # Optional overrides, left disabled. When unset, Spring Boot's defaults apply
      # (String deserializers per the Spring Boot reference -- verify for your version).
      # auto-offset-reset: earliest
      # key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      # Broker address (host:port) the producer connects to; same broker as the consumer above.
      bootstrap-servers: 192.168.98.51:9092
      # Optional overrides, left disabled. When unset, Spring Boot's defaults apply
      # (String serializers per the Spring Boot reference -- verify for your version).
      # key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # value-serializer: org.apache.kafka.common.serialization.StringSerializer

Producer 생성

/**
 * Service that publishes string messages to the fixed Kafka topic held in {@code TOPIC}.
 */
@Service
public class KafkaProducer {

    /** Name of the topic every message is sent to. */
    private static final String TOPIC = "exam";

    /** Template used to hand records to Kafka. */
    private final KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Creates the producer.
     *
     * @param kafkaTemplate template used for sending; declared as
     *     {@code KafkaTemplate<String, String>} (not the raw type) so that the
     *     assignment to the typed field is checked by the compiler
     */
    @Autowired
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Prints the message to standard output and then sends it to the {@code TOPIC} topic.
     *
     * <p>The value returned by {@code send(...)} is discarded, so the delivery
     * outcome is not checked here.
     *
     * @param message payload to send
     */
    public void sendMessage(String message) {
        System.out.printf("Produce message : %s%n", message);
        this.kafkaTemplate.send(TOPIC, message);
    }
}

Consumer 생성

/**
 * Kafka listener that echoes every payload received on the {@code exam} topic
 * (consumer group {@code foo}) to standard output.
 */
@Service
public class KafkaConsumer {

    /**
     * Writes the received payload to standard output.
     *
     * @param message payload handed to this listener method
     * @throws IOException declared in the signature; nothing in this body throws it
     */
    @KafkaListener(groupId = "foo", topics = "exam")
    public void consume(String message) throws IOException {
        System.out.format("Consumed message : %s%n", message);
    }
}

Controller 생성

/**
 * REST endpoint under {@code /v1/kafka} that forwards a request parameter to
 * {@link KafkaProducer}.
 */
@Api(tags = "00. Kafka")
@RestController
@RequestMapping("/v1/kafka")
public class KafkaController {

    /** Producer that performs the actual send. */
    private final KafkaProducer producer;

    /**
     * Creates the controller.
     *
     * @param producer producer the handler delegates to
     */
    @Autowired
    KafkaController(KafkaProducer producer) {
        this.producer = producer;
    }

    /**
     * Handles {@code POST /} relative to the class-level mapping: passes the
     * {@code message} request parameter to the producer.
     *
     * @param message value of the {@code message} request parameter
     * @return the literal string {@code "success"} once the producer call returns
     */
    @ApiOperation(value = "메시지 전송", notes = "Kafka 메시지를 전송합니다.")
    @PostMapping("/")
    public String sendMessage(@RequestParam("message") String message) {
        producer.sendMessage(message);
        return "success";
    }
}

답글 남기기