K8s中间件上云部署 kafka

Kafka简介:

Kafka 是一个分布式的消息队列(MQ,Message Queue)系统,主要应用于大数据实时处理领域。

  1. 为什么需要消息队列(MQ):

(1)由于在高并发环境下,同步请求来不及处理,请求往往会发生阻塞。比如大量的请求并发访问数据库,导致行锁表锁,最后请求线程会堆积过多,从而触发 too many connection 错误,引发雪崩效应。

(2)我们使用消息队列,通过异步处理请求,从而缓解系统的压力。消息队列常应用于异步处理,流量削峰,应用解耦,消息通讯等场景。

  1. 当前比较常见的 MQ 中间件有 ActiveMQ、RabbitMQ、RocketMQ、Kafka 等。

  1. 数据处理过程:

Apache Kafka是一种流行的分布式流式消息平台。

Kafka生产者将数据写入分区主题,这些主题通过可配置的副本存储到群集broker(节点)上。

消费者来消费存储在broker分区上的数据。

Partion:分区,即分片

Topic消息主题--partion分成多个分片--replication每分片创建多个副本--分片副本存储到不同的broker节点

举例理解kafka:

场景:披萨店的点餐与配送系统

假设你开了一家火爆的披萨店,顾客下单量巨大,如何高效处理订单并确保每个订单都能准确送达?这时你可以用 Kafka 来优化流程。

  1. 核心角色
  1. 顾客(Producer生产者):下单的人,负责产生消息(比如“我要一份海鲜披萨”)。  
  2. 订单柜台(Topic消息主题):所有订单按类型分类存放。比如:  
  • - 海鲜披萨订单队列(Partition 1)  
  • - 榴莲披萨订单队列(Partition 2)  

           Partition分区是为了并行处理,提高效率

  1. 外卖小哥(Consumer消费者/处理订单):从订单柜台取订单,按顺序配送。  
  2. 记事本(Offset偏移量):记录每个外卖小哥当前配送到了哪个订单(防止重复或漏单)。  

  1. 工作流程

  1. 下单(Produce生产)  

   - 顾客A说:“海鲜披萨1份!” → 订单被放到 海鲜披萨队列(Partition 1) 的末尾。  

   - 顾客B说:“榴莲披萨2份!” → 订单进入 榴莲披萨队列(Partition 2)。  

  1. 处理订单(Consume消费)  

   - 外卖小哥1专门处理 海鲜披萨队列,从记事本看到上次送到第5单,现在取第6单。  

   - 外卖小哥2处理 榴莲披萨队列,独立工作,互不干扰。  

  1. 容灾备份(Replication)  

   - 订单柜台的每类订单都有 备份副本(比如副本放在后厨),即使柜台被砸了,数据也不丢。  

  1. 历史订单(Retention)  

   - 订单默认保存7天(可配置),超过时间自动清理,但重要订单可以永久保存(比如VIP客户的订单)。  

  1. 关键特性

- 高吞吐量:多个队列(分区)并行处理,就像多个外卖小哥同时送餐。  

- 持久化:订单即使被处理完也会保留一段时间(避免“我明明下单了,你说没记录?”)。  

- 消费者组(Consumer Group):  

  - 如果多个外卖小哥同属一个团队(同一个消费者组),他们会分摊订单(比如小哥1送1-5单,小哥2送6-10单)。  

  - 如果是不同团队(不同消费者组),每个团队都会收到全部订单(比如美团和饿了么各自独立配送同样的订单)。  

  1. 故障处理

- 外卖小哥崩溃了?  

  Kafka会检测到,并让同组的其他小哥接手他的任务(自动重新平衡)。  

- 订单太多了?  

  可以随时增加外卖小哥(扩容消费者),或者增加订单队列(分区)。  

总结:

Kafka 就像一套高效的订单管理系统:  

- 生产者丢消息 → Topic/分区分类存储 → 消费者按需处理,且支持多团队协作。  

- 核心优势:快、稳、可扩展,适合大数据场景(如日志处理、实时推荐等)。  

  • 环境说明
  • storageclass
  • Ingress(本节未配置)
  • kafka部署及部署验证

# vim kafka.yaml

apiVersion: v1

kind: Service

metadata:

  name: kafka-svc

  labels:

    app: kafka-app

spec:

  clusterIP: None

  ports:

    - name: '9092'

      port: 9092

      protocol: TCP

      targetPort: 9092

  selector:

    app: kafka-app

---

apiVersion: apps/v1

kind: StatefulSet

metadata:

  name: kafka

  labels:

    app: kafka-app

spec:

  serviceName: kafka-svc

  replicas: 3

  selector:

    matchLabels:

      app: kafka-app

  template:

    metadata:

      labels:

        app: kafka-app

    spec:

      containers:

        - name: kafka-container

          image: doughgle/kafka-kraft

          imagePullPolicy: IfNotPresent

          ports:

            - containerPort: 9092

            - containerPort: 9093

          env:

            - name: REPLICAS

              value: '3'

            - name: SERVICE

              value: kafka-svc

            - name: NAMESPACE

              value: default

            - name: SHARE_DIR

              value: /mnt/kafka

            - name: CLUSTER_ID

              value: oh-sxaDRTcyAr6pFRbXyzA

            - name: DEFAULT_REPLICATION_FACTOR

              value: '3'

            - name: DEFAULT_MIN_INSYNC_REPLICAS
              value: '2'
          volumeMounts:
            - name: data
              mountPath: /mnt/kafka
            - name: localtime
              mountPath: /etc/localtime
      volumes:
      - name: localtime
        hostPath:
          path: /etc/localtime
          type: ''

  volumeClaimTemplates:
    - metadata:
        name: data
      spec:
        accessModes:
          - "ReadWriteOnce"
        storageClassName: nfs-client
        resources:
          requests:
            storage: "1Gi"

