首页> 中国专利> 基于消息队列的分布式事件总线处理方法、终端及介质

基于消息队列的分布式事件总线处理方法、终端及介质

摘要

本申请涉及计算机技术领域,旨在提供基于消息队列的分布式事件总线处理方法、终端及介质,其方法包括与发布方建立连接,获取发布事件报文;对获取的事件报文进行事件数据的基本属性校验,生成唯一的事件ID;从节点池中选择可用的节点,向消息中间件推送消息;当接收到消费方的订阅事件请求消息后,解码订阅事件请求消息,校验事件数据的合法性并获取事件ID;与消息中间件建立连接,根据事件ID,向消息中间件订阅相关的消息队列,建立回调处理器;获取回调处理器的事件,以接收消息中间件的消息,将消息进行解码,转换为订阅的事件结构,以事件报文流推送给消费方。本申请具有实现客户端的轻量级操作的效果。

著录项

  • 公开/公告号CN112527525A

    专利类型发明专利

  • 公开/公告日2021-03-19

    原文格式PDF

  • 申请/专利权人 广州伊智信息科技有限公司;

    申请/专利号CN202011459468.9

  • 发明设计人 莫伦辉;莫伦冰;钟嘉文;赖泽平;

    申请日2020-12-11

  • 分类号G06F9/54(20060101);

  • 代理机构

  • 代理人

  • 地址 511400 广东省广州市番禺区东环街番禺大道北555号天安总部中心19号楼301室之一

  • 入库时间 2023-06-19 10:19:37

说明书

技术领域

本申请涉及计算机技术领域,尤其是涉及基于消息队列的分布式事件总线处理方法、终端及介质。

背景技术

目前,在分布式事件总线系统中,存在大量使用消息中间件解耦应用逻辑的场景,以在不同应用中传递消息。消息中间件通过提供消息传递和消息排队模型,实现在分布式环境下扩展进程间的通信。消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。消息中间件支持同步方式和异步方式,因异步中间件比同步中间件具有更强的容错性,故使用更广泛。异步中间件的发布-订阅接口,可以指定哪种类型的用户可以接受哪种类型的消息,更加有针对性,已成为异步中间件的非正式标准。

但很多消息中间件都由几个组件组成,客户端使用的时候需要了解不同组件的功能和使用方式,同时,使用这些中间件时,客户端需要关注不同事件的路由、分区等细节,即并没有为客户端提供一个易于配置和使用的基于事件总线语义的接口,客户端操作繁琐。

针对上述中的相关技术,申请人认为存在有现有的消息中间件没有为客户端提供一个易于配置和使用的基于事件总线语义的接口,客户端操作繁琐的缺陷。

发明内容

为了实现客户端的轻量级操作,给客户端提供一个易于配置和使用的基于事件总线语义的接口,本申请提供了基于消息队列的分布式事件总线处理方法、终端及介质。

本申请目的一是提供一种基于消息队列的分布式事件总线处理方法,具有易于客户端配置和使用的特点。

本申请的上述申请目的一是通过以下技术方案得以实现的:

一种基于消息队列的分布式事件总线处理方法,与发布方建立连接,获取发布方发布的以事件数据编码、封装形成的事件报文;

对获取的事件报文进行事件数据的基本属性校验,生成唯一的事件ID;

从节点池中选择可用的节点,将事件报文投递至消息中间件,返回与事件报文对应的事件ID给消息中间件,向消息中间件推送消息;

当接收到消费方的订阅事件请求消息后,解码订阅事件请求消息,校验事件数据的合法性并获取事件ID;

与消息中间件建立连接,根据事件ID,向消息中间件订阅相关的消息队列,建立回调处理器;

获取回调处理器的事件,以接收消息中间件的消息,将消息进行解码,转换为订阅的事件结构,以事件报文流推送给消费方。

