消息队列-RabbitMQ

安装rabbitMQ

双击运行otp_win64_24.3.4.6.exe,点击下一步下一步即可,可以更改安装位置,需要注意路径不要有中文或空格

他的作用类似JDK,因为rabbitMQ是erlang语言开发的,所以需要环境支持

配置环境变量,在path中配置: 自己安装erlang的路径\bin
image-20230224202431336
image-20230224202301804

双击运行rabbitmq-server-3.10.1.exe,下一步即可,安装好后进入以下目录
image-20230224202733157
在该目录下打开dos窗口,输入以下运行命令

1
rabbitmq-plugins enable rabbitmq_management

启动结束后,访问:http://localhost:15672
image-20230224203737805
用户名和密码都是:guest
image-20230224203828259

rabbitMQ的基本概念

image-20230224210652678

Exchange

接受生产者发送的消息,并根据binding规则将消息路由给服务器中的队列。

ExchangeType决定了Exchange路由消息的行为。在RabbitMQ中,ExchangeType常用的类型有三种:direct、fanout和topic。

Message Queue

消息队列。我们发送给RabbitMQ的消息最后都会到达各种queue,并且存储在其中(如果路由找不到对应的queue则数据会丢失),等待消费者来取。

Binding Key

它表示的是Exchange和Message Queue是通过binding key进行联系的,这个关系是固定。

Routing Key

生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则。这个routing key需要与ExchangeType及binding key联合使用才能生效,我们的生产者只需要通过指定routing key来决定消息流向哪里

RabbitMQ六种工作模式

创建一个RabbitMQ的项目,用来学习rabbitmq,需要添加依赖

1
2
3
4
5
6
7
8
9
10
11
<!--添加RabbitMQ依赖-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.12.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.6.4</version>
</dependency>

简单模式

只有一个消费者
image-20230224212545820

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package cn.tedu.rabbitmq.simple;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

//模拟简单模式的生产者
public class Producer {
public static void main(String[] args)throws Exception{
//1.配置RabbitMQ的连接信息
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("127.0.0.1");
cf.setPort(5672);//15672是UI端口,5672是客户端端口(可以省略)
cf.setUsername("guest");
cf.setPassword("guest");
//2.创建连接以及channel
Connection nc = cf.newConnection();
Channel cc = nc.createChannel();
//3.创建队列
/*
第一个参数:队列名称
第二个参数:是否是一个持久队列
第三个参数:是否是一个独占队列
第四个参数:是否自动删除
第五个参数:其他参数属性的设置
*/
cc.queueDeclare("helloworld",false,false,false,null);
//4.准备数据
String msg = "hello world" + System.currentTimeMillis();
//5.发送数据
/*
第一个参数:先忽略
第二个参数:先写成队列名称
第三个参数:其他属性设置
第四个参数:消息数据,需要转成byte[]类型
*/
cc.basicPublish("","helloworld",null,msg.getBytes());
nc.close();
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package cn.tedu.rabbitmq.simple;

import com.rabbitmq.client.*;

import java.io.IOException;

//简单模式的消费者
public class Consumer {
public static void main(String[] args) throws Exception{
//1.配置连接信息
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("127.0.0.1");
cf.setUsername("guest");
cf.setPassword("guest");
//2.创建连接以及channel
Connection nc = cf.newConnection();
Channel cc = nc.createChannel();
//3.创建队列
//如果该队列已经在服务器中存在,那么会忽略创建该队列的命令
cc.queueDeclare("helloworld",false,false,false,null);
//4.消费消息
//处理数据的回调函数
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
byte[] body = delivery.getBody();
String msg = new String(body);
System.out.println("收到了消息:"+msg);
System.out.println("===========================");
}
};
//取消接收数据的回调函数
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String s) throws IOException {

}
};
cc.basicConsume("helloworld",true,deliverCallback,cancelCallback);
}
}

工作队列模式

多个消费者,从同一个队列中接受消息

负载均衡,消息会轮询发送给所有消费者

合理的分发消息

  1. 手动ack

    消息回执

    向服务器发送一个通知,告诉服务器一条消息已经处理完毕

    服务器可以通过ack,知道消费者是空闲还是繁忙

  2. qos=1

​ 每次抓取的消息数量

