RabbitMQ 简介

RabbitMQ是一个开源的AMQP(Advanced Message Queuing Protocol,高级消息队列协议)实现,是可复用的企业级消息系统。服务器端使用Erlang语言编写,支持几乎所有常见的开发平台如:Python、Ruby、.NET、Java、C、PHP、Go、Perl、Erlang、Node.js、object-c\swift、ruby等。 消息队列最基本的功能就是为了实现在不同系统或者组件间安全、可靠的存储和转发消息。而RabbitMQ是当前最流行的消息中间件之一,支持大部分主流的操作系统。


RabbitMQ

安装

Ubuntu 14.04 LTS 为例来讲,由于RabbitMQ是用Erlang编写的,在软件包的依赖上主要就是要安装一下Erlang。不过那是对于低版本的Ubuntu系统来说,14.04以上的版本,会自动安装所有相关依赖包的。我们使用APT来安装,RabbitMQ的最新版本是3.6.5。

RabbitMQ install

  • 执行以下命令将APT存储库添加到/etc/apt/sources.list.d:

    1
    2
    $ echo 'deb http://www.rabbitmq.com/debian/ testing main' 
    | sudo tee /etc/apt/sources.list.d/rabbitmq.list
  • 为了避免有关未签名包的警告,可以使用apt-key将RabbitMQ的公钥添加到信任密钥列表中,当然也可以不管它:

    1
    $ wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
  • 更新一下软件包列表:

    1
    $ sudo apt-get update
  • 安装rabbitmq-server:

    1
    $ sudo apt-get install rabbitmq-server
  • 安装完成后,RabbitMQ会自动的启动运行,默认的会使用用户rabbitmq来运行,可以使用ps aux命令来查询一下,日常的启动停止可以使用service或者invoke-rc.d来执行,也可以用RabbitMQ的rabbitmqctl管理命令:

    1
    2
    3
    4
    5
    $ invoke-rc.d rabbitmq-server start/stop  #启动和停止服务器。
    $ invoke-rc.d rabbitmq-server etc #可以了解还有那些其他参数。
    $ invoke-rc.d rabbitmq-server status #看看运行状态。
    $ service rabbitmq-server status
    $ rabbitmqctl status

RabbitMQ status - 两个重要的配置 - 在生产环境中RabbitMQ可能需要系统限制和内核参数调整,以便处理大量的并发连接和队列。 主要是打开文件的最大数量:ulimit -n。 许多操作系统上的默认值对于消息传递来说太低(例如,在多个Linux发行版上为1024),建议为设置为4096应该足以满足大多数开发工作负载。操作系统限制通过/etc/systemd/system/rabbitmq-server.service.d/limits.conf来进行设置,例如:

1
2
[Service]
LimitNOFILE=300000
- 每个用户的限制可以使用下面的命令设置。
1
2
$ ulimit -S -n 4096

- 修改完设置以后都需要重新启动RabbitMQ。

术语

本质上,RabbitMQ是一个消息代理,它的功能就是负责接收和发送消息。使用前我们先了解一下RabbitMQ的术语:

  • 生产者(producer),就是产生消息要发送的程序。 RabbitMQ producer
  • 队列(queue),消息传递的载体,可以看成是邮局。它本质上是一个无限缓冲区。 生产者可以发送一个队列的消息,消费者可以从一个队列接收数据。 RabbitMQ queue
  • 消费者(consuming),就是等待接收消息的程序。 RabbitMQ consuming

Hello World

看看最简单的Hello World一个例子: 我们发送一条消息,然后接收它并将其打印在屏幕上。为此,我们需要两个程序:一个发送消息,一个接收并打印它。生产者将消息发送到“hello”队列,消费者从该队列接收消息。 RabbitMQ Hello World

发送端的程序是send.py将向队列发送单个消息。

RabbitMQ send

  • 我们需要做的第一件事是建立与RabbitMQ服务器的连接,这里用到了pika库,可以pip用命令安装一下:pip install pika。

    1
    2
    3
    4
    5
    6
    #!/usr/bin/env python
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost')) //如果不是本机可以指定IP。
    channel = connection.channel()
  • 在发送之前,我们需要一个收件人队列。创建一个队列命名为hello,消息将发送到它:

    1
    channel.queue_declare(queue='hello')
  • 然后如果我们发送的内容是Hello World! 的话,我们需要把这句话放到队列里,在RabbitMQ中,消息不能直接发送到队列,它总是需要通过Exchange,像这样:
    1
    2
    3
    4
    channel.basic_publish(exchange='',          //默认交换
    routing_key='hello', //队列名称
    body='Hello World!') //消息内容
    print("[x] 发送 'Hello World!'")
  • 这样就ok了,发送后关闭连接。

    1
    connection.close()  
  • send.py的完整内容: RabbitMQ send.py

负责接收的程序receive.py将从队列中接收消息并将其打印在屏幕上。 RabbitMQ receive

  • 同样的,我们需要做的第一件事是建立与RabbitMQ服务器的连接。

    1
    2
    3
    4
    5
    6
    #!/usr/bin/env python
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost')) //如果不是本机可以指定IP。
    channel = connection.channel()
  • 在接收之前,我们也需要确定从哪个队列接收。创建一个队列命名为hello,当然不声明也是可以的,但是我们不确定发送程序是否已经运行了,最好还是声明一下:

    1
    channel.queue_declare(queue='hello')
  • 从队列接收消息稍微复杂一些,它通过向队列订阅回调函数来工作。每当我们收到一个消息,这个回调函数就会被调用。我们现在的功能要求是,将在屏幕上打印消息的内容。
    1
    2
    def callback(ch, method, properties, body):
    print("[x] 收到 %r" % body)
  • 当然最关键的是我们需要告诉RabbitMQ,这个回调函数应该从我们的hello队列接收消息:
    1
    2
    3
    channel.basic_consume(callback,
    queue='hello',
    no_ack=True)
  • 最后,我们进入一个循环,始终等待数据并在必要时运行回调。
    1
    2
    print('[*] 等待消息中... 退出请按CTRL+C')
    channel.start_consuming()
  • receive.py的完成内容: RabbitMQ receive.py

这样就OK了,我们来试试,先运行一下发送程序,然后运行接收程序:

1
2
3
 
$ python send.py
$ python receive.py

结果就是这样的: RabbitMQ go

可以通过命令看看都有哪些正在工作的队列:

1
2
 
$ sudo rabbitmqctl list_queues

如果大家希望深入的了解RabbitMQ,可以去官网看看。