通过采用上述技术方案,发布方将事件数据编码、封装形成事件报文,发布事件;在发布方发布事件后,服务端获取发布方的事件报文,进行参数校验并生成唯一事件ID,从节点池中选择可用的节点,将事件报文投递至消息中间件,返回与事件报文对应的事件ID给消息中间件,向消息中间件推送消息,以将复杂的操作放在服务端进行,向客户端隐藏了复杂的内部消息路由,使得事件数据的细节被封装了,客户端无需关注不同事件的路由、分区等细节,易于客户端配置和使用;当消费方请求订阅事件后,服务端接收订阅消息请求并解码,校验事件数据的合法性,与消息中间件建立连接以向消息中间件订阅相关的消息队列,建立回调处理器,再获取回调处理器的事件,以接收消息中间件的消息,将消息进行解码,转换为订阅的事件结构,以事件报文流推送给消费方,进而消息的存储和分发依赖所使用的消息队列提供的能力,客户端只处理消息的投递和消费 ,其他逻辑都封装在了服务端,减少基于消息中间件的事件处理的复杂性,实现了客户端操作的轻量级,给客户端提供一个易于配置和使用的基于事件总线语义的接口,易于客户端操作。

本申请在一较佳示例中可以进一步配置为:从节点池中选择可用节点的具体步骤为:

从已记录的上次投递过的节点偏移,确定下一个节点;

从节点池中找到确定的节点,发送ping报文;

当有节点状态信息反馈时,确定该节点为可用节点,选择该节点。

通过采用上述技术方案,从已记录的上次投递过的节点偏移,确定下一个节点的信息,以能够从节点池中找到该节点并进行可用性检测;当有节点状态信息反馈时,即确定该节点为可用节点,完成从节点池中选择可用节点的操作,避免进行无效的事件投递。

本申请在一较佳示例中可以进一步配置为:当发送ping报文后无节点状态信息反馈时,确定该节点为异常节点;继续从已记录的上次投递过的节点偏移,确定新的下一个节点;

从节点池中找到新确定的节点并发送ping报文,直至有节点状态信息反馈。

通过采用上述技术方案,保证了从节点池中选择的节点为可用节点,避免进行无效的事件投递。

本申请在一较佳示例中可以进一步配置为:当节点池中所有的节点均无节点状态信息反馈,且从已记录的上次投递过的节点偏移后,无新的下一节点,即判断为系统异常,终止投递消息。

通过采用上述技术方案,当服务端发现系统异常时,及时终止消息投递,以减轻系统负担。

本申请在一较佳示例中可以进一步配置为:消费方的订阅请求消息包括客户端的ID和需要订阅的事件类型列表。

通过采用上述技术方案,消费方的订阅请求消息包括消费方的ID和需要订阅的事件类型列表,以用于表征消费方的订阅请求,使得服务端能够区分不同消费方的订阅请求。

本申请在一较佳示例中可以进一步配置为:回调处理器的事件是根据消费方的订阅事件请求,丢弃消费方不感兴趣的事件后得到的。

通过采用上述技术方案,回调处理器根据消费方的订阅事件请求,丢弃消费方不感兴趣的事件,以获取消费方订阅的事件,基于事件总线,针对请求进行事件处理。

本申请在一较佳示例中可以进一步配置为:在事件报文流推送给消费方后,获取消费方接收到事件后进行业务逻辑处理的结果,判断是否需要重试事件处理。

通过采用上述技术方案,在事件报文流推送后,根据消费方接收到事件后进行业务逻辑处理结果的反馈,判断是否需要重试事件处理,以保证事件的成功推送,提高事件推送成功概率。

本申请在一较佳示例中可以进一步配置为:消息队列至少有四种,不同的消息队列能提供不同的特性。

通过采用上述技术方案,在需要时,通过选择不同的消息队列提供的能力来满足特定的需求,使用灵活,适用性强。

本申请目的二是提供一种智能终端,具有实现客户端的轻量级操作,给客户端提供一个易于配置和使用的基于事件总线语义接口的特点。

本申请的上述申请目的二是通过以下技术方案得以实现的:

一种智能终端,包括存储器和处理器,所属存储器上存储有能够被处理器加载并执行如上述基于消息队列的分布式事件总线处理方法的计算机程序。

本申请目的三是提供一种计算机可读存储介质,具有实现客户端的轻量级操作,给客户端提供一个易于配置和使用的基于事件总线语义接口的特点。

