基于沃土数字平台分离IoT设备消息
引导式阅读
Others
基于沃土数字平台分离IoT设备消息
作者
HDC.Cloud
上架时间
2023-04-07 18:09:12

基于沃土数字平台分离IOT设备消息

您在体验过程中有任何疑问,都可以在此留言反馈>>

如需下载本指导文档,请点击此处下载。

1.介绍

什么是ROMA

ROMA是融合集成平台,提供轻量化应用、消息、数据等集成能力,简化企业上云,支持云上云下、跨区域集成,打通IT与OT,连接企业与生态伙伴,助力行业数字化转型。

数据集成:提供多种数据源的快速集成能力,可以在任意时间、任意地点、任意系统之间实现轻量级实时数据集成和定时增量数据迁移。支持文本、消息、API、结构和非结构化数据等多种数据源之间的灵活集成。

应用集成:提供API设计、开发、测试、管理、发布能力,具备API策略路由、统一接入、认证授权、请求验证、流量控制、API调用统计分析能力,以及协议转换、API编排、API调度等业务处理能力。

消息集成:针对企业级互联的专业消息组件,基于高可用分布式集群技术,包括发布订阅、消息轨迹、资源统计、监控报警等一套完整的消息云服务。

本次Codelabs主要体验ROMA的消息集成能力。

本次Codelabs的意义是什么?

实际IOT场景中,设备产生的消息种类比较复杂,消息结构和北向应用所需的结构存在差异,需要将消息分类为不同类型的消息,供北向系统订阅消费。

例如,本次实验中的IOT平台上报的设备消息既有告警消息又有设备消息,如果不进行分离分类,北向系统集成IOT平台消息时,就无法准确区分设备上报的是告警消息还是设备消息。

您将建立什么?

在本次体验中,您将使用ROMA系统建立一个应用,此应用能够将把上报的消息分类为设备消息和告警消息,并以消息队列形式提供给北向系统进行订阅。

您将学会到什么

  • 如何使用ROMA进行消息集成

  • 如何使用ROMA分离分类消息

  • 如何通过ROMA MQS向北向系统提供消息订阅能力

您需要什么?

开发环境及技能要求:

  • 熟悉JAVA Script。

  • 注册成为数字平台开发者。

运行终端要求:

  • 开发计算机(台式机或笔记本电脑)

  • 开发计算机需要安装Google Chrome浏览器。

2. 操作场景说明

在消防场景中,当IOT设备连接的消防探头探测到高温、烟雾,或有人按下消防紧急报警按钮时,消息通过ROMA上报到IOC平台,产生实时告警。

由于实际项目中,IOT产生的消息种类复杂,消息结构和实际应用所需的消息结构存在差异,这里将上报的消息进行了分类,具体实施流程如下图所示。

  1. IOT设备平台作为消息的生产者,调用ROMA平台的SDK将消息放入MQS队列中,SDK开发过程参见《ROMA 二次开发指南》。本次Codelabs中,将使用livedata创建模拟接口IOC_DATATEST,用于模拟IOT设备上报消息。
  2. ROMA LiveData中创建callback接口,订阅并消费MQS中IOT平台上报的消息,并通过CallBack消费的消息,转发到LiveData的分离API接口中。
  3. LiveData分离消息API将告警消息和其他消息进行分离并发送至MQS不同的Topic队列中。
  4. 北向IOC应用,通过订阅ROMA平台MQS消息,实时接收告警信息。

本示例中的信息规划如下表。

> 注意:同一套ROMA中应用、消息和API的ID、名称、路径都需要保持唯一,所以开发者可以根据下面的示例组合自己的用户名组成唯一的ID、名称和路径,否则在操作过程中,页面会提示此名称已使用。