# kubectl apply -f kafka.yaml

# kubectl get pods
NAME                                     READY   STATUS    RESTARTS       AGE
kafka-0                                  1/1     Running   1 (2m4s ago)   4m22s
kafka-1                                  1/1     Running   0              3m22s
kafka-2                                  1/1     Running   0              2m9s

# kubectl get svc
NAME         TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)    AGE
kafka-svc    ClusterIP   None         <none>        9092/TCP   4m42s

出于安全考虑,默认配置下Kubernetes不会将Pod调度到Master节点。

如果希望将k8s-master也当作Node使用,可以执行如下命令:

查看master01调度控制(Taints污点设置,NoSchedule不被调度)

# kubectl describe node master01 | grep Taints

Taints:             node-role.kubernetes.io/control-plane:NoSchedule

取消污点设置:

# kubectl taint node master01 node-role.kubernetes.io/control-plane-

# kubectl get pod -o wide

NAME              READY   STATUS    RESTARTS      AGE   IP               NODE       NOMINATED NODE   READINESS GATES

kafka-0           1/1     Running   0             74s   10.244.30.79     worker02   <none>           <none>

kafka-1           1/1     Running   0             71s   10.244.5.11      worker01   <none>           <none>

kafka-2           1/1     Running   0             69s   10.244.241.103   master01   <none>           <none>

如果想取消被调度,执行如下命令即可,不会影响已经被调度的pod

# kubectl taint node master01 node-role.kubernetes.io/control-plane:NoSchedule

------------------------------------------------

  • kafka应用测试

创建客户端pod
# kubectl run kafka-client --rm -it --image bitnami/kafka:3.1.0 -- bash

If you don't see a command prompt, try pressing enter.

I have no name!@kafka-client:/$

进入客户端pod


$ ls /opt/bitnami/kafka/bin/

$ cd /opt/bitnami/kafka/bin

查看默认存在的topic(Topic,即消息主题。
$ kafka-topics.sh --list --bootstrap-server kafka-svc.default.svc.cluster.local:9092

创建topic
$ kafka-topics.sh --bootstrap-server kafka-svc.default.svc.cluster.local:9092 --topic test01 --create --partitions 3 --replication-factor 2

$ kafka-topics.sh --list --bootstrap-server kafka-svc.default.svc.cluster.local:9092
 test01

创建数据生产者,添加数据
$ kafka-console-producer.sh --topic test01 --request-required-acks all --bootstrap-server kafka-svc.default.svc.cluster.local:9092
>hello world

Ctrl+c结束

在当前终端或另一个终端中创建数据消费者,消费数据
$ kafka-console-consumer.sh --topic test01 --from-beginning --bootstrap-server kafka-svc.default.svc.cluster.local:9092

hello world

查看默认test01 topic相关描述信息
$ kafka-topics.sh --describe --topic test01 --bootstrap-server kafka-svc.default.svc.cluster.local:9092

 Topic: test01   Partition: 0    Leader: 1       Replicas: 1,2   Isr: 1,2

 Topic: test01   Partition: 1    Leader: 2       Replicas: 2,0   Isr: 2,0

 Topic: test01   Partition: 2    Leader: 0       Replicas: 0,1   Isr: 0,1

显示内容解释如下

Topic: test01: 这是正在描述的主题名称。

Partition: X: 这里的 X 是分区编号,例如 01 或 2。每个分区都是主题的一个子集,可以独立地存储和处理消息。

Leader: 每个分区都有一个领导者(Leader),负责协调来自客户端的读写请求。其他副本(Replicas)会从领导者那里同步数据。

Replicas: 这是分区数据的副本列表,它们分布在不同的 Kafka 代理(Broker)上,以提供数据冗余和高可用性。

Isr: 这是同步副本列表(In-Sync Replicas,简称 ISR),包含当前与领导者同步的副本。

分区 0 的领导者是 Broker 1,副本在 Broker 1 和 2 上,且这两个副本都在 ISR 列表中,这意味着它们都是活跃的并且与领导者同步。

分区 1 的领导者是 Broker 2,副本在 Broker 2 和 0 上,同样这两个副本也都在 ISR 列表中。

分区 2 的领导者是 Broker 0,副本在 Broker 0 和 1 上,这两个副本也在 ISR 列表中。

这个输出显示了 Kafka 集群的健康状态,所有分区的副本都处于活跃状态,并且与领导者同步。

这表明 Kafka 集群正在正常运行,并且主题 test01 的数据具有高可用性。

Logo

开源鸿蒙跨平台开发社区汇聚开发者与厂商,共建“一次开发,多端部署”的开源生态,致力于降低跨端开发门槛,推动万物智联创新。

更多推荐