k8s部署 Kafka Zookeeper KafkaManager

k8s部署 Kafka Zookeeper KafkaManager

Scroll Down

系统环境

Kubernetes 版本:1.15.4
kafka 版本:2.4.0
zookeeper 版本:3.5.6
kafka manager 版本:2.0.0.2

组件简介

Kafka 简介

Kafka 是由 Apache 软件基金会开发的一个开源流处理平台,由 Scala 和 Java 编写。它是一个分布式、支持分区的的、多副本,基于 zookeeper 协调的分布式消息系统。它最大特性是可以实时的处理大量数据以满足各种需求场景,比如基于 Hadoop 的批处理系统、低延时的实时系统、storm/Spark 流式处理引擎,web/nginx 日志,访问日志,消息服务等等

Zookeeper 简介

ZooKeeper 是一种分布式协调服务,用于管理大型主机。在分布式环境中协调和管理服务是一个复杂的过程。ZooKeeper 通过其简单的架构和 API 解决了这个问题。 ZooKeeper 允许开发人员专注于核心应用程序逻辑,而不必担心应用程序的分布式特性
Kafka 中主要利用 zookeeper 解决分布式一致性问题。Kafka 使用 Zookeeper 的分布式协调服务将生产者,消费者,消息储存结合在一起。同时借助 Zookeeper,Kafka 能够将生产者、消费者和 Broker 在内的所有组件在无状态的条件下建立起生产者和消费者的订阅关系,实现生产者的负载均衡

Kafka Manager 简介

Kafka Manager 是目前最受欢迎的 Kafka 集群管理工具,最早由雅虎开源,用户可以在 Web 界面执行一些简单的集群管理操作
支持以下功能:

  • 管理 Kafka 集群
  • 方便集群状态监控 (包括topics, consumers, offsets, brokers, replica distribution, partition distribution)
  • 方便选择分区副本
  • 配置分区任务,包括选择使用哪些 Brokers
  • 可以对分区任务重分配
  • 提供不同的选项来创建及删除 Topic
  • Topic List 会指明哪些topic被删除
  • 批量产生分区任务并且和多个topic和brokers关联
  • 批量运行多个主题对应的多个分区
  • 向已经存在的主题中添加分区
  • 对已经存在的 Topic 修改配置
  • 可以在 Broker Level 和 Topic Level 的度量中启用 JMX Polling 功能
  • 可以过滤在 ZooKeeper 上没有 ids/owners/offsets/directories 的 consumer

部署过程

这个流程需要部署三个组件,分别为 Zookeeper、Kafka、Kafka Manager:

  • Zookeeper: 首先部署 Zookeeper,方便后续部署 Kafka 节点注册到 Zookeeper,用 StatefulSet 方式部署三个节点
  • Kafka: 第二个部署的是 Kafka,设置环境变量来指定 Zookeeper 地址,用 StatefulSet 方式部署
  • Kafka Manager: 最后部署的是 Kafka Manager,用 Deployment 方式部署,然后打开 Web UI 界面来管理、监控 Kafka
    kafka1.png

部署 Zookeeper & Kafka & Kafka Manager

由于都是使用 StatefulSet 方式部署的有状态服务,所以 Kubernetes 集群需要提前设置一个 StorageClass 方便后续部署时指定存储分配(如果想指定为已经存在的 StorageClass 创建 PV 则跳过此步骤)

Kubernetes 部署 Zookeeper

创建 Zookeeper 部署文件

[root@k8s01 kafka]# vim zookeeper.yaml
#部署 Service Headless,用于Zookeeper间相互通信
apiVersion: v1
kind: Service
metadata:
  name: zookeeper-headless
  namespace: test-env
  labels:
    app: zookeeper
spec:
  type: ClusterIP
  clusterIP: None
  publishNotReadyAddresses: true
  ports:
  - name: client
    port: 2181
    targetPort: client
  - name: follower
    port: 2888
    targetPort: follower
  - name: election
    port: 3888
    targetPort: election
  selector:
    app: zookeeper
---
#部署 Service,用于外部访问 Zookeeper
apiVersion: v1
kind: Service
metadata:
  name: zookeeper
  namespace: test-env
  labels:
    app: zookeeper
spec:
  type: ClusterIP
  ports:
  - name: client
    port: 2181
    targetPort: client
  - name: follower
    port: 2888
    targetPort: follower
  - name: election
    port: 3888
    targetPort: election
  selector:
    app: zookeeper
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: zookeeper
  namespace: test-env
  labels:
    app: zookeeper
