在现代应用开发中,服务之间的通信越来越频繁。比如一个电商系统里,用户下单后要通知库存服务减库存、订单服务更新状态、物流服务准备发货,这些操作如果全都串在一起,一出问题整个流程就卡住。这时候用云原生消息队列就能解耦服务,让系统更稳定、更灵活。
什么是云原生消息队列
云原生消息队列是为容器化、微服务架构设计的消息中间件,运行在 Kubernetes 等平台上,具备自动扩缩容、高可用、动态配置等能力。常见的有 Apache Pulsar、NATS、RocketMQ on K8s、Kafka on K8s 等。它们不是简单地把老式 MQ 搬上云,而是从架构上就为云环境优化。
为什么选云原生而不是传统消息队列
传统消息队列比如 RabbitMQ 单机部署时,扩容麻烦,故障恢复慢。而云原生版本可以结合 Helm、Operator 快速部署,配合 Prometheus 做监控,出问题自动重启或切换节点。比如你在阿里云上用 RocketMQ 5.0,控制台一点就创建好集群,不需要自己搭主从复制。
典型使用场景:订单异步处理
假设你做一个秒杀系统,瞬间涌入大量请求。直接写数据库可能被打崩。可以在前端服务把订单消息发到 Kafka Topic,后端消费服务慢慢处理,削峰填谷。这样即使数据库暂时扛不住,消息也会堆积在队列里,不会丢。
from kafka import KafkaProducer
import json
producer = KafkaProducer(
bootstrap_servers="my-cluster-kafka-bootstrap:9092",
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
order_data = {
"order_id": "123456",
"user_id": "u789",
"item": "i001",
"count": 1
}
producer.send('order_events', value=order_data)
producer.flush()
如何在 K8s 中部署 Pulsar
用 Helm 部署 Pulsar 很方便。先添加官方仓库:
helm repo add apache-pulsar https://pulsar.apache.org/charts
然后安装:
helm install pulsar apache-pulsar/pulsar \n --set initialize=true \n --namespace pulsar \n --create-namespace
几分钟后,Pulsar 的 broker、bookkeeper、zookeeper 全部跑在 Pod 里,可以通过 pulsar-client 直接连接。
消费端怎么做幂等处理
消息可能重复投递,比如网络超时导致重试。这时候消费逻辑必须幂等。例如更新订单状态,不能因为收到两条“支付成功”消息就把库存扣两次。可以在数据库加个“已处理消息ID”表,或者用 Redis 记录已消费的 msgId。
if redis.get(f"msg:{msg_id}"):
return # 已处理,直接跳过
# 处理业务逻辑
update_order_status(order_id, "paid")
# 标记已处理
redis.setex(f"msg:{msg_id}", 3600, "1")
监控和告警别忽视
队列积压是常见问题。可以用 Prometheus 抓取 Kafka 或 Pulsar 的 metrics,设置告警规则:如果某个 consumer group 落后超过 1000 条就发钉钉通知。K8s 上还可以结合 HPA,根据消息堆积量自动扩容消费者 Pod 数量。
小贴士:本地调试怎么连云上队列
开发时不想搭整套环境,可以直接连测试环境的 Kafka 或 Pulsar。只要网络通,改下 bootstrap_servers 地址就行。但注意别误发生产数据,建议加个前缀隔离,比如本地发到 test.order_events.dev,正式走 order_events。