首页> 中国专利> 消息主题的处理方法、装置、计算机设备和可读存储介质

消息主题的处理方法、装置、计算机设备和可读存储介质

摘要

本申请涉及数据处理技术领域,提供了一种消息主题的处理方法、装置、计算机设备和可读存储介质。该方法包括:设置对应消息队列的偏移量组和至少两个消费组;将消息队列中的消息主题分配至消费组,其中,若消费组被分配消息主题时,消费组为工作消费组,若消费组未被分配消息主题时,消费组为备用消费组;当工作消费组被分配至少两个消息主题,且其中的消息主题中断时,将中断的消息主题分配至备用消费组,以使备用消费组根据偏移量组中对应的偏移量,重启中断的消息主题。通过本申请,能够在无限拉起挂掉的消息主题,以保证程序稳定消费的同时,避免影响同一消息组的其他消息主题的消费。

著录项

  • 公开/公告号CN112612606A

    专利类型发明专利

  • 公开/公告日2021-04-06

    原文格式PDF

  • 申请/专利权人 平安消费金融有限公司;

    申请/专利号CN202011487334.8

  • 发明设计人 王晓初;赵宏军;

    申请日2020-12-16

  • 分类号G06F9/50(20060101);G06F9/54(20060101);

  • 代理机构11015 北京英特普罗知识产权代理有限公司;

  • 代理人程超

  • 地址 200131 上海市浦东新区自由贸易试验区陆家嘴环路1333号18层

  • 入库时间 2023-06-19 10:29:05

说明书

技术领域

本申请涉及数据处理技术领域,尤其涉及一种消息主题的处理方法、装置、计算机设备和可读存储介质。

背景技术

Kafka是一种分布式发布订阅消息系统,基于其高吞吐量的数据处理能力,目前使用非常广泛。但发明人研究发现,目前kafka提供消费组实现多个线程同时消费一个或多个消息主题时,当其中一个消息主题有问题挂掉时,会触发一次均衡导致线程重新分配,而在一些不稳定的场景下,需要保证程序稳定消费,需要将挂掉的线程重新拉起,这样又会发生一次线程重新分配,均会影响其他主题的正常消费。如果无限拉起挂掉的消息主题,则会导致当前消费组一直处于均衡中,同组的其他消息主题消费完全不可用。例如,当消费组同时订阅topic1、topic2、topic3共三个消息主题,若topic3挂掉,则将会对topic1和topic2重新分配线程,当无限重启topic3时,如果持续重启失败,会导致topic1和topic2同时处于不可用的状态。

因此,如何在无限拉起挂掉的消息主题,以保证程序稳定消费的同时,避免影响同一消息组的其他消息主题的消费,成为本领域亟需解决的技术问题。

发明内容

本申请的目的是提供一种消息主题的处理方法、装置、计算机设备和可读存储介质,用于解决现有技术中的上述技术问题。

一方面,为实现上述目的,本申请提供了一种消息主题的处理方法。

该消息主题的处理方法包括:设置对应消息队列的偏移量组和至少两个消费组,其中,所述偏移量组用于记录所述消息队列中各消息主题的偏移量,所述消费组包括若干线程,用于消费所述消息主题并更新所述偏移量;将所述消息队列中的消息主题分配至所述消费组,其中,若所述消费组被分配所述消息主题时,所述消费组为工作消费组,若所述消费组未被分配所述消息主题时,所述消费组为备用消费组;当所述工作消费组被分配至少两个所述消息主题,且其中的所述消息主题中断时,将中断的所述消息主题分配至所述备用消费组,以使所述备用消费组根据所述偏移量组中对应的偏移量,重启所述中断的消息主题。

进一步地,所述消息主题的处理方法还包括:设置对应各个所述消费组的主题分配记录,其中,所述主题分配记录包括各个所述消费组被分配的所述消息主题;将所述中断的消息主题分配至所述备用消费组的步骤包括:根据所述主题分配记录中查找所述备用消费组,并将所述中断的消息主题分配至查找到的所述备用消费组。

进一步地,所述消息主题的处理方法还包括:设置对应所述消费组的线程信息;将所述中断的消息主题分配至查找到的所述备用消费组的步骤包括:当根据所述主题分配记录查找到至少两个所述备用消费组时,获取各个所述备用消费组对应的线程信息;根据所述线程信息在各个所述备用消费组中,确定出与所述中断的消息主题适配的备用消费组。

进一步地,所述线程信息包括所述消费组的线程数量和单线程消费速率阈值,根据所述线程信息在各个所述备用消费组中,确定出与所述中断的消息主题适配的备用消费组的步骤包括:获取所述偏移量组中所述中断的消息主题的历史偏移量;根据所述历史偏移量计算所述中断的消息主题的数据流量;以及根据所述线程数量、所述单线程消费速率阈值和所述中断的消息主题的数据流量确定出与所述中断的消息主题适配的备用消费组。

进一步地,所述消息主题的处理方法还包括:当根据所述主题分配记录查找不到所述备用消费组时,在各个所述工作消费组中确定两个所述工作消费组;针对所述两个工作消费组,将其中一个所述工作消费组的消息主题分配至另一个所述工作消费组,以得到新的所述备用消费组;将所述中断的消息主题分配至新的所述备用消费组。

