Windows环境安装
-
安装Erlang运行环境 下载安装Erlang:http://www.erlang.org/downloads。
-
安装RabbitMQ 下载安装Windows版本的RabbitMQ:http://www.rabbitmq.com/install-windows.html。
-
启动RabbitMQ Server 点击Windows开始按钮,输入RabbitMQ找到 RabbitMQCommanPrompt,以管理员身份运行。
-
依次执行以下命令启动RabbitMQ服务 rabbitmq-service install rabbitmq-service enable rabbitmq-service start
-
执行 rabbitmqlctl status检查RabbitMQ状态
-
安装管理平台插件 执行 rabbitmq-plugins enable rabbitmq_management即可成功安装,使用默认账号密码(guest/guest)登录http://localhost:15672/即可。
Centos7环境安装与操作
也可以直接下载rpm进行安装
rpm -ivh erlang-19.3.1-1.el7.centos.x86_64.rpm
rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm
rpm -ivh rabbitmq-server-3.6.10-1.el7.noarch.rpm
启动:
chkconfig rabbitmq-server on
/sbin/service rabbitmq-server start
/sbin/service rabbitmq-server status
/sbin/service rabbitmq-server stop
还有另外的重启命令:
rabbitmqctl stop
rabbitmq-server -detached
添加插件:
rabbitmq-plugins enable rabbitmq_management
用户操作:
rabbitmqctl list_users
rabbitmqctl add_user username userpassword
rabbitmqctl change_password username 'newpassword'
rabbitmqctl set_permissions -p "/" username ".*" ".*" ".*"
rabbitmqctl set_user_tags administrator
set_user_tags是设置用户角色,set_permissions是设置用户权限。具体可了解
专有名词说明
-
Publisher:生产者,消息的发送方。
-
Connection:网络连接。
-
Channel:信道,多路复用连接中的一条独立的双向数据流通道。
-
Exchange:交换器(路由器),负责消息的路由到相应队列。
-
Binding:队列与交换器间的关联绑定。消费者将关注的队列绑定到指定交换器上,以便Exchange能准确分发消息到指定队列。
-
Queue:队列,消息的缓冲存储区。
-
Virtual Host:虚拟主机,虚拟主机提供资源的逻辑分组和分离。包含连接,交换,队列,绑定,用户权限,策略等。
-
Broker:消息队列的服务器实体。
-
Consumer:消费者,消息的接收方。
聊天室的实现
每个玩家有唯一的用户名
输入/用户 可以对指定用户进行私聊
否则为发送的公共聊天消息
实现如下:
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace MQ
{
class Program
{
static string exchangeName = "chat";
private static string queueName = "room_";
private static string routeKey = "";
private static string user;
private static IModel channel;
static void Main(string[] args)
{
user = args[0];
queueName += user;
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
UserName = "root", //用户名
Password = "root", //密码
HostName = "127.0.0.1" //rabbitmq ip
};
// 创建连接
var connection = factory.CreateConnection();
// 创建通道
channel = connection.CreateModel();
DeclarePublicChannel();
DeclarePrivateChannel();
while (true)
{
var input = Console.ReadLine();
switch (input)
{
case "exit":
Exit();
break;
default:
if (input.StartsWith("/"))
{
var index = input.IndexOf(' ');
var target = input.Substring(1, index - 1);
var msg = input.Substring(index + 1);
SendPrivateMsg(target, msg);
}
else
{
SendPublicMsg(input);
}
break;
}
}
channel.Close();
connection.Close();
}
private static void DeclarePublicChannel()
{
//定义一个Direct类型交换机
channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false, null);
//定义队列1
channel.QueueDeclare(queueName, false, false, false, null);
channel.QueueBind(queueName, exchangeName, routeKey);
//生成两个队列的消费者
//事件基本消费者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
//接收到消息事件
consumer.Received += (ch, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"{message}");
//确认该消息已被消费
channel.BasicAck(ea.DeliveryTag, true);
};
//启动消费者 设置为手动应答消息
channel.BasicConsume(queueName, false, consumer);
}
private static void DeclarePrivateChannel()
{
var privateQueueName = user;
channel.QueueDeclare(privateQueueName, false, false, false, null);
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"{message}");
};
//启动消费者 设置为手动应答消息
channel.BasicConsume(privateQueueName, true, consumer);
}
private static void Exit()
{
//将队列绑定到交换机
channel.QueueUnbind(queueName, exchangeName, routeKey, null);
channel.QueueDelete(queueName);
channel.QueueDelete(user);
}
private static void SendPublicMsg(string msg)
{
var properties = channel.CreateBasicProperties();
properties.Expiration = "10000";
channel.BasicPublish(exchangeName, routeKey, properties, Encoding.UTF8.GetBytes(user + ": " + msg));
}
private static void SendPrivateMsg(String target, String msg)
{
channel.BasicPublish("", target, null, Encoding.UTF8.GetBytes(user + ": " + msg));
}
}
}
运行结果
私聊通过路由来实现。每个用户都会创建一个队列,以用户名命名队列名。消息发送者通过指定路由发送给特定消息队列处理
公共聊天通过消息广播的形式(fanout),发送给每个注册到exchange上的队列
公共聊天设置了消息过期时间
var properties = channel.CreateBasicProperties();
properties.Expiration = "10000";
channel.BasicPublish(exchangeName, routeKey, properties, Encoding.UTF8.GetBytes(user + ": " + msg));
Expiration的单位为毫秒。超过10秒未被处理的消息将被删除
公共消息消费完后,手动通知broker移除队列中的消息:
channel.BasicAck(ea.DeliveryTag, true);
其中第二个参数multiple,为true时表示之前的消息批量处理为完毕,为false时表示只对当前消息设置成功处理。BasicAck的更多参数说明可以参考RabbitMQ消息可靠投递及消费机制