Spring Boot Kafka Json 데이타 전송하기

By | 2021년 1월 2일
Table of Content

Spring Boot Kafka Json 데이타 전송하기

application.yml 설정

spring:
  kafka:
    bootstrap-servers: 192.168.0.9:9092

데이타 class 생성

@Getter
@Setter
@NoArgsConstructor
public class StockChange {

    String yyyymmdd;
    String skuCd;
    String fieldName;
    int diff;

    @Builder
    public StockChange(String yyyymmdd, String skuCd, String fieldName, int diff) {
        this.yyyymmdd = yyyymmdd;
        this.skuCd = skuCd;
        this.fieldName = fieldName;
        this.diff = diff;
    }
}

Configuration 생성

@Configuration
public class KafkaStockChangeProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, StockChange> producerFactory() {

        Map<String,Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory(configs);
    }

    @Bean
    public KafkaTemplate<String, StockChange> kafkaTemplate() {

        return new KafkaTemplate<>(producerFactory());
    }
}
@Configuration
public class KafkaStockChangeConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ConsumerFactory<String, StockChange> stockChangeConsumer() {

        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");

        return new DefaultKafkaConsumerFactory<>(
                configs,
                new StringDeserializer(),
                new JsonDeserializer<>(StockChange.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, StockChange> stockChangeListener() {
        ConcurrentKafkaListenerContainerFactory<String, StockChange> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(stockChangeConsumer());
        return factory;
    }
}

Producer, Consumer 생성

@Service
public class KafkaProducer {

    private static final String TOPIC = "exam";
    private final KafkaTemplate<String, StockChange> kafkaTemplate;

    @Autowired
    public KafkaProducer(KafkaTemplate<String, StockChange> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message) {

        StockChange stockChange = StockChange.builder()
                .yyyymmdd("2021-01-01")
                .skuCd("10300000033")
                .fieldName("ipgoNo")
                .diff(100)
                .build();

        // Send a message
        kafkaTemplate.send(TOPIC, stockChange);
    }
}
@Service
public class KafkaConsumer {

    @KafkaListener(topics = "exam", groupId = "foo", containerFactory = "stockChangeListener")
    public void consume(StockChange stockChange) {
        System.out.printf("Consumed message : %s%n", stockChange.getYyyymmdd());
    }
}

답글 남기기