|
马上注册,结交更多好友,享用更多功能^_^
您需要 登录 才可以下载或查看,没有账号?立即注册
x
本帖最后由 DAY 于 2019-4-11 09:29 编辑
- import java.util.Arrays;
- import java.util.Iterator;
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaPairRDD;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.FlatMapFunction;
- import org.apache.spark.api.java.function.Function2;
- import org.apache.spark.api.java.function.PairFunction;
- import org.apache.spark.api.java.function.VoidFunction;
- import scala.Tuple2;
- public class JavaSparkWordCount {
- public static void main(String[] args) {
- /**
- * conf
- * 1.可以设置spark的运行模式
- * 2.可以设置spark在 webui中显示的application的名称
- * 3.可以设置当前spark application运行资源
- *
- * spark运行模式
- * 1.local -- 在eclipse,IDEA中开发spark程序要用local模式,本地模式,多用于测试
- * 2.stanalone -- spark自带的资源调度框架,支持分布式搭建spark任务可以依赖stanalone调度资源
- * 3.yarn -- hadoop生态圈中资源调度框架,spark也可以基于yarn调度资源
- * 4.mesos -- 资源调度框架
- * */
- SparkConf conf = new SparkConf();
- conf.setMaster("local");
- conf.setAppName("JavaSparkWordCount");
-
- JavaSparkContext sc = new JavaSparkContext(conf);
-
- /**
- * sc.textFile 读取文件
- *
- * */
- JavaRDD<String> lines = sc.textFile("./words");
-
- /**
- * flatMap 进入一条数据,出多条数据
- * */
- JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String,String>(){
- /**
- *
- */
- private static final long serialVersionUID = 1L;
- @Override
- public Iterator<String> call(String line) throws Exception {
- // TODO Auto-generated method stub
- return Arrays.asList(line.split(" ")).iterator();
- }
-
- });
-
-
- /**
- * 在Java中如果想让某个RDD转换成K,V格式使用xxxToPair
- * */
- JavaPairRDD<String, Integer> pairWords = words.mapToPair(new PairFunction<String,String,Integer>(){
- /**
- *
- */
- private static final long serialVersionUID = 1L;
- @Override
- public Tuple2<String, Integer> call(String word) throws Exception {
- // TODO Auto-generated method stub
- return new Tuple2(word,1);
- }
-
- });
-
-
- /**
- * reduceByKey
- * 1.先将相同的key分组
- * 2.对每个组的key对应的value去按照你的逻辑去处理
- * */
- JavaPairRDD<String, Integer> result = pairWords.reduceByKey(new Function2<Integer,Integer,Integer>(){
- /**
- *
- */
- private static final long serialVersionUID = 1L;
- @Override
- public Integer call(Integer v1, Integer v2) throws Exception {
- return v1+v2;
- }});
-
-
-
- result.foreach(new VoidFunction<Tuple2<String,Integer>>(){
- /**
- *
- */
- private static final long serialVersionUID = 1L;
- @Override
- public void call(Tuple2<String, Integer> tuple) throws Exception {
- System.out.println(tuple);
- }
-
- });
-
- sc.stop();
- }
- }
复制代码 |
|