进一步地,所述线程信息还包括线程空闲时间,所述消息主题的处理方法还包括:每间隔检查周期,根据所述偏移量组计算所述工作消费组的数据流量,根据所述单线程消费速率阈值、所述工作消费组的数据流量和所述检查周期,计算所述工作消费组对应的目标线程量;比较所述目标线程量与所述工作消费组对应的线程数量;当所述线程数量小于或等于所述目标线程量时,将所述线程空闲时间置为0;当所述线程数量大于所述目标线程量时,将所述线程空闲时间增加一个所述检查周期;在各个所述工作消费组中确定两个所述工作消费组的步骤包括:在各个所述工作消费组中,查找所述线程空闲时间最大的两个消费工作组。

进一步地,根据所述偏移量组计算所述工作消费组的数据流量的步骤包括:从所述偏移量组中获取所述消息主题对应的偏移量;根据所述偏移量计算前一个所述检查周期内的偏移量增量;根据所述工作消费组分配的各个所述消息主题对应的偏移量增量,计算所述工作消费组对应的总偏移量增量;根据所述总偏移量增量和所述检查周期计算所述工作消费组的数据流量;

进一步地,采用以下公式根据所述单线程消费速率阈值、所述工作消费组的数据流量和所述检查周期,计算所述工作消费组对应的目标线程量:nt=ceil(cs*p/ms),其中,nt为所述目标线程量,cs为所述工作消费组的数据流量,p为所述检查周期,ms为所述单线程消费速率阈值,ceil为向下取整函数。

另一方面,为实现上述目的,本申请提供了一种消息主题的处理装置。

该线程分配装置包括:第一设置模块,用于设置对应消息队列的偏移量组和至少两个消费组,其中,所述偏移量组用于记录所述消息队列中各消息主题的偏移量,所述消费组包括若干线程,用于消费所述消息主题并更新所述偏移量;第一分配模块,用于将所述消息队列中的消息主题分配至所述消费组,其中,若所述消费组被分配所述消息主题时,所述消费组为工作消费组,若所述消费组未被分配所述消息主题时,所述消费组为备用消费组;第二分配模块,用于当所述工作消费组被分配至少两个所述消息主题,且其中的所述消息主题中断时,将中断的所述消息主题分配至所述备用消费组,以使所述备用消费组根据所述偏移量组中对应的偏移量,重启所述中断的消息主题。

又一方面,为实现上述目的,本申请还提供一种计算机设备,包括存储器、处理器以及存储在存储器上并可在处理器上运行的计算机程序,该处理器执行计算机程序时实现上述方法的步骤。

又一方面,为实现上述目的,本申请还提供计算机可读存储介质,包括存储数据区和存储程序区,存储数据区存储根据区块链节点的使用所创建的数据,存储程序区存储有计算机程序,其中,该计算机程序被处理器执行时实现上述方法的步骤。

本申请提供的消息主题的处理方法、装置、计算机设备和可读存储介质,除了设置对应消息队列的至少两个消费组之外,还设置有对应消息队列的偏移量组,该偏移量组用于记录消息队列中各消息主题的偏移量,消费组的若干线程用于消费消息主题并更新偏移量,在将消息队列中的消息主题分配至消费组之后,当有消息主题中断时,将中断的消息主题分配至未被分配消息主题的备用消费组,以使该备用消费组根据偏移量组中对应的偏移量,重启中断的消息主题,避免了该重启过程对与中断的消息主题处于同一消费组的其他消息主题的影响,同时,中断的消息主题可在新的消费组中无限重启,并且可在重启后续断点位置继续进行数据消费,综上,通过本申请,既能够无限拉起挂掉的消息主题,以保证程序稳定消费,同时又避免影响同一消息组的其他消息主题的消费。

附图说明

通过阅读下文优选实施方式的详细描述,各种其他的优点和益处对于本领域普通技术人员将变得清楚明了。附图仅用于示出优选实施方式的目的,而并不认为是对本申请的限制。而且在整个附图中,用相同的参考符号表示相同的部件。在附图中:

图1为本申请实施例一提供的消息主题的处理方法的流程图;

图2为本申请实施例二提供的消息主题的处理方法的框图;

图3为本申请实施例三提供的计算机设备的硬件结构图。

具体实施方式

为了使本申请的目的、技术方案及优点更加清楚明白,以下结合附图及实施例,对本申请进行进一步详细说明。应当理解,此处所描述的具体实施例仅用以解释本申请,并不用于限定本申请。基于本申请中的实施例,本领域普通技术人员在没有做出创造性劳动前提下所获得的所有其他实施例,都属于本申请保护的范围。

为了解决现有技术中的上述技术问题,本申请提出一种消息主题的处理方法、装置、计算机设备和可读存储介质,在该消息主题的处理方法中,设置对应消息队列的偏移量组和至少两个消费组,其中,偏移量组用于记录消息队列中各消息主题的偏移量,消费组包括若干线程,该线程用于消费消息队列中的消息主题,并在消费过程中实时更新偏移量,该至少两个消费组所消费的消息主题,由偏移量组统一维护,而不是由各个消费组维护自身消费的消息主题,在将消息队列中的消息主题分配至消费组后,被分配消息主题的消费组定义为工作消费组,未被分配消息主题的消费组定义为备用消费组。在消费组消费消息主题处理数据的过程中,当工作消费组被分配至少两个消息主题,且其中的消息主题中断时,将中断的消息主题分配至备用消费组,避免中断的消息主题在原工作消费组中反复重启,对同消费组中其他消息主题的影响,同时,中断的消息主题被分配至新的消息组后,能够在该新的消息组中无限重启,而且在重启后能够依据偏移量组中的偏移量从断点处继续消费,保证了所有消息主题的稳定消费,也即保证了程序的稳定消费。