spec:
  serviceName: zookeeper-headless
  replicas: 3
  podManagementPolicy: Parallel
  updateStrategy:
    type: RollingUpdate
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      name: zookeeper
      labels:
        app: zookeeper
    spec:      
      securityContext:
        fsGroup: 1001
      containers:
      - name: zookeeper
        image: docker.io/bitnami/zookeeper:3.5.6-debian-9-r25
        imagePullPolicy: IfNotPresent
        securityContext:
          runAsUser: 1001
        command:
         - bash
         - -ec
         - |
            # Execute entrypoint as usual after obtaining ZOO_SERVER_ID based on POD hostname
            HOSTNAME=`hostname -s`
            if [[ $HOSTNAME =~ (.*)-([0-9]+)$ ]]; then
              ORD=${BASH_REMATCH[2]}
              export ZOO_SERVER_ID=$((ORD+1))
            else
              echo "Failed to get index from hostname $HOST"
              exit 1
            fi
            . /opt/bitnami/base/functions
            . /opt/bitnami/base/helpers
            print_welcome_page
            . /init.sh
            nami_initialize zookeeper
            exec tini -- /run.sh
        resources: 
          limits:
            cpu: 500m
            memory: 512Mi
          requests:
            cpu: 250m
            memory: 256Mi
        env:
        - name: ZOO_PORT_NUMBER
          value: "2181"
        - name: ZOO_TICK_TIME
          value: "2000"
        - name: ZOO_INIT_LIMIT
          value: "10"
        - name: ZOO_SYNC_LIMIT
          value: "5"
        - name: ZOO_MAX_CLIENT_CNXNS
          value: "60"
        - name: ZOO_SERVERS
          value: "
                  zookeeper-0.zookeeper-headless:2888:3888,
                  zookeeper-1.zookeeper-headless:2888:3888,
                  zookeeper-2.zookeeper-headless:2888:3888
                 "
        - name: ZOO_ENABLE_AUTH
          value: "no"
        - name: ZOO_HEAP_SIZE
          value: "1024"
        - name: ZOO_LOG_LEVEL
          value: "ERROR"
        - name: ALLOW_ANONYMOUS_LOGIN
          value: "yes"
        ports:
        - name: client
          containerPort: 2181
        - name: follower
          containerPort: 2888
        - name: election
          containerPort: 3888
        livenessProbe:
          tcpSocket:
            port: client
          initialDelaySeconds: 30
          periodSeconds: 10
          timeoutSeconds: 5
          successThreshold: 1
          failureThreshold: 6
        readinessProbe:
          tcpSocket:
            port: client
          initialDelaySeconds: 5
          periodSeconds: 10
          timeoutSeconds: 5
          successThreshold: 1
          failureThreshold: 6
        volumeMounts:
        - name: data
          mountPath: /bitnami/zookeeper
  volumeClaimTemplates:
    - metadata:
        name: data
        annotations:
      spec:
        storageClassName: managed-nfs-storage    #指定为上面创建的 storageclass
        accessModes:
          - ReadWriteOnce
        resources:
          requests:
            storage: 5Gi
            
[root@k8s01 kafka]# kubectl apply -f zookeeper.yaml
service/zookeeper-headless created
service/zookeeper created
statefulset.apps/zookeeper created
[root@k8s01 kafka]# kubectl get -n test-env po
NAME                         READY   STATUS    RESTARTS   AGE
zookeeper-0                  1/1     Running   2          2m10s
zookeeper-1                  1/1     Running   0          2m10s
zookeeper-2                  1/1     Running   1          2m10s   

Kubernetes 部署 Kafka

创建 Kafka 部署文件

[root@k8s01 kafka]# vim kafka.yaml
#部署 Service Headless,用于Kafka间相互通信
apiVersion: v1
kind: Service
metadata:
  name: kafka-headless
  namespace: test-env
  labels:
    app: kafka
spec:
  type: ClusterIP
  clusterIP: None
  ports:
  - name: kafka
    port: 9092
    targetPort: kafka
  selector:
    app: kafka
---
#部署 Service,用于外部访问 Kafka
apiVersion: v1
kind: Service
metadata:
  name: kafka
  namespace: test-env
  labels:
    app: kafka
spec:
  type: ClusterIP
  ports:
  - name: kafka
    port: 9092
    targetPort: kafka
  selector:
    app: kafka
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: "kafka"
  namespace: test-env
  labels:
    app: kafka
