一、JMS介绍

JMS(Java Message Service),是Java平台中关于面向消息中间件的接口,是一种与厂商无关的API,用来访问消息收发。它的使用场景如下:

  1. 实现业务的解耦、削峰,异步
  2. 跨平台,多语言
  3. 分布式事务,最终一致性

二、什么是消息队列

2.1 消息

消息(Message)是指在应用之间传送的数据,消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。

2.2 消息队列

消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,有消息系统来确保信息的可靠专递,消息发布者只管把消息发布到MQ中而不管谁来取,消息使用者只管从MQ中取消息而不管谁发布的,这样发布者和使用者都不用知道对方的存在。

三、RockerMQ

3.1 开源组件

要使用Java消息服务,你必须要有一个JMS提供者,管理会话和队列。既有开源的提供者也有专有的提供者。

开源的提供者包括:

  1. Apache ActiveMQ
  2. JBoss 社区所研发的 HornetQ
  3. RockerMQ
  4. ......

专有的提供者包括:

  1. BEA的BEA WebLogic Server JMS
  2. TIBCO Software的EMS
  3. GigaSpaces Technologies的GigaSpaces
  4. ......

3.2 使用RocketMQ好处

  1. 实现开发语言间的解耦,比如生产者是Java,消费者是.NET
  2. 实现高并发场景下的分布式线程池
  3. 将无需即时返回且耗时的操作进行异步处理,比如短信群发
  4. ......

3.3 RocketMQ专业术语

Producer

消息生产者,位于用户的进程内,Producer通过NameServer获取所有Broker的路由信息,根据负载均衡策略选择将消息发到哪个Broker,然后调用Broker接口提交消息。

Producer Group

生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。

Consumer

消息消费者,位于用户进程内。Consumer通过NameServer获取所有broker的路由信息后,向Broker发送Pull请求来获取消息数据。Consumer可以以两种模式启动,广播(Broadcast)和集群(Cluster),广播模式下,一条消息会发送给所有Consumer,集群模式下消息只会发送给一个Consumer。

Consumer Group

消费者组,和生产者类似,消费同一类消息的多个 Consumer 实例组成一个消费者组。

Topic

Topic用于将消息按主题做划分,Producer将消息发往指定的Topic,Consumer订阅该Topic就可以收到这条消息。Topic跟发送方和消费方都没有强关联关系,发送方可以同时往多个Topic投放消息,消费方也可以订阅多个Topic的消息。在RocketMQ中,Topic是一个上逻辑概念。消息存储不会按Topic分开。

Message

代表一条消息,使用MessageId唯一识别,用户在发送时可以设置messageKey,便于之后查询和跟踪。一个 Message 必须指定 Topic,相当于寄信的地址。Message 还有一个可选的 Tag 设置,以便消费端可以基于 Tag 进行过滤消息。也可以添加额外的键值对,例如你需要一个业务 key 来查找 Broker 上的消息,方便在开发过程中诊断问题。

Tag

标签可以被认为是对 Topic 进一步细化。一般在相同业务模块中通过引入标签来标记不同用途的消息。

Broker

Broker是RocketMQ的核心模块,负责接收并存储消息,同时提供Push/Pull接口来将消息发送给Consumer。Consumer可选择从Master或者Slave读取数据。多个主/从组成Broker集群,集群内的Master节点之间不做数据交互。Broker同时提供消息查询的功能,可以通过MessageID和MessageKey来查询消息。Borker会将自己的Topic配置信息实时同步到NameServer。

Queue

Topic和Queue是1对多的关系,一个Topic下可以包含多个Queue,主要用于负载均衡。发送消息时,用户只指定Topic,Producer会根据Topic的路由信息选择具体发到哪个Queue上。Consumer订阅消息时,会根据负载均衡策略决定订阅哪些Queue的消息。

Offset

RocketMQ在存储消息时会为每个Topic下的每个Queue生成一个消息的索引文件,每个Queue都对应一个Offset记录当前Queue中消息条数。

NameServer

NameServer可以看作是RocketMQ的注册中心,它管理两部分数据:集群的Topic-Queue的路由配置;Broker的实时配置信息。其它模块通过Nameserv提供的接口获取最新的Topic配置和路由信息。

  • Producer/Consumer :通过查询接口获取Topic对应的Broker的地址信息
  • Broker : 注册配置信息到NameServer, 实时更新Topic信息到NameServer

