Apache Spark示例:Java中的字数统计程序
Apache Spark
Apache Spark是一个开放源数据处理框架,可以在分布式环境中对大数据执行分析操作。
这是UC Berkley的一个学术项目,最初由Matei Zaharia在UC Berkeley的AMPLab于2009年发起。
Apache Spark是在称为Mesos的集群管理工具之上创建的。
后来对其进行了修改和升级,使其可以在具有分布式处理的基于集群的环境中工作。
Apache Spark示例项目设置
我们将使用Maven创建一个示例项目进行演示。
要创建项目,请在将用作工作空间的目录中执行以下命令:
mvn archetype:generate -DgroupId=com.theitroad.sparkdemo -DartifactId=JD-Spark-WordCount -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
如果您是第一次运行maven,则完成生成命令将需要几秒钟,因为maven必须下载所有必需的插件和工件才能完成生成任务。
创建项目后,请随时在您喜欢的IDE中打开它。
下一步是向项目添加适当的Maven依赖关系。
这是带有适当依赖项的pom.xml
文件:
<dependencies> <!-- Import Spark --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>1.4.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>2.0.2</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <configuration> <archive> <manifest> <addClasspath>true</addClasspath> <classpathPrefix>lib/</classpathPrefix> <mainClass>com.geekcap.javaworld.sparkexample.WordCount</mainClass> </manifest> </archive> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-dependency-plugin</artifactId> <executions> <execution> <id>copy</id> <phase>install</phase> <goals> <goal>copy-dependencies</goal> </goals> <configuration> <outputDirectory>${project.build.directory}/lib</outputDirectory> </configuration> </execution> </executions> </plugin> </plugins> </build>
由于这是一个基于Maven的项目,因此实际上不需要在计算机上安装和设置Apache Spark。
当我们运行该项目时,将启动Apache Spark的运行时实例,一旦程序执行完毕,它将关闭。
最后,要了解添加此依赖项时添加到项目中的所有JAR,我们可以运行一个简单的Maven命令,当我们向其添加一些依赖项时,该命令使我们能够查看项目的完整依赖关系树。
这是我们可以使用的命令:
mvn dependency:tree
当我们运行此命令时,它将向我们显示以下依赖关系树:
shubham:JD-Spark-WordCount shubham$mvn dependency:tree [INFO] Scanning for projects... [WARNING] [WARNING] Some problems were encountered while building the effective model for com.theitroad:java-word-count:jar:1.0-SNAPSHOT [WARNING] 'build.plugins.plugin.version' for org.apache.maven.plugins:maven-jar-plugin is missing. @ line 41, column 21 [WARNING] [WARNING] It is highly recommended to fix these problems because they threaten the stability of your build. [WARNING] [INFO] | \- org.spark-project.spark:unused:jar:1.0.0:compile [INFO] \- junit:junit:jar:4.11:test [INFO] \- org.hamcrest:hamcrest-core:jar:1.3:test [INFO] ----------------------------------------------------------------------- [INFO] BUILD SUCCESS [INFO] ----------------------------------------------------------------------- [INFO] Total time: 2.987 s [INFO] Finished at: 2016-04-07T15:50:34+05:30 [INFO] -----------------------------------------------------------------------
仅添加了两个依赖关系,Spark收集了项目中所有必需的依赖关系,其中包括Scala依赖关系以及Apache Spark是用Scala本身编写的。
创建输入文件
在创建Word Counter程序时,我们将在项目的根目录中为项目创建一个示例输入文件,名称为input.txt。
将任何内容放入其中,我们使用以下文本:
Hello, my name is Shubham and I am author at theitroad . theitroad is a great website to ready great lessons about Java, Big Data, Python and many more Programming languages. Big Data lessons are difficult to find but at theitroad , you can find some excellent pieces of lessons written on Big Data.
随时使用此文件中的任何文本。
项目结构
在继续进行并开始处理项目代码之前,让我们在此介绍完成所有代码添加到项目后将拥有的项目结构:Project Structure
创建WordCounter
现在,我们准备开始编写程序了。
当您开始使用大数据程序时,导入会造成很多混乱。
为避免这种情况,以下是我们将在项目中使用的所有导入:
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; import java.util.Arrays;
接下来,这是我们将使用的类的结构:
package com.theitroad.sparkdemo; ...imports... public class WordCounter { private static void wordCount(String fileName) { ... } public static void main(String[] args) { ... } }
所有的逻辑都将在wordCount
方法内。
我们将从为" SparkConf"类定义一个对象开始。
此类的对象用于将各种Spark参数设置为程序的键值对。
我们只提供简单的参数:
SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("JD Word Counter");
" master"指定本地,这意味着该程序应连接到在" localhost"上运行的Spark线程。
应用程序名称只是向Spark提供应用程序元数据的一种方式。
现在,我们可以使用以下配置对象构造一个Spark Context对象:
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
Spark将要处理的每个资源都视为RDD(弹性分布式数据集),这有助于其将数据组织到查找数据结构中,从而可以更有效地进行分析。
现在,我们将输入文件转换为JavaRDD对象本身:
JavaRDD<String> inputFile = sparkContext.textFile(fileName);
现在,我们将使用Java 8 API处理JavaRDD文件,并将文件中包含的单词拆分为单独的单词:
JavaRDD<String> wordsFromFile = inputFile.flatMap(content -> Arrays.asList(content.split(" ")));
同样,我们利用Java 8的mapToPair(...)方法对单词进行计数,并提供一个"单词,数字"对,可以将其显示为输出:
JavaPairRDD countData = wordsFromFile.mapToPair(t -> new Tuple2(t, 1)).reduceByKey((x, y) -> (int) x + (int) y);
现在,我们可以将输出文件另存为文本文件:
countData.saveAsTextFile("CountData");
最后,我们可以使用main()方法为程序提供入口点:
public static void main(String[] args) { if (args.length == 0) { System.out.println("No files provided."); System.exit(0); } wordCount(args[0]); }
完整的文件如下所示:
package com.theitroad.sparkdemo; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; import java.util.Arrays; public class WordCounter { private static void wordCount(String fileName) { SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("JD Word Counter"); JavaSparkContext sparkContext = new JavaSparkContext(sparkConf); JavaRDD<String> inputFile = sparkContext.textFile(fileName); JavaRDD<String> wordsFromFile = inputFile.flatMap(content -> Arrays.asList(content.split(" "))); JavaPairRDD countData = wordsFromFile.mapToPair(t -> new Tuple2(t, 1)).reduceByKey((x, y) -> (int) x + (int) y); countData.saveAsTextFile("CountData"); } public static void main(String[] args) { if (args.length == 0) { System.out.println("No files provided."); System.exit(0); } wordCount(args[0]); } }
现在,我们将继续使用Maven本身运行该程序。
运行应用程序
要运行该应用程序,请进入程序的根目录并执行以下命令:
mvn exec:java -Dexec.mainClass=com.theitroad.sparkdemo.WordCounter -Dexec.args="input.txt"
在此命令中,我们为Maven提供Main类的完全限定名称以及输入文件的名称。
执行完此命令后,我们可以看到在项目中创建了一个新目录:Project Output Directory
打开目录并其中打开名为" part-00000.txt"的文件时,其内容如下:字计数器输出