基于Spark实时检测和分析物联网设备故障
- 一、业务场景
- 二、数据集说明
- 三、操作步骤
-
- 第一阶段,启动HDFS、Spark集群服务
- 阶段二、准备案例中使用的数据集
- 阶段3.处理实时数据
- 阶段自行练习
一、业务场景
在滑动窗口上实时计算所有计算机机架的平均温度,发现异常温度,及时检测故障。
二、数据集说明
本案中使用的实时数据说明如下: ??- 原始数据文件位于本地/data/dataset/streaming/iot/file1.json和/data/dataset/streaming/iot/file2.json
两个机架的温度数据在数据中心检测到: ??file1.json:
???{
“rack”:”rack1”,”temperature”:99.5,”ts”:”2017-06-02T08:01:01”} ???{
“rack”:”rack1”,”temperature”:100.5,”ts”:”2017-06-02T08:06:02”} ???{
“rack”:”rack1”,”temperature”:101.0,”ts”:”2017-06-02T08:11:03”} ???{
“rack”:”rack1”,”temperature”:102.0,”ts”:”2017-06-02T08:16:04”}
file2.json:
???{
“rack”:”rack2”,”temperature”:99.5,”ts”:”2017-06-02T08:01:02”} ???{
“rack”:”rack2”,”temperature”:105.5,”ts”:”2017-06-02T08:06:04”} ???{
“rack”:”rack2”,”temperature”:104.0,”ts”:”2017-06-02T08:11:06”} ???{
“rack”:”rack2”,”temperature”:108.0,”ts”:”2017-06-02T08:16:08”}
三、操作步骤
第一阶段,启动HDFS、Spark集群服务
1、启动HDFS集群 ??在Linux输入以下命令,在终端窗口下启动HDFS集群:
1. $ start-dfs.sh
2、启动Spark集群 ??在Linux输入以下命令,在终端窗口下启动Spark集群:
1. $ cd /opt/spark 2. $ ./sbin/start-all.sh
3.验证上述过程是否已启动 ??在Linux输入以下命令,查看终端窗口下启动的服务流程:
1. $ jps
若显示以下五个流程,则表明各项服务启动正常,可继续下一阶段。
1. 2288 NameNode 2. 2402 DataNode 3. 2603 SecondaryNameNode 4. 2769 Master 5. 2891 Worker
阶段二、准备案例中使用的数据集
1、在HDFS创建数据存储目录。Linux终端窗口下,输入以下命令:
1. $ hdfs dfs -mkdir -p /data/dataset/streaming/iot-input
2、将iot上传到数据文件HDFS上。在Linux输入以下命令:
1. $ hdfs dfs -put /data/dataset/streaming/iot/* /data/dataset/streaming/iot-input/
3、在Linux终端窗口下,输入以下命令,查看HDFS是否已经存在了iot目录:
1. $ hdfs dfs -ls /data/dataset/streaming/iot-input
目录下有两份文件:file1.json和file2.json。
阶段3.处理实时数据
1、启动spark shell。在Linux在终端窗口中,输入以下命令:(请注意以下命令localhost用虚拟机的实际机器名代替)
1. $ spark-shell --master spark://localhost:7077
执行上述代码,输入Spark Shell交互开发环境:
2.首先需要导入案例代码操作所依赖的包。paste输入以下代码:
1. // 首先导入包 2. import org.apache.sparksql.types._
3. import org.apache.spark.sql.functions._
同时按下【Ctrl + D】按键,执行以上代码,输出结果如下:
3、构造数据Schema模式,注意其中温度和时间的数据类型。在paste模式下,输入以下代码:
1. // 定义schema
2. val iotDataSchema = new StructType().add("rack", StringType, false)
3. .add("temperature", DoubleType, false)
4. .add("ts", TimestampType, false)
同时按下 【Ctrl + D】 键,执行以上代码,输出结果如下:
4、加载数据文件到DataFrame中。在paste模式下,输入以下代码:
1. // 读取温度数据
2. val dataPath = "hdfs://localhost:9000/data/dataset/streaming/iot-input"
3. val iotSSDF = spark.readStream.schema(iotDataSchema).json(dataPath)
同时按下 【Ctrl + D】 键,执行以上代码,输出内容如下:
由以上输出内容可以看出,采集到的机架温度(json格式)数据被读取到了DataFrame中。
5、创建10分钟大小的滑动窗口,并在temperature列上求机架温度的平均值。在paste模式下,输入以下代码:
1. // group by一个滑动窗口,并在temperature列上求平均值
2. val iotAvgDF = iotSSDF.groupBy(window($"ts", "10 minutes", "5 minutes"))
3. .agg(avg("temperature") as "avg_temp")
同时按下 【Ctrl + D】 键,执行以上代码,输出内容如下:
6、将数据写出到memory data sink。在paste模式下,输入以下代码:
1. // 将数据写出到memory data sink,使用查询名称iot
2. val iotMemorySQ = iotAvgDF.writeStream
3. .format("memory")
4. .queryName("iot")
5. .outputMode("complete")
6. .start()
同时按下 【Ctrl + D】 键,执行以上代码,输出内容如下:
7、在iot上执行sql查询,显示数据,以start时间排序。在paste模式下,输入以下代码:
1. // 显示数据,以start时间排序
2. spark.sql("select * from iot").orderBy($"window.start").show(false)
同时按下【 Ctrl + D 】键,执行以上代码,输出结果如下:
从实时检测的结果来看,机架的平均温度一直在上升,其中8:15-8:25这十分钟内的机架温度最高。
8、停止该流查询,命令行下执行如下命令:
1. iotMemorySQ.stop
按下【Enter】键,执行以上代码。
阶段四、自行练习
1、在本案例的基础上,请继续分析,找出是哪些机架的温度在上升。