3.4 流程图

消息先发到Topic,然后消费者去Topic拿消息。那它到底是怎么存储消息数据的呢,这里就要引入Broker概念。

img

Topic是一个逻辑上的概念,实际上Message是在每个Broker上以Queue的形式记录。

img

结论

  1. 消费者发送的Message会在Broker中的Queue队列中记录

  2. 一个Topic的数据可能会存在多个Broker中

  3. 一个Broker存在多个Queue

  4. 单个的Queue也可能存储多个Topic的消息

也就是说每个Topic在Broker上会划分成几个逻辑队列,每个逻辑队列保存一部分消息数据,但是保存的消息数据实际上不是真正的消息数据,而是指向commit log的消息索引。Queue不是真正存储Message的地方,真正存储Message的地方是在CommitLog。

img

左边的是CommitLog。这个是真正存储消息的地方。RocketMQ所有生产者的消息都是往这一个地方存的。

右边是ConsumeQueue。这是一个逻辑队列。和上文中Topic下的Queue是一一对应的。消费者是直接和ConsumeQueue打交道。ConsumeQueue记录了消费位点,这个消费位点关联了commitlog的位置。所以即使ConsumeQueue出问题,只要commitlog还在,消息就没丢,可以恢复出来。还可以通过修改消费位点来重放或跳过一些消息。

3.5 部署模型

在部署RocketMQ时,会部署两种角色。NameServer和Broker。

img

说明:

  1. Product和consumer集群部署,是你开发的项目进行集群部署

  2. Broker 集群部署是为了高可用,因为Broker是真正存储Message的地方,集群部署是为了避免一台挂掉,导致整个项目KO

  3. Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步

  4. 每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server

  5. Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息

  6. Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息

每个Broker与Name Server集群中的所有节点建立长连接的好处:

  1. 这样可以使Name Server之间可以没有任何关联,因为它们绑定的Broker是一致的

  2. 作为Producer或者Consumer可以绑定任何一个Name Server 因为它们都是一样的

3.6 Broker

Broker与Name Server关系

  1. 连接

    单个Broker和所有Name Server保持长连接

  2. 心跳

    心跳间隔:每隔30秒向所有NameServer发送心跳,心跳包含了自身的Topic配置信息

    心跳超时:NameServer每隔10秒,扫描所有还存活的Broker连接,若某个连接2分钟内没有发送心跳数据,则断开连接

  3. 断开

    当Broker挂掉,NameServer会根据心跳超时主动关闭连接,一旦连接断开,会更新Topic与队列的对应关系,但不会通知生产者和消费者

负载均衡

  1. 一个Topic分布在多个Broker上,一个Broker可以配置多个Topic,它们是多对多的关系。如果某个Topic消息量很大,应该给它多配置几个Queue,并且尽量多分布在不同Broker上,减轻某个Broker的压力。

可用性

  1. 由于消息分布在各个Broker上,一旦某个Broker宕机,则该Broker上的消息读写都会受到影响。所以RocketMQ提供了Master/Slave的结构,Salve定时从Master同步数据,如果Master宕机,则Slave提供消费服务,但是不能写入消息,此过程对应用透明,由RocketMQ内部解决

思考问题

  1. 一旦某个broker master宕机,生产者和消费者多久才能发现?

    受限于Rocketmq的网络连接机制,默认情况下最多需要30秒,因为消费者每隔30秒从nameserver获取所有topic的最新队列情况,这意味着某个broker如果宕机,客户端最多要30秒才能感知。

  2. master恢复恢复后,消息能否恢复?
    消费者得到Master宕机通知后,转向Slave消费,但是Slave不能保证Master的消息100%都同步过来了,因此会有少量的消息丢失。但是消息最终不会丢的,一旦Master恢复,未同步过去的消息会被消费掉

3.7 Consumer

Consumer与Name Server关系

  1. 连接

    单个Consumer和一台NameServer保持长连接,如果该NameServer挂掉,消费者会自动连接下一个NameServer,直到有可用连接为止,并能自动重连

  2. 心跳

    与NameServer没有心跳

  3. 轮询时间

    默认情况下,消费者每隔30秒从NameServer获取所有Topic的最新队列情况,这意味着某个Broker如果宕机,客户端最多要30秒才能感知