​ 消息处理完毕之前,不会抓取新消息

​ 手动ack模式下才有效

消息持久化:在服务器端,持久保存消息,避免因为服务器宕机而造成消息丢失

  1. 队列持久化: cc.queueDeclare(“队列名”,true,…)
  2. 消息持久化: cc.basocPublish(“”,“队列名”,MessageProperties.PERSISTENT_TEXT_PLAIN,消息)

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package cn.tedu.rabbitmq.work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.Scanner;

//模拟工作队列模式的生产者
public class Producer {
public static void main(String[] args)throws Exception{
//1.配置RabbitMQ的连接信息
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("127.0.0.1");
cf.setPort(5672);//15672是UI端口,5672是客户端端口(可以省略)
cf.setUsername("guest");
cf.setPassword("guest");
//2.创建连接以及channel
Connection nc = cf.newConnection();
Channel cc = nc.createChannel();
//3.创建队列
/*
第一个参数:队列名称
第二个参数:是否是一个持久队列
第三个参数:是否是一个独占队列
第四个参数:是否自动删除
第五个参数:其他参数属性的设置
*/
cc.queueDeclare("work_queue",false,false,false,null);
//4.准备数据,发送数据
while(true){
System.out.print("输入消息:");
String msg = new Scanner(System.in).nextLine();
if (msg.equals("quit"))
break;
cc.basicPublish("","work_queue",null,msg.getBytes());
}
nc.close();
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package cn.tedu.rabbitmq.work;

import com.rabbitmq.client.*;

import java.io.IOException;

//工作队列模式的消费者
public class Consumer {
public static void main(String[] args) throws Exception{
//1.配置连接信息
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("127.0.0.1");
cf.setUsername("guest");
cf.setPassword("guest");
//2.创建连接以及channel
Connection nc = cf.newConnection();
Channel cc = nc.createChannel();
//3.创建队列
//如果该队列已经在服务器中存在,那么会忽略创建该队列的命令
cc.queueDeclare("work_queue",false,false,false,null);
//4.消费消息
//处理数据的回调函数
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
byte[] body = delivery.getBody();
String msg = new String(body);
System.out.println("收到了消息:"+msg);
//处理消息的时候,遇到一个‘.’,暂停1s,模拟实际处理消息的阻塞场景
for (int i = 0; i < msg.length(); i++) {
//charAt(i)
if(msg.charAt(i)=='.'){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.println("消息处理完毕");
}
};
//取消接收数据的回调函数
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String s) throws IOException {

}
};
cc.basicConsume("work_queue",true,deliverCallback,cancelCallback);
}
}

合理分发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package cn.tedu.rabbitmq.work;

import com.rabbitmq.client.*;

import java.io.IOException;

//工作队列模式的消费者
public class Consumer {
public static void main(String[] args) throws Exception{
//1.配置连接信息
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("127.0.0.1");
cf.setUsername("guest");
cf.setPassword("guest");
//2.创建连接以及channel
Connection nc = cf.newConnection();
Channel cc = nc.createChannel();
//3.创建队列
//如果该队列已经在服务器中存在,那么会忽略创建该队列的命令
cc.queueDeclare("work_queue",false,false,false,null);
//4.消费消息
//处理数据的回调函数
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
byte[] body = delivery.getBody();
String msg = new String(body);
System.out.println("收到了消息:"+msg);
//处理消息的时候,遇到一个‘.’,暂停1s,模拟实际处理消息的阻塞场景
for (int i = 0; i < msg.length(); i++) {
//charAt(i)
if(msg.charAt(i)=='.'){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.println("消息处理完毕");
//发送消息确认(消息回执)
//第一个参数:消息的标签,需要从消息中获取
//第二个参数:是否确认多条信息,false,只确认一条消息
cc.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
};
//取消接收数据的回调函数
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String s) throws IOException {

}
};
//第二个参数:ack的设置,默认是autoAck,false是手动ack
//设置消费者每次只获取一条消息
cc.basicQos(1);
cc.basicConsume("work_queue",false,deliverCallback,cancelCallback);
}
}

持久化

如果消息保存在服务器端的内存中,是不安全的,如果服务器宕机,信息会丢失

