java 对 Hbase 的操作
一.引入pom
<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client --> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.2.0</version> </dependency>
二.获取hbase连接
获取Hbase连接
/** * 获取 hbase 连接 * @param zookeeperLink zookeeper链接, 默认端口为2181 * @return */ private static Connection getConnected(String zookeeperLink){
// 创建配置 Configuration config = HBaseConfiguration.create(); config.set("hbase.zookeeper.quorum", zookeeperLink); // 创建连接 Connection conn = null; try {
conn = ConnectionFactory.createConnection(config); } catch (IOException e) {
e.printStackTrace(); } return conn; }
public static void main(String[] args) {
String zk = "192.168.1.113:2181";
Connection conn = null;
try {
// 创建连接
conn = getConnected(zk);
} finally {
try {
if (conn != null) {
conn.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
三.命名空间操作
1.创建命名空间
/** * 创建命名空间 * @param conn hbase连接 * @param nameSpace 命名空间 */
private static void createNamespace(Connection conn, String nameSpace) {
Admin admin = null;
try {
// 获取表管理器对象
admin = conn.getAdmin();
//通过构建器模式创建命名空间描述对象
NamespaceDescriptor.Builder builder = NamespaceDescriptor.create(nameSpace);
NamespaceDescriptor namespaceDescriptor = builder.build();
// 创建命名空间
admin.createNamespace(namespaceDescriptor);
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
if (admin != null) {
admin.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
String zk = "192.168.1.113:2181";
Connection conn = null;
try {
// 创建连接
conn = getConnected(zk);
// 创建命名空间 why
createNamespace(conn, "why");
} finally {
try {
if (conn != null) {
conn.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
2.查看全部命名空间
/** * 查看全部命名空间 * @param conn hbase连接 */
private static void getAllNamespace(Connection conn) {
Admin admin = null;
try {
// 获取表管理器对象
admin = conn.getAdmin();
//获取所有命名空间描述对象
NamespaceDescriptor[] namespaceDescriptors = admin.listNamespaceDescriptors();
for (NamespaceDescriptor namespaceDescriptor : namespaceDescriptors) {
System.out.println(namespaceDescriptor);
System.out.println(namespaceDescriptor.getConfiguration());
}
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
if (admin != null) {
admin.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
String zk = "192.168.1.113:2181";
Connection conn = null;
try {
// 创建连接
conn = getConnected(zk);
// 查看所有的命名空间
getAllNamespace(conn);
} finally {
try {
if (conn != null) {
conn.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
3.查看指定命名空间
/** * 查看指定命名空间 * @param conn hbase 连接 * @param nameSpace 命名空间 */
private static void getNamespaceByName(Connection conn, String nameSpace) {
Admin admin = null;
try {
// 获取表管理器对象
admin = conn.getAdmin();
// 获取命名空间描述对象
NamespaceDescriptor namespaceDescriptor = admin.getNamespaceDescriptor(nameSpace);
System.out.println(namespaceDescriptor);
// 获取该命名空间配置
Map<String, String> configuration = namespaceDescriptor.getConfiguration();
System.out.println(configuration);
// 获取命名空间名称
String name = namespaceDescriptor.getName();
System.out.println(name);
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
if (admin != null) {
admin.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
String zk = "192.168.1.113:2181";
Connection conn = null;
try {
// 创建连接
conn = getConnected(zk);
// 查看指定命名空间
getNamespaceByName(conn, "why");
} finally {
try {
if (conn != null) {
conn.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
4.查看指定命名空间下的表
/** * 查看指定命名空间下的表 * @param conn hbase 连接 * @param nameSpace 命名空间 */
private static void getTableByNamespace(Connection conn, String nameSpace) {
Admin admin = null;
try {
// 获取表管理器对象
admin = conn.getAdmin();
//获取指定命名空间下的表描述对象
HTableDescriptor[] tableDescriptors = admin.listTableDescriptorsByNamespace(nameSpace);
for (HTableDescriptor tableDescriptor : tableDescriptors) {
System.out.println(tableDescriptor.getNameAsString());
}
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
if (admin != null) {
admin.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
String zk = "192.168.1.113:2181";
Connection conn = null;
try {
// 创建连接
conn = getConnected(zk);
// 获取指定命名空间的所有表
getTableByNamespace(conn, "default");
} finally {
try {
if (conn != null) {
conn.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
四.表操作
1.创建表
/** * 创建表, 并指定列族名(创建一个列族) * * 可以用多个HColumnDescriptor来定义多个列族,然后通过hTableDescriptor.addFamily添加, * 但是目前只建议一个表创建一个列族,防止对性能有影响 * @param conn hbase连接 * @param table 表名 * @param columnFamily 列族 */
private static void createTable(Connection conn, String table, String columnFamily) {
Admin admin = null;
try {
// 获取表管理器对象
admin = conn.getAdmin();
// 创建一个表描述对象
HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(table));
// 创建一个列族描述对象, 通过列族描述定义对象,可以设置列族的很多重要属性信息
HColumnDescriptor columnDescriptor = new HColumnDescriptor(columnFamily);
// 设置该列族中存储数据的最大版本数,默认是1
columnDescriptor.setMaxVersions(3);
// 添加该列族
hTableDescriptor.addFamily(columnDescriptor);
// 创建表
admin.createTable(hTableDescriptor);
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
if (admin != null) {
admin.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
String zk = "192.168.1.113:2181";
Connection conn = null;
try {
// 创建连接
conn = getConnected(zk);
// 创建表并指定列族
createTable(conn, "why:xyz", "hy");
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (conn != null) {
conn.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
2.获取 hbase 所有表名
/** * 获取 hbase 所有表名 * @param conn hbase 连接 * @return */
private static void getAllTable(Connection conn) {
Admin admin = null;
try {
// 获取表管理器对象
admin = conn.getAdmin();
// 获取所有表
TableName[] tableNames = admin.listTableNames();
for (TableName tableName : tableNames) {
// 打印表名
System.out.println(tableName);
System.out.println(tableName.getNameAsString());
// 打印命名空间和表名
System.out.println(tableName.getNameWithNamespaceInclAsString());
}
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
if (admin != null) {
admin.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
String zk = "192.168.1.113:2181";
Connection conn = null;
try {
// 创建连接
conn = getConnected(zk);
// 查看所有的表
getAllTable(conn);
} finally {
try {
if (conn != null) {
conn.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
3.查看指定表元数据
/** * 查看指定表元数据 * @param conn hbase连接 * @param table 命名空间 */
private static void getTableMetaInfo(Connection conn, String table){
Admin admin = null;
try {
// 获取表管理器对象
admin = conn.getAdmin();
//获取指定表的描述对象
HTableDescriptor tableDescriptor = admin.getTableDescriptor(TableName.valueOf(table));
System.out.println(tableDescriptor);
// 获取 命名空间:表名
String name = tableDescriptor.getNameAsString();
System.out.println(String.format("命名空间:表名: %s", name));
// 获取所有列族描述对象
HColumnDescriptor[] columnFamilies = tableDescriptor.getColumnFamilies();
for (HColumnDescriptor columnFamily : columnFamilies) {
System.out.println(String.format("列族: %s", columnFamily));
}
// 获取配置
Map<String, String> configuration = tableDescriptor.getConfiguration();
System.out.println("表配置: " + configuration);
// 获取表名称
TableName tableName = tableDescriptor.getTableName();
System.out.println(String.format("命名空:表名: %s", tableName.getNameAsString()));
System.out.println(String.format("命名空间: %s", tableName.getNamespaceAsString()));
System.out.println(String.format("表名: %s", tableName.getQualifierAsString()));
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
if (admin != null) {
admin.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
String zk = "192.168.1.113:2181";
Connection conn = null;
try {
// 创建连接
conn = getConnected(zk);
// 创建表并指定列族
createTable(conn, "why:xyz", "hy");
// 获取表的源数据信息
getTableMetaInfo(conn, "why:xyz");
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (conn != null) {
conn.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
4.为表增加列族
/** * 为表增加列族 * @param conn hbase连接 * @param table 表名 * @param columnFamily 列族 */
public static void addColumnFamily(Connection conn, String table, String columnFamily) {
Admin admin = null;
try {
// 获取表管理器对象
admin = conn.getAdmin();
// 创建一个表描述对象
HTableDescriptor tableDescriptor = admin.getTableDescriptor(TableName.valueOf(table));
// 获取列族描述对象
HColumnDescriptor columnDescriptor = new HColumnDescriptor(columnFamily);
// 设置该列族中存储数据的最大版本数,默认是1
columnDescriptor.setMaxVersions(3);
tableDescriptor.addFamily(columnDescriptor);
// 对表进行修改
admin.modifyTable(TableName.valueOf(table),tableDescriptor);
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
if (admin != null) {
admin.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
String zk = "192.168.1.113:2181";
Connection conn = null;
try {
// 创建连接
conn = getConnected(zk);
// 为表新增列族
addColumnFamily(conn, "why:xyz", "ld");
// 获取表的源数据信息
getTableMetaInfo(conn, "why:xyz");
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (conn != null) {
conn.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
5.删除某个列族
/** * 删除某个列族 * @param conn hbase连接 * @param table 表名 * @param columnFamily 列族 */
public static void deleteColumnFamily(Connection conn, String table, String columnFamily) {
Admin admin = null;
try {
// 获取表管理器对象
admin = conn.getAdmin();
// 创建一个表描述对象
HTableDescriptor tableDescriptor = admin.getTableDescriptor(TableName.valueOf(table));
// 删除列族
tableDescriptor.removeFamily(columnFamily.getBytes());
// 更新表信息
admin.modifyTable(TableName.valueOf(table),tableDescriptor);
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
if (admin != null) {
admin.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
String zk = "192.168.1.113:2181";
Connection conn = null;
try {
// 创建连接
conn = getConnected(zk);
// 为表删除列族
deleteColumnFamily(conn, "why:xyz", "ld");
// 获取表的源数据信息
getTableMetaInfo(conn, "why:xyz");
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (conn != null) {
conn.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
6.删除某个表
/** * 删除表(想删除表,必须先关闭表) * @param conn hbase 连接 * @param table 表名 */
public static void deleteTable(Connection conn, String table) {
Admin admin = null;
try {
// 获取表管理器对象
admin = conn.getAdmin();
// 删除表之前, 需要关闭该表
admin.disableTable(TableName.valueOf(table));
// 删除表
admin.deleteTable(TableName.valueOf(table));
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
if (admin != null) {
admin.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
String zk = "192.168.1.113:2181";
Connection conn = null;
try {
// 创建连接
conn = getConnected(zk);
// 删除表
deleteTable(conn, "why:xyz");
// 获取表的源数据信息
getTableMetaInfo(conn, "why:xyz");
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (conn != null) {
conn.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
五.数据操作
1.插入或更新单条数据
/** * 插入或更新单条数据 * @param table hbase数据表 * @param rowKey 行健 * @param familys 列表列表 * @param keys key列表 * @param values 值列表 */
private static void insertOrUpdateSingleData(Table table, String rowKey, List<String> familys, List<String> keys, List<String> values) {
// 创建put, 参数为行健
Put put = new Put(rowKey.getBytes());
for(int i=0; i<familys.size(); i++){
// 三个参数一次是列族, 字段, 值
put.addColumn(familys.get(i).getBytes(), keys.get(i).getBytes(), values.get(i).getBytes());
}
try {
table.put(put);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
String zk = "192.168.1.113:2181";
Connection conn = null;
try {
// 创建连接
conn = getConnected(zk);
// 创建表并指定列族
createTable(conn, "why:xyz", "hy");
// 增加一个新列族
addColumnFamily(conn, "why:xyz", "zz");
// 获取表的源数据信息
getTableMetaInfo(conn, "why:xyz");
// 获取表
Table table = conn.getTable(TableName.valueOf("why:xyz"));
// 插入单条数据
List<String> familys = Arrays.asList(new String[]{
"hy", "zz"});
List<String> keys = Arrays.asList(new String[]{
"haha", "heihei"});
List<String> values = Arrays.asList(new String[]{
"lz", "zj"});
insertOrUpdateSingleData(table,"111", familys, keys, values);
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (conn != null) {
conn.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
2.批量插入或更新数据
/** * 批量插入或更新数据 * @param table hbase 表 * @param datas 数据列表 */
private static void insertOrUpdateBatchData(Table table, List<Map> datas) {
List<Put> puts = new ArrayList<>();
for (int i = 0; i < datas.size(); i++) {
Map data = datas.get(i);
// 获取行健
String rowKey = String.valueOf(data.get("rowKey"));
// 依次获取 familys, keys, values 等字段
List<String> familys = (List<String>) data.get("familys");
List<String> keys = (List<String>) data.get("keys");
List<String> values = (List<String>) data.get("values");
Put put = new Put(rowKey.getBytes());
for(int j=0; j<familys.size(); j++){
// 三个参数一次是列族, 字段, 值
put.addColumn(familys.get(j).getBytes(), keys.get(j).getBytes(), values.get(j).getBytes());
}
puts.add(put);
}
try {
table.put(puts);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
String zk = "192.168.1.113:2181";
Connection conn = null;
try {
// 创建连接
conn = getConnected(zk);
// 获取表
Table table = conn.getTable(TableName.valueOf("why:xyz"));
// 批量插入数据
List<Map> datas = new ArrayList<>();
Map one = new HashMap();
one.put("rowKey", "222");
List<String> one_familys = Arrays.asList(new String[]{
"hy", "zz"});
List<String> one_keys = Arrays.asList(new String[]{
"haha", "heihei"});
List<String> one_values = Arrays.asList(new String[]{
"aaa", "bbb"});
one.put("familys", one_familys);
one.put("keys", one_keys);
one.put("values", one_values);
datas.add(one);
Map two = new HashMap();
two.put("rowKey", "333");
List<String> two_familys = Arrays.asList(new String[]{
"hy", "zz"});
List<String> two_keys = Arrays.asList(new String[]{
"haha", "heihei"});
List<String> two_values = Arrays.asList(new String[]{
"ccc", "ddd"});
two.put("familys", two_familys);
two.put("keys", two_keys);
two.put("values", two_values);
datas.add(two);
// 批量插入两行数据
insertOrUpdateBatchData(table, datas);
// 批量更新数据
List<Map> datas = new ArrayList<>();
Map one = new HashMap();
one.put("rowKey", "222");
List<String> one_familys = Arrays.asList(new String[]{
"hy", "zz"});
List<String> one_keys = Arrays.asList(new String[]{
"haha", "heihei"});
List<String> one_values = Arrays.asList(new String[]{
"ttt", "rrr"});
one.put("familys", one_familys);
one.put("keys", one_keys);
one.put("values", one_values);
datas.add(one);
Map two = new HashMap();
two.put("rowKey", "333");
List<String> two_familys = Arrays.asList(new String[]{
"hy", "zz"});
List<String> two_keys = Arrays.asList(new String[]{
"haha", "heihei"});
List<String> two_values = Arrays.asList(new String[]{
"ggg", "vvv"});
two.put("familys", two_familys);
two.put("keys", two_keys);
two.put("values", two_values);
datas.add(two);
// 批量更新两行数据
insertOrUpdateBatchData(table, datas);
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (conn != null) {
conn.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
3.查询指定rowkey的数据
/** * 查询指定rowkey的数据 * @param table 表名 * @param rowKey 行健 */
public static void getRowDataFromTable(Table table, String rowKey) {
try {
// get对象指定行键
Get get = new Get(rowKey.getBytes(StandardCharsets.UTF_8));
// 请求数据
Result result = table.get(get);
// 获取每个cells列表
List<Cell> cells = result.listCells();
cells.forEach(a -> {
// 行键的字节数组
byte[] rowArray = a.getRowArray();
//列族名的字节数组
byte[] familyArray = a.getFamilyArray();
//列名的字节数据
byte[] qualifierArray = a.getQualifierArray();
// value的字节数组
byte[] valueArray = a.getValueArray();
System.out.println(String.format("rowKey: %s, family: %s, key: %s, value: %s",
new String(rowArray, a.getRowOffset(), a.getRowLength()),
new String(familyArray, a.getFamilyOffset(), a.getFamilyLength()),
new String(qualifierArray, a.getQualifierOffset(), a.getQualifierLength()),
new String(valueArray, a.getValueOffset(), a.getValueLength()))
);
});
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
String zk = "192.168.1.113:2181";
Connection conn = null;
try {
// 创建连接
conn = getConnected(zk);
// 获取表
Table table = conn.getTable(TableName.valueOf("why:xyz"));
// 根据行健获取某行数据
getRowDataFromTable(table, "111");
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (conn != null) {
conn.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
4.查询指定表的全部数据
/** * 查询指定表的全部数据 */
private static void getAllDataFromTable(Table table) {
try {
// 建立 scanner 对象, 进行全表扫描
ResultScanner scanner = table.getScanner(new Scan());
for (Result result : scanner) {
// 获取每个cells列表
List<Cell> cells = result.listCells();
cells.forEach(a -> {
// 行键的字节数组
byte[] rowArray = a.getRowArray();
//列族名的字节数组
byte[] familyArray = a.getFamilyArray();
//列名的字节数据
byte[] qualifierArray = a.getQualifierArray();
// value的字节数组
byte[] valueArray = a.getValueArray();
System.out.println(String.format("rowKey: %s, family: %s, key: %s, value: %s",
new String(rowArray, a.getRowOffset(), a.getRowLength()),
new String(familyArray, a.getFamilyOffset(), a.getFamilyLength()),
new String(qualifierArray, a.getQualifierOffset(), a.getQualifierLength()),
new String(valueArray, a.getValueOffset(), a.getValueLength()))
);
});
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
String zk = "192.168.1.113:2181";
Connection conn = null;
try {
// 创建连接
conn = getConnected(zk);
// 获取表
Table table = conn.getTable(TableName.valueOf("why:xyz"));
getAllDataFromTable(table);
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (conn != null) {
conn.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
5.查询指定表的全部数据
/** * 根据行健删除数据 * @param table 表 * @param rowKeys 行健列表 */
private static void deleteDataByRowKey(Table table, List<String> rowKeys) {
List<Delete> deletes = new ArrayList<>();
rowKeys.forEach(a -> deletes.add(new Delete(a.getBytes())));
try {
table.delete(deletes);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
String zk = "192.168.1.113:2181";
Connection conn = null;
try {
// 创建连接
conn = getConnected(zk);
// 获取表
Table table = conn.getTable(TableName.valueOf("why:xyz"));
List<String> rowKeys = Arrays.asList(new String[]{
"222", "333"});
deleteDataByRowKey(table, rowKeys);
getAllDataFromTable(table);
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (conn != null) {
conn.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}