쿠버네티스에서 Fluent Bit 과 Kafka 구성하기
이 글에서는 Kind 클러스터에서 Fluent Bit 을 구성하고 테스트하는 방법을 소개합니다.
다음은 우리가 구성할 Workload 구성도입니다:
쿠버네티스에서 Pod 는 리소스의 기본 단위이고,
Workload 는 실행되는 애플리케이션(이하 앱)입니다.
목표
- Helm Chart 를 사용하여 Kafka Statefulset 배포하기
- Helm Chart 를 사용하여 Fluentd aggregator Daemonset 배포하기
- 테스트하기
준비하기
- WSL(윈도우즈일 경우)
- Docker
- Helm
- Kind
위 구성이 준비되지 않았다면,
Containerization Part.0 - Containerization 준비 과정을 따라하세요. 😉
시작하기
Kind 클러스터 구성하기
다음 명령을 실행하여 3 개의 노드로 구성된 Kind 클러스터를 생성합니다:
  cat <<EOF | kind create cluster --config=-
  kind: Cluster
  apiVersion: kind.x-k8s.io/v1alpha4
  nodes:
  - role: control-plane
  - role: worker
  - role: worker
  EOF
role 을 보면 하나의 control-plane 과 두 worker 들로 구성한다는 것을 알 수 있습니다.
kind get nodes 명령으로 3 개의 Node 가 생성된 것을 확인할 수 있습니다:
  $ kind get nodes
  kind-worker
  kind-worker2
  kind-control-plane
kafka 배포하기
Apache Kafka는 실시간 파이프라인을 구축하도록 설계된 분산 스트리밍 플랫폼으로, 메시지 브로커나 빅 데이터 앱을 위한 로그 집계 솔루션으로 사용할 수 있습니다.
TL;DR 🤖
helm repo add bitnami https://charts.bitnami.com/bitnami
helm install kafka bitnami/kafka --set replicaCount=2
kafka 소개
bitnami/kafka Helm Chart를 사용하여 kafka 를 실행합니다.
먼저, bitnami Helm chart 저장소를 추가하고, kafka 를 배포합니다.
실행할 때 파라미터로 전달하는 replicaCount 는 kafka 의 노드 수입니다.
bitnami/fluentd 의 Helm Chart 템플릿, 기본 값들은 ArtifactHub bitnami/kafka 를 참고하세요.
kafka 테스트하기
kafka Helm Chart를 실행하면, 다음 결과가 출력됩니다:
helm install kafka bitnami/kafka --set replicaCount=2
"bitnami" already exists with the same configuration, skipping
NAME: kafka
...
CHART NAME: kafka
CHART VERSION: 21.0.1
APP VERSION: 3.4.0
** Please be patient while the chart is being deployed **
Kafka can be accessed by consumers via port 9092 on the following DNS name from within your cluster:
    kafka.default.svc.cluster.local
Each Kafka broker can be accessed by producers via port 9092 on the following DNS name(s) from within your cluster:
    kafka-0.kafka-headless.default.svc.cluster.local:9092
    kafka-1.kafka-headless.default.svc.cluster.local:9092
To create a pod that you can use as a Kafka client run the following commands:
    kubectl run kafka-client --restart='Never' --image docker.io/bitnami/kafka:3.4.0-debian-11-r2 --namespace default --command -- sleep infinity
    kubectl exec --tty -i kafka-client --namespace default -- bash
    PRODUCER:
        kafka-console-producer.sh \
          --broker-list \
          kafka-0.kafka-headless.default.svc.cluster.local:9092,\
          kafka-1.kafka-headless.default.svc.cluster.local:9092 \
          --topic test
    CONSUMER:
        kafka-console-consumer.sh \
          --bootstrap-server kafka.default.svc.cluster.local:9092 \
          --topic test \
          --from-beginning
다음은 출력된 내용에 대한 설명입니다:
- DNS 이름: kafka.default.svc.cluster.local
- 각 노드의 DNS 이름:
    - kafka-0: kafka-0.kafka-headless.default.svc.cluster.local:9092
- kafka-1: kafka-1.kafka-headless.default.svc.cluster.local:9092
 