Consumer与Broker关系

  1. 连接

    单个消费者和该消费者关联的所有broker保持长连接

负载均衡

  1. 集群消费模式下,一个消费者集群多台机器共同消费一个Topic的多个队列,一个队列只会被一个消费者消费。如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费

3.8 Producer

Producer与Name Server关系

  1. 连接

    单个Producer和一台NameServer保持长连接,如果该NameServer挂掉,生产者会自动连接下一个NameServer,直到有可用连接为止,并能自动重连

  2. 轮询时间

    默认情况下,生产者每隔30秒从NameServer获取所有Topic的最新队列情况,这意味着某个Broker如果宕机,生产者最多要30秒才能感知,在此期间,发往该broker的消息发送失败

  3. 心跳

    与nameserver没有心跳

Producer与broker关系

  1. 连接

    单个生产者和该生产者关联的所有broker保持长连接

四、部署RocketMQ

4.1 Docker部署

Broker 和 NameServer 分别都做了集群部署,各部署两个。

broker-1.conf

mkdir -p data/broker-1
vim data/broker-1/broker-1.conf
# 所属集群名字
brokerClusterName = rocketmq-cluster
# Broker的名称
brokerName = broker-1
# 0表示Master,大于0表示不同的Slave的ID
brokerId = 0
# 开发环境填外网,生产环境建议填内网
brokerIP1 = 192.168.17.101
# 表明在几点做消息删除动作,默认值04,表示凌晨4点
deleteWhen = 04
# 在磁盘上保存消息的时长,单位是小时,自动删除超时的消息
fileReservedTime = 48
# Broker角色,ASYNC_MASTER异步复制Master,SYNC_MASTER同步双写Master,SLAVE从节点
brokerRole = ASYNC_MASTER
# 刷盘方式,ASYNC_FLUSH异步刷盘,SYNC_FLUSH同步刷盘
flushDiskType = ASYNC_FLUSH
# nameserver地址,如果nameserver是多台集群的话,用分号分割,开发环境填外网,生产环境建议填内网
namesrvAddr=192.168.17.101:9876;192.168.17.101:9877
autoCreateTopicEnable=true
# Broker对外服务的监听端口
listenPort = 10911

broker-2.conf

mkdir -p data/broker-2
vim data/broker-2/broker-2.conf
# 所属集群名字
brokerClusterName = rocketmq-cluster
# Broker的名称
brokerName = broker-2
# 0表示Master,大于0表示不同的Slave的ID
brokerId = 0
# 开发环境填外网,生产环境建议填内网
brokerIP1 = 192.168.17.101
# 表明在几点做消息删除动作,默认值04,表示凌晨4点
deleteWhen = 04
# 在磁盘上保存消息的时长,单位是小时,自动删除超时的消息
fileReservedTime = 48
# Broker角色,ASYNC_MASTER异步复制Master,SYNC_MASTER同步双写Master,SLAVE从节点
brokerRole = ASYNC_MASTER
# 刷盘方式,ASYNC_FLUSH异步刷盘,SYNC_FLUSH同步刷盘
flushDiskType = ASYNC_FLUSH
# nameserver地址,如果nameserver是多台集群的话,用分号分割,开发环境填外网,生产环境建议填内网
namesrvAddr=192.168.17.101:9876;192.168.17.101:9877
autoCreateTopicEnable=true
# Broker对外服务的监听端口
listenPort = 10912

docker-compose.yaml

