首页> 中国专利> 消息处理方法及系统、消息目的端设备与分布式消息系统

消息处理方法及系统、消息目的端设备与分布式消息系统

摘要

本发明公开了一种消息处理方法及系统、消息目的端设备与分布式消息系统。所述方法包括:Producer为待发送的消息组生成消息标识,该消息标识用以唯一标识该消息组;并向Broker发送该消息标识和该消息组;Broker接收并存储Producer发送的该消息标识和该消息组;当Producer确定需要再次发送消息组时,从已生成的消息标识中获取消息组的消息标识;并向Broker发送消息标识和消息组;当Broker根据已接收到的消息标识,确定再次接收到消息标识对应的消息组时,对本次接收到的消息组进行去重处理。本发明技术方案能够在Broker端去除误算率,提高消息处理效率。

著录项

  • 公开/公告号CN104092717A

    专利类型发明专利

  • 公开/公告日2014-10-08

    原文格式PDF

  • 申请/专利权人 深圳市腾讯计算机系统有限公司;

    申请/专利号CN201310687237.7

  • 发明设计人 董宇;

    申请日2013-12-13

  • 分类号H04L29/08(20060101);

  • 代理机构11138 北京三高永信知识产权代理有限责任公司;

  • 代理人江崇玉

  • 地址 518000 广东省深圳市南山区高新区高新南一路飞亚达大厦5-10楼

  • 入库时间 2023-12-17 02:19:08

法律信息

  • 法律状态公告日

    法律状态信息

    法律状态

  • 2015-08-12

    授权

    授权

  • 2014-10-29

    实质审查的生效 IPC(主分类):H04L29/08 申请日:20131213

    实质审查的生效

  • 2014-10-08

    公开

    公开

说明书

技术领域

本发明涉及信息处理技术领域,特别涉及一种消息处理方法及系统、消息目的端设备与分布式消息系统。 

背景技术

在分布式消息系统的应用中,由消息源端设备(Producer)发布的消息经消息代理端设备(Broker)转发到消息目的端设备(Consumer),期间流转的消息都达到海量,由于本身也是分布式系统,复杂度较高,导致消息重复或者丢失发生的概率被放大。因此在分布式消息系统的应用中,消息去重以及防止消息丢失显得尤为重要。 

为了提高消息传输的效率,分布式消息系统在消息传递过程中需要提供明确的消息分发保障级别,消息分发保障级别描述一条消息被Producer成功发布后,相应的消息Consumer成功接收到这条消息的次数:消息分发保障级别具体可以包括如下三种:第一种、At least once,同一条消息至少会成功接收到1次,但可能接收到多次;第二种、At most once,同一条消息最多会成功接收到1次,但可能接收到0次;第三种、Exactly once,同一条消息成功接收到1次且仅1次。 

从分布式消息系统的应用角度来看,第三种保障级别最符合使用场景,但现有技术中系统正常情况下可以提供Exactly once,当系统出现异常时,为了提高消息传输的成功率,通常采用第一种保障级别At least once来进行消息传输。例如Producer与Broker之间消息分发保障级别达到At least once;即要保证Broker至少收到消息1次,Producer可能会尝试多次发送同1条消息,这种情况 下,Broker可能会收到重复消息。如果此时不执行任何去重操作,在一些对消息准确度要求高的场景下,消息重复意味着数据出错,是不可容忍的。因此需要Broker对接收到的Producer消息进行去重,同时保证消息存储不丢失。然后Broker与Consumer之间消息分发保障级别也要达到At least once,保证Consumer能够消费到消息。在Consumer端,Consumer消费消息的过程可以分为3个原子步骤:a)从Broker端取得待读取消息的位置并读取消息;b)以读取的消息作为输入进行业务计算,将计算结果作为输出保存到外部存储器;c)在Consumer中保存新的待读取消息位置,其中,步骤b)和c)可以互换顺序。 

在实现本发明的过程中,发明人发现现有技术至少存在以下问题:现有技术中,通常采用Bloom Filter方案在Broker端对收到的消息进行去重操作,但Bloom Filter的缺陷在消息中间件这个场景下会被放大,显得异常突出。因为Bloom Filter存在较为严重的问题——误算率,误算率与元素数量成正比关系,而分布式消息中间件存储的消息量是TB数量级,误算率将会超过可接受范围,造成消息处理效率较低。或者,在Consumer端消费消息的过程中,步骤b)或c)的处理过程未成功结束前进程崩溃,计算结果与待读取消息的位置之间都会处于不一致状态,恢复处理时造成重复消费消息或消息丢失,导致消息处理效率较低。 

发明内容

为了解决现有技术的问题,本发明实施例提供了一种消息处理方法及系统、消息目的端设备与分布式消息系统。所述技术方案如下: 

一方面,提供了一种消息处理方法,所述方法包括: 

消息源端设备为待发送的消息组生成消息标识,每个消息组包括多条消息,所述消息标识用以唯一标识所述消息组; 

