Two ways to write WordCount in Spark.

Language: Java

Tool: IntelliJ IDEA

Project: Java Maven

The pom.xml is as follows:

<properties>
    <spark.version>1.2.0</spark.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>

The first approach is the conventional, step-by-step one:

package pairs;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;

public class WordCount1 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("wordcount1");
        JavaSparkContext sc = new JavaSparkContext(conf);

        String filename = "D:\\tmp\\words.txt";
        JavaRDD<String> input = sc.textFile(filename);

        // flatMap: split each line into words (in Spark 1.x, call() returns an Iterable)
        JavaRDD<String> words = input.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" "));
            }
        });

        // pairs: map each word to (word, 1)
        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // reduce: sum the counts for each word
        JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer x, Integer y) throws Exception {
                return x + y;
            }
        });

        // output: print each (word, count) tuple
        counts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> tuple2) throws Exception {
                System.out.println(tuple2);
            }
        });

        sc.stop();
    }
}

Output:

(rose,2)
(jack,3)
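To see what the three stages compute without starting Spark, here is a minimal plain-Java sketch that imitates flatMap, mapToPair and reduceByKey on an in-memory list. It uses only java.util and none of the Spark API. The class name PipelineSketch and the sample lines are assumptions made for illustration, since the contents of words.txt are not shown in this post.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java imitation of the flatMap -> mapToPair -> reduceByKey pipeline.
// No Spark classes are used; all names here are illustrative only.
class PipelineSketch {

    // flatMap: split every line on single spaces and concatenate the pieces.
    static List<String> flatMap(List<String> lines) {
        List<String> words = new ArrayList<>();
        for (String line : lines) {
            words.addAll(Arrays.asList(line.split(" ")));
        }
        return words;
    }

    // mapToPair: turn each word into a (word, 1) pair.
    static List<Map.Entry<String, Integer>> mapToPair(List<String> words) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : words) {
            pairs.add(new SimpleEntry<>(word, 1));
        }
        return pairs;
    }

    // reduceByKey: merge the values that share a key using x + y.
    static Map<String, Integer> reduceByKey(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            counts.merge(pair.getKey(), pair.getValue(), Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical input, chosen so the counts match the output shown above.
        List<String> lines = Arrays.asList("jack rose", "jack jack rose");
        System.out.println(reduceByKey(mapToPair(flatMap(lines))));
    }
}
```

With the hypothetical input above, main prints {jack=3, rose=2}. The sketch needs Java 8 or later for Map.merge, and it runs in a single thread, whereas Spark applies the same steps per partition and combines the partial results.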

 

The second approach is more concise:

package pairs;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

import java.util.Arrays;
import java.util.Map;

public class WordCount2 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("wordcount2");
        JavaSparkContext sc = new JavaSparkContext(conf);

        String filename = "D:\\tmp\\words.txt";
        JavaRDD<String> input = sc.textFile(filename);

        // flatMap: split each line into words (in Spark 1.x, call() returns an Iterable)
        JavaRDD<String> words = input.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" "));
            }
        });

        // countByValue: count the occurrences of each word and return them as a Map
        Map<String, Long> result = words.countByValue();
        System.out.println(result);

        sc.stop();
    }
}

Output:

{rose=2, jack=3}
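What countByValue produces can be approximated on a local list with java.util.stream. The following is a sketch only: it does not use Spark, the class name CountByValueSketch and the sample words are hypothetical, and it needs Java 8 or later.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Local, non-Spark imitation of countByValue: each distinct value maps to its count.
class CountByValueSketch {

    // Group equal words together and count each group.
    // A TreeMap is used only so that the printed key order is deterministic.
    static Map<String, Long> countByValue(List<String> words) {
        return words.stream()
                .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // Hypothetical words, chosen so the counts match the output shown above.
        List<String> words = Arrays.asList("jack", "rose", "jack", "jack", "rose");
        System.out.println(countByValue(words));
    }
}
```

Because the sketch sorts its keys, it prints jack before rose, unlike the output above. Only the counts are meant to correspond.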

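Comparing the two shows that the first approach stays in transformations (flatMap, mapToPair, reduceByKey) until the very end, where the foreach action prints each Tuple2. The second approach replaces the pair-and-reduce steps with a single action, countByValue, which returns a Map<String,Long> directly.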
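The practical difference between the two kinds of operation is that Spark transformations are lazy, and nothing is computed until an action is called. Intermediate and terminal operations in java.util.stream behave in a loosely similar way, so the idea can be sketched locally. This is an analogy rather than Spark code, and the class name LazySketch and its counter are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy only: stream intermediate operations stand in for Spark transformations,
// and the terminal operation stands in for a Spark action.
class LazySketch {

    // Counts how many times the mapping function has actually run.
    static final AtomicInteger calls = new AtomicInteger();

    // Builds a lazy pipeline; nothing is evaluated when this method returns.
    static Stream<String> upperCased(List<String> words) {
        return words.stream().map(w -> {
            calls.incrementAndGet();
            return w.toUpperCase();
        });
    }

    public static void main(String[] args) {
        Stream<String> pipeline = upperCased(Arrays.asList("jack", "rose"));
        System.out.println("calls before the terminal operation: " + calls.get());

        // The terminal operation triggers evaluation of the whole pipeline.
        List<String> result = pipeline.collect(Collectors.toList());
        System.out.println("calls after the terminal operation: " + calls.get());
        System.out.println(result);
    }
}
```

The counter stays at 0 until collect is called and is 2 afterwards. In the same way, flatMap, mapToPair and reduceByKey in WordCount1 only describe the computation, and the job runs when foreach is reached.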

What exactly the differences are, and whether the two differ in efficiency, is left for further study.

 

References:

Learning Spark: Lightning-Fast Big Data Analysis (Chinese edition: 《Spark快速大数据分析》)

 
