5 분 소요


이 글에서는 Kind 클러스터에서 Fluent Bit 을 구성하고 테스트하는 방법을 소개합니다.

다음은 우리가 구성할 Workload 구성도입니다:

그림. kafka, aggregator Workload 구성

쿠버네티스에서 Pod 는 리소스의 기본 단위이고,
Workload 는 실행되는 애플리케이션(이하 앱)입니다.


목표

  • Helm Chart 를 사용하여 Kafka Statefulset 배포하기
  • Helm Chart 를 사용하여 Fluentd aggregator Daemonset 배포하기
  • 테스트하기




준비하기

  • WSL(윈도우즈일 경우)
  • Docker
  • Helm
  • Kind

위 구성이 준비되지 않았다면,
Containerization Part.0 - Containerization 준비 과정을 따라하세요. 😉




시작하기

Kind 클러스터 구성하기

다음 명령을 실행하여 3 개의 노드로 구성된 Kind 클러스터를 생성합니다:

  cat <<EOF | kind create cluster --config=-
  kind: Cluster
  apiVersion: kind.x-k8s.io/v1alpha4
  nodes:
  - role: control-plane
  - role: worker
  - role: worker
  EOF

role 을 보면 하나의 control-plane 과 두 worker 들로 구성한다는 것을 알 수 있습니다.

kind get nodes 명령으로 3 개의 Node 가 생성된 것을 확인할 수 있습니다:

  $ kind get nodes
  kind-worker
  kind-worker2
  kind-control-plane




kafka 배포하기

Apache Kafka는 실시간 파이프라인을 구축하도록 설계된 분산 스트리밍 플랫폼으로, 메시지 브로커나 빅 데이터 앱을 위한 로그 집계 솔루션으로 사용할 수 있습니다.

그림. kafka Workload 구성


TL;DR 🤖

helm repo add bitnami https://charts.bitnami.com/bitnami
helm install kafka bitnami/kafka --set replicaCount=2


kafka 소개

bitnami/kafka Helm Chart를 사용하여 kafka 를 실행합니다.
먼저, bitnami Helm chart 저장소를 추가하고, kafka 를 배포합니다.
실행할 때 파라미터로 전달하는 replicaCount 는 kafka 의 노드 수입니다.


bitnami/fluentd 의 Helm Chart 템플릿, 기본 값들은 ArtifactHub bitnami/kafka 를 참고하세요.




kafka 테스트하기

kafka Helm Chart를 실행하면, 다음 결과가 출력됩니다:

helm install kafka bitnami/kafka --set replicaCount=2
"bitnami" already exists with the same configuration, skipping
NAME: kafka
...
CHART NAME: kafka
CHART VERSION: 21.0.1
APP VERSION: 3.4.0

** Please be patient while the chart is being deployed **

Kafka can be accessed by consumers via port 9092 on the following DNS name from within your cluster:

    kafka.default.svc.cluster.local

Each Kafka broker can be accessed by producers via port 9092 on the following DNS name(s) from within your cluster:

    kafka-0.kafka-headless.default.svc.cluster.local:9092
    kafka-1.kafka-headless.default.svc.cluster.local:9092

To create a pod that you can use as a Kafka client run the following commands:

    kubectl run kafka-client --restart='Never' --image docker.io/bitnami/kafka:3.4.0-debian-11-r2 --namespace default --command -- sleep infinity
    kubectl exec --tty -i kafka-client --namespace default -- bash

    PRODUCER:
        kafka-console-producer.sh \
          --broker-list \
          kafka-0.kafka-headless.default.svc.cluster.local:9092,\
          kafka-1.kafka-headless.default.svc.cluster.local:9092 \
          --topic test

    CONSUMER:
        kafka-console-consumer.sh \
          --bootstrap-server kafka.default.svc.cluster.local:9092 \
          --topic test \
          --from-beginning


다음은 출력된 내용에 대한 설명입니다:

  • DNS 이름: kafka.default.svc.cluster.local
  • 각 노드의 DNS 이름:
    • kafka-0: kafka-0.kafka-headless.default.svc.cluster.local:9092
    • kafka-1: kafka-1.kafka-headless.default.svc.cluster.local:9092

headless 서비스를 사용하면 Statefulset Workload 리소스에서 특정 Node 에 있는 Pod 로 접근할 수 있습니다.


테스트 하기 전에 다음 명령으로 kafka Statefulset 의 상태를 확인합니다:

kubectl get statefulset

Statefulset 이 준비되었다면, 결과 화면은 다음과 같습니다:

NAME                               READY
statefulset.apps/kafka             2/2
statefulset.apps/kafka-zookeeper   1/1

