首页> 中国专利> 一种应用于政务云平台构建微服务MQ的方法

一种应用于政务云平台构建微服务MQ的方法

摘要

本发明公开一种应用于政务云平台构建微服务MQ的方法,涉及MQ构建技术领域;构建微服务MQ的服务端和客户端,建立服务端的消息代理中心Broker、消息存储模块和注册中心,其中消息代理中心接收消息,同时提供RPC服务供消费者使用消费消息,消息存储模块持久化存储消息,注册中心包括Broker注册中心和Consumer注册中心,Broker注册中心提供Broker注册RPC服务,Consumer注册中心提供Consumer注册消费节点服务,建立客户端的消息生产者模块Producer和消息消费者模块Consumer。

著录项

  • 公开/公告号CN112559208A

    专利类型发明专利

  • 公开/公告日2021-03-26

    原文格式PDF

  • 申请/专利权人 浪潮云信息技术股份公司;

    申请/专利号CN202011469208.X

  • 发明设计人 许前刚;

    申请日2020-12-15

  • 分类号G06F9/54(20060101);G06Q10/10(20120101);H04L29/08(20060101);

  • 代理机构37100 济南信达专利事务所有限公司;

  • 代理人孙晶伟

  • 地址 250100 山东省济南市高新区浪潮路1036号浪潮科技园S01号楼

  • 入库时间 2023-06-19 10:24:22

说明书

技术领域

本发明公开一种方法,涉及MQ构建技术领域;具体地说是一种应用于政务云平台构建微服务MQ的方法。

背景技术

随着政务云平台迅速发展,在政务云平台中的很多场景下,不会立即处理消息,需要在消息队列中存储消息,并在某一时刻再进行处理;在特殊的高并发环境下,由于来不及同步处理业务,请求往往会发生堵塞,比如大量的insert、update之类的请求同时到达MySql,直接导致无数的行锁和表锁,甚至最后请求会堆积过多,从而触发too manyconnections错误,通过使用消息队列,我们可以异步处理请求,从而缓解系统压力。

目前流行的ActiveMQ、RabbitMQ和ZeroMQ等消息队列的软件中,大多为了实现AMQP、STOMP、XMPP之类的协议,必须为重量级,但是很多Web应用中缓解高并发请求的解决方案需要轻量级的消息队列实现。

发明内容

本发明针对现有技术的问题,提供一种应用于政务云平台构建微服务MQ的方法,具有通用性强、实施简便等特点,具有广阔的应用前景。

本发明提出的具体方案是:

一种应用于政务云平台构建微服务MQ的方法,构建微服务MQ的服务端和客户端,

建立服务端的消息代理中心Broker、消息存储模块和注册中心,其中消息代理中心接收消息生产者推送生产的消息,同时提供RPC服务供消费者使用消费消息,消息存储模块持久化存储消息,支持MySql和TiDB两种存储方式,注册中心包括Broker注册中心和Consumer注册中心,Broker注册中心提供Broker注册RPC服务,Consumer注册中心提供Consumer注册消费节点服务,

建立客户端的消息生产者模块Producer和消息消费者模块Consumer,其中消息生产者模块Producer负责提供API接口供开发者调用,并生成和发送队列消息,并兼容异步批量多线程生产及同步批量多线程生产两种方式,

消息消费者模块Consumer,负责订阅消息并消费消息,并采用多线程轮训消息,消息分片及消息锁定实现订阅消息并消费消息。

优选地,所述的一种应用于政务云平台构建微服务MQ的方法中通过微服务MQ的服务端支持三种消息分发模式,包括并行消息、串行消息和广播消息分发模式,

所述并行消息分发模式是消息平均分配在给消息主题的在线消费者;

所述串行消息分发模式是消息固定分配给消息主题的在线消费者中其中一个;

所述广播消息分发模式是消息将广播发送给消息主题的在线消费者分组。

优选地,所述的一种应用于政务云平台构建微服务MQ的方法中通过微服务MQ,根据具体场景选择使用不同的方法控制消息,

其中延时控制方法为消息生产者模块发布消息时,设置消息的延迟生效时间,到达设置的生效时间时,该消息才被消息消费者模块消费;

事务性控制方法为消息消费者模块自主决定是否开启事务开关,开启事务开关后,保证消息被消息消费者模块成功执行一次;

失败重试控制方法为设置发布和订阅消息的失败重试次数,在执行发布或者订阅消息失败后按照设置的值进行消息重试执行,直至重试次数耗尽或者执行成功;

