基于沃土数字平台分离IOT设备消息
您在体验过程中有任何疑问,都可以在此留言反馈>>
如需下载本指导文档,请点击此处下载。
1.介绍
什么是ROMA
ROMA是融合集成平台,提供轻量化应用、消息、数据等集成能力,简化企业上云,支持云上云下、跨区域集成,打通IT与OT,连接企业与生态伙伴,助力行业数字化转型。
数据集成:提供多种数据源的快速集成能力,可以在任意时间、任意地点、任意系统之间实现轻量级实时数据集成和定时增量数据迁移。支持文本、消息、API、结构和非结构化数据等多种数据源之间的灵活集成。
应用集成:提供API设计、开发、测试、管理、发布能力,具备API策略路由、统一接入、认证授权、请求验证、流量控制、API调用统计分析能力,以及协议转换、API编排、API调度等业务处理能力。
消息集成:针对企业级互联的专业消息组件,基于高可用分布式集群技术,包括发布订阅、消息轨迹、资源统计、监控报警等一套完整的消息云服务。
本次Codelabs主要体验ROMA的消息集成能力。
本次Codelabs的意义是什么?
实际IOT场景中,设备产生的消息种类比较复杂,消息结构和北向应用所需的结构存在差异,需要将消息分类为不同类型的消息,供北向系统订阅消费。
例如,本次实验中的IOT平台上报的设备消息既有告警消息又有设备消息,如果不进行分离分类,北向系统集成IOT平台消息时,就无法准确区分设备上报的是告警消息还是设备消息。
您将建立什么?
在本次体验中,您将使用ROMA系统建立一个应用,此应用能够将把上报的消息分类为设备消息和告警消息,并以消息队列形式提供给北向系统进行订阅。
您将学会到什么
您需要什么?
开发环境及技能要求:
-
熟悉JAVA Script。
-
注册成为数字平台开发者。
运行终端要求:
2. 操作场景说明
在消防场景中,当IOT设备连接的消防探头探测到高温、烟雾,或有人按下消防紧急报警按钮时,消息通过ROMA上报到IOC平台,产生实时告警。
由于实际项目中,IOT产生的消息种类复杂,消息结构和实际应用所需的消息结构存在差异,这里将上报的消息进行了分类,具体实施流程如下图所示。

