鱼C论坛

 找回密码
 立即注册
查看: 2785|回复: 0

[学习笔记] 大数据spark单词统计

[复制链接]
发表于 2019-4-11 09:27:29 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能^_^

您需要 登录 才可以下载或查看,没有账号?立即注册

x
本帖最后由 DAY 于 2019-4-11 09:29 编辑

  1. import java.util.Arrays;
  2. import java.util.Iterator;

  3. import org.apache.spark.SparkConf;
  4. import org.apache.spark.api.java.JavaPairRDD;
  5. import org.apache.spark.api.java.JavaRDD;
  6. import org.apache.spark.api.java.JavaSparkContext;
  7. import org.apache.spark.api.java.function.FlatMapFunction;
  8. import org.apache.spark.api.java.function.Function2;
  9. import org.apache.spark.api.java.function.PairFunction;
  10. import org.apache.spark.api.java.function.VoidFunction;

  11. import scala.Tuple2;

  12. public class JavaSparkWordCount {

  13.         public static void main(String[] args) {
  14.                 /**
  15.                  * conf
  16.                  * 1.可以设置spark的运行模式
  17.                  * 2.可以设置spark在 webui中显示的application的名称
  18.                  * 3.可以设置当前spark application运行资源
  19.                  *
  20.                  * spark运行模式
  21.                  * 1.local -- 在eclipse,IDEA中开发spark程序要用local模式,本地模式,多用于测试
  22.                  * 2.stanalone -- spark自带的资源调度框架,支持分布式搭建spark任务可以依赖stanalone调度资源
  23.                  * 3.yarn -- hadoop生态圈中资源调度框架,spark也可以基于yarn调度资源
  24.                  * 4.mesos -- 资源调度框架
  25.                  * */
  26.                 SparkConf conf = new SparkConf();
  27.                 conf.setMaster("local");
  28.                 conf.setAppName("JavaSparkWordCount");
  29.                
  30.                 JavaSparkContext sc = new JavaSparkContext(conf);
  31.                
  32.                 /**
  33.                  * sc.textFile 读取文件
  34.                  *
  35.                  * */
  36.                 JavaRDD<String> lines = sc.textFile("./words");
  37.                
  38.                 /**
  39.                  * flatMap 进入一条数据,出多条数据
  40.                  * */
  41.                 JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String,String>(){

  42.                         /**
  43.                          *
  44.                          */
  45.                         private static final long serialVersionUID = 1L;

  46.                         @Override
  47.                         public Iterator<String> call(String line) throws Exception {
  48.                                 // TODO Auto-generated method stub
  49.                                 return Arrays.asList(line.split(" ")).iterator();
  50.                         }
  51.                        
  52.                 });
  53.                
  54.                        
  55.                 /**
  56.                  * 在Java中如果想让某个RDD转换成K,V格式使用xxxToPair
  57.                  * */
  58.                 JavaPairRDD<String, Integer> pairWords = words.mapToPair(new PairFunction<String,String,Integer>(){

  59.                         /**
  60.                          *
  61.                          */
  62.                         private static final long serialVersionUID = 1L;

  63.                         @Override
  64.                         public Tuple2<String, Integer> call(String word) throws Exception {
  65.                                 // TODO Auto-generated method stub
  66.                                 return new Tuple2(word,1);
  67.                         }
  68.                        
  69.                 });
  70.                
  71.                
  72.                 /**
  73.                  * reduceByKey
  74.                  * 1.先将相同的key分组
  75.                  * 2.对每个组的key对应的value去按照你的逻辑去处理
  76.                  * */
  77.                 JavaPairRDD<String, Integer> result = pairWords.reduceByKey(new Function2<Integer,Integer,Integer>(){

  78.                         /**
  79.                          *
  80.                          */
  81.                         private static final long serialVersionUID = 1L;

  82.                         @Override
  83.                         public Integer call(Integer v1, Integer v2) throws Exception {
  84.                                 return v1+v2;
  85.                         }});
  86.                
  87.                
  88.                
  89.                 result.foreach(new VoidFunction<Tuple2<String,Integer>>(){

  90.                         /**
  91.                          *
  92.                          */
  93.                         private static final long serialVersionUID = 1L;

  94.                         @Override
  95.                         public void call(Tuple2<String, Integer> tuple) throws Exception {
  96.                                 System.out.println(tuple);
  97.                         }
  98.                        
  99.                 });
  100.                
  101.                 sc.stop();
  102.         }

  103. }
复制代码
小甲鱼最新课程 -> https://ilovefishc.com
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

小黑屋|手机版|Archiver|鱼C工作室 ( 粤ICP备18085999号-1 | 粤公网安备 44051102000585号)

GMT+8, 2025-6-7 19:47

Powered by Discuz! X3.4

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表