鱼C论坛

 找回密码
 立即注册
查看: 1509|回复: 0

大数据mapreduce疑惑

[复制链接]
发表于 2018-5-31 22:32:37 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能^_^

您需要 登录 才可以下载或查看,没有账号?立即注册

x
Parameter接口:
package com.xiaohong.TongJi;

public interface Parameter {
        /**
         * LenFields is the length of fields, or columns; it need to be modified for varied
         */
        public final int LenFields = 4;
        /**
         * iFields[0] for min() 
         * iFields[1] for max() 
         * iFields[2] for sum() 
         * iFields[3] for sum2()
         */
        public final int LenStat = 4;

        public final int iMin = 0x80000000;
        public final int iMax = 0x7FFFFFFF;
        public final float fMin = (float) -3.4E38;
        public final float fMax = (float) 3.4E38;
}

map端:
package com.xiaohong.TongJi;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TongJiMapper extends Mapper<LongWritable, Text, IntWritable, FloatWritable> implements Parameter {

        /**
         * The iFields[][] is the Array for Keys to present the min(), max(), sum()
         * of each field in the input. iFields[0] for min() iFields[1] for max()
         * iFields[2] for sum() iFields[3] for sum2()
         */

        private static IntWritable iFields[][] = new IntWritable[LenStat][LenFields];
        private static float min[] = new float[LenFields];
        private static float max[] = new float[LenFields];
        private static IntWritable iwCnt = new IntWritable(LenStat * LenFields);
        private final static FloatWritable one = new FloatWritable(1);

        public TongJiMapper() {
                for (int i = 0; i < LenStat; i++) {
                        for (int j = 0; j < LenFields; j++) {
                                iFields[i][j] = new IntWritable(i * LenFields + j);
                        }
                }

                for (int j = 0; j < LenFields; j++) {
                        min[j] = fMax; // the maximum integer
                        max[j] = fMin; // the minimum integer
                }

        }

        @Override
        protected void map(LongWritable key, Text values,
                        Mapper<LongWritable, Text, IntWritable, FloatWritable>.Context context)
                        throws IOException, InterruptedException {

                /*
                 * 原始数据:
                 *         9.0000000e+000        1.0000000e+000        2.0000000e+000        4.0000000e+000
                        3.0000000e+000        8.0000000e+000        4.0000000e+000        8.0000000e+000
                        6.0000000e+000        5.0000000e+000        9.0000000e+000        1.0000000e+000
                        5.0000000e+000        6.0000000e+000        9.0000000e+000        2.0000000e+000
                        9.0000000e+000        8.0000000e+000        4.0000000e+000        2.0000000e+000
                        7.0000000e+000        9.0000000e+000        9.0000000e+000        2.0000000e+000
                        5.0000000e+000        7.0000000e+000        1.0000000e+000        6.0000000e+000
                 */
                StringTokenizer st = new StringTokenizer(values.toString().toLowerCase(), " \t,;");
                float iTmp;
                for (int j = 0; j < LenFields; j++) {
                        /** handle each field. */
                        iTmp = Float.parseFloat(st.nextToken());
                        /**
                         * for min(), this judgement just output
                         * about 37 <key,value> pairs in 100,000
                         * records.
                         */
                        
                        if (min[j] > iTmp) { 
                                min[j] = iTmp;
                                context.write(iFields[0][j], new FloatWritable(min[j]));
                        }
                        if (max[j] < iTmp) { /** for max() */
                                max[j] = iTmp;
                                context.write(iFields[1][j], new FloatWritable(max[j]));
                        }
                        context.write(iFields[2][j], new FloatWritable(iTmp));
                        /** for sum() */
                        context.write(iFields[3][j], new FloatWritable(iTmp * iTmp));/** for sum2() */
                }
                context.write(iwCnt, one); /** for cnt() */

        }

}


reducer端:


package com.xiaohong.TongJi;

import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class TongJiReducer extends Reducer<IntWritable, FloatWritable, IntWritable, FloatWritable>
                implements Parameter {

        @Override
        protected void reduce(IntWritable K, Iterable<FloatWritable> values,
                        Reducer<IntWritable, FloatWritable, IntWritable, FloatWritable>.Context context)
                        throws IOException, InterruptedException {

                float min = iMax;
                float max = iMin;
                float sum = 0;
                long iCnt = 0;
                int iCategory = K.get() / LenFields; // restore the category from Keys.
                switch (iCategory) {
                case 0:/** min() */
                        for (FloatWritable value : values) {
                                if (min > value.get()) {
                                        min = value.get();
                                }
                        }
                        context.write(K, new FloatWritable(min));
                        break;
                case 1:/** max() */
                        for (FloatWritable value : values) {
                                if (max < value.get()) {
                                        max = value.get();
                                }
                        }
                        context.write(K, new FloatWritable(max));
                        break;
                case 2:/** sum() */
                        for (FloatWritable value : values) {
                                sum += value.get();
                        }
                        context.write(K, new FloatWritable(sum));
                        break;
                case 3: /** sum2() */
                        for (FloatWritable value : values) {
                                sum += value.get();
                        }
                        context.write(K, new FloatWritable(sum));
                        break;
                case 4:
                        for (FloatWritable value : values) {
                                iCnt += value.get();
                        }
                        context.write(K, new FloatWritable(iCnt));
                        break;
                } // switch

        }

}


yarn客户端:

package com.xiaohong.TongJi;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class JobSubMitter {

        public static void main(String[] args) throws Exception {

                Configuration conf = new Configuration();
                Job job = Job.getInstance(conf);

                job.setJarByClass(JobSubMitter.class);

                job.setMapperClass(TongJiMapper.class);
                job.setReducerClass(TongJiReducer.class);

                job.setMapOutputKeyClass(IntWritable.class);
                job.setMapOutputValueClass(FloatWritable.class);

                job.setOutputKeyClass(IntWritable.class);
                job.setOutputValueClass(FloatWritable.class);

                job.setInputFormatClass(TextInputFormat.class);
                FileInputFormat.setInputPaths(job, new Path("d:/mrData/TongJi/input"));

                job.setOutputFormatClass(TextOutputFormat.class);
                FileOutputFormat.setOutputPath(job, new Path("d:/mrData/TongJi/output"));

                boolean res = job.waitForCompletion(true);
                System.exit(res ? 0 : 1);

        }

}




问题:我想问下当reducer端执行到这句代码的时候"int iCategory = K.get()",怎么将K转换成0~4了呢?,K不是0~15吗,难道调用.get()方法有什么玄机吗?
想知道小甲鱼最近在做啥?请访问 -> ilovefishc.com
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

小黑屋|手机版|Archiver|鱼C工作室 ( 粤ICP备18085999号-1 | 粤公网安备 44051102000585号)

GMT+8, 2025-1-23 07:23

Powered by Discuz! X3.4

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表