资讯详情

大数据智慧数字电商第五课 程序整合 可视化和BI分析

第05天大数据仓库项目

  • 能够整合Phoenix、HBase实现订单详细查询
  • 掌握使用Phoenix创建二级索引,提高查询效率
  • 掌握Flink程序优化
  • 完成 Imply 安装
  • 能够使用 Druid 完成数据的摄取以及数据查询
  • 能够使用JDBC查询Druid中的数据
  • 使用Druid进行OLAP分析
  • 构建实时数仓数据可视化项目
  • 理解Druid架构原理
  • 掌握使用 Superset 进行BI分析

Phoenix

什么是Phoenix

官网地址:

http://phoenix.apache.org/

1571358989303

Phoenix的安装部署

1、准备工作

  • 提前安装好ZK集群、hadoop集群、Hbase集群

2、安装包 下载地址:https://mirrors.cnnic.cn/apache/phoenix/apache-phoenix-4.14.0-cdh5.14.2/bin/

资料\安装\apache-phoenix-4.14.0-HBase-1.1-bin.tar.gz

三、上传、解压

  • 将相应的安装包上传到相应的安装包Hbase在集群中服务器的目录下,解压
tar -xvzf apache-phoenix-4.14.0-HBase-1.1-bin.tar.gz -C ../servers/ 

4、拷贝Phoenix整合HBase所需JAR包

将phoenix目录下的 phoenix-4.14.0-HBase-1.1-server.jar(phoenix-4.14.0-cdh5.14.2-server.jar)、phoenix-core-4.14.0-HBase-1.1.jar(phoenix-core-4.14.0-cdh5.14.2.jar)拷贝到各个 hbase的lib目录下

scp phoenix-4.14.0-HBase-1.1-server.jar phoenix-core-4.14.0-HBase-1.1.jar node1:/export/servers/hbase-1.1.1/lib scp phoenix-4.14.0-HBase-1.1-server.jar phoenix-core-4.14.0-HBase-1.1.jar node2:/export/servers/hbase-1.1.1/lib scp phoenix-4.14.0-HBase-1.1-server.jar phoenix-core-4.14.0-HBase-1.1.jar node3:/export/servers/hbase-1.1.1/lib 

5、在Phoenix中配置HADOOP、配置HBASE

将hbase的配置文件hbase-site.xml、 hadoop/etc/hadoop下的core-site.xml 、hdfs-site.xml放到phoenix/bin/下,替换phoenix原配置文件

# 进入到 hbase bin目录 cd /export/servers/apache-phoenix-4.14.0-HBase-1.1-bin/bin  # 备份原先的 hbase-site.xml文件 mv hbase-site.xml hbase-site.xml.bak  ln -s $HBASE_HOME/conf/hbase-site.xml . ln -s $HADOOP_HOME/etc/hadoop/core-site.xml . ln -s $HADOOP_HOME/etc/hadoop/hdfs-site.xml . 

6、重启hbase集群,使Phoenix的jar包生效

7.验证是否成功

./sqlline.py node1:2181 

如下界面显示启动成功

Setting property: [incremental, false] Setting property: [isolation, TRANSACTION_READ_COMMITTED] issuing: !connect jdbc:phoenix:node1:2181 none none org.apache.phoenix.jdbc.PhoenixDriver Connecting to jdbc:phoenix:node1:2181 SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/export/servers/apache-phoenix-4.14.0-HBase-1.1-bin/phoenix-4.14.0-HBase-1.1-client.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/export/servers/hadoop-2.6.0-cdh5.14.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. 19/10/18 09:58:06 WARN util.NativeCodeLoader: Unable to load native-haoop library for your platform... using builtin-java classes where applicable
Connected to: Phoenix (version 4.14)
Driver: PhoenixEmbeddedDriver (version 4.14)
Autocommit status: true
Transaction isolation: TRANSACTION_READ_COMMITTED
Building list of tables and columns for tab-completion (set fastconnect to true to skip)...
133/133 (100%) Done
Done
sqlline version 1.2.0

8、查看当前有哪些表

  • 输入!tables 查看都有哪些表
  • 红框部分是用户建的表,其他为Phoenix系统表,系统表中维护了用户表的元数据信息