所述消息源端设备向Broker发送所述消息标识和所述消息组; 

所述Broker接收并存储所述消息源端设备发送的所述消息标识和所述消息 组; 

当所述消息源端设备确定需要再次发送所述消息组时,从已生成的消息标识中获取所述消息组的消息标识; 

所述消息源端设备向所述Broker发送所述消息标识和所述消息组; 

当所述消息代理端设备根据已接收到的消息标识,确定再次接收到所述消息标识对应的消息组时,对本次接收到的所述消息组进行去重处理。 

另一方面,还提供了一种消息处理方法,所述方法包括: 

从外部存储器中获取当前待处理的消息的位置; 

根据所述当前待处理的消息的位置,调用应用程序接口从Broker中获取所述当前待处理的消息;所述应用程序接口提供所述当前待处理的消息的位置; 

根据所述当前待处理的消息进行业务计算,得到计算结果;并确定下一条待处理的消息的位置; 

将所述计算结果和所述下一条待处理的消息的位置以同一事务存储在所述外部存储器中。 

再一方面,还提供了一种消息处理系统,所述系统包括:消息源端设备和Broker; 

所述消息源端设备,用于为待发送的消息组生成消息标识,每个消息组包括多条消息,所述消息标识用以唯一标识所述消息组;并向Broker发送所述消息标识和所述消息组; 

所述Broker,用于接收并存储所述消息源端设备发送的所述消息标识和所述消息组; 

所述消息源端设备,还用于当确定需要再次发送所述消息组时,从已生成的消息标识中获取所述消息组的消息标识;并向所述Broker发送所述消息标识和所述消息组;所述消息代理端设备,还用于当根据已接收到的消息标识,确定再次接收到所述消息标识对应的消息组时,对本次接收到的所述消息组进行去重处理。 

又一方面,提供了一种消息目的端设备,所述设备包括: 

获取模块,用于从外部存储器中获取当前待处理的消息的位置; 

所述获取模块,还用于根据所述当前待处理的消息的位置,调用应用程序接口从Broker中获取所述当前待处理的消息;所述应用程序接口提供所述当前待处理的消息的位置; 

计算模块,用于根据所述当前待处理的消息进行业务计算,得到计算结果;并确定下一条待处理的消息的位置; 

存储模块,用于将所述计算结果和所述下一条待处理的消息的位置以同一事务存储在所述外部存储器中。 

再另一方面,提供一种分布式消息系统, 

所述分布式消息系统包括消息处理系统和消息目的端设备;所述消息处理系统包括消息源端设备和Broker;所述消息处理系统中的所述Broker分别与所述消息目的端设备和所述消息处理系统中的所述消息源端设备进行通讯; 

所述消息处理系统采用如上所述的消息处理系统; 

和/或所述消息目的端设备采用如上所述的消息目的端设备。 

本发明实施例的消息处理方法及系统、消息目的端设备与分布式消息系统,通过在Producer端为待发送的消息组生成消息标识,每个消息组包括多条消息,消息标识用以唯一标识所述消息组;并向Broker发送该消息标识和该消息组;Broker接收并存储Producer端发送的该消息标识和该消息组;当Producer确定需要再次发送该消息组时,从已生成的消息标识中获取该消息组的消息标识,并向Broker端发送该消息标识和该消息组;而当Broker根据已接收到的消息标识,确定再次接收到的该消息标识对应的消息组时,对本次接收到的消息组进行去重处理。与现有技术的采用Bloom Filter方案在Broker端对收到的消息进行去重操作相比,去除了误算率,提高了处理的准确率,达到exactly once的目标,从而提高了消息处理的效率。 

本发明实施例还可以在Consumer端,从外部存储器中获取当前待处理的消 息的位置;根据当前待处理的消息的位置,调用应用程序接口(Application Program Interface;API)从Broker中获取当前待处理的消息;API提供当前待处理的消息的位置;根据当前待处理的消息进行业务计算,得到计算结果;并确定下一条待处理的消息的位置;将计算结果和下一条待处理的消息的位置以同一事务存储在外部存储器中。通过采用该方案,Consumer处理进程获取到下一条待处理消息的位置,而未进行业务计算时发生崩溃,在崩溃修复之后,由于API能够提供当前待处理的消息的位置,因此通过调用API还可以获取到当前待处理的消息,防止了消息的丢失。而且本发明技术方案中,计算结果和下一条待处理的消息的位置以同一事务存储在外部存储器中。如果获取计算结果之后,获取下一条待处理的消息的位置之前系统发生崩溃,该事务处理未完成,不会在外部存储器中仅存储计算结果,导致崩溃恢复后重复消费消息的缺陷。而将计算结果和下一条待处理的消息的位置以同一事务存储在外部存储器中,保证外部存储器中的计算结果和下一条待处理的消息的位置的状态始终同步,从而避免重复消费消息,提高消息处理准确性,从而提高消息的处理效率。 

附图说明