1
2
停止rabbitmq服务:rabbitmq-service stop或者rabbitmqctl stop
启动rabbitmq服务:rabbitmq-service start

消息数据持久化、消息队列持久化

但是需要注意持久化队列时,不能直接修改队列属性,因为队列一定创建属性就固定了,不可以进行修改

可以选择新建一个别名队列或者删除该队列重新创建

队列持久化:cc.queueDeclare(“task_queue”,true,false,false,null);

数据持久化:cc.basicPublish(“”,“task_queue”, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());

发布订阅模式

把消息群发给所有消费者,同一条消息所有消费者都可以收到

fanout类型的交换机

生产者:定义交换机,向交换机发送数据

消费者:定义交换机,定义自已独占的非持久的自动删除随机队列,绑定,正常从随机队列中接受数据

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package cn.tedu.rabbitmq.publishsubscribe;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.Scanner;

//模拟发布订阅模式的生产者
public class Producer {
public static void main(String[] args) throws Exception{
//1.配置RabbitMQ的连接信息
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("127.0.0.1");
cf.setPort(5672);//15672是UI端口,5672是客户端端口(可以省略)
cf.setUsername("guest");
cf.setPassword("guest");
//2.创建连接以及channel
Connection nc = cf.newConnection();
Channel cc = nc.createChannel();
//3.定义交换机
//如果服务器中没有,就创建,有就直接使用
//第一个参数:交换机名称
//第二个参数:交换机类型
cc.exchangeDeclare("ps_exchange","fanout");
//4.向交换机发送数据,交换机只是接受发送数据,并不保存数据
//如果没有消费者接受,数据会丢失
while (true){
System.out.print("输入消息:");
String msg = new Scanner(System.in).nextLine();
if (msg.equals("quit"))
break;
cc.basicPublish("ps_exchange","",null,msg.getBytes());
}
nc.close();
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package cn.tedu.rabbitmq.publishsubscribe;

import com.rabbitmq.client.*;

import java.io.IOException;

//发布订阅模式的消费者
public class Consumer {
public static void main(String[] args) throws Exception{
//1.配置RabbitMQ的连接信息
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("127.0.0.1");
cf.setPort(5672);//15672是UI端口,5672是客户端端口(可以省略)
cf.setUsername("guest");
cf.setPassword("guest");
//2.创建连接以及channel
Connection nc = cf.newConnection();
Channel cc = nc.createChannel();
//3.定义交换机
//如果服务器中没有,就创建,有就直接使用
//第一个参数:交换机名称
//第二个参数:交换机类型
cc.exchangeDeclare("ps_exchange","fanout");
//4.定义队列--随机队列名,否持久,独占、自动删除的
String queue = cc.queueDeclare().getQueue();//无参构造就可以满足我们的需求
//5.绑定交换机和队列
/*
第一个参数:队列名称
第二个参数:交换机名称
第三个参数:队列和交换机绑定的关系
*/
cc.queueBind(queue,"ps_exchange","");

//6.处理接收到的消息
//处理数据
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
byte[] body = delivery.getBody();
String msg = new String(body);
System.out.println("收到:"+msg);
System.out.println("消息处理完毕");
}
};
//取消数据时的回调函数
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String s) throws IOException {

}
};
cc.basicConsume(queue,true,deliverCallback,cancelCallback);
}
}

路由模式

通过关键字匹配,来决定把消息发送到哪个队列

生产者:定义direct类型的交换机,向交换机发送数据并携带路由键

消费者:定义交换机,定义队列,用绑定键来绑定交换机和队列,正常接收消息

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package cn.tedu.rabbitmq.route;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.Scanner;

