Apache Spark示例:Java中的字数统计程序
Apache Spark
Apache Spark是一个开放源数据处理框架,可以在分布式环境中对大数据执行分析操作。
这是UC Berkley的一个学术项目,最初由Matei Zaharia在UC Berkeley的AMPLab于2009年发起。
Apache Spark是在称为Mesos的集群管理工具之上创建的。
后来对其进行了修改和升级,使其可以在具有分布式处理的基于集群的环境中工作。
Apache Spark示例项目设置
我们将使用Maven创建一个示例项目进行演示。
要创建项目,请在将用作工作空间的目录中执行以下命令:
mvn archetype:generate -DgroupId=com.theitroad.sparkdemo -DartifactId=JD-Spark-WordCount -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
如果您是第一次运行maven,则完成生成命令将需要几秒钟,因为maven必须下载所有必需的插件和工件才能完成生成任务。
创建项目后,请随时在您喜欢的IDE中打开它。
下一步是向项目添加适当的Maven依赖关系。
这是带有适当依赖项的pom.xml文件:
<dependencies>
<!-- Import Spark -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>com.geekcap.javaworld.sparkexample.WordCount</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy</id>
<phase>install</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
由于这是一个基于Maven的项目,因此实际上不需要在计算机上安装和设置Apache Spark。
当我们运行该项目时,将启动Apache Spark的运行时实例,一旦程序执行完毕,它将关闭。
最后,要了解添加此依赖项时添加到项目中的所有JAR,我们可以运行一个简单的Maven命令,当我们向其添加一些依赖项时,该命令使我们能够查看项目的完整依赖关系树。
这是我们可以使用的命令:
mvn dependency:tree
当我们运行此命令时,它将向我们显示以下依赖关系树:
shubham:JD-Spark-WordCount shubham$mvn dependency:tree [INFO] Scanning for projects... [WARNING] [WARNING] Some problems were encountered while building the effective model for com.theitroad:java-word-count:jar:1.0-SNAPSHOT [WARNING] 'build.plugins.plugin.version' for org.apache.maven.plugins:maven-jar-plugin is missing. @ line 41, column 21 [WARNING] [WARNING] It is highly recommended to fix these problems because they threaten the stability of your build. [WARNING] [INFO] | \- org.spark-project.spark:unused:jar:1.0.0:compile [INFO] \- junit:junit:jar:4.11:test [INFO] \- org.hamcrest:hamcrest-core:jar:1.3:test [INFO] ----------------------------------------------------------------------- [INFO] BUILD SUCCESS [INFO] ----------------------------------------------------------------------- [INFO] Total time: 2.987 s [INFO] Finished at: 2016-04-07T15:50:34+05:30 [INFO] -----------------------------------------------------------------------
仅添加了两个依赖关系,Spark收集了项目中所有必需的依赖关系,其中包括Scala依赖关系以及Apache Spark是用Scala本身编写的。
创建输入文件
在创建Word Counter程序时,我们将在项目的根目录中为项目创建一个示例输入文件,名称为input.txt。
将任何内容放入其中,我们使用以下文本:
Hello, my name is Shubham and I am author at theitroad . theitroad is a great website to ready great lessons about Java, Big Data, Python and many more Programming languages. Big Data lessons are difficult to find but at theitroad , you can find some excellent pieces of lessons written on Big Data.
随时使用此文件中的任何文本。
项目结构
在继续进行并开始处理项目代码之前,让我们在此介绍完成所有代码添加到项目后将拥有的项目结构:Project Structure
创建WordCounter
现在,我们准备开始编写程序了。
当您开始使用大数据程序时,导入会造成很多混乱。
为避免这种情况,以下是我们将在项目中使用的所有导入:
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; import java.util.Arrays;
接下来,这是我们将使用的类的结构:
package com.theitroad.sparkdemo;
...imports...
public class WordCounter {
private static void wordCount(String fileName) {
...
}
public static void main(String[] args) {
...
}
}
所有的逻辑都将在wordCount方法内。
我们将从为" SparkConf"类定义一个对象开始。
此类的对象用于将各种Spark参数设置为程序的键值对。
我们只提供简单的参数:
SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("JD Word Counter");
" master"指定本地,这意味着该程序应连接到在" localhost"上运行的Spark线程。
应用程序名称只是向Spark提供应用程序元数据的一种方式。
现在,我们可以使用以下配置对象构造一个Spark Context对象:
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
Spark将要处理的每个资源都视为RDD(弹性分布式数据集),这有助于其将数据组织到查找数据结构中,从而可以更有效地进行分析。
现在,我们将输入文件转换为JavaRDD对象本身:
JavaRDD<String> inputFile = sparkContext.textFile(fileName);
现在,我们将使用Java 8 API处理JavaRDD文件,并将文件中包含的单词拆分为单独的单词:
JavaRDD<String> wordsFromFile = inputFile.flatMap(content -> Arrays.asList(content.split(" ")));
同样,我们利用Java 8的mapToPair(...)方法对单词进行计数,并提供一个"单词,数字"对,可以将其显示为输出:
JavaPairRDD countData = wordsFromFile.mapToPair(t -> new Tuple2(t, 1)).reduceByKey((x, y) -> (int) x + (int) y);
现在,我们可以将输出文件另存为文本文件:
countData.saveAsTextFile("CountData");
最后,我们可以使用main()方法为程序提供入口点:
public static void main(String[] args) {
if (args.length == 0) {
System.out.println("No files provided.");
System.exit(0);
}
wordCount(args[0]);
}
完整的文件如下所示:
package com.theitroad.sparkdemo;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
public class WordCounter {
private static void wordCount(String fileName) {
SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("JD Word Counter");
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
JavaRDD<String> inputFile = sparkContext.textFile(fileName);
JavaRDD<String> wordsFromFile = inputFile.flatMap(content -> Arrays.asList(content.split(" ")));
JavaPairRDD countData = wordsFromFile.mapToPair(t -> new Tuple2(t, 1)).reduceByKey((x, y) -> (int) x + (int) y);
countData.saveAsTextFile("CountData");
}
public static void main(String[] args) {
if (args.length == 0) {
System.out.println("No files provided.");
System.exit(0);
}
wordCount(args[0]);
}
}
现在,我们将继续使用Maven本身运行该程序。
运行应用程序
要运行该应用程序,请进入程序的根目录并执行以下命令:
mvn exec:java -Dexec.mainClass=com.theitroad.sparkdemo.WordCounter -Dexec.args="input.txt"
在此命令中,我们为Maven提供Main类的完全限定名称以及输入文件的名称。
执行完此命令后,我们可以看到在项目中创建了一个新目录:Project Output Directory
打开目录并其中打开名为" part-00000.txt"的文件时,其内容如下:字计数器输出

