近年来,物联网技术发展迅速,广泛应用于智能旅游、智能产业、智能家居等场景。无论应用什么场景,都离不开数据这个过程。然而,不同的应用程序有不同的数据特性和业务需求,因此实现上述流程所需的产品组件也会有所不同。一般来说,我们可以根据应用程序和数据特性分为三种场景:。
以智能汽车场景为例,车辆定期更新当前最新状态信息,如发动机当前速度、当前速度等。这些描述车辆最新状态信息的数据称为元数据。在智能汽车的驾驶过程中,车辆的状态数据会随着时间的变化而变化,如车速、胎压等。描述车辆历史状态信息的数据称为时间数据。
另一个数据场景是控制车辆行为的指令信息,如调整车辆空调温度指令的发布和车辆执行命令后的结果反馈。这些控制指令的上下下降称为新闻数据。不同类型的数据应用场景不同,因此对存储系统的需求也不同。
本文主要分析了设备元数据场景的业务需求,以及如何选择合适的存储组件和实现方案。
首先以以分析具体业务需求为例。
在工业生产过程中,设备通常在固定周期(或事件触发)上报最新运行状态信息,即上述设备元数据,如设备 ID ,当前运行温度、湿度、压力值等。在业务中,根据设备元数据管理设备,如查询设备当前运行状态、多条件在线检索设备、循环选择合格设备集合。通过分析设备元数据,实时监控设备的运行状态,及时响应异常,避免故障。
以下是对业务需求的总结:
a. 设备状态数据定时上报,通过数据网关上云存储。
b. 存储侧需要实时更新大型设备元数据。
c. 存储侧需要支持根据任何设备指标查找设备。如果查询设备数量较少,我们称之为设备检索;如果一次搜索的设备数量非常大,我们称之为设备圈选择。
d. 设备状态更新后,存储侧需要支持实时监测异常状态。
根据上述场景要求,我们可以总结为存储侧的几个功能要求:
元数据存储场景对数据库的规模、性能、查询能力等方面的要求。一般来说,
到这里,大家可能会想到使用
最后,我们来看看
表格存储 Tablestore 是云上的结构化数据存储产品,产品功能和生态极其丰富,提供物联网存储 Iotstore、宽表引擎、多索引等能力满足时间数据、新闻数据、元数据场景的需求。本文不解释时间顺序和新闻数据场景的解决方案。让我们来看看 Tablestore 为满足元数据存储的场景需求提供了力。Tablestore 采用多引擎存储架构,实现不同场景需求的原理不同,先看图片:
宽表引擎是负责设备元数据存储和更新的分布式数据表。宽表引擎采用多分区(shard)每个分区对应一个 的分布式结构worker。在编写元数据的过程中,路由节点根据主键的范围将路由编写到不同的分区。当一个分区的负载达到瓶颈时,服务端将自动分为多个分区,从而提高宽表引擎的整体吞吐量。如下图所示:
与宽表引擎相比,索引擎底部采用倒排索引、空间索引等存储结构,可支持任何数据组合的检索和聚合。索引擎提供了两个查询接口:search 和 parallelScan。
search 接口: 支持多条件组合查询、模糊查询等能力,满足设备检索的场景需求。parallelScan 接口: 支持多并发数据检索,可大大提高数据返回速度,适用于设备元数据圈选择场景。
Tablestore 提供和 Flink 实时计算对接能力可用作 Flink 的数据源表将实时变化的设备元数据推送到 Flink 计算,实现元数据检测的业务场景。同时,支持将计算结果写入 Tablestore 在数据表中,记录异常数据结果。
分享完元数据存储场景和 的需求Tablestore 的技术原理之后,让我们来看看如何构建设备元数据场景的物联网架构。物联网数据上报网关后,根据不同的应用类型,一般有三种数据流,即应用服务器订阅消费、持久存储和转发到消息队列。设备元数据的主要需求是存储和计算,因此我们可以列出一个简单的数据流过程:
基于云上搭建的元数据管理平台架构如下图所示:
上述架构图包括物联网平台、表格存储 三个组成模块Tablestore、实时计算 Flink。架构中各模块的角色和功能如下:
物联网平台: 负责设备元数据访问、设备管理和数据转发。表格存储 Tablestore: 数据表负责设备元数据的存储、更新和查询。多元索引负责设备检索和循环选择。通道服务提供和实时计算 Flink 对接能力。实时计算 Flink: 负责流程计算设备元数据的变化,可以将计算结果写回 Tablestore 存储或对接 Kafka 等消息队列订阅消费。
假设一家工业制造商拥有100万台智能设备,每台设备定期更新其温度、湿度、压力数据等状态数据,并准备使用上述方案架构建立元数据管理平台。
CREATE TABLE device_meta_data ( device_name varchar(100), -- 设备名 humidity decimal(5,2), -- 设备当前湿度 pressure decimal(5,2), -- 设备当前压力值 temperature decimal(5,2), -- 设备当前温度 location varchar(20), -- 设备位置坐标 timestamp long -- 数据上报时间戳 );
1.创建表格存储实例
首先需要在 Tablestore 中创建实例 metadata-db(数据库),并创建存储元数据和异常结果数据的两张数据表,表名分别为 device_meta_data 和 device_errorResult_data,操作步骤如下图所示。
然后,创建元数据表和异常数据表。
2.创建物联网设备
在物联网平台控制台中创建产品(product_metadata),物模型(默认模块)、设备(test_deviceName)。一个产品相当于同类型设备的抽象,而物模型则是对设备上报数据结构的定义。
配置物模型
注册设备,获取设备证书。
在物联网平台中创建解析器,解析器负责将设备元数据更新的 topic 进行自定义脚本处理后,写入到 Tablestore 中持久化存储。如下图所示:
创建解析器需要分别配置数据源、解析脚本和数据目的。通过物模型上报的设备元数据会汇聚到名为
通常设备上报数据是基于设备端 SDK 开发程序来上报数据到物联网平台中,但为了快速实现方案,本文中使用物联网平台提供的设备模拟器来进行数据上报操作。如下图的一个模拟设备上报了一条数据:
3.设备数据实时采集分析
设备元数据经过解析器处理后存储到 Tablestore 中,到此我们已经完成了设备元数据采集与存储的所有操作。由于设备模拟器只能模拟单台设备元数据产生的过程,为了更贴近真实的业务场景,我们直接在 Tablestore 中生成了一亿条设备元数据以供后续步骤的实现。通过上文我们可以得知,设备检索与圈选需要依赖 Tablestore 的多元索引功能,所以首先我们需要创建一个多元索引 device_meta_data_index:
Tablestore 提供了 SQL、控制台或 SDK 等多种数据访问方式。以 SQL 为例,首先通过 SQL 确认设备元数据已正确导入。
select count(*) from `device_meta_data`;
查询 device_name = "test_deviceName" 的设备元数据。
select * from `device_meta_data` where device_name = 'test_deviceName';
4.设备检索和圈选
设备检索和圈选依赖 Tablestore 的多元索引能力,分别采用 search 和 parallelScan 接口来实现。
先来说设备检索,设备检索通常不会返回比较大的数据结果集,即使符合查询条件的数据量很大,也会采取分页的策略返回。针对于此场景,Tablestore 的多元索引功能采用了倒排索引、空间索引等数据结构来加速数据检索速度,例如下面一个例子:
检索设备温度低于 20 摄氏度,压力值在 0.5 kpa 至 1.0 kpa 之间,并且设备名以 “e3” 开头的设备。
select * from device_meta_data where temperature < 20 and pressure between 0.5 and 1.0 and device_name like 'e3%';
设备圈选通常返回的数据量比较大,如果使用 SQL 只能单并发查询和返回,这样性能显然是不高的。针对这个场景,Tablestore 推出了 parallelScan 接口来实现,parallelScan 接口的优化如下图所示:
在执行设备元数据圈选之前,首先会调用 ComputeSplit 接口根据多元索引中的数据量级来计算最大可执行的查询并发度。然后会按照多并发的方式分别查询多个数据分区,每一并发只负责一部分数据的查询和返回,通过这种方式可以极大地提高数据圈选的速度。如下面一个例子:
需要圈选设备名以“a”开头并且设备温度低于 10 摄氏度的所有设备。
//获取并发数int splitsSize = client.computeSplits(computeSplitsRequest).getSplitsSize();//多线程执行run(){ ParallelScanRequestparallelScanRequest = ParallelScanRequest.newBuilder().tableName(tableName) .indexName(indexName) .scanQuery(ScanQuery.newBuilder() .query(QueryBuilders.bool() .must(QueryBuilders.wildcard("device_name","a*"))//查询条件 .must(QueryBuilders.range("temperature").lessThan(10)) ) .maxParallel(splitsSize) //返回当前并发命中的结果集 RowIterator iterator =client.createParallelScanIterator(parallelScanRequest);}
从上面代码的运行结果可以看出,查询的数据结果将以多并发的方式返回,每个并发都包含了一部分查询命中的数据。parallelScan 接口在大规模数据圈选、数据导出场景下,平均单个并发可以达到
5.设备实时监控
设备实时监控依赖于 Tablestore 的通道服务能力,通道服务可以直接对接实时计算 Flink 来实现元数据流计算,如下图所示:
在实时计算中创建了源表、结果表和流计算作业。其中源表依赖 Tablestore 中设备元数据中创建增量通道服务,通道服务将设备元数据的实时变化数据推送到 Flink 计算引擎中进行流计算作业。而本案例中流计算作业将异常的设备状态数据保存到了 Tablestore 的异常结果数据表中,后续可通过异常数据结果表中查询某台设备的异常记录信息。针对于异常数据的处理还有一种业务场景是推送到 Kafka 等消息队列中进行处理,如告警机制接入、短信通知等。
为了更直观地看到流计算作业效果,我们可以在设备元数据表中更新一条设备元数据:
update `device_meta_data` set `pressure` = 2.0 where `device_name` = "test_deviceName"
经过流计算引擎处理后,会判定该条数据为异常数据,并自动写入到异常结果数据表中。
select * from `device_errorResult_data` where `device_name` = "test_deviceName"
本文分享了设备元数据的场景特点和架构实践。设备元数据场景中业务需求对存储组件的存储规模、查询方式以及计算性能有很高的要求,从
最后,
☞ 中国云计算第一股关停 IoT云服务
☞ 2022年IoT平台趋势:私有化部署
☞ 国内MCU行业发展研究报告
☞ 2021年4G通信模组企业排行
☞ 国内4大 IoT物联网平台选型对比
☞ 云厂商的[IoT物联网平台]不香了吗?