headless 서비스를 사용하면 Statefulset Workload 리소스에서 특정 Node 에 있는 Pod 로 접근할 수 있습니다.
테스트 하기 전에 다음 명령으로 kafka Statefulset 의 상태를 확인합니다:
kubectl get statefulset
Statefulset 이 준비되었다면, 결과 화면은 다음과 같습니다:
NAME                               READY
statefulset.apps/kafka             2/2
statefulset.apps/kafka-zookeeper   1/1
watch "kubectl get statefulset"명령을 사용하면 재실행하지 않고도 결과를 확인할 수 있습니다.
kubectl get svc 명령으로 kafka 서비스를 확인합니다:
$ kubectl get svc
NAME                     TYPE      CLUSTER-IP   EXTERNAL-IP PORT(S)
kafka                    ClusterIP 10.96.37.230 <none>      9092/TCP                   
kafka-headless           ClusterIP None         <none>      9092/TCP,9093/TCP         
kafka-zookeeper          ClusterIP 10.96.253.86 <none>      2181/TCP,2888/TCP,3888/TCP
kafka-zookeeper-headless ClusterIP None         <none>      2181/TCP,2888/TCP,3888/TCP
클러스터 내에서는 DNS 이름(kafka.default.svc.cluster.local)과 포트(9092)로 직접 접근할 수 있습니다.
kafka 테스트를 위해 producer, consumer Pod 를 배포하겠습니다:
kafka producer 클라이언트로 사용할 Pod 를 배포하기 위해 다음 명령을 실행합니다:
kubectl run producer --restart='Never' --image docker.io/bitnami/kafka:3.4.0-debian-11-r2 --command -- sleep infinity
kubectl exec --tty -i producer -- bash
kafka-console-producer.sh \
  --broker-list \
  kafka-0.kafka-headless.default.svc.cluster.local:9092,\
  kafka-1.kafka-headless.default.svc.cluster.local:9092 \
  --topic test
kafka consumer 클라이언트로 사용할 Pod 를 배포하기 위해 다음 명령을 실행합니다:
kubectl run consumer --restart='Never' --image docker.io/bitnami/kafka:3.4.0-debian-11-r2 --command -- sleep infinity
kubectl exec --tty -i consumer -- bash
kafka-console-consumer.sh \
  --bootstrap-server kafka.default.svc.cluster.local:9092 \
  --topic test \
  --from-beginning
producer 에서 입력한 메시지를 consumer 에서 확인할 수 있다면 성공입니다.
테스트가 끝나면 exit 로 Shell 을 종료한 후, 다음 명령으로 테스트를 위해 만들었던 producer, consumer 두 Pod 를 삭제합니다:
kubectl delete pod producer
kubectl delete pod consumer
aggregator 배포하기
Fluent Bit의 에서 Kafka Output Plugin 을 레코드를 Kafka 서비스로 수집할 수 있습니다.
로그를 Kafka Output 하는 Fluent Bit 앱을 aggregator 라고 하겠습니다.
TL;DR 🤖
helm repo add fluent https://fluent.github.io/helm-charts
helm install aggregator fluent/fluent-bit -f https://raw.githubusercontent.com/cppis/tutorials-on-k8s/main/fluentbit-with-kafka-on-k8s/values.yaml
aggregator 소개
fluent/fluent-bit Helm Chart를 사용하여 aggregator 를 실행합니다.
먼저, fluent Helm chart 저장소를 추가하고, fluent-bit 을 배포합니다.
fluent-bit 의 구성은 https://raw.githubusercontent.com/cppis/tutorials-on-k8s/main/fluentbit-with-kafka-on-k8s/values.yaml 파일에 있고, 다음은 파일의 일부 내용입니다:
image:
  repository: cr.fluentbit.io/fluent/fluent-bit
  tag: "latest-debug"
service:
  type: NodePort
  port: 24224
  labels: {}
  annotations: {}