//模拟路由模式的生产者
public class Producer {
public static void main(String[] args) throws Exception{
//1.配置RabbitMQ的连接信息
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("127.0.0.1");
cf.setPort(5672);//15672是UI端口,5672是客户端端口(可以省略)
cf.setUsername("guest");
cf.setPassword("guest");
//2.创建连接以及channel
Connection nc = cf.newConnection();
Channel cc = nc.createChannel();
//3.定义direct类型的交换机
cc.exchangeDeclare("route_exchange", BuiltinExchangeType.DIRECT);
//4.发送消息,携带路由键
while(true){
Scanner scanner = new Scanner(System.in);
System.out.print("输入消息:");
String msg = scanner.nextLine();
System.out.print("路由键:");
String routingKey = scanner.nextLine();
if(msg.equals("quit"))
break;
cc.basicPublish("route_exchange",routingKey,null,msg.getBytes());
System.out.println("消息发送成功");
}
nc.close();
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package cn.tedu.rabbitmq.route;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Scanner;

//模拟路由模式的消费者
public class Consumer {
public static void main(String[] args) throws Exception{
//1.配置RabbitMQ的连接信息
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("127.0.0.1");
cf.setPort(5672);//15672是UI端口,5672是客户端端口(可以省略)
cf.setUsername("guest");
cf.setPassword("guest");
//2.创建连接以及channel
Connection nc = cf.newConnection();
Channel cc = nc.createChannel();
//3.定义direct类型的交换机
cc.exchangeDeclare("route_exchange", BuiltinExchangeType.DIRECT);
//4.定义队列
String queue = cc.queueDeclare().getQueue();
//5.绑定交换机和队列(重复绑定多次)
System.out.print("输入绑定键,用空格隔开:");
String bindingKeys = new Scanner(System.in).nextLine();
String[] keys = bindingKeys.split("\\s+");
for (String bindingKey : keys) {
cc.queueBind(queue,"route_exchange",bindingKey);
}
//处理数据
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
String msg = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(msg+"-"+routingKey);
System.out.println("====================");
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String s) throws IOException {

}
};
cc.basicConsume(queue,true,deliverCallback,cancelCallback);
}
}

主题模式

和路由模式相同,具有特殊的关键字规则

topic类型的交换机实现这种特殊路由规则

aaa.bbb.ccc.ddd

*.ccc.ddd.eee

#.ddd

"*"可以通配单个单词

"#"可以通配零个或多个单词

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package cn.tedu.rabbitmq.topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.Scanner;

//模拟主题模式的生产者
public class Producer {
public static void main(String[] args) throws Exception{
//1.配置RabbitMQ的连接信息
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("127.0.0.1");
cf.setPort(5672);//15672是UI端口,5672是客户端端口(可以省略)
cf.setUsername("guest");
cf.setPassword("guest");
//2.创建连接以及channel
Connection nc = cf.newConnection();
Channel cc = nc.createChannel();
//3.定义direct类型的交换机
cc.exchangeDeclare("topic_exchange", BuiltinExchangeType.TOPIC);
//4.发送消息,携带路由键
while(true){
Scanner scanner = new Scanner(System.in);
System.out.print("输入消息:");
String msg = scanner.nextLine();
System.out.print("路由键:");
String routingKey = scanner.nextLine();
if(msg.equals("quit"))
break;
cc.basicPublish("topic_exchange",routingKey,null,msg.getBytes());
System.out.println("消息发送成功");
}
nc.close();
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package cn.tedu.rabbitmq.topic;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Scanner;

//模拟主题模式的消费者
public class Consumer {
public static void main(String[] args) throws Exception{
//1.配置RabbitMQ的连接信息
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("127.0.0.1");
cf.setPort(5672);//15672是UI端口,5672是客户端端口(可以省略)
cf.setUsername("guest");
cf.setPassword("guest");
//2.创建连接以及channel
Connection nc = cf.newConnection();
Channel cc = nc.createChannel();
//3.定义direct类型的交换机
cc.exchangeDeclare("topic_exchange", BuiltinExchangeType.TOPIC);
//4.定义队列
String queue = cc.queueDeclare().getQueue();
//5.绑定交换机和队列(重复绑定多次)
System.out.print("输入绑定键,用空格隔开:");
String bindingKeys = new Scanner(System.in).nextLine();
String[] keys = bindingKeys.split("\\s+");
for (String bindingKey : keys) {
cc.queueBind(queue,"topic_exchange",bindingKey);
}
//处理数据
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
String msg = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(msg+"-"+routingKey);
System.out.println("====================");
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
};
cc.basicConsume(queue,true,deliverCallback,cancelCallback);
}
}

RPC模式

实现原理

  1. 两个队列

    调用队列

    返回队列:每个客户端,都需要有自己的返回队列

  2. 返回队列的队列名

    在调用消息数据中,携带返回队列名

    根据返回队列名,向正确的返回队列来发送计算结果

