k8s部署kafka集群

k8s部署kafka集群

Scroll Down

之前文章有用过statefulset部署kafka集群,但是如果集群外部想消费的话,是无法消费的,因为不通无法解析kafka地址,想到通过nodeport来暴露每个pod,zookeeper也需要,做起来挺麻烦,本篇是在helm安装后来写的,仅个人学习测试记录,生产环境上kafka集群建议还是放在集群外部,通过endpoint到集群

概述

在k8s里面部署kafka、zookeeper这种有状态的服务,不能使用deployment和RC,k8s提供了一种专门用来部署这种有状态的服务的API--statefulset,有状态简单来说就是需要持久化数据,比如日志、数据库数据、服务状态等
statefulset应用场景:

  • 稳定的持久化存储,即Pod重新调度后还是能访问到相同的持久化数据,基于PVC来实现
  • 稳定的网络标志,即Pod重新调度后其PodName和HostName不变,基于Headless Service(即没有Cluster IP的Service)来实现
  • 有序部署,有序扩展,即Pod是有顺序的,在部署或者扩展的时候要依据定义的顺序依次依次进行(即从0到N-1,在下一个Pod运行之前所有之前的Pod必须都是Running和Ready状态),基于init containers来实现
  • 有序收缩,有序删除(即从N-1到0)
    statefulset组成:
  • 用于定义网络标志(DNS domain)的Headless Service
  • 用于创建PersistentVolumes的volumeClaimTemplates
  • 定义具体应用的StatefulSet
    StatefulSet中每个Pod的DNS格式为statefulSetName-{0..N-1}.serviceName.namespace.svc.cluster.local,其中
  • 0..N-1为Pod所在的序号,从0开始到N-1
  • serviceName为Headless Service的名字
  • statefulSetName为StatefulSet的名字
  • namespace为服务所在的namespace,Headless Servic和StatefulSet必须在相同的namespace
  • .cluster.local为Cluster Domain

部署zookeeper

zookeeper配置文件

