spark常见操作系列(2)--spark读写hadoop

xiaoxiao2021-02-28  35

真正开发大数据之前,本人折腾过hadoop,spark组件,其中把之前公司的日志放到hadoop 和hive里面去读写.但实际上,真正开发spark程序,遇到的坑,是十分多的.

本篇主要介绍spark读写hadoop.hadoop的读写,有java方式,也有scala方式. 虽scala可以嵌入java代码,并不意味着直接把java 操作工具类放到scala程序就行了. hadoop 一般放的都是文件,我们的是csv文件.读取csv,然后对其中的行列数据做计算,生成新的列数据. 这里用的scala 版本是2.11.7.

spark 内置一些读写及计算api,也就是transformation 和 action操作. 比如读取List:

val testList : List[String] = List[String]("1223","123","3435") val sc : SparkContext = sparkSeesion.sparkContext val rddtest = sc.makeRDD(testList) rddtest.foreach(x=>println(x)) // val rdd2 = rdd.map(x => x.toString()) //map里面有函数 val rdd3 = rdd.map(x=>ComputeDate(x.toString()))

spark api 的精华是transformation 操作,这几十个transformation 函数,会将数据在集群内存里面做各种转换以满足需求,效率非常高. 最后生成的结果数据才转到driver端,而这是在action操作之后执行的. 本人理解,这可能是spark效率比hadoop的mapreduce高的地方.spark将数据分成许多分片,而这些分片可以同时执行,虽然如此,数据的前后顺序,在内部,却是有序的.

spark读取csv代码还是比较简洁的:

val conf = new SparkConf().setAppName("ReadCsvFile").setMaster("spark://10.0.0.1:7077").set("spark.driver.host","10.0.0.1").setJars(Seq("")) val sc = new SparkContext(conf) val inputFile = sc.textFile("hdfs://10.0.0.1:9000/res/csvFile.csv").cache()

这里,Seq(“”) 里面要加载该项目程序编译处理的jar包: D:\IdeaProjects\scalaProject03\out\artifacts\scalaProject03\scalaProject03.jar

还可以使用df方式:

val spark = SparkSession.builder.master("local[1]").appName("Spark CSV Reader").getOrCreate; val df = spark.read.format("com.databricks.spark.csv").option("header", "true").option("mode", "DROPMALFORMED").load(csvFileName)

写文件:

df2.write.option("header", "true").csv("hdfs://10.0.0.1:9000/res/newCsvFile.csv")

读取文件后,我们要对文件行列做处理.

使用DF可以方便我们操作,因为DF的操作类似SQL的操作. 常见的DF操作,可以百度搜索下,比较丰富的. 之前,在一篇”spark查询任意字段,并使用dataframe输出结果”中: http://blog.csdn.net/cafebar123/article/details/78636605 简单使用DF做了查询,但是,真正的问题是:

(1)DF只能在单机操作,集群资源没有利用上; (2)spark的核心操作还是transformation 和 action, 因此,rdd之间的如何方便的转换,才是真正要思考的; (3)在写工具类的时候,一般不知道用户要读写多少个字段,以及字段类型, 怎么设计一个比较通用的工具类; (4)实测发现,action操作的耗时是远高transformation 的,因此,在写处理逻辑的时候,action操作的次数应尽量少; (5)实测还发现, 数据在集群上transformation 操作时,比如map()操作,有时耗时反而比不使用transformation 操作,仅仅单机执行的高,而且耗时高不少,达到远高的程度.

当然,还有别的问题,这个以后再讲. 在实际开发中,以上5个问题,对我们来说,是很难的问题. 问题(1),我们可以减少使用DF,却放弃了DF 方便的SQL语句. 问题(2), spark的transformation 操作比较抽象, 而且适用于集群, 在local模式单机调试时,尚可看到map()里面的数据执行步骤, 集群上,就看不到了. 在具体的业务逻辑上,一般需要比对业务逻辑 和transformation 操作的原理, 最好是匹配一致的. 实际开发中,这种情况比较少见.因为 (1)业务逻辑要处理的地方比较多, 枝节很多,而transformation 多是一些算法,真正用到的可能只有一两个. (2)transformation 针对集群的,也就是集群计算大批量数据的映射,往往聚焦在计算上,而不是业务上.

对于问题(3),针对这个问题,spark官网上有2种思路,构建case clas 和使用StructField 和StructType . 本人写了一个工具类,主要思路是 (1)读取某个csv文件时,获取csv表头第一行的某个字段所在的位置,比如第n列,生成数组1; (2)将要查询的字段放进数组2; (3)将数组2与数组1匹配,记录数组2中的字段在数组1中的位置,最后生成一个新的数组3; (4)数组3就是记录要读写的字段在所有字段数组中的位置,利用数组3,就能不需要使用具体字段名,以及字段的个数等数据, 方便读写数据.

示例代码: 生成字段位置数组函数:

/** * Description:获取csv表头第一行的某个字段所在的位置,比如第n列 * Author: zhouyang * Date 2017/11/14 11:13 * @param header : String * @param columnsNameArr : Array[String] * @return Int */ def getColumnsNameIndexArr(header : String, columnsNameArr : Array[String]) : Array[Int] ={ val tempHeaderArr : Array[String] = header.split(",") var indexArrb = new ArrayBuffer[Int]() if(tempHeaderArr.length>0){ for(j<-0 until columnsNameArr.length){ val columnsNameStrTemp = columnsNameArr(j) var i : Int = 0 breakable { while(i<tempHeaderArr.length){ if(columnsNameStrTemp.equals(tempHeaderArr(i))){ indexArrb+=i break } else{ i+=1 } } } } if(indexArrb.length<1){ //没有匹配的列名 logger.info("getColumnsNameIndex:tempHeaderArr.length==0") indexArrb+:=(-2) return indexArrb.toArray } return indexArrb.toArray } else{ logger.info("getColumnsNameIndex:没有匹配的列名") indexArrb+=(-1) return indexArrb.toArray } }

读写应用(spark1.6版本):

/** * Description: 将csv转成rdd,其中包含每一行的row_key 和每一行关键点数据 * Author: zhouyang * Date 2017/11/29 10:19 * @param csvToRDD:RDD[String] * @param keysName : String * @return DataFrame */ def createRowKeyDataRdd(sparkContext: SparkContext, csvToRDD:RDD[String], keysName : String, rowKeysElementArr : Array[String]) : RDD[Row] = { val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext) // Create an RDD val mainDataRdd = csvToRDD val csvUtils : CsvUtils = new CsvUtils() //获取row_key组成元素列头每个字段的位置,比如第n列 val rowKeysElementIndexArr = csvUtils.getColumnsNameIndexArr(keysName, rowKeysElementArr) //获取列头每个字段的位置,比如第n列 val keysNameIndexArr = csvUtils.getColumnsNameIndexArr(keysName, keysName.split(",")) //row_key组成元素广播变量 rowKeysElementIndexArrBroadcast = sparkContext.broadcast(rowKeysElementIndexArr) //广播变量 colsNameIndexArrBroadcast = sparkContext.broadcast(keysNameIndexArr) //将rdd中数据转成数组 val rowRDD = mainDataRdd.map(_.split(",")) //将csv中某一整行的数组抽取需要的字段,并转成一个字符串 val rowRDD2 = rowRDD.map(attributes => { val myattributes : Array[String] = attributes val rowKeyEleIndexArr : Array[Int] = rowKeysElementIndexArrBroadcast.value var mycolumnsNameDataArrb : ArrayBuffer[String] = new ArrayBuffer[String]() //字段1 val keyNoTemp = myattributes(rowKeyEleIndexArr(0)) //字段2 val keyValueVer = myattributes(rowKeyEleIndexArr(1)) val myNewRow_Key = "S" + keyNoTemp + keyValueVer //添加生成的row_key mycolumnsNameDataArrb+=myNewRow_Key // val mycolumnsNameIndexArr : Array[Int] = colsNameIndexArrBroadcast.value for(i<-0 until mycolumnsNameIndexArr.length){ //添加csv文件中每一列的数值 mycolumnsNameDataArrb+=myattributes(mycolumnsNameIndexArr(i)) } val mycolumnsNameDataArr : Array[String] = mycolumnsNameDataArrb.toArray mycolumnsNameDataArr }).map(x => Row(x)).cache() rowRDD2 }

以上是一个写hbase的一个方法,不过里面的字段数组的应用正是以上的思路. 一般字段类型尽量使用基本数据类型,比如Int,Boolean,Byte, String 反而不要用太多,因为String拼接效率需要考虑. 本人这里是统一使用String,主要减少数据互转出现异常报错情况. 以上,算是对问题(3)的一点思考.

对于问题(4),这个就需要根据具体业务场景去避免使用action操作. 实际开发中,却又需要数据的一个中间计算结果,因此不得不action,这经常让本人感到矛盾. 比如,在写一个通用的广播变量的工具类.就是将计算生成的中间数据,生成一个二维数组,转成一个广播变量. 这个工具类要求是: (1)读取csv文件效率要高; (2)数据转成广播变量效率要高;

因此,在优化该工具类上面,本人主要使用以下手段: (1)读取csv使用了多个executor core, 比如15个; (2)spark常见配置调优; (3)尽量用基本数据格式转换,比如array, arraybuffer. (3)使用了以上的字段放到数组里面,查询字段位置的思路; (4)其中有一步,生成的是Array[Row] ,取Row里面的数据,采用:

//获取row val row_temp : Row = newRowArr2(j) //获取row里面的数组 var row_tempStrArr = row_temp(0).asInstanceOf[Array[String]]

光是改成这一步,就快了0.5秒,当时比较激动. (5)使用tuple 作为函数返回格式,这样可以一次返回多个值,比如:

(myNewBroadcast, csvRDD)

这样可以反复利用同一个中间变量,不用重新执行,而且减少了中间步骤,确实节省了时间.

(6)减少action操作次数; (7)某些能在单机执行的就在单机执行, 不使用transformation 操作; (8)高度封装,执行结果环环相扣.这个视具体代码写法.

可能还有其它的优化思路没有想到,但以上的思路中,就有”减少action操作次数”这一项,在为了减少这个同时,也想出一些新的优化思路.

问题(5):实测还发现, 数据在集群上transformation 操作时,比如map()操作,有时耗时反而比不使用transformation 操作,仅仅单机执行的高,而且耗时高不少,达到远高的程度.

这个本人理解,跟要处理的数据有关.如果某个步骤要处理的数据是大批连续的,就是比如一个广播变量,这个广播变量有100M-1G或更大时,是适合用集群来算,而反之,如果我们已经把数据分成一片片,每片数据都不大时,最好用纯单机去实现.或者传过来的中间变量不大,实测是这样的.也最好用纯单机去实现.这个中间数据规模评估标准,需要根据实际情况. 还有,如果整个计算过程是有很多步骤,而不是一个大步骤,也适合去用纯单机去实现.

今天就写到这里吧.

转载请注明原文地址: https://www.6miu.com/read-2150194.html

最新回复(0)