  3. 关联ID

    用来匹配计算结果和调用

    如果连续发送多次调用,获得计算结果时,需要直到这个节骨欧式按一次调用的结果

    客户端发送调用时,携带一个关联ID

    服务器端返回结果时,也携带这个关联ID

客户端:多线程异步处理结果

  1. 主线程

    发送调用信息

    需要计算结果时,要取结果

  2. 次线程:等待接受结果

  3. 线程之间传递数据,可以使用BlockingQueue

    集合工具

    这个集合中,添加了线程的等待和通知

    如果没有数据,取数据时会暂停等待

    有多个子类:比如ArrayBlockingQueue

服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package cn.tedu.rabbitmq.rpc;

import com.rabbitmq.client.*;
import com.rabbitmq.client.AMQP.BasicProperties;

import java.io.IOException;

public class RPCServer {
public static void main(String[] args) throws Exception{
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("127.0.0.1");
cf.setPort(5672);//15672是UI端口,5672是客户端端口(可以省略)
cf.setUsername("guest");
cf.setPassword("guest");
Connection nc = cf.newConnection();
Channel cc = nc.createChannel();
//1.接受客户端发送的调用信息(正整数n)
//2.执行算法求第n个斐波那契数
//3.向客户端发送计算结果
//定义调用队列
cc.queueDeclare("rpc_queue",false,false,false,null);
//从调用队列中取调用信息
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
//从delivery取出正整数n
String msg = new String(delivery.getBody());
String replyTo = delivery.getProperties().getReplyTo();//返回队列名
String correlationId = delivery.getProperties().getCorrelationId();//关联id
long fbnqs = fbnqs(Integer.parseInt(msg));
BasicProperties basicProperties = new BasicProperties.Builder().correlationId(correlationId).build();
cc.basicPublish("",replyTo,basicProperties,(""+fbnqs).getBytes());
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String s) throws IOException {

}
};
cc.basicConsume("rpc_queue",true,deliverCallback,cancelCallback);

}
//服务:求第n个斐波那契数
//1 1 2 3 5 8 13 21 34 55 ......
//递归效率低,可以用来模拟服务器端的耗时运算
static long fbnqs(int n){
if(n==1 || n==2)
return 1;
return fbnqs(n-1)+fbnqs(n-2);
}
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package cn.tedu.rabbitmq.rpc;

import com.rabbitmq.client.*;
import com.rabbitmq.client.AMQP.BasicProperties;

import java.io.IOException;
import java.util.Scanner;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;


public class RPCClient {
static BlockingQueue<Long> q = new ArrayBlockingQueue<Long>(10);
public static void main(String[] args) throws Exception{
System.out.print("输入求第几个斐波那契数:");
int n = new Scanner(System.in).nextInt();
long fbnqs = fbnqs(n);
System.out.println("第"+n+"个的斐波那契数是:"+fbnqs);
}
//异步调用服务器,从服务器中获取结果
private static long fbnqs(int n) throws Exception{
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("127.0.0.1");
cf.setPort(5672);//15672是UI端口,5672是客户端端口(可以省略)
cf.setUsername("guest");
cf.setPassword("guest");
Connection nc = cf.newConnection();
Channel cc = nc.createChannel();
//定义调用队列
cc.queueDeclare("rpc_queue",false,false,false,null);
//返回队列
String replyTo = cc.queueDeclare().getQueue();
//关联id
String cid = UUID.randomUUID().toString();
BasicProperties basicProperties = new BasicProperties.Builder()
.replyTo(replyTo)
.correlationId(cid)
.build();
cc.basicPublish("","rpc_queue",basicProperties,(""+n).getBytes());
//模拟执行其他运算,不等待计算结果
System.out.println("调用消息已经发送");
System.out.println("模拟执行其他运算,不立即等待计算结果");
//获取计算结果
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
//处理数据之前,先对比关联id
if(cid.equals(delivery.getProperties().getCorrelationId())){
String msg = new String(delivery.getBody());
long fbnqs = Integer.parseInt(msg);
q.offer(fbnqs);
cc.getConnection().close();
}
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String s) throws IOException {

}
};
cc.basicConsume(replyTo,true,deliverCallback,cancelCallback);
return q.take();
}
}