关于本申请提供的消息主题的处理方法、装置、计算机设备和可读存储介质的具体实施例,将在下文中详细描述。

实施例一

本申请实施例一提供了一种消息主题的处理方法,通过该方法,中断的消息主题能够在新的消息组中无限重启,避免其在原工作消费组中反复重启对同消费组中其他消息主题的影响,而且其在重启后能够依据偏移量组中的偏移量从断点处继续消费,具体地,图1为本申请实施例一提供的消息主题的处理方法的流程图,如图1所示,该消息主题的处理方法包括如下的步骤S101至步骤S103:

步骤S101:设置对应消息队列的偏移量组和至少两个消费组。

其中,偏移量组用于记录消息队列中各消息主题的偏移量,消费组包括若干线程,用于消费消息主题并更新偏移量。

具体地,消息队列包括多个消息主题,例如Kafka消息队列等。对应消息队列设置两个或两个以上的消费组,每个消费组包括若干线程,该线程用于消费消息主题产生的数据,以实现对数据的处理,具体可以为数据同步等。此外,对应消息队列还设置有偏移量组,该偏移量组用于记录消息队列中各消息主题的偏移量,该偏移量用于记录消息主题中数据被消费的位置,因此,线程在消费消息主题产生的数据的同时,实时更新偏移量组中的对应消息主题的偏移量。也就是说,消息队列中各消息主题的偏移量统一维护,各消费组均可对偏移量组中对应的偏移量进行读写和更新。

步骤S102:将消息队列中的消息主题分配至消费组。

其中,若消费组被分配消息主题时,消费组为工作消费组,若消费组未被分配消息主题时,消费组为备用消费组。

具体地,一个消费组可以包括一个或多个线程,一个线程可以消费多个消息主题,多个线程也可消费一个消息主题,对此并不进行限定。在将消息主题分配至消费组时,一个消费组可以消费一个或多个消息主题,具体地,可预估消息主题的数据量以及线程的数据处理能力,从而使消费组中线程与消息主题实现适配。在本申请中,将被分配有消息主题的消费组,也即处于消费数据状态的消费组,定义为工作消费组;将没有被分配消息主题的消费组,定义为备用消费组。需要说明的是,工作消费组和备用消费组仅仅是从逻辑上对处于不同状态的消费组进行定义上的区分,并不一定具有硬件、软件以及性能上的差异。

步骤S103:当工作消费组被分配至少两个消息主题,且其中的消息主题中断时,将中断的消息主题分配至备用消费组,以使备用消费组根据偏移量组中对应的偏移量,重启中断的消息主题。

在进行消息主题的分配后,工作消费组中的线程开始对消息主题产生的数据进行消费。在消费的过程中,当一个消息主题中断而挂掉,而且该消息主题所在的工作消费组中还存在其他的消息主题时,将该中断的消息主题分配至备用消费组,而后对该中断的消息主题的重启操作,又该备用消费组完成。

可选地,中断的消息主题被分配至备用消费组后,该备用消费组成为新的工作消费组,对中断的消息主题重启,如果该中断的消息主题能够重启,成为新的正常消息主题,则该新的工作消费组查询偏移量组中该新的正常消息主题的偏移量,以从查询到的偏移量所表征的数据位置,也即断点的位置继续消费数据;如果该中断的消息主题无法重启,则该新的工作消费组可按预定周期持续拉起,直到该中断的消息主题能够重启,或者达到中断报警条件,进行报警。

同时,该中断的消息主题由原消费组移出后,原消费组可对剩余的消息主题重新进行线程资源的二次分配,进而可按照重新分配的线程资源消费剩余的消息主题。

在该实施例提供的消息主题的处理方法中,除了设置对应消息队列的至少两个消费组之外,还设置有对应消息队列的偏移量组,该偏移量组用于记录消息队列中各消息主题的偏移量,消费组的若干线程用于消费消息主题并更新偏移量,在将消息队列中的消息主题分配至消费组之后,当有消息主题中断时,将中断的消息主题分配至未被分配消息主题的备用消费组,以使该备用消费组根据偏移量组中对应的偏移量,重启中断的消息主题,避免了该重启过程对与中断的消息主题处于同一消费组的其他消息主题的影响,同时,中断的消息主题可在新的消费组中无限重启,并且可在重启后续断点位置继续进行数据消费,综上,采用该实施例提供的消息主题的处理方法,既能够无限拉起挂掉的消息主题,以保证程序稳定消费,同时又避免影响同一消息组的其他消息主题的消费。

可选地,在一种实施例中,消息主题的处理方法还包括:设置对应各个消费组的主题分配记录,其中,主题分配记录包括各个消费组被分配的消息主题;将中断的消息主题分配至备用消费组的步骤包括:在主题分配记录中查找备用消费组,并将中断的消息主题分配至查找到的备用消费组。

具体地,设置主题分配记录,该主题分配记录用于记录各个消费组被分配的消息主题,包括哪个消费组被分配有哪些消息主题,以及哪个消费组未被分配任务消息主题属于备用消费组等,当消费组中消息主题发生变化时,主题分配记录相应发生变化,可选地,可采用特定标识对备用消费组进行标识。在将中断的消息主题分配至备用消费组时,先通过查询主题分配记录,可查询到当前的备用消费组,然后将该中断的消息主题分配至查找到的备用消费组,同时修改主题分配记录。