为了更清楚地说明本发明实施例中的技术方案,下面将对实施例描述中所需要使用的附图作简单地介绍,显而易见地,下面描述中的附图仅仅是本发明的一些实施例,对于本领域普通技术人员来讲,在不付出创造性劳动的前提下,还可以根据这些附图获得其他的附图。 

图1为本发明一实施例提供的消息处理方法的流程图。 

图2为本发明另一实施例提供的消息处理方法的流程图。 

图3为本发明再一实施例提供的消息处理方法的流程图。 

图4为本发明实施例提供的消息处理系统的结构示意图。 

图5为本发明实施例提供的消息目的端设备的结构示意图。 

图6为本发明实施例提供的分布式消息系统的结构示意图。 

具体实施方式

为使本发明的目的、技术方案和优点更加清楚,下面将结合附图对本发明实施方式作进一步地详细描述。 

图1为本发明一实施例提供的消息处理方法的流程图。如图1所示,本实施例的消息处理方法,具体可以包括如下步骤: 

100、Producer为待发送的消息组生成消息标识; 

本实施例中,每个消息组可以包括多条消息,也可以仅包括一条消息。本实施例的消息标识用以唯一标识该消息组。该消息标识可以被称为消息key。当每个消息组中包括多条消息时,消息组内各条消息相对固定不变。 

101、Producer向Broker发送该消息标识和该消息组; 

按照Producer与Broker之间的数据传输机制,可以将该消息标识携带在传输该消息组的数据包的包头部分。 

102、Broker接收并存储Producer发送的该消息标识和该消息组; 

该步骤发生在Broker首次接收到Producer发送的消息标识和消息组。对于Broker而言,每接收到一个消息标识和消息组之后,都可以先判断一下,是否是首次接收该消息标识和该消息标识对应的消息组。具体地,可以判断已经存储的消息标识中是否有本次接收的该消息标识,若有没有,则确定是首次接收该消息标识和该消息组。此时在接收到Producer发送的消息标识和消息组之后,在Broker侧存储该消息标识和该消息组。其中存储该消息组,是为了便于后续向Consumer传输该消息,实现消息的成功传输。存储该消息组是便后续如果再次(即非首次)接收到Producer发送的该消息标识和该消息组,能够根据该消息标识来准确判断后续接收的消息组是否是重复消息,来实现消息的去重处理。 

103、当Producer确定需要再次发送消息组时,从已生成的消息标识中获取消息组的消息标识; 

在消息传输过程中,exactly once是最理想的保障级别。但是通信不理想的 情况下,采用障级别At least once来进行数据传输,以保证Producer能够将消息组发送给Broker。例如当Producer未收到Broker反馈的成功接收到消息组的响应消息,此时Producer确定需要再次发送消息组,此时Producer需要从已生成的消息标识中获取该消息组的消息标识。 

104、Producer向Broker发送消息标识和消息组; 

该步骤即为101的再次发送过程,发送的内容以及实现机制完全相同。 

105、当Broker根据已接收到的消息标识,确定再次接收到消息标识对应的消息组时,对本次接收到的消息组进行去重处理。 

由于消息标识能够标识唯一一组消息,当保障级别为At least once的消息传输机制下,Producer会向Broker发送该消息组以及该消息标识,但是每一该消息组以及该消息标识都相同,为了避免Broker存储多组相同的消息组,降低消息处理效率,本实施例中,Broker可以根据消息标识对消息组去重,将同一个消息标识对应的消息组仅保存一份,提高消息处理准确性,从而提高消息的处理效率。 

由上述步骤102,可以知道,Broker首次接收到该消息组和消息标识后,已经存储该消息标识和该消息,当Broker再次接收到该消息标识时,能够根据已接收到并存储的消息标识确定本次为非首次接收该消息标识对应的消息组,此时Broker对本次接收到的消息组进行去重处理。 

本实施例的消息处理方法,通过在Producer端为待发送的消息组生成消息标识,每个消息组包括多条消息,消息标识用以唯一标识所述消息组;并向Broker发送该消息标识和该消息组;Broker接收并存储Producer端发送的该消息标识和该消息组;当Producer确定需要再次发送该消息组时,从已生成的消息标识中获取该消息组的消息标识,并向Broker端发送该消息标识和该消息组;而当Broker根据已接收到的消息标识,确定再次接收到的该消息标识对应的消息组时,对本次接收到的消息组进行去重处理。本实施例的技术方案,与现有技术的采用Bloom Filter方案在Broker端对收到的消息进行去重操作相比,去除了误 算率,提高了处理的准确率,达到exactly once的目标,从而提高了消息处理的效率。 

可选地,在上述实施例的技术方案的基础上,步骤105“当Broker根据已接收到的消息标识,确定再次接收到消息标识对应的消息组时,对本次接收到的消息组进行去重处理”,具体可以包括:当Broker根据已接收到的消息标识,确定再次接收到消息标识对应的消息组时,丢弃本次接收的消息标识和消息组。 