涉及模块 名称 说明
应用 应用ID:com.fire.demo.ioc.test01 ;应用名称:com.fire.demo.ioc.test01 此处的应用不是IOC北向应用,而是ROMA上的应用,用于在ROMA:1) 接收IOT平台上报消息并进行消息处理的应用。2)上报告警信息到IOC北向应用。
MQS Topic名称:T_IOC_DEVICE_DATA_Test01 接收IOT平台消息的Topic。
MQS Topic名称:T_ALARM_DATA_Test01 接收通过API分离的告警消息的Topic。
MQS Topic名称:T_DEVICE_DATA_Test01 接收通过API分离的除告警消息以外其他消息的Topic。
APIC API名称:IOC_DATATEST 此接口用于模拟IOT平台设备发送消防安全消息。
APIC API名称:1_IOC_MQS_TEST01 此接口用于消费T_IOC_DEVICE_DATA_Test01队列中消息,并call back消息的body至接口2_IOC_MQS。访问路径规划为:/ioc/mqs1
APIC API名称:2_IOC_MQS 此接口用于将消息分离,将告警消息发送到T_ALARM_DATA_Test01的topic队列,将设备变化消息发送到T_DEVICE_DATA_Test01的topic队列。访问路径规划为:/ioc/mqs2

3. 创建ROMA应用

创建ROMA平台中用于接收消息、处理并上报消息的应用。

步骤1 使用chrome浏览器,登录数字平台开发平台,点击页面右上方“开发者中心>集成开发”,进入ROMA页面。

步骤2 如下图所示在页面上行选择“应用注册”,进入“应用注册”页面。

也可以选择管理控制台后应用名称右侧的小箭头,点击“应用注册”。

步骤3 根据信息规划中“应用”模块的规划,创建ID为com.fire.demo.ioc.test01的应用,认证类型选择“私钥认证”。记录此应用ID和名称,后面的步骤中需要用到这些信息。

> 注意: > > 1. 如果提示应用ID或应用名称已存在或者不可用,说明ROMA系统中已有此应用ID或应用名,请保持应用ID或应用名唯一。
> 2. 文中提示记录的信息,后面的步骤中会用到,为了加快操作,建议提前将这些信息记录到记事本中。
> 3. 私钥认证是采用应用ID和应用密钥进行API、MQS等访问认证的方式,jwt认证则要使用应用ID、时间戳和认证token来进行鉴权认证,才能访问应用的API、MQS等。本次操作为了简化用户操作,采用的是私钥认证方式。

步骤4 在应用管理页面单击新建应用前的“密钥管理”,点击“显示密钥”,查看并记录此应用的请求密钥,后面的步骤中需要用到此密钥。

4. 消息集成:创建并发布Topic

根据数据规划,在MQS上创建3个Topic,T_IOC_DEVICE_DATA_Test01用于接收IOT平台消息,T_DEVICE_DATA_Test01用于接收IOT设备消息,T_ALARM_DATA_Test01用于接收IOT告警消息。

以下示例为如何创建一个T_IOC_DEVICE_DATA_Test01,参考以下示例完成3个topic的创建。

步骤1 在ROMA左侧导航树中,选择“消息集成 > 消息队列服务 MQS”,进入MQS界面。

步骤2 在页面左侧二级导航树中,选择“Topic管理”。

步骤3 单击“创建Topic”按钮。

步骤4 参见信息规划中“MQS”模块的规划,创建名称为T_IOC_DEVICE_DATA_Test01的Topic。

其中“所属应用”选择刚创建的应用“com.fire.demo.ioc.test01”,表明此topic属于此应用。

> 注意:当提示保存失败,且名称已存在时,标明ROMA系统中存在重复的topic名称,请修改topic名称为唯一名称。

步骤5 在“T_IOC_DEVICE_DATA_Test01”前的操作列,单击“发布”。

在发布页面中“发布应用”选择新建的应用“com.fire.demo.ioc.test01”,表明此应用可向此topic写消息。点击“发布”。

步骤6 点击左侧二级导航栏中的“Topic管理”,在“T_IOC_DEVICE_DATA_Test01”前的操作列,单击“订阅”。

“订阅应用”选择新建应用“com.fire.demo.ioc.test01”,表明此应用可向此topic读消息。点击“查看连接地址”,记录内部访问地址,如“192.168.9.30:9776;192.168.9.96:9776”,后面的步骤中会用到。单击“保存”。

步骤7 参考步骤1~6重复创建、发布和订阅T_DEVICE_DATA_Test01和T_ALARM_DATA_Test01两个topic,并记录其中任意一个TOPIC发布信息中的内部访问地址。

5. 创建分离消息API