例如,设置有消费组A、消费组B和消费组C共三个消费组,其中,在主题分配记录中记录有消息主题1、2和3被分配至消费组A,消息主题4和5被分配至消费组B,消费组C当前属于备用消费组。当消息主题1发生中断时,查找该主题分配记录,可查找到消费组C,将消息主题重新分配至消费组C,并且修改主题分配记录,修改后的主题分配记录为:消息主题1和2被分配至消费组A,消息主题4和5被分配至消费组B,消息主题3被分配至消费组C。

采用该实施例提供的消息主题的处理方法,设置了对应消费组的主题分配记录,有助于在消息主题中断时,快速而准确的将其重新分配至备用消费组。

可选地,在一种实施例中,消息主题的处理方法还包括:设置对应消费组的线程信息;将中断的消息主题分配至查找到的备用消费组的步骤包括:当根据主题分配记录查找到至少两个备用消费组时,获取各个备用消费组对应的线程信息;根据线程信息在各个备用消费组中,确定出与中断的消息主题适配的备用消费组。

具体地,针对每个消费组,维护其自身包括的线程信息,该线程信息可以包括线程数量、单线程消费速率阈值、线程所在的节点、线程的创建时间、回收时间等信息。在主题分配记录中查找备用消费组时,如果查找到两个或两个以上的备用消费组时,需要从查找到的备用消费组中选择出一个以重启中断的消息主题。在选择时,获取各个备用消费组对应的线程信息,然后根据该线程信息进行选择,以确定出与中断的消息主题适配的备用消费组。可选地,可定义不同的适配规则,例如,根据消息主题的消费时间和线程回收时间的适配关系进行选择,以避免线程回收影响消息主题的消费;又如,根据消息主题的数据量与线程数量、单线程消费速率阈值进行选择,以保证消费组的数据消费能力与消息主题的数据量相适配等。

采用该实施例提供的消息主题的处理方法,设置了对应消费组的线程信息,有助于在选择备用消费组时,选择到与中断的消息主题所适配的备用消费组,提升中断的消息主题能够正常重启的概率。

可选地,在一种实施例中,线程信息包括消费组的线程数量和单线程消费速率阈值,根据线程信息在各个备用消费组中,确定出与中断的消息主题适配的备用消费组的步骤包括:获取偏移量组中中断的消息主题的历史偏移量;根据历史偏移量计算中断的消息主题的数据流量;以及根据线程数量、单线程消费速率阈值和消息主题的数据流量确定出与中断的消息主题适配的备用消费组。

具体地,可根据偏移量组中中断的消息主题的历史偏移量确定在中断前预定时长内的偏移量增量,然后通过计算该偏移量增量与该预定时长的商,即可确定在该预定时长内,中断的消息主题的数据流量。具体如,获取消息主题在中断前5分钟偏移量,得到该5分钟内的偏移量增量,该偏移量增量除以5分钟,即可确定该5分钟内中断的消息主题的数据流量。根据消费组的线程数量和单线程消费速率阈值的积,能够得到该消费组最大的数据流量,因此,在选择与中断的消息主题适配的备用消费组时,可选择数据流量与最大的数据流量能够满足预设条件的备用消费组,具体如数据流量小于最大的数据流量且与该最大的数据流量最接近的备用消费组。

采用该实施例提供的消息主题的处理方法,根据消费组的线程数量、单线程消费速率阈值,以及中断的消息主题的数据流量,确定出与中断的消息主题适配的备用消费组,有利于中断的消息主题在新的消费组中稳定消费,能够减少数据积压的概率,以及避免线程资源的浪费。

可选地,在一种实施例中,消息主题的处理方法还包括:当根据主题分配记录查找不到备用消费组时,在各个工作消费组中确定两个工作消费组;针对两个工作消费组,将其中一个工作消费组的消息主题分配至另一个工作消费组,以得到新的备用消费组;将中断的消息主题分配至新的备用消费组。

具体地,当某一消费组的一消息主题中断时,通过主题分配记录查找备用消费组,当预置的所有消费组均为工作消费组时,从各个工作消费组中选出两个工作消费组,将该两个工作消费组中的消息主题合并由一个工作消费组消费,产生一个新的备用消费组,从而能够将中断的消息主题分配至备用消费组,可在新的备用消费组中进行无限重启,提升该中断的消息主题的稳定性,同时减少对其他消息主题的影响。其中,在从各个工作消费组中选择两个工作消费组时,可以根据工作消费组消费的消息主题的数据流量、工作消费组包括的线程线程信息等进行选择。

可选地,在一种实施例中,线程信息还包括线程空闲时间,消息主题的处理方法还包括:每间隔检查周期,根据偏移量组计算工作消费组的数据流量,根据单线程消费速率阈值、工作消费组的数据流量和检查周期,计算工作消费组对应的目标线程量;比较目标线程量与工作消费组对应的线程数量;当线程数量小于或等于目标线程量时,将线程空闲时间置为0;当线程数量大于目标线程量时,将线程空闲时间增加一个检查周期;在各个工作消费组中确定两个工作消费组的步骤包括:在各个工作消费组中,查找线程空闲时间最大的两个消费工作组。

