Hbase的Java开发API操作实现

xiaoxiao2021-02-27  341

      Hbase的JavaAPI,提供了对于Hbase数据的表的创建,修改,删除,数据写入,删除,修改,查询(Scan,get两种方法)。下面是综合的一个Hbase操作实例;

import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.security.UserGroupInformation; /** * @Description: TODO(这里用一句话描述这个类的作用) HBase数据库操作实现类 * @author: wh * @date: 2016-10-10 上午11:16:02 */ public class HbaseService { // 声明静态配置 private static Configuration conf = null; private String ip = "10.134.161.107,10.134.161.108,10.134.161.110";//"192.168.204.214,192.168.204.242,192.168.204.215";10.134.161.107 private String port = "2181"; private final static Log log = LogFactory.getLog(HbaseService.class); public HbaseService(){ conf = new Configuration(); conf.set("hbase.zookeeper.property.clientPort", port); conf.set("hbase.zookeeper.quorum", ip); conf.set("hbase.zookeeper.property.maxclientcnxns", "300"); conf.set("hbase.ipc.client.socket.timeout.connect","1000"); conf.set("zookeeper.session.timeout", "500"); conf.set("hbase.regionserver.handler.count", "500"); conf = HBaseConfiguration.create(conf); } /** * 获取hbase所有的表 * @return */ public String[] getAllTable(){ String[] tablenames ={}; Connection connection = null; try { connection = ConnectionFactory.createConnection(conf); HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();//new HBaseAdmin(conf); tablenames = admin.getTableNames(); closeAdmin(admin); } catch (Exception e) { // TODO Auto-generated catch block log.error(e); } return tablenames; } /** * 根据列簇名获取有此列簇的hbase所有的表 * @return */ public List<String> getAllTable(String familyColumn){ String[] tablenames ={}; List<String> list = new ArrayList<String>(); try { HBaseAdmin admin = new HBaseAdmin(conf); tablenames = admin.getTableNames(); for(String tablename : tablenames){ HTableDescriptor hTableDescriptor = admin.getTableDescriptor(tablename.getBytes()); Collection<HColumnDescriptor> column = hTableDescriptor.getFamilies(); for(HColumnDescriptor hColumnDescriptor : column){ if(familyColumn.equals(new String(hColumnDescriptor.getName()))) list.add(tablename); } } } catch (Exception e) { e.printStackTrace(); } return list; } /** * 创建hbase表 * @param tableName 表名 * @param family 列族列表 * @return false 为未创建成功,或者表已存在,true:成功 */ public boolean creatTable(String tableName, String[] family){ boolean flag = false; try { HBaseAdmin admin = new HBaseAdmin(conf); HTableDescriptor desc = new HTableDescriptor(tableName); for (int i = 0; i < family.length; i++) { HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(family[i]); //设置列簇的版本数 hColumnDescriptor.setMaxVersions(1000000); desc.addFamily(hColumnDescriptor); } if (admin.tableExists(tableName)) { } else { admin.createTable(desc); flag = true; } closeAdmin(admin); } catch (IOException e) { // TODO Auto-generated catch block log.error(e.getMessage()); } return flag; } private void closeAdmin(HBaseAdmin admin){ try { if(admin!=null)admin.close(); } catch (Exception e) { log.error(e); } } /** * 判断是否存在该表 * @param tableName * @return */ public boolean exitTable(String tableName){ boolean flag = false; HBaseAdmin admin = null; try { admin = new HBaseAdmin(conf); try { if (admin.tableExists(tableName)) flag = true; } catch (IOException e) { // TODO Auto-generated catch block log.error(e); } } catch (Exception e1) { // TODO Auto-generated catch block log.error(e1); } closeAdmin(admin); return flag; } /** * 向HBase数据库表中插入一个列值 * @param tableName 表名 * @param rowKey rowkey * @param familyName 列簇 * @param column 列名 * @param columnValue 列值 * @return 是否插入成功 */ public boolean addOneRecord(String tableName, String rowKey, String familyName, String column,String columnValue) { boolean flag = false; try { Put put = new Put(Bytes.toBytes(rowKey));// 设置rowkey HTable table = new HTable(conf, Bytes.toBytes(tableName)); put.add(Bytes.toBytes(familyName),Bytes.toBytes(column), Bytes.toBytes(columnValue)); table.put(put); flag = true; table.close(); } catch (IOException e) { // TODO Auto-generated catch block log.error(e); }// HTabel负责跟记录相关的操作如增删改查等// return flag; } /** * 为表添加数据(适合知道有多少列族的固定表) * @param rowKey rowKey * @param tableName 表名 * @param column1 第一个列族列表 * @param value1 第一个列的值的列表 * @return 返回hbase.Result * @throws IOException */ public boolean addData(String tableName,String familyName,String rowKey, String[] column1, String[] value1){ boolean flag = false; try { Put put = new Put(Bytes.toBytes(rowKey));// 设置rowkey HTable table = new HTable(conf, Bytes.toBytes(tableName)); for (int j = 0; j < column1.length; j++) { put.add(Bytes.toBytes(familyName),Bytes.toBytes(column1[j]), Bytes.toBytes(value1[j])); } table.put(put); flag = true; table.close(); } catch (IOException e) { // TODO Auto-generated catch block log.error(e); } return flag; } /** * 为表添加数据(适合知道有多少列族的固定表) * @param rowKey rowKey * @param tableName 表名 * @param column1 第一个列族列表 * @param value1 第一个列的值的列表 * @return 返回hbase.Result * @throws IOException */ public boolean addDataByMap(String tableName,String familyName,String rowKey,HashMap<String,String> map){ boolean flag = false; Connection connection = null; Table table= null; try { connection = ConnectionFactory.createConnection(conf); table = connection.getTable(TableName.valueOf(tableName)); List<Put> list = new ArrayList<Put>(); for (Map.Entry<String, String> entry : map.entrySet()) { Put put = new Put(Bytes.toBytes(rowKey));// 设置rowkey put.addColumn(Bytes.toBytes(familyName),Bytes.toBytes(entry.getKey()), Bytes.toBytes(entry.getValue())); list.add(put); } table.put(list); flag = true; } catch (IOException e) { log.error(e.getMessage()); }finally{ try { table.close(); connection.close(); } catch (IOException e) { log.error(e.getMessage()); } } return flag; } /* * 根据键值对插入数据 */ public void addDataByMap(List<HashMap<HashMap<String,String>, String>> list,Long timesLong){ String familyName = "pw"; String[] family = {"pw"}; Connection connection = null; //Table table= null; try { for (int i = 0; i < list.size(); i++) { HashMap<HashMap<String,String>, String> maps = list.get(i); for (Entry<HashMap<String, String>, String> entry : maps.entrySet()) { String tableName = entry.getValue(); HashMap<String, String> map = entry.getKey(); String rowKey = map.get("mRID"); if(!exitTable(tableName)) creatTable(tableName, family); // connection = ConnectionFactory.createConnection(conf); // table = connection.getTable(TableName.valueOf(tableName)); HTable table = new HTable(conf, Bytes.toBytes(tableName)); List<Put> listPut = new ArrayList<Put>(); for (Map.Entry<String, String> entry1 : map.entrySet()) { Put put = new Put(Bytes.toBytes(rowKey));// 设置rowkey put.addColumn(Bytes.toBytes(familyName),Bytes.toBytes(entry1.getKey()),timesLong, Bytes.toBytes(entry1.getValue())); listPut.add(put); } //System.out.println(tableName+" add data Success!"); table.put(listPut); table.close(); } } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); log.error(e); } } /** * 根据rwokey查询 * @param tableName 表名 * @param rowKey * @return 返回hbase.Result * @throws IOException */ public List<HashMap<String, String>> getResultList(String tableName, String rowKey,Long minStamp,Long maxStamp){ List<HashMap<String, String>> list = new ArrayList<HashMap<String,String>>(); Get get = new Get(Bytes.toBytes(rowKey)); HTable table = null; Result result = null; try { if(minStamp!=0L&&maxStamp!=0L) get.setTimeRange(minStamp, maxStamp); table = new HTable(conf, Bytes.toBytes(tableName));// 获取表 result = table.get(get); if(!result.isEmpty()){ for(KeyValue kv : result.list()){ HashMap<String, String> map = new HashMap<String, String>(); map.put("row", Bytes.toString(kv.getRow())); map.put("family", Bytes.toString(kv.getFamily())); map.put("qualifier", Bytes.toString(kv.getQualifier())); map.put("value", Bytes.toString(kv.getValue())); map.put("timestamp", kv.getTimestamp()+""); list.add(map); } } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); log.error(e.getMessage()); }finally{ try { if (table!=null) table.close(); } catch (Exception e2) { // TODO: handle exception e2.printStackTrace(); log.error(e2.getMessage()); } } return list; } /** * 遍历hbase表,返回 * @param tableName 表名 * @param minStamp 最小时间 * @param maxStamp 最大时间 * @return */ public List<HashMap<String, String>> getResultScannList(String tableName, String rowKey,Long minStamp,Long maxStamp){ List<HashMap<String, String>> list = new ArrayList<HashMap<String,String>>(); Scan scan = new Scan(); ResultScanner rs = null; HTable table = null; try { table = new HTable(conf, Bytes.toBytes(tableName)); if(minStamp!=0L&&maxStamp!=0L) scan.setTimeRange(minStamp, maxStamp); rs = table.getScanner(scan); for(Result result:rs){ for(KeyValue kv : result.list()){ HashMap<String, String> map = new HashMap<String, String>(); map.put("row", Bytes.toString(kv.getRow())); map.put("family", Bytes.toString(kv.getFamily())); map.put("qualifier", Bytes.toString(kv.getQualifier())); map.put("value", Bytes.toString(kv.getValue())); map.put("timestamp", kv.getTimestamp()+""); list.add(map); } } } catch (Exception e) { // TODO: handle exception log.error(e.getMessage()); }finally{ try { if (rs!=null) rs.close(); if (table!=null) table.close(); } catch (Exception e2) { // TODO: handle exception } } return list; } /** * 根据设置SingleColumnValueFilter 过滤器,查询数据 * @param tableName * @param filter * @param minStamp * @param maxStamp * @return */ public List<HashMap<String, String>> getResultScannList(String tableName, SingleColumnValueFilter filter, Long minStamp, Long maxStamp) { // TODO Auto-generated method stub Scan scan = new Scan(); ResultScanner rs = null; ResultScanner rs1 = null; HTable table = null; List<HashMap<String, String>> list = null; try { filter.setFilterIfMissing(true);//设置当某一行没有过滤的这一列存在时,不返回此行数据 table = new HTable(conf, Bytes.toBytes(tableName)); if(minStamp!=0L&&maxStamp!=0L) scan.setTimeRange(minStamp, maxStamp); if(filter!=null)scan.setFilter(filter); rs = table.getScanner(scan); boolean flag = false; for (Result r : rs) { for (KeyValue kv : r.list()) { if(Bytes.toString(filter.getQualifier()).equals(Bytes.toString(kv.getQualifier()))){ flag = true; break; } } if(flag) break; } if(flag) rs1 = table.getScanner(scan); if(rs1!=null){ for(Result result:rs){ for(KeyValue kv : result.list()){ HashMap<String, String> map = new HashMap<String, String>(); map.put("row", Bytes.toString(kv.getRow())); map.put("family", Bytes.toString(kv.getFamily())); map.put("qualifier", Bytes.toString(kv.getQualifier())); map.put("value", Bytes.toString(kv.getValue())); map.put("timestamp", kv.getTimestamp()+""); list.add(map); } } } } catch (Exception e) { // TODO: handle exception log.error(e.getMessage()); }finally{ try { if (rs!=null) rs.close(); if (table!=null) table.close(); } catch (Exception e2) { // TODO: handle exception } } return list; } /** * 根据rowkey段遍历hbase表,返回hbase.Result对象 * @param tableName * @param start_rowkey * @param stop_rowkey * @throws IOException */ public ResultScanner getResultScann(String tableName, String start_rowkey, String stop_rowkey) { Scan scan = new Scan(); scan.setStartRow(Bytes.toBytes(start_rowkey)); scan.setStopRow(Bytes.toBytes(stop_rowkey)); ResultScanner rs = null; try { HTable table = new HTable(conf, Bytes.toBytes(tableName)); rs = table.getScanner(scan); } catch (Exception e) { log.error(e.getMessage()); } return rs; } /** * 插叙具体某一列的数据,返回 * @param tableName 表名 * @param rowKey rowKey * @param familyName 列簇名 * @param columnName 列名 * @throws IOException */ public void getResultByColumn(String tableName, String rowKey, String familyName, String columnName) throws IOException { HTable table = new HTable(conf, Bytes.toBytes(tableName)); Get get = new Get(Bytes.toBytes(rowKey)); get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName)); // 获取指定列族和列修饰符对应的列 Result result = table.get(get); } /** * 更新表中的某一列, * @param tableName 表名 * @param rowKey * @param familyName 列族名 * @param columnName 列名 * @param value 更新后的值 * @return 返回true / false */ public boolean updateTable(String tableName, String rowKey, String familyName, String columnName, String value){ HTable table = null; boolean flag = false; try { table = new HTable(conf, Bytes.toBytes(tableName)); Put put = new Put(Bytes.toBytes(rowKey)); put.add(Bytes.toBytes(familyName), Bytes.toBytes(columnName),Bytes.toBytes(value)); table.put(put); flag = true; } catch (IOException e) { // TODO Auto-generated catch block log.error(e.getMessage()); }finally{ try { table.close(); } catch (Exception e2) { log.error(e2.getMessage()); } } return flag; } /** * 查询某列数据的多个版本 * @param tableName 表名 * @param rowKey rowKey * @param familyName 列族名 * @param columnName 列名 * @param version 获取版本数 * @return 返回Result对象 */ public Result getResultByVersion(String tableName, String rowKey, String familyName, String columnName ,int version){ HTable table = null; Result result = null; try { table = new HTable(conf, Bytes.toBytes(tableName)); Get get = new Get(Bytes.toBytes(rowKey)); get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName)); get.setMaxVersions(version); result = table.get(get); } catch (IOException e) { // TODO Auto-generated catch block log.error(e.getMessage()); } return result; /* * List<?> results = table.get(get).list(); Iterator<?> it = * results.iterator(); while (it.hasNext()) { * System.out.println(it.next().toString()); } */ } /** * 根据rowkey、列簇、列名 删除某一列数据 * @param tableName 表名 * @param rowKey rowkey * @param falilyName 列簇 * @param columnName 列名 * @return 返回成功与否 */ public boolean deleteColumn(String tableName, String rowKey, String falilyName, String columnName) { HTable table = null; boolean flag = false; try { table = new HTable(conf, Bytes.toBytes(tableName)); Delete deleteColumn = new Delete(Bytes.toBytes(rowKey)); deleteColumn.deleteColumns(Bytes.toBytes(falilyName),Bytes.toBytes(columnName)); table.delete(deleteColumn); flag = true; table.close(); //System.out.println(falilyName + ":" + columnName + "is deleted!"); } catch (IOException e) { // TODO Auto-generated catch block log.error(e.getMessage()); } return flag; } /** * 根据表名和rowkey删除某一rowkey的数据 * @param tableName * @param rowKey * @return 返回成功与否 */ public boolean deleteAllColumn(String tableName, String rowKey){ HTable table = null; boolean flag = false; try { table = new HTable(conf, Bytes.toBytes(tableName)); Delete deleteAll = new Delete(Bytes.toBytes(rowKey)); table.delete(deleteAll); flag = true; table.close(); //System.out.println("all columns are deleted!"); } catch (IOException e) { // TODO Auto-generated catch block log.error(e.getMessage()); } return flag; } /** * 删除hbase表 * @param tableName 表名 * @throws IOException * @return 返回成功与否 */ public boolean deleteTable(String tableName){ boolean flag = false; try { HBaseAdmin admin = new HBaseAdmin(conf); admin.disableTable(tableName); admin.deleteTable(tableName); flag = true; closeAdmin(admin); //System.out.println(tableName + "is deleted!"); } catch (IOException e) { // TODO Auto-generated catch block log.error(e.getMessage()); } return flag; } public static void main(String[] args) throws Exception { // 创建表 HbaseService hs = new HbaseService(); //hs.setPort("2181"); System.out.println("ssssssssssssss"); System.out.println(hs.exitTable("pwmx:test1")); hs.exitTable("ACLineSegment"); } 以上代码,没有使用Hbase提供的连接池进行连接管理,同时使用的操作方法,也比较落后,目前Hbase提供了更优化的操作方法,下一篇会贴出来。

转载请注明原文地址: https://www.6miu.com/read-1609.html

最新回复(0)