데이터 동기화
Orders -> Catalogs
Orders Service에 요청된 주문의 수량 정보를 Catalogs Service에 반영
Orders Service에서 Kafka Topic으로 메시지 전송 -> Producer 역할
Catalogs Service에서 Kafka Topic에 전송된 메시지 취득 -> Consumer 역할
코드 작업 순서 : Consumer -> Producer 로 작성하면 편함
Catalogs Service 수정
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
KafkaConsumerConfig.java
(https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html)
package com.example.catalogservice.messagequeue;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
// 접속할 kafka 정보 등록할 빈
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // 설정 파일에 등록하면 효율적으로 관리 가능
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId"); // Consumer 들을 그룹핑할 수 있다.
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // Serializer 된 Key 를 원래의 형태로 돌리는 방식
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // Serializer 된 Value 를 원래의 형태로 돌리는 방식
return new DefaultKafkaConsumerFactory<>(properties);
}
// (Topic 의 변경사항 확인) 이벤트 리스너
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory()); // 접속 정보 등록
return kafkaListenerContainerFactory;
}
}
ComsumerFactory : Topic에 접속하기 위한 정보
ConcurrentKafkaListenerContainerFactory : 이벤트 Listner
KafkaConsumer.java
package com.example.catalogservice.messagequeue;
import com.example.catalogservice.jpa.CatalogEntity;
import com.example.catalogservice.jpa.CatalogRepository;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
/**
* 실제로 데이터를 가져와서 등록
*/
@Service
@Slf4j
public class KafkaConsumer {
CatalogRepository catalogRepository;
@Autowired
public KafkaConsumer(CatalogRepository catalogRepository) {
this.catalogRepository = catalogRepository;
}
// example-catalog-topic 에 데이터가 전달되면 가져와서 실행
@KafkaListener(topics = "example-catalog-topic")
public void updateQty(String kafkaMessage) {
log.info("Kafka Message : -> " + kafkaMessage);
Map<Object, Object> map = new HashMap<>();
ObjectMapper mapper = new ObjectMapper(); // 데이터가 직렬화되어 있기 때문에 원래의 형태로 변경하기 위해 사용
try {
// 메시지에서 데이터 읽어옴
map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {
});
} catch (JsonProcessingException ex) {
ex.printStackTrace();
}
CatalogEntity entity = catalogRepository.findByProductId((String) map.get("productId")); // Object 형태로 가져오기 때문에 String 형태로 변경
// 조회 성공하면
if (entity != null) {
entity.setStock(entity.getStock() - (Integer) map.get("qty"));
catalogRepository.save(entity);
}
}
}
Orders Service 수정
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
KafkaProducerConfig.java
package com.example.orderservice.messagequeue;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.*;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
public class KafkaProducerConfig {
// 접속할 kafka 정보 등록할 빈
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // 설정 파일에 등록하면 효율적으로 관리 가능
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // Key 를 Serializer 형태로 변환
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // Value 를 Serializer 형태로 변환
return new DefaultKafkaProducerFactory<>(properties);
}
// 데이터 전달하는 용도의 인스턴스
@Bean
public KafkaTemplate<String, String> kafkaTemplate(){
return new KafkaTemplate<>(producerFactory());
}
}
OrderController.java
@RestController
@RequestMapping("/order-service")
public class OrderController {
Environment env;
OrderService orderService;
KafkaProducer kafkaProducer; // 등록한 빈 사용
@Autowired
public OrderController(Environment env, OrderService orderService, KafkaProducer kafkaProducer) {
this.env = env;
this.orderService = orderService;
this.kafkaProducer = kafkaProducer;
}
@PostMapping("/{userId}/orders")
public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId, @RequestBody RequestOrder orderDetails) {
ModelMapper mapper = new ModelMapper();
mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
OrderDto orderDto = mapper.map(orderDetails, OrderDto.class);
orderDto.setUserId(userId);
orderService.createOrder(orderDto);
// kafka 에 주문 전달
kafkaProducer.send("example-catalog-topic", orderDto);
ResponseOrder responseOrder = mapper.map(orderDto, ResponseOrder.class);
return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
}
.....
}
주문 테스트

수량 동기화 확인

Multiple Order Service 에서의 데이터 동기화
- Users 의 요청 분산 처리
- Orders 데이터도 분산 저장 -> 동기화 문제
OrerService 가 여러개 실행되면 각각의 인스턴스에 독립적인 DB가 실행된다.(분산) 라운드 로빈 방식으로 서비스가 호출되서 주문을 여러번 하고 조회하면 결과가 다르게 나온다.

Kafka Connect를 활용한 단일 베이터베이스 사용해서 해결
- Orders Service 에 요청된 주문 정보가 DB가 아니라 Kafka Topic 으로 전송
- Kafka Topic 에 설정된 Kafka Sink Connect 를 사용해 단일 DB 에 저장 -> 데이터 동기화