超时控制方法为消息生产者模块发布消息时自定义消息超时时间,消息消费者模块超时未消费消息,则消息代理中心主动中断该消息的消费,并在消息存储模块中设置该消息为失效状态。

优选地,所述的一种应用于政务云平台构建微服务MQ的方法中通过服务端和客户端的访问令牌AccessToken匹配,进行安全校验。

优选地,所述的一种应用于政务云平台构建微服务MQ的方法中利用前端界面对服务端进行管理。

一种应用于政务云平台构建微服务MQ的系统,包括构建模块,

构建模块构建微服务MQ的服务端和客户端,

建立服务端的消息代理中心Broker、消息存储模块和注册中心,其中消息代理中心接收消息生产者推送生产的消息,同时提供RPC服务供消费者使用消费消息,消息存储模块持久化存储消息,支持MySql和TiDB两种存储方式,注册中心包括Broker注册中心和Consumer注册中心,Broker注册中心提供Broker注册RPC服务,Consumer注册中心提供Consumer注册消费节点服务,

建立客户端的消息生产者模块Producer和消息消费者模块Consumer,其中消息生产者模块Producer负责提供API接口供开发者调用,并生成和发送队列消息,并兼容异步批量多线程生产及同步批量多线程生产两种方式,

消息消费者模块Consumer,负责订阅消息并消费消息,并采用多线程轮训消息,消息分片及消息锁定实现订阅消息并消费消息。

一种应用于政务云平台构建微服务MQ的装置,包括至少一个存储器和至少一个处理器;

所述至少一个存储器,用于存储机器可读程序;

所述至少一个处理器,用于调用所述机器可读程序,执行所述的一种应用于政务云平台构建微服务MQ的方法。

计算机可读介质,所述计算机可读介质上存储有计算机指令,所述计算机指令在被处理器执行时,使所述处理器执行所述的一种应用于政务云平台构建微服务MQ的方法。

本发明的有益之处是:

本发明提供一种应用于政务云平台构建微服务MQ的方法,面向微服务架构,提供统一的微服务MQ消息队列处理方法,其中服务端作为消息代理中心和消息存储中心,负责处理消息和存储消息,是一个独立的微服务应用,服务端可以实现集群部署和容器化部署,以提高容灾性和高可用性;而客户端分为消息发布端和消息订阅端,客户端分布在需要使用消息队列的各微服务实例中,消息发布端发布消息,消息订阅端消费消息;通过本发明方法,可以进一步在不同进程之间添加一层实现解耦,方便今后的扩展;可以处理异步请求,从而缓解系统的压力,提高系统的响应速度和吞吐量,从而提高系统的稳定性,可以构建一个轻量级的消息队列,从而实现缓解高并发请求。

附图说明

图1是本发明系统框架示意图;

图2是本发明方法中并行模式流程图;

图3是本发明方法中串行模式流程图;

图4是本发明方法中广播模式流程图。

具体实施方式

下面结合附图和具体实施例对本发明作进一步说明,以使本领域的技术人员可以更好地理解本发明并能予以实施,但所举实施例不作为对本发明的限定。

本发明提供一种应用于政务云平台构建微服务MQ的方法,构建微服务MQ的服务端和客户端,

建立服务端的消息代理中心Broker、消息存储模块和注册中心,其中消息代理中心接收消息生产者推送生产的消息,同时提供RPC服务供消费者使用消费消息,消息存储模块持久化存储消息,支持MySql和TiDB两种存储方式,注册中心包括Broker注册中心和Consumer注册中心,Broker注册中心提供Broker注册RPC服务,Consumer注册中心提供Consumer注册消费节点服务,

建立客户端的消息生产者模块Producer和消息消费者模块Consumer,其中消息生产者模块Producer负责提供API接口供开发者调用,并生成和发送队列消息,并兼容异步批量多线程生产及同步批量多线程生产两种方式,

消息消费者模块Consumer,负责订阅消息并消费消息,并采用多线程轮训消息,消息分片及消息锁定实现订阅消息并消费消息。