本申请的上述申请目的三是通过以下技术方案得以实现的:

一种计算机可读存储介质,存储有能够被处理器加载并执行上述基于消息队列的分布式事件总线处理方法的计算机程序。

综上所述,本申请包括以下至少一种有益技术效果:

1.一种基于消息队列的分布式事件总线处理方法将复杂的操作放在服务端进行,消息的存储和分发依赖所使用的消息队列的提供能力,客户端只处理消息的投递和消费 ,其他逻辑都封装在了服务端,减少了基于消息中间件的事件处理的复杂性,实现了客户端操作的轻量级,给客户端提供一个易于配置和使用的基于事件总线语义的接口,易于客户端操作,实现了基于消息队列的分布式事件总线处理方法;

2.从节点池中找到下一节点并进行可用性检测,避免进行无效的事件投递;

3.当节点池中所有的节点均无节点状态信息反馈,且从已记录的上次投递过的节点偏移后,无新的下一节点,即判断为系统异常,终止投递消息,减轻系统负担;

4.消费方的订阅请求消息包括消费方的ID和需要订阅的事件类型列表,以用于表征消费方的订阅请求,使得服务端能够区分不同消费方的订阅请求;

5.回调处理器根据消费方的订阅事件请求,丢弃消费方不感兴趣的事件,以获取消费方订阅的事件,基于事件总线,针对请求进行事件处理;

6.服务端根据消费方接收到事件后进行业务逻辑处理结果的反馈,判断是否需要重试事件处理,以保证事件的成功推送,提高事件推送成功概率;

7.消息队列至少有四种,不同的消息队列能提供不同的特性;在需要时,通过选择不同的消息队列提供的能力来满足特定的需求,使用灵活,适用性强。

附图说明

图1是本申请其中一实施例的整体流程示意图。

图2是本申请的服务端与发布方的交互流程示意图。

图3是本申请的服务端与消费方的交互流程示意图。

图4是本申请的服务端轮询选择消息队列节点的流程示意图。

具体实施方式

以下结合附图对本申请作进一步详细说明。

本具体实施例仅仅是对本申请的解释,其并不是对本申请的限制,本领域技术人员在阅读完本说明书后可以根据需要对本实施例做出没有创造性贡献的修改,但只要在本申请的权利要求范围内都受到专利法的保护。

本申请实施例提供一种基于消息队列的分布式事件总线处理方法,包括发布方发布事件;服务端轮询选择消息队列节点,直至消息队列节点为可用节点;服务端选择该可用节点,向消息中间件推送消息;当消费方请求订阅消息时,服务端向消息中间件订阅相关的消息队列,消息中间件向服务端推送消息;服务端将消息进行解码,转换为订阅的事件结构,通过订阅请求建立连接推送给消费方。

其中,事件总线模式是观察者设计模式的一种,它的工作部件主要分为事件源、事件监听器、通道和事件总线四种。工作时,事件源将产生的消息发送到事件总线的特定通道之上,事件监听器在事先会订阅事件总线之中不同的通道以区分消息的响应,当消息被发送到事务总线的特定通道之中时,所对应的事件监听器会监听到消息,然后事件监听器根据程序中设置的响应函数进行执行。

为使本申请实施例的目的、技术方案和优点更加清楚,下面将结合本申请实施例中的附图,对本申请实施例中的技术方案进行清楚、完整地描述,显然,所描述的实施例是本申请一部分实施例,而不是全部的实施例。基于本申请中的实施例,本领域普通技术人员在没有作出创造性劳动前提下所获得的所有其他实施例,都属于本申请保护的范围。

另外,本文中术语“和/或”,仅仅是一种描述关联对象的关联关系,表示可以存在三种关系,例如,A和/或B,可以表示:单独存在A,同时存在A和B,单独存在B这三种情况。另外,本文中字符“/”,如无特殊说明,一般表示前后关联对象是一种“或”的关系。

下面结合说明书附图对本申请实施例作进一步详细描述。

本申请实施例提供一种基于消息队列的分布式事件总线处理方法,所述方法的主要流程描述如下。

