Apache Kafka 란?
https://engineering.linecorp.com/ko/blog/how-to-use-kafka-in-line-1/
Apache Software Foundation의 Scalar 언어로 된 오픈 소스 메시지 브로커 프로젝트
(메시지 브로커 : 특정 시스템에서 다른 시스템으로 메시지를 전달할 때 사용하는 서버)
- 모든 시스템으로 데이터를 실시간으로 전송하여 처리할 수 있는 시스템
- 데이터가 많아지더라도 확장이 용이한 시스템
중간에 Kafka를 도입해서 데이터베이스(스토리지서비스)에서 전송하는 데이터가 어떤 시스템에 저장되는지 관계하지 않고 오직 Kafka 하나만 상대하게 된다.
누가 보내고 받는지 신경쓰지 않고 메시지를 보내게 된다.
- Producer / Consumer 를 분리해서 작업
- 메시지를 여러 Consumer에게 허용
- 높은 처리량을 위한 메시지 최적화
- Scale-out 가능
- Eco-system

Kafka Broker
- 실행된 Kafka 어플리케이션 서버
- 3대 이상의 Broker Cluster 구성(권장)
- Zookeeper 연동 (장애 체크, 복구하는 코디네이터)
- n 개 Broker 중 1대는 controller 기능 수행
- Controller 역할 - 각 Broker 에게 담당 파티션 할당 수행, Broker 정상 동작 모니터리 관리
Zookeeper
- 브로커들을 컨트롤해주는 역할. 즉, 클라이언트가 서로 공유하는 데이터를 관리해주는 역할
- 분산되어 있는 각 애플리케이션의 정보를 중앙에 집중하고 구성관리, 그룹관리 네이밍, 동기화 등을 제공
Kafka 설치
- https://kafka.apache.org/downloads
압축 해제

Kafka Client
- Kafka와 데이터를 주고받기 위해 사용하는 Java Library
- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
-Producer, Consumer, Admin, Stream 등 Kafka 관련 API 제공
- 다양한 3rd party library 존재 : C / C++, Node.js Python, .NET 등
Kafka Producer / Consumer
Zookeeper 및 Kafka 서버 기동(Zookeeper 먼저 실행하고 Kafka 서버 기동)
# Zookeeper 실행
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
# Kafka 서버 기동
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties


Topic
- Kafka는 메시지를 보내면 Topic에 저장된다.
- Producer가 Topic에 메시지를 보낸다. -> Topic에 관심이 있다고 등록한 Consumer들은 메시지를 전달받게 된다.
# Topic 생성
$KAFKA_HOME/bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092 --partitions 1
# Topic 목록 확인
$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
# Topic 정보 확인
$KAFKA_HOME/bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092



메세지를 생산하는 producer, 소비하는 consumer 기동하기 (테스트)


Kafka Connect
- Kafka Connect를 통해 Data를 import/export 가능하다
- 코드 없이 Configuration으로 데이터를 이동
- Restful api를 통해 지원
- Stream or Batch 형태로 데이터 전송 가능
- 커스텀 Connector를 통한 다양한 plugin 제공 (DB를 적용해 볼 예정) File, S3, Hive, Mysql, etc....

- Kafka Connect Source : 특정 리소스에서 데이터를 가져와 카프카 클러스터에 가져오는걸 개입한다.(import)
- Kafka Connect Sink : Cluster에 저장되어 있는 데이터를 다른 쪽으로 보내는데 개입한다.(export)
OrderService의 MariaDB 사용하도록 변경
<!-- Mariadb-->
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
</dependency>
create database mydb;
create table users( id int auto_increment primary key,
-> user_id varchar(20),
-> pwd varchar(20),
-> name varchar(20),
-> created_at datetime default NOW()
-> );
Kafka Connect 설치
curl -O http://packages.confluent.io/archive/6.1/confluent-community-6.1.0.tar.gz

Kafka Connect 설정(기본으로 사용)
$KAFKA_HOME/config/connect-distributed.properties
Kafka Connect 실행
$KAFKA_CONNECT_HOME/bin/connect-distributed ./etc/kafka/connect-distributed.properties
Topic 목록 확인
Kafka Connect 으로 추가된 목록 확인
./bin/kafka-topic.sh --bootstrap-server localhost:9092 --list