本发明方法中面向微服务架构,提供微服务MQ统一的消息队列处理方法,分服务端和客户端,其中服务端作为消息代理中心和消息存储中心,负责处理消息和存储消息,是一个独立的微服务应用,服务端可以实现集群部署和容器化部署,部署多个服务端实例,以提高容灾性和高可用性;而客户端分为消息发布端和消息订阅端,客户端分布在需要使用消息队列的各微服务实例中,消息发布端发布消息,消息订阅端消费消息。本发明方法中建立服务端的消息代理中心Broker负责接收消息生产者Producer推送生产的消息,同时负责提供RPC服务供消费者Consumer使用来消费消息。Broker通过内置注册中心实现集群功能,并支持集群部署,集群节点之间地位平等,集群部署情况下可大大提高系统的消息吞吐量,各节点在启动时自动注册到注册中心,Producer或者Consumer在生产消息或者消费消息时,将会通过内置注册中心自动感知到在线的Broker节点;Broker在接收到Producer的生产消息的RPC调用时,并不会立即存储该消息,而是立即Push到内存队列中,同时立即响应RPC调用,内存队列将会异步将队列中的消息数据存储到DB中;Broker在接收到“消息锁定”等同步RPC调用时,将会触发同步调用,采用乐观锁方式锁定消息。

而消息存储模块Message Queue具有消息持久化功能,负责把消息数据存储在DB中,支持MySql和TiDB两种存储方式,前者支持千万级消息堆积,后者支持百亿级别消息堆积,保障了数据安全,防止消息数据丢失。

Register Center(注册中心)主要分为两个子模块:Broker注册中心和Consumer注册中心。

Broker注册中心子模块供Broker注册RPC服务使用;Consumer注册中心子模块供Consumer注册消费节点使用。

而客户端包含消息生产者模块Producer和消息消费者模块Consumer。

其中消息生产者模块Producer负责提供API接口供开发者调用,并生成和发送队列消息;兼容“异步批量多线程生产”+“同步生产”两种方式,提升消息发送性能;底层通讯全异步化,消息新增+消息新增接收+消息回调+消息回调接收,仅批量Pull消息与锁消息非异步。

消息消费者模块Consumer,负责订阅消息并消费消息。通过“多线程轮训+消息分片+PULL+消息锁定”的方式来实现。

在具体的应用中,在政务云平台中的很多场景下,微服务不会立即处理消息,需要在消息队列中存储消息,并在某一时刻再进行处理;在特殊的高并发环境下,由于来不及同步处理业务,请求往往会发生堵塞,通过使用消息队列,我们可以异步处理请求,从而缓解系统压力。在一些比较耗时的业务场景中,可以把耗时较多的业务解耦通过异步队列执行,提高系统响应速度和吞吐量。目前流行的消息队列的软件中,大多为了实现AMQP、STOMP、XMPP之类的协议,变得极其重量级,但是很多Web应用中的实际情况是:我们只是想找到一个缓解高并发请求的解决方案,一个轻量级的消息队列实现方式才是我们真正需要的。

利用本发明方法进行部署:

(1)部署服务端消息中心,消息中心包括消息代理中心Broker、消息存储模块和注册中心,

A、解压源码,获取“消息中心数据库初始化SQL脚本”,并执行即可。“消息中心数据库初始化SQL脚本”位置为:/hc-mq/doc/db/hc-mq-mysql.sql。

消息中心数据库,兼容支持“MySql、TiDB”两种存储方式,前者支持千万级消息堆积,后者支持百亿级消息堆积(TiDB理论上无上限);可视情况选用,当选择TiDB时,仅需要修改消息中心数据库连接jdbc地址配置即可,其他部分如Sql和驱动等兼容MySql和TiDB,无需修改。

B、按照maven格式将源码导入IDE,使用maven进行编译即可。

C、消息中心配置:消息中心配置文件地址/hc-mq/hc-mq-admin/src/main/resources/application.properties,根据配置文件的说明进行数据库、心跳时间等的配置。

D、如果已经正确进行上述配置,可将项目编译打包部署,消息中心访问地址:http://127.0.0.1:8080/hc-mq-admin(该地址作为注册地址)。

消息中心支持集群部署,集群情况下各节点务必连接同一个MySql实例。消息中心集群部署时,如下:

DB配置保持一致;登录账号配置保持一致;集群机器时钟保持一致(单机集群忽视);推荐通过nginx为消息中心集群做负载均衡,分配域名。消息中心访问、客户端使用等操作通过该域名进行。

(2)客户端接入消息中心

A、maven依赖:确认pom文件中引入“hc-mq-client”的maven依赖。

B、消息接入方配置内容:hc.mq.admin.address=http://127.0.0.1:8080/hc-mq-admin。

C、消息接入方组件配置:HcMqSpringClientFactory hcMqSpringClientFactory=new HcMqSpringClientFactory();hcMqSpringClientFactory.setAdminAddress(adminAddress);return hcMqSpringClientFactory;