具体实现时,Broker可以管理一个存储消息标识的库,将首次接收到的消息组对应的消息标识都存储在该库中。每次接收消息标识和消息组,都先判断是否已经接收过该消息组,具体可以通过遍历该库以判断该库中是否存储有接收的消息标识,若有,确定已经接收过该消息标识对应的消息组,否则若库中没有该消息标识,则确定Broker还未接收过该消息标识对应的消息组。若接收过该消息标识对应的消息组,直接丢弃该消息组,防止消息重复。若未接收过消息标识对应的消息组,此时直接存储该消息组,并在该库中存储该消息标识。 

可选地,在上述实施例的技术方案的基础上,步骤102“Broker接收并存储Producer发送的消息标识和消息组”,具体可以包括:Broker接收Producer发送的消息标识和消息组,并在本地存储消息组,在外部存储器中存储消息标识,同时并在外部存储器中记录存储消息标识的时刻。存储时间为存储该消息标识的时刻到当前时刻的时间段长度。 

也就是说,消息标识也可以存储在Broker之外的外部存储器中,由于在分布式消息系统中,Broker也存储有较多的信息,若消息标识也存储在Broker中时,当遍历查找是否存储有该消息标识时,遍历检查速度较慢,可能比较耗时。因此,为了控制检查耗时,本实施例中可以在Broker外部设置该外部存储器来存储消息标识,从而能够加快遍历检查速度,减少检查时间,提高处理速度,增强处理效率。同时,为了便于管理,还在外部存储器中记录了存储每一消息标识的存储时刻。 

而且需要说明的是,消息处理系统可以是分布式的,其中可以包括多个分 布式的Broker,Procuder重新发送同一个消息组的时候,不一定每次都发送到同一个Broker上,所以,每个Broker都应该知道自己和其它Broker是否处理过同一个消息组,因此必须有个共享的外部存储器,让所有Broker都可以访问到,用来做这个重复判断,可以防止不同的Broker处理同一消息组。因此本实施例中的外部存储器是分布式消息处理系统中的所有Broker共有的。同时,为了便于消息标识的管理,Broker还需要在外部存储器中记录存储每一个消息标识的存储时刻。 

进一步地,在上述实施例的技术方案的基础上,还可以包括Broker删除外部存储器中存储时间超出存储周期的消息标识。存储时间为存储该消息标识的时刻到当前时刻的时间段长度。 

实际应用中,可以为外部存储器中存储的消息标识设置对应的存储周期。存储周期可以根据实际需求来设置,例如可以为1小时、半天或者一天或者半个月、或者1个月等等。并且,Broker对于存储时间超出存储周期的消息标识,Broker可以及时从外部存储器中删除,保证仅留下有效时间段内的消息标识,从而能够进一步加快遍历检查速度,减少检查时间,提高处理速度,增强处理效率。 

上述实施例的所有可选技术方案,可以采用可以结合的方式任意组合,形成本发明的可选技术方案,在此不再赘述。 

上述实施例的技术方案,与现有技术的采用Bloom Filter方案在Broker端对收到的消息进行去重操作相比,去除了误算率,提高了处理的准确率,达到exactly once的目标,从而提高了消息处理的效率。 

图2为本发明另一实施例提供的消息处理方法的流程图。本实施例在上述图1及后续可选技术方案的基础上,进一步更加详细地介绍本发明的技术方案。如图2所示,本实施例的消息处理方法,具体可以包括如下步骤: 

200、Producer为待发送的消息组生成消息标识; 

本实施例的消息标识用以唯一标识待发送的消息组;该消息标识可以被称 为消息key。 

201、Producer首次向Broker发送该消息标识和该消息组; 

本实施例是以一个消息组的传输为例描述本发明的技术方案。该消息组可以包括一条消息,也可以包括多条消息。当消息组包括多条消息时,该消息标识对应该消息组,该消息组内部的包括多条消息固定不变。 

202、Broker接收Producer发送的该消息标识和该消息组; 

203、Broker在本地存储该消息组,同时在外部存储器中存储该消息标识,并在外部存储器中记录存储该消息标识的时间; 

本实施例中是以一个Broker为例来介绍本发明的技术方案。实际应用中分布式系统中可以包括多个Broker,该外部存储器为多个Broker共享。每一个Broker的功能均与本实施例的记载相同。 

该步骤实现时,虽然Producer是首次发送该消息标识和消息组,但对于Broker而言,Broker并不知道该消息标识和消息组是否为首次发送。因此对于Broker而言,每接收到一个消息组和消息标识,都需要在外部存储器中检查一下是否已经有该消息标识,如没有,确定是首次接收该消息组,此时才在本地存储该消息组,同时在外部存储器中存储该消息标识,并在外部存储器中记录存储该消息标识的时刻。 

204、Producer在预设时间段内未接收到Broker反馈的成功接收到消息组的响应消息,Producer需要再次发送消息组时,从已生成的消息标识中获取消息组的消息标识; 

