Apache Spark示例:Java中的字数统计程序

时间:2020-02-23 14:29:41  来源:igfitidea点击:

Apache Spark

Apache Spark是一个开放源数据处理框架,可以在分布式环境中对大数据执行分析操作。
这是UC Berkley的一个学术项目,最初由Matei Zaharia在UC Berkeley的AMPLab于2009年发起。
Apache Spark是在称为Mesos的集群管理工具之上创建的。
后来对其进行了修改和升级,使其可以在具有分布式处理的基于集群的环境中工作。

Apache Spark示例项目设置

我们将使用Maven创建一个示例项目进行演示。
要创建项目,请在将用作工作空间的目录中执行以下命令:

mvn archetype:generate -DgroupId=com.theitroad.sparkdemo -DartifactId=JD-Spark-WordCount -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

如果您是第一次运行maven,则完成生成命令将需要几秒钟,因为maven必须下载所有必需的插件和工件才能完成生成任务。

创建项目后,请随时在您喜欢的IDE中打开它。
下一步是向项目添加适当的Maven依赖关系。
这是带有适当依赖项的pom.xml文件:

<dependencies>

  <!-- Import Spark -->
  <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>1.4.0</version>
  </dependency>

  <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
  </dependency>

</dependencies>

<build>
  <plugins>
      <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>2.0.2</version>
          <configuration>
              <source>1.8</source>
              <target>1.8</target>
          </configuration>
      </plugin>
      <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-jar-plugin</artifactId>
          <configuration>
              <archive>
                  <manifest>
                      <addClasspath>true</addClasspath>
                      <classpathPrefix>lib/</classpathPrefix>
                      <mainClass>com.geekcap.javaworld.sparkexample.WordCount</mainClass>
                  </manifest>
              </archive>
          </configuration>
      </plugin>
      <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-dependency-plugin</artifactId>
          <executions>
              <execution>
                  <id>copy</id>
                  <phase>install</phase>
                  <goals>
                      <goal>copy-dependencies</goal>
                  </goals>
                  <configuration>
                      <outputDirectory>${project.build.directory}/lib</outputDirectory>
                  </configuration>
              </execution>
          </executions>
      </plugin>
  </plugins>
</build>

由于这是一个基于Maven的项目,因此实际上不需要在计算机上安装和设置Apache Spark。
当我们运行该项目时,将启动Apache Spark的运行时实例,一旦程序执行完毕,它将关闭。

最后,要了解添加此依赖项时添加到项目中的所有JAR,我们可以运行一个简单的Maven命令,当我们向其添加一些依赖项时,该命令使我们能够查看项目的完整依赖关系树。
这是我们可以使用的命令:

mvn dependency:tree

当我们运行此命令时,它将向我们显示以下依赖关系树:

shubham:JD-Spark-WordCount shubham$mvn dependency:tree

[INFO] Scanning for projects...
[WARNING]
[WARNING] Some problems were encountered while building the effective model for com.theitroad:java-word-count:jar:1.0-SNAPSHOT
[WARNING] 'build.plugins.plugin.version' for org.apache.maven.plugins:maven-jar-plugin is missing. @ line 41, column 21
[WARNING]
[WARNING] It is highly recommended to fix these problems because they threaten the stability of your build.
[WARNING]
[INFO] |  \- org.spark-project.spark:unused:jar:1.0.0:compile
[INFO] \- junit:junit:jar:4.11:test
[INFO]    \- org.hamcrest:hamcrest-core:jar:1.3:test
[INFO] -----------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] -----------------------------------------------------------------------
[INFO] Total time: 2.987 s
[INFO] Finished at: 2016-04-07T15:50:34+05:30
[INFO] -----------------------------------------------------------------------

仅添加了两个依赖关系,Spark收集了项目中所有必需的依赖关系,其中包括Scala依赖关系以及Apache Spark是用Scala本身编写的。

创建输入文件