+--------------+-------------+---------------+-+-----------------+---------------+---------+
| TABLE_SCHEM  | TABLE_NAME  |  TABLE_TYPE   | | IMMUTABLE_ROWS  | SALT_BUCKETS  | MULTI_T |
+--------------+-------------+---------------+-+-----------------+---------------+---------+
| SYSTEM       | CATALOG     | SYSTEM TABLE  | | false           | null          | false   |
| SYSTEM       | FUNCTION    | SYSTEM TABLE  | | false           | null          | false   |
| SYSTEM       | LOG         | SYSTEM TABLE  | | true            | 32            | false   |
| SYSTEM       | SEQUENCE    | SYSTEM TABLE  | | false           | null          | false   |
| SYSTEM       | STATS       | SYSTEM TABLE  | | false           | null          | false   |
+--------------+-------------+---------------+-+-----------------+---------------+---------+

9、退出Phoenix,输入!quit

Phoenix入门案例

需求一:

  • 使用SQL语句在Phoenix中,创建一个用户表。该用户表有以下列
ID 姓名 年龄 性别 地址
1 张三 30 北京西城区
2 李四 20 上海闵行区
  • 往表中插入两条数据,查询数据,并查看HBase中的数据

需求分析:

  • 直接在 Phoenix 中,使用 create table 语法创建表结构
  • 因为数据最终都需要保存在HBase中,故创建表的时候需要指定 HBase 中的列蔟名称

参考代码:

-- 创建表
create table if not exists "user_info"(
    "id" varchar primary key,
    "cf"."name" varchar,
    "cf"."age" integer,
    "cf"."sex" varchar,
    "cf"."address" varchar
);

-- 新增数据
upsert into "user_info" values('1', '张三', 30, '男', '北京市西城区');
upsert into "user_info" values('2', '李四', 20, '女', '上海市闵行区');

需求二:

  • 修改 id为1 用户的年龄为 35
-- 修改数据
upsert into "user_info"("id", "age") values('1', 35);

需求三:

  • 删除 id为2 用户数据
-- 删除数据
delete from "user_info" where "id" = '2';

建立与HBase表映射

在HBase已经存在表,需要使用 Phoenix 建立与 HBase的映射,从而以SQL的方式,通过Phoenix 操作HBase。

案例:

1、在HBase中,建立employee的映射表—数据准备

create 'employee','company','family'

put 'employee','row1','company:name','ted'
put 'employee','row1','company:position','worker'
put 'employee','row1','family:tel','13600912345'

put 'employee','row2','company:name','michael'
put 'employee','row2','company:position','manager'
put 'employee','row2','family:tel','1894225698'

scan 'employee'

2、建立映射视图

  • 在HBase中已有表,在Phoenix中建立映射,必须要使用 create view

  • Phoenix是大小写敏感的

  • 所有命令都是大写

  • 如果表名不用双引号括起来,无论输入大写或小写,建立的表名都是大写

  • 如果要建立同时包含大写和小写的表名和字段名,用双引号把表名或者字段名括起来

在Phoenix中打开命令行

CREATE VIEW IF NOT EXISTS "employee" (
    "rowid" VARCHAR NOT NULL PRIMARY KEY, 
    "company"."name" VARCHAR,
    "company"."position" VARCHAR, 
    "family"."tel" VARCHAR
);

这个语句有几个注意点

  • IF NOT EXISTS可以保证如果已经有建立过这个表,配置不会被覆盖

  • 作为rowkey的字段用 PRIMARY KEY标定

  • 列簇用 columnFamily.columnName 来表示

  • 建立好后,查询一下数据

3、 查询所有映射表数据

0: jdbc:phoenix:node01> SELECT * FROM "employee";
+-------+----------+-----------+--------------+-------+
|  no   |   name   | position  |     tel      |  age  |
+-------+----------+-----------+--------------+-------+
| row1  | ted      | worker    | 13600912345  | null  |
| row2  | michael  | manager   | 1894225698   | null  |
+-------+----------+-----------+--------------+-------+

4、查询职位为 ‘worker’ 的所有员工数据

select * from "employee" where "position" = 'worker'

使用Phoenix构建二级索引加快查询效率

  • HBase通过rowkey来查询,否则就必须逐行地比较每一列的值,即全表扫瞄
  • 数据量较大的表,全表扫描的性能很差
  • 如果需要从多个角度查询数据,不可能使用 rowkey 来实现查询。此时可使用secondary index(二级索引)来完成这件事
  • Phoenix提供了对HBase secondary index的支持

