当有多个消费者监听同一个队列时,主要有两种模式: . 轮询模式的分发:一个消费者一条,按均分配; . 公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配;
1. 轮询模式(Round-Robin)
该模式接收消息是当有多个消费者接入时,消息的分配模式是一个消费者分配一条,直至消息消费完成;
1.1. 生产端
public static void main(String[] argv) throws IOException, TimeoutException {
try (
Connection connection = Config.getConnectionFact().newConnection();
Channel channel = connection.createChannel() ){
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for(int i = 0; i < 100; i++)
{
String msg = "测试" + i;
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
}
System.out.println("消息发送完成!");
}
1.2. 消费端
public static void main(String[] argv) throws IOException, TimeoutException {
Channel channel = getConsumerChannel();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Consumer1 Received '" + message + "',时间:" + getTime());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//确认被消费
System.out.println(" [x] Consumer1 '" + message + "' 消费完成,时间:" + getTime());
};
//启动消费着
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { (1)
});
}
public static Channel getConsumerChannel() throws IOException, TimeoutException {
Connection connection = Config.getConnectionFact().newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
return channel;
}
public static ConnectionFactory getConnectionFact () {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("");
factory.setPort(5673);
factory.setUsername("");
factory.setPassword("");
return factory;
}
1 | 启动自动应答 |
2. 公平分发(Fair Dispatch)
由于消息接收者处理消息的能力不同,存在处理快慢的问题,我们就需要能者多劳,处理快的多处理,处理慢的少处理;
2.2. 消费端
public static void main(String[] argv) throws IOException, TimeoutException {
Channel channel = getConsumerChannel();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Consumer1 Received '" + message + "',时间:" + getTime());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//确认被消费
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); (3)
System.out.println(" [x] Consumer1 '" + message + "' 消费完成,时间:" + getTime());
};
//启动消费着
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { (2)
});
}
public static Channel getConsumerChannel() throws IOException, TimeoutException {
Connection connection = Config.getConnectionFact().newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//设置流量
channel.basicQos(0,1,false); (1)
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
return channel;
}
1 | 消费者一次接收一条消息 |
2 | 关闭自动应答 |
3 | 消费完后手动触发应答 |