在创建Word Counter程序时,我们将在项目的根目录中为项目创建一个示例输入文件,名称为input.txt。
将任何内容放入其中,我们使用以下文本:

Hello, my name is Shubham and I am author at theitroad . theitroad is a great website to ready
great lessons about Java, Big Data, Python and many more Programming languages.

Big Data lessons are difficult to find but at theitroad , you can find some excellent
pieces of lessons written on Big Data.

随时使用此文件中的任何文本。

项目结构

在继续进行并开始处理项目代码之前,让我们在此介绍完成所有代码添加到项目后将拥有的项目结构:Project Structure

创建WordCounter

现在,我们准备开始编写程序了。
当您开始使用大数据程序时,导入会造成很多混乱。
为避免这种情况,以下是我们将在项目中使用的所有导入:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

接下来,这是我们将使用的类的结构:

package com.theitroad.sparkdemo;

...imports...

public class WordCounter {

  private static void wordCount(String fileName) {
      ...
  }

  public static void main(String[] args) {
      ...
  }
}

所有的逻辑都将在wordCount方法内。
我们将从为" SparkConf"类定义一个对象开始。
此类的对象用于将各种Spark参数设置为程序的键值对。
我们只提供简单的参数:

SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("JD Word Counter");

" master"指定本地,这意味着该程序应连接到在" localhost"上运行的Spark线程。
应用程序名称只是向Spark提供应用程序元数据的一种方式。
现在,我们可以使用以下配置对象构造一个Spark Context对象:

JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

Spark将要处理的每个资源都视为RDD(弹性分布式数据集),这有助于其将数据组织到查找数据结构中,从而可以更有效地进行分析。
现在,我们将输入文件转换为JavaRDD对象本身:

JavaRDD<String> inputFile = sparkContext.textFile(fileName);

现在,我们将使用Java 8 API处理JavaRDD文件,并将文件中包含的单词拆分为单独的单词:

JavaRDD<String> wordsFromFile = inputFile.flatMap(content -> Arrays.asList(content.split(" ")));

同样,我们利用Java 8的mapToPair(...)方法对单词进行计数,并提供一个"单词,数字"对,可以将其显示为输出:

JavaPairRDD countData = wordsFromFile.mapToPair(t -> new Tuple2(t, 1)).reduceByKey((x, y) -> (int) x + (int) y);

现在,我们可以将输出文件另存为文本文件:

countData.saveAsTextFile("CountData");

最后,我们可以使用main()方法为程序提供入口点:

public static void main(String[] args) {
  if (args.length == 0) {
      System.out.println("No files provided.");
      System.exit(0);
  }
  wordCount(args[0]);
}

完整的文件如下所示:

package com.theitroad.sparkdemo;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class WordCounter {

  private static void wordCount(String fileName) {

      SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("JD Word Counter");

      JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

      JavaRDD<String> inputFile = sparkContext.textFile(fileName);

      JavaRDD<String> wordsFromFile = inputFile.flatMap(content -> Arrays.asList(content.split(" ")));

      JavaPairRDD countData = wordsFromFile.mapToPair(t -> new Tuple2(t, 1)).reduceByKey((x, y) -> (int) x + (int) y);

      countData.saveAsTextFile("CountData");
  }

  public static void main(String[] args) {

      if (args.length == 0) {
          System.out.println("No files provided.");
          System.exit(0);
      }

      wordCount(args[0]);
  }
}

现在,我们将继续使用Maven本身运行该程序。

运行应用程序

要运行该应用程序,请进入程序的根目录并执行以下命令:

mvn exec:java -Dexec.mainClass=com.theitroad.sparkdemo.WordCounter -Dexec.args="input.txt"

在此命令中,我们为Maven提供Main类的完全限定名称以及输入文件的名称。
执行完此命令后,我们可以看到在项目中创建了一个新目录:Project Output Directory

打开目录并其中打开名为" part-00000.txt"的文件时,其内容如下:字计数器输出