具体地,预置检查周期,在消费组消费消息主题,进行数据处理的过程中,若当前时间达到检查周期,则对每个工作消费组对应的线程空闲状态进行一次检测。在检测时,首先根据偏移量组计算工作消费组的数据流量,例如,可以计算在上一检查周期内工作消费组的数据流量,然后根据单线程消费速率阈值、工作消费组的数据流量和检查周期,计算目标线程量,也即,计算工作消费组的所有线程在满足单线程消费速率阈值的条件下,同时能够保证一个检查周期内达到工作消费组的实际数据流量时,理论上所需的线程量。然后将该目标线程量与工作消费组实际包括线程数量进行比较,如果实际线程数量小于或等于目标线程量时,表明实际线程已经在接近单线程消费速率阈值的状态下运行,此时线程处于忙碌状态,因此将该工作消费组对应的线程空闲时间置为0,而当实际线程数量大于目标线程量时,表明实际线程存在冗余,必然有线程处于较小的速率下运行,存在线程资源的浪费,此时,将该工作消费组对应的线程空闲时间增加一个检查周期,从而,通过线程空闲时间,能够反应出工作消费组的工作状态。

基于此,当某一消费组的一消息主题中断,通过主题分配记录查找备用消费组,而预置的所有消费组均为工作消费组,需要从各个工作消费组中选出两个工作消费组时,可通过线程空闲时间选择工作消费组,选择线程空闲时间最大的两个消费工作组。

采用该实施例提供的消息主题的处理方法,按周期检测并记录工作消费组的空闲状态,在需要从工作消费组中选择两个工作消费组,进行消息主题的合并以及被分配中断的消息主题时,选择处于空闲状态的工作消费组进行操作,进一步减小对中断消息主题的处理而影响其他消息主题的正常运行。

可选地,在一种实施例中,根据偏移量组计算工作消费组的数据流量的步骤包括:从偏移量组中获取消息主题对应的偏移量;根据偏移量计算前一个检查周期的偏移量增量;根据工作消费组分配的各个消息主题对应的偏移量增量,计算工作消费组对应的总偏移量增量;根据总偏移量增量和检查周期计算工作消费组的数据流量。

具体地,通过偏移量组,能够获取前一个检查周期内消息主题的偏移量增量,将工作消费组中所有消息主题在此检查周期内的偏移量增量相加,即可得到该工作消费组的总偏移量增量,计算此总偏移量增量和检查周期的商,得到工作消费组的数据流量。

可选地,在一种实施例中,采用以下公式根据单线程消费速率阈值、数据流量和检查周期,计算工作消费组对应的目标线程量:nt=ceil(cs*p/ms),其中,nt为目标线程量,cs为数据流量,p为检查周期,ms为单线程消费速率阈值,ceil为向下取整函数。

其中,单线程消费速率阈值可采用实验测定的方式进行确定,具体步骤如下:

1)在消息队列的测试消息主题中写入满足测试量的测试数据时,例如在消息队列中主题TEST_TOPIC写入m(条)数据,每条数据q(MB),其中,保证数据量在1000条以上,共500MB以上,通过足够的数据量以保证测试效果;

2)订阅测试消息主题,以使线程消费测试数据,例如订阅消息主题TEST_TOPIC,使得线程开始按处理逻辑消费数据;

3)记录线程的消费完成时间,也即测试消费完成时间t(s)。

4)根据测试量和消费完成时间,确定单线程消费速率阈值,例如可以为单位时间内消费的条数=m/t(条/s);单位时间内消费的数据量=m*q/t(MB/s)。

可选地,在一种实施例中,在将消息队列中的消息主题分配至消费组时,可采用如下的分配方式:

确定待分配的线程数量和对应的多个消息主题,具体地,在该步骤中,确定要将多少线程分配给哪些消息主题。

获取各个消息主题的历史流量,在消息队列的历史消费过程中,会记录消费消息主题的偏移量offset,根据消息主题的偏移量offset,即可计算该消息主题的历史流量。具体地,可获取消息主题在预设时长内的偏移量增量,并计算该偏移量增量与预设时长的商,即为历史流量。例如,过去5分钟的offset增量,然后利用该增量除以5分钟,即可得到历史流量,例如历史流量为过去5分钟的offset增量(条)/5*60(s),从而能够快速简单的随时计算得到。可选地,可预置线程分配周期,在到达每个线程分配周期时,为分布式发布订阅消息系统中的所有消息主题进行一次线程分配量的调整,在调整时,可根据上一个周期内的偏移量增量与一个线程分配周期的商来计算历史流量。从而能够实现线程分配量的动态持续调整。

根据历史流量计算消息主题所占的线程比例。设待分配的线程数量为m,对应的消息主题的数量为n,第i个消息主题的历史流量为ti,s=t1+t2+...+tn,i=1,2,...,n,则第i个消息主题所占的线程比例为ti/s。

根据线程比例和线程数量向消息主题分配线程。在该步骤中,根据线程比例向消息主题分配线程时,线程比例越大,被分配到的线程资源越多,根据线程比例的大小,一个消息主题可分配多个线程,多个消息主题也可共享一个线程,最终当所有的消息主题均分配有线程资源时,被分配的线程数量等于或小于(基本等于)总的线程数量,该处的小于是指分配的线程数量与总的线程数量的差在可接受的误差范围内。

可选地,在进行线程分配时,如果某个消息主题对应的线程比例与线程数量的积(定义为比例积)大于1,则该消息主题自身被分配若干线程,如果某个消息主题对应的比例积小于1,则该消息主题与其他比例积也同样小于1的消息主题共用相同的线程。