chown 3000:3000 -R data/
vim docker-compose.yaml
version: '3.5'
services:
  nameserver-1:
    image: rocketmqinc/rocketmq:4.3.0
    container_name: nameserver-1
    ports:
      - 9876:9876
    volumes:
      - ./data/nameserver-1/logs:/home/rocketmq/logs
      - ./data/nameserver-1/store:/home/rocketmq/store
    command: sh mqnamesrv
    networks:
      rocketmq:
        aliases:
          - nameserver-1
  nameserver-2:
    image: rocketmqinc/rocketmq:4.3.0
    container_name: nameserver-2
    ports:
      - 9877:9876
    volumes:
      - ./data/nameserver-2/logs:/home/rocketmq/logs
      - ./data/nameserver-2/store:/home/rocketmq/store
    command: sh mqnamesrv
    networks:
      rocketmq:
        aliases:
          - nameserver-2
  broker-1:
    image: rocketmqinc/rocketmq:4.3.0
    container_name: broker-1
    ports:
      - 10911:10911
    volumes:
      - ./data/broker-1/logs:/home/rocketmq/logs
      - ./data/broker-1/store:/home/rocketmq/store
      - ./data/broker-1/broker-1.conf:/opt/rocketmq-4.3.0/conf/broker.conf
    environment:
      TZ: Asia/Shanghai
      NAMESRV_ADDR: "nameserver-1:9876"
      JAVA_OPT_EXT: "-server -Xms256m -Xmx256m -Xmn128m"
    command: sh mqbroker -c /opt/rocketmq-4.3.0/conf/broker.conf autoCreateTopicEnable=true &
    links:
      - nameserver-1:nameserver-1
      - nameserver-2:nameserver-2
    networks:
      rocketmq:
        aliases:
          - broker-1
  broker-2:
    image: rocketmqinc/rocketmq:4.3.0
    container_name: broker-2
    ports:
      - 10912:10912
    volumes:
      - ./data/broker-2/logs:/home/rocketmq/logs
      - ./data/broker-2/store:/home/rocketmq/store
      - ./data/broker-2/broker-2.conf:/opt/rocketmq-4.3.0/conf/broker.conf  
    environment:
      TZ: Asia/Shanghai
      NAMESRV_ADDR: "nameserver-2:9876"
      JAVA_OPT_EXT: "-server -Xms256m -Xmx256m -Xmn128m"
    command: sh mqbroker -c /opt/rocketmq-4.3.0/conf/broker.conf autoCreateTopicEnable=true &
    links:
      - nameserver-1:nameserver-1
      - nameserver-2:nameserver-2
    networks:
      rocketmq:
        aliases:
          - broker-2
  console:
    image: styletang/rocketmq-console-ng:latest
    container_name: console
    ports:
      - 8080:8080
    environment:
      JAVA_OPTS: -Drocketmq.namesrv.addr=nameserver-1:9876;nameserver-2:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false
    networks:
      rocketmq:
        aliases:
          - console
networks:
  rocketmq:
    name: rocketmq
    driver: bridge

4.2 Windows部署

环境变量

# JAVA_HOME
D:\JDK
# ROCKETMQ_HOME
D:\rocketmq-all-4.8.0-bin-release
# Path
%JAVA_HOME%\bin
%ROCKETMQ_HOME%\bin

下载地址

# 启动mqnamesrv
start mqnamesrv.cmd
# 启动mqbroker
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

4.3 配置文件说明

基础配置

配置描述默认值例子
namesrvAddrnameServer地址,如果nameserver是多台集群的话,用分号分割namesrvAddr=10.1.219.75:9876;10.1.219.76:9876
brokerClusterName所属集群名字。Cluster 的地址,如果集群机器数比较多,可以分成多个Cluster ,每个Cluster 供一个业务群使用brokerClusterName=rocketmq-cluster
brokerNameBroker 的名称, Master 和Slave 通过使用相同的Broker 名称来表明相互关系,以说明某个Slave 是哪个Master 的Slave brokerName=broker-a
brokerId一个Master Barker 可以有多个Slave, 0 表示Master ,大于0 表示不同的 Slave 的ID brokerId=0
fileReservedTime在磁盘上保存消息的时长,单位是小时,自动删除超时的消息 fileReservedTime=48
deleteWhen与fileReservedTim巳参数呼应,表明在几点做消息删除动作,默认值04 表示凌晨4 点 deleteWhen=04
brokerRolebrokerRole 有3 种: SYNC_MASTER(同步双写) 、ASYNC_MASTER(异步复制) 、SLAVE 。关键词SYNC 和ASYNC 表示Master 和Slave 之间同步消息的机制, SYNC 的意思是当Slave 和Master 消息同步完成后,再返回发送成功的状态 brokerRole=SYNC_MASTER
flushDiskTypeflushDiskType 表示刷盘策略,分为SYNC_FLUSH 和ASYNC_FLUSH两种,分别代表同步刷盘和异步刷盘。同步刷盘情况下,消息真正写人磁盘后再返回成功状态;异步刷盘情况下,消息写人page_cache 后就返回成功状态 flushDiskType=ASYNC_FLUSH
listenPortBroker 监听的端口号,如果一台机器上启动了多个Broker , 则要设置不同的端口号,避免冲突 listenPort=10911
storePathRootDir存储消息以及一些配置信息的根目录 storePathRootDir=/app/custom/data/rocketmq/store-a

