A MapReduce/Hadoop Top-N Solution: The Case of Non-Unique Keys
Published: 2019-05-11


I. The case where the key is unique

II. The case where the key is not unique, i.e., the same key may appear multiple times in the input file
Solution: first convert the non-unique keys into unique keys by running a MapReduce job that merges (sums) all records sharing the same key, then apply the unique-key Top-N solution described in Section I to the aggregated output.
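For instance, assuming hypothetical input lines in the url,frequency format expected by the mapper (the sample values below are illustrative only, not data from the original post), the aggregation stage collapses duplicate keys like this:

    input (non-unique keys)        aggregated output (unique keys)
    url1,3                         url1    5
    url2,5                         url2    6
    url1,2                         url3    7
    url2,1
    url3,7

The Mapper, Reducer, and driver below implement this aggregation stage.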
package topN_hadoop1;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;

public class AggregateByKeyMapper extends Mapper<Object, Text, Text, IntWritable> {

    private Text K2 = new Text();
    private IntWritable V2 = new IntWritable();

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // expected input line format: url,frequency
        String valueAsString = value.toString().trim();
        String[] tokens = valueAsString.split(",");
        if (tokens.length != 2) {
            return;
        }
        String url = tokens[0];
        int frequency = Integer.parseInt(tokens[1]);
        K2.set(url);
        V2.set(frequency);
        context.write(K2, V2);
    }
}
package topN_hadoop1;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class AggregateByKeyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // sum all frequencies for the same key, producing one record per unique key
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
package topN_hadoop1;

import org.apache.log4j.Logger;

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AggregateByKeyDriver extends Configured implements Tool {

   private static Logger THE_LOGGER = Logger.getLogger(AggregateByKeyDriver.class);

   public int run(String[] args) throws Exception {
      Job job = new Job(getConf());
      HadoopUtil.addJarsToDistributedCache(job, "/lib/");
      job.setJobName("AggregateByKeyDriver");

      job.setInputFormatClass(TextInputFormat.class);
      job.setOutputFormatClass(SequenceFileOutputFormat.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);

      job.setMapperClass(AggregateByKeyMapper.class);
      job.setReducerClass(AggregateByKeyReducer.class);
      job.setCombinerClass(AggregateByKeyReducer.class);

      // args[0] = input directory
      // args[1] = output directory
      FileInputFormat.setInputPaths(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));

      boolean status = job.waitForCompletion(true);
      THE_LOGGER.info("run(): status=" + status);
      return status ? 0 : 1;
   }

   /**
    * The main driver for the "Aggregate By Key" program.
    * Invoke this method to submit the map/reduce job.
    * @throws Exception when there are communication problems with the job tracker.
    */
   public static void main(String[] args) throws Exception {
      // Make sure there are exactly 2 parameters
      if (args.length != 2) {
         THE_LOGGER.warn("usage: AggregateByKeyDriver <input-dir> <output-dir>");
         System.exit(1);
      }
      THE_LOGGER.info("inputDir=" + args[0]);
      THE_LOGGER.info("outputDir=" + args[1]);
      int returnStatus = ToolRunner.run(new AggregateByKeyDriver(), args);
      System.exit(returnStatus);
   }
}
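The unique-key Top-N stage itself (the Section I solution that the aggregated output is fed into) is not reproduced in this post. What follows is only a minimal sketch of what that second job could look like, assuming the aggregated (Text url, IntWritable frequency) SequenceFile produced above is its input and N = 10; the class names TopNMapper/TopNReducer, the constant N, and the single-reducer setup are my own placeholders, not code from the original article.

package topN_hadoop1;

// Sketch of the second-stage, unique-key Top-N job (illustrative, not from the
// original article). Each mapper keeps a local Top-N of its (url, frequency)
// input pairs and emits it in cleanup(); a single reducer merges the local lists.
// Note: entries with equal frequencies collide in this simple TreeMap-based sketch.

import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class TopNMapper extends Mapper<Text, IntWritable, NullWritable, Text> {

    private static final int N = 10;                       // assumed Top-N size
    // frequency -> "url,frequency", kept sorted by frequency
    private final TreeMap<Integer, String> topN = new TreeMap<>();

    @Override
    public void map(Text key, IntWritable value, Context context) {
        topN.put(value.get(), key.toString() + "," + value.get());
        if (topN.size() > N) {
            topN.remove(topN.firstKey());                  // drop the smallest frequency
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // emit this mapper's local Top-N under a single (null) key
        for (String record : topN.values()) {
            context.write(NullWritable.get(), new Text(record));
        }
    }
}

// --- TopNReducer.java (a separate source file in practice) ---

public class TopNReducer extends Reducer<NullWritable, Text, Text, IntWritable> {

    private static final int N = 10;

    @Override
    public void reduce(NullWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        TreeMap<Integer, String> topN = new TreeMap<>();
        // merge the local Top-N lists produced by all mappers
        for (Text value : values) {
            String[] tokens = value.toString().split(",");
            topN.put(Integer.parseInt(tokens[1]), tokens[0]);
            if (topN.size() > N) {
                topN.remove(topN.firstKey());
            }
        }
        // emit the global Top-N, smallest frequency first
        for (Map.Entry<Integer, String> entry : topN.entrySet()) {
            context.write(new Text(entry.getValue()), new IntWritable(entry.getKey()));
        }
    }
}

The corresponding driver (not shown) would read the aggregated output with SequenceFileInputFormat and call job.setNumReduceTasks(1), so that a single reducer sees every mapper's local list and can produce the final global Top-N.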

Reprinted from: http://dkqrb.baihongyu.com/
