yudao-cloud 开发指南 yudao-cloud 开发指南
  • 萌新必读
  • 后端手册
  • 微服务手册
  • 工作流手册
  • 大屏手册
  • 支付手册
  • 商城手册
  • ERP 手册
  • CRM 手册
  • AI 大模型手册
  • IoT 物联网手册
  • 公众号手册
  • 前端手册 Vue 2.x
  • 前端手册 Vue 3.x
  • 工作流手册
  • 大屏手册
  • 支付手册
  • 商城手册
  • ERP 手册
  • CRM 手册
  • AI 大模型手册
  • IoT 物联网手册
  • 公众号手册
  • 系统手册
视频教程
  • Vue3 + element-plus (opens new window)
  • Vue3 + vben(ant-design-vue) (opens new window)
  • Vue2 + element-ui (opens new window)
单体版 (opens new window)
作者博客 (opens new window)
GitHub (opens new window)
  • 萌新必读
  • 后端手册
  • 微服务手册
  • 工作流手册
  • 大屏手册
  • 支付手册
  • 商城手册
  • ERP 手册
  • CRM 手册
  • AI 大模型手册
  • IoT 物联网手册
  • 公众号手册
  • 前端手册 Vue 2.x
  • 前端手册 Vue 3.x
  • 工作流手册
  • 大屏手册
  • 支付手册
  • 商城手册
  • ERP 手册
  • CRM 手册
  • AI 大模型手册
  • IoT 物联网手册
  • 公众号手册
  • 系统手册
视频教程
  • Vue3 + element-plus (opens new window)
  • Vue3 + vben(ant-design-vue) (opens new window)
  • Vue2 + element-ui (opens new window)