进阶配置

配置描述默认值例子
autoCreateTopicEnable是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true
defaultTopicQueueNums在发送消息时,自动创建服务器不存在的topic,默认创建的队列数。 defaultTopicQueueNums=4
autoCreateSubscriptionGroup是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateTopicEnable=true
mapedFileSizeCommitLogcommitLog每个文件的大小,默认1G1GmapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueueConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000

存储配置

配置描述默认值例子
storePathRootDir存储消息以及一些配置信息的根目录 storePathRootDir=/app/custom/data/rocketmq/store-a
storePathCommitLogcommitLog 存储路径 storePathCommitLog=/data/rocketmq/store/commitlog
storePathConsumeQueue消费队列存储路径存储路径 storePathConsumeQueue=/data/rocketmq/store/consumequeue
storePathIndex消息索引存储路径 storePathIndex=/data/rocketmq/store/index
storeCheckpointcheckpoint 文件存储路径 storeCheckpoint=/data/rocketmq/store/checkpoint
abortFileabort 文件存储路径 abortFile=/data/rocketmq/store/abort

五、整合RocketMQ

SpringBoot整合RocketMQ

pom.xml

<!-- SpringBoot整合RocketMQ -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.0</version>
</dependency>

application.yml

server:
  port: 8087

rocketmq:
  # rocketmq服务地址
  name-server: 192.168.17.101:9876;192.168.17.101:9877
  producer:
    # 自定义的组名称
    group: producer-group
    # 消息发送超时时长
    send-message-timeout: 30000

RocketMqController.java

package com.qiang.rocketmq.service.controller;

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

/**
 * @author: 小强崽
 * @create: 2021/1/20 11:14
 * @description:
 **/
@RestController
@RequestMapping("/rocketmq")
public class RocketMqController {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    private static final String TOPIC_NAME = "testTopic";

    /**
     * http://localhost:8087/rocketmq/send?msg=小强崽
     * 普通消息投递
     */
    @GetMapping("/send")
    public String send(String msg) {
        Message<String> message = MessageBuilder.withPayload(msg).build();
        rocketMQTemplate.convertAndSend("testTopic", message);
        return "投递消息 => " + message + " => 成功";
    }

    /**
     * http://localhost:8087/rocketmq/key?msg=小强崽
     * 带key的消息发送
     */
    @GetMapping("/key")
    public String key(String msg) {
        Message<String> message = MessageBuilder.withPayload(msg).setHeader("KEYS", UUID.randomUUID()).build();
        SendResult sendResult = rocketMQTemplate.syncSend(TOPIC_NAME, message);
        logger.info(sendResult.toString());
        return "投递消息 => " + message + " => 成功";
    }

    /**
     * http://localhost:8087/rocketmq/tag?msg=小强崽
     * 带tag的消息发送
     */
    @GetMapping("/tag")
    public String tag(String msg) {
        Message<String> message = MessageBuilder.withPayload(msg).setHeader("KEYS", UUID.randomUUID()).build();
        SendResult sendResult = rocketMQTemplate.syncSend(TOPIC_NAME + ":testTag", message);
        logger.info(sendResult.toString());
        return "投递消息 => " + message + " => 成功";
    }

}

发送结果

image-20210121112828878

TestTopicListener.java

package com.qiang.rocketmq.service.listener;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * @author: 小强崽
 * @create: 2021/1/20 14:16
 * @description: testTopic主题消费
 **/
@Component
@RocketMQMessageListener(consumerGroup = "consumer-group", topic = "testTopic")
public class TestTopicListener implements RocketMQListener<String> {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public void onMessage(String message) {
        logger.info("testTopic消费到的消息:{}", message);
    }
}

消费结果

image-20210121112931223