参照图1和图2,服务端和发布方通过自定义的gRPC接口交互。具体地,发布方通过publish接口和服务端交互,发布事件。

当发布方将事件数据以protobuf编码,再封装成gRPC接口的报文结构,发布事件。

服务端与发布方通过gRPC接口建立HTTP/2连接,获取发布方发布的事件报文。

服务端接收到事件报文后,对事件报文进行事件数据的基本属性校验,生成唯一的事件ID,将事件数据编码为NSQ的消息结构。

服务端轮询选择消息队列节点,直至消息队列节点为存活节点,即确定该节点是可用节点。

服务端从节点池中选择该可用的节点,将NSQ的消息结构投递至消息中间件,返回与事件报文对应的事件ID给消息中间件,向消息中间件推送消息,并通过gRPC接口报文返回事件ID给发布方。

参照图1和图3,服务端和消费方通过自定义的gRPC接口交互。具体地,消费方通过Subscribe接口和服务端交互,订阅事件。

消费方与服务端通过gRPC接口建立HTTP/2连接,并向服务端请求订阅消息,其中,订阅消息包括消费方的ID和需要订阅的事件类型列表。消费方将自身的ID以及需要订阅的事件类型列表告知服务端。

当接收到订阅消息后,服务端解码订阅请求消息,校验事件数据的合法性并获取事件ID。

服务端与消息中间件建立连接,根据事件ID,向消息中间件订阅相关的消息队列,建立回调处理器。其中,回调处理器在收到服务端的消息推送时,挑选消费方订阅的事件类型,丢弃不感兴趣的事件。

服务端获取回调处理器收到的事件,以接收消息中间件的消息,将消息进行解码,转换为消费方订阅的事件结构,通过gRPC接口流推送给消费方。

消费方仅此一次,至少一次,至多一次的消费方式依赖所使用的消息队列提供的能力。

消费方接收到事件后,进行业务逻辑处理,以AckReq响应服务端处理结果。

服务端根据AckReq的处理结果决定是否需要重试事件处理。

当需要重试事件处理时,服务端继续获取回调处理器接收的事件,以接收消息中间件的消息,将消息进行解码,转换为消费方订阅的事件结构,通过gRPC接口流推送给消费方,直至AckReq的处理结果无需重试事件处理。subscribe接口依赖业务逻辑的异常来判断事件是否成功处理。发生异常时,事件会有重试的机会,业务逻辑中需要做好幂等处理。

subscribe接口使用grpc接口的双向流实现。为了简化消费方的内部处理逻辑,协议上使用了request/response的模式,事件流处理过程中,需要等待上一个事件的应答才会继续处理下一个事件,以限制事件处理的并行度,使得事件处理有序进行。开发者也可以使用不同线程subscribe不同事件的方式来提高并行处理的通量。

本实施例中,发布方和消费方均为客户端。

参照图4,服务端轮询选择消息队列节点的具体步骤包括,从记录的上次投递过的节点偏移确定下一个节点,从NSQ节点池中选择下一个节点,发送ping报文以确定节点是否存活。

当发送ping报文后,服务端接收到节点状态信息反馈,则该节点存活,确定该节点为可用节点。

当发送ping报文后无节点状态信息反馈时,服务端确定该节点为异常节点。服务端继续从已记录的上次投递过的节点偏移,确定新的下一个节点。服务端从节点池中找到新确定的节点并发送ping报文,直至有节点状态信息反馈至服务端,确定该节点为可用节点,选择该节点。

当发送ping报文后无节点状态信息反馈且从记录的上次投递过的节点偏移,无法确定下一个节点时,即节点池中所有的节点均尝试过且尝试后均无节点状态信息反馈,即判断为系统异常,服务端终止向消息中间件进行消息投递。

进一步地,服务端可以兼容多个消息队列。本实施例中,消息队列可以为四种,不同的消息队列能提供不同的特性。在需要时,可以选择不同的消息队列提供的能力来满足特定的需求,比如有些消息队列支持事务性的消息投递和消费,有些消息队列支持延时消费的消息。