单体版 (opens new window)
作者博客 (opens new window)
GitHub (opens new window)
  • 萌新必读

    • 简介
    • 交流群
    • 视频教程
    • 功能列表
    • 快速启动(后端项目)
    • 快速启动(前端项目)
    • 接口文档
    • 技术选型
    • 项目结构
    • 代码热加载
    • 一键改包
    • 迁移模块(适合新项目)
    • 删除功能(以租户为例)
    • 表结构变更(版本升级)
    • 国产信创数据库(DM 达梦、大金、OpenGauss)
    • 内网穿透
    • 面试题、简历模版、简历优化
    • 项目外包
  • 后端手册

    • 新建服务
    • 代码生成【单表】(新增功能)
    • 代码生成【主子表】
    • 代码生成(树表)
    • 功能权限
    • 数据权限
    • 用户体系
    • 三方登录
    • OAuth 2.0(SSO 单点登录)
    • SaaS 多租户【字段隔离】
    • SaaS 多租户【数据库隔离】
    • WebSocket 实时通信
    • 异常处理(错误码)
    • 参数校验、时间传参
    • 分页实现
    • VO 对象转换、数据翻译
    • 文件存储(上传下载)
    • Excel 导入导出
    • 操作日志、访问日志、异常日志
    • MyBatis 数据库
    • MyBatis 联表&分页查询
    • 多数据源(读写分离)
    • Redis 缓存
    • 本地缓存
    • 异步任务
    • 分布式锁
    • 幂等性(防重复提交)
    • 请求限流(RateLimiter)
    • HTTP 接口签名(防篡改)
    • 单元测试
    • 验证码
    • 工具类 Util
    • 数据库文档
  • 微服务手册

    • 微服务调试(必读)
    • 注册中心 Nacos
    • 配置中心 Nacos
    • 服务网关 Spring Cloud Gateway
    • 服务调用 Feign
    • 定时任务 XXL Job
    • 消息队列(内存)
    • 消息队列(Redis)
    • 消息队列(RocketMQ)
    • 消息队列(RabbitMQ)
    • 消息队列(Kafka)
    • 消息队列(Cloud)
      • 1. 集群消费
        • 1.1 使用场景
        • 1.2 实战案例
      • 2. 广播消费
        • 2.1 使用场景
        • 2.2 使用方式一:Bus
        • 2.2 使用方式二:Stream
    • 分布式事务 Seata
    • 服务保障 Sentinel
  • 工作流手册

    • 工作流演示
    • 功能开启
    • 工作流(达梦适配)
    • 审批接入(流程表单)
    • 审批接入(业务表单)
    • 流程设计器(BPMN)
    • 流程设计器(钉钉、飞书)
    • 选择审批人、发起人自选
    • 会签、或签、依次审批
    • 流程发起、取消、重新发起
    • 审批通过、不通过、驳回
    • 审批加签、减签
    • 审批转办、委派、抄送
    • 执行监听器、任务监听器
    • 流程表达式
    • 流程审批通知
  • 大屏手册

    • 报表设计器
    • 大屏设计器
  • 支付手册

    • 功能开启
    • 支付宝支付接入
    • 微信公众号支付接入
    • 微信小程序支付接入
    • 支付宝、微信退款接入
    • 钱包充值、支付、退款
    • 模拟支付、退款
  • 会员手册

    • 功能开启
    • 微信公众号登录
    • 微信小程序登录
    • 微信小程序订阅消息
    • 微信小程序码
    • 会员用户、标签、分组
    • 会员等级、积分、签到
  • 商城手册

    • 商城演示
    • 功能开启
    • 商城装修
    • 在线客服
    • 【商品】商品分类
    • 【商品】商品属性
    • 【商品】商品 SPU 与 SKU
    • 【商品】商品评价
    • 【交易】购物车
    • 【交易】交易订单
    • 【交易】售后退款
    • 【交易】快递发货
    • 【交易】门店自提
    • 【交易】分销返佣
    • 【营销】优惠劵
    • 【营销】积分商城
    • 【营销】拼团活动
    • 【营销】秒杀活动
    • 【营销】砍价活动
    • 【营销】满减送活动
    • 【营销】限时折扣
    • 【营销】内容管理
    • 【统计】会员、商品、交易统计
  • ERP手册

    • ERP 演示
    • 功能开启
    • 【产品】产品信息、分类、单位
    • 【库存】产品库存、库存明细
    • 【库存】其它入库、其它出库
    • 【库存】库存调拨、库存盘点
    • 【采购】采购订单、入库、退货
    • 【销售】销售订单、出库、退货
    • 【财务】采购付款、销售收款
  • CRM手册

    • CRM 演示
    • 功能开启
    • 【线索】线索管理
    • 【客户】客户管理、公海客户
    • 【商机】商机管理、商机状态
    • 【合同】合同管理、合同提醒
    • 【回款】回款管理、回款计划
    • 【产品】产品管理、产品分类
    • 【通用】数据权限
    • 【通用】跟进记录、待办事项
  • AI大模型手册

    • AI 大模型演示
    • 功能开启
    • AI 聊天对话
    • AI 绘画创作
    • AI 知识库
    • AI 音乐创作
    • AI 写作助手
    • AI 思维导图
    • AI 工具(function calling)
    • AI 工作流
    • Dify 工作流
    • FastGPT 工作流
    • Coze 智能体
    • 【模型接入】OpenAI
    • 【模型接入】通义千问
    • 【模型接入】DeepSeek
    • 【模型接入】字节豆包
    • 【模型接入】腾讯混元
    • 【模型接入】硅基流动
    • 【模型接入】MiniMax
    • 【模型接入】月之暗灭
    • 【模型接入】百川智能
    • 【模型接入】文心一言
    • 【模型接入】LLAMA
    • 【模型接入】智谱 GLM
    • 【模型接入】讯飞星火
    • 【模型接入】微软 OpenAI
    • 【模型接入】谷歌 Gemini
    • 【模型接入】Stable Diffusion
    • 【模型接入】Midjourney
    • 【模型接入】Suno
  • IoT物联网手册

    • 功能开启
  • 公众号手册

    • 功能开启
    • 公众号接入
    • 公众号粉丝
    • 公众号标签
    • 公众号消息
    • 自动回复
    • 公众号菜单
    • 公众号素材
    • 公众号图文
    • 公众号统计
  • 系统手册

    • 短信配置
    • 邮件配置
    • 站内信配置
    • 数据脱敏
    • 敏感词
    • 地区 & IP 库
  • 运维手册

    • 开发环境
    • Linux 部署
    • Docker 部署
    • Jenkins 部署
    • 宝塔部署
    • HTTPS 证书
    • 服务监控
  • 前端手册 Vue 3.x

    • 开发规范
    • 菜单路由
    • Icon 图标
    • 字典数据
    • 系统组件
    • 通用方法
    • 配置读取
    • CRUD 组件
    • 国际化
    • IDE 调试
    • 代码格式化
  • 前端手册 Vue 2.x

    • 开发规范
    • 菜单路由
    • Icon 图标
    • 字典数据
    • 系统组件
    • 通用方法
    • 配置读取
  • 更新日志

    • 【v2.5.0】开发中...
    • 【v2.4.2】2025-04-12
    • 【v2.4.1】2025-02-09
    • 【v2.4.0】2024-12-31
    • 【v2.3.0】2024-10-07
    • 【v2.2.0】2024-08-02
    • 【v2.1.0】2024-05-05
    • 【v2.0.1】2024-03-01
    • 【v2.0.0】2024-01-26
    • 【v1.9.0】2023-12-01
  • 开发指南
  • 微服务手册
芋道源码
2022-04-03
目录

消息队列(Cloud)

友情提示:该文档已过期!!!