D、如果已经正确进行上述配置,可将项目编译打包部署。访问地址:http://127.0.0.1:8081/。

消息接入方支持集群部署,提升消息系统可用性,同时提升消息处理能力。消息接入方集群部署时,要求和建议:消息中心根地址(hc.mq.admin.address)保持一致。

在上述部署基础上,本发明的一些实施例中,通过服务端消息的分发具有并行消息、串行消息和广播消息三种消息模式。微服务开发人员可以根据具体场景选择使用不同的消息模式。

进一步的,如图2所示,并行消息是消息平均分配在该主题在线消费者,分片方式并行消费,可应用于发送邮件和发送短信等吞吐量较大的消息场景;如图3所示,串行消息是消息固定分配给该主题在线消费者中其中一个,FIFO方式串行消费,可应用于排队和秒杀等严格限制并发的业务逻辑和消息场景;如图4所示,广播消息是消息将会广播发送给该主题的在线消费者分组,全部分组都会消费该消息,但是一个分组只会消费一次,应用于广播更新缓存等消息场景。

而客户端中消费者通过“多线程轮训+消息分片+PULL+消息锁定”的方式来实现订阅消息并消费消息,每个Consumer将会存在一个线程,如存在多个Consumer,多个Consumer将会并行消费同一主题下的消息,大大提高消息的消费速度;线程轮训方式PULL消息,如若获取不到消息,将会主动休眠,休眠时间依次递增10s,最长60s,即消息生产之后,距离被消费存在0-60s的时间差,1分钟范围内,

