쵼쥬
쵼쥬의 개발공부 TIL
쵼쥬
전체 방문자
오늘
어제
  • 분류 전체보기 (276)
    • 코딩테스트 (192)
      • [알고리즘] 알고리즘 정리 (7)
      • [백준] 코딩테스트 연습 (126)
      • [프로그래머스] 코딩테스트 연습 (59)
    • Spring (71)
      • [인프런] 스프링 핵심 원리- 기본편 (9)
      • [인프런] 스프링 MVC 1 (6)
      • [인프런] 스프링 MVC 2 (4)
      • [인프런] 실전! 스프링 부트와 JPA 활용1 (7)
      • [인프런] 실전! 스프링 부트와 JPA 활용2 (5)
      • [인프런] 실전! 스프링 데이터 JPA (7)
      • [인프런] 실전! Querydsl (7)
      • JWT (5)
      • [인프런] Spring Cloud (17)
      • [인프런] Spring Batch (4)
    • Java (6)
      • [Java8] 모던인자바액션 (4)
      • [부스트코스] 웹 백엔드 (2)
      • [패스트캠퍼스] JAVA STREAM (0)
    • CS (6)
      • 디자인 패턴과 프로그래밍 패터다임 (2)
      • 네트워크 (4)

블로그 메뉴

  • 홈

공지사항

인기 글

태그

  • 구현
  • Spring Data JPA
  • spring
  • 자바
  • 백준
  • 알고리즘
  • 누적합
  • 타임리프
  • 프로그래머스
  • 스프링
  • 코딩테스트
  • BFS
  • jpa
  • 부스트코스
  • MVC
  • querydsl
  • 비트마스킹
  • 인프런
  • 위클리 챌린지
  • 백분

최근 댓글

최근 글

티스토리

hELLO · Designed By 정상우.
쵼쥬

쵼쥬의 개발공부 TIL

데이터 동기화를 위한 Kafka 활용 1
Spring/[인프런] Spring Cloud

데이터 동기화를 위한 Kafka 활용 1

2022. 6. 30. 14:30

Apache Kafka 란?

https://engineering.linecorp.com/ko/blog/how-to-use-kafka-in-line-1/

Apache Software Foundation의 Scalar 언어로 된 오픈 소스 메시지 브로커 프로젝트

(메시지 브로커 : 특정 시스템에서 다른 시스템으로 메시지를 전달할 때 사용하는 서버)

 

  • 모든 시스템으로 데이터를 실시간으로 전송하여 처리할 수 있는 시스템
  • 데이터가 많아지더라도 확장이 용이한 시스템

 

중간에 Kafka를 도입해서 데이터베이스(스토리지서비스)에서 전송하는 데이터가 어떤 시스템에 저장되는지 관계하지 않고 오직 Kafka 하나만 상대하게 된다. 

누가 보내고 받는지 신경쓰지 않고 메시지를 보내게 된다.

  • Producer / Consumer 를 분리해서 작업
  • 메시지를 여러 Consumer에게 허용
  • 높은 처리량을 위한 메시지 최적화
  • Scale-out 가능 
  • Eco-system

Kafka Broker

  • 실행된 Kafka 어플리케이션 서버
  • 3대 이상의 Broker Cluster 구성(권장)
  • Zookeeper 연동 (장애 체크, 복구하는 코디네이터)
  • n 개 Broker 중 1대는 controller 기능 수행
    • Controller 역할 - 각 Broker 에게 담당 파티션 할당 수행, Broker 정상 동작 모니터리 관리

 

Zookeeper

  • 브로커들을 컨트롤해주는 역할. 즉, 클라이언트가 서로 공유하는 데이터를 관리해주는 역할
  • 분산되어 있는 각 애플리케이션의 정보를 중앙에 집중하고 구성관리, 그룹관리 네이밍, 동기화 등을 제공

 

 

Kafka 설치

 - https://kafka.apache.org/downloads

 

압축 해제

 

Kafka Client 

- Kafka와 데이터를 주고받기 위해 사용하는 Java Library

- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients

 -Producer, Consumer, Admin, Stream 등 Kafka 관련 API 제공

  • 다양한 3rd party library 존재 : C / C++, Node.js Python, .NET 등

 

 

Kafka Producer / Consumer

Zookeeper 및 Kafka 서버 기동(Zookeeper 먼저 실행하고 Kafka 서버 기동)

# Zookeeper 실행
$KAFKA_HOME/bin/zookeeper-server-start.sh  $KAFKA_HOME/config/zookeeper.properties

