I. Secondary Sort

  Requirement: given an order data file containing order id, product id, and product price, sort the records by order id ascending and by price descending within each order, produce as many output files as there are order ids, and have each output file contain only the single record for that order's most expensive product.


  Approach: 1. Create an order class OrderBean that implements the WritableComparable interface;

     2. Write a custom Mapper class: choose the input/output data types and implement the business logic;

     3. Write a custom Partitioner that returns a different partition number for each order id;

     4. Write a custom Reducer class;

     5. Write a grouping comparator class OrderGroupingComparator that extends WritableComparator, defines a no-arg constructor, and overrides the compare method;

     6. Write the Driver class.

  The code is as follows:

/**
 * @author: PrincessHug
 * @date: 2019/3/25, 21:42
 * @Blog: https://www.cnblogs.com/HelloBigTable/
 */
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class OrderBean implements WritableComparable<OrderBean> {
    private int orderId;
    private double orderPrice;

    public OrderBean() {
    }

    public OrderBean(int orderId, double orderPrice) {
        this.orderId = orderId;
        this.orderPrice = orderPrice;
    }

    public int getOrderId() {
        return orderId;
    }

    public void setOrderId(int orderId) {
        this.orderId = orderId;
    }

    public double getOrderPrice() {
        return orderPrice;
    }

    public void setOrderPrice(double orderPrice) {
        this.orderPrice = orderPrice;
    }

    @Override
    public String toString() {
        return orderId + "\t" + orderPrice;
    }

    @Override
    public int compareTo(OrderBean o) {
        // Sort by order id ascending; within the same order, by price descending.
        // Using the compare helpers also returns 0 for equal records, which the
        // original ternary (always returning 1 on equal prices) did not.
        if (this.orderId != o.getOrderId()) {
            return Integer.compare(this.orderId, o.getOrderId());
        }
        return Double.compare(o.getOrderPrice(), this.orderPrice);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(orderId);
        out.writeDouble(orderPrice);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        orderId = in.readInt();
        orderPrice = in.readDouble();
    }
}
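As a standalone illustration of the ordering OrderBean.compareTo produces (no Hadoop dependencies), the sketch below sorts a few hypothetical (orderId, price) pairs: ids ascend, and within one id the prices descend, so each order's most expensive record comes first:

```java
import java.util.Arrays;

public class CompareSketch {
    // Mirrors OrderBean.compareTo: order id ascending, price descending.
    static int compare(int idA, double priceA, int idB, double priceB) {
        if (idA != idB) {
            return Integer.compare(idA, idB);   // ascending by order id
        }
        return Double.compare(priceB, priceA);  // descending by price
    }

    public static void main(String[] args) {
        // {orderId, price} pairs as they might arrive from the mapper (sample data)
        double[][] rows = { {2, 33.8}, {1, 222.8}, {2, 522.8}, {1, 33.8} };
        Arrays.sort(rows, (x, y) -> compare((int) x[0], x[1], (int) y[0], y[1]));
        for (double[] r : rows) {
            System.out.println((int) r[0] + "\t" + r[1]);
        }
    }
}
```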

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Read one line of input
        String line = value.toString();

        // Split the line on tabs
        String[] fields = line.split("\t");

        // Parse the order id and price and wrap them into an OrderBean
        int orderId = Integer.parseInt(fields[0]);
        double orderPrice = Double.parseDouble(fields[2]);
        OrderBean orderBean = new OrderBean(orderId, orderPrice);

        // Emit the bean as the key; the value carries no information
        context.write(orderBean, NullWritable.get());
    }
}
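As a quick plain-Java illustration of the parsing step above (without Hadoop dependencies), the sketch below splits a sample line the same way; the input line and the product id `Pdt_01` are made up for illustration:

```java
public class ParseSketch {
    // Mirrors the map() logic: split a tab-separated line and keep
    // only the order id (field 0) and the price (field 2);
    // field 1, the product id, is ignored.
    static String parse(String line) {
        String[] fields = line.split("\t");
        int orderId = Integer.parseInt(fields[0]);
        double orderPrice = Double.parseDouble(fields[2]);
        return orderId + "\t" + orderPrice;
    }

    public static void main(String[] args) {
        // "Pdt_01" is a hypothetical product id.
        System.out.println(parse("1\tPdt_01\t222.8"));
    }
}
```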

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class OrderPartitioner extends Partitioner<OrderBean, NullWritable> {
    @Override
    public int getPartition(OrderBean orderBean, NullWritable nullWritable, int numReduceTasks) {
        // The third parameter is the number of reduce tasks; masking with
        // Integer.MAX_VALUE keeps the result non-negative.
        return (orderBean.getOrderId() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
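The partition arithmetic can be checked in isolation. This standalone sketch (no Hadoop dependencies) reproduces the same expression and shows why the mask matters for negative ids:

```java
public class PartitionSketch {
    // Mirrors OrderPartitioner.getPartition: mask off the sign bit so the
    // modulo result is never negative, then spread keys across reducers.
    static int partition(int orderId, int numReduceTasks) {
        return (orderId & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int reducers = 3;
        for (int id : new int[] {1, 2, 3, 4}) {
            System.out.println("order " + id + " -> reducer " + partition(id, reducers));
        }
    }
}
```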

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Thanks to the grouping comparator, each group holds all records for one
        // order id, and the key seen here is the first (highest-priced) record.
        context.write(key, NullWritable.get());
    }
}

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderGroupingComparator extends WritableComparator {
    // The superclass constructor must be called so the comparator knows it
    // compares OrderBean instances (and can instantiate them for deserialization).
    protected OrderGroupingComparator() {
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // Group records by order id only, ignoring the price.
        OrderBean aBean = (OrderBean) a;
        OrderBean bBean = (OrderBean) b;
        return Integer.compare(aBean.getOrderId(), bBean.getOrderId());
    }
}
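The effect of the grouping comparator can be simulated without Hadoop. Given records already sorted by the composite key (id ascending, price descending within an id), comparing only the order id collapses consecutive records with the same id into one reduce() call, and the key at that moment is the group's first, i.e. highest-priced, record. The prices below are made-up sample data:

```java
import java.util.ArrayList;
import java.util.List;

public class GroupingSketch {
    // Each record is {orderId, price}; input must already be sorted the way
    // OrderBean.compareTo sorts it. Returns the first record of each id group,
    // which is what OrderReducer ends up writing.
    static List<double[]> firstPerGroup(double[][] sorted) {
        List<double[]> out = new ArrayList<>();
        int prevId = Integer.MIN_VALUE;
        for (double[] r : sorted) {
            if ((int) r[0] != prevId) {   // a new order id starts a new group
                out.add(r);               // the reducer writes the key once per group
                prevId = (int) r[0];
            }
        }
        return out;
    }

    public static void main(String[] args) {
        double[][] sorted = { {1, 222.8}, {1, 33.8}, {2, 522.8}, {2, 33.8} };
        for (double[] r : firstPerGroup(sorted)) {
            System.out.println((int) r[0] + "\t" + r[1]);
        }
    }
}
```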

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OrderDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Configuration and Job object
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Class used to locate the job jar
        job.setJarByClass(OrderBean.class);

        // Set the Mapper and Reducer classes
        job.setMapperClass(OrderMapper.class);
        job.setReducerClass(OrderReducer.class);

        // Set the Mapper output types
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Set the Reducer output types
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        // Set the grouping comparator for the secondary sort
        job.setGroupingComparatorClass(OrderGroupingComparator.class);

        // Set the partitioner
        job.setPartitionerClass(OrderPartitioner.class);

        // Set the number of reduce tasks
        job.setNumReduceTasks(3);

        // Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("G:\\mapreduce\\order\\in"));
        FileOutputFormat.setOutputPath(job, new Path("G:\\mapreduce\\order\\out"));

        // Submit the job
        if (job.waitForCompletion(true)) {
            System.out.println("Job completed!");
        } else {
            System.out.println("Job failed!");
        }
    }
}

  Since I have typed this code many times before, I did not add many comments; my apologies!

 

II. The Overall MapReduce Flow

  1. Suppose a 200 MB text file is to be processed; the data to process is first submitted to the client.

  2. The client submits the split information to the YARN platform, and YARN determines that 2 map tasks are needed.

  3. By default the file data is read into each map task with TextInputFormat, the default FileInputFormat implementation.

  4. Each map task runs its business logic and writes its output via context.write into the circular buffer.

  5. The circular buffer is simply a region of memory; when the data in it reaches 80% of the default 100 MB capacity, a spill to disk occurs.

  6. The spilled data goes through repeated partitioning and sorting (the shuffle mechanism, explained in detail in the next post).

  7. The partitioned, sorted spill data can optionally be merged with a Combiner before being written to local disk.

  8. After all map tasks have finished completely, the reduce tasks begin fetching the map outputs from disk, merging the files, and merge-sorting them (this is where the secondary sort described above takes effect).

  9. The Reducer reads one group of records per call, then writes the results out with the default TextOutputFormat.
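The "2 map tasks for 200 MB" figure in step 2 follows FileInputFormat's default split logic. This plain-Java sketch reproduces that arithmetic, assuming the default 128 MB split size and the 1.1 slop factor (a final chunk up to 10% larger than a split is not split again):

```java
public class SplitSketch {
    // Mirrors FileInputFormat's split loop: carve off splitSize-byte chunks
    // while the remainder exceeds 1.1 times splitSize, then one final split
    // for whatever is left.
    static int numSplits(long fileSize, long splitSize) {
        int splits = 0;
        long remaining = fileSize;
        while ((double) remaining / splitSize > 1.1) {
            remaining -= splitSize;
            splits++;
        }
        if (remaining > 0) {
            splits++;
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // 200 MB -> one 128 MB split plus one 72 MB split = 2 map tasks
        System.out.println(numSplits(200 * mb, 128 * mb));
    }
}
```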

 