spec:
  selector:
    matchLabels:
      app: kafka
  serviceName: kafka-headless
  podManagementPolicy: "Parallel"
  replicas: 3
  updateStrategy:
    type: "RollingUpdate"
  template:
    metadata:
      name: "kafka"
      labels:
        app: kafka
    spec:      
      securityContext:
        fsGroup: 1001
        runAsUser: 1001
      containers:
      - name: kafka
        image: "docker.io/bitnami/kafka:2.4.0-debian-9-r4"
        imagePullPolicy: "IfNotPresent"
        resources:
          limits:
            cpu: 500m
            memory: 512Mi
          requests:
            cpu: 250m
            memory: 256Mi
        env:
        - name: MY_POD_IP
          valueFrom:
            fieldRef:
              fieldPath: status.podIP
        - name: MY_POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: KAFKA_CFG_ZOOKEEPER_CONNECT
          value: "zookeeper"                 #Zookeeper Service 名称
        - name: KAFKA_PORT_NUMBER
          value: "9092"
        - name: KAFKA_CFG_LISTENERS
          value: "PLAINTEXT://:$(KAFKA_PORT_NUMBER)"
        - name: KAFKA_CFG_ADVERTISED_LISTENERS
          value: 'PLAINTEXT://$(MY_POD_NAME).kafka-headless:$(KAFKA_PORT_NUMBER)'
        - name: ALLOW_PLAINTEXT_LISTENER
          value: "yes"
        - name: KAFKA_HEAP_OPTS
          value: "-Xmx512m -Xms512m"
        - name: KAFKA_CFG_LOGS_DIRS
          value: /opt/bitnami/kafka/data
        - name: JMX_PORT
          value: "9988"
        ports:
        - name: kafka
          containerPort: 9092
        livenessProbe:
          tcpSocket:
            port: kafka
          initialDelaySeconds: 10
          periodSeconds: 10
          timeoutSeconds: 5
          successThreshold: 1
          failureThreshold: 2
        readinessProbe:
          tcpSocket:
            port: kafka
          initialDelaySeconds: 5
          periodSeconds: 10
          timeoutSeconds: 5
          successThreshold: 1
          failureThreshold: 6
        volumeMounts:
        - name: data
          mountPath: /bitnami/kafka
  volumeClaimTemplates:
    - metadata:
        name: data
      spec:
        storageClassName: managed-nfs-storage    #指定为上面创建的 storageclass
        accessModes:
          - "ReadWriteOnce"
        resources:
          requests:
            storage: 5Gi
            
[root@k8s01 kafka]# kubectl apply -f kafka.yaml
service/kafka-headless created
service/kafka created
statefulset.apps/kafka created
 [root@k8s01 kafka]# kubectl get -n test-env po
NAME                         READY   STATUS    RESTARTS   AGE
kafka-0                      1/1     Running   0          13m
kafka-1                      1/1     Running   0          13m
kafka-2                      1/1     Running   0          13m
zookeeper-0                  1/1     Running   2          27m
zookeeper-1                  1/1     Running   0          27m
zookeeper-2                  1/1     Running   1          27m   

Kubernetes 部署 Kafka Manager

创建 Kafka Manager 部署文件

[root@k8s01 kafka]# vim kafka-manager.yaml
apiVersion: v1
kind: Service
metadata:
  name: kafka-manager
  namespace: test-env
  labels:
    app: kafka-manager
spec:
  type: NodePort
  ports:
  - name: kafka
    port: 9000
    targetPort: 9000
    nodePort: 30900
  selector:
    app: kafka-manager
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-manager
  namespace: test-env
  labels:
    app: kafka-manager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-manager
  template:
    metadata:
      labels:
        app: kafka-manager
    spec:
      containers:
      - name: kafka-manager
        image: zenko/kafka-manager:latest
        imagePullPolicy: IfNotPresent
        ports:
        - name: kafka-manager
          containerPort: 9000
          protocol: TCP
        env:
        - name: ZK_HOSTS
          value: "zookeeper:2181"
        livenessProbe:
          httpGet:
            path: /api/health
            port: kafka-manager
        readinessProbe:
          httpGet:
            path: /api/health
            port: kafka-manager
        resources:
          limits:
            cpu: 500m
            memory: 512Mi
          requests:
            cpu: 250m
            memory: 256Mi
---
#可选添加ingress,通过域名访问
apiVersion: traefik.containo.us/v1alpha1
kind: IngressRoute
metadata:
  name: test-ing
  namespace: test-env
spec:
  entryPoints:
  - web
  routes:
  - match: Host(`kafka.k8s.com`)
    kind: Rule
    services:
      - name: kafka-manager
        port: 9000  
        
[root@k8s01 kafka]# kubectl apply -f kafka-manager.yaml
service/kafka-manager created
deployment.apps/kafka-manager created
[root@k8s01 kafka]# kubectl get -n test-env po
NAME                             READY   STATUS    RESTARTS   AGE
kafka-0                          1/1     Running   0          54m
kafka-1                          1/1     Running   0          54m
kafka-2                          1/1     Running   0          54m
kafka-manager-688f8dd46c-6x56t   1/1     Running   0          36s
zookeeper-0                      1/1     Running   2          67m
zookeeper-1                      1/1     Running   0          67m
zookeeper-2                      1/1     Running   1          67m

进入KafkaManager管理Kafka集群

输入192.168.200.81:30090或者域名kafka.k8s.com访问 Kafka Manager
进入后先配置 Kafka Manager,增加一个Zookeeper地址
kafka2.png
配置三个必填参数:

  • Cluster Name:自定义一个名称,任意输入即可
  • Zookeeper Hosts:输入 Zookeeper 地址,这里设置为 Zookeeper 服务名+端口
  • Kafka Version:选择 kafka 版本
    kafka3.png

配置完成后就可以看到新增了一条记录,点进去就可以查看相关集群信息

kafka4.png

kafka5.png

kafka6.png

附录:镜像参数配置

Zookeeper 镜像可配置变量参数
镜像 github 地址:https://github.com/bitnami/bitnami-docker-zookeeper

Kafka 镜像可配置变量参数
镜像 github 地址:https://github.com/bitnami/bitnami-docker-kafka