watch "kubectl get statefulset" 명령을 사용하면 재실행하지 않고도 결과를 확인할 수 있습니다.


kubectl get svc 명령으로 kafka 서비스를 확인합니다:

$ kubectl get svc
NAME                     TYPE      CLUSTER-IP   EXTERNAL-IP PORT(S)
kafka                    ClusterIP 10.96.37.230 <none>      9092/TCP                   
kafka-headless           ClusterIP None         <none>      9092/TCP,9093/TCP         
kafka-zookeeper          ClusterIP 10.96.253.86 <none>      2181/TCP,2888/TCP,3888/TCP
kafka-zookeeper-headless ClusterIP None         <none>      2181/TCP,2888/TCP,3888/TCP

클러스터 내에서는 DNS 이름(kafka.default.svc.cluster.local)과 포트(9092)로 직접 접근할 수 있습니다.


kafka 테스트를 위해 producer, consumer Pod 를 배포하겠습니다:

그림. kafka producer, consumer Pod 배포하기

kafka producer 클라이언트로 사용할 Pod 를 배포하기 위해 다음 명령을 실행합니다:

kubectl run producer --restart='Never' --image docker.io/bitnami/kafka:3.4.0-debian-11-r2 --command -- sleep infinity
kubectl exec --tty -i producer -- bash
kafka-console-producer.sh \
  --broker-list \
  kafka-0.kafka-headless.default.svc.cluster.local:9092,\
  kafka-1.kafka-headless.default.svc.cluster.local:9092 \
  --topic test

kafka consumer 클라이언트로 사용할 Pod 를 배포하기 위해 다음 명령을 실행합니다:

kubectl run consumer --restart='Never' --image docker.io/bitnami/kafka:3.4.0-debian-11-r2 --command -- sleep infinity
kubectl exec --tty -i consumer -- bash
kafka-console-consumer.sh \
  --bootstrap-server kafka.default.svc.cluster.local:9092 \
  --topic test \
  --from-beginning


producer 에서 입력한 메시지를 consumer 에서 확인할 수 있다면 성공입니다.


테스트가 끝나면 exit 로 Shell 을 종료한 후, 다음 명령으로 테스트를 위해 만들었던 producer, consumer 두 Pod 를 삭제합니다:

kubectl delete pod producer
kubectl delete pod consumer




aggregator 배포하기

Fluent Bit의 에서 Kafka Output Plugin 을 레코드를 Kafka 서비스로 수집할 수 있습니다.
로그를 Kafka Output 하는 Fluent Bit 앱을 aggregator 라고 하겠습니다.

그림. kafka, aggregator Workload 구성


TL;DR 🤖

helm repo add fluent https://fluent.github.io/helm-charts
helm install aggregator fluent/fluent-bit -f https://raw.githubusercontent.com/cppis/tutorials-on-k8s/main/fluentbit-with-kafka-on-k8s/values.yaml


aggregator 소개

fluent/fluent-bit Helm Chart를 사용하여 aggregator 를 실행합니다.
먼저, fluent Helm chart 저장소를 추가하고, fluent-bit 을 배포합니다.
fluent-bit 의 구성은 https://raw.githubusercontent.com/cppis/tutorials-on-k8s/main/fluentbit-with-kafka-on-k8s/values.yaml 파일에 있고, 다음은 파일의 일부 내용입니다:

image:
  repository: cr.fluentbit.io/fluent/fluent-bit
  tag: "latest-debug"

service:
  type: NodePort
  port: 24224
  labels: {}
  annotations: {}