Connect으로 생성된 Topic 확인
JDBC Connector 설치
DB에서 데이터를 읽어서 전달하기 위해서 설치한다.
- https://docs.confluent.io/5.5.1/connect/kafka-connect-jdbc/index.html
설정파일에 connector plugin 정보 추가
etc/kafka/connect-distributed.properties 파일 마지막에 아래 plugin 정보 추가
- plugin.path=[confluentinc-kafka-connect-jdbc-10.0.1 폴더]
plugin.path=/Users/jiseok/Documents/Kafka/confluentinc-kafka-connect-jdbc-10.5.0/lib
JdbcSourceConnector에서 MariaDB 사용하기 위해 mariadb 드라이버 복사
./share/java/kafka/ 폴더에 mariadb-java-client-3.0.4.jar 파일 복사
- Maven에 추가한 mariadb에서 jar 파일 가져올 수 있다.
# mariaDB 드라이버 경로
/Users/jiseok/.m2/repository/org/mariadb/jdbc/mariadb-java-client/3.0.4
# 복사할 경로
/Users/jiseok/Documents/Kafka/confluent-6.1.0/share/java/kafka
# 복사
cp ./mariadb-java-client-3.0.4.jar /Users/jiseok/Documents/Kafka/confluent-6.1.0/share/java/kafka
No suitable driver found 가 발생해서 mysql-connector-java-8.0.29.jar 파일도 함께 복사해줘서 해결했다.
Kafka source Connect 테스트
Kafka Source 등록(PostMan으로 등록 가능)
echo '
{
"name" : "my-source-connect", # 만드는 Connector 이름
"config" : {
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector", # Connector 종류
"connection.url":"jdbc:mysql://localhost:3306/mydb", # DB 경로
"connection.user":"root",
"connection.password":"test1357",
"mode": "incrementing", # 데이터가 등록되면서 자동으로 증가시키도록 설정
"incrementing.column.name" : "id", # 자동으로 증가할 컬럼
"table.whitelist":"users", # whiteList 에 등록된 테이블의 변경사항 감지
"topic.prefix" : "my_topic_", # 감지된 데이터를 저장할 Topic (prefix 뒤에 테이블명 삽입 -> my_topic_users)
"tasks.max" : "1"
}
}
' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"
# Endpoint의 connectors : 현재까지 등록된 Connector 조회, 등록, 삭제 가능
"name" : 만드는 Connector 이름
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector", # Connector 종류
"connection.url":"jdbc:mysql://localhost:3306/mydb", # DB 경로
"connection.user":"root",
"connection.password":"test1357",
"mode": "incrementing", # 데이터가 등록되면서 자동으로 증가시키도록 설정
"incrementing.column.name" : "id", # 자동으로 증가할 컬럼
"table.whitelist":"users", # whiteList 에 등록된 테이블의 변경사항 감지
"topic.prefix" : "my_topic_", # 감지된 데이터를 저장할 Topic (prefix 뒤에 테이블명 삽입 -> my_topic_users)


- GET /connectors – 모든 커넥터를 조회한다.
- GET /connectors/{name} – {name}을 갖는 커넥터의 정보를 조회한다.
- POST /connectors – 커넥터를 생성, Body쪽에는 JSON Object 타입의 커넥터 config정보가 있어야한다.
- GET /connectors/{name}/status – 이 커넥터가 running인지, failed인지 paused 인지 현재 상태를 조회한다.
- DELETE /connectors/{name} – {name}을 갖는 커넥터를 삭제시킨다.
- GET /connector-plugins – 카프카 커넥터 클러스터 내부에 설치된 플러그인들을 조회한다.

1. MariaDB에서 데이터 추가

2. my_topic_users Topic 자동 생성 확인

3. consumer를 통해서 확인

데이터 추가로 추가하면 일정시간 후에 consumer에 데이터 추가 확인가능

{
"schema":{
"type":"struct",
"fields":[
{"type":"int32","optional":false,"field":"id"},
{"type":"string","optional":true,"field":"user_id"},
{"type":"string","optional":true,"field":"pwd"},
{"type":"string","optional":true,"field":"name"},
{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"created_at"}
],
"optional":false,
"name":"users"
},
"payload":{
"id":1,
"user_id":"user1",
"pwd":"test1111",
"name":"Username",
"created_at":1656565198000
}
}
payload -> 실제 전달된 데이터
Kafka Sink Connect
Source Connector에서 전달하게 되면 Topic에 데이터가 쌓이게 된다. Sink Connector는 Topic에 전달된 데이터를 사용하기 위한 사용처
{
"name":"my-sink-connect",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://localhost:3306/mydb",
"connection.user":"root",
"connection.password":"test1357",
"auto.create":"true", # Topic과 같은 테이블을 생성하는 옵션, topic이 가진 데이터와 동일하게 됨
"auto.evolve":"true",
"delete.enabled":"false",
"tasks.max":"1",
"topics":"my_topic_users"
}
}
users 테이블의 데이터를 변경하면 my_topic_users 테이블에 똑같이 반영된다.


데이터 추가

자동으로 생성된 my_topic_users 테이블 확인

데이터 동일하게 추가 (Topic을 지우고 다시 생성하면 이전 데이터는 생성되지 않는다.)

DB의 쿼리가 아닌 Kafka Producer를 이용해서 Kafka Topic에 데이터 직접 전송
producer에서 데이터 전송 -> Topic에 추가 -> MariaDB에 추가

users테이블에는 데이터 반영되지 않지만 my_topic_users테이블에는 반영된다.

Source Connector와 Sink Connector는 데이터 동기화, 단일 데이터 베이스에 여러 인스턴로 넘겨받은 데이터들을 정리하는 용도로 사용할 수 있다.
'Spring > [인프런] Spring Cloud' 카테고리의 다른 글
장애 처리와 Microservice 분산 추적 (0) | 2022.07.05 |
---|---|
데이터 동기화를 위한 Kafka 활용 2 (0) | 2022.07.04 |
Microservice간 통신 (0) | 2022.06.28 |
설정 정보의 암호화 처리(Encryption, Decryption) (0) | 2022.06.27 |
Spring Cloud Bus (0) | 2022.06.24 |