쵼쥬
쵼쥬의 개발공부 TIL
쵼쥬
전체 방문자
오늘
어제
  • 분류 전체보기 (276)
    • 코딩테스트 (192)
      • [알고리즘] 알고리즘 정리 (7)
      • [백준] 코딩테스트 연습 (126)
      • [프로그래머스] 코딩테스트 연습 (59)
    • Spring (71)
      • [인프런] 스프링 핵심 원리- 기본편 (9)
      • [인프런] 스프링 MVC 1 (6)
      • [인프런] 스프링 MVC 2 (4)
      • [인프런] 실전! 스프링 부트와 JPA 활용1 (7)
      • [인프런] 실전! 스프링 부트와 JPA 활용2 (5)
      • [인프런] 실전! 스프링 데이터 JPA (7)
      • [인프런] 실전! Querydsl (7)
      • JWT (5)
      • [인프런] Spring Cloud (17)
      • [인프런] Spring Batch (4)
    • Java (6)
      • [Java8] 모던인자바액션 (4)
      • [부스트코스] 웹 백엔드 (2)
      • [패스트캠퍼스] JAVA STREAM (0)
    • CS (6)
      • 디자인 패턴과 프로그래밍 패터다임 (2)
      • 네트워크 (4)

블로그 메뉴

  • 홈

공지사항

인기 글

태그

  • 부스트코스
  • BFS
  • 비트마스킹
  • 백준
  • querydsl
  • 위클리 챌린지
  • 타임리프
  • jpa
  • 백분
  • 인프런
  • 알고리즘
  • 프로그래머스
  • 구현
  • 코딩테스트
  • spring
  • MVC
  • 스프링
  • 자바
  • 누적합
  • Spring Data JPA

최근 댓글

최근 글

티스토리

hELLO · Designed By 정상우.
쵼쥬

쵼쥬의 개발공부 TIL

데이터 동기화를 위한 Kafka 활용 2
Spring/[인프런] Spring Cloud

데이터 동기화를 위한 Kafka 활용 2

2022. 7. 4. 20:01

데이터 동기화

Orders -> Catalogs

Orders Service에 요청된 주문의 수량 정보를 Catalogs Service에 반영

Orders Service에서 Kafka Topic으로 메시지 전송 -> Producer 역할

Catalogs Service에서 Kafka Topic에 전송된 메시지 취득 -> Consumer 역할

 

코드 작업 순서 : Consumer -> Producer 로 작성하면 편함

 

Catalogs Service 수정

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

 

KafkaConsumerConfig.java

(https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html)

package com.example.catalogservice.messagequeue;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    // 접속할 kafka 정보 등록할 빈
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> properties = new HashMap<>();

        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");  // 설정 파일에 등록하면 효율적으로 관리 가능
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId"); // Consumer 들을 그룹핑할 수 있다.
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // Serializer 된 Key 를 원래의 형태로 돌리는 방식
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);   // Serializer 된 Value 를 원래의 형태로 돌리는 방식

        return new DefaultKafkaConsumerFactory<>(properties);
    }

    // (Topic 의 변경사항 확인) 이벤트 리스너
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
                = new ConcurrentKafkaListenerContainerFactory<>();

        kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());     // 접속 정보 등록
        return kafkaListenerContainerFactory;
    }
}

ComsumerFactory : Topic에 접속하기 위한 정보

ConcurrentKafkaListenerContainerFactory : 이벤트 Listner

 

KafkaConsumer.java

package com.example.catalogservice.messagequeue;

import com.example.catalogservice.jpa.CatalogEntity;
import com.example.catalogservice.jpa.CatalogRepository;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;

/**
 * 실제로 데이터를 가져와서 등록
 */
@Service
@Slf4j
public class KafkaConsumer {
    CatalogRepository catalogRepository;

    @Autowired
    public KafkaConsumer(CatalogRepository catalogRepository) {
        this.catalogRepository = catalogRepository;
    }

