yudao-cloud 开发指南 yudao-cloud 开发指南
  • 萌新必读
  • 后端手册
  • 微服务手册
  • 工作流手册
  • 大屏手册
  • 支付手册
  • 商城手册
  • ERP 手册
  • CRM 手册
  • AI 大模型手册
  • IoT 物联网手册
  • 公众号手册
  • 前端手册 Vue 3.x
  • 前端手册 Vben 5.x
  • 前端手册 Vue 2.x
  • 前端手册 Admin Uniapp
  • 工作流手册
  • 大屏手册
  • 支付手册
  • 商城手册
  • ERP 手册
  • CRM 手册
  • AI 大模型手册
  • IoT 物联网手册
  • 公众号手册
  • 系统手册
视频教程
  • Vue3 + element-plus (opens new window)
  • Vue3 + vben5(ant-design-vue、element-plus) (opens new window)
  • Vue2 + element-ui (opens new window)
单体版 (opens new window)
作者博客 (opens new window)
GitHub (opens new window)
  • 萌新必读
  • 后端手册
  • 微服务手册
  • 工作流手册
  • 大屏手册
  • 支付手册
  • 商城手册
  • ERP 手册
  • CRM 手册
  • AI 大模型手册
  • IoT 物联网手册
  • 公众号手册
  • 前端手册 Vue 3.x
  • 前端手册 Vben 5.x
  • 前端手册 Vue 2.x
  • 前端手册 Admin Uniapp
  • 工作流手册
  • 大屏手册
  • 支付手册
  • 商城手册
  • ERP 手册
  • CRM 手册
  • AI 大模型手册
  • IoT 物联网手册
  • 公众号手册
  • 系统手册
视频教程
  • Vue3 + element-plus (opens new window)
  • Vue3 + vben5(ant-design-vue、element-plus) (opens new window)
  • Vue2 + element-ui (opens new window)