该步骤发生的场景可能为网络通信的临时故障,例如当Broker正好在向Producer反馈成功接收到该消息组的响应消息时,网络通信临时故障,导致该响应消息丢失,未成功发送至Producer。这样导致Producer在预设时间段内未接收到反馈的成功接收到消息组的响应消息,此时按照保障级别At least once,Producer确定需要再次发送消息组时,从已生成的消息标识中获取消息组的消息标识。 

205、Producer再次向Broker发送消息标识和消息组; 

该步骤即为101的再次发送过程,发送的内容以及实现机制完全相同。 

206、Broker接收Producer再次发送的该消息标识和该消息组,并根据外部存储器中已经存储的该消息标识,确定已经接收了该消息标识对应的消息组; 

对于Broker而言,每接收到一个消息组和消息标识,都需要在外部存储器中检查一下是否已经有该消息标识,如果有,确定已经接收了该消息标识对应的消息组。 

207、Broker丢弃再次接收到的该消息标识和该消息组。 

即丢弃步骤206中接收到的该消息标识和该消息组,实现消息的去重处理。 

由步骤203可以知道,对于每一个首次接收的消息组,Broker都会在外部存储器中记录存储该消息标识的时刻,来对消息标识进行管理。具体地,Broker可以周期性地对外部存储器中的消息标识进行检测,判断是否有存储时间超过存储周期的消息标识,若有,则删除该超期的消息标识。其中的存储时间为存储该消息标识的时刻到当前时刻的时间段。 

本实施例的消息处理方法,通过采用上述技术方案实现消息的去重处理,与现有技术的采用Bloom Filter方案在Broker端对收到的消息进行去重操作相比,去除了误算率,提高了处理的准确率,达到exactly once的目标,从而提高了消息处理的效率。 

在分布式消息系统的应用中,Producer向Broker发布的消息,Broker存储接收到Producer发送的消息,并向Consumer转发接收到的消息。从Producer发布消息到Consumer接收到该消息的过程可以称为消息中间件技术。而Consumer消费消息的过程,仅接收消息之后,处理消息的过程属于消息中间件之外。Consumer消费消息的过程可以分为3个原子步骤:a)从Broker端取得待读取消息的位置并读取消息;b)以读取的消息作为输入进行业务计算,将计算结果作为输出保存到外部存储器;c)在Consumer中保存新的待读取消息位置,其中,步骤b)和c)可以互换顺序。其中步骤a)中Consumer的API中通常不提 供消息的位置信息,这样只能按照取消息的顺序依次取得每个消息,每个消息只能取一遍。例如从Broker端取得第4条消息之后,Consumer崩溃,没有进行业务计算,崩溃重启后,Consumer从Broker端重取消息的时候只能取第5条消息,而没有办法再取第4条消息。这样会导致消息丢失。而且在现有的该方案中,步骤c)中Consumer中保存新的待读取消息位置是在Consumer中,业务计算的计算结果是保存在外部存储器中,当步骤b)或c)的处理过程未成功结束前进程崩溃,计算结果与待读取消息的位置之间都会处于不一致状态。当以读取的消息作为输入进行业务计算,将计算结果作为输出保存到外部存储器中之后,在Consumer中保存新的待读取消息位置之前,Consumer崩溃,崩溃恢复之后,如果API提供消息的位置信息,此时Consumer还是根据保存的待读取消息位置获取消息,会导致Consumer重复对该消息进行业务计算。如果API不提供消息的位置信息,直接获取到下一条消息,Consumer中没有新的待读取消息位置,消息丢失。同理,当在Consumer中保存新的待读取消息位置之后,以读取的消息作为输入进行业务计算,将计算结果作为输出保存到外部存储器中之前,Consumer崩溃,崩溃恢复之后,Consumer根据保存的待读取消息的位置直接获取下一条消息,而未对前一条消息进行业务计算,导致消息丢失。因此,现有的Consumer对消息的处理,会造成重复消费消息或消息丢失,导致消息处理效率较低。为解决上述问题,本发明提供如下技术方案。 

图3为本发明再一实施例提供的消息处理方法的流程图。如图3所示,本实施例的消息处理方法的执行主体为Consumer,即本实施例的消息处理方法在Consumer端介绍本发明的技术方案。本实施例的消息处理方法具体可以包括如下步骤: 

300、从外部存储器中获取当前待处理的消息的位置; 

该步骤可以结合后续的步骤303一起来看,本实施例中,消息的位置也存储在外部存储器中。该外部存储器可以设置在Consumer设备之外,可以为永久性存储器。 

301、根据当前待处理的消息的位置,调用API从Broker中获取当前待处理的消息; 

本实施例中API提供当前待处理的消息的位置,这样,Consumer通过API可以获取到任意一条消息。而当API不提供当前待处理的消息的位置时,Consumer只能按逻辑位置顺序获取下一条消息,并不能根据消息的位置随意获取任一条消息。本实施例中,通过在API提供当前待处理的消息的位置,这样在获取某条消息之后Consumer崩溃之后,在崩溃恢复之后,仍可以调用API从Broker中获取消息时,可以避免消息的丢失。 

