本文共 3605 字，大约阅读时间需要 12 分钟。
package topN_hadoop1;import java.io.IOException;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.mapreduce.Mapper;public class AggregateByKeyMapper extends Mapper
package topN_hadoop1;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Sums all integer values emitted for a key and writes {@code (key, sum)}.
 *
 * <p>Integer addition is associative and commutative, so this class can also be registered as a
 * combiner: partial sums produced on the map side add up to the same final total.
 */
public class AggregateByKeyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

  /**
   * Output value reused across {@link #reduce} calls instead of allocating one per key; the
   * framework serializes the pair during {@code context.write}, so reuse is safe.
   */
  private final IntWritable result = new IntWritable();

  /**
   * Adds up every value for {@code key} and emits the total.
   *
   * @param key the grouping key; written through unchanged
   * @param values all integer values the framework grouped under {@code key}
   * @param context the task context that receives the aggregated {@code (key, sum)} pair
   * @throws IOException if writing to the context fails
   * @throws InterruptedException if the task is interrupted while writing
   */
  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable value : values) {
      sum += value.get();
    }
    result.set(sum);
    context.write(key, result);
  }
}
package topN_hadoop1;import org.apache.log4j.Logger;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class AggregateByKeyDriver extends Configured implements Tool { private static Logger THE_LOGGER = Logger.getLogger(AggregateByKeyDriver.class); public int run(String[] args) throws Exception { Job job = new Job(getConf()); HadoopUtil.addJarsToDistributedCache(job, "/lib/"); job.setJobName("AggregateByKeyDriver"); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setMapperClass(AggregateByKeyMapper.class); job.setReducerClass(AggregateByKeyReducer.class); job.setCombinerClass(AggregateByKeyReducer.class); // args[0] = input directory // args[1] = output directory FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); boolean status = job.waitForCompletion(true); THE_LOGGER.info("run(): status="+status); return status ? 0 : 1; } /** * The main driver for "Aggregate By Key" program. * Invoke this method to submit the map/reduce job. * @throws Exception When there is communication problems with the job tracker. */ public static void main(String[] args) throws Exception { // Make sure there are exactly 2 parameters if (args.length != 2) { THE_LOGGER.warn("usage AggregateByKeyDriver
转载地址：http://dkqrb.baihongyu.com/