考虑到 Spring Cloud Stream 和 Spring Cloud Bus 的学习成本较高,配置较为麻烦,且不够灵活,因此项目已经移除了相关的封装与使用。

😁 建议阅读如下文档,按需使用:

  • 《消息队列(内存)》
  • 《消息队列(Redis)》
  • 《消息队列(RocketMQ)》
  • 《消息队列(RabbitMQ)》
  • 《消息队列(Kafka)》

yudao-spring-boot-starter-mq (opens new window) 技术组件,基于 RocketMQ 实现分布式消息队列,支持集群消费、广播消费。

友情提示:我对消息队列不了解,怎么办?

① 项目主要使用 RocketMQ 作为消息队列,所以可以学习下文章:

  • 《芋道 Spring Cloud Alibaba 消息队列 RocketMQ 入门》 (opens new window)
  • 《芋道 Spring Cloud Alibaba 事件总线 Bus RocketMQ 入门》 (opens new window)

② 如果你想替换使用 Kafka 或者 RabbitMQ,可以参考下文章:

  • 《芋道 Spring Cloud 消息队列 Kafka 入门 》 (opens new window)
  • 《芋道 Spring Cloud 事件总线 Bus Kafka 入门》 (opens new window)
  • 《芋道 Spring Cloud 消息队列 RabbitMQ 入门 》 (opens new window)
  • 《芋道 Spring Cloud 事件总线 Bus RabbitMQ 入门》 (opens new window)

# 1. 集群消费

集群消费,是指消息发送到 RocketMQ 时,有且只会被一个消费者(应用 JVM 实例)收到,然后消费成功。如下图所示:

集群消费

# 1.1 使用场景

集群消费在项目中的使用场景,主要是提供可靠的、可堆积的异步任务的能力。例如说:

  • 短信模块,使用它异步 (opens new window)发送短信。
  • 邮件模块,使用它异步 (opens new window)发送邮件。

相比 《开发指南 —— 异步任务》 来说,Spring Async 在 JVM 实例重启时,会导致未执行完的任务丢失。而集群消费,因为消息是存储在 RocketMQ 中,所以不会存在该问题。

# 1.2 实战案例

以短信模块异步发送短息为例子,讲解集群消费的使用。

实战案例

# 1.3.1 引入依赖

在 yudao-module-system-biz 模块的 pom.xml (opens new window) 中,引入 yudao-spring-boot-starter-mq 技术组件。如下所示:

<!-- 消息队列相关 -->
<dependency>
    <groupId>cn.iocoder.cloud</groupId>
    <artifactId>yudao-spring-boot-starter-mq</artifactId>
</dependency>

# 1.3.2 添加配置

① 在 application.yaml (opens new window) 中,添加 spring.cloud.stream 配置。如下所示:

--- #################### MQ 消息队列相关配置 ####################

spring:
  cloud:
    # Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
    stream:
      function:
        definition: smsSendConsumer;
      # Binding 配置项,对应 BindingProperties Map
      bindings:
        smsSend-out-0:
          destination: system_sms_send
        smsSendConsumer-in-0:
          destination: system_sms_send
          group: system_sms_send_consumer_group
      # Spring Cloud Stream RocketMQ 配置项
      rocketmq:
        default: # 默认 bindings 全局配置
          producer: # RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类
            group: system_producer_group # 生产者分组
            send-type: SYNC # 发送模式,SYNC 同步
  • 注意,带有 sms 关键字的,都是和短信发送相关的配置。

② 在 application-local.yaml (opens new window) 中,添加 spring.cloud.stream 配置。如下所示:

--- #################### MQ 消息队列相关配置 ####################
spring:
  cloud:
    stream:
      rocketmq:
        # RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
        binder:
          name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址

# 1.3.3 SmsSendMessage

在 yudao-module-system-biz 的 mq/message/sms (opens new window) 包下,创建 SmsSendMessage (opens new window) 类,短信发送消息。代码如下:

@Data
public class SmsSendMessage {

    /**
     * 短信日志编号
     */
    @NotNull(message = "短信日志编号不能为空")
    private Long logId;
    /**
     * 手机号
     */
    @NotNull(message = "手机号不能为空")
    private String mobile;
    /**
     * 短信渠道编号
     */
    @NotNull(message = "短信渠道编号不能为空")
    private Long channelId;
    /**
     * 短信 API 的模板编号
     */
    @NotNull(message = "短信 API 的模板编号不能为空")
    private String apiTemplateId;
    /**
     * 短信模板参数
     */
    private List<KeyValue<String, Object>> templateParams;

}

# 1.3.4 SmsProducer

① 在 yudao-module-system-biz 的 mq/producer/sms (opens new window) 包下,创建 SmsProducer (opens new window) 类,SmsSendMessage 的 Producer 生产者,核心是使用 StreamBridge 发送 SmsSendMessage 消息。代码如下图:

