Rabbit MQ

Posted by 戚骏 on April 12, 2021

Windows环境安装

官方安装指南

也可以直接下载rpm进行安装

rpm -ivh erlang-19.3.1-1.el7.centos.x86_64.rpm 
rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm 
rpm -ivh rabbitmq-server-3.6.10-1.el7.noarch.rpm 

启动:

chkconfig rabbitmq-server on

/sbin/service rabbitmq-server start
/sbin/service rabbitmq-server status
/sbin/service rabbitmq-server stop

还有另外的重启命令:

rabbitmqctl stop
rabbitmq-server -detached

添加插件:

rabbitmq-plugins enable rabbitmq_management

用户操作:

rabbitmqctl list_users
rabbitmqctl add_user username userpassword
rabbitmqctl change_password username 'newpassword'
rabbitmqctl set_permissions -p "/" username ".*" ".*" ".*"
rabbitmqctl set_user_tags administrator

set_user_tags是设置用户角色,set_permissions是设置用户权限。具体可了解

专有名词说明

  • Publisher:生产者,消息的发送方。

  • Connection:网络连接。

  • Channel:信道,多路复用连接中的一条独立的双向数据流通道。

  • Exchange:交换器(路由器),负责消息的路由到相应队列。

  • Binding:队列与交换器间的关联绑定。消费者将关注的队列绑定到指定交换器上,以便Exchange能准确分发消息到指定队列。

  • Queue:队列,消息的缓冲存储区。

  • Virtual Host:虚拟主机,虚拟主机提供资源的逻辑分组和分离。包含连接,交换,队列,绑定,用户权限,策略等。

  • Broker:消息队列的服务器实体。

  • Consumer:消费者,消息的接收方。

聊天室的实现

每个玩家有唯一的用户名

输入/用户 可以对指定用户进行私聊

否则为发送的公共聊天消息

实现如下:

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace MQ
{
    class Program
    {
        static string exchangeName = "chat";
        private static string queueName = "room_";
        private static string routeKey = "";
        private static string user;
        private static IModel channel;

        static void Main(string[] args)
        {
            user = args[0];
            queueName += user;
            // 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory
            {
                UserName = "root", //用户名
                Password = "root", //密码
                HostName = "127.0.0.1" //rabbitmq ip
            };

            // 创建连接
            var connection = factory.CreateConnection();
            // 创建通道
            channel = connection.CreateModel();

            DeclarePublicChannel();
            DeclarePrivateChannel();

            while (true)
            {
                var input = Console.ReadLine();
                switch (input)
                {
                    case "exit":
                        Exit();
                        break;
                    default:
                        if (input.StartsWith("/"))
                        {
                            var index = input.IndexOf(' ');
                            var target = input.Substring(1, index - 1);
                            var msg = input.Substring(index + 1);
                            SendPrivateMsg(target, msg);
                        }
                        else
                        {
                            SendPublicMsg(input);
                        }
                        break;
                }
            }

            channel.Close();
            connection.Close();
        }

        private static void DeclarePublicChannel()
        {
            //定义一个Direct类型交换机
            channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false, null);

            //定义队列1
            channel.QueueDeclare(queueName, false, false, false, null);

            channel.QueueBind(queueName, exchangeName, routeKey);

            //生成两个队列的消费者
            //事件基本消费者
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

            //接收到消息事件
            consumer.Received += (ch, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());

                Console.WriteLine($"{message}");
                //确认该消息已被消费
                channel.BasicAck(ea.DeliveryTag, true);
            };
            //启动消费者 设置为手动应答消息
            channel.BasicConsume(queueName, false, consumer);
        }
        
        private static void DeclarePrivateChannel()
        {
            var privateQueueName = user;
            channel.QueueDeclare(privateQueueName, false, false, false, null);
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
            consumer.Received += (ch, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                Console.WriteLine($"{message}");
            };
            //启动消费者 设置为手动应答消息
            channel.BasicConsume(privateQueueName, true, consumer);
        }

        private static void Exit()
        {
            //将队列绑定到交换机
            channel.QueueUnbind(queueName, exchangeName, routeKey, null);
            channel.QueueDelete(queueName);
            channel.QueueDelete(user);
        }

        private static void SendPublicMsg(string msg)
        {
            var properties = channel.CreateBasicProperties();
            properties.Expiration = "10000";
            channel.BasicPublish(exchangeName, routeKey, properties, Encoding.UTF8.GetBytes(user + ": " + msg));
        }

        private static void SendPrivateMsg(String target, String msg)
        {
            channel.BasicPublish("", target, null, Encoding.UTF8.GetBytes(user + ": " + msg));
        }
    }
}

运行结果

结果展示

私聊通过路由来实现。每个用户都会创建一个队列,以用户名命名队列名。消息发送者通过指定路由发送给特定消息队列处理

公共聊天通过消息广播的形式(fanout),发送给每个注册到exchange上的队列

公共聊天设置了消息过期时间

var properties = channel.CreateBasicProperties();
properties.Expiration = "10000";
channel.BasicPublish(exchangeName, routeKey, properties, Encoding.UTF8.GetBytes(user + ": " + msg));

Expiration的单位为毫秒。超过10秒未被处理的消息将被删除

公共消息消费完后,手动通知broker移除队列中的消息:

channel.BasicAck(ea.DeliveryTag, true);

其中第二个参数multiple,为true时表示之前的消息批量处理为完毕,为false时表示只对当前消息设置成功处理。BasicAck的更多参数说明可以参考RabbitMQ消息可靠投递及消费机制

参考资料

RabbitMQ 知多少

RabbitMQ消息可靠投递及消费机制