单体版 (opens new window)
作者博客 (opens new window)
GitHub (opens new window)
  • 萌新必读

    • 简介
    • 交流群
    • 视频教程
    • 功能列表
    • 快速启动(后端项目)
    • 快速启动(前端项目)
    • 接口文档
    • 技术选型
    • 项目结构
    • 代码热加载
    • 一键改包
    • 迁移模块(适合新项目)
    • 删除功能(以租户为例)
    • 表结构变更(版本升级)
    • 国产信创数据库(DM 达梦、大金、OpenGauss)
    • 内网穿透
    • 面试题、简历模版、简历优化
    • 项目外包
  • 后端手册

    • 新建服务
    • 代码生成【单表】(新增功能)
    • 代码生成【主子表】
    • 代码生成(树表)
    • 代码生成(移动端)
    • 功能权限
    • 数据权限
    • 用户体系
    • 三方登录
    • OAuth 2.0(SSO 单点登录)
    • SaaS 多租户【字段隔离】
    • SaaS 多租户【数据库隔离】
    • WebSocket 实时通信
    • 异常处理(错误码)
    • 参数校验、时间传参
    • 分页实现
    • VO 对象转换、数据翻译
    • 文件存储(上传下载)
    • Excel 导入导出
    • 操作日志、访问日志、异常日志
    • MyBatis 数据库
    • MyBatis 联表&分页查询
    • 多数据源(读写分离)、事务
    • Redis 缓存
    • 本地缓存
    • 异步任务
    • 分布式锁
    • 幂等性(防重复提交)
    • 请求限流(RateLimiter)
    • HTTP 接口签名(防篡改)
    • HTTP 接口加解密
    • 单元测试
    • 验证码
    • 工具类 Util
    • 数据库文档
  • 微服务手册

    • 微服务调试(必读)
    • 注册中心 Nacos
    • 配置中心 Nacos
    • 服务网关 Spring Cloud Gateway
    • 服务调用 Feign
    • 定时任务 XXL Job
    • 消息队列(内存)
    • 消息队列(Redis)
    • 消息队列(RocketMQ)
    • 消息队列(RabbitMQ)
    • 消息队列(Kafka)
    • 消息队列(Cloud)
    • 分布式事务 Seata
    • 服务保障 Sentinel
  • 工作流手册

    • 工作流演示
    • 功能开启
    • 工作流(达梦适配)
    • 审批接入(流程表单)
    • 审批接入(业务表单)
    • 流程设计器(BPMN)
    • 流程设计器(钉钉、飞书)
    • 选择审批人、发起人自选
    • 会签、或签、依次审批
    • 流程发起、取消、重新发起
    • 审批通过、不通过、驳回
    • 审批加签、减签
    • 审批转办、委派、抄送
    • 执行监听器、任务监听器
    • 流程表达式
    • 流程审批通知
  • 大屏手册

    • 报表设计器
    • 大屏设计器
  • 支付手册

    • 功能开启
    • 支付宝支付接入
    • 微信公众号支付接入
    • 微信小程序支付接入
    • 支付宝、微信退款接入
    • 支付宝转账接入
    • 微信转账接入
    • 钱包充值、支付、退款
    • 模拟支付、退款
  • 会员手册

    • 功能开启
    • 微信公众号登录
    • 微信小程序登录
    • 微信小程序订阅消息
    • 微信小程序码
    • 会员用户、标签、分组
    • 会员等级、积分、签到
  • 商城手册

    • 商城演示
    • 功能开启
    • 商城装修
    • 在线客服
    • 【商品】商品分类
    • 【商品】商品属性
    • 【商品】商品 SPU 与 SKU
    • 【商品】商品评价
    • 【交易】购物车
    • 【交易】交易订单
    • 【交易】售后退款
    • 【交易】快递发货
    • 【交易】门店自提
    • 【交易】分销返佣
    • 【营销】优惠劵
    • 【营销】积分商城
    • 【营销】拼团活动
    • 【营销】秒杀活动
    • 【营销】砍价活动
    • 【营销】满减送活动
    • 【营销】限时折扣
    • 【营销】内容管理
    • 【统计】会员、商品、交易统计
  • ERP手册

    • ERP 演示
    • 功能开启
    • 【产品】产品信息、分类、单位
    • 【库存】产品库存、库存明细
    • 【库存】其它入库、其它出库
    • 【库存】库存调拨、库存盘点
    • 【采购】采购订单、入库、退货
    • 【销售】销售订单、出库、退货
    • 【财务】采购付款、销售收款
  • CRM手册

    • CRM 演示
    • 功能开启
    • 【线索】线索管理
    • 【客户】客户管理、公海客户
    • 【商机】商机管理、商机状态
    • 【合同】合同管理、合同提醒
    • 【回款】回款管理、回款计划
    • 【产品】产品管理、产品分类
    • 【通用】数据权限
    • 【通用】跟进记录、待办事项
  • AI大模型手册

    • AI 大模型演示
    • 功能开启
    • AI 聊天对话
    • AI 绘画创作
    • AI 知识库(RAG)
    • AI 音乐创作
    • AI 写作助手
    • AI 思维导图
    • AI 工具(function calling)
    • AI 工作流
    • Dify 工作流
    • FastGPT 工作流
    • Coze 智能体
    • 推理模式(thinking)
    • 联网搜索
    • MCP Client 客户端
    • MCP Server 服务端
    • 【模型接入】Claude
    • 【模型接入】OpenAI
    • 【模型接入】通义千问
    • 【模型接入】DeepSeek
    • 【模型接入】字节豆包
    • 【模型接入】腾讯混元
    • 【模型接入】硅基流动
    • 【模型接入】MiniMax
    • 【模型接入】月之暗面
    • 【模型接入】百川智能
    • 【模型接入】文心一言
    • 【模型接入】LLAMA
    • 【模型接入】智谱 GLM
    • 【模型接入】讯飞星火
    • 【模型接入】微软 OpenAI
    • 【模型接入】谷歌 Gemini
    • 【模型接入】Stable Diffusion
    • 【模型接入】Midjourney
    • 【模型接入】Suno
  • IoT物联网手册

    • 功能开启
    • 产品管理
    • 设备管理
    • 物模型配置
    • 设备网关与子设备
    • 设备动态注册
    • 设备接入(概述)
    • 设备接入(HTTP 协议)
    • 设备接入(MQTT 协议)
    • 设备接入(EMQX 协议)
    • 设备接入(TCP 协议)
    • 设备接入(UDP 协议)
    • 设备接入(WebSocket 协议)
    • 设备接入(CoAP 协议)
    • 设备接入(Modbus Client 模式)
    • 设备接入(Modbus Server 模式)
    • 设备接入(自定义协议)
      • 1. 整体思路
      • 2. 实现步骤
        • 2.1 新增协议类型
        • 2.2 创建协议配置类
        • 2.3 实现 IotProtocol 接口
        • 2.4 实现上行 Handler
        • 2.5 实现下行 Subscriber
        • 2.6 注册到 IotProtocolManager
        • 2.7 添加 YAML 配置
      • 附录
        • A. 关键服务一览
        • B. 短连接 vs 长连接
        • C. 自定义序列化
    • 场景联动
    • 数据流转
    • 告警配置
    • OTA 固件升级
  • 公众号手册

    • 功能开启
    • 公众号接入
    • 公众号粉丝
    • 公众号标签
    • 公众号消息
    • 模版消息
    • 自动回复
    • 公众号菜单
    • 公众号素材
    • 公众号图文
    • 公众号统计
  • 系统手册

    • 短信配置
    • 邮件配置
    • 站内信配置
    • Webhook(钉钉、飞书、企微)
    • 数据脱敏
    • 敏感词
    • 地区 & IP 库
  • 运维手册

    • 开发环境
    • Linux 部署
    • Docker 部署
    • Jenkins 部署
    • 宝塔部署
    • 1Panel 部署
    • HTTPS 证书
    • 服务监控
  • 前端手册 Vue 3.x

    • 开发规范
    • 菜单路由
    • Icon 图标
    • 字典数据
    • 系统组件
    • 通用方法
    • 配置读取
    • CRUD 组件
    • 国际化
    • IDE 调试
    • 代码格式化
  • 前端手册 Vben 5.x

    • 开发规范
    • 菜单路由
    • 图标、主题、国际化
    • 字典数据
    • 系统组件
    • 通用方法
    • 配置读取
    • IDE 调试
    • 代码格式化
  • 前端手册 Vue 2.x

    • 开发规范
    • 菜单路由
    • Icon 图标
    • 字典数据
    • 系统组件
    • 通用方法
    • 配置读取
  • 前端手册 Admin Uniapp

    • 开发规范
    • 菜单路由
    • 图标、主题、国际化
    • 字典数据
    • 系统组件
    • 通用方法
    • IDE 调试
    • 代码格式化
    • 运行发布
  • 更新日志

    • 【v2026-01】
    • 【v2025-12】
    • 【v2025-11】
    • 【v2025-10】
    • 【v2025-09】
    • 【v2025-08】
    • 【v2-6-1】2025-07-19
    • 【v2-6-0】2025-06-07
    • 【v2.5.0】2025-05-13
    • 【v2.4.2】2025-04-12
    • 【v2.4.1】2025-02-09
    • 【v2.4.0】2024-12-31
    • 【v2.3.0】2024-10-07
    • 【v2.2.0】2024-08-02
    • 【v2.1.0】2024-05-05
    • 【v2.0.1】2024-03-01
    • 【v2.0.0】2024-01-26
    • 【v1.9.0】2023-12-01
  • 开发指南
  • IoT物联网手册
