当有多个消费者监听同一个队列时,主要有两种模式: . 轮询模式的分发:一个消费者一条,按均分配; . 公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配;

1. 轮询模式(Round-Robin)

该模式接收消息是当有多个消费者接入时,消息的分配模式是一个消费者分配一条,直至消息消费完成;

1.1. 生产端

public static void main(String[] argv) throws IOException, TimeoutException {
try (
    Connection connection = Config.getConnectionFact().newConnection();
    Channel channel = connection.createChannel() ){
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    for(int i = 0; i < 100; i++)
    {
        String msg = "测试" + i;
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
    }
    System.out.println("消息发送完成!");
}

1.2. 消费端

public static void main(String[] argv) throws IOException, TimeoutException {
    Channel channel = getConsumerChannel();
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
        System.out.println(" [x] Consumer1 Received '" + message + "',时间:" + getTime());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //确认被消费
        System.out.println(" [x] Consumer1 '" + message + "' 消费完成,时间:" + getTime());
    };
    //启动消费着
    channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { (1)
    });
}

public static Channel getConsumerChannel() throws IOException, TimeoutException {
    Connection connection = Config.getConnectionFact().newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    return channel;
}

public static ConnectionFactory getConnectionFact () {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("");
    factory.setPort(5673);
    factory.setUsername("");
    factory.setPassword("");
    return factory;
}
1 启动自动应答

2. 公平分发(Fair Dispatch)

由于消息接收者处理消息的能力不同,存在处理快慢的问题,我们就需要能者多劳,处理快的多处理,处理慢的少处理;

2.1. 生产端

同轮询模式

2.2. 消费端

public static void main(String[] argv) throws IOException, TimeoutException {
    Channel channel = getConsumerChannel();
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
        System.out.println(" [x] Consumer1 Received '" + message + "',时间:" + getTime());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //确认被消费
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); (3)
        System.out.println(" [x] Consumer1 '" + message + "' 消费完成,时间:" + getTime());
    };
    //启动消费着
    channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { (2)
    });
}

public static Channel getConsumerChannel() throws IOException, TimeoutException {
    Connection connection = Config.getConnectionFact().newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    //设置流量
    channel.basicQos(0,1,false); (1)
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    return channel;
}
1 消费者一次接收一条消息
2 关闭自动应答
3 消费完后手动触发应答