消息队列提供的能力由接口集实现。本实施例中,publish接口有四个,分别是阻塞式的无延迟publish接口、阻塞式的支持延迟事件publish接口、异步的无延迟publish接口、异步的支持延迟事件publish接口,提供了延时投递事件的支持。阻塞式的事件接口适合在事务块,即应用层的数据库事务逻辑块中调用,在很大程度上保证跨服务的数据一致性。异步的事件接口,因需要确保外部调用逻辑的结束,或提前终止不会导致事件也被终止,通过grpc接口,保证内部逻辑执行不被中断,确保内层调用不受影响。

使用时根据使用场景来选择是否使用异步接口。具体地,对接口响应速度敏感的可用使用异步接口;对数据一致性敏感的可用使用同步接口。

本实施例利用已有的消息中间件的扩展性,提供了基本的消息分区的能力,同时,当接口的特性不满足时还能再进行扩展。

本申请实施例还提供一种智能终端,包括存储器和处理器,所属存储器上存储有能够被处理器加载并执行如上述基于消息队列的分布式事件总线处理方法的计算机程序。

具体地,与发布方建立连接,获取发布方发布的以事件数据编码、封装形成的事件报文。

对获取的事件报文进行事件数据的基本属性校验,生成唯一的事件ID。

从节点池中选择可用的节点,将事件报文投递至消息中间件,返回与事件报文对应的事件ID给消息中间件,向消息中间件推送消息。

当接收到消费方的订阅事件请求消息后,解码订阅事件请求消息,校验事件数据的合法性并获取事件ID。

与消息中间件建立连接,根据事件ID,向消息中间件订阅相关的消息队列,建立回调处理器。

获取回调处理器的事件,以接收消息中间件的消息,将消息进行解码,转换为订阅的事件结构,以事件报文流推送给消费方。

进一步地,本申请实施例还提供一种计算机可读存储介质,存储有能够被处理器加载并执行上述基于消息队列的分布式事件总线处理方法的计算机程序。计算机可读存储介质例如包括:U盘、移动硬盘、只读存储器(Read-Only Memory,ROM)、随机存取存储器(Random Access Memory,RAM)、磁碟或者光盘等各种可以存储程序代码的介质。

具体地,与发布方建立连接,获取发布方发布的以事件数据编码、封装形成的事件报文。

对获取的事件报文进行事件数据的基本属性校验,生成唯一的事件ID。

从节点池中选择可用的节点,将事件报文投递至消息中间件,返回与事件报文对应的事件ID给消息中间件,向消息中间件推送消息。

当接收到消费方的订阅事件请求消息后,解码订阅事件请求消息,校验事件数据的合法性并获取事件ID。

与消息中间件建立连接,根据事件ID,向消息中间件订阅相关的消息队列,建立回调处理器。

获取回调处理器的事件,以接收消息中间件的消息,将消息进行解码,转换为订阅的事件结构,以事件报文流推送给消费方。

本申请实施例的有益效果如下:

1、将复杂的操作放在服务端进行,向客户端隐藏了复杂的内部消息路由,使得事件数据的细节被封装了,客户端无需关注不同事件的路由、分区等细节,易于客户端配置和使用;同时,消息的存储和分发依赖所使用的消息队列的提供能力,客户端只处理消息的投递和消费 ,其他逻辑都封装在了服务端,减少了基于消息中间件的事件处理的复杂性,实现了客户端操作的轻量级,给客户端提供一个易于配置和使用的基于事件总线语义的接口,易于客户端操作,实现了基于消息队列的分布式事件总线处理;

2、服务端可以兼容多个消息队列,不同的消息队列能提供不同的特性;在需要时,可以选择不同的消息队列提供的能力来满足特定的需求,使用灵活,适用性强。

去获取专利,查看全文>

相似文献

  • 专利
  • 中文文献
  • 外文文献
获取专利

客服邮箱:kefu@zhangqiaokeyan.com

京公网安备:11010802029741号 ICP备案号:京ICP备15016152号-6 六维联合信息科技 (北京) 有限公司©版权所有
  • 客服微信

  • 服务号