芋道源码
2026-02-12
目录

设备接入(自定义协议)

推荐阅读:

  • 《设备接入(概述)》 — 建议先阅读,了解整体架构和消息格式

本文以 TCP 协议为蓝本,一步步讲解如何在 IoT 网关中扩展一种新协议。

# 1. 整体思路

新增一种协议,需要改动以下模块和类:

步骤 改动位置 说明
① IotProtocolTypeEnum 新增协议类型枚举
② IotXxxConfig + IotGatewayProperties.ProtocolProperties 协议配置类
③ IotXxxProtocol 实现 IotProtocol 接口 协议主类,管理生命周期
④ 上行 Handler + 下行 Subscriber 消息收发处理
⑤ IotProtocolManager 注册协议到管理器

在 yudao-module-iot-gateway 模块的 protocol/ 目录下新建协议包,最终的包结构如下:

protocol/
└── xxx/                                    // 你的协议包
    ├── IotXxxProtocol.java                 // 协议主类
    ├── IotXxxConfig.java                   // 专属配置类
    ├── handler/
    │   ├── upstream/                       // 上行(设备 → 网关)
    │   │   ├── IotXxxAuthHandler.java      // 认证
    │   │   └── IotXxxUpstreamHandler.java  // 上行消息
    │   └── downstream/                     // 下行(网关 → 设备)
    │       ├── IotXxxDownstreamHandler.java     // 下行消息处理逻辑
    │       └── IotXxxDownstreamSubscriber.java  // 消息总线订阅者
    └── manager/                            // 长连接协议需要
        └── IotXxxConnectionManager.java    // 设备连接管理

