资讯详情

大数据应用

15:50 2016/8/4 summary 1.环境 1.1.服务端 1.1.1.宿主环境 1.1.1.1.处理器/4核/8线程(X64 i7-4790K 4.00GHz) 8G内存 256G固态硬盘/2T机械硬盘 (有些主板需要打开Intel VT(虚拟技术) 1.1.1.2.操作系统64位Windows 7 旗舰版 Service Pack 1 1.1.1.3.无任务剩余资源:100%CPU 6G内存 90G固态硬盘 1.1.2.虚拟硬件(安装)VMware-workstation-full-10.0.2-1744117.1398244508) 1.1.2.1.新建虚拟机 1.1.2.2.自定义(高级) 1.1.2.3.Workstation 10.0 1.1.2.4.光盘图像安装程序(iso)(选择CentOS-6.5-x86_64-bin-DVD1.iso所在路径) 1.1.2.5.全名(node1)用户名密码 1.1.2.6.虚拟机的名称和位置 1.1.2.7.处理器数量选择1,核心数量2 1.1.2.8.内存3g 1.1.2.9.网络连接:桥接网络使用: 1.1.2.10.SCSI控制器默认 1.1.2.11.虚拟硬盘类型默认 1.1.2.12.创建新的虚拟硬盘 1.1.2.13.最大磁盘大小30G,其它默认 1.1.2.14.默认硬盘文件 1.1.2.15.完成 1.1.3.操作系统(CentOS-6.5-x86_64-bin-DVD1.iso) 1.1.3.1.打开虚拟机,等待安装束,自行重启并进入登录界面 1.1.3.2.ctrl alt f1.进入桌面(如果目前是终端界面),使用帐户登录,设置网络(如校园网登录),确保可以使用互联网 1.1.3.3.ctrl alt f2使用进入终端登录界面root登陆,查看ip地址,退出 1.1.3.4.使用putty管理该结点,putty_V0.63.0.0.43510830.exe -v -ssh -pw password -P 22 root@node1OrIp 1.1.3.5.修改网卡ip地址获取为dhcp(如果创建虚拟机时是桥接模式,则已设置为dhcp),使用以下第一行命令编辑并修改以下第二行的相应名值.操作命令见界面下提示(^x 表示ctrl x 退出 ^o保存) nano /etc/sysconfig/network-scripts/ifcfg-eth0 BOOTPROTO="dhcp" 1.1.3.6.关闭selinux,为影响smb使用 nano /etc/selinux/config SELINUX=disabled 1.1.3.7.安装在网络条件下smb server与client #不同牌的centos可能不同(安装名称可能不同)smb,而不是samba) yum -y install samba mv /etc/samba/smb.conf /etc/samba/smb.conf_bak #新建文件并写入(注意每行前无空格) nano /etc/samba/smb.conf [global] workgroup = WORKGROUP server string = hi netbios name = hi security = user [hi] comment = hi path = / public = no writable = yes guest = no browseable = yes valid users = root,hds #admin users = hds create mask = 0765 #开机启动smb chkconfig smb on #或者使用以下方式开机启动 nano /etc/rc.d/rc.local service smb start 1.1.3.8.修改主机名称 nano /etc/sysconfig/network HOSTNAME=node1 1.1.3.9.添加hadoop设置访问密码的用户和组 groupadd hadoop useradd -s /bin/bash -d /home/hds -m hds -g hadoop passwd hds smbpasswd -a hds smbpasswd -a root 1.1.3.10.共享文件存储空间 #先开发机windows以上共享有读写权的文件夹 #root下操作 mkdir /home/hds/center chown -R hds /home/hds/center chgrp -R hadoop /home/hds/center #开机挂载 nano /etc/rc.d/rc.local mount -t cifs -o username=administrator,passwd=password,rw,dir_mode=0777,file_mode=0777 //centerIp/center /home/hds/center #或使用fstab开挂载 nano /etc/fstab //centerIp/center /home/hds/center cifs username=administrator,passwd=password,rw,dir_mode=0777,file_mode=0777 0 2 df 1.1.3.11.免密访问 #root登陆 ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa mv ~/.ssh/id_dsa.pub ~/.ssh/authorized_keys #hds登陆 su hds ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa mv ~/.ssh/id_dsa.pub ~/.ssh/authorized_keys 1.1.3.12 修改防火墙 #关掉(避免复杂iptables设置) chkconfig iptables off #或者设置(需要很明确打开哪些端口,hadoop及spark许多端口将启动) iptables -I INPUT -p tcp --dport 80 -j ACCEPT iptables -I INPUT -p tcp --dport 8080 -j ACCEPT iptables -I INPUT -p tcp --dport 8088 -j ACCEPT iptables -I INPUT -p tcp --dport 4040 -j ACCEPT iptables -I INPUT -p tcp --dport 443 -j ACCEPT iptables -I INPUT -p tcp --dport 53 -j ACCEPT iptables -I INPUT -p tcp --dport 139 -j ACCEPT iptables -I INPUT -p tcp --dport 445 -j ACCEPT /etc/rc.d/init.d/iptables save service iptables restart chkconfig iptables on iptables -L -x -v -n 1.1.3.13.reboot 1.1.3.14.验证之前的设置是否正确,此时系统内存占用不到300m,磁盘不到3G 1.1.3.15.关机,虚拟机完整clone,按需要clone在这种环境下,所需的数量也可以clone一台 1.1.3.16.修改hosts及hostname取得各结点ip地址,命名并保存在\\centerIP\center\nodes.txt 172.16.2.106 node1 172.16.2.57 node2 cat /home/hds/center/nodes.txt >> /etc/hosts scp -r /etc/hosts root@node2:/etc/hosts #手动(也可以写)shell)修改各结点主机名 ssh node2 hostname node2 nano /etc/sysconfig/network HOSTNAME=node2 1.1.3.17.使用ssh登录每个结点不仅是验证,也是初始答案yes #root登陆 ssh node2 ssh node1 #hds登陆 ssh node2 ssh node1 1.1.4.JAVA/HADOOP(hdfs/yarn) 1.1.4.1.以任何方式下载以下文件\\centerIP\center\目录下 jdk-8u73-linux-x64.tar.gz 1.1.4.2.以hds(以下默认以hds用户操作)在各结点登陆解压至其home tar zxvf ~/center/jdk-8u73-linux-x64.tar.gz tar zxvf ~/center/hadoop-2.6.4.tar.gz 1.1.4.3.在每个结点创建以下目录 mkdir /tmp/tmp mkdir ~/namenodeNameDir mkdir ~/datanodeDataDir 1.1.4.4.配置hadoop #复制~/hadoop-2.6.4/etc/hadoop 为~/center/conf mkdir ~/center/logs mkdir ~/center/conf cp -R ~/hadoop-2.6.4/etc/hadoop/* ~/center/conf/ nano ~/center/conf/core-site.xml <configuration>  <property><name>fs.defaultFS</name><value>hdfs://node1:9000</value></property>  <property><name>hadoop.tmp.dir</name><value>file:/tmp/tmp</value><description>tmp</description></property>  <property><name>io.file.buffer.size</name><value>131072</value></property>  </configuration> #xml配置中的路径需要完整,不能是如 file:~/namenodeNameDir nano ~/center/conf/hdfs-site.xml <configuration>  <property><name>dfs.namenode.name.dir</name><value>file:/home/hds/namenodeNameDir</value></property>  <property><name>dfs.datanode.data.dir</name><value>file:/home/hds/datanodeDataDir</value></property>  <property><name>dfs.replication</name><value>2</value></property>  <property><name>dfs.webhdfs.enabled</name><value>true</value></property> </configuration> #让每个yarn计算结点管理2G(1G留给系统)内存,2*6=12G虚存,两个核,每次申请最多只允许申请2g内存 nano ~/center/conf/yarn-site.xml <configuration>  <property><name>yarn.resourcemanager.address</name><value>node1:8032</value></property>  <property><name>yarn.resourcemanager.scheduler.address</name><value>node1:8030</value></property>  <property><name>yarn.resourcemanager.resource-tracker.address</name><value>node1:8035</value></property>  <property><name>yarn.resourcemanager.admin.address</name><value>node1:8033</value></property>  <property><name>yarn.resourcemanager.webapp.address</name><value>node1:8088</value></property>  <property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property>  <property><name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name><value>org.apache.hadoop.mapred.ShuffleHandler</value></property>    <property><name>yarn.nodemanager.resource.memory-mb</name><value>2048</value></property>  <property><name>yarn.nodemanager.vmem-pmem-ratio</name><value>6</value></property>  <property><name>yarn.nodemanager.pmem-check-enabled</name><value>true</value></property>  <property><name>yarn.nodemanager.vmem-check-enabled</name><value>true</value></property>  <property><name>yarn.scheduler.minimum-allocation-mb</name><value>16</value></property>  <property><name>yarn.scheduler.maximum-allocation-mb</name><value>2048</value></property>  <property><name>yarn.nodemanager.resource.cpu-vcores</name><value>2</value></property>    <property><name>yarn.scheduler.minimum-allocation-vcores</name><value>1</value></property>  <property><name>yarn.scheduler.maximum-allocation-vcores</name><value>2</value></property>  </configuration> nano ~/center/conf/slaves node1 node2 1.1.4.5.修改 .bashrc,文件尾部添加 nano ~/.bashrc   export JAVA_HOME=~/jdk1.8.0_73   export HADOOP_HOME=~/hadoop-2.6.4   export CLASSPATH=.:$JAVA_HOME/jre/lib:$JAVA_HOME/lib:$JAVA_HOME/lib/tools.jar   export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin   export HADOOP_CONF_DIR=~/center/conf   export HADOOP_LOG_DIR=~/center/logs scp ~/.bashrc hds@node2:~/.bashrc 1.1.4.6.初始化 ~/hadoop-2.6.4/bin/hdfs namenode -format 1.1.4.7.启动 ~/hadoop-2.6.4/sbin/stop-yarn.sh ~/hadoop-2.6.4/sbin/stop-dfs.sh ~/hadoop-2.6.4/sbin/start-dfs.sh ~/hadoop-2.6.4/sbin/start-yarn.sh 1.1.4.8.验证 #主结点包含 hdfs的主结点NameNode和SecondNameNode Yarn的主结点ResourceManager #从结点包含 hdfs的从结点DataNode yarn的从结点NodeManager #node1既是主结点也是从结点,node2仅是从结点 jps 3712 DataNode 4403 Jps 4102 NodeManager 4007 ResourceManager 3865 SecondaryNameNode 3594 NameNode ssh node2 jps 2482 NodeManager 2379 DataNode 2639 Jps #分别为hdfs/yarn集群 http://node1:50070 http://node1:8088/cluster/nodes 1.1.4.9.资源剩余: 内存2.2G 磁盘22G free -m df -m 1.2.客户端 1.2.1.硬件:1处理器/4核(AMD A8-5600K) 4G内存 1T机械硬盘 1.2.2.OS: windows7 旗舰版 Service Pack1 1.2.3.对于以下软件,进行下载,安装或解压至某个目录,其中hadoop-eclipse-plugin-2.2.0.jar插件直接丢进eclipseHome/plugins/目录下.scala暂时示使用   jdk-8-windows-x64.zip   apache-maven-3.3.9-bin.zip   eclipse-jee-mars-1-win32-x86_64.zip   hadoop-2.6.4.tar.gz   scala-2.11.6.tgz   spark-2.0.0-bin-hadoop2.6.tgz   hadoop-eclipse-plugin-2.2.0.jar 1.2.4.环境变量与命令接口(%yang%为软件所在目录,%center%为上述共享目录,该版本eclips自带maven,但没有独立版的速度快和灵活) #创建 hds.bat set yang=...... set center=...... set java_home=%yang%\jdk1.8.0_60 set maven_home=%yang%\apache-maven-3.3.9 set ECLIPSE_HOME=%yang%\eclipse set HADOOP_HOME=%yang%\hadoop-2.6.4 set HADOOP_CONF_DIR=%center%\conf set HADOOP_USER_NAME=hds set SCALA_HOME=%yang%\scala-2.11.6 set SPARK_HOME=%yang%\spark-2.0.0-bin-hadoop2.6 set classpath=.;%java_home%\lib set path=.;%HADOOP_HOME%\bin;%java_home%\bin;%JAVA_HOME%\jre\bin\client\;%maven_home%\bin;%path% start cmd /k "doskey eclipse=start %ECLIPSE_HOME%\eclipse.exe" 1.2.5.eclipse 1.2.5.1.上一步打开的命令行中输入 eclipse 回车,即可运行eclipse,选择定workspace后,按需要设置maven等 1.2.5.2.新建工程->Maven Project->Create a simple project(打勾)->填写 Groud Id(如Learning)和Atfifact Id(如Spark),其它默认->finish 1.2.5.3.修改pom.xml文件内容为: <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>Learning</groupId> <artifactId>Spark</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.6.4</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.18</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-mllib_2.10</artifactId> <version>2.0.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>3.3</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <archive> <manifest> <mainClass>org.shirdrn.spark.job.UserAgentStats</mainClass> </manifest> </archive> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <excludes> <exclude>*.properties</exclude> <exclude>*.xml</exclude> </excludes> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project> 1.2.5.4.alt+f5 选择此项目点OK以更新maven项目(更新完后,会提示错,其实不影响,可从Problems视图里删掉这些错误提示) 1.2.5.5.复制%spark_home%\examples\src进工程根目录,覆盖原目录,然后在eclipse选择工程按f5刷新,删除除了org.apache.spark.examples外的其它包,将以该包的JavaSparkPi为例运行 1.3.5.6.新建main程序,用以编写JavaSparkPi的launcher. import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import org.apache.spark.launcher.SparkLauncher; public class PiLauncher { static class InputStreamReaderRunnable implements Runnable { private BufferedReader reader; private String name; public InputStreamReaderRunnable(InputStream is, String name) { this.reader = new BufferedReader(new InputStreamReader(is)); this.name = name; } public void run() { System.out.println("InputStream " + name + ":"); try { String line = reader.readLine(); while (line != null) { System.out.println(line); line = reader.readLine(); } reader.close(); } catch (IOException e) { e.printStackTrace(); } } } public static void main(String[] args) throws Exception { SparkLauncher spark1 = new SparkLauncher() .setMainClass("org.apache.spark.examples.JavaSparkPi") .addAppArgs("100") .setConf("spark.executor.instances", "3") .setConf(SparkLauncher.DRIVER_MEMORY, "512m") .setConf("spark.yarn.am.memory", "512m") .setConf(SparkLauncher.EXECUTOR_MEMORY, "512m") .setConf(SparkLauncher.EXECUTOR_CORES, "1") //YARN cluster mode .setMaster("yarn").setDeployMode("client").setAppResource("target/Spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar").setConf("spark.sql.warehouse.dir", "file:///C:/Users/Administrator/AppData/Local/Temp/warehouse") .setVerbose(true); Process spark = spark1.launch(); InputStreamReaderRunnable inputStreamReaderRunnable = new InputStreamReaderRunnable(spark.getInputStream(),"input"); Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input"); inputThread.start(); InputStreamReaderRunnable errorStreamReaderRunnable = new InputStreamReaderRunnable(spark.getErrorStream(),"error"); Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error"); errorThread.start(); System.out.println("Waiting for finish..."); int exitCode = spark.waitFor(); System.out.println("Finished! Exit code:" + exitCode); } } 1.3.5.7.run as maven build...->goals写package->run以打包,下载依赖需要很长时间,有时还会卡死,需要重新执行. 1.3.5.8.选择上面的launcher以java application运行.打印的日志包含: ...... 16/08/06 11:39:01 INFO cluster.YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool  Pi is roughly 3.1419064 16/08/06 11:39:01 INFO scheduler.DAGScheduler: Job 0 finished: reduce at JavaSparkPi.java:59, took 72.764996 s ...... 2.应用(注意资源的供给,当单次任务需求过多资源时,应手动分成多次执行任务,否则会OOM) 18:58 2016/8/6 电气隐患的数据挖掘 1.理解数据 1.1.N个配电柜,每个8组数据,每组数据(x,y)分别为时刻和该时刻传感器采集感知的值. 1.2.每个配电柜可能正常也可能隐患,目的是找出各配电柜模式,再分析哪些是隐患模式,哪些是正常模式,或者说各模式分别属于哪个隐患类别. 1.3.需要从每个配电柜的包含8个函数的数据文件中计算出一个对象用以代表此配电柜.如8个量的最大最小平均值组成的8*3=24维向量. 1.4.人可以分析出其中的规律,但数据量太多,需要交由机器处理,人只需要告诉机器自己是怎么处理的. 1.5.可以一点一点地比较,先比较各量平均值差异,再比较最大最小及其组合等,直到各模式显著不同. 1.6.不一定非要用spark api,用一切可以使用的api,只要达到了目的,尽管方法可能只是小数据才适用 2.上网查已有研究 2.1.配电柜隐患 低压(200v-400v)配电柜 配电网时空故障信息/故障诊断 温度/老化检测(不要太潮湿,常检查有无绝缘老化现象) 振动(曾有过所钢窗的塑料皮(带金属)刮倒变压器上造成短路) 3.实践 3.1.取得所有配电柜的文件名,文件名是配电柜的id cd /d C:\Users\Administrator\Desktop\electronic dir /b > id.csv 3.2.id.csv + data.csv = table.DistrubutionBox 3.3.将合并的表存入文件比存入mysql快很多.存文件时不能以文本存,因为表有多于一列的数据 3.4. text < csv < json < parque < mysql < hive     解释:   文件格式的数据类型强度.  意识到这点,才想起.csv是文本格式的,想要用比如数值最好用 mysql,但mysql太慢,hive暂时还未学习,使用parpue应该是不错的选择.这里暂时选择csv,代码手动指定其schema 3.5.逻辑处理流程(向量组DAG):   file -> (fileName(id),file) -> (id, label, sensorname, data, time) -> (id, label, sensorname, max(data), min(data), avg(data) -> (id, max_s0, min_s0, avg_s0, max_s1 ... max_s7, min_s7, avg_s7) -> (id, vector) (聚类得:)f:vector->clusterId    (id, vector) -> (id,clusterId)    (id,clusterId) join (id,label) by id -> (id,label,clusterId) -> f:cluster->label    (id,label,clusterId)->(id,knownLabel,prediectedLabel)         解释:   (在这里以向量组来表示问题,一个向量组同时也是向量集合Collection<Vector>,也是一张数据库表.)   (这是一个抽象概念. 当看到spark的DAG时很亲切,一直想找个可以用DAG表示处理流程的工具,本想着有时间自己去实现一个,我称之为发展DAG.其实这种概念在不少软件里都有,但一来不太了解,二来可能用起来会发现跟我想要的不一样)   (RDD DAG与这里的向量组DAG相比,这里的向量组DAG可能会更加抽象?)      file 原始数据是一组文件,部分是有label的(仅5个),部分是无label的(159个)   (fileName(id), file) file隐含了这样的向量,一个fileName唯一对应一个配电柜,以其作为id,一个配电柜也唯一对应一组数据   (id,label,sensorname,data,time) 将(fileName(id), file)中的file数据取出,与id笛卡尔乘,同时fileName中包含label数据,如果文件名是 ^\d+-\d+\.csv格式,则后一个\d+是label,否则认为分类未知而为0   (id, label, sensorname, max(data), min(data), avg(data) 对上表或向量组,执行sql:select id, sensorname, label, count(data), max(data), min(data), avg(data) from DistributionBox group by id, sensorname, label   (id, max_s0, min_s0, avg_s0, max_s1 ... max_s7, min_s7, avg_s7) 对一表reduceByKey(id),把每个id对应的配电柜数据转换成一个向量,这个转换过程很重要,此方法的好坏决定后续分类预测结果的好坏.   (id, vector) 上表,虽本是向量组,但为了方便使用,而把后面的24维作为一个vector,在与id一些作为一个向量.   (聚类得:)f:vector->clusterId  取上表中的vector作为K-MEANS聚类的输入,并指定4个cluster,结果得出了此函数.   (id, vector) -> (id,clusterId) 利用上述函数,得(id, clusterId),即得出每个配电柜所属的cluster,也是其所属的模式.   (id,clusterId) join (id,label) by id -> (id,label,clusterId) 表连接   f:cluster->label 通过上表得函数   (id,label,clusterId)->(id,knownLabel,prediectedLabel)  使用上函数,得最终结果 4.改进 4.1.采集尽可能多的数据,有些量一旦偏离正常值则有问题,有些组合量偏离正常空间则有问题 5.结果 每次运行的结果会不一样,有一次,它把2和4当作同一类了. 以下这次结果显示:有分类的都与原来预测的一样(最后5行),从来有理由相信,未知分类的预测也是较可信的. +---------+------------+----------------+ | id      | knownLabel | predictedLabel | +---------+------------+----------------+ | 146.csv |          0 |              3 | | 45.csv  |          0 |              2 | | 53.csv  |          0 |              3 | | 145.csv |          0 |              2 | | 91.csv  |          0 |              4 | | 110.csv |          0 |              4 | | 114.csv |          0 |              2 | | 94.csv  |          0 |              4 | | 13.csv  |          0 |              4 | | 44.csv  |          0 |              3 | | 127.csv |          0 |              2 | | 9.csv   |          0 |              2 | | 29.csv  |          0 |              4 | | 92.csv  |          0 |              4 | | 117.csv |          0 |              4 | | 96.csv  |          0 |              2 | | 24.csv  |          0 |              2 | | 111.csv |          0 |              2 | | 82.csv  |          0 |              4 | | 34.csv  |          0 |              4 | | 122.csv |          0 |              4 | | 134.csv |          0 |              4 | | 78.csv  |          0 |              4 | | 30.csv  |          0 |              2 | | 147.csv |          0 |              2 | | 81.csv  |          0 |              2 | | 109.csv |          0 |              4 | | 106.csv |          0 |              3 | | 61.csv  |          0 |              1 | | 83.csv  |          0 |              2 | | 79.csv  |          0 |              2 | | 132.csv |          0 |              2 | | 26.csv  |          0 |              2 | | 20.csv  |          0 |              2 | | 38.csv  |          0 |              2 | | 150.csv |          0 |              2 | | 86.csv  |          0 |              2 | | 103.csv |          0 |              1 | | 87.csv  |          0 |              4 | | 52.csv  |          0 |              3 | | 116.csv |          0 |              1 | | 107.csv |          0 |              3 | | 67.csv  |          0 |              4 | | 15.csv  |          0 |              2 | | 97.csv  |          0 |              1 | | 27.csv  |          0 |              3 | | 48.csv  |          0 |              3 | | 5.csv   |          0 |              3 | | 159.csv |          0 |              4 | | 113.csv |          0 |              2 | | 12.csv  |          0 |              2 | | 144.csv |          0 |              2 | | 131.csv |          0 |              2 | | 22.csv  |          0 |              3 | | 139.csv |          0 |              2 | | 115.csv |          0 |              3 | | 112.csv |          0 |              4 | | 135.csv |          0 |              3 | | 47.csv  |          0 |              3 | | 18.csv  |          0 |              2 | | 90.csv  |          0 |              4 | | 152.csv |          0 |              3 | | 1.csv   |          0 |              2 | | 125.csv |          0 |              4 | | 37.csv  |          0 |              2 | | 40.csv  |          0 |              4 | | 28.csv  |          0 |              2 | | 136.csv |          0 |              4 | | 100.csv |          0 |              4 | | 138.csv |          0 |              1 | | 11.csv  |          0 |              2 | | 33.csv  |          0 |              4 | | 16.csv  |          0 |              3 | | 141.csv |          0 |              2 | | 148.csv |          0 |              4 | | 25.csv  |          0 |              2 | | 21.csv  |          0 |              2 | | 140.csv |          0 |              2 | | 155.csv |          0 |              2 | | 85.csv  |          0 |              3 | | 39.csv  |          0 |              4 | | 54.csv  |          0 |              3 | | 41.csv  |          0 |              2 | | 23.csv  |          0 |              2 | | 19.csv  |          0 |              2 | | 35.csv  |          0 |              3 | | 31.csv  |          0 |              2 | | 130.csv |          0 |              2 | | 69.csv  |          0 |              3 | | 80.csv  |          0 |              2 | | 137.csv |          0 |              2 | | 72.csv  |          0 |              4 | | 153.csv |          0 |              2 | | 119.csv |          0 |              3 | | 105.csv |          0 |              3 | | 51.csv  |          0 |              1 | | 98.csv  |          0 |              4 | | 57.csv  |          0 |              2 | | 6.csv   |          0 |              2 | | 101.csv |          0 |              2 | | 14.csv  |          0 |              2 | | 129.csv |          0 |              4 | | 143.csv |          0 |              2 | | 88.csv  |          0 |              2 | | 154.csv |          0 |              2 | | 89.csv  |          0 |              2 | | 121.csv |          0 |              3 | | 124.csv |          0 |              4 | | 59.csv  |          0 |              2 | | 77.csv  |          0 |              2 | | 42.csv  |          0 |              2 | | 32.csv  |          0 |              4 | | 62.csv  |          0 |              2 | | 76.csv  |          0 |              2 | | 74.csv  |          0 |              3 | | 63.csv  |          0 |              4 | | 126.csv |          0 |              2 | | 95.csv  |          0 |              3 | | 8.csv   |          0 |              2 | | 17.csv  |          0 |              2 | | 56.csv  |          0 |              2 | | 3.csv   |          0 |              4 | | 133.csv |          0 |              4 | | 4.csv   |          0 |              4 | | 49.csv  |          0 |              2 | | 73.csv  |          0 |              2 | | 84.csv  |          0 |              2 | | 158.csv |          0 |              4 | | 93.csv  |          0 |              2 | | 58.csv  |          0 |              4 | | 128.csv |          0 |              2 | | 64.csv  |          0 |              4 | | 118.csv |          0 |              1 | | 157.csv |          0 |              2 | | 65.csv  |          0 |              2 | | 2.csv   |          0 |              2 | | 151.csv |          0 |              2 | | 46.csv  |          0 |              4 | | 102.csv |          0 |              4 | | 66.csv  |          0 |              4 | | 71.csv  |          0 |              3 | | 10.csv  |          0 |              2 | | 68.csv  |          0 |              2 | | 43.csv  |          0 |              1 | | 7.csv   |          0 |              3 | | 75.csv  |          0 |              3 | | 120.csv |          0 |              3 | | 149.csv |          0 |              3 | | 99.csv  |          0 |              2 | | 104.csv |          0 |              2 | | 156.csv |          0 |              2 | | 60.csv  |          0 |              3 | | 50.csv  |          0 |              2 | | 108.csv |          0 |              2 | | 142.csv |          0 |              2 | | 123.csv |          0 |              2 | | 36.csv  |          0 |              2 | | 55.csv  |          0 |              2 | | 70.csv  |          0 |              4 | | 1-1.csv |          1 |              1 | | 3-1.csv |          1 |              1 | | 2-2.csv |          2 |              2 | | 5-3.csv |          3 |              3 | | 4-4.csv |          4 |              4 | +---------+------------+----------------+ 6.代码 public static void electricity(String[] args) throws InterruptedException { SparkSession ss = SparkSession.builder().appName("electronic").getOrCreate(); /* 一个.csv是一个配电柜,也是一个对象,其包含(文件名id,标签,数据) */ String dir = "file:///C:/Users/Administrator/Desktop/electronic"; List<String> ids = ss.read().format("csv") .option("path", dir + "/id.csv") .option("header", true) .option("encoding", "gbk") .load() .map(row -> row.getString(0), Encoders.STRING()).collectAsList(); List<String> sensornames = ss.read().format("csv") .option("path", dir + "/" + ids.get(0)).option("header", true) .option("encoding", "gbk") .load() .map(row -> row.getString(0), Encoders.STRING()).distinct().collectAsList(); Map<String, Integer> sensornameToInt = new HashMap<>(); for (int i=0; i<sensornames.size(); i++) sensornameToInt.put(sensornames.get(i), i); sensornameToInt.forEach((k,v)->System.out.println(k + " " + v)); /* 统一数据为一个表,5维(id,label,sensorname,data,time) */ /* 使用for存mysql可运行,很慢 尝试改为存为文件 */ List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField("id", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("label", DataTypes.IntegerType, true)); StructType schema = DataTypes.createStructType(fields); for (int i = 0; i < ids.size(); i++) { String id = ids.get(i); int label = 0; Pattern p = Pattern.compile("^\\d+-(\\d)+\\.csv"); Matcher m = p.matcher(id); if (m.matches()) label = Integer.valueOf(m.group(1)); List<Row> rows = new ArrayList<>(); rows.add(RowFactory.create(id, label)); Dataset<Row> l = ss.createDataFrame(rows, schema); Dataset<Row> r = ss.read().format("csv").option("path", dir + "/" + id).option("header", true).option("encoding", "gbk").load(); StructType st = r.schema(); r = ss.createDataFrame(r.javaRDD() .map(row->RowFactory.create(""+sensornameToInt.get(row.getString(0)), row.get(1), row.get(2))) , st); l.join(r).write().mode(i == 0 ? SaveMode.Overwrite : SaveMode.Append).format("csv") .option("path", "file:///C:/Users/Administrator/Desktop/DistributionBox.csv") .option("header", true) .option("encoding", "gbk") .save(); } ss.stop(); } public static void electricityAnalyze(String[] args) throws InterruptedException, AnalysisException { SparkSession ss = SparkSession.builder().appName("electronic").getOrCreate(); Dataset<Row> rows = ss.read() .format("csv") .option("path", "file:///C:/Users/Administrator/Desktop/DistributionBox.csv") .option("header", true) .option("encoding", "gbk") .load(); // rows.groupBy(rows.col("id")).count().show(); List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField("id", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("label", DataTypes.IntegerType, true)); fields.add(DataTypes.createStructField("sensorname", DataTypes.IntegerType, true)); fields.add(DataTypes.createStructField("data", DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("time", DataTypes.TimestampType, true)); StructType schema = DataTypes.createStructType(fields); rows = ss.createDataFrame(rows.javaRDD().map(row->RowFactory.create(row.getString(0), Integer.valueOf(row.getString(1)), Integer.valueOf(row.getString(2)), Double.valueOf(row.getString(3)), new Timestamp(Long.valueOf(row.getString(4))))), schema); rows.createTempView("DistributionBox"); Dataset<Row> db = ss.sql("select id, sensorname, label, count(data), max(data), min(data), avg(data) from DistributionBox group by id, sensorname, label"); // db.show(); JavaRDD<Tuple2<String, Vector>>  kv = db.javaRDD().mapToPair(row->{ String id = row.getString(row.fieldIndex("id")); int sn = row.getInt(row.fieldIndex("sensorname")); Map<String, Double> value = new HashMap<>(); value.put("max"+sn, row.getDouble(4)); value.put("min"+sn, row.getDouble(5)); value.put("avg"+sn, row.getDouble(6)); return new Tuple2<>(id, value); }).reduceByKey((r1,r2)->{ Map<String, Double> value = new HashMap<>(); value.putAll(r1); value.putAll(r2); return value; }).map(t->{ Double[] values = t._2.values().toArray(new Double[]{}); double[] values2 = new double[values.length]; for (int i=0; i<values.length; i++) values2[i] = values[i];  String id = t._1; return new Tuple2<>(id, Vectors.dense(values2)); }); JavaRDD<Vector> dbv = kv.map(t->t._2); //dbv.foreach(s->System.out.println(s.getId() + " " + s));    int numClusters = 4;    int numIterations = 20;    KMeansModel clusters = KMeans.train(dbv.rdd(), numClusters, numIterations);    System.out.println("Cluster centers:");    for (Vector center: clusters.clusterCenters()) {      System.out.println(" " + center);    }    double cost = clusters.computeCost(dbv.rdd());    System.out.println("Cost: " + cost);    // Evaluate clustering by computing Within Set Sum of Squared Errors    double WSSSE = clusters.computeCost(dbv.rdd());    System.out.println("Within Set Sum of Squared Errors = " + WSSSE);    /* 1.输出所有vector所属cluster     * 2.连接(clusterId,vector)和原来的(vector,id),(id,label),得到 (clusterId, id, label)*/    //clusters.predict(points) Dataset<Row> idLable = db.select(db.col("id"), db.col("label")).distinct(); fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField("id", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("clusterId", DataTypes.IntegerType, true)); schema = DataTypes.createStructType(fields); Dataset<Row> idClusterId = ss.createDataFrame(kv.map(kv1->RowFactory.create(kv1._1, clusters.predict(kv1._2))), schema); Dataset<Row> result = idLable.join(idClusterId, "id"); /* 1.取label值不为0的行,取label和clusterId两列,做出由clusterId->label的map映射 * 2.把result中所有配电柜的clusterId翻译成label*/ //result.where(result.col("label").equals(0)); Map<Integer, Integer> clusterIdToLabel = result.where(result.col("label").notEqual(0)).select(result.col("label"), result.col("clusterId")) .javaRDD() .map(r->{ Map<Integer, Integer> clusterIdLabelMap = new HashMap<>(); clusterIdLabelMap.put(r.getInt(1), r.getInt(0)); return clusterIdLabelMap; }).reduce((m1,m2)->{ Map<Integer, Integer> clusterIdLabelMap = new HashMap<>(); clusterIdLabelMap.putAll(m1); clusterIdLabelMap.putAll(m2); return clusterIdLabelMap; }); System.out.println(clusterIdToLabel); fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField("id", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("knownLabel", DataTypes.IntegerType, true)); fields.add(DataTypes.createStructField("predictedLabel", DataTypes.IntegerType, true)); schema = DataTypes.createStructType(fields); result = ss.createDataFrame(result.javaRDD().map(kv1->RowFactory.create(kv1.

标签: 2t温度振动传感器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台