采用该实施例提供的消息主题的处理方法,确定待分配的线程数量和对应的多个消息主题厚,获取各个消息主题的历史流量,以根据历史流量计算消息主题所占的线程比例,然后根据线程比例和线程数量向消息主题分配线程,其中,消息主题对应的线程比例,也即该消息主题历史实际产生的数据在全体待分配线程的消息主题产生的数据总量中的占比,因此,根据该线程比例向消息主题分配线程,能够使每个消息主题基于其数据流量进行线程分配,并考虑实际可分配的线程数量作为分配的基准,实现了线程分配的均衡性,能够减少线程资源的浪费以及数据积压问题出现的概率。

可选地,在一种实施例中,根据线程比例和待分配的线程数量向消息主题分配线程的步骤包括:采用公式(ti/s)*m计算每个消息主题对应的比例积;若比例积大于或等于1,则进行一次线程分配,为比例积对应的消息主题分配F(x)个线程,其中,F为取整函数,x为比例积,具体地,取整函数F可以为ceil()、floor()、round(),其中,ceil()为向上取整函数,floor()为向下取整函数,round()为四舍五入取整函数,优选采用round(),以减小取整产生的分配误差;以及若比例积小于1,则累加多个比例积,直到多个比例积的和大于或等于1时进行一次线程分配,为多个比例积对应点额消息主题共同分配F(y)个线程,其中,y为多个比例积的和。

采用该实施例提供的消息主题的处理方法,能够根据比例积的大小实现相应线程资源的分配,使得比例积大于或等于1,也即数据占比大的消息主题能够分配相应数量的线程,使得比例积小于1,也即数据占比小的若干消息主题共用一个或两个线程,进一步提升线程分配的均衡性和准确性。

可选地,在一种实施例中,在到达每个线程分配周期时,在计算得到每个资源主题对应的比例积时,一次完成所有消息主题的线程分配。针对每个消息主题,按照上述的分配方法,根据比例积的大小,自身分配若干线程或与其他若干消息主题共用线程。

可选地,在另一种实施例中,在根据线程比例和线程数量向消息主题分配线程的步骤中,每完成一次线程分配后,返回确定待分配的线程数量和对应的多个消息主题的步骤,重新确定待分配的线程数量和对应的多个消息主题并进行线程分配,以通过迭代的方式将所有线程分配完成和/或为所有消息主题分配线程。具体描述如下:

步骤S1:确定待分配的线程数量m和对应的n个消息主题。

其中,初始第一次执行该步骤S1时,线程数量m为分布式发布订阅消息系统预置的所有线程的数量,n个消息主题是分布式发布订阅消息系统的消息主题的总量。

后续在执行该步骤S1时,线程数量m和消息主题的数量n均在发生变化。具体地,线程数量m为上一次的线程数量减去上一次分配出去的线程量,消息主题的数量n为当前还未分配线程的消息主题的量。

步骤S2:获取各个消息主题的历史流量。

步骤S3:根据历史流量计算消息主题所占的线程比例。

步骤S4:根据线程比例和线程数量向消息主题分配线程。

在步骤S4中,每次仅为比例积大于1的消息主题进行线程分配,或者仅为一组比例积均小于1且比例积和大于或等于1的消息主题进行线程分配,在分配完成后,返回步骤S1中,直到所有线程分配完成和/或为所有消息主题分配线程,跳出循环,完成迭代。

采用该实施例提供的消息主题的处理方法,每次迭代,仅进行一次线程分配,通过多次迭代完成线程分配,能够进一步减小取整函数产生的误差,从而进一步提升线程分配的均衡性和准确性。

可选地,在一种实施例中,每一次迭代时,在根据线程比例和线程数量向消息主题分配线程的步骤中,包括:若存在比例积大于或等于1的消息主题,则先对比例积大于或等于1的消息主题进行线程分配。

具体地,在步骤S4中,计算消息主题对应的比例积(ti/s)*m后,若存在比例积大于或等于1的消息主题,则按照F(x)为其中的一个消息主题进行线程分配,在分配完成后,返回步骤S101中。若不存在比例积大于或等于1的消息主题,则按照F(y)为其中的至少两个消息主题进行线程分配,在分配完成后,返回步骤S1中。

采用该实施例提供的消息主题的处理方法,在迭代时,优先对比例积大于或等于1的消息主题分配线程,能够加快迭代的速度,以快速完成线程分配。

可选地,在一种实施例中,若存在比例积大于或等于1的消息主题,则先对比例积大于或等于1的消息主题进行线程分配的步骤包括:在所有比例积大于或等于的消息主题中,获取比例积最大的消息主题;为比例积最大的消息主题进行线程分配。

具体地,在优先对比例积大于或等于1的消息主题分配线程时,优先对比例积大的消息主题分配线程,能够进一步加快迭代的速度,以快速完成线程分配。

可选地,在一种实施例中,每一次迭代时,在根据线程比例和线程数量向消息主题分配线程的步骤中,包括:若不存在比例积大于或等于1的消息主题,将所有消息主题按照比例积由小到大的顺序排序,得到消息主题队列;其中,在累加多个比例积时,按照消息主题队列中的次序依次累加对应的比例积。

具体地,在对比例积均小于1的消息主题进行线程分配时,先按照比例积由小到大的顺序排序,其中,比例积越小,表明该消息主题的数据流量越小,按照比例积由小到大的顺序形成消息主题队列,基于该队列的次序累加比列积进行线程分配时,一方面,能够优先使数据流量小的消息主题被分配线程,减少这部分消息主题在最后分配时占用较多的线程资源而导致线程资源的浪费,另一方面,能够使数据流量小的消息主题共用线程,使得数据流量特点类似的消息主题通过相同的线程进行消费,有主题提升线程的消费稳定性。

