资讯详情

基于Spark的物联网设备故障实时检测与分析

基于Spark实时检测和分析物联网设备故障

  • 一、业务场景
  • 二、数据集说明
  • 三、操作步骤
    • 第一阶段,启动HDFS、Spark集群服务
    • 阶段二、准备案例中使用的数据集
    • 阶段3.处理实时数据
    • 阶段自行练习
未经许可,禁止以任何形式转载。如需引用,请标记链接地址 全文共3770字,阅读时间约3分钟

一、业务场景

在滑动窗口上实时计算所有计算机机架的平均温度,发现异常温度,及时检测故障。

二、数据集说明

本案中使用的实时数据说明如下: ??- 原始数据文件位于本地/data/dataset/streaming/iot/file1.json和/data/dataset/streaming/iot/file2.json

两个机架的温度数据在数据中心检测到: ??file1.json:

???{ 
        “rack”:”rack1”,”temperature”:99.5,”ts”:”2017-06-02T08:01:01”} ???{ 
        “rack”:”rack1”,”temperature”:100.5,”ts”:”2017-06-02T08:06:02”} ???{ 
        “rack”:”rack1”,”temperature”:101.0,”ts”:”2017-06-02T08:11:03”} ???{ 
        “rack”:”rack1”,”temperature”:102.0,”ts”:”2017-06-02T08:16:04”} 

file2.json:

???{ 
        “rack”:”rack2”,”temperature”:99.5,”ts”:”2017-06-02T08:01:02”} ???{ 
        “rack”:”rack2”,”temperature”:105.5,”ts”:”2017-06-02T08:06:04”} ???{ 
        “rack”:”rack2”,”temperature”:104.0,”ts”:”2017-06-02T08:11:06”} ???{ 
        “rack”:”rack2”,”temperature”:108.0,”ts”:”2017-06-02T08:16:08”} 

三、操作步骤

第一阶段,启动HDFS、Spark集群服务

1、启动HDFS集群 ??在Linux输入以下命令,在终端窗口下启动HDFS集群:

1. $ start-dfs.sh 

2、启动Spark集群 ??在Linux输入以下命令,在终端窗口下启动Spark集群:

1. $ cd /opt/spark 2. $ ./sbin/start-all.sh 

3.验证上述过程是否已启动 ??在Linux输入以下命令,查看终端窗口下启动的服务流程:

1. $ jps 

若显示以下五个流程,则表明各项服务启动正常,可继续下一阶段。

1. 2288 NameNode 2. 2402 DataNode 3. 2603 SecondaryNameNode 4. 2769 Master 5. 2891 Worker 

阶段二、准备案例中使用的数据集

1、在HDFS创建数据存储目录。Linux终端窗口下,输入以下命令:

1. $ hdfs dfs -mkdir -p /data/dataset/streaming/iot-input 

2、将iot上传到数据文件HDFS上。在Linux输入以下命令:

1. $ hdfs dfs -put /data/dataset/streaming/iot/*  /data/dataset/streaming/iot-input/ 

3、在Linux终端窗口下,输入以下命令,查看HDFS是否已经存在了iot目录:

1. $ hdfs dfs -ls /data/dataset/streaming/iot-input 

目录下有两份文件:file1.json和file2.json。

阶段3.处理实时数据

1、启动spark shell。在Linux在终端窗口中,输入以下命令:(请注意以下命令localhost用虚拟机的实际机器名代替)

1. $ spark-shell --master spark://localhost:7077 

执行上述代码,输入Spark Shell交互开发环境:

在这里插入图片描述

2.首先需要导入案例代码操作所依赖的包。paste输入以下代码:

1. // 首先导入包 2. import org.apache.sparksql.types._
3.	import org.apache.spark.sql.functions._

同时按下【Ctrl + D】按键,执行以上代码,输出结果如下:

3、构造数据Schema模式,注意其中温度和时间的数据类型。在paste模式下,输入以下代码:

1.	// 定义schema
2.	val iotDataSchema = new StructType().add("rack", StringType, false)
3.	                                    .add("temperature", DoubleType, false)
4.	                                    .add("ts", TimestampType, false)

同时按下 【Ctrl + D】 键,执行以上代码,输出结果如下:

4、加载数据文件到DataFrame中。在paste模式下,输入以下代码:

1.	// 读取温度数据
2.	val dataPath = "hdfs://localhost:9000/data/dataset/streaming/iot-input"
3.	val iotSSDF = spark.readStream.schema(iotDataSchema).json(dataPath)

同时按下 【Ctrl + D】 键,执行以上代码,输出内容如下:

由以上输出内容可以看出,采集到的机架温度(json格式)数据被读取到了DataFrame中。

5、创建10分钟大小的滑动窗口,并在temperature列上求机架温度的平均值。在paste模式下,输入以下代码:

1.	// group by一个滑动窗口,并在temperature列上求平均值
2.	val iotAvgDF = iotSSDF.groupBy(window($"ts", "10 minutes", "5 minutes"))
3.	                      .agg(avg("temperature") as "avg_temp")

同时按下 【Ctrl + D】 键,执行以上代码,输出内容如下:

6、将数据写出到memory data sink。在paste模式下,输入以下代码:

1.	// 将数据写出到memory data sink,使用查询名称iot
2.	val iotMemorySQ = iotAvgDF.writeStream
3.	                          .format("memory")
4.	                          .queryName("iot")
5.	                          .outputMode("complete")
6.	                          .start()

同时按下 【Ctrl + D】 键,执行以上代码,输出内容如下:

7、在iot上执行sql查询,显示数据,以start时间排序。在paste模式下,输入以下代码:

1.	// 显示数据,以start时间排序
2.	spark.sql("select * from iot").orderBy($"window.start").show(false)

同时按下【 Ctrl + D 】键,执行以上代码,输出结果如下:

从实时检测的结果来看,机架的平均温度一直在上升,其中8:15-8:25这十分钟内的机架温度最高。

8、停止该流查询,命令行下执行如下命令:

1.	iotMemorySQ.stop

按下【Enter】键,执行以上代码。

阶段四、自行练习

1、在本案例的基础上,请继续分析,找出是哪些机架的温度在上升。

标签: 传感器dfs

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台