IEF+Kuiper实现边云协同流数据处理
引导式阅读
Go
IEF+Kuiper实现边云协同流数据处理
作者
C***
上架时间
2022-08-02 11:52:56

在IEF上使用Kuiper实现边云协同流数据处理

0. 版本说明

本示例配套的SDK版本为:不涉及

1. 示例简介

在现代化工业物联网的边缘使用场景中,许多工业车间已有现成系统进行简单的数据采集和初步分析,现希望将分布在各个地点的车间进行整体精细管理,并且在车间的数据上传到云端之前,在边缘端对这些数据进行预分析与处理。

智能边缘平台(Intelligent EdgeFabric,IEF)通过纳管边缘节点,提供将云上应用延伸到边缘的能力,联动边缘和云端的数据,满足客户对边缘计算资源的远程管控、数据处理、分析决策、智能化的诉求,同时,在云端提供统一的设备/应用监控、日志采集等运维能力,为企业提供完整的边缘和云协同的一体化服务的边缘计算解决方案。

EMQ X Kuiper 是由杭州映云科技有限公司开发的开源项目,实现的轻量级物联网边缘分析、流式处理开源软件,可以运行在各类资源受限的边缘设备上。开源项目链接:https://github.com/lf-edge/ekuiper

本文将介绍如何利用华为 IEF 和 EMQ 软件提供的能力,利用分布式云原生技术在边缘端实现对边缘端工业软件和应用的管理,以及工业数据的采集、转发、处理和存储;在云端部署的管理控制台实现对工业数据采集软件的配置,以及流式数据业务逻辑的部署和升级。该方案可以实现工业数据在边缘端的低时延、高吞吐和高安全地处理,是一套实现工业数字转型的理想方案。

系统共分为五个模块:华为IEF控制台、Kuiper管理控制台、用户云端应用edge-manager、边缘节点Kuiper实例、数据采集系统。Kuiper的管理控制台和Kuiper实例之间,是通过华为IEF SystemREST->ServiceBus的管理通道,实现规则的下发。数据的北向传输,则通过SystemEventBus->APIG->云端后台系统的通道,把处理后的数据传到用户的云端应用,再进行后续用户自定义的操作。

  • 云上组件包括IEF的控制台和Kuiper管理控制台。其中,IEF控制台提供边缘节点的管理、边缘应用和边云消息的管理能力;kuiper管理控制台提供了edge-manager用来实现工业数据的管理
  • 边缘组件包括IEF的Edgecore和kuiper的3种容器应用实例。其中,Edgecore作为IEF在边缘侧的控制组件,实现了容器应用的部署和管理能力。

img

2. 运行环境

针对上述使用场景,准备两台虚拟机,推荐使用华为云ECS云虚拟机,分别作为云端节点和边缘节点。

  • 云端部署用户云端应用与Kuiper管理控制台容器镜像,推荐使用华为云CCE云容器引擎。

  • 边缘节点部署Kuiper容器镜像。

使用到的容器镜像均可以从dockerhub镜像中心仓上下载,使用的镜像和版本如下:

  • lfedge/ekuiper:1.4.1-slim-python
  • emqx/emqx-edge:latest
  • emqx/edge-manager:1.2.1
  • emqx/neuron:1.2.1

img

3. 部署运行步骤

部署运行步骤主要包括以下5步:

  1. 通过IEF对边缘节点进行纳管
  2. 在边缘节点上部署Kuiper容器镜像
  3. 在云端节点上部署Kuiper管理控制台容器镜像与用户云端应用
  4. 创建边云消息通道
    • 定义从云端到边缘的消息路由:SystemREST -> ServiceBus
    • 定义从边缘到云端的消息路由:SystemEventBus -> APIGW -> 云端后台系统
  5. 通过Kuiper管理界面,部署以下业务逻辑规则
    • 将温度数据大于40度的数据上传到云端,不满足条件的数据直接丢弃