[root@k8s01 k8s-kafka]# cat zk-kafka-cm.yaml
apiVersion: v1
data:
  ok: |
    #!/bin/sh
    echo ruok | nc 127.0.0.1 ${1:-2181}
  ready: |
    #!/bin/sh
    echo ruok | nc 127.0.0.1 ${1:-2181}
  run: |
    #!/bin/bash

    set -a
    ROOT=$(echo /apache-zookeeper-*)

    ZK_USER=${ZK_USER:-"zookeeper"}
    ZK_LOG_LEVEL=${ZK_LOG_LEVEL:-"INFO"}
    ZK_DATA_DIR=${ZK_DATA_DIR:-"/data"}
    ZK_DATA_LOG_DIR=${ZK_DATA_LOG_DIR:-"/data/log"}
    ZK_CONF_DIR=${ZK_CONF_DIR:-"/conf"}
    ZK_CLIENT_PORT=${ZK_CLIENT_PORT:-2181}
    ZK_SERVER_PORT=${ZK_SERVER_PORT:-2888}
    ZK_ELECTION_PORT=${ZK_ELECTION_PORT:-3888}
    ZK_TICK_TIME=${ZK_TICK_TIME:-2000}
    ZK_INIT_LIMIT=${ZK_INIT_LIMIT:-10}
    ZK_SYNC_LIMIT=${ZK_SYNC_LIMIT:-5}
    ZK_HEAP_SIZE=${ZK_HEAP_SIZE:-2G}
    ZK_MAX_CLIENT_CNXNS=${ZK_MAX_CLIENT_CNXNS:-60}
    ZK_MIN_SESSION_TIMEOUT=${ZK_MIN_SESSION_TIMEOUT:- $((ZK_TICK_TIME*2))}
    ZK_MAX_SESSION_TIMEOUT=${ZK_MAX_SESSION_TIMEOUT:- $((ZK_TICK_TIME*20))}
    ZK_SNAP_RETAIN_COUNT=${ZK_SNAP_RETAIN_COUNT:-3}
    ZK_PURGE_INTERVAL=${ZK_PURGE_INTERVAL:-0}
    ID_FILE="$ZK_DATA_DIR/myid"
    ZK_CONFIG_FILE="$ZK_CONF_DIR/zoo.cfg"
    LOG4J_PROPERTIES="$ZK_CONF_DIR/log4j.properties"
    HOST=$(hostname)
    DOMAIN=`hostname -d`
    ZOOCFG=zoo.cfg
    ZOOCFGDIR=$ZK_CONF_DIR
    JVMFLAGS="-Xmx$ZK_HEAP_SIZE -Xms$ZK_HEAP_SIZE"

    APPJAR=$(echo $ROOT/*jar)
    CLASSPATH="${ROOT}/lib/*:${APPJAR}:${ZK_CONF_DIR}:"

    if [[ $HOST =~ (.*)-([0-9]+)$ ]]; then
        NAME=${BASH_REMATCH[1]}
        ORD=${BASH_REMATCH[2]}
        MY_ID=$((ORD+1))
    else
        echo "Failed to extract ordinal from hostname $HOST"
        exit 1
    fi

    mkdir -p $ZK_DATA_DIR
    mkdir -p $ZK_DATA_LOG_DIR
    echo $MY_ID >> $ID_FILE

    echo "clientPort=$ZK_CLIENT_PORT" >> $ZK_CONFIG_FILE
    echo "dataDir=$ZK_DATA_DIR" >> $ZK_CONFIG_FILE
    echo "dataLogDir=$ZK_DATA_LOG_DIR" >> $ZK_CONFIG_FILE
    echo "tickTime=$ZK_TICK_TIME" >> $ZK_CONFIG_FILE
    echo "initLimit=$ZK_INIT_LIMIT" >> $ZK_CONFIG_FILE
    echo "syncLimit=$ZK_SYNC_LIMIT" >> $ZK_CONFIG_FILE
    echo "maxClientCnxns=$ZK_MAX_CLIENT_CNXNS" >> $ZK_CONFIG_FILE
    echo "minSessionTimeout=$ZK_MIN_SESSION_TIMEOUT" >> $ZK_CONFIG_FILE
    echo "maxSessionTimeout=$ZK_MAX_SESSION_TIMEOUT" >> $ZK_CONFIG_FILE
    echo "autopurge.snapRetainCount=$ZK_SNAP_RETAIN_COUNT" >> $ZK_CONFIG_FILE
    echo "autopurge.purgeInterval=$ZK_PURGE_INTERVAL" >> $ZK_CONFIG_FILE
    echo "4lw.commands.whitelist=*" >> $ZK_CONFIG_FILE

    for (( i=1; i<=$ZK_REPLICAS; i++ ))
    do
        echo "server.$i=$NAME-$((i-1)).$DOMAIN:$ZK_SERVER_PORT:$ZK_ELECTION_PORT" >> $ZK_CONFIG_FILE
    done

    rm -f $LOG4J_PROPERTIES

    echo "zookeeper.root.logger=$ZK_LOG_LEVEL, CONSOLE" >> $LOG4J_PROPERTIES
    echo "zookeeper.console.threshold=$ZK_LOG_LEVEL" >> $LOG4J_PROPERTIES
    echo "zookeeper.log.threshold=$ZK_LOG_LEVEL" >> $LOG4J_PROPERTIES
    echo "zookeeper.log.dir=$ZK_DATA_LOG_DIR" >> $LOG4J_PROPERTIES
    echo "zookeeper.log.file=zookeeper.log" >> $LOG4J_PROPERTIES
    echo "zookeeper.log.maxfilesize=256MB" >> $LOG4J_PROPERTIES
    echo "zookeeper.log.maxbackupindex=10" >> $LOG4J_PROPERTIES
    echo "zookeeper.tracelog.dir=$ZK_DATA_LOG_DIR" >> $LOG4J_PROPERTIES
    echo "zookeeper.tracelog.file=zookeeper_trace.log" >> $LOG4J_PROPERTIES
    echo "log4j.rootLogger=\${zookeeper.root.logger}" >> $LOG4J_PROPERTIES
    echo "log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender" >> $LOG4J_PROPERTIES
    echo "log4j.appender.CONSOLE.Threshold=\${zookeeper.console.threshold}" >> $LOG4J_PROPERTIES
    echo "log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout" >> $LOG4J_PROPERTIES
    echo "log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n" >> $LOG4J_PROPERTIES

    if [ -n "$JMXDISABLE" ]
    then
        MAIN=org.apache.zookeeper.server.quorum.QuorumPeerMain
    else
        MAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=$JMXPORT -Dcom.sun.management.jmxremote.authenticate=$JMXAUTH -Dcom.sun.management.jmxremote.ssl=$JMXSSL -Dzookeeper.jmx.log4j.disable=$JMXLOG4J org.apache.zookeeper.server.quorum.QuorumPeerMain"
    fi

    set -x
    exec java -cp "$CLASSPATH" $JVMFLAGS $MAIN $ZK_CONFIG_FILE
kind: ConfigMap
metadata:
  labels:
    app: zookeeper
  name: kafka-zookeeper
  namespace: test-env

部署zookeeper

[root@k8s01 k8s-kafka]# cat zookeeper.yaml
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  labels:
    app: zookeeper
  name: zookeeper
  namespace: test-env
spec:
  podManagementPolicy: OrderedReady
  replicas: 3
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app: zookeeper
      component: server
      release: kafka
  serviceName: zookeeper-headless
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: zookeeper
        component: server
        release: kafka
    spec:
      containers:
      - command:
        - /bin/bash
        - -xec
        - /config-scripts/run
        env:
        - name: ZK_REPLICAS
          value: "3"
        - name: JMXAUTH
          value: "false"
        - name: JMXDISABLE
          value: "false"
        - name: JMXPORT
          value: "1099"
        - name: JMXSSL
          value: "false"
        - name: ZK_HEAP_SIZE
          value: 1G
        - name: ZK_SYNC_LIMIT
          value: "10"
        - name: ZK_TICK_TIME
          value: "2000"
        - name: ZOO_AUTOPURGE_PURGEINTERVAL
          value: "0"
        - name: ZOO_AUTOPURGE_SNAPRETAINCOUNT
          value: "3"
        - name: ZOO_INIT_LIMIT
          value: "5"
        - name: ZOO_MAX_CLIENT_CNXNS
          value: "60"
        - name: ZOO_PORT
          value: "2181"
        - name: ZOO_STANDALONE_ENABLED
          value: "false"
        - name: ZOO_TICK_TIME
          value: "2000"
        image: zookeeper:3.5.5
        imagePullPolicy: IfNotPresent
        livenessProbe:
          exec:
            command:
            - sh
            - /config-scripts/ok
          failureThreshold: 2
          initialDelaySeconds: 20
          periodSeconds: 30
          successThreshold: 1
          timeoutSeconds: 5
        name: zookeeper
        ports:
        - containerPort: 2181
          name: client
          protocol: TCP
        - containerPort: 3888
          name: election
          protocol: TCP
        - containerPort: 2888
          name: server
          protocol: TCP
        readinessProbe:
          exec:
            command:
            - sh
            - /config-scripts/ready
          failureThreshold: 2
          initialDelaySeconds: 20
          periodSeconds: 30
          successThreshold: 1
          timeoutSeconds: 5
        resources: {}
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /data
          name: data
        - mountPath: /config-scripts
          name: config
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext:
        fsGroup: 1000
        runAsUser: 1000
      terminationGracePeriodSeconds: 1800
      volumes:
      - configMap:
          defaultMode: 365
          name: kafka-zookeeper
        name: config
  updateStrategy:
    type: RollingUpdate
  volumeClaimTemplates:
  - metadata:
      creationTimestamp: null
      name: data
    spec:
      accessModes:
      - ReadWriteOnce
      resources:
        requests:
          storage: 5Gi
      storageClassName: managed-nfs-storage
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: zookeeper
    release: kafka
  name: zookeeper
  namespace: test-env
spec:
  ports:
  - name: client
    port: 2181
    protocol: TCP
    targetPort: client
  selector:
    app: zookeeper
    release: kafka
  type: ClusterIP
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: zookeeper
    release: kafka
  name: zookeeper-headless
  namespace: test-env
spec:
  clusterIP: None
  ports:
  - name: client
    port: 2181
    protocol: TCP
    targetPort: client
  - name: election
    port: 3888
    protocol: TCP
    targetPort: election
  - name: server
    port: 2888
    protocol: TCP
    targetPort: server
  selector:
    app: zookeeper
    release: kafka 

部署kafka

部署kafka

[root@k8s01 k8s-kafka]# cat kafka.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  labels:
    app.kubernetes.io/component: kafka-broker
    app.kubernetes.io/instance: kafka
    app.kubernetes.io/name: kafka    
  name: kafka
  namespace: test-env
spec:
  podManagementPolicy: OrderedReady
  replicas: 3
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app.kubernetes.io/component: kafka-broker
      app.kubernetes.io/instance: kafka
      app.kubernetes.io/name: kafka
  serviceName: kafka-headless
  template:
    metadata:
      creationTimestamp: null
      labels:
        app.kubernetes.io/component: kafka-broker
        app.kubernetes.io/instance: kafka        
        app.kubernetes.io/name: kafka
    spec:
      containers:
      - command:
        - sh
        - -exc
        - |
          unset KAFKA_PORT && \
          export KAFKA_BROKER_ID=${POD_NAME##*-} && \
          export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${POD_IP}:9092,EXTERNAL://192.168.50.205:$((31090 + ${KAFKA_BROKER_ID})) && \
          exec /etc/confluent/docker/run
        env:
        - name: POD_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        - name: POD_NAME
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: metadata.name
        - name: POD_NAMESPACE
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: metadata.namespace
        - name: KAFKA_HEAP_OPTS
          value: -Xmx1G -Xms1G
        - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
          value: "3"
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zookeeper:2181
        - name: KAFKA_LOG_DIRS
          value: /opt/kafka/data/logs
        - name: KAFKA_ADVERTISED_LISTENERS
          value: EXTERNAL://192.168.50.205:$((31090 + ${KAFKA_BROKER_ID}))
        - name: KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE
          value: "false"
        - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
          value: PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
        - name: KAFKA_JMX_PORT
          value: "5555"
        image: confluentinc/cp-kafka:5.0.1
        imagePullPolicy: IfNotPresent
        livenessProbe:
          exec:
            command:
            - sh
            - -ec
            - /usr/bin/jps | /bin/grep -q SupportedKafka
          failureThreshold: 3
          initialDelaySeconds: 30
          periodSeconds: 10
          successThreshold: 1
          timeoutSeconds: 5
        name: kafka-broker
        ports:
        - containerPort: 9092
          name: kafka
          protocol: TCP
        - containerPort: 31090
          name: external-0
          protocol: TCP
        - containerPort: 31091
          name: external-1
          protocol: TCP
        - containerPort: 31092
          name: external-2
          protocol: TCP
        readinessProbe:
          failureThreshold: 3
          initialDelaySeconds: 30
          periodSeconds: 10
          successThreshold: 1
          tcpSocket:
            port: kafka
          timeoutSeconds: 5
        volumeMounts:
        - mountPath: /opt/kafka/data
          name: datadir
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      terminationGracePeriodSeconds: 60
  updateStrategy:
    type: OnDelete
  volumeClaimTemplates:
  - metadata:
      creationTimestamp: null
      name: datadir
    spec:
      accessModes:
      - ReadWriteOnce
      resources:
        requests:
          storage: 5Gi
      storageClassName: managed-nfs-storage
---
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app.kubernetes.io/component: kafka-broker
    app.kubernetes.io/instance: kafka
    app.kubernetes.io/name: kafka
  name: kafka
  namespace: test-env
spec:
  ports:
  - name: broker
    port: 9092
    protocol: TCP
    targetPort: kafka
  selector:
    app.kubernetes.io/component: kafka-broker
    app.kubernetes.io/instance: kafka
    app.kubernetes.io/name: kafka
  type: ClusterIP
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app.kubernetes.io/component: kafka-broker
    app.kubernetes.io/instance: kafka
    app.kubernetes.io/name: kafka
  name: kafka-headless
  namespace: test-env
spec:
  clusterIP: None
  ports:
  - name: broker
    port: 9092
    protocol: TCP
    targetPort: 9092
  selector:
    app.kubernetes.io/component: kafka-broker
    app.kubernetes.io/instance: kafka
    app.kubernetes.io/name: kafka
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app.kubernetes.io/component: kafka-broker
    app.kubernetes.io/instance: kafka
    app.kubernetes.io/name: kafka
    pod: kafka-0
  name: kafka-0-external
  namespace: test-env
spec:
  externalTrafficPolicy: Cluster
  ports:
  - name: external-broker
    nodePort: 31090
    port: 19092
    protocol: TCP
    targetPort: 31090
  selector:
    app.kubernetes.io/component: kafka-broker
    app.kubernetes.io/instance: kafka
    app.kubernetes.io/name: kafka
    statefulset.kubernetes.io/pod-name: kafka-0
  type: NodePort
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app.kubernetes.io/component: kafka-broker
    app.kubernetes.io/instance: kafka
    app.kubernetes.io/name: kafka
    pod: kafka-1
  name: kafka-1-external
  namespace: test-env
spec:
  externalTrafficPolicy: Cluster
  ports:
  - name: external-broker
    nodePort: 31091
    port: 19092
    protocol: TCP
    targetPort: 31091
  selector:
    app.kubernetes.io/component: kafka-broker
    app.kubernetes.io/instance: kafka
    app.kubernetes.io/name: kafka
    statefulset.kubernetes.io/pod-name: kafka-1
  type: NodePort
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app.kubernetes.io/component: kafka-broker
    app.kubernetes.io/instance: kafka
    app.kubernetes.io/name: kafka
    pod: kafka-2
  name: kafka-2-external
  namespace: test-env
spec:
  externalTrafficPolicy: Cluster
  ports:
  - name: external-broker
    nodePort: 31092
    port: 19092
    protocol: TCP
    targetPort: 31092
  selector:
    app.kubernetes.io/component: kafka-broker
    app.kubernetes.io/instance: kafka
    app.kubernetes.io/name: kafka
    statefulset.kubernetes.io/pod-name: kafka-2
  type: NodePort

测试

[root@k8s01 k8s-kafka]# kubectl get -n test-env po
NAME                             READY   STATUS    RESTARTS   AGE
kafka-0                          1/1     Running   0          38h
kafka-1                          1/1     Running   0          38h
kafka-2                          1/1     Running   0          38h
zookeeper-0                      1/1     Running   0          38h
zookeeper-1                      1/1     Running   0          38h
zookeeper-2                      1/1     Running   0          38h
[root@nfs kafka_2.12-2.3.0]# bin/kafka-topics.sh --list --bootstrap-server 192.168.50.205:31092
__consumer_offsets
axhome