提示

短连接协议(如 HTTP)通常不需要 manager/ 目录。

# 2. 实现步骤

# 2.1 新增协议类型

① 在 yudao-module-iot-core 模块的 IotProtocolTypeEnum 枚举中,新增一个枚举值:

public enum IotProtocolTypeEnum {

    // ... 已有协议 ...
    HTTP("http"),
    MQTT("mqtt"),
    TCP("tcp"),

    XXX("xxx"); // 新增:你的协议类型标识

}

注意

"xxx" 是协议的类型标识,必须与 application.yaml 配置中的 protocol 字段一致。

② 需要在管理后台的「系统管理 → 字典管理」中,找到字典类型 iot_product_protocol_type,新增一条字典数据(值与枚举标识一致,如 "xxx"),这样前端创建产品时才能选择该协议类型。

# 2.2 创建协议配置类

① 新建专属配置类 IotXxxConfig,存放该协议独有的配置参数:

@Data
public class IotXxxConfig {

    /**
     * 最大连接数
     */
    private Integer maxConnections = 1000;

    // ... 其他协议特有参数 ...
}

可参考 IotTcpConfig(含 codec、maxConnections 等字段)。

② 在 ProtocolProperties 中注册,编辑 IotGatewayProperties 的 ProtocolProperties 内部类:

@Data
public static class ProtocolProperties {

    // ... 已有字段 ...
    private IotHttpConfig http;
    private IotMqttConfig mqtt;

    private IotXxxConfig xxx; // 新增
}

# 2.3 实现 IotProtocol 接口

IotProtocol 是所有协议的核心接口,需要实现以下方法:

方法 说明
#getId() 协议实例 ID,对应配置中的 id 字段
#getServerId() 服务标识,用于下行消息路由
#getType() 协议类型枚举
#start() 启动协议服务
#stop() 停止协议服务
#isRunning() 是否正在运行

以 IotTcpProtocol 为参考,协议主类的实现结构如下:

@Slf4j
public class IotXxxProtocol implements IotProtocol {

    private final ProtocolProperties properties;
    @Getter
    private final String serverId;
    @Getter
    private volatile boolean running = false;

    // 协议资源(服务器、连接等)
    private YourServer server;
    private IotXxxDownstreamSubscriber downstreamSubscriber;

    public IotXxxProtocol(ProtocolProperties properties) {
        this.properties = properties;
        this.serverId = IotDeviceMessageUtils.generateServerId(properties.getPort());
    }

    @Override
    public String getId() {
        return properties.getId();
    }

    @Override
    public IotProtocolTypeEnum getType() {
        return IotProtocolTypeEnum.XXX;
    }

    @Override
    public void start() {
        if (running) {
            return;
        }
        try {
            // ① 创建并启动协议服务器
            this.server = createAndStartServer();

            running = true;
            log.info("[start][协议 {} 启动成功,端口:{}]", getId(), properties.getPort());

            // ② 启动下行消息订阅者
            IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class);
            this.downstreamSubscriber = new IotXxxDownstreamSubscriber(this, messageBus);
            this.downstreamSubscriber.start();
        } catch (Exception e) {
            log.error("[start][协议 {} 启动失败]", getId(), e);
            stop0();
            throw e;
        }
    }

    @Override
    public void stop() {
        if (!running) {
            return;
        }
        stop0();
    }

    private void stop0() {
        // ① 停止下行订阅者
        if (downstreamSubscriber != null) {
            downstreamSubscriber.stop();
            downstreamSubscriber = null;
        }
        // ② 关闭服务器
        if (server != null) {
            server.close();
            server = null;
        }
        running = false;
    }
}