302、根据当前待处理的消息进行业务计算,得到计算结果;并确定下一条待处理的消息的位置; 

其中确定下一条消息的位置,具体可以通过当前待处理的消息的位置,以及当前待处理的消息的长度,从而可以准确确定下一条消息的位置。 

303、将计算结果和下一条待处理的消息的位置以同一事务存储在外部存储器中。 

本实施例中将计算结果和下一条待处理的消息的位置在同一事务中存储,同一事务具有统一性,可以保证计算结果和下一条待处理的消息的位置始终是一致的,只要计算结果和下一条待处理的消息的位置中有一个消息未成功获取到,另一个也不会单独去存储。 

本实施例的消息处理方法,在Consumer端,从外部存储器中获取当前待处理的消息的位置;根据当前待处理的消息的位置,调用API从Broker中获取当前待处理的消息;API提供当前待处理的消息的位置;根据当前待处理的消息进行业务计算,得到计算结果;并确定下一条待处理的消息的位置;将计算结果和下一条待处理的消息的位置以同一事务存储在外部存储器中。通过采用该方案,Consumer处理进程获取到下一条待处理消息的位置,而未进行业务计算时发生崩溃,在崩溃修复之后,由于API能够提供当前待处理的消息的位置,因此通过调用API还可以获取到当前待处理的消息,防止了消息的丢失。而且本 发明技术方案中,计算结果和下一条待处理的消息的位置以同一事务存储在外部存储器中。如果获取计算结果之后,获取下一条待处理的消息的位置之前系统发生崩溃,该事务处理未完成,不会在外部存储器中仅存储计算结果,导致崩溃恢复后重复消费消息的缺陷。而将计算结果和下一条待处理的消息的位置以同一事务存储在外部存储器中,保证外部存储器中的计算结果和下一条待处理的消息的位置的状态始终同步,从而避免重复消费消息,提高消息处理准确性,从而提高消息的处理效率。 

图4为本发明实施例提供的消息处理系统的结构示意图。如图4所示,本实施例的消息处理系统,包括:消息源端设备Producer10和消息代理端设备Broker20。Producer10与Broker20通信连接。 

其中Producer10用于为待发送的消息组生成消息标识,每个消息组包括多条消息,该消息标识用以唯一标识消息组;并向Broker20发送消息标识和消息组;Broker20用于接收并存储Producer10发送的消息标识和消息组;Producer10还用于当确定需要再次发送消息组时,从已生成的消息标识中获取消息组的消息标识;并向Broker20发送消息标识和消息组;Broker20还用于当根据已接收到的消息标识,确定再次接收到消息标识对应的消息组时,对本次接收到的消息组进行去重处理。 

本实施例的消息处理系统,通过采用上述Producer10与Broker20实现消息的处理与上述相关方法实施例的实现机制相同,详细可以参考上述相关实施例的记载,在此不再赘述。 

本实施例的消息处理系统,通过采用上述Producer与Broker实现在Producer端为待发送的消息组生成消息标识,每个消息组包括多条消息,消息标识用以唯一标识所述消息组;并向Broker发送该消息标识和该消息组;Broker接收并存储Producer端发送的该消息标识和该消息组;当Producer确定需要再次发送该消息组时,从已生成的消息标识中获取该消息组的消息标识,并向Broker端发送该消息标识和该消息组;而当Broker根据已接收到的消息标识, 确定再次接收到的该消息标识对应的消息组时,对本次接收到的消息组进行去重处理。本实施例的技术方案,与现有技术的采用Bloom Filter方案在Broker端对收到的消息进行去重操作相比,去除了误算率,提高了处理的准确率,达到exactly once的目标,从而提高了消息处理的效率。 

进一步可选地,在上述实施例的技术方案的基础上,其中Broker20具体用于当根据已接收到的消息标识,确定再次接收到消息标识对应的消息组时,丢弃本次接收的消息标识和消息组。 

进一步可选地,在上述实施例的技术方案的基础上,其中Broker20还具体用于接收Producer10发送的消息标识和消息组,并在本地存储消息组,在外部存储器中存储消息标识,同时并在外部存储器中记录存储消息标识的时刻。 

进一步可选地,在上述实施例的技术方案的基础上,Broker20还用于删除外部存储器中存储时间超出存储周期的消息标识。 

上述实施例的所有可选技术方案,可以采用可以结合的方式任意组合,形成本发明的可选技术方案,在此不再赘述。 

上述实施例的消息处理系统,通过采用上述Producer10与Broker20实现消息的处理与上述相关方法实施例的实现机制相同,详细可以参考上述相关实施例的记载,在此不再赘述。 

上述实施例的技术方案,与现有技术的采用Bloom Filter方案在Broker端对收到的消息进行去重操作相比,去除了误算率,提高了处理的准确率,达到exactly once的目标,从而提高了消息处理的效率。 

图5为本发明实施例提供的消息目的端设备的结构示意图。如图5所示,本实施例提供的消息目的端设备即Consumer,具体可以包括:获取模块30、计算模块31和存储模块32。 

