Table of Contents
Spring Boot Kafka Json 데이타 전송하기
application.yml 설정
spring:
kafka:
bootstrap-servers: 192.168.0.9:9092
데이타 class 생성
@Getter
@Setter
@NoArgsConstructor
public class StockChange {
String yyyymmdd;
String skuCd;
String fieldName;
int diff;
@Builder
public StockChange(String yyyymmdd, String skuCd, String fieldName, int diff) {
this.yyyymmdd = yyyymmdd;
this.skuCd = skuCd;
this.fieldName = fieldName;
this.diff = diff;
}
}
Configuration 생성
@Configuration
public class KafkaStockChangeProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ProducerFactory<String, StockChange> producerFactory() {
Map<String,Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory(configs);
}
@Bean
public KafkaTemplate<String, StockChange> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
@Configuration
public class KafkaStockChangeConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ConsumerFactory<String, StockChange> stockChangeConsumer() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
return new DefaultKafkaConsumerFactory<>(
configs,
new StringDeserializer(),
new JsonDeserializer<>(StockChange.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, StockChange> stockChangeListener() {
ConcurrentKafkaListenerContainerFactory<String, StockChange> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(stockChangeConsumer());
return factory;
}
}
Producer, Consumer 생성
@Service
public class KafkaProducer {
private static final String TOPIC = "exam";
private final KafkaTemplate<String, StockChange> kafkaTemplate;
@Autowired
public KafkaProducer(KafkaTemplate<String, StockChange> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String message) {
StockChange stockChange = StockChange.builder()
.yyyymmdd("2021-01-01")
.skuCd("10300000033")
.fieldName("ipgoNo")
.diff(100)
.build();
// Send a message
kafkaTemplate.send(TOPIC, stockChange);
}
}
@Service
public class KafkaConsumer {
@KafkaListener(topics = "exam", groupId = "foo", containerFactory = "stockChangeListener")
public void consume(StockChange stockChange) {
System.out.printf("Consumed message : %s%n", stockChange.getYyyymmdd());
}
}