Saturday, August 26, 2017

Mapper Class and Reducer Class in MapReduce Design pattern

I am new to MapReduce design patterns and I have some doubts about the design of the Mapper class and the Reducer class in the code below, so I have come here hoping you can help clear up my confusion.

I am familiar with map-side joining in MapReduce, and I learned this:

public static class CustsMapper extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException { /* ... */ }
}

In the snippet above, I learned that we extend our class from the Mapper class; Object is the key type and Text is the value type, so the map method takes that key-value pair as input, and the Context object is what we emit output through, as in context.write(new Text(), new Text()), depending on the logic in the body of the code.
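To make that concrete, here is a minimal, self-contained sketch of such a mapper in the new API. The class name and the record layout (comma-separated, customer ID in the first field) are my assumptions for illustration, not code from the join example above:

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CustsMapperSketch extends Mapper<Object, Text, Text, Text> {
    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Hypothetical record layout: custId,name,...
        String[] fields = value.toString().split(",");
        // Key each record by customer ID so the framework groups matching records
        context.write(new Text(fields[0]), new Text(fields[1]));
    }
}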

My two questions are given below:

  1. Why do we extend our class from MapReduceBase (what does it do?), and why do we implement the Mapper interface in our class? I know Mapper as a class, but somewhere on the web it is shown as an interface, so what would the problem be if I extended the org.apache.hadoop.mapreduce.Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT> class instead? (A sketch contrasting the two APIs follows this list.)
  2. In the map function, what are the OutputCollector<Text, IntWritable> output and Reporter reporter parameters? I did not know them; I thought Context context should be here. What are OutputCollector and Reporter doing here?
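For reference, here is a minimal sketch contrasting the two mapper styles: the old org.apache.hadoop.mapred API, where Mapper is an interface and MapReduceBase supplies empty configure()/close() implementations, and the new org.apache.hadoop.mapreduce API, where Mapper is a class and a single Context object takes over the roles of OutputCollector and Reporter. The class names and the emitted pair are placeholders:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Old API: Mapper is an INTERFACE, so the class "implements" it, and
// MapReduceBase provides do-nothing configure(JobConf) and close() methods.
class OldApiMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {
    public void map(LongWritable key, Text value,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        // output.collect(...) is the old-API counterpart of context.write(...)
        output.collect(new Text("placeholder"), new IntWritable(1));
    }
}

// New API: Mapper is a CLASS, so the subclass "extends" it; Context replaces
// both OutputCollector (emitting pairs) and Reporter (progress and status).
class NewApiMapper
        extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(new Text("placeholder"), new IntWritable(1));
    }
}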

I was working through the program given below:

Input (the mapper uses only the last column of each line, which the code treats as the yearly average):

1979   23   23   2   43   24   25   26   26   26   26   25   26  25 
1980   26   27   28  28   28   30   31   31   31   30   30   30  29 
1981   31   32   32  32   33   34   35   36   36   34   34   34  34 
1984   39   38   39  39   39   41   42   43   40   39   38   38  40 
1985   38   39   39  39   39   41   41   41   00   40   39   39  45 

Code:

package hadoop; 

import java.util.*; 

import java.io.IOException; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.util.*; 

public class ProcessUnits 
{ 
   //Mapper class (old API: implements the mapred.Mapper interface) 
   public static class E_EMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> 
   {       
      //Map function 
      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException 
      { 
         String line = value.toString(); 
         String lasttoken = null; 
         StringTokenizer s = new StringTokenizer(line, "\t"); 
         String year = s.nextToken(); 

         // Advance to the last token on the line (the yearly-average column)
         while (s.hasMoreTokens()) 
            { 
               lasttoken = s.nextToken(); 
            } 

         int avgprice = Integer.parseInt(lasttoken); 
         output.collect(new Text(year), new IntWritable(avgprice)); 
      } 
   } 


   //Reducer class (old API: also reused as the combiner below) 
   public static class E_EReduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> 
   {     
      //Reduce function 
      public void reduce(Text key, Iterator<IntWritable> values, 
         OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException 
         { 
            int maxavg = 30;             // threshold: only emit values above 30
            int val = Integer.MIN_VALUE; 

            // Emit every value for this year that exceeds the threshold
            while (values.hasNext()) 
            { 
               if ((val = values.next().get()) > maxavg) 
               { 
                  output.collect(key, new IntWritable(val)); 
               } 
            } 

         } 
   }  


   //Main function 
   public static void main(String args[]) throws Exception 
   { 
      JobConf conf = new JobConf(ProcessUnits.class); 

      conf.setJobName("max_electricityunits"); 
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class); 
      conf.setMapperClass(E_EMapper.class); 
      conf.setCombinerClass(E_EReduce.class);   // the reducer doubles as the combiner
      conf.setReducerClass(E_EReduce.class); 
      conf.setInputFormat(TextInputFormat.class); 
      conf.setOutputFormat(TextOutputFormat.class); 

      FileInputFormat.setInputPaths(conf, new Path(args[0])); 
      FileOutputFormat.setOutputPath(conf, new Path(args[1])); 

      JobClient.runJob(conf); 
   } 
} 
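Note that this driver is also written against the old API (JobConf, JobClient.runJob). Purely for comparison, here is a minimal sketch of the new-API equivalent; it reuses the NewApiMapper placeholder from the sketch above and lets Hadoop's default identity Reducer stand in, since the point is only the shape of the driver:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ProcessUnitsNewApi 
{ 
   public static void main(String[] args) throws Exception 
   { 
      Configuration conf = new Configuration(); 
      // Job replaces JobConf; waitForCompletion replaces JobClient.runJob
      Job job = Job.getInstance(conf, "max_electricityunits"); 
      job.setJarByClass(ProcessUnitsNewApi.class); 
      job.setMapperClass(NewApiMapper.class);   // placeholder mapper from the sketch above
      // No reducer set: Hadoop runs the identity Reducer by default
      job.setOutputKeyClass(Text.class); 
      job.setOutputValueClass(IntWritable.class); 
      FileInputFormat.addInputPath(job, new Path(args[0])); 
      FileOutputFormat.setOutputPath(job, new Path(args[1])); 
      System.exit(job.waitForCompletion(true) ? 0 : 1); 
   } 
}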

Output (only years whose last-column value exceeds the maxavg threshold of 30 survive the reducer):

1981    34 
1984    40 
1985    45 