配置HBase支持Phoenix二级索引

1、在每一个 HRegionServce的 hbase-site.xml 加入以下属性

<property> 
  <name>hbase.regionserver.wal.codec</name> 
  <value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value> 
</property>

2、重启HBase集群使配置生效

使用Phoenix创建二级索引

1、创建索引

create local index "idx_tel" on "employee"("family"."tel");

2、查看执行计划,检查是否查询二级索引

explain select * from "employee" where "name" = 'ted';
explain select  * from "employee" where "tel" = '13600912345';

3、删除索引

drop index "idx_tel" on "employee";

4、查看表上的所有索引

!indexes "employee"

订单明细创建Phoenix映射表

建立映射表

实现步骤:

  • 在Phoenix中建立映射表
  • 实现各种查询

1、建表SQL语句:

create view "dwd_order_detail"(
    "rowid" varchar primary key,
    "detail"."ogId" varchar,
    "detail"."orderId" varchar,
    "detail"."goodsId" varchar,
    "detail"."goodsNum" varchar,
    "detail"."goodsPrice" varchar,
    "detail"."goodsName" varchar,
    "detail"."shopId" varchar,
    "detail"."goodsThirdCatId" varchar,
    "detail"."goodsThirdCatName" varchar,
    "detail"."goodsSecondCatId" varchar,
    "detail"."goodsSecondCatName" varchar,
    "detail"."goodsFirstCatId" varchar,
    "detail"."goodsFirstCatName" varchar,
    "detail"."areaId" varchar,
    "detail"."shopName" varchar,
    "detail"."shopCompany" varchar,
    "detail"."cityId" varchar,
    "detail"."cityName" varchar,
    "detail"."regionId" varchar,
    "detail"."regionName" varchar
);

2、在表上的以下列创建索引

列名 说明
goodsThirdCatName 三级分类
goodsSecondCatName 二级分类
goodsFirstCatName 一级分类
cityName 城市名称
regionName 大区名称

参考代码:

-- 创建索引
create local index "idx_dwd_order_detail" on "dwd_order_detail"("detail"."goodsThirdCatName", "detail"."goodsSecondCatName", "detail"."goodsFirstCatName", "detail"."cityName", "detail"."regionName");

explain select * from "dwd_order_detail" where "goodsThirdCatName" = '其他蔬果' and "cityName" = '景德镇市分公司';

使用编写JDBC程序查询Phoenix数据

需求:

  • 编写Java 代码查询 订单明细

1、导入依赖

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.phoenix</groupId>
        <artifactId>phoenix-core</artifactId>
        <version>4.14.0-HBase-1.1</version>
    </dependency>
</dependencies>

2、编写JDBC程序

URL: jdbc:phoenix:node1:2181

3、新建一个 模块,注意不要从 itcast_shop_parent 继承(否则,会依赖CDH)

参考代码:

Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");

Connection connection = DriverManager.getConnection("jdbc:phoenix:node1:2181", "", "");
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery("select * from \"dwd_order_detail\" limit 10 ");

while(resultSet.next()) { 
        
    String rowid = resultSet.getString("rowid");
    String ogId = resultSet.getString("ogId");
    String orderId = resultSet.getString("orderId");
    String goodsId = resultSet.getString("goodsId");
    String goodsNum = resultSet.getString("goodsNum");
    String goodsPrice = resultSet.getString("goodsPrice");
    String goodsName = resultSet.getString("goodsName");
    String shopId = resultSet.getString("shopId");
    String goodsThirdCatId = resultSet.getString("goodsThirdCatId");
    String goodsThirdCatName = resultSet.getString("goodsThirdCatName");
    String goodsSecondCatId = resultSet.getString("goodsSecondCatId");
    String goodsSecondCatName = resultSet.getString("goodsSecondCatName");
    String goodsFirstCatId = resultSet.getString("goodsFirstCatId");
    String goodsFirstCatName = resultSet.getString("goodsFirstCatName");
    String areaId = resultSet.getString("areaId");
    String shopName = resultSet.getString("shopName");
    String shopCompany = resultSet.getString("shopCompany");
    String cityId = resultSet.getString("cityId");
    String cityName = resultSet.getString("cityName");
    String regionId = resultSet.getString("regionId");
    String regionName = resultSet.getString("regionName");

    System.out.print(rowid);
    System.out.print(ogId);
    System.out.print(orderId);
    System.out.print(goodsId);
    System.out.print(goodsNum);
    System.out.print(goodsPrice);
    System.out.print(goodsName);
    System.out.print(shopId);
    System.out.print(goodsThirdCatId);
    System.out.print(goodsThirdCatName);
    System.out.print(goodsSecondCatId);
    System.out.print(goodsSecondCatName);
    System.out.print(goodsFirstCatId);
    System.out.print(goodsFirstCatName);
    System.out.print(areaId);
    System.out.print(shopName);
    System.out.print(shopCompany);
    System.out.print(cityId);
    System.out.print(cityName);
    System.out.print(regionId);
    System.out.print(regionName);
    System.out.println();
}

