- By integrating Spark with HBase, we can read HBase data directly from Spark.
- Data preparation: create the HBase table and insert some rows:
create 'spark_hbase','info'
put 'spark_hbase','0001','info:name','tangseng'
put 'spark_hbase','0001','info:age','30'
put 'spark_hbase','0001','info:sex','0'
put 'spark_hbase','0001','info:addr','beijing'
put 'spark_hbase','0002','info:name','sunwukong'
put 'spark_hbase','0002','info:age','508'
put 'spark_hbase','0002','info:sex','0'
put 'spark_hbase','0002','info:addr','shanghai'
put 'spark_hbase','0003','info:name','zhubajie'
put 'spark_hbase','0003','info:age','715'
put 'spark_hbase','0003','info:sex','0'
put 'spark_hbase','0003','info:addr','shenzhen'
put 'spark_hbase','0004','info:name','bailongma'
put 'spark_hbase','0004','info:age','1256'
put 'spark_hbase','0004','info:sex','0'
put 'spark_hbase','0004','info:addr','donghai'
put 'spark_hbase','0005','info:name','shaheshang'
put 'spark_hbase','0005','info:age','1008'
put 'spark_hbase','0005','info:sex','0'
put 'spark_hbase','0005','info:addr','tiangong'

create 'spark_hbase_out','info'
<repositories>
<!-- hbase-spark (Spark on HBase) is published by Cloudera, so the Cloudera CDH repository is added here -->
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.3</version>
</dependency>
<!-- hadoop-core was renamed to hadoop-common in July 2009 -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<!-- <version>2.6.0-mr1-cdh5.14.2</version>-->
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<!-- <version>1.2.0-cdh5.14.2</version>-->
<version>2.2.6</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.apache.hbase</groupId>-->
<!-- <artifactId>hbase-common</artifactId>-->
<!-- <version>1.2.0-cdh5.14.2</version>-->
<!-- <version>2.2.6</version>-->
<!-- </dependency>-->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<!-- <version>1.2.0-cdh5.14.2</version>-->
<version>2.2.6</version>
</dependency>
<!-- Add this dependency to fix the "Base64 not found" error reported by lower versions when integrating Spark with HBase -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>2.2.6</version>
</dependency>
<!-- To stay compatible with Apache HBase 2.2.x, use the CDH 6.2 build; the exact dependency coordinates can be looked up at https://mvnrepository.com/ -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-spark</artifactId>
<!-- <version>1.2.0-cdh5.14.2</version>-->
<version>2.1.0-cdh6.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.3.3</version>
</dependency>
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Case09_SparkWithHBase {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
    val sc = new SparkContext(conf)
    // 1. Create the HBase configuration
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "spark_hbase")
    // // 2. Optionally set a filter; start and stop row keys can be set as well
    // val scan = new Scan
    // scan.setFilter(new RandomRowFilter(0.5f))
    // // Serialize the Scan object into the configuration so the filter takes effect
    // hbaseConf.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))
    // 3. Read the HBase data into an RDD
    val resultRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])
    resultRDD.foreach(x => {
      // The query result is the second element of the (ImmutableBytesWritable, Result) tuple
      val result = x._2
      // Get the row key
      val rowKey = Bytes.toString(result.getRow)
      val name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
      val age = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")))
      println(rowKey + ":" + name + ":" + age)
    })
    // 4. Write data back to the HBase output table
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "spark_hbase_out")
    val job = Job.getInstance(hbaseConf)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    // 5. Build the output from resultRDD: RDD[(ImmutableBytesWritable, Result)]
    val outRDD: RDD[(ImmutableBytesWritable, Put)] = resultRDD.mapPartitions(eachPartition => {
      eachPartition.map(keyAndEachResult => {
        val result = keyAndEachResult._2
        val rowKey = Bytes.toString(result.getRow)
        val name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
        val age = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")))
        val put = new Put(Bytes.toBytes(rowKey))
        val immutableBytesWritable = new ImmutableBytesWritable(Bytes.toBytes(rowKey))
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name))
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(age))
        // Writing to HBase requires the row key and the Put object
        (immutableBytesWritable, put)
      })
    })
    // 6. Output the RDD to any Hadoop-supported storage system with the new Hadoop API
    outRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
    sc.stop()
  }
}
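The commented-out filter block above relies on an HBase `Base64` helper class that is not available in every HBase release. Below is a minimal alternative sketch, assuming `TableMapReduceUtil.convertScanToString` is public in the HBase version on the classpath (it is in recent 1.x/2.x releases); the `RandomRowFilter` comes from the commented code, while the start/stop row keys "0001"/"0004" are only illustrative.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.filter.RandomRowFilter
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableMapReduceUtil}
import org.apache.hadoop.hbase.util.Bytes

object ScanConfigSketch {
  // Hypothetical helper: serialize a Scan (filter plus row-key range) into the
  // configuration key read by TableInputFormat, without the removed Base64 class.
  def applyScan(hbaseConf: Configuration): Unit = {
    val scan = new Scan()
    scan.withStartRow(Bytes.toBytes("0001"))   // start row key (inclusive), illustrative value
    scan.withStopRow(Bytes.toBytes("0004"))    // stop row key (exclusive), illustrative value
    scan.setFilter(new RandomRowFilter(0.5f))  // keep roughly half of the rows
    // convertScanToString Base64-encodes the protobuf form of the Scan
    hbaseConf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan))
  }
}

Calling such a helper before `sc.newAPIHadoopRDD(...)` lets the filter and row-key range apply on the server side, so only matching rows are shipped to Spark.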
- References:
- https://github.com/cloudera-labs/SparkOnHBase/blob/cdh5-0.0.2/src/main/scala/com/cloudera/spark/hbase/HBaseContext.scala
- https://issues.apache.org/jira/browse/HBASE-13992
- https://github.com/cloudera-labs/SparkOnHBase
- Advantages:
- Seamless use of the HBase Connection
- Seamless integration with Kerberos
- Generates RDDs directly from get or scan operations
- Supports arbitrary combinations of HBase operations on RDDs
- Provides simple methods for common operations, while the API still allows unrestricted, advanced custom operations (see the sketch after this list)
- Supports both Java and Scala
- Provides similar APIs for Spark and Spark Streaming
- Since HBaseContext is a utility class that depends only on the hadoop, hbase, and spark jars, it can be used directly
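As a concrete illustration of the "simple methods for common operations" point, here is a minimal write-side sketch. It assumes the `bulkPut` signature of the `HBaseContext` shipped in this hbase-spark build (an RDD, a `TableName`, and a function that turns each record into a `Put`); the sample records and the row keys 0006/0007 are made up for illustration, while the table and column names reuse the ones created above.

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

object Case_HBaseContextBulkPutSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("bulkPutSketch").setMaster("local[*]"))
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181")
    val hbaseContext = new HBaseContext(sc, hbaseConf)

    // Sample records: (rowKey, name, age) -- illustrative data only
    val rdd = sc.parallelize(Seq(("0006", "baigujing", "999"), ("0007", "niumowang", "888")))

    // bulkPut manages the HBase connection per partition and writes each record as a Put
    hbaseContext.bulkPut[(String, String, String)](
      rdd,
      TableName.valueOf("spark_hbase_out"),
      record => {
        val put = new Put(Bytes.toBytes(record._1))
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(record._2))
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(record._3))
        put
      })
    sc.stop()
  }
}

Compared with the `TableOutputFormat` approach in Case09, this saves the Job/output-format setup: the connection handling is hidden inside `HBaseContext`.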
- Add the dependencies:
<!-- To stay compatible with Apache HBase 2.2.x, use the CDH 6.2 build; the exact dependency coordinates can be looked up at https://mvnrepository.com/ -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-spark</artifactId>
<version>2.1.0-cdh6.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.3.3</version>
</dependency>
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Case10_SparkOnHBase {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
    val sc = new SparkContext(conf)
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "spark_hbase")
    // HBaseContext broadcasts the HBase configuration and manages connections on the executors
    val hbaseContext = new HBaseContext(sc, hbaseConf)
    val scan = new Scan()
    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = hbaseContext.hbaseRDD(TableName.valueOf("spark_hbase"), scan)
    // Map each Result to a readable string, then print it
    hbaseRDD.map(eachResult => {
      val result: Result = eachResult._2
      val rowKey = Bytes.toString(result.getRow)
      val name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
      val age = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")))
      rowKey + ":" + name + ":" + age
    }).foreach(println)
    sc.stop()
  }
}
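The "similar APIs for Spark and Spark Streaming" point can be sketched the same way. This is a minimal, non-authoritative example assuming the `streamBulkPut` method of this hbase-spark build takes a DStream, a `TableName`, and a record-to-`Put` function; the socket source host/port and the "rowKey,name,age" line format are hypothetical and only serve the illustration.

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Case_HBaseContextStreamingSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("streamBulkPutSketch").setMaster("local[*]"))
    val ssc = new StreamingContext(sc, Seconds(5))
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181")
    val hbaseContext = new HBaseContext(sc, hbaseConf)

    // Hypothetical source: comma-separated "rowKey,name,age" lines from a socket
    val lines = ssc.socketTextStream("node01", 9999)

    // streamBulkPut writes each micro-batch to HBase with the same record-to-Put style as bulkPut
    hbaseContext.streamBulkPut[String](
      lines,
      TableName.valueOf("spark_hbase_out"),
      (line: String) => {
        val fields = line.split(",")
        val put = new Put(Bytes.toBytes(fields(0)))
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(fields(1)))
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(fields(2)))
        put
      })

    ssc.start()
    ssc.awaitTermination()
  }
}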