Read data from HBase with Spark, process it, and write the result back to HBase through the SHC (Spark HBase Connector) plugin.
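The write path below relies on the shc-core connector being on the classpath. A minimal sketch of the sbt wiring, assuming the Hortonworks public repository; the exact version string is an assumption and must match your Spark/Scala/HBase build:

    resolvers += "Hortonworks" at "http://repo.hortonworks.com/content/groups/public/"
    libraryDependencies += "com.hortonworks" % "shc-core" % "1.1.1-2.1-s_2.11"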
package cn.com.zoesoft.bigdata.ihr.brain.tool.infectious

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
object Infectious {

  // ICD-10 codes of interest (B20-B23: HIV disease)
  val list = Array("B20", "B21", "B22", "B23")
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("read data from hbase").setMaster("local[24]")
    val sc = new SparkContext(conf)
    val hbaseConf = HBaseConfiguration.create()
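    // Optional tuning (an assumption on my part, not in the original): the scan can
    // be narrowed to the three columns the job actually reads, cutting I/O on wide
    // rows. SCAN_COLUMNS takes a space-separated "family:qualifier" list and must be
    // set before the newAPIHadoopRDD call below.
    // hbaseConf.set(TableInputFormat.SCAN_COLUMNS, "p:XMAN_ID p:ICD_CODE p:START_TIME")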
val tableName = "test:ming" hbaseConf.set("hbase.zookeeper.quorum","zoe-001,zoe-002,zoe-003") hbaseConf.set("hbase.zookeeper.property.clientPort","2181") hbaseConf.set(TableInputFormat.INPUT_TABLE,tableName) //读取数据并转化成rdd val hbaseRdd = sc.newAPIHadoopRDD(hbaseConf,classOf[TableInputFormat], classOf[ImmutableBytesWritable],classOf[Result]) val count = hbaseRdd.count() // println(count)//根据转化后的rdd,读取需要处理的数据字段 val rdd = hbaseRdd.map(x=>{ val key = Bytes.toString(x._2.getRow) val xman_id = Bytes.toString(x._2.getValue(“p”.getBytes,“XMAN_ID”.getBytes)) val icd_code = Bytes.toString(x._2.getValue(“p”.getBytes,“ICD_CODE”.getBytes)) val start_time = Bytes.toString(x._2.getValue(“p”.getBytes,“START_TIME”.getBytes)) (xman_id,start_time,icd_code) })
    // Keep only records whose ICD code (the part before the ".") is in the list
    val icdRdd = rdd.map(x => {
      if (x._3 != null) {
        val x3 = x._3.split("\\.")(0)
        if (list.contains(x3)) {
          ((x._1, x._3), x._2)
        } else null
      } else null
    }).filter(_ != null)

    val sqlContext = SparkSession.builder().config(conf).getOrCreate()
    import sqlContext.implicits._
    // Note: this setting only affects newAPIHadoopRDD reads; the SHC write below
    // takes its target table from the catalog, not from this configuration.
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "ihr:sehr_tags")

    /**
     * Write to HBase as JSON:
     *   {"name":"<actual ICD-10 code>","code":"Infectious","startTime":"2018-01-15 08:57:46"}
     * Column layout: p:INFECTIOUS
     */
    // Per (XMAN_ID, ICD_CODE) pair, keep the record with the latest START_TIME,
    // then write it out through the SHC plugin
    icdRdd.groupBy(_._1).map(x => {
      val lastvalue = x._2.toArray.max
      (lastvalue._1._1 + ";" + lastvalue._1._2,
        "{\"name\":\"" + lastvalue._1._2 + "\",\"code\":\"Infectious\",\"startTime\":\"" + lastvalue._2 + "\"}")
    }).toDF()
      // .show(8000, false)
      .write
      .mode(SaveMode.Append)
      // newTable -> "5": if the target table does not exist, create it with 5 regions
      .options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .save()

    sc.stop()
  }
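  // A possible refinement (my sketch, not part of the original): reduceByKey keeps
  // only the record with the latest START_TIME per (XMAN_ID, ICD_CODE) key without
  // materialising each whole group on one executor, unlike groupBy followed by max.
  def latestPerKey(icd: org.apache.spark.rdd.RDD[((String, String), String)])
      : org.apache.spark.rdd.RDD[((String, String), String)] =
    icd.reduceByKey((a, b) => if (a >= b) a else b)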
  // Catalog describing the target HBase table layout for the SHC connector:
  // row key = "<XMAN_ID>;<ICD_CODE>", JSON payload in column p:INFECTIOUS
  def catalog = s"""{
       |"table":{"namespace":"ihr", "name":"test"},
       |"rowkey":"_1",
       |"columns":{
       |"_1":{"cf":"rowkey","col":"_1","type":"string"},
       |"_2":{"cf":"p","col":"INFECTIOUS","type":"string"}
       |}
       |}""".stripMargin
}
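To verify the write, the same catalog can be read back through SHC. A minimal sketch, assuming it runs inside main where sqlContext and catalog are in scope:

    val written = sqlContext.read
      .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .load()
    written.show(10, false)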