# Kafka 서버 기동
$KAFKA_HOME/bin/kafka-server-start.sh  $KAFKA_HOME/config/server.properties

 

 

Topic 

 - Kafka는 메시지를 보내면 Topic에 저장된다.

 - Producer가 Topic에 메시지를 보낸다. -> Topic에 관심이 있다고 등록한 Consumer들은 메시지를 전달받게 된다.

# Topic 생성
$KAFKA_HOME/bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092 --partitions 1

# Topic 목록 확인
$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

# Topic 정보 확인
$KAFKA_HOME/bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092

 

 

메세지를 생산하는 producer, 소비하는 consumer 기동하기 (테스트)

 

 

 

Kafka Connect

  • Kafka Connect를 통해 Data를 import/export 가능하다
  • 코드 없이 Configuration으로 데이터를 이동

     - Restful api를 통해 지원

     - Stream or Batch 형태로 데이터 전송 가능

     - 커스텀 Connector를 통한 다양한 plugin 제공 (DB를 적용해 볼 예정) File, S3, Hive, Mysql, etc....

  • Kafka Connect Source : 특정 리소스에서 데이터를 가져와 카프카 클러스터에 가져오는걸 개입한다.(import)
  • Kafka Connect Sink : Cluster에 저장되어 있는 데이터를 다른 쪽으로 보내는데 개입한다.(export)

 

 

OrderService의 MariaDB 사용하도록 변경

<!-- Mariadb-->
<dependency>
    <groupId>org.mariadb.jdbc</groupId>
    <artifactId>mariadb-java-client</artifactId>
</dependency>

 

create database mydb;

create table users( id int auto_increment primary key, 

    -> user_id varchar(20),
    -> pwd varchar(20),
    -> name varchar(20),
    -> created_at datetime default NOW()
    -> );

 

Kafka Connect 설치

 

curl -O http://packages.confluent.io/archive/6.1/confluent-community-6.1.0.tar.gz

 

 

Kafka Connect 설정(기본으로 사용)

$KAFKA_HOME/config/connect-distributed.properties

 

Kafka Connect 실행

$KAFKA_CONNECT_HOME/bin/connect-distributed ./etc/kafka/connect-distributed.properties

 

Topic 목록 확인

Kafka Connect 으로 추가된 목록 확인

./bin/kafka-topic.sh --bootstrap-server localhost:9092 --list

Connect으로 생성된 Topic 확인

 

JDBC Connector 설치

DB에서 데이터를 읽어서 전달하기 위해서 설치한다.

- https://docs.confluent.io/5.5.1/connect/kafka-connect-jdbc/index.html

 

설정파일에 connector plugin 정보 추가

etc/kafka/connect-distributed.properties 파일 마지막에 아래 plugin 정보 추가

- plugin.path=[confluentinc-kafka-connect-jdbc-10.0.1 폴더]

 

plugin.path=/Users/jiseok/Documents/Kafka/confluentinc-kafka-connect-jdbc-10.5.0/lib

 

JdbcSourceConnector에서 MariaDB 사용하기 위해 mariadb 드라이버 복사

./share/java/kafka/ 폴더에 mariadb-java-client-3.0.4.jar  파일 복사

 - Maven에 추가한 mariadb에서 jar 파일 가져올 수 있다.

 

# mariaDB 드라이버 경로
/Users/jiseok/.m2/repository/org/mariadb/jdbc/mariadb-java-client/3.0.4

# 복사할 경로
/Users/jiseok/Documents/Kafka/confluent-6.1.0/share/java/kafka
# 복사
cp ./mariadb-java-client-3.0.4.jar /Users/jiseok/Documents/Kafka/confluent-6.1.0/share/java/kafka
No suitable driver found 가 발생해서 mysql-connector-java-8.0.29.jar 파일도 함께 복사해줘서 해결했다.

 

Kafka source Connect 테스트

Kafka Source 등록(PostMan으로 등록 가능)

echo '
{
	"name" : "my-source-connect", # 만드는 Connector 이름
	"config" : {
		"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector", # Connector 종류
		"connection.url":"jdbc:mysql://localhost:3306/mydb", # DB 경로
		"connection.user":"root", 
		"connection.password":"test1357",
		"mode": "incrementing",	# 데이터가 등록되면서 자동으로 증가시키도록 설정
		"incrementing.column.name" : "id", # 자동으로 증가할 컬럼
		"table.whitelist":"users", # whiteList 에 등록된 테이블의 변경사항 감지
		"topic.prefix" : "my_topic_", # 감지된 데이터를 저장할 Topic (prefix 뒤에 테이블명 삽입 -> my_topic_users)
		"tasks.max" : "1" 
	}
}
' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"

