쿠버네티스에서 Fluent Bit 과 Kafka 구성하기: Consuming 확인하기
이 글에서는 Kind 클러스터에서 Fluent Bit 을 구성하고 테스트하는 방법을 소개합니다.
다음은 우리가 구성할 Workload 구성도입니다:
쿠버네티스에서 Pod 는 리소스의 기본 단위이고,
Workload 는 실행되는 애플리케이션(이하 앱)입니다.
목표
- Helm Chart 를 사용하여 Kafka Statefulset 배포하기
- Helm Chart 를 사용하여 Fluentd aggregator Daemonset 배포하기
- 테스트하기
준비하기
- WSL(윈도우즈일 경우)
- Docker
- Helm
- Kind
위 구성이 준비되지 않았다면,
Containerization Part.0 - Containerization 준비 과정을 따라하세요. 😉
시작하기
Kind 클러스터 구성하기
다음 명령을 실행하여 3 개의 노드로 구성된 Kind 클러스터를 생성합니다:
cat <<EOF | kind create cluster --config=-
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
- role: worker
- role: worker
EOF
role
을 보면 하나의 control-plane 과 두 worker 들로 구성한다는 것을 알 수 있습니다.
kind get nodes
명령으로 3 개의 Node 가 생성된 것을 확인할 수 있습니다:
$ kind get nodes
kind-worker
kind-worker2
kind-control-plane
kafka 배포하기
Apache Kafka는 실시간 파이프라인을 구축하도록 설계된 분산 스트리밍 플랫폼으로, 메시지 브로커나 빅 데이터 앱을 위한 로그 집계 솔루션으로 사용할 수 있습니다.
TL;DR 🤖
helm repo add bitnami https://charts.bitnami.com/bitnami
helm install kafka bitnami/kafka --set replicaCount=2
kafka 소개
bitnami/kafka
Helm Chart를 사용하여 kafka 를 실행합니다.
먼저, bitnami Helm chart 저장소를 추가하고, kafka 를 배포합니다.
실행할 때 파라미터로 전달하는 replicaCount 는 kafka 의 노드 수입니다.
bitnami/fluentd 의 Helm Chart 템플릿, 기본 값들은 ArtifactHub bitnami/kafka 를 참고하세요.
kafka 테스트하기
kafka Helm Chart를 실행하면, 다음 결과가 출력됩니다:
helm install kafka bitnami/kafka --set replicaCount=2
"bitnami" already exists with the same configuration, skipping
NAME: kafka
...
CHART NAME: kafka
CHART VERSION: 21.0.1
APP VERSION: 3.4.0
** Please be patient while the chart is being deployed **
Kafka can be accessed by consumers via port 9092 on the following DNS name from within your cluster:
kafka.default.svc.cluster.local
Each Kafka broker can be accessed by producers via port 9092 on the following DNS name(s) from within your cluster:
kafka-0.kafka-headless.default.svc.cluster.local:9092
kafka-1.kafka-headless.default.svc.cluster.local:9092
To create a pod that you can use as a Kafka client run the following commands:
kubectl run kafka-client --restart='Never' --image docker.io/bitnami/kafka:3.4.0-debian-11-r2 --namespace default --command -- sleep infinity
kubectl exec --tty -i kafka-client --namespace default -- bash
PRODUCER:
kafka-console-producer.sh \
--broker-list \
kafka-0.kafka-headless.default.svc.cluster.local:9092,\
kafka-1.kafka-headless.default.svc.cluster.local:9092 \
--topic test
CONSUMER:
kafka-console-consumer.sh \
--bootstrap-server kafka.default.svc.cluster.local:9092 \
--topic test \
--from-beginning
다음은 출력된 내용에 대한 설명입니다:
- DNS 이름: kafka.default.svc.cluster.local
- 각 노드의 DNS 이름:
- kafka-0: kafka-0.kafka-headless.default.svc.cluster.local:9092
- kafka-1: kafka-1.kafka-headless.default.svc.cluster.local:9092
headless 서비스를 사용하면 Statefulset Workload 리소스에서 특정 Node 에 있는 Pod 로 접근할 수 있습니다.
테스트 하기 전에 다음 명령으로 kafka Statefulset 의 상태를 확인합니다:
kubectl get statefulset
Statefulset 이 준비되었다면, 결과 화면은 다음과 같습니다:
NAME READY
statefulset.apps/kafka 2/2
statefulset.apps/kafka-zookeeper 1/1
watch "kubectl get statefulset"
명령을 사용하면 재실행하지 않고도 결과를 확인할 수 있습니다.
kubectl get svc
명령으로 kafka 서비스를 확인합니다:
$ kubectl get svc
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S)
kafka ClusterIP 10.96.37.230 <none> 9092/TCP
kafka-headless ClusterIP None <none> 9092/TCP,9093/TCP
kafka-zookeeper ClusterIP 10.96.253.86 <none> 2181/TCP,2888/TCP,3888/TCP
kafka-zookeeper-headless ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP
클러스터 내에서는 DNS 이름(kafka.default.svc.cluster.local
)과 포트(9092
)로 직접 접근할 수 있습니다.
kafka 테스트를 위해 producer, consumer Pod 를 배포하겠습니다:
kafka producer 클라이언트로 사용할 Pod 를 배포하기 위해 다음 명령을 실행합니다:
kubectl run producer --restart='Never' --image docker.io/bitnami/kafka:3.4.0-debian-11-r2 --command -- sleep infinity
kubectl exec --tty -i producer -- bash
kafka-console-producer.sh \
--broker-list \
kafka-0.kafka-headless.default.svc.cluster.local:9092,\
kafka-1.kafka-headless.default.svc.cluster.local:9092 \
--topic test
kafka consumer 클라이언트로 사용할 Pod 를 배포하기 위해 다음 명령을 실행합니다:
kubectl run consumer --restart='Never' --image docker.io/bitnami/kafka:3.4.0-debian-11-r2 --command -- sleep infinity
kubectl exec --tty -i consumer -- bash
kafka-console-consumer.sh \
--bootstrap-server kafka.default.svc.cluster.local:9092 \
--topic test \
--from-beginning
producer 에서 입력한 메시지를 consumer 에서 확인할 수 있다면 성공입니다.
테스트가 끝나면 exit
로 Shell 을 종료한 후, 다음 명령으로 테스트를 위해 만들었던 producer, consumer 두 Pod 를 삭제합니다:
kubectl delete pod producer
kubectl delete pod consumer
aggregator 배포하기
Fluent Bit의 에서 Kafka Output Plugin 을 레코드를 Kafka 서비스로 수집할 수 있습니다.
로그를 Kafka Output 하는 Fluent Bit 앱을 aggregator 라고 하겠습니다.
TL;DR 🤖
helm repo add fluent https://fluent.github.io/helm-charts
helm install aggregator fluent/fluent-bit -f https://raw.githubusercontent.com/cppis/tutorials-on-k8s/main/fluentbit-with-kafka-on-k8s/values.yaml
aggregator 소개
fluent/fluent-bit
Helm Chart를 사용하여 aggregator 를 실행합니다.
먼저, fluent Helm chart 저장소를 추가하고, fluent-bit 을 배포합니다.
fluent-bit 의 구성은 https://raw.githubusercontent.com/cppis/tutorials-on-k8s/main/fluentbit-with-kafka-on-k8s/values.yaml
파일에 있고, 다음은 파일의 일부 내용입니다:
image:
repository: cr.fluentbit.io/fluent/fluent-bit
tag: "latest-debug"
service:
type: NodePort
port: 24224
labels: {}
annotations: {}
## https://docs.fluentbit.io/manual/administration/configuring-fluent-bit/configuration-file
config:
service: |
[SERVICE]
Daemon Off
Flush
Log_Level
Parsers_File parsers.conf
Parsers_File custom_parsers.conf
HTTP_Server On
HTTP_Listen 0.0.0.0
HTTP_Port
Health_Check On
## https://docs.fluentbit.io/manual/pipeline/inputs
inputs: |
[INPUT]
Name tail
Path /var/log/containers/*.log
Exclude_Path /var/log/containers/aggregator-*.log,/var/log/containers/kindnet-*.log,/var/log/containers/kafka-*.log
multiline.parser docker, cri
Tag kube.*
Mem_Buf_Limit 5MB
Skip_Long_Lines On
[INPUT]
Name forward
Listen 0.0.0.0
Port 24224
Buffer_Chunk_Size 1M
Buffer_Max_Size 6M
...
## https://docs.fluentbit.io/manual/pipeline/outputs
outputs: |
[OUTPUT]
Name stdout
Match *
[OUTPUT]
Name kafka
Match *
Brokers kafka:9092
Topics fluent-bit
...
## https://docs.fluentbit.io/manual/pipeline/parsers
customParsers: |
[PARSER]
Name docker
Format json
Time_Key time
Time_Format %Y-%m-%dT%H:%M:%S.%L
Time_Keep Off
# Command | Decoder | Field | Optional Action
# =============|======================|=================
Decode_Field_As json log
[PARSER]
Name json
Format json
# The config volume is mounted by default, either to the existingConfigMap value, or the default of "fluent-bit.fullname"
volumeMounts:
- name: config
mountPath: /fluent-bit/etc/fluent-bit.conf
subPath: fluent-bit.conf
- name: config
mountPath: /fluent-bit/etc/custom_parsers.conf
subPath: custom_parsers.conf
daemonSetVolumes:
- name: varlog
hostPath:
path: /var/log
- name: varlibdockercontainers
hostPath:
path: /var/lib/docker/containers
- name: etcmachineid
hostPath:
path: /etc/machine-id
type: File
daemonSetVolumeMounts:
- name: varlog
mountPath: /var/log
- name: varlibdockercontainers
mountPath: /var/lib/docker/containers
readOnly: true
- name: etcmachineid
mountPath: /etc/machine-id
readOnly: true
logLevel: info
다음은 fluentbit.yaml
설정 파일에 대한 설명입니다:
-
image
cr.fluentbit.io/fluent/fluent-bit
이미지에서 디버깅을 위해 Shell 접근을 할 수 있는latest-debug
Tag 를 사용합니다. -
service
NodePort 타입으로 24224 를 사용합니다. -
config.inputs
Input 파이프라인으로 tail 과 forward 를 구성합니다.tail
- 노드의 컨테이너 로그 경로: /var/log/containers/
- docker, cri multiline parser 설정
forward
0.0.0.0:24224
로 forward 를 받습니다.
-
config.outputs
Output 파이프라인으로 stdout 과 forward 를 구성합니다.stdout
- 컨테이너 stdout 으로 출력합니다.
kafka
kafka
서버의 Topic 으로 전송합니다.
fluent/helm-charts 의 Helm Chart 템플릿, 기본 값들은,
ArtifactHub fluent/fluent-bit 을 참고하세요.
테스트하기
Kafka 에 추가된 로그를 확인하기 위해 Consumer Pod 를 생성하여 확인하겠습니다.
먼저 tail 을 위한 로깅을 만들어 kafka 로 produce 합니다.
다음 명령으로 /var/log/containers
볼륨이 마운트된 Pod 에 접속합니다:
kubectl exec -it $(kubectl get pod -l app.kubernetes.io/instance=aggregator -o jsonpath="{.items[0].metadata.name}") -- /bin/bash
접속한 Pod 에서 로깅하기 위해 다음 명령을 실행합니다:
echo '{"message":"hello"}' >> /var/log/containers/debug.log
exit
로 Pod Shell 에서 나온 후,
kafka consumer 로 fluent-bit 토픽 메시지를 확인하기 위해 다음 명령을 실행합니다:
kubectl run consumer --restart='Never' --image docker.io/bitnami/kafka:3.4.0-debian-11-r2 --command -- sleep infinity
kubectl exec --tty -i consumer -- bash
kafka-console-consumer.sh \
--bootstrap-server kafka.default.svc.cluster.local:9092 \
--topic fluent-bit \
--from-beginning
로그가 잘 출력되는 것을 확인할 수 있습니다:
... kube.var.log.containers.debug.log: {"message":"{\"message\":\"hello\"}"}
Flush 구성에 따라 로그가 수집되는 데 약간의 시간이 걸릴 수도 있습니다.
결론
Apache Kafka는 실시간 파이프라인을 구축하기 위해 설계된 분산 스트리밍 플랫폼으로, 메시지 브로커나 빅 데이터 앱을 위한 로그 집계 솔루션으로 사용할 수 있습니다.
이 글에서는 Fluent Bit, Kafka 를 쿠버네티스에 배포하고 구성을 테스트하는 방법을 소개했습니다. 😎
참고자료
쿠버네티스
Kafka
Fluent
- Fluent Bit v2.0 Documentation
- Configuration
- Output Plugins
댓글남기기