## https://docs.fluentbit.io/manual/administration/configuring-fluent-bit/configuration-file
config:
  service: |
    [SERVICE]
        Daemon Off
        Flush 
        Log_Level 
        Parsers_File parsers.conf
        Parsers_File custom_parsers.conf
        HTTP_Server On
        HTTP_Listen 0.0.0.0
        HTTP_Port 
        Health_Check On
  ## https://docs.fluentbit.io/manual/pipeline/inputs
  inputs: |
    [INPUT]
        Name tail
        Path /var/log/containers/*.log
        Exclude_Path /var/log/containers/aggregator-*.log,/var/log/containers/kindnet-*.log,/var/log/containers/kafka-*.log
        multiline.parser docker, cri
        Tag kube.*
        Mem_Buf_Limit 5MB
        Skip_Long_Lines On
    [INPUT]
        Name forward
        Listen 0.0.0.0
        Port 24224
        Buffer_Chunk_Size 1M
        Buffer_Max_Size 6M
  ...
  ## https://docs.fluentbit.io/manual/pipeline/outputs
  outputs: |
    [OUTPUT]
        Name stdout
        Match *
    [OUTPUT]
        Name        kafka
        Match       *
        Brokers     kafka:9092
        Topics      fluent-bit
  ...
  ## https://docs.fluentbit.io/manual/pipeline/parsers
  customParsers: |
    [PARSER]
        Name        docker
        Format      json
        Time_Key    time
        Time_Format %Y-%m-%dT%H:%M:%S.%L
        Time_Keep   Off
        # Command      |  Decoder     | Field | Optional Action
        # =============|======================|=================
        Decode_Field_As   json       log
    [PARSER]
        Name        json
        Format      json
# The config volume is mounted by default, either to the existingConfigMap value, or the default of "fluent-bit.fullname"
volumeMounts:
  - name: config
    mountPath: /fluent-bit/etc/fluent-bit.conf
    subPath: fluent-bit.conf
  - name: config
    mountPath: /fluent-bit/etc/custom_parsers.conf
    subPath: custom_parsers.conf
daemonSetVolumes:
  - name: varlog
    hostPath:
      path: /var/log
  - name: varlibdockercontainers
    hostPath:
      path: /var/lib/docker/containers
  - name: etcmachineid
    hostPath:
      path: /etc/machine-id
      type: File
daemonSetVolumeMounts:
  - name: varlog
    mountPath: /var/log
  - name: varlibdockercontainers
    mountPath: /var/lib/docker/containers
    readOnly: true
  - name: etcmachineid
    mountPath: /etc/machine-id
    readOnly: true
logLevel: info
다음은 fluentbit.yaml 설정 파일에 대한 설명입니다:
- 
    image
 cr.fluentbit.io/fluent/fluent-bit이미지에서 디버깅을 위해 Shell 접근을 할 수 있는latest-debugTag 를 사용합니다.
- 
    service
 NodePort 타입으로 24224 를 사용합니다.
- 
    config.inputs
 Input 파이프라인으로 tail 과 forward 를 구성합니다.- tail- 노드의 컨테이너 로그 경로: /var/log/containers/
- docker, cri multiline parser 설정
 
- forward- 0.0.0.0:24224로 forward 를 받습니다.
 
 
- 
    config.outputs
 Output 파이프라인으로 stdout 과 forward 를 구성합니다.- stdout- 컨테이너 stdout 으로 출력합니다.
 
- kafka- kafka서버의 Topic 으로 전송합니다.
 
 
fluent/helm-charts 의 Helm Chart 템플릿, 기본 값들은,
ArtifactHub fluent/fluent-bit 을 참고하세요.
테스트하기
Kafka 에 추가된 로그를 확인하기 위해 Consumer Pod 를 생성하여 확인하겠습니다.
먼저 tail 을 위한 로깅을 만들어 kafka 로 produce 합니다.
다음 명령으로 /var/log/containers 볼륨이 마운트된 Pod 에 접속합니다:
kubectl exec -it $(kubectl get pod -l app.kubernetes.io/instance=aggregator -o jsonpath="{.items[0].metadata.name}") -- /bin/bash
접속한 Pod 에서 로깅하기 위해 다음 명령을 실행합니다:
echo '{"message":"hello"}' >> /var/log/containers/debug.log
exit 로 Pod Shell 에서 나온 후,
kafka consumer 로 fluent-bit 토픽 메시지를 확인하기 위해 다음 명령을 실행합니다:
kubectl run consumer --restart='Never' --image docker.io/bitnami/kafka:3.4.0-debian-11-r2 --command -- sleep infinity
kubectl exec --tty -i consumer -- bash
kafka-console-consumer.sh \
  --bootstrap-server kafka.default.svc.cluster.local:9092 \
  --topic fluent-bit \
  --from-beginning
로그가 잘 출력되는 것을 확인할 수 있습니다:
... kube.var.log.containers.debug.log: {"message":"{\"message\":\"hello\"}"}      
Flush 구성에 따라 로그가 수집되는 데 약간의 시간이 걸릴 수도 있습니다.
결론
Apache Kafka는 실시간 파이프라인을 구축하기 위해 설계된 분산 스트리밍 플랫폼으로, 메시지 브로커나 빅 데이터 앱을 위한 로그 집계 솔루션으로 사용할 수 있습니다.
이 글에서는 Fluent Bit, Kafka 를 쿠버네티스에 배포하고 구성을 테스트하는 방법을 소개했습니다. 😎
참고자료
쿠버네티스
Kafka
Fluent
- Fluent Bit v2.0 Documentation
    - Configuration
- Output Plugins
 
 
       
   
   
   
  
댓글남기기