resultSet.close();
statement.close();
connection.close();
}

Flink程序优化

使用Flink Checkpoint进行容错处理

checkpoint是Flink容错的核心机制。它可以定期地将各个Operator处理的数据进行快照存储( Snapshot )。如果Flink程序出现宕机,可以重新从这些快照中恢复数据。

  1. checkpoint coordinator(协调器)线程周期生成 barrier (栅栏),发送给每一个source
  2. source将当前的状态进行snapshot(可以保存到HDFS)
  3. source向coordinator确认snapshot已经完成
  4. source继续向下游transformation operator发送 barrier
  5. transformation operator重复source的操作,直到sink operator向协调器确认snapshot完成
  6. coordinator确认完成本周期的snapshot

配置以下checkpoint:

1、开启 checkpoint

2、设置 checkpoint 保存HDFS的位置

3、配置 checkpoint 的最小时间间隔(1秒)

4、配置 checkpoint 最大线程数 (1)

5、配置 checkpoint 超时时间 (60秒)

6、配置程序关闭,额外触发 checkpoint

7、配置重启策略 (尝试1次,延迟1秒启动)

8、给两个 source 添加 checkpoint 容错支持

  • 给需要进行checkpoint的operator设置 uid

参考代码

// 配置Checkpoint
env.enableCheckpointing(5000)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// checkpoint的HDFS保存位置
env.setStateBackend(new FsStateBackend("hdfs://node1:8020/flink/checkpoint/"))
// 配置两次checkpoint的最小时间间隔
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000)
// 配置最大checkpoint的并行度
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
// 配置checkpoint的超时时长
env.getCheckpointConfig.setCheckpointTimeout(60000)
// 当程序关闭,触发额外的checkpoint
env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1000))
// 3. 将配置添加到数据流中
val clickLogJSONDataStream: DataStream[String] = env.addSource(finkKafkaConsumer)
.uid(UUID.randomUUID().toString)
.setParallelism(3)
// clickLogJSONDataStream.print()

val canalJsonDataStream: DataStream[String] = env.addSource(flinkKafkaCanalConsumer).uid(UUID.randomUUID().toString)
// canalJsonDataStream.print()

使用Flink时间窗口

生成watermark(水印)

1、实现 extractTimestamp 获取水印时间

设置 EventTime

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

2、获取当前的水印时间

val canalEntityWithWarterMark: DataStream[CanalEntity] = canalEntityDataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[CanalEntity] { 
        
    var currentMaxTimestamp = 0L
    var maxOutOfOrderness = 10 * 1000L // 最大允许的乱序时间是10s

    override def getCurrentWatermark: Watermark = { 
        
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness)
    }

    override def extractTimestamp(t: CanalEntity, l: Long): Long = { 
        
        currentMaxTimestamp = t.exe_time
        currentMaxTimestamp
    }
})

3、修改使用 apply 方法

// 设置5s的时间窗口
val windowDataStream: AllWindowedStream[CanalEntity, TimeWindow] = orderGoodsCanalEntityDataStream.
timeWindowAll(Time.seconds(5))                               // 设置5秒时间窗口
.allowedLateness(Time.seconds(10))                           // 设置最大延迟时间
.sideOutputLateData(new OutputTag[CanalEntity]("outlateData"))    // 设置延迟的数据存放地方

val orderGoodsWideEntityDataStream: DataStream[OrderGoodsWideEntity] = windowDataStream.apply((timeWindow, iter, collector: Collector[OrderGoodsWideEntity]) => { 
        
    var jedis = RedisUtil.getJedis()
    val iterator = iter.iterator

    // ... 此处省略 ...

    collector.collect(orderGoodsWideEntity
        标签: jwn70mt显卡口电容

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台