- IOT设备平台作为消息的生产者,调用ROMA平台的SDK将消息放入MQS队列中,SDK开发过程参见《ROMA 二次开发指南》。本次Codelabs中,将使用livedata创建模拟接口IOC_DATATEST,用于模拟IOT设备上报消息。
- ROMA LiveData中创建callback接口,订阅并消费MQS中IOT平台上报的消息,并通过CallBack消费的消息,转发到LiveData的分离API接口中。
- LiveData分离消息API将告警消息和其他消息进行分离并发送至MQS不同的Topic队列中。
- 北向IOC应用,通过订阅ROMA平台MQS消息,实时接收告警信息。
本示例中的信息规划如下表。
> 注意:同一套ROMA中应用、消息和API的ID、名称、路径都需要保持唯一,所以开发者可以根据下面的示例组合自己的用户名组成唯一的ID、名称和路径,否则在操作过程中,页面会提示此名称已使用。
涉及模块 |
名称 |
说明 |
应用 |
应用ID:com.fire.demo.ioc.test01 ;应用名称:com.fire.demo.ioc.test01 |
此处的应用不是IOC北向应用,而是ROMA上的应用,用于在ROMA:1) 接收IOT平台上报消息并进行消息处理的应用。2)上报告警信息到IOC北向应用。 |
MQS |
Topic名称:T_IOC_DEVICE_DATA_Test01 |
接收IOT平台消息的Topic。 |
MQS |
Topic名称:T_ALARM_DATA_Test01 |
接收通过API分离的告警消息的Topic。 |
MQS |
Topic名称:T_DEVICE_DATA_Test01 |
接收通过API分离的除告警消息以外其他消息的Topic。 |
APIC |
API名称:IOC_DATATEST |
此接口用于模拟IOT平台设备发送消防安全消息。 |
APIC |
API名称:1_IOC_MQS_TEST01 |
此接口用于消费T_IOC_DEVICE_DATA_Test01队列中消息,并call back消息的body至接口2_IOC_MQS。访问路径规划为:/ioc/mqs1 |
APIC |
API名称:2_IOC_MQS |
此接口用于将消息分离,将告警消息发送到T_ALARM_DATA_Test01的topic队列,将设备变化消息发送到T_DEVICE_DATA_Test01的topic队列。访问路径规划为:/ioc/mqs2 |
3. 创建ROMA应用
创建ROMA平台中用于接收消息、处理并上报消息的应用。
步骤1 使用chrome浏览器,登录数字平台开发平台,点击页面右上方“开发者中心>集成开发”,进入ROMA页面。
步骤2 如下图所示在页面上行选择“应用注册”,进入“应用注册”页面。
![]()
也可以选择管理控制台后应用名称右侧的小箭头,点击“应用注册”。
步骤3 根据信息规划中“应用”模块的规划,创建ID为com.fire.demo.ioc.test01的应用,认证类型选择“私钥认证”。记录此应用ID和名称,后面的步骤中需要用到这些信息。
> 注意:
>
> 1. 如果提示应用ID或应用名称已存在或者不可用,说明ROMA系统中已有此应用ID或应用名,请保持应用ID或应用名唯一。
> 2. 文中提示记录的信息,后面的步骤中会用到,为了加快操作,建议提前将这些信息记录到记事本中。
> 3. 私钥认证是采用应用ID和应用密钥进行API、MQS等访问认证的方式,jwt认证则要使用应用ID、时间戳和认证token来进行鉴权认证,才能访问应用的API、MQS等。本次操作为了简化用户操作,采用的是私钥认证方式。
![]()
步骤4 在应用管理页面单击新建应用前的“密钥管理”,点击“显示密钥”,查看并记录此应用的请求密钥,后面的步骤中需要用到此密钥。
![]()
4. 消息集成:创建并发布Topic
根据数据规划,在MQS上创建3个Topic,T_IOC_DEVICE_DATA_Test01用于接收IOT平台消息,T_DEVICE_DATA_Test01用于接收IOT设备消息,T_ALARM_DATA_Test01用于接收IOT告警消息。
![]()
以下示例为如何创建一个T_IOC_DEVICE_DATA_Test01,参考以下示例完成3个topic的创建。
步骤1 在ROMA左侧导航树中,选择“消息集成 > 消息队列服务 MQS”,进入MQS界面。
步骤2 在页面左侧二级导航树中,选择“Topic管理”。
步骤3 单击“创建Topic”按钮。
步骤4 参见信息规划中“MQS”模块的规划,创建名称为T_IOC_DEVICE_DATA_Test01的Topic。
其中“所属应用”选择刚创建的应用“com.fire.demo.ioc.test01”,表明此topic属于此应用。
![]()
> 注意:当提示保存失败,且名称已存在时,标明ROMA系统中存在重复的topic名称,请修改topic名称为唯一名称。
步骤5 在“T_IOC_DEVICE_DATA_Test01”前的操作列,单击“发布”。
在发布页面中“发布应用”选择新建的应用“com.fire.demo.ioc.test01”,表明此应用可向此topic写消息。点击“发布”。
![]()
步骤6 点击左侧二级导航栏中的“Topic管理”,在“T_IOC_DEVICE_DATA_Test01”前的操作列,单击“订阅”。
“订阅应用”选择新建应用“com.fire.demo.ioc.test01”,表明此应用可向此topic读消息。点击“查看连接地址”,记录内部访问地址,如“192.168.9.30:9776;192.168.9.96:9776”,后面的步骤中会用到。单击“保存”。
步骤7 参考步骤1~6重复创建、发布和订阅T_DEVICE_DATA_Test01和T_ALARM_DATA_Test01两个topic,并记录其中任意一个TOPIC发布信息中的内部访问地址。
![]()
5. 创建分离消息API
创建分离消息API,用于分离callback API中回调的消息,将其中的告警消息发送到MQS消息队列T_ALARM_DATA_Test01,设备消息发送到MQS消息队列中T_DEVICE_DATA_Test01。
![]()
步骤1 在ROMA左侧导航树中,选择“服务集成 > Live Data”,进入LiveData管理控制台界面。
步骤2 设计API。
> 提示:设计API是初始化API,指定API的所属应用,请求路径,HTTP方法等。
在LiveData界面的左侧导航树中,选择“API设计”,单击“新建”。
配置API设计信息,如下表所示。单击“提交”,提示“创建成功”,完成API的设计。
参数名 |
如何配置 |
API基本信息 |
|
名称 |
2_IOC_MQS |
申请人 |
使用默认值 |
所属应用 |
com.fire.demo.ioc.test01 |
API组 |
使用默认值。注意:若下拉菜单中无可选项,则首先在左侧二级导航栏中,资源管理>API组管理中创建一个API组。 |
版本号 |
使用默认值 |
描述 |
可不填 |
API调用信息 |
|
请求路径 |
/ioc/mqs2 |
HTTP方法 |
post |
返回类型 |
JSON(application/json) |
格式化返回 |
使用默认“是” |
Header |
可不填。 |
Parameter |
可不填。 |
步骤3 开发API。 |
|
> 提示:开发API是定义API的具体功能。
在创建成功的页面上,单击“进入开发”。“类型”选择“函数API”,单击“确定”,进入API开发界面。配置“函数服务脚本”,返回对象填写“result”,编辑输入定义脚本,示例如下。点击“提交”。
> 提示:脚本中定义API根据消息的类型,将消息推送到不同的topic中。
Copied!
importClass(com.huawei.livedata.util.MQSUtils);
function excute(data) {
var topic_name1 = "T_DEVICE_DATA_Test01";
var topic_name2 = "T_ALARM_DATA_Test01";
var topic_url = "192.168.9.30:9776;192.168.9.96:9776";
var topic_appId = "com.fire.demo.ioc.test01";
var topic_passWord = "o1mDAgrsmWZiwMMAq57U2g==";
var utils = new MQSUtils();
var obj = JSON.parse(data);
var msg = obj.body.body;
msg = JSON.parse(msg);
try {
var resBody = {};
if (msg.hasOwnProperty("service")) {
if (typeof(msg.service) == "object"
&& Object.prototype.toString.call(msg.service).toLowerCase() == "[object object]"
&& !(msg.service).length) {
resBody = getMsgBodyByObj(msg);
var msgBody = {};
msgBody.body = resBody.msgBody;
if (resBody.type === "alarm") {
utils.sendMsg(topic_url, topic_appId, topic_passWord, topic_name2,
JSON.stringify(msgBody));
} else {
utils.sendMsg(topic_url, topic_appId, topic_passWord, topic_name1,
JSON.stringify(msgBody));
}
}
return "success+object";
}
} catch (exception) {
return exception + "";
}
}
function getMsgBodyByObj(paramBody) {
var resJson = {};
var msgBody = {};
var serviceId = "";
if (paramBody.service.hasOwnProperty("serviceId")) {
serviceId = paramBody.service.serviceId;
}
var serviceType = "";
if (paramBody.service.hasOwnProperty("serviceType")) {
serviceType = paramBody.service.serviceType;
}
if (paramBody.hasOwnProperty("requestId")) {
msgBody.requestId = paramBody.requestId;
} else {
msgBody.requestId = "";
}
if (paramBody.service.hasOwnProperty("eventTime")) {
msgBody.eventTime = paramBody.service.eventTime;
} else {
msgBody.eventTime = "";
}
msgBody.data = {};
var msgCode = "";
if (paramBody.service.hasOwnProperty("data")) {
var i = 0;
for (var key in (paramBody.service.data)) {
if (i === 0) {
msgCode = key;
}
msgBody.data[serviceId + "_" + key] = paramBody.service.data[key];
i++;
}
}
if (paramBody.hasOwnProperty("notifyType")) {
msgBody.notifyType = paramBody.notifyType;
} else {
msgBody.notifyType = "";
}
if (paramBody.hasOwnProperty("deviceId")) {
msgBody.deviceId = paramBody.deviceId;
} else {
msgBody.deviceId = "";
}
if (paramBody.hasOwnProperty("gatewayId")) {
msgBody.gatewayId = paramBody.gatewayId;
} else {
msgBody.gatewayId = "";
}
if (serviceType === "ServiceAlarm" || serviceType === "DeviceAlarm") {
msgBody.messageCode = msgCode;
}
resJson.msgBody = msgBody;
if (serviceType === "ServiceAlarm" || serviceType === "DeviceAlarm") {
resJson.type = "alarm";
} else {
resJson.type = "";
}
return resJson;
}
参数名 |
如何配置 |
topic_name1 |
MQS模块中接收除告警消息以外其他消息的Topic。本文示例为T_DEVICE_DATA_Test01。 |
topic_name2 |
MQS模块中接收告警消息的Topic。本文示例为T_ALARM_DATA_Test01。 |
topic_url |
Topic的内部访问地址。因为本次规划的两个topic同属于一个应用,所以访问地址相同。如果没有记录,可以点击左侧导航栏中“消息集成>消息队列服务MQS”,点击“发布订阅管理>发布管理”,单击topic前的编辑,点击“查看连接地址”中查看topic的内部访问地址。 |
topic_appId |
Topic的所属应用ID。本文示例为com.fire.demo.ioc.test01。 |
topic_passWord |
Topic所属应用的明文请求密钥。在页面左上方“管理控制台”旁的下拉箭头中,选择“应用中心”,点击“密钥管理”查看明文的应用请求密钥。 |
步骤4 在Live Data左侧导航树中,选择“API测试”。在“2_IOC_MQS”的操作列单击测试按钮,进入API测试页面。
-
接入集群:选择任一集群
-
接入节点:选择任一节点
-
Headers:无需填写
-
Parameters:无需填写
-
Body
-
内容类型:application/json
-
请求内容:
Copied!
{ "body":"{ \"requestId\":\"184dae0aa44648679b7e32635e8d2bbe\", \"service\":
{ \"serviceId\":\"ServiceAlarm\", \"serviceType\":\"ServiceAlarm\", \"eventTime\":
\"20190226030832\", \"data\":{ \"FireAlarmType\":\"10\" } }, \"notifyType\":
\"ServiceAlarm \", \"deviceId\":\"6f0bbb6d-1586-4638-83a0-8eaf7bb7f6bb\",
\"gatewayId\":\"06c9d00d-f965-4430-86ea-e0021d3aecf8\" }" }
> 注意:如果此处Body无法编辑,说明在设计API时所选的HTTP方法不是post,请返回查看并修改。
单击“测试API”,测试结果如下表示测试成功。可以单击“保存用例”,以便后续需要重复测试。
Copied!
{
"retCode": "0",
"retJSON": {
"result": "success+object"
}
}
步骤5 部署API。
> 提示:部署API是将API部署在API网关上。
在Live Data左侧导航树中,选择“API部署”,在“2_IOC_MQS”API的“操作”列,单击部署按钮,选择“部署到”下拉表中默认的集群。单击“部署”,提示部署成功。
步骤6 授权API。
> 提示:授权API是定义什么ROMA应用可以访问此API,因为本次操作应用的callback API会访问消息分离API,所以消息分离API需要在API网关上授权给应用访问。
在ROMA左侧导航树中,选择“服务集成 > API 网关”,在左侧二级导航栏中,选择“API授权”,在“2_IOC_MQS”API对应的操作列,单击授权按钮。在授权应用区域中,选择应用ID “com.fire.demo.ioc.test01”,单击“授权”。
6. 创建callback API
创建callback API,用于消费IOT设备上报的消息,并将消息Callback至分离告警消息的API中。
步骤1 选择“服务集成 > Live Data”,进入LiveData管理控制台界面。
步骤1 在ROMA左侧导航树中,选择“服务集成 > Live Data”,进入LiveData管理控制台界面。
步骤2 设计API.
在LiveData界面的左侧导航树中,选择“API设计”。单击“新建”。配置API设计信息,如下表所示。
参数名 |
如何配置 |
API基本信息 |
|
名称 |
1_IOC_MQS_TEST01 参见操作场景说明中规划的名称,若提示名称已存在,请修改名称为全局唯一。 |
申请人 |
使用默认值 |
所属应用 |
com.fire.demo.ioc.test01 |
API组 |
在下拉菜单中选择默认值 |
版本号 |
使用默认值 |
描述 |
可不填 |
API调用信息 |
|
请求路径 |
/ioc/mqs1 参见操作场景说明中规划的路径,此路径为接口的访问路径,可自定义。 |
HTTP方法 |
Post |
返回类型 |
JSON(application/json) |
格式化返回 |
使用默认“是” |
Header |
可不填。 |
Parameter |
可不填。 |
步骤3 开发API。 |
|
单击“提交”后,选择“进入开发”页面, “类型”选择“函数API”,单击“确定”,进入API开发界面。
配置“函数服务脚本”, 返回对象选择“result”,修改定义脚本后点击提交。脚本示例如下:
> 提示:脚本中的部分信息需要修改为实际配置。
>
> 脚本定义API订阅MQS中IOT设备上报Topic,并callback消息到分离消息API。
Copied!
importClass(com.huawei.livedata.util.MQSUtils);
importClass(com.huawei.livedata.util.MQSCallback);
importClass(com.huawei.livedata.lambdaservice.util.CacheUtils);
function excute(data) {
var topic_name = "T_IOC_DEVICE_DATA_Test01";
var topic_url = "192.168.9.30:9776;192.168.9.96:9776";
var topic_appId = "com.fire.demo.ioc.test01";
var topic_passWord = "o1mDAgrsmWZiwMMAq57U2g==";
var callback = new MQSCallback();
callback.setHost("192.168.9.116:1080");
callback.setService("/ioc/mqs2");
var headers = {
"X-HW-ID":"com.fire.demo.ioc.test01",
"X-HW-APPKEY":"o1mDAgrsmWZiwMMAq57U2g=="
};
callback.setHeaders(JSON.stringify(headers));
callback.setMethod("POST");
var utils = new MQSUtils();
try {
var result = utils.consumerMsg(topic_url, topic_appId, topic_passWord, topic_name, callback);
return result;
} catch(exception) {
return exception + "";
}
}
参数名 |
如何配置 |
topic_name |
接收IOT平台消息的Topic。此处为T_IOC_DEVICE_DATA_Test01。 |
topic_url |
接收IOT平台消息的TOPIC所在的内部访问地址。可以点击左侧导航栏中“消息集成>消息队列服务MQS”,点击“发布订阅管理>订阅管理”,单击topic前的编辑,点击“查看连接地址”中查看topic的内部访问地址。 |
topic_appId |
接收IOT平台消息的Topic所属的应用的应用ID。 |
topic_passWord |
接收IOT平台消息Topic所属应用的请求密钥。在页面左上方“管理控制台”旁的下拉箭头中,选择“应用中心”,点击“密钥管理”查看明文的应用请求密钥。 |
callback.setService |
分离消息API的请求路径,需要和“创建分离消息API”配置的请求路径保持一致。 |
X-HW-ID |
分离消息API所属的应用ID。 |
X-HW-APPKEY |
分离消息API所属的应用请求密钥。 |
7. 模拟测试
创建模拟API,模拟IOT设备发送消防安全消息至ROMA平台,测试最终在告警TOPIC中的MQS消息队列中是否有分离的告警消息。
步骤1 在ROMA左侧导航树中,选择“服务集成 > Live Data”,进入LiveData管理控制台界面。
步骤2 设计API
在LiveData界面的左侧导航树中,选择“API设计”,进入“API设计”页面。单击“新建”,进入“API设计”页面。配置API设计信息,如下表所示。
参数名 |
如何配置 |
API基本信息 |
|
名称 |
IOC_DATATEST,参见操作场景说明中信息规划部分。 |
申请人 |
使用默认值 |
所属者 |
com.fire.demo.ioc.test01 |
群组 |
使用默认值 |
版本号 |
使用默认值 |
所属应用 |
com.fire.demo.ioc.test01 |
描述 |
可不填 |
格式化返回 |
使用默认“是” |
API调用信息 |
|
请求路径 |
/data/test |
HTTP方法 |
Post |
返回类型 |
JSON(application/json) |
Header |
可不填。 |
Parameter |
可不填。 |
步骤3 开发API。
单击“提交”,提示“创建成功”,点击“进入开发”,API“类型”选择“函数API”。单击“确定”,进入API开发界面。
配置“函数服务脚本”。返回对象填写“result”,编辑定义脚本,脚本示例如下。完成后单击“提交”。
> 提示:脚本定义API模拟IOT设备发送一个预置的告警消息到消息上报TOPIC。
Copied!
importClass(com.huawei.livedata.util.MQSUtils);
function excute(data) {
var obj = JSON.parse(data);
var topic_name = "T_IOC_DEVICE_DATA_Test01";
var topic_url = "192.168.9.30:9776;192.168.9.96:9776";
var topic_appId = "com.fire.demo.ioc.test01";
var topic_passWord = "o1mDAgrsmWZiwMMAq57U2g==";
var msg = {};
var body = {};
var utils = new MQSUtils();
var result = "";
var rtnMsg = {};
try {
body = {
"requestId": "184dae0aa44648679b7e32635e8d2bbe",
"service": {
"serviceId": "ServiceAlarm",
"serviceType": "ServiceAlarm",
"eventTime": "20190226030832",
"data": {
"FireAlarmType": "10"
}
},
"notifyType": "ServiceAlarm",
"deviceId": "6f0bbb6d-1586-4638-83a0-8eaf7bb7f6bb",
"gatewayId": "06c9d00d-f965-4430-86ea-e0021d3aecf8"
};
msg.body = body;
result = utils.sendMsg(topic_url,topic_appId,topic_passWord,topic_name,JSON.stringify(msg));
rtnMsg = {
"resCode":0,
"resMsg":result
};
} catch(exception) {
rtnMsg = {
"resCode":1,
"resMsg":exception +""
};
}
return JSON.stringify(rtnMsg);
}
参数名 |
如何配置 |
topic_name |
接收IOT平台消息的Topic。此处为T_IOC_DEVICE_DATA_Test01。 |
topic_url |
接收IOT平台消息的Topic的内部访问地址。可以点击左侧导航栏中“消息集成>消息队列服务MQS”,点击“发布订阅管理>发布管理”,单击topic前的编辑,点击“查看连接地址”中查看topic的内部访问地址。 |
topic_appId |
接收IOT平台上报消息并进行消息处理的应用。此处为com.fire.demo.ioc.test01。 |
topic_passWord |
应用的请求密钥。在页面左上方“管理控制台”旁的下拉箭头中,选择“应用中心”,点击“密钥管理”查看明文的应用请求密钥。 |
步骤4 使用测试API模拟IOT设备上报消息。
在Live Data左侧导航树中,选择“API测试”,在“IOC_DATATEST”的操作列,单击测试按钮,进入API测试页面。
选择“接入集群”。单击“测试API”,测试结果如下表示API测试成功,表明测试API模拟IOT设备上报一条消息到T_IOC_DEVICE_DATA_Test01。
Copied!
{
"retCode": "0",
"retJSON": {
"result": {
"resCode": 0,
"resMsg": "success status: true"
}
}
}
步骤5 使用callback API消费IOT设备上报的消息,并callback给分离消息API。
> 提示:Callback API消费MQS消息时,五分钟内无二次消费,则会断开连接,实际场景中需要配置定时任务保持接口能不断消费MQS消息。
在Live Data左侧导航树中,选择“API测试”,进入API测试页面。在callback API“1_IOC_MQS_TEST01”的操作列,单击测试按钮,进入API测试页面。
在“API测试参数”页面,选择“接入集群”和“接入节点”,单击“测试API”,测试结果如下表示测试成功,表明callback API已消费IOT设备上报的消息。
Copied!
{
"retCode": "0",
"retJSON": {
"result": "Success to start consumer"
}
}
步骤6 在MQS Portal左侧导航树中,选择“消息查询”,查看消息历史中是否包含“T_ALARM_DATA_Test01”消息。
如果已包含此消息,则说明callback API已从T_IOC_DEVICE_DATA_Test01获取到模拟IOT设备上报的消息,并通过分离消息API将告警消息转发到了T_ALARM_DATA_Test01的topic。
![]()
----结束
8. 恭喜你
祝贺您,您已经成功完成本次Codelabs的操作,有机会赢取精美礼品,点击"闯关有礼"查看活动详情。
![]()
基于沃土数字平台分离IOT设备消息
您在体验过程中有任何疑问,都可以在此留言反馈>>
如需下载本指导文档,请点击此处下载。
1.介绍
什么是ROMA
ROMA是融合集成平台,提供轻量化应用、消息、数据等集成能力,简化企业上云,支持云上云下、跨区域集成,打通IT与OT,连接企业与生态伙伴,助力行业数字化转型。
数据集成:提供多种数据源的快速集成能力,可以在任意时间、任意地点、任意系统之间实现轻量级实时数据集成和定时增量数据迁移。支持文本、消息、API、结构和非结构化数据等多种数据源之间的灵活集成。
应用集成:提供API设计、开发、测试、管理、发布能力,具备API策略路由、统一接入、认证授权、请求验证、流量控制、API调用统计分析能力,以及协议转换、API编排、API调度等业务处理能力。
消息集成:针对企业级互联的专业消息组件,基于高可用分布式集群技术,包括发布订阅、消息轨迹、资源统计、监控报警等一套完整的消息云服务。
本次Codelabs主要体验ROMA的消息集成能力。
本次Codelabs的意义是什么?
实际IOT场景中,设备产生的消息种类比较复杂,消息结构和北向应用所需的结构存在差异,需要将消息分类为不同类型的消息,供北向系统订阅消费。
例如,本次实验中的IOT平台上报的设备消息既有告警消息又有设备消息,如果不进行分离分类,北向系统集成IOT平台消息时,就无法准确区分设备上报的是告警消息还是设备消息。
您将建立什么?
在本次体验中,您将使用ROMA系统建立一个应用,此应用能够将把上报的消息分类为设备消息和告警消息,并以消息队列形式提供给北向系统进行订阅。
您将学会到什么
如何使用ROMA进行消息集成
如何使用ROMA分离分类消息
如何通过ROMA MQS向北向系统提供消息订阅能力
您需要什么?
开发环境及技能要求:
熟悉JAVA Script。
注册成为数字平台开发者。
运行终端要求:
开发计算机(台式机或笔记本电脑)
开发计算机需要安装Google Chrome浏览器。
2. 操作场景说明
在消防场景中,当IOT设备连接的消防探头探测到高温、烟雾,或有人按下消防紧急报警按钮时,消息通过ROMA上报到IOC平台,产生实时告警。
由于实际项目中,IOT产生的消息种类复杂,消息结构和实际应用所需的消息结构存在差异,这里将上报的消息进行了分类,具体实施流程如下图所示。
本示例中的信息规划如下表。
> 注意:同一套ROMA中应用、消息和API的ID、名称、路径都需要保持唯一,所以开发者可以根据下面的示例组合自己的用户名组成唯一的ID、名称和路径,否则在操作过程中,页面会提示此名称已使用。
3. 创建ROMA应用
创建ROMA平台中用于接收消息、处理并上报消息的应用。
步骤1 使用chrome浏览器,登录数字平台开发平台,点击页面右上方“开发者中心>集成开发”,进入ROMA页面。
步骤2 如下图所示在页面上行选择“应用注册”,进入“应用注册”页面。
也可以选择管理控制台后应用名称右侧的小箭头,点击“应用注册”。
步骤3 根据信息规划中“应用”模块的规划,创建ID为com.fire.demo.ioc.test01的应用,认证类型选择“私钥认证”。记录此应用ID和名称,后面的步骤中需要用到这些信息。
> 注意: > > 1. 如果提示应用ID或应用名称已存在或者不可用,说明ROMA系统中已有此应用ID或应用名,请保持应用ID或应用名唯一。
> 2. 文中提示记录的信息,后面的步骤中会用到,为了加快操作,建议提前将这些信息记录到记事本中。
> 3. 私钥认证是采用应用ID和应用密钥进行API、MQS等访问认证的方式,jwt认证则要使用应用ID、时间戳和认证token来进行鉴权认证,才能访问应用的API、MQS等。本次操作为了简化用户操作,采用的是私钥认证方式。
步骤4 在应用管理页面单击新建应用前的“密钥管理”,点击“显示密钥”,查看并记录此应用的请求密钥,后面的步骤中需要用到此密钥。
4. 消息集成:创建并发布Topic
根据数据规划,在MQS上创建3个Topic,T_IOC_DEVICE_DATA_Test01用于接收IOT平台消息,T_DEVICE_DATA_Test01用于接收IOT设备消息,T_ALARM_DATA_Test01用于接收IOT告警消息。
以下示例为如何创建一个T_IOC_DEVICE_DATA_Test01,参考以下示例完成3个topic的创建。
步骤1 在ROMA左侧导航树中,选择“消息集成 > 消息队列服务 MQS”,进入MQS界面。
步骤2 在页面左侧二级导航树中,选择“Topic管理”。
步骤3 单击“创建Topic”按钮。
步骤4 参见信息规划中“MQS”模块的规划,创建名称为T_IOC_DEVICE_DATA_Test01的Topic。
其中“所属应用”选择刚创建的应用“com.fire.demo.ioc.test01”,表明此topic属于此应用。
> 注意:当提示保存失败,且名称已存在时,标明ROMA系统中存在重复的topic名称,请修改topic名称为唯一名称。
步骤5 在“T_IOC_DEVICE_DATA_Test01”前的操作列,单击“发布”。
在发布页面中“发布应用”选择新建的应用“com.fire.demo.ioc.test01”,表明此应用可向此topic写消息。点击“发布”。
步骤6 点击左侧二级导航栏中的“Topic管理”,在“T_IOC_DEVICE_DATA_Test01”前的操作列,单击“订阅”。
“订阅应用”选择新建应用“com.fire.demo.ioc.test01”,表明此应用可向此topic读消息。点击“查看连接地址”,记录内部访问地址,如“192.168.9.30:9776;192.168.9.96:9776”,后面的步骤中会用到。单击“保存”。
步骤7 参考步骤1~6重复创建、发布和订阅T_DEVICE_DATA_Test01和T_ALARM_DATA_Test01两个topic,并记录其中任意一个TOPIC发布信息中的内部访问地址。
5. 创建分离消息API
创建分离消息API,用于分离callback API中回调的消息,将其中的告警消息发送到MQS消息队列T_ALARM_DATA_Test01,设备消息发送到MQS消息队列中T_DEVICE_DATA_Test01。
步骤1 在ROMA左侧导航树中,选择“服务集成 > Live Data”,进入LiveData管理控制台界面。
步骤2 设计API。
> 提示:设计API是初始化API,指定API的所属应用,请求路径,HTTP方法等。
在LiveData界面的左侧导航树中,选择“API设计”,单击“新建”。
配置API设计信息,如下表所示。单击“提交”,提示“创建成功”,完成API的设计。
> 提示:开发API是定义API的具体功能。
在创建成功的页面上,单击“进入开发”。“类型”选择“函数API”,单击“确定”,进入API开发界面。配置“函数服务脚本”,返回对象填写“result”,编辑输入定义脚本,示例如下。点击“提交”。
> 提示:脚本中定义API根据消息的类型,将消息推送到不同的topic中。
importClass(com.huawei.livedata.util.MQSUtils); function excute(data) { //MQS属性 var topic_name1 = "T_DEVICE_DATA_Test01";//分离后非告警消息topic var topic_name2 = "T_ALARM_DATA_Test01";//分离后告警消息topic var topic_url = "192.168.9.30:9776;192.168.9.96:9776";//分离后topic内部访问地址 var topic_appId = "com.fire.demo.ioc.test01";//topic所属的ROMA应用ID var topic_passWord = "o1mDAgrsmWZiwMMAq57U2g=="; //topic所属ROMA应用密钥 var utils = new MQSUtils(); var obj = JSON.parse(data); var msg = obj.body.body; msg = JSON.parse(msg); try { var resBody = {}; //service是对象 if (msg.hasOwnProperty("service")) { if (typeof(msg.service) == "object" && Object.prototype.toString.call(msg.service).toLowerCase() == "[object object]" && !(msg.service).length) { resBody = getMsgBodyByObj(msg); var msgBody = {}; msgBody.body = resBody.msgBody; if (resBody.type === "alarm") { utils.sendMsg(topic_url, topic_appId, topic_passWord, topic_name2, JSON.stringify(msgBody)); } else { utils.sendMsg(topic_url, topic_appId, topic_passWord, topic_name1, JSON.stringify(msgBody)); } } return "success+object"; } } catch (exception) { return exception + ""; } } //对象情况下 function getMsgBodyByObj(paramBody) { var resJson = {}; var msgBody = {}; var serviceId = ""; if (paramBody.service.hasOwnProperty("serviceId")) { serviceId = paramBody.service.serviceId; } var serviceType = ""; if (paramBody.service.hasOwnProperty("serviceType")) { serviceType = paramBody.service.serviceType; } if (paramBody.hasOwnProperty("requestId")) { msgBody.requestId = paramBody.requestId; } else { msgBody.requestId = ""; } if (paramBody.service.hasOwnProperty("eventTime")) { msgBody.eventTime = paramBody.service.eventTime; } else { msgBody.eventTime = ""; } msgBody.data = {}; var msgCode = ""; if (paramBody.service.hasOwnProperty("data")) { var i = 0; for (var key in (paramBody.service.data)) { if (i === 0) { msgCode = key; } msgBody.data[serviceId + "_" + key] = paramBody.service.data[key]; i++; } } if (paramBody.hasOwnProperty("notifyType")) { msgBody.notifyType = paramBody.notifyType; } else { msgBody.notifyType = ""; } if (paramBody.hasOwnProperty("deviceId")) { msgBody.deviceId = paramBody.deviceId; } else { msgBody.deviceId = ""; } if (paramBody.hasOwnProperty("gatewayId")) { msgBody.gatewayId = paramBody.gatewayId; } else { msgBody.gatewayId = ""; } //判断serviceType如果不是ServiceAlarm或DeviceAlarm if (serviceType === "ServiceAlarm" || serviceType === "DeviceAlarm") { msgBody.messageCode = msgCode; } resJson.msgBody = msgBody; if (serviceType === "ServiceAlarm" || serviceType === "DeviceAlarm") { resJson.type = "alarm"; } else { resJson.type = ""; } return resJson; }
步骤4 在Live Data左侧导航树中,选择“API测试”。在“2_IOC_MQS”的操作列单击测试按钮,进入API测试页面。
接入集群:选择任一集群
接入节点:选择任一节点
Headers:无需填写
Parameters:无需填写
Body
内容类型:application/json
请求内容:
{ "body":"{ \"requestId\":\"184dae0aa44648679b7e32635e8d2bbe\", \"service\": { \"serviceId\":\"ServiceAlarm\", \"serviceType\":\"ServiceAlarm\", \"eventTime\": \"20190226030832\", \"data\":{ \"FireAlarmType\":\"10\" } }, \"notifyType\": \"ServiceAlarm \", \"deviceId\":\"6f0bbb6d-1586-4638-83a0-8eaf7bb7f6bb\", \"gatewayId\":\"06c9d00d-f965-4430-86ea-e0021d3aecf8\" }" }
> 注意:如果此处Body无法编辑,说明在设计API时所选的HTTP方法不是post,请返回查看并修改。
单击“测试API”,测试结果如下表示测试成功。可以单击“保存用例”,以便后续需要重复测试。
{ "retCode": "0", "retJSON": { "result": "success+object" } }
步骤5 部署API。
> 提示:部署API是将API部署在API网关上。
在Live Data左侧导航树中,选择“API部署”,在“2_IOC_MQS”API的“操作”列,单击部署按钮,选择“部署到”下拉表中默认的集群。单击“部署”,提示部署成功。
步骤6 授权API。
> 提示:授权API是定义什么ROMA应用可以访问此API,因为本次操作应用的callback API会访问消息分离API,所以消息分离API需要在API网关上授权给应用访问。
在ROMA左侧导航树中,选择“服务集成 > API 网关”,在左侧二级导航栏中,选择“API授权”,在“2_IOC_MQS”API对应的操作列,单击授权按钮。在授权应用区域中,选择应用ID “com.fire.demo.ioc.test01”,单击“授权”。
6. 创建callback API
创建callback API,用于消费IOT设备上报的消息,并将消息Callback至分离告警消息的API中。
步骤1 选择“服务集成 > Live Data”,进入LiveData管理控制台界面。
步骤1 在ROMA左侧导航树中,选择“服务集成 > Live Data”,进入LiveData管理控制台界面。
步骤2 设计API.
在LiveData界面的左侧导航树中,选择“API设计”。单击“新建”。配置API设计信息,如下表所示。
单击“提交”后,选择“进入开发”页面, “类型”选择“函数API”,单击“确定”,进入API开发界面。
配置“函数服务脚本”, 返回对象选择“result”,修改定义脚本后点击提交。脚本示例如下:
> 提示:脚本中的部分信息需要修改为实际配置。 > > 脚本定义API订阅MQS中IOT设备上报Topic,并callback消息到分离消息API。
importClass(com.huawei.livedata.util.MQSUtils); importClass(com.huawei.livedata.util.MQSCallback); importClass(com.huawei.livedata.lambdaservice.util.CacheUtils); function excute(data) { //MQS属性 var topic_name = "T_IOC_DEVICE_DATA_Test01"; //接收设备上报消息的TOPIC名称 var topic_url = "192.168.9.30:9776;192.168.9.96:9776";//接收设备消息的TOPIC的内部访问地址 var topic_appId = "com.fire.demo.ioc.test01"; //Topic所属ROMA应用ID var topic_passWord = "o1mDAgrsmWZiwMMAq57U2g=="; //Topic所属ROMA应用的密钥 //消费成功后回调接口 var callback = new MQSCallback(); //回调接口host,无需更改 callback.setHost("192.168.9.116:1080"); //分离消息API的接口路径 callback.setService("/ioc/mqs2"); var headers = { "X-HW-ID":"com.fire.demo.ioc.test01", //分离消息API所属的ROMA应用ID "X-HW-APPKEY":"o1mDAgrsmWZiwMMAq57U2g==" //ROMA应用的密钥 }; //回调接口headers callback.setHeaders(JSON.stringify(headers)); //回调接口方法类型 callback.setMethod("POST"); var utils = new MQSUtils(); try { var result = utils.consumerMsg(topic_url, topic_appId, topic_passWord, topic_name, callback); return result; } catch(exception) { return exception + ""; } }
7. 模拟测试
创建模拟API,模拟IOT设备发送消防安全消息至ROMA平台,测试最终在告警TOPIC中的MQS消息队列中是否有分离的告警消息。
步骤1 在ROMA左侧导航树中,选择“服务集成 > Live Data”,进入LiveData管理控制台界面。
步骤2 设计API
在LiveData界面的左侧导航树中,选择“API设计”,进入“API设计”页面。单击“新建”,进入“API设计”页面。配置API设计信息,如下表所示。
步骤3 开发API。
单击“提交”,提示“创建成功”,点击“进入开发”,API“类型”选择“函数API”。单击“确定”,进入API开发界面。
配置“函数服务脚本”。返回对象填写“result”,编辑定义脚本,脚本示例如下。完成后单击“提交”。
> 提示:脚本定义API模拟IOT设备发送一个预置的告警消息到消息上报TOPIC。
importClass(com.huawei.livedata.util.MQSUtils); function excute(data) { var obj = JSON.parse(data); var topic_name = "T_IOC_DEVICE_DATA_Test01"; //IOT设备上报消息的TOPIC var topic_url = "192.168.9.30:9776;192.168.9.96:9776";//IOT设备上报消息的TOPIC的内部访问地址 var topic_appId = "com.fire.demo.ioc.test01"; //TOPIC所属的ROMA应用的APPID var topic_passWord = "o1mDAgrsmWZiwMMAq57U2g=="; //TOPIC所属的ROMA应用的密钥 var msg = {}; var body = {}; var utils = new MQSUtils(); var result = ""; var rtnMsg = {}; try { body = { "requestId": "184dae0aa44648679b7e32635e8d2bbe", "service": { "serviceId": "ServiceAlarm", "serviceType": "ServiceAlarm", "eventTime": "20190226030832", "data": { "FireAlarmType": "10" } }, "notifyType": "ServiceAlarm", "deviceId": "6f0bbb6d-1586-4638-83a0-8eaf7bb7f6bb", "gatewayId": "06c9d00d-f965-4430-86ea-e0021d3aecf8" }; msg.body = body; result = utils.sendMsg(topic_url,topic_appId,topic_passWord,topic_name,JSON.stringify(msg)); rtnMsg = { "resCode":0, "resMsg":result }; } catch(exception) { rtnMsg = { "resCode":1, "resMsg":exception +"" }; } return JSON.stringify(rtnMsg); }
步骤4 使用测试API模拟IOT设备上报消息。
在Live Data左侧导航树中,选择“API测试”,在“IOC_DATATEST”的操作列,单击测试按钮,进入API测试页面。
选择“接入集群”。单击“测试API”,测试结果如下表示API测试成功,表明测试API模拟IOT设备上报一条消息到T_IOC_DEVICE_DATA_Test01。
{ "retCode": "0", "retJSON": { "result": { "resCode": 0, "resMsg": "success status: true" } } }
步骤5 使用callback API消费IOT设备上报的消息,并callback给分离消息API。
> 提示:Callback API消费MQS消息时,五分钟内无二次消费,则会断开连接,实际场景中需要配置定时任务保持接口能不断消费MQS消息。
在Live Data左侧导航树中,选择“API测试”,进入API测试页面。在callback API“1_IOC_MQS_TEST01”的操作列,单击测试按钮,进入API测试页面。
在“API测试参数”页面,选择“接入集群”和“接入节点”,单击“测试API”,测试结果如下表示测试成功,表明callback API已消费IOT设备上报的消息。
{ "retCode": "0", "retJSON": { "result": "Success to start consumer" } }
步骤6 在MQS Portal左侧导航树中,选择“消息查询”,查看消息历史中是否包含“T_ALARM_DATA_Test01”消息。
如果已包含此消息,则说明callback API已从T_IOC_DEVICE_DATA_Test01获取到模拟IOT设备上报的消息,并通过分离消息API将告警消息转发到了T_ALARM_DATA_Test01的topic。
----结束
8. 恭喜你
祝贺您,您已经成功完成本次Codelabs的操作,有机会赢取精美礼品,点击"闯关有礼"查看活动详情。
9. 参考文件
端到端开发园区消防安全应用,请参考由浅入深企业应用开发云-ROMA篇。学习如何开发一个应用,一步一步实现消息接入、数据处理和指令下发。
脚本结构可参见《融合集成平台ROMA 开发指南》中“LiveData函数清单”章节。
更多关于数字平台及ROMA的资料和视频,请参考数字平台知识中心。