4. 通过IEF对边缘节点进行纳管

  1. 打开华为云IEF产品网站:智能边缘平台(https://www.huaweicloud.com/product/ief.html),使用自己的账号登陆,打开页面后,左侧菜单选择 “边缘资源” -> "边缘节点" ,右上角点击 “注册边缘节点” 。填写边缘节点名称,这次暂不使用AI加速卡。

img

  1. 接下来在 “是否启用docker” 选择 “是” 。在 “系统日志” 和 “应用日志” 中可以按照自己实际情况进行选择。

img

  1. 点击右下角 “注册”,下载配置文件与边缘节点安装程序。

img

下面在边缘节点上进行节点软件的安装。

  1. 根据边缘节点的IP地址、账号和密码,从电脑端使用ssh方式登入到边缘节点上。

  2. 在边缘节点上执行上述第3步骤中的安装命令,完成IEF对边缘节点的纳管。

$ sudo tar -zxvf edge-installer_1.0.9_x86_64.tar.gz -C /opt $ sudo mkdir -p /opt/IEF/Cert; sudo tar -zxvf kuiper_demo_test.tar.gz -C /opt/IEF/Cert $ cd /opt/edge-installer; sudo ./installer -op=install
  1. 在华为云IEF智能边缘平台上查看边缘节点的运行状态,状态从 “未纳管” 显示为 “运行中” 。至此,通过IEF对边缘节点的纳管已经完成。

img

5. 在边缘节点上部署容器

部署ekuiper容器

从docker中心仓下载ekuiper镜像,在IEF界面左侧菜单选择“边缘应用”->“容器应用”,点击右上角“创建容器应用”。在出现的界面左下角,点击“上传镜像”。

img

在出现的界面中,点击右上角“上传镜像”,上传已下载的Kuiper容器镜像。

img

回到IEF选择镜像界面,刷新镜像列表,选择刚刚上传成功的Kuiper镜像。

img

容器的规格根据需要设置,默认设置为0.25coreCPU和512MB内存,不使用ai加速卡。

在高级配置里添加三个环境变量配置:

MQTT_SOURCE__DEFAULT__SERVERS = [tcp://192.168.0.161:1883] KUIPER__BASIC__CONSOLELOG = true KUIPER__BASIC__IGNORECASE = false

其中,MQTT_SOURCE__DEFAULT__SERVERS 表示边缘节点上开放的MQTT broker的地址,格式为tcp://{MQTT Broker弹性公网IP}:{端口},如果不部署emqx-edge应用,则使用边缘节点公开的MQTT Broker地址

然后创建端口映射,从9081映射到9081:

登录边缘节点,查看容器状态:

$ docker ps

显示kuiper:1.4.1-slim-python容器正在运行中。至此,在边缘节点上部署Kuiper实例已完成。

部署ekuiper-emqx-edge容器镜像

emqx-edge提供了一个边缘侧的MQTT Broker,与部署ekuiper容器方法类似,从docker中心仓中下载emqx-edge容器镜像

$ docker pull emqx/emqx-edge:latest

在IEF前端“创建容器应用”并上传镜像,选择emqx-edge:1.2.1版本

容器的规格根据需要设置,然后创建三个端口映射

"8081:8081" "2883:1883" "28083:18083"

登录边缘节点,查看容器状态:

$ docker ps

显示emqx-edge:1.2.1容器正在运行中。至此,在边缘节点上部署emqx-edge实例已完成。

部署ekuiper-neuron容器镜像(可选)

Neuron将连接模拟的Modbus设备,读取模拟设备的值,并将其转换为MQTT协议数据发送到emqx/edge中与,部署ekuiper容器方法类似,从docker中心仓中下载neuron容器镜像后

$ docker pull emqx/neuron:1.2.1

在IEF前端“创建容器应用”并上传镜像,选择neuron:1.2.1版本

容器的规格根据需要设置,然后创建三个端口映射

"7681:7681" "1947:1947" "7000:7000"

登录边缘节点,查看容器状态:

$ docker ps

显示neuron:1.2.1容器正在运行中。至此,在边缘节点上部署neuron实例已完成。

6. 在云端节点上部署Kuiper管理控制台容器镜像与用户云端应用

部署Kuiper管理控制台

华为云容器引擎(Cloud Container Engine,CCE)提供高可靠高性能的企业级容器应用管理服务,支持Kubernetes社区原生应用和工具,简化云上自动化容器运行环境搭建。用户在使用时,如果云上节点部署在华为云容器引擎CCE服务内,可以使用CCE来将edge-manager的容器部署在云端节点上。

与在IEF界面操作类似,在云容器引擎CCE界面点击“工作负载” -> “创建工作负载”

然后创建工作负载的访问方式,访问类型选择nodeport,亲和性选择“集群访问”,端口映射选择9082:9082,节点端口自动生成,也可以选择指定为特定值,这里指定为31853

在CCE界面显示容器运行中,至此通过CCE部署容器已完成

除了使用CCE外,为了方便起见,我们也可以直接在云端节点上使用docker直接部署edge-manager的镜像,

$ docker run -d -p 9082:9082 emqx/edge-manager:1.2.1

登录云端节点,查看容器状态:

$ docker ps

显示edge-manager:1.2.1容器正在运行中。

至此,在云端节点上部署Kuiper管理控制台实例已完成。

注册云端应用的apig

编写一个http server服务器,监听9082端口,将收到的消息打印出来,模拟用户云端应用

package main import ( "fmt" "io" "io/ioutil" "net/http" ) func handler(w http.ResponseWriter, r *http.Request) { reqBody, _ := ioutil.ReadAll(r.Body) io.WriteString(w, string(reqBody)) fmt.Println(string(reqBody)) } func main() { http.HandleFunc("/emqx", handler) err := http.ListenAndServe(":9082", nil) if err != nil { fmt.Printf("ListenAndServe: %s\n", err.Error()) } }

将云上应用edge-manager的http服务器的API注册到华为云APIG服务。在服务列表中搜索APIG,点击 “API网关 APIG” ,左侧菜单点击 "开放API" -> “API管理”,右上角点击 “新建API”。填写API名称、所属分组,类型选择公开,安全认证选择华为IAM认证:

点击下一步。请求协议选择HTTP,填写请求Path,Method选择POST。

继续点击下一步,请求方式选择POST,后端请求Path与之前我们自己编写的http server匹配,这里是/emqx。

点击管理VPC通道,填入通道名称与端口,端口也是和之前编写的http server监听的端口匹配,这里是9082。

继续点击下一步,添加云服务器。选择部署了edge-manager的云端节点。添加后,右下角点击完成。

回到刚才APIG注册的最后一步,选择刚刚创建的VCP通道,右下角点击 “立即完成” 。发布刚刚注册的API:

至此,用户云端应用的API已经可以通过外网访问,部署完成。

7. 创建边云消息通道

为了实现流式数据规则的下发和流式处理结果的上传,我们需要创建两条数据通道,分别是:从云端->边缘、从边缘->云端。整体可以分为两步进行:创建消息端点与创建消息路由。

创建消息端点

  1. 创建云端消息端点。消息端点类型选择APIG云上端点,消息端点名称自定义。

img

  1. 创建ekuiper边缘消息端点。消息端点类型选择Service Bus,填写消息端点名称,服务端口填写边缘节点ekuiper服务对应的9081端口。

  1. 创建neuron边缘消息端点。消息端点类型选择Service Bus,填写消息端点名称,服务端口填写边缘节点neuron服务对应的7000端口。

创建消息路由

智能边缘平台IEF左侧菜单选择 “边云消息” -> "消息路由"。右上角 “创建消息路由”。SystemREST和SystemEventBus为系统默认端点,无需创建。其中SystemREST代表该region下IEF云端REST Gateway接口。SystemEventBus代表边缘节点的MQTT Broker。

  1. 云端下发消息至边缘节点。通过调用开放在公网的IEF云端的REST Gateway接口,结合节点ID和自定义topic,向边缘节点中的mqtt发送消息。设备可以通过订阅对应的自定义topic进行消息接收,实现自定义topic的从云到边的消息下发。源端点选择SystemREST。源端点资源填写符合URL路径的以 “/” 开头的字符串,这里使用“/kuiper”。整个源端点资源为调用REST接口时的匹配字段。目的端点选择边缘节点,目的端点资源为消息转发至MQTT时对应的Topic前缀,这里填入 “/” 。

  1. 边缘节点上报消息到云端。SystemEventBus到DIS/APIG服务:可以将设备数据发送到边缘节点SystemEventBus(MQTT Broker)的自定义Topic中,IEF会将这些数据转发到DIS通道或APIG后端地址。数据转发到DIS通道或者APIG后端地址后,可以提取这些数据,并对数据进行处理分析。这里,源端点选择SystemEventBus,源端点资源选择 “自定义topic” ,节点选择边缘节点kuiper_demo,topic自定义,这里填入alert。目的端点选择刚刚创建的云端端点demo_apig。目的端点资源为刚刚注册APIG的API URL,可以在APIG服务里查看。

创建好的消息路由如下:

8. 创建相关业务规则,下发到边缘节点

在浏览器中输入http://{云端节点弹性公网IP}:31853,打开Kuiper管理控制台,如果使用docker直接在云端节点部署edge-manager的方式,则需要使用9082端口。登入,初始用户名是admin、初始密码是public。

创建kuiper服务

点击右上角“添加服务”, 服务管理类型选择Kuiper,端点URL的组成为https://{endpoint}/{node_id}/{topic},这里的topic选择注册消息路由时填写的topic

其中,所有局点的endpoint可以在文档 “IEF用户指南->边云消息->云端下发消息到边缘节点” 中找到。AK/SK(Access Key ID/Secret Access Key)即访问密钥,包含访问密钥ID(AK)和秘密访问密钥(SK)两部分,华为云通过AK识别用户的身份,通过SK对请求数据进行签名验证,用于确保请求的机密性、完整性和请求者身份的正确性。可以在账号“我的凭证”中获取访问密钥(AK/SK)。

常见问题:提示“no auth for the url”,原因是kuiper容器重新部署了,导致用户id改变,旧的缓存id没有了权限;解决方法是退出平台重新用admin登陆。

创建文字流

添加服务成功之后,点击刚刚添加的kuiper服务,在“流管理”中创建流。

如下图,创建一个名为demo的流。

  • 用于订阅地址为 tcp://{边缘侧emqx-edge的MQTT Broker弹性公网IP}:1883 的MQTT服务器消息。
  • 数据源指的是Kuiper订阅的MQTT消息的topic。其中的“+”是通配符。
  • 流结构体定义包含了以下两个字段:
    • temperature : float
    • humidity : bigint
  • 流类型可以不选择,不选则默认为缺省的mqtt,或者直接选择mqtt。
  • 配置组,与流类型类似,不选的话,使用缺省的default。
  • 流格式,与流类型类似,不选的话,使用缺省的json。

创建文字规则

接下来新建规则。将温度数据大于40度的数据上传到云端,不满足条件的数据直接丢弃。

SELECT * FROM demo WHERE temperature > 40

动作(Actions)部分定义了处理后的数据的转发操作。支持mqtt、rest等多种方式。具体格式说明参见 https://github.com/emqx/kuiper/tree/master/docs/zh_CN/rules/sinks 。我们这里使用mqtt,将处理后的数据发布到边缘节点本地上的MQTT broker。IEF通过订阅相关topic,利用边云通道,将订阅的数据转发到云上APIG,由用户云端应用进行处理。

点击“添加”动作按钮,弹出对话框如下:

Actions内的MQTT服务器地址字段值为tcp://{MQTT broker的弹性公网IP}:1883。Topic和IEF消息路由界面的SystemEventBus保持一致。

在kuiper管理界面上启动该规则:

接下来测试一下,该规则是否对边缘节点的数据生效。

在边缘节点安装MQTT的客户端mosquitto,以EulerOS为例

yum -y install mosquitto

用mosquitto_pub发布6条数据:

在云端节点的用户应用http server,可以看到,接收到的是已经过规则过滤后的数据。Kuiper下发的规则生效,整个流程完成。

创建图片流

也可以使用kuiper新建图片流换和规则,根据图片流接入的二进制照片数据,利用一些人工智能模型进行推断

点击刚刚添加的kuiper服务,在“流管理”中创建流。 创建一个名为tfdemo的流,不使用带结构的流,流类型选择mqtt,流格式选择BINARY

在边缘节点用编写一个mosquitto_pub发布数据,ip填入边缘节点的ip地址,参考代码如下:

// pub.go // // Copyright (c) Huawei Technologies Co., Ltd. 2021-2022. All rights reserved. // package main import ( "fmt" "strings" "io/ioutil" "time" mqtt "github.com/eclipse/paho.mqtt.golang" ) const ( imageDir = "./imgs/" topic = "tfdemo" ) func getImages(dirPath string) ([]string, error) { dir, err := ioutil.ReadDir(dirPath) if err != nil { return nil, err } files := make([]string, 0) for _, f := range dir { if strings.HasPrefix(f.Name(), ".png") || strings.HasPrefix(f.Name(), ".jpg") { files = append(files, dirPath+f.Name()) } } return files, err } func main() { images, err:= getImages(imageDir) if err != nil { fmt.Println(err) return } opts := mqtt.NewClientOptions().AddBroker("tcp://<ip>:21883") client := mqtt.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { panic(token.Error()) } for _, image := range images { fmt.Println("Publishing " + image) payload, err := ioutil.ReadFile(image) if err != nil { fmt.Println(err) continue } if token := client.Publish(topic, 0, false, payload); token.Wait() && token.Error() != nil { fmt.Println(token.Error()) } else { fmt.Println("Published " + image) } time.Sleep(1 * time.Second) } client.Disconnect(0) }

总结

本文介绍了EMQ X Kuiper与华为云IEF的集成解决方案,实现边云协同流数据处理能力。主要有:

  • 如何利用 IEF SystemREST->ServiceBus 通道进行 Kuiper 容器的管理,包括流、规则的管理。
  • 如何利用IEF EventBus->APIG 数据通道,将 Kuiper 的流式处理结果发送至云端的数据服务,或者自有的应用。

IEF+Kuiper的解决方案把数据进行就近处理,避免不必要的时延、成本和安全问题,能够大幅提升在边缘端的实时流式消息处理效率。

修订记录

发布日期 文档版本 修订说明
2022-7-22 1.0 文档首次发布