其中获取模块30用于从外部存储器中获取当前待处理的消息的位置;获取模块30还用于根据当前待处理的消息的位置,调用API从Broker中获取当前待处理的消息;API提供当前待处理的消息的位置;计算模块31与获取模块30 连接,计算模块31用于根据获取模块30获取的当前待处理的消息进行业务计算,得到计算结果;并确定下一条待处理的消息的位置;例如具体根据获取模块30获取的当前待处理的消息以及外部存储器中的当前待处理的消息的位置,确定下一条待处理的消息的位置。存储模块32与计算模块31连接,存储模块32用于将计算模块31得到的计算结果和下一条待处理的消息的位置以同一事务存储在外部存储器中。 

本实施例的消息目的端设备,通过采用上述模块实现消息的处理与上述相关方法实施例的实现机制相同,详细可以参考上述实施例的记载,在此不再赘述。 

本实施例的消息目的端设备,通过采用上述模块实现在Consumer端,从外部存储器中获取当前待处理的消息的位置;根据当前待处理的消息的位置,调用API从Broker中获取当前待处理的消息;API提供当前待处理的消息的位置;根据当前待处理的消息进行业务计算,得到计算结果;并确定下一条待处理的消息的位置;将计算结果和下一条待处理的消息的位置以同一事务存储在外部存储器中。通过采用该方案,Consumer处理进程获取到下一条待处理消息的位置,而未进行业务计算时发生崩溃,在崩溃修复之后,由于API能够提供当前待处理的消息的位置,因此通过调用API还可以获取到当前待处理的消息,防止了消息的丢失。而且本技术方案中,计算结果和下一条待处理的消息的位置以同一事务存储在外部存储器中。如果获取计算结果之后,获取下一条待处理的消息的位置之前系统发生崩溃,该事务处理未完成,不会在外部存储器中仅存储计算结果,导致崩溃恢复后重复消费消息的缺陷。而将计算结果和下一条待处理的消息的位置以同一事务存储在外部存储器中,保证外部存储器中的计算结果和下一条待处理的消息的位置的状态始终同步,从而避免重复消费消息,提高消息处理准确性,从而提高消息的处理效率。 

本发明实施例提供的分布式消息系统,该分布式消息系统可以包括消息处理系统和Consumer。其中消息处理系统具体可以包括Producer和Broker,其中 Broker分别与Producer和Consumer通信连接。其中消息处理系统可以采用上述图4所示实施例的消息处理系统,和/或Consumer可以采用上述图5所示实施例的消息目的端设备即Consumer。下面图6以消息处理系统采用上述图4所示实施例的消息处理系统,同时Consumer采用上述图5所示实施例的Consumer为例描述本发明的技术方案。 

图6为本发明实施例提供的分布式消息系统的结构示意图。如图6所示,本实施例的分布式消息系统,具体可以包括消息处理系统和Consumer60。其中消息处理系统包括Producer40和Broker50;其中Broker50分别与Producer40和Consumer60进行通讯。 

其中Producer40用于为待发送的消息组生成消息标识,每个消息组包括多条消息,该消息标识用以唯一标识消息组;并向Broker50发送消息标识和消息组;Broker50用于接收并存储Producer40发送的消息标识和消息组;Producer40还用于当确定需要再次发送消息组时,从已生成的消息标识中获取消息组的消息标识;并向Broker50发送消息标识和消息组;Broker50还用于当根据已接收到的消息标识,确定再次接收到消息标识对应的消息组时,对本次接收到的消息组进行去重处理。 

本实施例的分布式消息系统中还包括第一外部存储器70,该第一外部存储器70与Consumer60连接。第一外部存储器70用于在同一事务中存储Consumer60进行业务计算得到的计算结果和下一条待处理的消息的位置。Consumer60用于从第一外部存储器70中获取当前待处理的消息的位置;根据当前待处理的消息的位置,调用API从Broker50中获取当前待处理的消息;API提供当前待处理的消息的位置;根据当前待处理的消息进行业务计算,得到计算结果;并确定下一条待处理的消息的位置;例如具体根据获取模块30获取的当前待处理的消息以及外部存储器中的当前待处理的消息的位置,确定下一条待处理的消息的位置。最后将计算得到的计算结果和下一条待处理的消息的位置以同一事务存储在第一外部存储器70中。 

进一步可选地,Broker50具体用于当根据已接收到的消息标识,确定再次接收到消息标识对应的消息组时,丢弃本次接收的消息标识和消息组。 

进一步可选地,本实施例的分布式消息系统中还可以包括第二外部存储器80。该第二外部存储器80与Broker50连接,该第二外部存储器80也位于消息处理系统内。第二外部存储器80用于存储Broker50接收过的消息标识,并记录存储该消息标识的时刻。 

Broker20还具体用于接收Producer10发送的消息标识和消息组,并在本地存储消息组,在第二外部存储器80中存储消息标识,同时并在第二外部存储器80中记录存储消息标识的时刻。 