MariaDB 로 변경 (Orders Service)
MariaDB 테이블 생성
create table orders (
id int auto_increment primary key,
user_id varchar(50) not null,
product_id varchar(20) not null,
order_id varchar(50) not null,
qty int default 0,
unit_price int default 0,
total_price int default 0,
created_at datetime default now()
)
order-service -> application.yml
spring:
application:
name: order-service
datasource:
url: jdbc:mariadb://localhost:3306/mydb
driver-class-name: org.mariadb.jdbc.Driver
username: root
주문 등록 확인

Orders Kafka Topic
토픽에는 이런 형태의 데이터가 저장되고 이러한 데이터는 생성한 Sinck Connector에 의해 불려지고 DB에 저장된다. 만약 이러한 포맷으로 데이터가 들어오지 않는다면 DB에 저장되지 않는다.

포맷에 맞는 DTO(KafkaOrderDto, Schema, Payload, Field) 생성

OrderController.java
Environment env;
OrderService orderService;
KafkaProducer kafkaProducer; // 등록한 빈 사용
OrderProducer orderProducer;
@Autowired
public OrderController(Environment env, OrderService orderService, KafkaProducer kafkaProducer, OrderProducer orderProducer) {
this.env = env;
this.orderService = orderService;
this.kafkaProducer = kafkaProducer;
this.orderProducer = orderProducer;
}
@PostMapping("/{userId}/orders")
public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId, @RequestBody RequestOrder orderDetails) {
ModelMapper mapper = new ModelMapper();
mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
OrderDto orderDto = mapper.map(orderDetails, OrderDto.class);
orderDto.setUserId(userId);
// JPA
// orderService.createOrder(orderDto);
// ResponseOrder responseOrder = mapper.map(orderDto, ResponseOrder.class);
// Kafka
orderDto.setOrderId(UUID.randomUUID().toString());
orderDto.setTotalPrice(orderDetails.getQty() * orderDetails.getUnitPrice());
// kafka 에 주문 전달
kafkaProducer.send("example-catalog-topic", orderDto);
orderProducer.send("orders", orderDto);
ResponseOrder responseOrder = mapper.map(orderDto, ResponseOrder.class);
return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
}
OrderProducer.java
package com.example.orderservice.messagequeue;
import com.example.orderservice.dto.*;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
@Slf4j
public class OrderProducer {
private KafkaTemplate<String, String> kafkaTemplate;
List<Field> fields = List.of(new Field("String", true, "order_id"),
new Field("String", true, "user_id"),
new Field("String", true, "product_id"),
new Field("int32", true, "qty_id"),
new Field("int32", true, "unit_price"),
new Field("int32", true, "total_price")
);
Schema schema = Schema.builder()
.type("struct")
.fields(fields)
.optional(false)
.name("orders")
.build();
@Autowired
public OrderProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public OrderDto send(String topic, OrderDto orderDto) {
Payload payload = Payload.builder()
.order_id(orderDto.getOrderId())
.user_id(orderDto.getUserId())
.product_id(orderDto.getProductId())
.qty(orderDto.getQty())
.unit_price(orderDto.getUnitPrice())
.total_price(orderDto.getTotalPrice())
.build();
KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema, payload);
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try {
jsonInString = mapper.writeValueAsString(kafkaOrderDto);
} catch (JsonProcessingException ex) {
ex.printStackTrace();
}
kafkaTemplate.send(topic, jsonInString);
log.info("Order Producer sent data from the Order microservice : " + kafkaOrderDto);
return orderDto;
}
}
KafkaConnect 기동 후 SinkConnector 등록

SinkConnector 목록 확인

주문 테스트
2개의 Order Service 기동 후 주문을 등록해서 어떤 인스턴스가 동작하는지 확인

제대로 DB에 등록되지 않는 오류 발생
http://localhost:8083/connectors/my-order-sink-connect/status 확인

topic 이 제대로 생성 할때 오류가 발생했다. -> topic을 삭제 후 다시 등록해서 오류 해결
https://jusunglee.tistory.com/entry/토픽-삭제-Topic-delete
주문 등록 후 올바르게 삽입된 데이터 확인

'Spring > [인프런] Spring Cloud' 카테고리의 다른 글
Mircroservice 모니터링 (0) | 2022.07.08 |
---|---|
장애 처리와 Microservice 분산 추적 (0) | 2022.07.05 |
데이터 동기화를 위한 Kafka 활용 1 (0) | 2022.06.30 |
Microservice간 통신 (0) | 2022.06.28 |
설정 정보의 암호화 처리(Encryption, Decryption) (0) | 2022.06.27 |