    // example-catalog-topic 에 데이터가 전달되면 가져와서 실행
    @KafkaListener(topics = "example-catalog-topic")
    public void updateQty(String kafkaMessage) {
        log.info("Kafka Message : -> " + kafkaMessage);

        Map<Object, Object> map = new HashMap<>();
        ObjectMapper mapper = new ObjectMapper();   // 데이터가 직렬화되어 있기 때문에 원래의 형태로 변경하기 위해 사용
        try {
            // 메시지에서 데이터 읽어옴
            map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {
            });
        } catch (JsonProcessingException ex) {
            ex.printStackTrace();
        }

        CatalogEntity entity = catalogRepository.findByProductId((String) map.get("productId"));  // Object 형태로 가져오기 때문에 String 형태로 변경
        // 조회 성공하면 
        if (entity != null) {
            entity.setStock(entity.getStock() - (Integer) map.get("qty"));
            catalogRepository.save(entity);
        }

    }
}

 

 

Orders Service 수정

 

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

 

KafkaProducerConfig.java

package com.example.orderservice.messagequeue;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.*;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaProducerConfig {
    // 접속할 kafka 정보 등록할 빈
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> properties = new HashMap<>();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");  // 설정 파일에 등록하면 효율적으로 관리 가능
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // Key 를 Serializer 형태로 변환
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);   // Value 를 Serializer 형태로 변환

        return new DefaultKafkaProducerFactory<>(properties);
    }

    // 데이터 전달하는 용도의 인스턴스
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(){
        return new KafkaTemplate<>(producerFactory());
    }
}

 

OrderController.java

@RestController
@RequestMapping("/order-service")
public class OrderController {

    Environment env;
    OrderService orderService;
    KafkaProducer kafkaProducer;   // 등록한 빈 사용

    @Autowired
    public OrderController(Environment env, OrderService orderService, KafkaProducer kafkaProducer) {
        this.env = env;
        this.orderService = orderService;
        this.kafkaProducer = kafkaProducer;
    }

    @PostMapping("/{userId}/orders")
    public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId, @RequestBody RequestOrder orderDetails) {
        ModelMapper mapper = new ModelMapper();
        mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);

        OrderDto orderDto = mapper.map(orderDetails, OrderDto.class);
        orderDto.setUserId(userId);
        orderService.createOrder(orderDto);

        // kafka 에 주문 전달
        kafkaProducer.send("example-catalog-topic", orderDto);


        ResponseOrder responseOrder = mapper.map(orderDto, ResponseOrder.class);

        return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
    }

.....
}

 

 

주문 테스트

 

수량 동기화 확인

 

 

Multiple Order Service 에서의 데이터 동기화

  • Users 의 요청 분산 처리
  • Orders 데이터도 분산 저장 -> 동기화 문제

OrerService 가 여러개 실행되면 각각의 인스턴스에 독립적인 DB가 실행된다.(분산) 라운드 로빈 방식으로 서비스가 호출되서 주문을 여러번 하고 조회하면 결과가 다르게 나온다.

 

Kafka Connect를 활용한 단일 베이터베이스 사용해서 해결

  • Orders Service 에 요청된 주문 정보가 DB가 아니라 Kafka Topic 으로 전송
  • Kafka Topic 에 설정된 Kafka Sink Connect 를 사용해 단일 DB 에 저장 -> 데이터 동기화

 

MariaDB 로 변경 (Orders Service)

MariaDB 테이블 생성

create table orders (
  id int auto_increment primary key,
  user_id varchar(50) not null,
  product_id varchar(20) not null,
  order_id varchar(50) not null,
  qty int default 0,
  unit_price int default 0,
  total_price int default 0,
  created_at datetime default now()
)

 

order-service -> application.yml

spring:
  application:
    name: order-service
  datasource:
    url: jdbc:mariadb://localhost:3306/mydb
    driver-class-name: org.mariadb.jdbc.Driver
    username: root

 

주문 등록 확인

 

Orders Kafka Topic

 

토픽에는 이런 형태의 데이터가 저장되고 이러한 데이터는 생성한 Sinck Connector에 의해 불려지고 DB에 저장된다. 만약 이러한 포맷으로 데이터가 들어오지 않는다면 DB에 저장되지 않는다.

 

포맷에 맞는 DTO(KafkaOrderDto, Schema, Payload, Field) 생성

 