# Endpoint의 connectors : 현재까지 등록된 Connector 조회, 등록, 삭제 가능

"name" :  만드는 Connector 이름
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector", # Connector 종류
"connection.url":"jdbc:mysql://localhost:3306/mydb", # DB 경로
"connection.user":"root", 
"connection.password":"test1357",
"mode": "incrementing", # 데이터가 등록되면서 자동으로 증가시키도록 설정
"incrementing.column.name" : "id", # 자동으로 증가할 컬럼
"table.whitelist":"users", # whiteList 에 등록된 테이블의 변경사항 감지
"topic.prefix" : "my_topic_", # 감지된 데이터를 저장할 Topic (prefix 뒤에 테이블명 삽입 -> my_topic_users)

 

  • GET /connectors – 모든 커넥터를 조회한다.
  • GET /connectors/{name} – {name}을 갖는 커넥터의 정보를 조회한다.
  • POST /connectors – 커넥터를 생성, Body쪽에는 JSON Object 타입의 커넥터 config정보가 있어야한다.
  • GET /connectors/{name}/status – 이 커넥터가 running인지, failed인지 paused 인지 현재 상태를 조회한다.
  • DELETE /connectors/{name} – {name}을 갖는 커넥터를 삭제시킨다.
  • GET /connector-plugins – 카프카 커넥터 클러스터 내부에 설치된 플러그인들을 조회한다.

 

1. MariaDB에서 데이터 추가

 

2. my_topic_users Topic 자동 생성 확인

 

3. consumer를 통해서 확인

데이터 추가로 추가하면 일정시간 후에 consumer에 데이터 추가 확인가능

{
"schema":{
    "type":"struct",
    "fields":[
        {"type":"int32","optional":false,"field":"id"},
        {"type":"string","optional":true,"field":"user_id"},
        {"type":"string","optional":true,"field":"pwd"},
        {"type":"string","optional":true,"field":"name"},
        {"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"created_at"}
        ],
        "optional":false,
        "name":"users"
    },
    "payload":{
        "id":1,
        "user_id":"user1",
        "pwd":"test1111",
        "name":"Username",
        "created_at":1656565198000
    }
}

payload -> 실제 전달된 데이터

 

 

Kafka Sink Connect 

Source Connector에서 전달하게 되면 Topic에 데이터가 쌓이게 된다. Sink Connector는 Topic에 전달된 데이터를 사용하기 위한 사용처

 

{
	"name":"my-sink-connect",
	"config":{
		"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
		"connection.url":"jdbc:mysql://localhost:3306/mydb",
		"connection.user":"root",
		"connection.password":"test1357",
   	 	"auto.create":"true", # Topic과 같은 테이블을 생성하는 옵션, topic이 가진 데이터와 동일하게 됨
		"auto.evolve":"true",
		"delete.enabled":"false",
		"tasks.max":"1",
		"topics":"my_topic_users"
	}
}

 

users 테이블의 데이터를 변경하면 my_topic_users 테이블에 똑같이 반영된다.

데이터 추가

 

자동으로 생성된 my_topic_users 테이블 확인

 

데이터 동일하게 추가 (Topic을 지우고 다시 생성하면 이전 데이터는 생성되지 않는다.)

 

 

DB의 쿼리가 아닌 Kafka Producer를 이용해서 Kafka Topic에 데이터 직접 전송 

producer에서 데이터 전송 -> Topic에 추가 -> MariaDB에 추가

users테이블에는 데이터 반영되지 않지만 my_topic_users테이블에는 반영된다.

 

Source Connector와 Sink Connector는 데이터 동기화, 단일 데이터 베이스에 여러 인스턴로 넘겨받은 데이터들을 정리하는 용도로 사용할 수 있다.

 

 

'Spring > [인프런] Spring Cloud' 카테고리의 다른 글

장애 처리와 Microservice 분산 추적  (0) 2022.07.05
데이터 동기화를 위한 Kafka 활용 2  (0) 2022.07.04
Microservice간 통신  (0) 2022.06.28
설정 정보의 암호화 처리(Encryption, Decryption)  (0) 2022.06.27
Spring Cloud Bus  (0) 2022.06.24
    'Spring/[인프런] Spring Cloud' 카테고리의 다른 글
    • 장애 처리와 Microservice 분산 추적
    • 데이터 동기화를 위한 Kafka 활용 2
    • Microservice간 통신
    • 설정 정보의 암호화 처리(Encryption, Decryption)
    쵼쥬
    쵼쥬

    티스토리툴바