若余相思 发表于 2018-5-31 22:32:37

大数据mapreduce疑惑

Parameter接口:

package com.xiaohong.TongJi;

public interface Parameter {
        /**
       * LenFields is the length of fields, or columns; it need to be modified for varied
       */
        public final int LenFields = 4;
        /**
       * iFields for min()
       * iFields for max()
       * iFields for sum()
       * iFields for sum2()
       */
        public final int LenStat = 4;

        public final int iMin = 0x80000000;
        public final int iMax = 0x7FFFFFFF;
        public final float fMin = (float) -3.4E38;
        public final float fMax = (float) 3.4E38;
}


map端:

package com.xiaohong.TongJi;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TongJiMapper extends Mapper<LongWritable, Text, IntWritable, FloatWritable> implements Parameter {

        /**
       * The iFields[][] is the Array for Keys to present the min(), max(), sum()
       * of each field in the input. iFields for min() iFields for max()
       * iFields for sum() iFields for sum2()
       */

        private static IntWritable iFields[][] = new IntWritable;
        private static float min[] = new float;
        private static float max[] = new float;
        private static IntWritable iwCnt = new IntWritable(LenStat * LenFields);
        private final static FloatWritable one = new FloatWritable(1);

        public TongJiMapper() {
                for (int i = 0; i < LenStat; i++) {
                        for (int j = 0; j < LenFields; j++) {
                                iFields = new IntWritable(i * LenFields + j);
                        }
                }

                for (int j = 0; j < LenFields; j++) {
                        min = fMax; // the maximum integer
                        max = fMin; // the minimum integer
                }

        }

        @Override
        protected void map(LongWritable key, Text values,
                        Mapper<LongWritable, Text, IntWritable, FloatWritable>.Context context)
                        throws IOException, InterruptedException {

                /*
               * 原始数据:
               *         9.0000000e+000        1.0000000e+000        2.0000000e+000        4.0000000e+000
                        3.0000000e+000        8.0000000e+000        4.0000000e+000        8.0000000e+000
                        6.0000000e+000        5.0000000e+000        9.0000000e+000        1.0000000e+000
                        5.0000000e+000        6.0000000e+000        9.0000000e+000        2.0000000e+000
                        9.0000000e+000        8.0000000e+000        4.0000000e+000        2.0000000e+000
                        7.0000000e+000        9.0000000e+000        9.0000000e+000        2.0000000e+000
                        5.0000000e+000        7.0000000e+000        1.0000000e+000        6.0000000e+000
               */
                StringTokenizer st = new StringTokenizer(values.toString().toLowerCase(), " \t,;");
                float iTmp;
                for (int j = 0; j < LenFields; j++) {
                        /** handle each field. */
                        iTmp = Float.parseFloat(st.nextToken());
                        /**
                       * for min(), this judgement just output
                       * about 37 <key,value> pairs in 100,000
                       * records.
                       */
                       
                        if (min > iTmp) {
                                min = iTmp;
                                context.write(iFields, new FloatWritable(min));
                        }
                        if (max < iTmp) { /** for max() */
                                max = iTmp;
                                context.write(iFields, new FloatWritable(max));
                        }
                        context.write(iFields, new FloatWritable(iTmp));
                        /** for sum() */
                        context.write(iFields, new FloatWritable(iTmp * iTmp));/** for sum2() */
                }
                context.write(iwCnt, one); /** for cnt() */

        }

}



reducer端:


package com.xiaohong.TongJi;

import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class TongJiReducer extends Reducer<IntWritable, FloatWritable, IntWritable, FloatWritable>
                implements Parameter {

        @Override
        protected void reduce(IntWritable K, Iterable<FloatWritable> values,
                        Reducer<IntWritable, FloatWritable, IntWritable, FloatWritable>.Context context)
                        throws IOException, InterruptedException {

                float min = iMax;
                float max = iMin;
                float sum = 0;
                long iCnt = 0;
                int iCategory = K.get() / LenFields; // restore the category from Keys.
                switch (iCategory) {
                case 0:/** min() */
                        for (FloatWritable value : values) {
                                if (min > value.get()) {
                                        min = value.get();
                                }
                        }
                        context.write(K, new FloatWritable(min));
                        break;
                case 1:/** max() */
                        for (FloatWritable value : values) {
                                if (max < value.get()) {
                                        max = value.get();
                                }
                        }
                        context.write(K, new FloatWritable(max));
                        break;
                case 2:/** sum() */
                        for (FloatWritable value : values) {
                                sum += value.get();
                        }
                        context.write(K, new FloatWritable(sum));
                        break;
                case 3: /** sum2() */
                        for (FloatWritable value : values) {
                                sum += value.get();
                        }
                        context.write(K, new FloatWritable(sum));
                        break;
                case 4:
                        for (FloatWritable value : values) {
                                iCnt += value.get();
                        }
                        context.write(K, new FloatWritable(iCnt));
                        break;
                } // switch

        }

}



yarn客户端:

package com.xiaohong.TongJi;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class JobSubMitter {

        public static void main(String[] args) throws Exception {

                Configuration conf = new Configuration();
                Job job = Job.getInstance(conf);

                job.setJarByClass(JobSubMitter.class);

                job.setMapperClass(TongJiMapper.class);
                job.setReducerClass(TongJiReducer.class);

                job.setMapOutputKeyClass(IntWritable.class);
                job.setMapOutputValueClass(FloatWritable.class);

                job.setOutputKeyClass(IntWritable.class);
                job.setOutputValueClass(FloatWritable.class);

                job.setInputFormatClass(TextInputFormat.class);
                FileInputFormat.setInputPaths(job, new Path("d:/mrData/TongJi/input"));

                job.setOutputFormatClass(TextOutputFormat.class);
                FileOutputFormat.setOutputPath(job, new Path("d:/mrData/TongJi/output"));

                boolean res = job.waitForCompletion(true);
                System.exit(res ? 0 : 1);

        }

}





问题:我想问下当reducer端执行到这句代码的时候"int iCategory = K.get()",怎么将K转换成0~4了呢?,K不是0~15吗,难道调用.get()方法有什么玄机吗?
页: [1]
查看完整版本: 大数据mapreduce疑惑