## https://docs.fluentbit.io/manual/administration/configuring-fluent-bit/configuration-file
config:
  service: |
    [SERVICE]
        Daemon Off
        Flush 
        Log_Level 
        Parsers_File parsers.conf
        Parsers_File custom_parsers.conf
        HTTP_Server On
        HTTP_Listen 0.0.0.0
        HTTP_Port 
        Health_Check On

  ## https://docs.fluentbit.io/manual/pipeline/inputs
  inputs: |
    [INPUT]
        Name tail
        Path /var/log/containers/*.log
        Exclude_Path /var/log/containers/aggregator-*.log,/var/log/containers/kindnet-*.log,/var/log/containers/kafka-*.log
        multiline.parser docker, cri
        Tag kube.*
        Mem_Buf_Limit 5MB
        Skip_Long_Lines On

    [INPUT]
        Name forward
        Listen 0.0.0.0
        Port 24224
        Buffer_Chunk_Size 1M
        Buffer_Max_Size 6M

  ...

  ## https://docs.fluentbit.io/manual/pipeline/outputs
  outputs: |
    [OUTPUT]
        Name stdout
        Match *

    [OUTPUT]
        Name        kafka
        Match       *
        Brokers     kafka:9092
        Topics      fluent-bit

  ...

  ## https://docs.fluentbit.io/manual/pipeline/parsers
  customParsers: |
    [PARSER]
        Name        docker
        Format      json
        Time_Key    time
        Time_Format %Y-%m-%dT%H:%M:%S.%L
        Time_Keep   Off
        # Command      |  Decoder     | Field | Optional Action
        # =============|======================|=================
        Decode_Field_As   json       log

    [PARSER]
        Name        json
        Format      json

# The config volume is mounted by default, either to the existingConfigMap value, or the default of "fluent-bit.fullname"
volumeMounts:
  - name: config
    mountPath: /fluent-bit/etc/fluent-bit.conf
    subPath: fluent-bit.conf
  - name: config
    mountPath: /fluent-bit/etc/custom_parsers.conf
    subPath: custom_parsers.conf

daemonSetVolumes:
  - name: varlog
    hostPath:
      path: /var/log
  - name: varlibdockercontainers
    hostPath:
      path: /var/lib/docker/containers
  - name: etcmachineid
    hostPath:
      path: /etc/machine-id
      type: File

daemonSetVolumeMounts:
  - name: varlog
    mountPath: /var/log
  - name: varlibdockercontainers
    mountPath: /var/lib/docker/containers
    readOnly: true
  - name: etcmachineid
    mountPath: /etc/machine-id
    readOnly: true

logLevel: info

다음은 fluentbit.yaml 설정 파일에 대한 설명입니다:

  • image
    cr.fluentbit.io/fluent/fluent-bit 이미지에서 디버깅을 위해 Shell 접근을 할 수 있는 latest-debug Tag 를 사용합니다.

  • service
    NodePort 타입으로 24224 를 사용합니다.

  • config.inputs
    Input 파이프라인으로 tail 과 forward 를 구성합니다.

    • tail
      • 노드의 컨테이너 로그 경로: /var/log/containers/
      • docker, cri multiline parser 설정
    • forward
      • 0.0.0.0:24224 로 forward 를 받습니다.
  • config.outputs
    Output 파이프라인으로 stdout 과 forward 를 구성합니다.

    • stdout
      • 컨테이너 stdout 으로 출력합니다.
    • kafka
      • kafka 서버의 Topic 으로 전송합니다.


fluent/helm-charts 의 Helm Chart 템플릿, 기본 값들은,
ArtifactHub fluent/fluent-bit 을 참고하세요.




테스트하기

Kafka 에 추가된 로그를 확인하기 위해 Consumer Pod 를 생성하여 확인하겠습니다.

그림. kafka consumer 배포하기


먼저 tail 을 위한 로깅을 만들어 kafka 로 produce 합니다.
다음 명령으로 /var/log/containers 볼륨이 마운트된 Pod 에 접속합니다:

kubectl exec -it $(kubectl get pod -l app.kubernetes.io/instance=aggregator -o jsonpath="{.items[0].metadata.name}") -- /bin/bash

접속한 Pod 에서 로깅하기 위해 다음 명령을 실행합니다:

echo '{"message":"hello"}' >> /var/log/containers/debug.log

exitPod Shell 에서 나온 후,
kafka consumer 로 fluent-bit 토픽 메시지를 확인하기 위해 다음 명령을 실행합니다:

kubectl run consumer --restart='Never' --image docker.io/bitnami/kafka:3.4.0-debian-11-r2 --command -- sleep infinity
kubectl exec --tty -i consumer -- bash
kafka-console-consumer.sh \
  --bootstrap-server kafka.default.svc.cluster.local:9092 \
  --topic fluent-bit \
  --from-beginning

로그가 잘 출력되는 것을 확인할 수 있습니다:

... kube.var.log.containers.debug.log: {"message":"{\"message\":\"hello\"}"}      

Flush 구성에 따라 로그가 수집되는 데 약간의 시간이 걸릴 수도 있습니다.




결론

Apache Kafka는 실시간 파이프라인을 구축하기 위해 설계된 분산 스트리밍 플랫폼으로, 메시지 브로커나 빅 데이터 앱을 위한 로그 집계 솔루션으로 사용할 수 있습니다.


이 글에서는 Fluent Bit, Kafka 를 쿠버네티스에 배포하고 구성을 테스트하는 방법을 소개했습니다. 😎




참고자료

쿠버네티스

Kafka

Fluent

Helm Chart

댓글남기기