需要说明的是,分布式消息系统中也可以包括多个Broker50,多个Broker50共享同一个第二外部存储器80,可以防止不同Broker50处理同一消息组,避免重复处理消息。图6所示实施例是以一个Broker50为例。 

同理,本实施例的分布式消息系统中也可以包括多个Consumer60,第一外部存储器70也可以由多个Consumer60共享。图6所示实施例是以一个Consumer60为例。 

具体地,具体用于判断是否已经接收过Producer40发送的该消息标识对应的消息组,若接收过Producer40发送的该消息标识对应的消息组,丢弃该消息组;若未接收过该消息标识对应的消息组,存储消息组。Broker50在第二外部存储器80存储接收的消息标识,并记录存储该消息标识的时刻。 

进一步可选地,Broker50还用于若未接收过该消息标识对应的消息时,将该消息标识存储在第二外部存储器80中,并在第二外部存储器80中记录存储该消息标识的时刻。 

进一步可选地,Broker50还用于删除第二外部存储器80中存储时间超出存储周期的消息标识。存储时间为存储该消息标识的时刻到当前时刻的时间段长度。 

具体地,本实施例的分布式消息系统中Producer40和Broker50具体可以采 用如上图4所示的实施例中消息处理系统中所述的Producer和Broker;进一步地,Consumer60具体可以采用如上图5所示的实施例所述的Consumer。具体实现消息的处理也可以参考上述相关方法实施例的记载,在此不再赘述。 

本实施例的分布式消息系统,通过在Producer端为待发送的消息组生成消息标识,每个消息组包括多条消息,消息标识用以唯一标识所述消息组;并向Broker发送该消息标识和该消息组;Broker接收并存储Producer端发送的该消息标识和该消息组;当Producer确定需要再次发送该消息组时,从已生成的消息标识中获取该消息组的消息标识,并向Broker端发送该消息标识和该消息组;而当Broker根据已接收到的消息标识,确定再次接收到的该消息标识对应的消息组时,对本次接收到的消息组进行去重处理。与现有技术的采用Bloom Filter方案在Broker端对收到的消息进行去重操作相比,去除了误算率,提高了处理的准确率,达到exactly once的目标,从而提高了消息处理的效率。 

本实施例还可以在Consumer端,从外部存储器中获取当前待处理的消息的位置;根据当前待处理的消息的位置,调用API从Broker中获取当前待处理的消息;API提供当前待处理的消息的位置;根据当前待处理的消息进行业务计算,得到计算结果;并确定下一条待处理的消息的位置;将计算结果和下一条待处理的消息的位置以同一事务存储在外部存储器中。通过采用该方案,Consumer处理进程获取到下一条待处理消息的位置,而未进行业务计算时发生崩溃,在崩溃修复之后,由于API能够提供当前待处理的消息的位置,因此通过调用API还可以获取到当前待处理的消息,防止了消息的丢失。而且本实施例技术方案中,计算结果和下一条待处理的消息的位置以同一事务存储在外部存储器中。如果获取计算结果之后,获取下一条待处理的消息的位置之前系统发生崩溃,该事务处理未完成,不会在外部存储器中仅存储计算结果,导致崩溃恢复后重复消费消息的缺陷。而将计算结果和下一条待处理的消息的位置以同一事务存储在外部存储器中,保证外部存储器中的计算结果和下一条待处理的消息的位置的状态始终同步,从而避免重复消费消息,提高消息处理准确性,从而提高 消息的处理效率。 

需要说明的是:上述实施例提供的消息处理系统、消息目的端设备以及分布式消息系统在消息处理业务时,仅以上述各功能模块的划分进行举例说明,实际应用中,可以根据需要而将上述功能分配由不同的功能模块完成,即将设备的内部结构划分成不同的功能模块,以完成以上描述的全部或者部分功能。另外,上述实施例提供的消息处理系统、消息目的端设备以及分布式消息系统与消息处理业务的方法实施例属于同一构思,其具体实现过程详见方法实施例,这里不再赘述。 

上述本发明实施例序号仅仅为了描述,不代表实施例的优劣。 

本领域普通技术人员可以理解实现上述实施例的全部或部分步骤可以通过硬件来完成,也可以通过程序来指令相关的硬件完成,所述的程序可以存储于一种计算机可读存储介质中,上述提到的存储介质可以是只读存储器,磁盘或光盘等。 

以上所述仅为本发明的较佳实施例,并不用以限制本发明,凡在本发明的精神和原则之内,所作的任何修改、等同替换、改进等,均应包含在本发明的保护范围之内。 

去获取专利,查看全文>

相似文献

  • 专利
  • 中文文献
  • 外文文献
获取专利

客服邮箱:kefu@zhangqiaokeyan.com

京公网安备:11010802029741号 ICP备案号:京ICP备15016152号-6 六维联合信息科技 (北京) 有限公司©版权所有
  • 客服微信

  • 服务号