了解如何使用 Flux。
如果已部署 IoT 解决方案,则必须决定在何处以及如何存储所有数据。至少从我的角度来看,存储传感器数据的最佳和最简单的地方,当然是InfluxDB。
我说这不能让你感到惊讶。要存储的其他数据呢?传感器数据?如传感器制造商、投入使用日期、客户等 ID、操作平台类型等。您知道,所有传感器元数据。
也许你还喜欢:
旧通量,新通量
当然,解决方案是简单地添加所有内容作为标签 InfluxDB 在传感器数据中,然后继续你的一天。所有的传感器数据都应该与每个数据点一起存储吗?当时,很多事情似乎都是个好主意,但当现实来临时,事情很快就变成了一个可怕的想法。
因为大多数元数据不会经常改变,可能与客户信息相关,所以在传统中 RDBMS 它的最佳位置很有可能。很有可能你已经有了一个包含客户数据的 RDBMS,那为什么不继续利用投资呢?
正如我反复说过的,这不是传感器数据的最佳位置。现在,您可以在两个不同的数据库中获得 IoT 数据。如何访问并将其合并到一个位置,在那里你可以看到它的一切?
通量是答案
告诉我你必须看到这一点。嗯,公平地说,你可能有,因为,毕竟,你将如何获得你的基础SQL的数据通过Flux?这就是Flux美:可扩展!
我们现在有一个扩展,允许你通过 Flux 从 MySQL、MariaDB 或 Postgres 读取数据。当我听说这个的时候。SQL当连接器准备就绪时,我不得不尝试一下。我会告诉你我建了什么,怎么做。
建立客户数据库
首先要做的是建立一个MySQL数据库和一些客户信息。 IoTMeta 调用数据库,放置一个带有一些传感器元数据的表。我还添加了另一个表,包括这些传感器的客户信息。
非常基本的表。Sensor_ID我填的字段和 Sensor_id InfluxDB 例子中标记了相应的数据。我敢打赌,看看我该去哪里。我添加了很多数据。
com.cn/wp-content/uploads/2019/12/Screen-Shot-2019-11-14-at-11.48.38-AM.png”标题=”Customer_ID”/*
现在,我的传感器元数据数据库有一些关于我在这里运行的每个传感器的信息,以及一些关于谁拥有传感器的”客户数据”。现在是时候把这一切变成有用的东西了。
使用通量查询数据
首先,我在 Flux 我建立了一个查询来获取我的一些传感器数据,但我对传感器数据本身并不感兴趣。我正在寻找和识别标记值: Sensor_id 。这个查询看起来有点奇怪,但最终会有意义,我保证。
temperature = from(bucket: "telegraf") |> range(start: v.timeRangeStart,
stop: v.timeRangeStop) |> filter(fn: (r) => r._measurement == "temperature"
and (r._field == "temp_c")) |> last() |> map(fn: (r) => { return
{ query: r.Sensor_id } }) |> tableFind(fn: (key) => true) |> getRecord(idx: 0)
它回到一行表,然后拉出来 Sensor_id 在那里你可能会说哇?请记住:Flux 返回表中的所有内容。我基本上需要的是表中的标量值。在这种情况下,它是相关标记的字符串值。你是怎么做到的?
接下来,我会得到的 MySQL 数据库的用户名和密码可以很容易地存储在数据库中 InfluxDB 机密存储。
uname = secrets.get(key: "SQL_USER") pass = secrets.get(key: "SQL_PASSW")
等等,我是怎么把这些值存储在这个秘密里的?好吧,我们回来一分钟。
curl -XPATCH http://localhost:9999/api/v2/orgs//secrets -H
//'Authorization: Token ' -H 'Content-type: application/json'
//--data '{ "SQL_USER": "" }'
需要注意的是,你把它们从 URL 获取。它不是 InfluxDB 中组织的实际名称。 然后你为 SQL_PASSW 这个秘密做同样的事情。你可以告诉他们你想要什么。现在,您不必以纯文本的形式输入用户名/密码。
接下来,我将用所有这些来构建它们 SQL 查询:
sq = sql.from( driverName: "mysql", dataSourceName:
"${uname}:${pass}@tcp(localhost:3306)/IoTMeta",
query: "SELECT * FROM Sensor_data, Customer_Data WHERE
"Sensor_data.Sensor_ID = ${"\"" temperature.query "\"
"AND Sensor_data.measurement = \"temperature\"
"AND Sensor_data.CustomerID = Customer_Data.Customer_ID"}"
//"SELECT * FROM Sensor_data WHERE Sensor_ID = ${"\"" temperature.query "\"
//AND measurement = \"temperature\""}" //q // humidity.query //"SELECT *
//FROM Sensor_Data WHERE Sensor_ID = \"THPL001\""// humidity.query )
你会看见我在 SQL 第一个用于查询 Flux 查询值。酷!你可能还注意到我在执行 join 该 SQL 查询,以便从数据库中的两个表中获取数据。有多酷?接下来,我将格式化生成的表,以便只显示要显示的列:
fin = sq |> map(fn: (r) => ({Sensor_id: r.Sensor_ID,
Owner: r._Sensor_owner, Manufacturer: r.Sensor_mfg,
MCU_Class: r.MCU_class, MCU_Vendor: r.MCU_vendor,
我现在有一个表,包括关于我的传感器的所有元数据所有关于传感器的客户联系人数据influxdata.com/wp-content/uploads/Screen-Shot-2019-11-14-at-12.12.19-PM.png”rel=不跟随
这个巫术是什么?我有一个表,包括传感器的所有元数据、一些客户数据和传感器读数?是的。我愿意。这是真正的魔力:因为它可以从SQL数据库和 InfluxDB存储桶中获取数据,因此数据也可以连接到单个表中。
以下是我这样做的方式:
temp = from(bucket: "telegraf") |> range(start: v.timeRangeStart,
stop: v.timeRangeStop) |> filter(fn: (r) =>
r._measurement == "temperature" and (r._field == "temp_c"))
给我一个传感器数据表。我已经有一个来自 SQL 元数据表。
j1 = join(tables: {temp: temp, fin: fin}, on: ["Sensor_id"] )
|> map(fn: (r) => ({_value: r._value, _time: r._time,
Owner: r.Owner, Manufacturer: r.Manufacturer, MCU_Class: r.MCU_Class,
MCU_Vendor: r.MCU_Vendor, Customer: r.Customer, Address: r.Address,
Phone: r.phone})) |> yield()
我只是在一个公共元素(字段)上添加这两个表 Sensor_id ,我有一张桌子,它的所有内容都放在一个位置!
有几种方法可以使用这个功能来合并来自不同源的数据。为了更好地了解您的传感器部署,我想听听您如何实现这些功能。
我已经使用 InfluxDB 2.0 的 Alpha18 版本完成了所有这些工作,这是我操作的 — 我从 中自定义构建我的版本, master 因为我对 Flux 有一些新功能,但这是另一个帖子。对于这些事情,OSS InfluxDB 2.0 的 Alpha 正常施工。你应该试试!
进一步阅读