可选地,在一种实施例中,每一次迭代时,在根据线程比例和线程数量向消息主题分配线程的步骤中,包括:若不存在比例积大于或等于1的消息主题,将所有消息主题按照是否处于同一消费组进行分组;其中,在累加多个比例积时,针对属于同一消费组中的消息主题累加对应的比例积。

具体地,在对比例积均小于1的消息主题进行线程分配时,先按照是否处于同一消费组进行分组,将处于不同消费组的消息主题进行隔离,避免不同消费组的消息主题共用线程。可选地,在同一个消费组中,仍然可按照比例积由小到大的顺序排序后进行线程分配。

实施例二

对应于上述实施例一,本申请实施例二提供了一种消息主题的处理装置,相应的技术特征和对应的技术效果可参考上述实施例一,该处不再赘述。图2为本申请实施例二提供的消息主题的处理装置的框图,如图2所示,该装置包括:第一设置模块201、第一分配模块202和第二分配模块203。

第一设置模块201,用于设置对应消息队列的偏移量组和至少两个消费组,其中,所述偏移量组用于记录所述消息队列中各消息主题的偏移量,所述消费组包括若干线程,用于消费所述消息主题并更新所述偏移量;第一分配模块202,用于将所述消息队列中的消息主题分配至所述消费组,其中,若所述消费组被分配所述消息主题时,所述消费组为工作消费组,若所述消费组未被分配所述消息主题时,所述消费组为备用消费组;第二分配模块203,用于当所述工作消费组被分配至少两个所述消息主题,且其中的所述消息主题中断时,将中断的所述消息主题分配至所述备用消费组,以使所述备用消费组根据所述偏移量组中对应的偏移量,重启所述中断的消息主题。

可选地,在一种实施例中,所述消息主题的处理装置还包括:第二设置模块,用于设置对应各个所述消费组的主题分配记录,其中,所述主题分配记录包括各个所述消费组被分配的所述消息主题;第二分配模块包括第一分配单元,用于根据所述主题分配记录中查找所述备用消费组,并将所述中断的消息主题分配至查找到的所述备用消费组。

可选地,在一种实施例中,所述消息主题的处理装置还包括:第三设置模块,用于设置对应所述消费组的线程信息;所述第一分配单元在将所述中断的消息主题分配至查找到的所述备用消费组时,具体执行的步骤包括:当根据所述主题分配记录查找到至少两个所述备用消费组时,获取各个所述备用消费组对应的线程信息;根据所述线程信息在各个所述备用消费组中,确定出与所述中断的消息主题适配的备用消费组。

可选地,在一种实施例中,所述线程信息包括所述消费组的线程数量和单线程消费速率阈值,所述第一分配单元在根据所述线程信息在各个所述备用消费组中,确定出与所述中断的消息主题适配的备用消费组时,具体执行的步骤包括:获取所述偏移量组中所述中断的消息主题的历史偏移量;根据所述历史偏移量计算所述中断的消息主题的数据流量;以及根据所述线程数量、所述单线程消费速率阈值和所述中断的消息主题的数据流量确定出与所述中断的消息主题适配的备用消费组。

可选地,在一种实施例中,所述消息主题的处理装置还包括:第三分配模块,用于当根据所述主题分配记录查找不到所述备用消费组时,在各个所述工作消费组中确定两个所述工作消费组;针对所述两个工作消费组,将其中一个所述工作消费组的消息主题分配至另一个所述工作消费组,以得到新的所述备用消费组;将所述中断的消息主题分配至新的所述备用消费组。

可选地,在一种实施例中,所述线程信息还包括线程空闲时间,所述消息主题的处理装置还包括:计算模块,用于每间隔检查周期,根据所述偏移量组计算所述工作消费组的数据流量,根据所述单线程消费速率阈值、所述工作消费组的数据流量和所述检查周期,计算所述工作消费组对应的目标线程量;比较模块,用于比较所述目标线程量与所述工作消费组对应的线程数量;处理模块,用于当所述线程数量小于或等于所述目标线程量时,将所述线程空闲时间置为0,当所述线程数量大于所述目标线程量时,将所述线程空闲时间增加一个所述检查周期;第三分配模块在各个所述工作消费组中确定两个所述工作消费组时,具体执行的步骤包括:在各个所述工作消费组中,查找所述线程空闲时间最大的两个消费工作组。

可选地,在一种实施例中,计算模块根据所述偏移量组计算所述工作消费组的数据流量时,具体执行的步骤包括:从所述偏移量组中获取所述消息主题对应的偏移量;根据所述偏移量计算前一个所述检查周期内的偏移量增量;根据所述工作消费组分配的各个所述消息主题对应的偏移量增量,计算所述工作消费组对应的总偏移量增量;根据所述总偏移量增量和所述检查周期计算所述工作消费组的数据流量。

可选地,在一种实施例中,计算模块采用以下公式根据所述单线程消费速率阈值、所述工作消费组的数据流量和所述检查周期,计算所述工作消费组对应的目标线程量:nt=ceil(cs*p/ms),其中,nt为所述目标线程量,cs为所述工作消费组的数据流量,p为所述检查周期,ms为所述单线程消费速率阈值,ceil为向下取整函数。