创建分离消息API,用于分离callback API中回调的消息,将其中的告警消息发送到MQS消息队列T_ALARM_DATA_Test01,设备消息发送到MQS消息队列中T_DEVICE_DATA_Test01。

步骤1 在ROMA左侧导航树中,选择“服务集成 > Live Data”,进入LiveData管理控制台界面。

步骤2 设计API。

> 提示:设计API是初始化API,指定API的所属应用,请求路径,HTTP方法等。

在LiveData界面的左侧导航树中,选择“API设计”,单击“新建”。

配置API设计信息,如下表所示。单击“提交”,提示“创建成功”,完成API的设计。

参数名 如何配置
API基本信息
名称 2_IOC_MQS
申请人 使用默认值
所属应用 com.fire.demo.ioc.test01
API组 使用默认值。注意:若下拉菜单中无可选项,则首先在左侧二级导航栏中,资源管理>API组管理中创建一个API组。
版本号 使用默认值
描述 可不填
API调用信息
请求路径 /ioc/mqs2
HTTP方法 post
返回类型 JSON(application/json)
格式化返回 使用默认“是”
Header 可不填。
Parameter 可不填。
步骤3 开发API。

> 提示:开发API是定义API的具体功能。

在创建成功的页面上,单击“进入开发”。“类型”选择“函数API”,单击“确定”,进入API开发界面。配置“函数服务脚本”,返回对象填写“result”,编辑输入定义脚本,示例如下。点击“提交”。

> 提示:脚本中定义API根据消息的类型,将消息推送到不同的topic中。

