Spark2.1.0官方文档:快速开始-Quick Start

xiaoxiao2021-02-27  388

本教程对于Spark的使用 进行了简单的介绍。我们首先通过Spark的交互式shell环境介绍一些基础API的使用,然后展示如何使用Scala语言编写一个具体的应用。如果想要更详细的教程,请移步编程指南。 如果想要亲自试试本篇中的例子,您需要从Spark官网下载最新的Spark安装包,由于本教程不涉及HDFS,所以可以下载对应任意hadoop版本的预编译包。 利用Spark Shell进行交互式分析 基本操作 Spark shell 提供了一种简单的方式学习API,同时也是一个功能强大的交互式数据分析工具。提供了Scala和Pyhton两种方式。你可以在Spark安装目录下用如下命令启动Spark Shell ./bin/spark-shell Spark的主要抽象是称为弹性分布式数据集(RDD)的分布式数据集合。 RDD可以从Hadoop InputFormats(如HDFS文件)创建,也可以通过转换其他RDD来创建。 我们利用Spark安装目录中的README文件来创建一个新的RDD: [java] view plain copy scala> val textFile = sc.textFile("README.md")  textFile: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:25   对RDD的操作分为两种,分别为:返回某个值的action操作和返回一个指向新RDD的指针的transformation操作,我们先执行几个action操作: [java] view plain copy scala> textFile.count() // Number of items in this RDD  res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs    scala> textFile.first() // First item in this RDD  res1: String = # Apache Spark   现在我们使用一个transformation操作:filter,来获取一个表示原文件的子集的新的RDD。 [java] view plain copy scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))  linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:27   我们也可以将action操作和transformation操作串联起来: [java] view plain copy scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?  res3: Long = 15   更多关于RDD的操作 RDDaction操作和transformation操作可用于更复杂的计算。 假设我们想找到有最多的单词的行: [java] view plain copy scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)  res4: Long = 15   上述操作首先将每一行映射到一个整数,这会创建一个新的RDD,在RDD上调用reduce操作来找到单词最多的那一行。map和reduce操作的参数是Scala函数的字面量(闭包),这里你可以使用任何语言特性或者Scala/Java的库。例如:我们可以随意调用其它地方声明的函数,在下面的例子中我们调用了java.lang.Math中声明的max()方法: [java] view plain copy scala> import java.lang.Math  import java.lang.Math    scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))  res5: Int = 15   一个常见的数据处理模型是MapReduce,比如Hadoop。在Spark中也可以轻松实现MapReduce的数据处理逻辑: [java] view plain copy scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)  wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at reduceByKey at <console>:28   在上面的例子中,我们结合了flatMap,map和reduceByKey这三个transformation操作将文件转换成包含一个个(String,int)对的RDD,然后统计每个单词的出现次数,我们可以使用collect操作将RDD转化为本地数组: [java] view plain copy scala> wordCounts.collect()  res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)   缓存 Spark还支持将数据集缓存到集群的内存中。 当数据被重复访问时,例如当查询小的“热”数据集或运行像PageRank这样的迭代算法时,这是非常有用的。 作为一个简单的例子,下面对 lineWithSpark数据集进行缓存标记: [java] view plain copy scala> linesWithSpark.cache()  res7: linesWithSpark.type = MapPartitionsRDD[2] at filter at <console>:27    scala> linesWithSpark.count()  res8: Long = 15    scala> linesWithSpark.count()  res9: Long = 15   使用Spark对100行文本文件进行统计和缓存似乎很愚蠢。 有趣的是,这些相同的功能可以在非常大的数据集上使用,即使它们在数十或数百个节点上进行条带化(译者注:条带化是指将一个很大的数据集分割成很多小部分,分别存储到不同节点上)。 您也可以通过将bin/spark-shell连接到群集来进行交互操作。 独立的应用程序 假设我们希望使用Spark API编写一个独立的应用程序。 我们将基于Scala(使用sbt)创建一个简单的应用程序。 我们将利用Scala中创建一个非常简单的Spark应用程序,名为SimpleApp.scala: [java] view plain copy /* SimpleApp.scala */  import org.apache.spark.SparkContext  import org.apache.spark.SparkContext._  import org.apache.spark.SparkConf    object SimpleApp {    def main(args: Array[String]) {      val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system      val conf = new SparkConf().setAppName("Simple Application")      val sc = new SparkContext(conf)      val logData = sc.textFile(logFile, 2).cache()      val numAs = logData.filter(line => line.contains("a")).count()      val numBs = logData.filter(line => line.contains("b")).count()      println(s"Lines with a: $numAs, Lines with b: $numBs")      sc.stop()    }  }   需要注意的是,应用程序中应该定义一个main()方法,而不是扩展scala.App。 因为scala.App的子类可能无法正常工作。 该程序只计算Spark README文件中包含“a”的行数和包含“b”的行数。请注意,您需要将YOUR_SPARK_HOME替换为安装您的Spark的安装位置。 在Spark Shell中shell会帮我们创建一个名为sc的SparkContext实例,但是在开发独立Spark应用时,我们需要自己在程序中初始化一个SparkContext。 我们需要先创建一个SparkConf对象,这个对象包含应用程序的信息,然后将该对象传递给SparkContext的构造方法。(译者注:如果需要在IDEA、eclipse等开发环境直接运行、测试Spark应用,需要至少设置AppName和master,且master必须设为“local",例如:val conf = new SparkConf().setAppName("yourAppName").setMaster("local") ) 我们的应用是基于Spark API开发,所以必须要引入Spark依赖,可以通过sbt来管理这些依赖: name := "Simple Project" version := "1.0" scalaVersion := "2.11.7" libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0" 当然,maven也是可以的: [html] view plain copy <project>    <groupId>edu.berkeley</groupId>    <artifactId>simple-project</artifactId>    <modelVersion>4.0.0</modelVersion>    <name>Simple Project</name>    <packaging>jar</packaging>    <version>1.0</version>    <dependencies>      <dependency> <!-- Spark dependency -->        <groupId>org.apache.spark</groupId>        <artifactId>spark-core_2.11</artifactId>        <version>2.1.0</version>      </dependency>    </dependencies>  </project>   打包完成后,可以通过spark-submit脚本提交执行Spark应用: # Use spark-submit to run your application $ YOUR_SPARK_HOME/bin/spark-submit \ --class "SimpleApp" \ --master local [ 4 ] \ target/scala-2.11/simple-project_2.11-1.0.jar ... Lines with a: 46 , Lines with b: 23 下一步可以做什么 恭喜您运行您的第一个Spark应用程序! 有关API的深入概述,请从Spark编程指南开始,或参见“编程指南”菜单中的其他组件。 对于在集群上运行应用程序,请转到部署概述。 最后,Spark在examples目录中包含几个示例(Scala,Java,Python,R)。 您可以按如下方式运行它们: # For Scala and Java, use run-example: ./bin/run-example SparkPi # For Python examples, use spark-submit directly: ./bin/spark-submit examples/src/main/python/pi.py # For R examples, use spark-submit directly: ./bin/spark-submit examples/src/main/r/dataframe.R
转载请注明原文地址: https://www.6miu.com/read-1069.html

最新回复(0)