注意

stop0() 方法在 start() 异常时也会被调用(用于清理已初始化的资源),因此每个资源释放都应做 null 判断,避免 NPE。

# 2.4 实现上行 Handler

上行 Handler 负责处理设备发送到网关的请求,通常包括 认证 和 消息上报 两部分。

# 2.4.1 认证

认证方式取决于协议的连接特性:

连接类型 认证方式 参考实现 说明
短连接 JWT Token IotHttpAuthHandler 认证成功返回 Token,后续请求携带 Token
长连接 连接时认证 IotTcpUpstreamHandler(handleAuth() 方法) 认证成功后在 ConnectionManager 中记录设备信息,连接期间无需重复认证

两种方式都通过 IotDeviceCommonApi 的 #authDevice(...) 方法完成认证校验。认证成功后,需调用 IotDeviceMessageService 的 #sendDeviceMessage(...) 发送上线消息:

IotDeviceMessage message = IotDeviceMessage.buildStateUpdateOnline();
deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId);

# 2.4.2 消息上报

处理设备上报的属性、事件等业务消息。核心流程:

  1. 从请求中解析出 productKey、deviceName
  2. 反序列化消息体为 IotDeviceMessage
  3. 调用 IotDeviceMessageService 的 #sendDeviceMessage(message, productKey, deviceName, serverId) 发布到消息总线
// 以 TCP 为例(IotTcpUpstreamHandler)
String productKey = connectionInfo.getProductKey();
String deviceName = connectionInfo.getDeviceName();
IotDeviceMessage message = deserializeMessage(data);
deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId);

提示

如果需要自定义消息序列化格式,可参考附录「C. 自定义序列化」。

# 2.4.3 动态注册(可选)

如果协议需要支持设备动态注册(一型一密),参考 IotHttpRegisterHandler 和 IotHttpRegisterSubHandler,调用 IotDeviceCommonApi 的 #registerDevice(...) 和 #registerSubDevices(...) 方法。

# 2.5 实现下行 Subscriber

① 情况一:下行 Subscriber 负责接收平台发送给设备的消息(如属性设置、服务调用),继承 AbstractIotProtocolDownstreamSubscriber 即可:

@Slf4j
public class IotXxxDownstreamSubscriber extends AbstractIotProtocolDownstreamSubscriber {

    public IotXxxDownstreamSubscriber(IotProtocol protocol, IotMessageBus messageBus) {
        super(protocol, messageBus);
    }

    @Override
    protected void handleMessage(IotDeviceMessage message) {
        // 短连接协议(如 HTTP)不支持下行推送,直接忽略
        log.info("[handleMessage][协议不支持下行推送,忽略消息:{}]", message.getId());
    }
}

② 情况二:如果是长连接协议,需要通过 ConnectionManager 查找设备连接并推送消息。可参考 IotTcpDownstreamSubscriber + IotTcpDownstreamHandler,完整示例如下:

IotXxxDownstreamSubscriber 将消息委托给 Handler 处理:

@Slf4j
public class IotXxxDownstreamSubscriber extends AbstractIotProtocolDownstreamSubscriber {

    private final IotXxxDownstreamHandler downstreamHandler;

    public IotXxxDownstreamSubscriber(IotProtocol protocol,
                                      IotXxxDownstreamHandler downstreamHandler,
                                      IotMessageBus messageBus) {
        super(protocol, messageBus);
        this.downstreamHandler = downstreamHandler;
    }

    @Override
    protected void handleMessage(IotDeviceMessage message) {
        downstreamHandler.handle(message);
    }
}

IotXxxDownstreamHandler 查找连接、序列化、发送:

@Slf4j
@RequiredArgsConstructor
public class IotXxxDownstreamHandler {

    private final IotXxxConnectionManager connectionManager;
    private final IotMessageSerializer serializer;