队列中消息将会按照“Registry Center”中注册的Consumer列表顺序进行消息分段,保证一条消息只会被分配给其中一个Consumer,每个Consumer只会消费分配给自己的消息。因此,在多个Consumer并发消息时,可以保证同一条消息不被多个Consumer竞争来重复消费消息。还可利用分片函数MOD(“消息分片ID”,#{在线消费者总数})=#{当前消费者排名},进行消息分片计算,每个Consumer通过注册中心感知到在线所有的Consumer,计算出在线Consumer总数total,以及当前Consumer在所有Consumer中的排名rank;把消息分片ID对在线Consumer总数total进行取模,余数和当前Consumer排名rank一致的消息认定为分配给自己的消息。每个Consumer将会轮训PULL消息分片分配给自己的消息,顺序消费,Consumer在消费每一条消息时,开启事务后,将会主动进行消息锁定,通过数据库乐观锁来实现,锁定成功后消息状态变更为执行中状态,将不会被Consumer再次PULL到。因此,可以更进一步保证每条消息只会被消费一次;

消息执行结束后,将会调用Broker的RPC服务修改消息状态并追加消息日志,Broker将会通过内存队列方式,异步消息队列中变更存储到数据库中。

进一步的,利用本发明方法部署后,可对微服务MQ进行延时控制、事务性控制、失败重试控制和超时控制,

其中所述延时控制,消息生产者模块发布消息时,设置消息的延迟生效时间,到达设置的生效时间时,该消息才会被消息消费者模块消费,可应用于订单取消等延时消费场景;

所述事务性控制,消息消费者模块可以自主决定是否开启事务开关,开启事务开关后,消息事务性控制方法保证该消息只会被消息消费者模块成功执行一次;

所述失败重试控制,可设置发布和订阅消息的失败重试次数,在执行发布或者订阅消息失败后将会按照设置的值进行消息重试执行,直至重试次数耗尽或者执行成功;

所述超时控制,消息生产者模块发布消息时可自定义消息超时时间,消息消费者模块超时未消费该消息,消息代理中心将会主动中断该消息的消费,并在消息存储模块中设置该消息为失效状态。

同时,在上述实施基础上,通过前端界面进行管理,管理界面包含运行报表、消息主题管理、消息记录管理、业务线管理,

1、运行报表,展示消息中心系统信息,如业务线数量、消息主题数量以及消息数量等;通过日期分布图和成功比例图展示未消费、消费中、消费成功和消费失败的比例;

2、消息主题管理,可查看在线消息主题列表,底层会周期性扫描消息记录,发现并录入新的消息主题,并展示在这里;支持为消息主题设置业务线,和其他一些附属参数,如负责人、告警邮箱,提供增强功能;

所述业务线,是该消息所属业务线,方便分组管理;所述负责人,是该消息所属负责人;所述告警邮箱,一个或者多个,多个用逗号分隔,消息消费失败时,会周期性发送告警邮件;

3、消息记录管理,可查看在线消息记录,支持筛选、查看消息流转轨迹;消息在线管理功能支持在线新增、编辑和删除消息记录;支持根据消息主题、状态和清理类型等手动清理消息;

4、业务线管理,可查看在线业务线列表,并管理维护;可通过自定义业务线,绑定消息主题,从而方便消息主题的分组管理。

本发明还提供一种应用于政务云平台构建微服务MQ的系统,包括构建模块,

构建模块构建微服务MQ的服务端和客户端,

建立服务端的消息代理中心Broker、消息存储模块和注册中心,其中消息代理中心接收消息生产者推送生产的消息,同时提供RPC服务供消费者使用消费消息,消息存储模块持久化存储消息,支持MySql和TiDB两种存储方式,注册中心包括Broker注册中心和Consumer注册中心,Broker注册中心提供Broker注册RPC服务,Consumer注册中心提供Consumer注册消费节点服务,

建立客户端的消息生产者模块Producer和消息消费者模块Consumer,其中消息生产者模块Producer负责提供API接口供开发者调用,并生成和发送队列消息,并兼容异步批量多线程生产及同步批量多线程生产两种方式,

消息消费者模块Consumer,负责订阅消息并消费消息,并采用多线程轮训消息,消息分片及消息锁定实现订阅消息并消费消息。

上述系统内的各模块之间信息交互、执行过程等内容,由于与本发明方法实施例基于同一构思,具体内容可参见本发明方法实施例中的叙述,此处不再赘述。

同时本发明还一种应用于政务云平台构建微服务MQ的装置,包括至少一个存储器和至少一个处理器;

所述至少一个存储器,用于存储机器可读程序;

所述至少一个处理器,用于调用所述机器可读程序,执行所述的一种应用于政务云平台构建微服务MQ的方法。上述装置内的处理器的信息交互、执行可读程序过程等内容,由于与本发明方法实施例基于同一构思,具体内容可参见本发明方法实施例中的叙述,此处不再赘述。

以及本发明提供计算机可读介质,所述计算机可读介质上存储有计算机指令,所述计算机指令在被处理器执行时,使所述处理器执行所述的一种应用于政务云平台构建微服务MQ的方法。具体地,可以提供配有存储介质的系统或者装置,在该存储介质上存储着实现上述实施例中任一实施例的功能的软件程序代码,且使该系统或者装置的计算机(或CPU或MPU)读出并执行存储在存储介质中的程序代码。

在这种情况下,从存储介质读取的程序代码本身可实现上述实施例中任何一项实施例的功能,因此程序代码和存储程序代码的存储介质构成了本发明的一部分。

用于提供程序代码的存储介质实施例包括软盘、硬盘、磁光盘、光盘(如CD-ROM、CD-R、CD-RW、DVD-ROM、DVD-RAM、DVD-RW、DVD+RW)、磁带、非易失性存储卡和ROM。可选择地,可以由通信网络从服务器计算机上下载程序代码。

此外,应该清楚的是,不仅可以通过执行计算机所读出的程序代码,而且可以通过基于程序代码的指令使计算机上操作的操作系统等来完成部分或者全部的实际操作,从而实现上述实施例中任意一项实施例的功能。

此外,可以理解的是,将由存储介质读出的程序代码写到插入计算机内的扩展板中所设置的存储器中或者写到与计算机相连接的扩展单元中设置的存储器中,随后基于程序代码的指令使安装在扩展板或者扩展单元上的CPU等来执行部分和全部实际操作,从而实现上述实施例中任一实施例的功能。

需要说明的是,上述实施例的各流程和各系统装置结构中不是所有的步骤和模块都是必须的,可以根据实际的需要忽略某些步骤或模块。各步骤的执行顺序不是固定的,可以根据需要进行调整。上述各实施例中描述的系统结构可以是物理结构,也可以是逻辑结构,即,有些模块可能由同一物理实体实现,或者,有些模块可能分由多个物理实体实现,或者,可以由多个独立设备中的某些部件共同实现。

以上所述实施例仅是为充分说明本发明而所举的较佳的实施例,本发明的保护范围不限于此。本技术领域的技术人员在本发明基础上所作的等同替代或变换,均在本发明的保护范围之内。本发明的保护范围以权利要求书为准。

去获取专利,查看全文>

相似文献

  • 专利
  • 中文文献
  • 外文文献
获取专利

客服邮箱:kefu@zhangqiaokeyan.com

京公网安备:11010802029741号 ICP备案号:京ICP备15016152号-6 六维联合信息科技 (北京) 有限公司©版权所有
  • 客服微信

  • 服务号