K8s中间件上云部署 kafka
如果多个外卖小哥同属一个团队(同一个消费者组),他们会分摊订单(比如小哥1送1-5单,小哥2送6-10单)。- 如果是不同团队(不同消费者组),每个团队都会收到全部订单(比如美团和饿了么各自独立配送同样的订单)。- 订单默认保存7天(可配置),超过时间自动清理,但重要订单可以永久保存(比如VIP客户的订单)。: 这是同步副本列表(In-Sync Replicas,简称 ISR),包含当前与领导者同
K8s中间件上云部署 kafka
Kafka简介:
Kafka 是一个分布式的消息队列(MQ,Message Queue)系统,主要应用于大数据实时处理领域。
- 为什么需要消息队列(MQ):
(1)由于在高并发环境下,同步请求来不及处理,请求往往会发生阻塞。比如大量的请求并发访问数据库,导致行锁表锁,最后请求线程会堆积过多,从而触发 too many connection 错误,引发雪崩效应。
(2)我们使用消息队列,通过异步处理请求,从而缓解系统的压力。消息队列常应用于异步处理,流量削峰,应用解耦,消息通讯等场景。
- 当前比较常见的 MQ 中间件有 ActiveMQ、RabbitMQ、RocketMQ、Kafka 等。
- 数据处理过程:
Apache Kafka是一种流行的分布式流式消息平台。
Kafka生产者将数据写入分区主题,这些主题通过可配置的副本存储到群集broker(节点)上。
消费者来消费存储在broker分区上的数据。
Partion:分区,即分片
Topic消息主题--partion分成多个分片--replication每分片创建多个副本--分片副本存储到不同的broker节点
举例理解kafka:
场景:披萨店的点餐与配送系统
假设你开了一家火爆的披萨店,顾客下单量巨大,如何高效处理订单并确保每个订单都能准确送达?这时你可以用 Kafka 来优化流程。
- 核心角色
- 顾客(Producer生产者):下单的人,负责产生消息(比如“我要一份海鲜披萨”)。
- 订单柜台(Topic消息主题):所有订单按类型分类存放。比如:
- - 海鲜披萨订单队列(Partition 1)
- - 榴莲披萨订单队列(Partition 2)
Partition分区是为了并行处理,提高效率
- 外卖小哥(Consumer消费者/处理订单):从订单柜台取订单,按顺序配送。
- 记事本(Offset偏移量):记录每个外卖小哥当前配送到了哪个订单(防止重复或漏单)。
- 工作流程
- 下单(Produce生产)
- 顾客A说:“海鲜披萨1份!” → 订单被放到 海鲜披萨队列(Partition 1) 的末尾。
- 顾客B说:“榴莲披萨2份!” → 订单进入 榴莲披萨队列(Partition 2)。
- 处理订单(Consume消费)
- 外卖小哥1专门处理 海鲜披萨队列,从记事本看到上次送到第5单,现在取第6单。
- 外卖小哥2处理 榴莲披萨队列,独立工作,互不干扰。
- 容灾备份(Replication)
- 订单柜台的每类订单都有 备份副本(比如副本放在后厨),即使柜台被砸了,数据也不丢。
- 历史订单(Retention)
- 订单默认保存7天(可配置),超过时间自动清理,但重要订单可以永久保存(比如VIP客户的订单)。
- 关键特性
- 高吞吐量:多个队列(分区)并行处理,就像多个外卖小哥同时送餐。
- 持久化:订单即使被处理完也会保留一段时间(避免“我明明下单了,你说没记录?”)。
- 消费者组(Consumer Group):
- 如果多个外卖小哥同属一个团队(同一个消费者组),他们会分摊订单(比如小哥1送1-5单,小哥2送6-10单)。
- 如果是不同团队(不同消费者组),每个团队都会收到全部订单(比如美团和饿了么各自独立配送同样的订单)。
- 故障处理
- 外卖小哥崩溃了?
Kafka会检测到,并让同组的其他小哥接手他的任务(自动重新平衡)。
- 订单太多了?
可以随时增加外卖小哥(扩容消费者),或者增加订单队列(分区)。
总结:
Kafka 就像一套高效的订单管理系统:
- 生产者丢消息 → Topic/分区分类存储 → 消费者按需处理,且支持多团队协作。
- 核心优势:快、稳、可扩展,适合大数据场景(如日志处理、实时推荐等)。
- 环境说明
- storageclass
- Ingress(本节未配置)
- kafka部署及部署验证
# vim kafka.yaml
apiVersion: v1
kind: Service
metadata:
name: kafka-svc
labels:
app: kafka-app
spec:
clusterIP: None
ports:
- name: '9092'
port: 9092
protocol: TCP
targetPort: 9092
selector:
app: kafka-app
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: kafka
labels:
app: kafka-app
spec:
serviceName: kafka-svc
replicas: 3
selector:
matchLabels:
app: kafka-app
template:
metadata:
labels:
app: kafka-app
spec:
containers:
- name: kafka-container
image: doughgle/kafka-kraft
imagePullPolicy: IfNotPresent
ports:
- containerPort: 9092
- containerPort: 9093
env:
- name: REPLICAS
value: '3'
- name: SERVICE
value: kafka-svc
- name: NAMESPACE
value: default
- name: SHARE_DIR
value: /mnt/kafka
- name: CLUSTER_ID
value: oh-sxaDRTcyAr6pFRbXyzA
- name: DEFAULT_REPLICATION_FACTOR
value: '3'
- name: DEFAULT_MIN_INSYNC_REPLICAS
value: '2'
volumeMounts:
- name: data
mountPath: /mnt/kafka
- name: localtime
mountPath: /etc/localtime
volumes:
- name: localtime
hostPath:
path: /etc/localtime
type: ''
volumeClaimTemplates:
- metadata:
name: data
spec:
accessModes:
- "ReadWriteOnce"
storageClassName: nfs-client
resources:
requests:
storage: "1Gi"
# kubectl apply -f kafka.yaml
# kubectl get pods
NAME READY STATUS RESTARTS AGE
kafka-0 1/1 Running 1 (2m4s ago) 4m22s
kafka-1 1/1 Running 0 3m22s
kafka-2 1/1 Running 0 2m9s
# kubectl get svc
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
kafka-svc ClusterIP None <none> 9092/TCP 4m42s
注:
出于安全考虑,默认配置下Kubernetes不会将Pod调度到Master节点。
如果希望将k8s-master也当作Node使用,可以执行如下命令:
查看master01调度控制(Taints污点设置,NoSchedule不被调度)
# kubectl describe node master01 | grep Taints
Taints: node-role.kubernetes.io/control-plane:NoSchedule
取消污点设置:
# kubectl taint node master01 node-role.kubernetes.io/control-plane-
# kubectl get pod -o wide
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES
kafka-0 1/1 Running 0 74s 10.244.30.79 worker02 <none> <none>
kafka-1 1/1 Running 0 71s 10.244.5.11 worker01 <none> <none>
kafka-2 1/1 Running 0 69s 10.244.241.103 master01 <none> <none>
如果想取消被调度,执行如下命令即可,不会影响已经被调度的pod
# kubectl taint node master01 node-role.kubernetes.io/control-plane:NoSchedule
------------------------------------------------
- kafka应用测试
创建客户端pod
# kubectl run kafka-client --rm -it --image bitnami/kafka:3.1.0 -- bash
If you don't see a command prompt, try pressing enter.
I have no name!@kafka-client:/$
进入客户端pod
$ ls /opt/bitnami/kafka/bin/
$ cd /opt/bitnami/kafka/bin
查看默认存在的topic(Topic,即消息主题。)
$ kafka-topics.sh --list --bootstrap-server kafka-svc.default.svc.cluster.local:9092
空
创建topic
$ kafka-topics.sh --bootstrap-server kafka-svc.default.svc.cluster.local:9092 --topic test01 --create --partitions 3 --replication-factor 2
$ kafka-topics.sh --list --bootstrap-server kafka-svc.default.svc.cluster.local:9092
test01
创建数据生产者,添加数据
$ kafka-console-producer.sh --topic test01 --request-required-acks all --bootstrap-server kafka-svc.default.svc.cluster.local:9092
>hello world
Ctrl+c结束
在当前终端或另一个终端中创建数据消费者,消费数据
$ kafka-console-consumer.sh --topic test01 --from-beginning --bootstrap-server kafka-svc.default.svc.cluster.local:9092
hello world
查看默认test01 topic相关描述信息
$ kafka-topics.sh --describe --topic test01 --bootstrap-server kafka-svc.default.svc.cluster.local:9092
Topic: test01 Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: test01 Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: test01 Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
显示内容解释如下:
Topic: test01: 这是正在描述的主题名称。
Partition: X: 这里的 X 是分区编号,例如 0、1 或 2。每个分区都是主题的一个子集,可以独立地存储和处理消息。
Leader: 每个分区都有一个领导者(Leader),负责协调来自客户端的读写请求。其他副本(Replicas)会从领导者那里同步数据。
Replicas: 这是分区数据的副本列表,它们分布在不同的 Kafka 代理(Broker)上,以提供数据冗余和高可用性。
Isr: 这是同步副本列表(In-Sync Replicas,简称 ISR),包含当前与领导者同步的副本。
分区 0 的领导者是 Broker 1,副本在 Broker 1 和 2 上,且这两个副本都在 ISR 列表中,这意味着它们都是活跃的并且与领导者同步。
分区 1 的领导者是 Broker 2,副本在 Broker 2 和 0 上,同样这两个副本也都在 ISR 列表中。
分区 2 的领导者是 Broker 0,副本在 Broker 0 和 1 上,这两个副本也在 ISR 列表中。
这个输出显示了 Kafka 集群的健康状态,所有分区的副本都处于活跃状态,并且与领导者同步。
这表明 Kafka 集群正在正常运行,并且主题 test01 的数据具有高可用性。
更多推荐


所有评论(0)