    public void handle(IotDeviceMessage message) {
        // ① 检查设备连接
        IotXxxConnectionManager.ConnectionInfo connectionInfo =
                connectionManager.getConnectionInfoByDeviceId(message.getDeviceId());
        if (connectionInfo == null) {
            log.warn("[handle][设备 {} 不在线]", message.getDeviceId());
            return;
        }
        // ② 序列化消息
        byte[] payload = serializer.serialize(message);
        // ③ 发送给设备
        connectionManager.sendToDevice(message.getDeviceId(), payload);
        log.info("[handle][下行消息发送成功, 设备 ID: {}, 方法: {}]",
                message.getDeviceId(), message.getMethod());
    }
}

# 2.6 注册到 IotProtocolManager

在 IotProtocolManager 中完成两处改动:

① 在 #createProtocol(config) 的 switch 中增加分支:

switch (protocolType) {
    // ... 已有 case ...
    case XXX:
        return createXxxProtocol(config);
}

② 新增工厂方法:

private IotXxxProtocol createXxxProtocol(ProtocolProperties config) {
    return new IotXxxProtocol(config);
}

# 2.7 添加 YAML 配置

在网关的 application.yaml 中添加协议实例配置:

yudao:
  iot:
    gateway:
      protocols:
        - id: xxx-json
          enabled: true
          protocol: xxx        # 对应 IotProtocolTypeEnum 的标识
          port: 9000
          serialize: json      # 可选,短连接协议通常不需要
          xxx:                 # 对应 IotXxxConfig 的字段
            max-connections: 1000

注意

enabled 默认为 false,需要显式设置为 true 才会启动。

# 附录

# A. 关键服务一览

协议实现中常用的服务,均可通过 SpringUtil.getBean(...) 获取:

服务 说明
IotDeviceMessageService 消息发送、序列化/反序列化(按设备配置)
IotDeviceTokenService JWT Token 创建/校验(短连接认证)
IotDeviceCommonApi 设备认证 #authDevice(...)、动态注册 #registerDevice(...) 等 RPC 接口
IotDeviceService 设备缓存查询 #getDeviceFromCache(...)
IotMessageBus 消息总线,注册/取消订阅者
IotMessageSerializerManager 获取指定类型的消息序列化器(JSON / Binary)

# B. 短连接 vs 长连接

根据协议的连接特性,实现方式有所不同:

特性 短连接(HTTP、CoAP) 长连接(MQTT、TCP、UDP、WebSocket)
认证方式 Token(JWT 无状态) 连接时认证,ConnectionManager 记录
下行推送 不支持(DownstreamSubscriber 忽略) 支持(通过 ConnectionManager 查找连接并推送)
序列化 固定 JSON(请求体即 JSON) 通过 serialize 配置 或 设备 serializeType 字段
连接管理 不需要 需要 ConnectionManager 管理连接和设备映射
离线检测 不涉及(Token 过期即视为离线) 连接断开时发送离线消息
参考实现 IotHttpProtocol IotTcpProtocol、IotMqttProtocol

# C. 自定义序列化

系统内置 JSON 和 Binary 两种消息序列化方式(对应 IotSerializeTypeEnum 枚举)。如需自定义序列化格式,步骤如下:

  1. 在 IotSerializeTypeEnum 中新增枚举值(如 MY_FORMAT("my_format"))
  2. 实现 IotMessageSerializer 接口,包含三个方法:
    • #serialize(IotDeviceMessage) — 将消息编码为 byte[]
    • #deserialize(byte[]) — 将 byte[] 解码为 IotDeviceMessage
    • #getType() — 返回对应的 IotSerializeTypeEnum 枚举值
  3. IotMessageSerializerManager 会自动加载所有枚举对应的序列化器,无需手动注册

可参考内置实现:IotJsonSerializer(JSON 格式)、IotBinarySerializer(自定义二进制协议,含魔数、版本号、消息类型等帧头结构)。

设备接入(Modbus Server 模式)
场景联动

← 设备接入(Modbus Server 模式) 场景联动→

Theme by Vdoing | Copyright © 2019-2026 芋道源码 | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式
×