[Kafka] 아파치 카프카 CLI 명령어
이번장에서는 아파치 카프카 CLI 명령어를 알아보도록 하겠습니다.
직접 아파치 카프카를 도커 컨테이너로 실행하여 CLI 명령어를 실행해볼 것이며, 아파치 카프카를 실행시키기 위해
도커와 아파치 카프카 오픈소스 깃 clone 과정이 필요합니다.
도커 설치
로컬 컴퓨터에 도커를 설치해 주시면됩니다..! 이 과정은 인터넷에 너무 많으니 패스하도록 하겠습니다.
저는 윈도우용의 도커를 설치했습니다.
아파치 카프카 오픈소스 설치
이제 도커를 설치했다면, 도커 컨테이너에 올릴 아파치 카프카 오픈소스가 필요합니다.
저는 conductor라는 오픈소스를 깃으로 clone을 해와서 가져올 것이며, 링크는 아래에 있습니다.
GitHub - conduktor/kafka-stack-docker-compose: docker compose files to create a fully working kafka stack
docker compose files to create a fully working kafka stack - conduktor/kafka-stack-docker-compose
github.com
이 오픈소스를 통해 저희는 따로 클라우드 사용없이, 쉽게 로컬에서 아파치 카프카를 도커로 실행시킬 수 있습니다.
우선 git clone을 해오겠습니다.
git clone https://github.com/conduktor/kafka-stack-docker-compose.git
이렇게 git clone을 해오면 다음과 같이 도커 컴포즈 파일들이 생성이 됩니다. (visual studio code 기준)
여기서 각 파일들은 도커 컴포즈 파일로, yml로 구성되어진 파일들입니다.
각 파일들을 간략하게나마 설명을 드리면 다음과 같습니다.
- conduktor.yml
- Conduktor 플랫폼을 실행하기 위한 Docker Compose 구성 파일입니다.
여기서 conduktor는 kafka 클러스터를 쉽게 관리하고 모니터링 하기위한 gui 도구입니다.
따라서 이 파일을 통해 Conduktor를 실행하여 쉽게 Kafka 클러스터를 관리하고 모니터링할 수 있습니다. - full-stack.yml
- Kafka와 관련된 모든 서비스를 실행하는 데 사용되는 Docker Compose 구성 파일입니다.
이 파일에선 Kafka, Zookeeper, Kafka RestProxy등의 서비스가 포함되어 있습니다. - zk-multiple-kafka-multiple-schema-registry.yml
- 여러 개의 Zookeeper 노드와 Kafka 브로커 및 Schema Registry를 실행하기 위한 Docker Compose 구성 파일입니다.
이 파일을 통해 실배포 환경을 시뮬레이션 및 테스트 할때 활용할 수 있습니다. - zk-multiple-kafka-multiple.yml
- 여러 개의 Zookeeper 노드와 Kafka 브로커를 실행하기 위한 Docker Compose 구성 파일입니다.
이 파일을 통해 Kafka 클러스터의 고가용성 및 장애 허용성을 실험할 수 있습니다.
저희는 여기서 full-stack.yml 파일을 기반으로 Docker Compose up을 하여 도커 컨테이너에 올리도록 하겠습니다.
이제 docker compose에 올리는 명령어는 다음과 같습니다.
docker compose -f full-stack.yml up
이 명령어로 실행을 하고 나면 다음과 같은 log가 뜰겁니다.
아래처럼 started ~~~ 하면서 log가 뜨면 성공입니다.
이제 이렇게 컨테이너를 실행 했으니 docker ps 명령어를 통해 docker container로 정보를 확인한 후, 해당 컨테이너 내부로 들어가 줍니다.
docker compose를 정상적으로 up 하셨고, 컨테이너들이 제대로 실행이 완료가 됐다면, 위 사진처럼 컨테이너가 8개가 나오는게 정상입니다.(만약 제대로 안나오신다면 좀 더 기다리시거나, Docker Desktop으로 상태를 보시는 걸 추천드립니다.)
이렇게 8개의 컨테이너중 kafka1에 들어가야하기 때문에 아래 명령어를 통해 직접 shell로 들어갑니다.
docker exec-it [컨테이너 이름] bash
이렇게 들어온 후, kafka와 관련된 데이터는 bin/파일에 kafka라는 이름의 파일들로 존재하므로, bin파일로 갑니다.
이제 여기서 카프카 CLI 명령어를 활용해서 카프카에서 토픽 생성 및 데이터 삽입, 추출을 할 수 있습니다.
kafka CLI 명령어
CLI를 활용해서 Kafka를 제어할 수 있는데, 명령어의 종류는 크게 다음과 같습니다.
- kafka topic 관리
- kafka producer 메세지 삽입
- kafka consumer 메세지 추출
위의 3가지의 명령으로 카프카를 직접 관리를 할 수 있습니다. 이제 명령어들을 하나하나 알아보도록 하겠습니다.
Kafka Topic 관리 명령어
우선 카프카 topic을 관리할 수 있는 명령어는 kafka-topics 명령어입니다. 이 명렁어를 기반으로 현재 토픽 목록을 조회할 수 있고, 토픽 생성 및 삭제, 정보 수정까지 할 수있습니다.
각각의 명령어는 다음과 같습니다.
우선 먼저 카프카 브로커의 주소를 입력해야하는데, 카프카 브로커 주소는 다음과 같습니다.
[카프카 컨테이너 이름]:[카프카 port 번호]
저 같은 경우는 kafka1:19092였습니다.
- 카프카 토픽 목록 조회
kafka-topics --bootstrap-server [카프카 브로커 주소] --list
해당 명령어를 통해 현재 생성되어있는 카프카 토픽들의 정보 리스트를 볼 수 있습니다.
- 카프카 토픽 상세 조회
kafka-topics --bootstrap-server [카프카 브로커 주소] --describe --topic [토픽 이름]
해당 명령어를 통해 해당 카프카 토픽의 상세 정보를 볼 수 있습니다.
해당 토픽의 partition 개수, Leader 개수, Replicas 등의 정보를 얻을 수 있습니다.
- 카프카 토픽 생성
kafka-topics --bootstrap-server [카프카 브로커 주소] --create --topic [토픽 이름] --partitions [파티션 개수] --replication-factor [replication-factor 개수] --config retention.ms=[토픽 메세지 보관 시간]
해당 명령어를 통해 카프카에 토픽을 새로 생성할 수 있습니다.
생성시 옵션에는 파티션 개수, 복제 개수, 토픽 메세지 보관시간을 옵션으로 줄 수 있습니다.
- 카프카 토픽 정보 수정
kafka-configs --bootstrap-server [카프카 브로커 주소] --entity-type topics --entity-name [토픽 이름] --alter --add-config retention.ms=[토픽 메세지 보관 시간]
해당 명령어를 통해 이미 존재하는 카프카 토픽정보를 수정할 수 있습니다.
수정할 수 있는 정보는 메세지 보관기간, 메세지 보관용량, 메세지 삭제정책 등이 있으나, 파티션 개수 및 복제 개수등은 수정할 수 없습니다.
해당 정보를 수정하려면, 토픽 재생성을 해야합니다.
- 카프카 토픽 삭제
kafka-topics --bootstrap-server [카프카 브로커 주소] --delete --topic [토픽 이름]
해당 명령어를 통해 카프카 토픽을 삭제할 수 있습니다.
이미 존재하는 카프카 토픽만을 삭제할 수 있습니다.
- 카프카 Producer 메세지 삽입
kafka-console-producer --bootstrap-server [카프카 브로커 주소] --topic [토픽 이름] --property "parse.key=true" --property "key.separator=:"
해당 명령어를 통해 존재하는 producer를 활용하여 해당 토픽에 메세지를 넣을 수 있습니다.
'property' 옵션을 통해 추가 옵션을 지정할 수 있으며, 주로 쓰이는 옵션은 parse.key와 key.separator 입니다.
parse.key는 메세지 파싱 유무이며, key.separator는 구분자 설정을 할 수 있습니다.
명령어를 반복적으로 입력하여 메세지를 넣을 수 있습니다.
만약 해당 topic에 존재하는 파티션의 개수가 1개가 아닌경우, 라운드 로빈으로 각 파티션에 나뉘어서 들어갑니다.
마지막으론, 빈 문자열을 입력함으로, 메세지 삽입을 중단할 수 있습니다.
- 카프카 Consumer 메세지 추출
kafka-console-consumer --bootstrap-server [카프카 브로커 주소] --topic [토픽 이름] --property print.key=true --property key.separator="-" --group [컨슈머 그룹이름] --from-beginning
해당 명령어를 통해 존재하는 consumer를 활용하여 해당 토픽에 존재하는 메세지를 추출하여 받아올 수 있습니다.
'property' 옵션을 통해 추가 옵션을 지정할 수 있으며, 주로 쓰이는 옵션은 print.key와 key.separator입니다.
추가적으로, --group 옵션을 활용하여 특정 컨슈머 그룹에서 메세지를 가져올 수 있으며
--from-beginning을 활용하여 이전에 이미 읽었던 메세지들도 다 읽어올 수 있습니다.
- 카프카 consumer 그룹 보기
kafka-consumer-groups --bootstrap-server [카프카 브로커 주소] --list
해당 명령어를 통해 현재 존재하는 모든 consumer 그룹 리스트를 확인 할 수 있습니다.
- 카프카 consumer 그룹 상세 보기
kafka-consumer-groups --bootstrap-server [카프카 브로커 주소] --group [consumer 그룹이름] --describe
해당 명령어를 통해 해당 consumer 그룹의 상세정보를 확인할 수 있습니다.
- 카프카 verifiable-producer 데이터 삽입
kafka-verifiable-producer --bootstrap-server [카프카 브로커 주소] --max-messages [메세지 개수] --topic verify-test
해당 명령어를 통해 verifiable-producer로 테스트 메세지를 삽입할 수 있습니다.
이는 보통 카프카가 제대로 실행되고 있는지 테스트 용도로 활용이 되며, 임의로 메세지 개수를 정해 메세지가 성적으로 전송되는지 확인할 수 있습니다.
- 카프카 verifiable-consumer 데이터 추출
kafka-verifiable-consumer --bootstrap-server [카프카 브로커 주소] --topic verify-test --group-id [그룹 이름]
해당 명령어를 통해 verifiable-consumer로 테스트 메세지들을 추출할 수 있습니다.
이는 producer와 마찬가지로 카프카가 제대로 실행되고 있는지 테스트 용도로 활용이 됩니다.
group-id 옵션이 필수적이므로, consumer 그룹 id를 필수적으로 입력하셔야 합니다.