OrderController.java

    Environment env;
    OrderService orderService;
    KafkaProducer kafkaProducer;   // 등록한 빈 사용
    OrderProducer orderProducer;

    @Autowired
    public OrderController(Environment env, OrderService orderService, KafkaProducer kafkaProducer, OrderProducer orderProducer) {
        this.env = env;
        this.orderService = orderService;
        this.kafkaProducer = kafkaProducer;
        this.orderProducer = orderProducer;
    }


	@PostMapping("/{userId}/orders")
    public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId, @RequestBody RequestOrder orderDetails) {
        ModelMapper mapper = new ModelMapper();
        mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);

        OrderDto orderDto = mapper.map(orderDetails, OrderDto.class);


        orderDto.setUserId(userId);

        // JPA
//        orderService.createOrder(orderDto);
//        ResponseOrder responseOrder = mapper.map(orderDto, ResponseOrder.class);

        // Kafka
        orderDto.setOrderId(UUID.randomUUID().toString());
        orderDto.setTotalPrice(orderDetails.getQty() * orderDetails.getUnitPrice());

        // kafka 에 주문 전달
        kafkaProducer.send("example-catalog-topic", orderDto);
        orderProducer.send("orders", orderDto);


        ResponseOrder responseOrder = mapper.map(orderDto, ResponseOrder.class);
        return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
    }

 

OrderProducer.java

package com.example.orderservice.messagequeue;

import com.example.orderservice.dto.*;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
@Slf4j
public class OrderProducer {

    private KafkaTemplate<String, String> kafkaTemplate;

    List<Field> fields = List.of(new Field("String", true, "order_id"),
            new Field("String", true, "user_id"),
            new Field("String", true, "product_id"),
            new Field("int32", true, "qty_id"),
            new Field("int32", true, "unit_price"),
            new Field("int32", true, "total_price")
    );

    Schema schema = Schema.builder()
            .type("struct")
            .fields(fields)
            .optional(false)
            .name("orders")
            .build();


    @Autowired
    public OrderProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public OrderDto send(String topic, OrderDto orderDto) {

        Payload payload = Payload.builder()
                .order_id(orderDto.getOrderId())
                .user_id(orderDto.getUserId())
                .product_id(orderDto.getProductId())
                .qty(orderDto.getQty())
                .unit_price(orderDto.getUnitPrice())
                .total_price(orderDto.getTotalPrice())
                .build();

        KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema, payload);

        ObjectMapper mapper = new ObjectMapper();
        String jsonInString = "";
        try {
            jsonInString = mapper.writeValueAsString(kafkaOrderDto);
        } catch (JsonProcessingException ex) {
            ex.printStackTrace();
        }

        kafkaTemplate.send(topic, jsonInString);
        log.info("Order Producer sent data from the Order microservice : " + kafkaOrderDto);

        return orderDto;
    }

}

 

KafkaConnect 기동 후 SinkConnector 등록

 

SinkConnector 목록 확인

 

 

주문 테스트

2개의 Order Service 기동 후 주문을 등록해서 어떤 인스턴스가 동작하는지 확인

 

제대로 DB에 등록되지 않는 오류 발생

http://localhost:8083/connectors/my-order-sink-connect/status 확인

topic 이 제대로 생성 할때 오류가 발생했다. -> topic을 삭제 후 다시 등록해서 오류 해결

 

https://jusunglee.tistory.com/entry/토픽-삭제-Topic-delete

 

 

 

주문 등록 후 올바르게 삽입된 데이터 확인

 

'Spring > [인프런] Spring Cloud' 카테고리의 다른 글

Mircroservice 모니터링  (0) 2022.07.08
장애 처리와 Microservice 분산 추적  (0) 2022.07.05
데이터 동기화를 위한 Kafka 활용 1  (0) 2022.06.30
Microservice간 통신  (0) 2022.06.28
설정 정보의 암호화 처리(Encryption, Decryption)  (0) 2022.06.27
    'Spring/[인프런] Spring Cloud' 카테고리의 다른 글
    • Mircroservice 모니터링
    • 장애 처리와 Microservice 분산 추적
    • 데이터 동기화를 위한 Kafka 활용 1
    • Microservice간 통신
    쵼쥬
    쵼쥬

    티스토리툴바