importClass(com.huawei.livedata.util.MQSUtils); function excute(data) { //MQS属性 var topic_name1 = "T_DEVICE_DATA_Test01";//分离后非告警消息topic var topic_name2 = "T_ALARM_DATA_Test01";//分离后告警消息topic var topic_url = "192.168.9.30:9776;192.168.9.96:9776";//分离后topic内部访问地址 var topic_appId = "com.fire.demo.ioc.test01";//topic所属的ROMA应用ID var topic_passWord = "o1mDAgrsmWZiwMMAq57U2g=="; //topic所属ROMA应用密钥 var utils = new MQSUtils(); var obj = JSON.parse(data); var msg = obj.body.body; msg = JSON.parse(msg); try { var resBody = {}; //service是对象 if (msg.hasOwnProperty("service")) { if (typeof(msg.service) == "object" && Object.prototype.toString.call(msg.service).toLowerCase() == "[object object]" && !(msg.service).length) { resBody = getMsgBodyByObj(msg); var msgBody = {}; msgBody.body = resBody.msgBody; if (resBody.type === "alarm") { utils.sendMsg(topic_url, topic_appId, topic_passWord, topic_name2, JSON.stringify(msgBody)); } else { utils.sendMsg(topic_url, topic_appId, topic_passWord, topic_name1, JSON.stringify(msgBody)); } } return "success+object"; } } catch (exception) { return exception + ""; } } //对象情况下 function getMsgBodyByObj(paramBody) { var resJson = {}; var msgBody = {}; var serviceId = ""; if (paramBody.service.hasOwnProperty("serviceId")) { serviceId = paramBody.service.serviceId; } var serviceType = ""; if (paramBody.service.hasOwnProperty("serviceType")) { serviceType = paramBody.service.serviceType; } if (paramBody.hasOwnProperty("requestId")) { msgBody.requestId = paramBody.requestId; } else { msgBody.requestId = ""; } if (paramBody.service.hasOwnProperty("eventTime")) { msgBody.eventTime = paramBody.service.eventTime; } else { msgBody.eventTime = ""; } msgBody.data = {}; var msgCode = ""; if (paramBody.service.hasOwnProperty("data")) { var i = 0; for (var key in (paramBody.service.data)) { if (i === 0) { msgCode = key; } msgBody.data[serviceId + "_" + key] = paramBody.service.data[key]; i++; } } if (paramBody.hasOwnProperty("notifyType")) { msgBody.notifyType = paramBody.notifyType; } else { msgBody.notifyType = ""; } if (paramBody.hasOwnProperty("deviceId")) { msgBody.deviceId = paramBody.deviceId; } else { msgBody.deviceId = ""; } if (paramBody.hasOwnProperty("gatewayId")) { msgBody.gatewayId = paramBody.gatewayId; } else { msgBody.gatewayId = ""; } //判断serviceType如果不是ServiceAlarm或DeviceAlarm if (serviceType === "ServiceAlarm" || serviceType === "DeviceAlarm") { msgBody.messageCode = msgCode; } resJson.msgBody = msgBody; if (serviceType === "ServiceAlarm" || serviceType === "DeviceAlarm") { resJson.type = "alarm"; } else { resJson.type = ""; } return resJson; }
参数名 如何配置
topic_name1 MQS模块中接收除告警消息以外其他消息的Topic。本文示例为T_DEVICE_DATA_Test01。
topic_name2 MQS模块中接收告警消息的Topic。本文示例为T_ALARM_DATA_Test01。
topic_url Topic的内部访问地址。因为本次规划的两个topic同属于一个应用,所以访问地址相同。如果没有记录,可以点击左侧导航栏中“消息集成>消息队列服务MQS”,点击“发布订阅管理>发布管理”,单击topic前的编辑,点击“查看连接地址”中查看topic的内部访问地址。
topic_appId Topic的所属应用ID。本文示例为com.fire.demo.ioc.test01。
topic_passWord Topic所属应用的明文请求密钥。在页面左上方“管理控制台”旁的下拉箭头中,选择“应用中心”,点击“密钥管理”查看明文的应用请求密钥。

步骤4 在Live Data左侧导航树中,选择“API测试”。在“2_IOC_MQS”的操作列单击测试按钮,进入API测试页面。

  • 接入集群:选择任一集群

  • 接入节点:选择任一节点

  • Headers:无需填写

  • Parameters:无需填写

  • Body

    • 内容类型:application/json

    • 请求内容:

{ "body":"{ \"requestId\":\"184dae0aa44648679b7e32635e8d2bbe\", \"service\": { \"serviceId\":\"ServiceAlarm\", \"serviceType\":\"ServiceAlarm\", \"eventTime\": \"20190226030832\", \"data\":{ \"FireAlarmType\":\"10\" } }, \"notifyType\": \"ServiceAlarm \", \"deviceId\":\"6f0bbb6d-1586-4638-83a0-8eaf7bb7f6bb\", \"gatewayId\":\"06c9d00d-f965-4430-86ea-e0021d3aecf8\" }" }

> 注意:如果此处Body无法编辑,说明在设计API时所选的HTTP方法不是post,请返回查看并修改。

单击“测试API”,测试结果如下表示测试成功。可以单击“保存用例”,以便后续需要重复测试。

{ "retCode": "0", "retJSON": { "result": "success+object" } }

步骤5 部署API。

> 提示:部署API是将API部署在API网关上。

在Live Data左侧导航树中,选择“API部署”,在“2_IOC_MQS”API的“操作”列,单击部署按钮,选择“部署到”下拉表中默认的集群。单击“部署”,提示部署成功。

步骤6 授权API。

> 提示:授权API是定义什么ROMA应用可以访问此API,因为本次操作应用的callback API会访问消息分离API,所以消息分离API需要在API网关上授权给应用访问。

在ROMA左侧导航树中,选择“服务集成 > API 网关”,在左侧二级导航栏中,选择“API授权”,在“2_IOC_MQS”API对应的操作列,单击授权按钮。在授权应用区域中,选择应用ID “com.fire.demo.ioc.test01”,单击“授权”。

6. 创建callback API

创建callback API,用于消费IOT设备上报的消息,并将消息Callback至分离告警消息的API中。

步骤1 选择“服务集成 > Live Data”,进入LiveData管理控制台界面。

步骤1 在ROMA左侧导航树中,选择“服务集成 > Live Data”,进入LiveData管理控制台界面。

步骤2 设计API.

在LiveData界面的左侧导航树中,选择“API设计”。单击“新建”。配置API设计信息,如下表所示。

参数名 如何配置
API基本信息
名称 1_IOC_MQS_TEST01 参见操作场景说明中规划的名称,若提示名称已存在,请修改名称为全局唯一。
申请人 使用默认值
所属应用 com.fire.demo.ioc.test01
API组 在下拉菜单中选择默认值
版本号 使用默认值
描述 可不填
API调用信息
请求路径 /ioc/mqs1 参见操作场景说明中规划的路径,此路径为接口的访问路径,可自定义。
HTTP方法 Post
返回类型 JSON(application/json)
格式化返回 使用默认“是”
Header 可不填。
Parameter 可不填。
步骤3 开发API。

单击“提交”后,选择“进入开发”页面, “类型”选择“函数API”,单击“确定”,进入API开发界面。

配置“函数服务脚本”, 返回对象选择“result”,修改定义脚本后点击提交。脚本示例如下:

> 提示:脚本中的部分信息需要修改为实际配置。 > > 脚本定义API订阅MQS中IOT设备上报Topic,并callback消息到分离消息API。

importClass(com.huawei.livedata.util.MQSUtils); importClass(com.huawei.livedata.util.MQSCallback); importClass(com.huawei.livedata.lambdaservice.util.CacheUtils); function excute(data) { //MQS属性 var topic_name = "T_IOC_DEVICE_DATA_Test01"; //接收设备上报消息的TOPIC名称 var topic_url = "192.168.9.30:9776;192.168.9.96:9776";//接收设备消息的TOPIC的内部访问地址 var topic_appId = "com.fire.demo.ioc.test01"; //Topic所属ROMA应用ID var topic_passWord = "o1mDAgrsmWZiwMMAq57U2g=="; //Topic所属ROMA应用的密钥 //消费成功后回调接口 var callback = new MQSCallback(); //回调接口host,无需更改 callback.setHost("192.168.9.116:1080"); //分离消息API的接口路径 callback.setService("/ioc/mqs2"); var headers = { "X-HW-ID":"com.fire.demo.ioc.test01", //分离消息API所属的ROMA应用ID "X-HW-APPKEY":"o1mDAgrsmWZiwMMAq57U2g==" //ROMA应用的密钥 }; //回调接口headers callback.setHeaders(JSON.stringify(headers)); //回调接口方法类型 callback.setMethod("POST"); var utils = new MQSUtils(); try { var result = utils.consumerMsg(topic_url, topic_appId, topic_passWord, topic_name, callback); return result; } catch(exception) { return exception + ""; } }
参数名 如何配置
topic_name 接收IOT平台消息的Topic。此处为T_IOC_DEVICE_DATA_Test01
topic_url 接收IOT平台消息的TOPIC所在的内部访问地址。可以点击左侧导航栏中“消息集成>消息队列服务MQS”,点击“发布订阅管理>订阅管理”,单击topic前的编辑,点击“查看连接地址”中查看topic的内部访问地址。
topic_appId 接收IOT平台消息的Topic所属的应用的应用ID。
topic_passWord 接收IOT平台消息Topic所属应用的请求密钥。在页面左上方“管理控制台”旁的下拉箭头中,选择“应用中心”,点击“密钥管理”查看明文的应用请求密钥。
callback.setService 分离消息API的请求路径,需要和“创建分离消息API”配置的请求路径保持一致。
X-HW-ID 分离消息API所属的应用ID。
X-HW-APPKEY 分离消息API所属的应用请求密钥。

7. 模拟测试

创建模拟API,模拟IOT设备发送消防安全消息至ROMA平台,测试最终在告警TOPIC中的MQS消息队列中是否有分离的告警消息。

步骤1 在ROMA左侧导航树中,选择“服务集成 > Live Data”,进入LiveData管理控制台界面。

步骤2 设计API

在LiveData界面的左侧导航树中,选择“API设计”,进入“API设计”页面。单击“新建”,进入“API设计”页面。配置API设计信息,如下表所示。

参数名 如何配置
API基本信息
名称 IOC_DATATEST,参见操作场景说明中信息规划部分。
申请人 使用默认值
所属者 com.fire.demo.ioc.test01
群组 使用默认值
版本号 使用默认值
所属应用 com.fire.demo.ioc.test01
描述 可不填
格式化返回 使用默认“是”
API调用信息
请求路径 /data/test
HTTP方法 Post
返回类型 JSON(application/json)
Header 可不填。
Parameter 可不填。

步骤3 开发API。

单击“提交”,提示“创建成功”,点击“进入开发”,API“类型”选择“函数API”。单击“确定”,进入API开发界面。

配置“函数服务脚本”。返回对象填写“result”,编辑定义脚本,脚本示例如下。完成后单击“提交”。

> 提示:脚本定义API模拟IOT设备发送一个预置的告警消息到消息上报TOPIC。

importClass(com.huawei.livedata.util.MQSUtils); function excute(data) { var obj = JSON.parse(data); var topic_name = "T_IOC_DEVICE_DATA_Test01"; //IOT设备上报消息的TOPIC var topic_url = "192.168.9.30:9776;192.168.9.96:9776";//IOT设备上报消息的TOPIC的内部访问地址 var topic_appId = "com.fire.demo.ioc.test01"; //TOPIC所属的ROMA应用的APPID var topic_passWord = "o1mDAgrsmWZiwMMAq57U2g=="; //TOPIC所属的ROMA应用的密钥 var msg = {}; var body = {}; var utils = new MQSUtils(); var result = ""; var rtnMsg = {}; try { body = { "requestId": "184dae0aa44648679b7e32635e8d2bbe", "service": { "serviceId": "ServiceAlarm", "serviceType": "ServiceAlarm", "eventTime": "20190226030832", "data": { "FireAlarmType": "10" } }, "notifyType": "ServiceAlarm", "deviceId": "6f0bbb6d-1586-4638-83a0-8eaf7bb7f6bb", "gatewayId": "06c9d00d-f965-4430-86ea-e0021d3aecf8" }; msg.body = body; result = utils.sendMsg(topic_url,topic_appId,topic_passWord,topic_name,JSON.stringify(msg)); rtnMsg = { "resCode":0, "resMsg":result }; } catch(exception) { rtnMsg = { "resCode":1, "resMsg":exception +"" }; } return JSON.stringify(rtnMsg); }
参数名 如何配置
topic_name 接收IOT平台消息的Topic。此处为T_IOC_DEVICE_DATA_Test01。
topic_url 接收IOT平台消息的Topic的内部访问地址。可以点击左侧导航栏中“消息集成>消息队列服务MQS”,点击“发布订阅管理>发布管理”,单击topic前的编辑,点击“查看连接地址”中查看topic的内部访问地址。
topic_appId 接收IOT平台上报消息并进行消息处理的应用。此处为com.fire.demo.ioc.test01。
topic_passWord 应用的请求密钥。在页面左上方“管理控制台”旁的下拉箭头中,选择“应用中心”,点击“密钥管理”查看明文的应用请求密钥。

步骤4 使用测试API模拟IOT设备上报消息。

在Live Data左侧导航树中,选择“API测试”,在“IOC_DATATEST”的操作列,单击测试按钮,进入API测试页面。

选择“接入集群”。单击“测试API”,测试结果如下表示API测试成功,表明测试API模拟IOT设备上报一条消息到T_IOC_DEVICE_DATA_Test01。

{ "retCode": "0", "retJSON": { "result": { "resCode": 0, "resMsg": "success status: true" } } }

步骤5 使用callback API消费IOT设备上报的消息,并callback给分离消息API。

> 提示:Callback API消费MQS消息时,五分钟内无二次消费,则会断开连接,实际场景中需要配置定时任务保持接口能不断消费MQS消息。

在Live Data左侧导航树中,选择“API测试”,进入API测试页面。在callback API“1_IOC_MQS_TEST01”的操作列,单击测试按钮,进入API测试页面。

在“API测试参数”页面,选择“接入集群”和“接入节点”,单击“测试API”,测试结果如下表示测试成功,表明callback API已消费IOT设备上报的消息。

{ "retCode": "0", "retJSON": { "result": "Success to start consumer" } }

步骤6 在MQS Portal左侧导航树中,选择“消息查询”,查看消息历史中是否包含“T_ALARM_DATA_Test01”消息。

如果已包含此消息,则说明callback API已从T_IOC_DEVICE_DATA_Test01获取到模拟IOT设备上报的消息,并通过分离消息API将告警消息转发到了T_ALARM_DATA_Test01的topic。

----结束

8. 恭喜你

祝贺您,您已经成功完成本次Codelabs的操作,有机会赢取精美礼品,点击"闯关有礼"查看活动详情。

9. 参考文件