@Component
public class SmsProducer {

    @Resource
    private StreamBridge streamBridge;

    /**
     * 发送 {@link SmsSendMessage} 消息
     *
     * @param logId 短信日志编号
     * @param mobile 手机号
     * @param channelId 渠道编号
     * @param apiTemplateId 短信模板编号
     * @param templateParams 短信模板参数
     */
    public void sendSmsSendMessage(Long logId, String mobile,
                                   Long channelId, String apiTemplateId, List<KeyValue<String, Object>> templateParams) {
        SmsSendMessage message = new SmsSendMessage().setLogId(logId).setMobile(mobile);
        message.setChannelId(channelId).setApiTemplateId(apiTemplateId).setTemplateParams(templateParams);
        streamBridge.send("smsSend-out-0", message);
    }

}
  • 注意,这里的 smsSend-out-0 和上述的配置文件是对应的噢。

② 发送短信时,需要使用 SmsProducer 发送消息。如下图所示:

调用 SmsProducer 示例

# 1.3.4 SmsSendConsumer

在 yudao-module-system-biz 的 mq/consumer/sms (opens new window) 包下,创建 SmsSendConsumer (opens new window) 类,SmsSendMessage 的 Consumer 消费者。代码如下图:

@Component
@Slf4j
public class SmsSendConsumer implements Consumer<SmsSendMessage> {

    @Resource
    private SmsSendService smsSendService;

    @Override
    public void accept(SmsSendMessage message) {
        log.info("[accept][消息内容({})]", message);
        smsSendService.doSendSms(message);
    }
}

# 2. 广播消费

广播消费,是指消息发送到 RocketMQ 时,所有消费者(应用 JVM 实例)收到,然后消费成功。如下图所示:

集群消费

# 2.1 使用场景

例如说,在应用中,缓存了数据字典等配置表在内存中,可以通过 RocketMQ 广播消费,实现每个应用节点都消费消息,刷新本地内存的缓存。

又例如说,我们基于 WebSocket 实现了 IM 聊天,在我们给用户主动发送消息时,因为我们不知道用户连接的是哪个提供 WebSocket 的应用,所以可以通过 RocketMQ 广播消费。每个应用判断当前用户是否是和自己提供的 WebSocket 服务连接,如果是,则推送消息给用户。

# 2.2 使用方式一:Bus

基于 RocketMQ 的广播消费,可以使用 Spring Cloud Bus 实现。

Spring Cloud Bus 是什么?

Spring Cloud Bus 是 Spring Cloud 的一个子项目,它的作用是将分布式系统的节点与轻量级消息系统链接起来,用于广播状态变化,事件推送等。

它的实现原理是,通过 Spring Cloud Stream 将消息发送到消息代理(如 RabbitMQ、Kafka、RocketMQ),然后通过 Spring Cloud Bus 的事件监听,监听到消息后,进行处理。

以角色的本地缓存刷新为例子,讲解下 Spring Cloud Bus 如何使用 RocketMQ 广播消费。

# 2.2.1 引入依赖

在 yudao-module-system-biz 模块的 pom.xml (opens new window) 中,引入 yudao-spring-boot-starter-mq 技术组件。如下所示:

<!-- 消息队列相关 -->
<dependency>
    <groupId>cn.iocoder.cloud</groupId>
    <artifactId>yudao-spring-boot-starter-mq</artifactId>
</dependency>

# 2.2.2 添加配置

在 application.yaml (opens new window) 中,添加 spring.cloud.bus 配置。如下所示:

spring:
  cloud:
    # Spring Cloud Bus 配置项,对应 BusProperties 类
    bus:
      enabled: true # 是否开启,默认为 true
      id: ${spring.application.name}:${server.port} # 编号,Spring Cloud Alibaba 建议使用“应用:端口”的格式
      destination: springCloudBus # 目标消息队列,默认为 springCloudBus

# 2.2.3 编写代码

参见 《开发指南 —— 本地缓存》 文章的「3. 实时刷新缓存」小节。

# 2.2 使用方式二:Stream

基于 RocketMQ 的广播消费,也可以使用 Spring Cloud Stream 实现。

Spring Cloud Stream 是什么?

Spring Cloud Stream 是 Spring Cloud 的一个子项目,它的作用是为微服务应用构建消息驱动能力。

使用方式,和「1.2 实战案例」小节是一样的,只是需要在 application.yaml 配置文件中,添加 spring.cloud.stream.rocketmq.bindings.<channelName>.consumer.broadcasting (opens new window) 配置项为 true。

由于项目中暂时使用该方式,文档后续补充。

消息队列(Kafka)
分布式事务 Seata

← 消息队列(Kafka) 分布式事务 Seata→

Theme by Vdoing | Copyright © 2019-2025 芋道源码 | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式
×