实施例三

本实施例三还提供一种计算机设备,如可以执行程序的智能手机、平板电脑、笔记本电脑、台式计算机、机架式服务器、刀片式服务器、塔式服务器或机柜式服务器(包括独立的服务器,或者多个服务器所组成的服务器集群)等。如图3所示,本实施例的计算机设备01至少包括但不限于:可通过系统总线相互通信连接的存储器012、处理器011,如图3所示。需要指出的是,图3仅示出了具有组件存储器012和处理器011的计算机设备01,但是应理解的是,并不要求实施所有示出的组件,可以替代的实施更多或者更少的组件。

本实施例中,存储器012(即可读存储介质)包括闪存、硬盘、多媒体卡、卡型存储器(例如,SD或DX存储器等)、随机访问存储器(RAM)、静态随机访问存储器(SRAM)、只读存储器(ROM)、电可擦除可编程只读存储器(EEPROM)、可编程只读存储器(PROM)、磁性存储器、磁盘、光盘等。在一些实施例中,存储器012可以是计算机设备01的内部存储单元,例如该计算机设备01的硬盘或内存。在另一些实施例中,存储器012也可以是计算机设备01的外部存储设备,例如该计算机设备01上配备的插接式硬盘,智能存储卡(Smart Media Card,SMC),安全数字(Secure Digital,SD)卡,闪存卡(Flash Card)等。当然,存储器012还可以既包括计算机设备01的内部存储单元也包括其外部存储设备。本实施例中,存储器012通常用于存储安装于计算机设备01的操作系统和各类应用软件,例如实施例二的消息主题的处理装置等。此外,存储器012还可以用于暂时地存储已经输出或者将要输出的各类数据。

处理器011在一些实施例中可以是中央处理器(Central Processing Unit,CPU)、控制器、微控制器、微处理器、或其他数据处理芯片。该处理器011通常用于控制计算机设备01的总体操作。本实施例中,处理器011用于运行存储器012中存储的程序代码或者处理数据,例如消息主题的处理方法等。

实施例四

本实施例四还提供一种计算机可读存储介质,如闪存、硬盘、多媒体卡、卡型存储器(例如,SD或DX存储器等)、随机访问存储器(RAM)、静态随机访问存储器(SRAM)、只读存储器(ROM)、电可擦除可编程只读存储器(EEPROM)、可编程只读存储器(PROM)、磁性存储器、磁盘、光盘、服务器、App应用商城等等,其上存储有计算机程序,程序被处理器执行时实现相应功能。本实施例的计算机可读存储介质用于存储消息主题的处理装置,该计算机可读存储介质被处理器执行时实现实施例一的消息主题的处理方法。

进一步地,所述计算机可读存储介质可主要包括存储程序区和存储数据区,其中,存储程序区可存储操作系统、至少一个功能所需的应用程序等;存储数据区可存储根据区块链节点的使用所创建的数据等。

本申请所指区块链是分布式数据存储、点对点传输、共识机制、加密算法等计算机技术的新型应用模式。区块链(Blockchain),本质上是一个去中心化的数据库,是一串使用密码学方法相关联产生的数据块,每一个数据块中包含了一批次网络交易的信息,用于验证其信息的有效性(防伪)和生成下一个区块。区块链可以包括区块链底层平台、平台产品服务层以及应用服务层等。

需要说明的是,在本文中,术语“包括”、“包含”或者其任何其他变体意在涵盖非排他性的包含,从而使得包括一系列要素的过程、方法、物品或者装置不仅包括那些要素,而且还包括没有明确列出的其他要素,或者是还包括为这种过程、方法、物品或者装置所固有的要素。在没有更多限制的情况下,由语句“包括一个……”限定的要素,并不排除在包括该要素的过程、方法、物品或者装置中还存在另外的相同要素。

上述本申请实施例序号仅仅为了描述,不代表实施例的优劣。

可用于众多通用或专用的计算机系统环境或配置中。例如:个人计算机、服务器计算机、手持设备或便携式设备、平板型设备、多处理器系统、基于微处理器的系统、置顶盒、可编程的消费电子设备、网络PC、小型计算机、大型计算机、包括以上任何系统或设备的分布式计算环境等等。可以在由计算机执行的计算机可执行指令的一般上下文中描述,例如程序模块。一般地,程序模块包括执行特定任务或实现特定抽象数据类型的例程、程序、对象、组件、数据结构等等。也可以在分布式计算环境中实践,在这些分布式计算环境中,由通过通信网络而被连接的远程处理设备来执行任务。在分布式计算环境中,程序模块可以位于包括存储设备在内的本地和远程计算机存储介质中。

通过以上的实施方式的描述,本领域的技术人员可以清楚地了解到上述实施例方法可借助软件加必需的通用硬件平台的方式来实现,当然也可以通过硬件,但很多情况下前者是更佳的实施方式。

以上仅为本申请的优选实施例,并非因此限制本申请的专利范围,凡是利用本申请说明书及附图内容所作的等效结构或等效流程变换,或直接或间接运用在其他相关的技术领域,均同理包括在本申请的专利保护范围内。

去获取专利,查看全文>

相似文献

  • 专利
  • 中文文献
  • 外文文献
获取专利

客服邮箱:kefu@zhangqiaokeyan.com

京公网安备:11010802029741号 ICP备案号:京ICP备15016152号-6 六维联合信息科技 (北京) 有限公司©版权所有
  • 客服微信

  • 服务号