首页> 中国专利> 实时数据流处理方法、装置、设备、及可读存储介质

实时数据流处理方法、装置、设备、及可读存储介质

摘要

本申请公开了一种实时数据流处理方法、装置、设备、及可读存储介质,该方法包括:获取多个数据流在预设时间窗口内产生的数据项;其中,每个数据项包括:一个索引字段;将所述多个数据流中的包含同一索引字段的数据项进行合并,并将合并后的合并数据项存储到缓存中;当所述合并数据项在所述缓存中的存储时长达到预设阈值时,将所述合并数据项存储到目标数据库中;本申请能够将多条数据流合并后存储到目标数据库中,从而减少目标数据库中的冗余数据。

著录项

  • 公开/公告号CN112416972A

    专利类型发明专利

  • 公开/公告日2021-02-26

    原文格式PDF

  • 申请/专利权人 上海哔哩哔哩科技有限公司;

    申请/专利号CN202011024249.8

  • 发明设计人 陈健;蔡雪峰;

    申请日2020-09-25

  • 分类号G06F16/2455(20190101);G06F16/28(20190101);

  • 代理机构11015 北京英特普罗知识产权代理有限公司;

  • 代理人程超

  • 地址 200433 上海市杨浦区政立路485号国正中心3号楼

  • 入库时间 2023-06-19 10:02:03

说明书

技术领域

本申请涉及数据处理技术领域,特别涉及一种实时数据流处理方法、装置、设备、及可读存储介质。

背景技术

在现有的流式技术中,多条数据流中的数据项会直接存储到MySQL数据库中,再基于MySQL数据库进行后续的数据处理;通常多条数据流之间的数据项存在一定的关联关系,但由于单一的数据流无法获取其他数据流中的数据信息,因此在将数据流中的数据项存入到MySQL数据库之前,只会通过算子对同一数据流中的数据项进行处理,不会对多个数据流之间的数据项进行处理,因此,会在MySQL数据库中存储大量冗余数据,降低了MySQL数据库的存储效率。

发明内容

本申请的目的在于提供一种实时数据流处理方法、装置、设备、及可读存储介质,能够将多条数据流合并后存储到目标数据库中,从而减少目标数据库中的冗余数据。

根据本申请的一个方面,提供了一种实时数据流处理方法,所述方法包括:

获取多个数据流在预设时间窗口内产生的数据项;其中,每个数据项包括:一个索引字段;

将所述多个数据流中的包含同一索引字段的数据项进行合并,并将合并后的合并数据项存储到缓存中;

当所述合并数据项在所述缓存中的存储时长达到预设阈值时,将所述合并数据项存储到目标数据库中。

可选的,在所述将合并后的合并数据项存储到缓存中之后,且所述合并数据项在所述缓存中的存储时长未达到所述预设阈值时,所述方法还包括:

当任一数据流中有撤回流操作时,获取所述数据流中的更新数据项;

根据所述更新数据项中的索引字段,从所述缓存中查找到包含所述索引字段的合并数据项,并根据所述更新数据项对所述合并数据项进行更新。

可选的,所述方法还包括:

获取预设的合并配置文件;其中,所述合并配置文件包括:多个目标索引字段、以及每个目标索引字段所对应的合并规则。

可选的,所述将所述多个数据流中的包含同一索引字段的数据项进行合并,并将合并后的合并数据项存储到缓存中,包括:

针对所述合并配置文件中的一个目标索引字段,判断在所述多个数据流中是否存在包含所述目标索引字段的数据项;

若是,则将所有包含所述目标索引字段的数据项发送至与所述目标索引字段对应的合并节点上;

通过所述合并节点,按照所述合并配置文件中的与所述目标索引字段对应的合并规则,将所有包含所述目标索引字段的数据项合并为合并数据项;

将所述合并数据项、以及所有包含所述目标索引字段的数据项存储到所述合并节点上的缓存中。

可选的,所述根据所述更新数据项中的索引字段,从所述缓存中查找到包含所述索引字段的合并数据项,并根据所述更新数据项对所述合并数据项进行更新,包括:

将所述更新数据项存储到与所述索引字段对应的合并节点上的缓存中;

确定出所述更新数据项的数据流类型;

从所述缓存中删除包含所述索引字段、且与所述数据流类型一致的数据项,以及从所述缓存中删除包含所述索引字段的合并数据项;

通过所述合并节点,按照所述合并配置文件中的与所述索引字段对应的合并规则,将所述缓存中所有包含所述索引字段的数据项合并为新的合并数据项;

将所述新的合并数据项存储到所述缓存中。

可选的,所述确定出所述更新数据项的数据流类型,包括:

根据所述更新数据项中包含的字段长度确定出所述更新数据项的数据流类型。

可选的,所述方法还包括:

若任一数据项的索引字段不包含在所述合并配置文件中,则将所述数据项存储到所述缓存中;

当所述数据项在所述缓存中的存储时长达到所述预设阈值时,将所述数据项存储到所述目标数据库中。

为了实现上述目的,本申请还提供一种实时数据流处理装置,所述装置包括:

获取模块,用于获取多个数据流在预设时间窗口内产生的数据项;其中,每个数据项包括:一个索引字段;

合并模块,用于将所述多个数据流中的包含同一索引字段的数据项进行合并,并将合并后的合并数据项存储到缓存中;

存储模块,用于当所述合并数据项在所述缓存中的存储时长达到预设阈值时,将所述合并数据项存储到目标数据库中。

为了实现上述目的,本申请还提供一种计算机设备,该计算机设备具体包括:存储器、处理器以及存储在所述存储器上并可在所述处理器上运行的计算机程序,所述处理器执行所述计算机程序时实现上述介绍的实时数据流处理方法的步骤。

为了实现上述目的,本申请还提供一种计算机可读存储介质,其上存储有计算机程序,所述计算机程序被处理器执行时实现上述介绍的实时数据流处理方法的步骤。

本申请提供的实时数据流处理方法、装置、设备、及可读存储介质,能够实现将多条数据流合并后存储到目标数据库中,从而减少目标数据库中的冗余数据,使得数据流中的数据项能够exactly once(精确一次)的输出到目标数据库中,从而彻底解决将数据流存储到目标数据库中会产生的各种异常情况,提高了目标数据库的存储效率。

附图说明

通过阅读下文优选实施方式的详细描述,各种其他的优点和益处对于本领域普通技术人员将变得清楚明了。附图仅用于示出优选实施方式的目的,而并不认为是对本申请的限制。而且在整个附图中,用相同的参考符号表示相同的部件。在附图中:

图1为实施例一提供的实时数据流处理方法的一种可选的流程示意图;

图2为实施例一提供的将不同数据流中的数据项分配到对应合并节点的示意图;

图3为实施例一提供的为不同数据流中的包含同一目标索引字段的两个数据项的合并示意图;

图4为实施例一提供的当出现撤回流操作时,重新对两个数据项进行合并处理的示意图;

图5为实施例一提供的实时数据流处理方法的另一种可选的流程示意图;

图6为实施例一提供的实时数据流处理方法的另一种可选的流程示意图;

图7为实施例二提供的实时数据流处理装置的一种可选的组成结构示意图;

图8为实施例三提供的计算机设备的一种可选的硬件架构示意图。

具体实施方式

为了使本申请的目的、技术方案及优点更加清楚明白,以下结合附图及实施例,对本申请进行进一步详细说明。应当理解,此处所描述的具体实施例仅用以解释本申请,并不用于限定本申请。基于本申请中的实施例,本领域普通技术人员在没有做出创造性劳动前提下所获得的所有其他实施例,都属于本申请保护的范围。

实施例一

本申请实施例提供了一种实时数据流处理方法,如图1所示,该方法具体包括以下步骤:

步骤S101:获取多个数据流在预设时间窗口内产生的数据项;其中,每个数据项包括:一个索引字段。

在本实施例中,每个数据流包括多个数据项,且每个数据项包括一个索引字段和多个其余字段;此外,在本实施例中,按照预设时间窗口T,定期获取各个数据流在时间窗口T内产生的所有数据项,从而对一个时间窗口T内的所有数据流产生的所有数据项进行合并处理。

需要说明的是,不同数据流中的各个数据项所包含的索引字段可以不同;例如,数据流A中的多个数据项包括索引字段KEY1和索引字段KEY2;而数据流B中的多个数据项包括索引字段KEY、索引字段KEY2和索引字段KEY3。

步骤S102:将所述多个数据流中的包含同一索引字段的数据项进行合并,并将合并后的合并数据项存储到缓存中。

具体的,在步骤S102之前,所述方法还包括:

获取预设的合并配置文件;其中,所述合并配置文件包括:多个目标索引字段、以及每个目标索引字段所对应的合并规则。

所述合并规则包括多个合并子规则,每个合并子规则对应数据项中的一个或多个目标字段,合并子规则用于对多个数据项中的目标字段的字段值按照预设运算逻辑处理,以得到合并字段值。其中,预设运算逻辑包括:求和运算、求差运算、求平均运算、取最大运算、取最小运算中的一个或多个。例如,针对包含同一索引字段的数据项A和数据项B,将数据项A和数据项B中的字段2进行求和运行,并将数据项A和数据项B中的字段4进行取最大运算。

进一步的,步骤S102,具体包括:

步骤A1:针对所述合并配置文件中的一个目标索引字段,判断在所述多个数据流中是否存在包含所述目标索引字段的数据项;

步骤A2:若是,则将所有包含所述目标索引字段的数据项发送至与所述目标索引字段对应的合并节点上;

如图2所示,为将不同数据流中的数据项分配到对应合并节点的示意图;在每个数据流中会设置算子,算子具有一定的逻辑处理能力用于进行数据处理,每个数据流中的数据项经过各个算子的处理后会存储到MySQL数据库中,在本实施例中,获取从各个数据流中的位于MySQL数据库之前的上游算子输出的数据项,并根据数据项中的索引字段,将包含相同索引字段的数据项发送至一个合并节点上;

步骤A3:通过所述合并节点,按照所述合并配置文件中的与所述目标索引字段对应的合并规则,将所有包含所述目标索引字段的数据项合并为合并数据项;

其中,在所述合并数据项中也包含所述目标索引字段;

如图3所示,为不同数据流中的包含同一目标索引字段的两个数据项的合并示意图;其中,数据流A中的数据项1和数据流B中的数据项2均包含目标索引字段KEY1;按照与目标索引字段KEY1对应的合并规则可对数据项1和数据项2中的其余字段进行数据合并处理;其中,合并规则可支持各种简单的AggregateFunction操作,例如,求和sum、取最小min、取最大max。

步骤A4:将所述合并数据项、以及所有包含所述目标索引字段的数据项存储到所述合并节点上的缓存中。

需要说明的是,在实时流处理中会存在撤回流的操作,撤回流操作用于对数据流中已产生的数据项进行更新,即,当下游收到撤回流时,会删除上次接收到的数据项,然后上游会再发出一条订正后的更新数据项以替换原数据项;因此,为了便于后期当出现撤回流操作时对合并数据项进行更新,需要将用于计算该合并数据项的原始数据项也存储到缓存中。此外,在实际应用中可分别在各个合并节点中设置对应的缓存,也可统一的为所有合并节点设置缓存。

进一步的,在所述将合并后的合并数据项存储到缓存中之后,所述方法还包括:

步骤B1:当任一数据流中有撤回流操作时,获取所述数据流中的更新数据项;

步骤B2:根据所述更新数据项中的索引字段,从所述缓存中查找到包含所述索引字段的合并数据项,并根据所述更新数据项对所述合并数据项进行更新。

需要说明的是,在本实施例中可在数据项中添加属性标志位,以判断数据项是更新流还是撤回流;如果是撤回流,则不对其做操作,也不需要存储到缓存中;如果是更新流,则根据数据项中的索引字段对缓存中对应的合并数据项进行更新。

更进一步的,步骤B2,具体包括:

步骤B21:将所述更新数据项存储到与所述索引字段对应的合并节点上的缓存中;

步骤B22:确定出所述更新数据项的数据流类型;

优选的,根据所述更新数据项中包含的字段长度确定出所述更新数据项的数据流类型;在本实施例中,一种类型的数据流中的所有数据项的字段长度均一致,例如,A流的字段长度为10、B流的字段长度为11;因此,可以通过数据项的字段长度确定出数据项的数据流类型;此外,也可以通过在数据项中设置用于标识数据流类型的类型标志位以确定出数据项所属于的数据流类型;

步骤B23:从所述缓存中删除包含所述索引字段、且与所述数据流类型一致的数据项,以及从所述缓存中删除包含所述索引字段的合并数据项;

步骤B24:通过所述合并节点,按照所述合并配置文件中的与所述索引字段对应的合并规则,将所述缓存中所有包含所述索引字段的数据项合并为新的合并数据项;

如图4所示,为当出现撤回流操作时,重新对两个数据项进行合并处理的示意图;其中,数据流A中的数据项1和数据流B中的数据项2均包含目标索引字段KEY1;当针对数据流A中的数据项1出现了撤回流操作时,即需要将原数据项1更新为数据项1′,需要根据更新后的数据项1′以及数据项2合并为新的合并数据项。

步骤B25:将所述新的合并数据项存储到所述缓存中。

步骤S103:当所述合并数据项在所述缓存中的存储时长达到预设阈值时,将所述合并数据项存储到目标数据库中。

优选的,目标数据项为MySQL数据库。

具体的,当所述合并数据项在所述缓存中的存储时长未达到所述预设阈值时,所述方法还包括:

当获取到的数据项与所述缓存中的任一合并数据项包含同一索引字段时,根据所述数据项对所述合并数据项进行更新。

在本实施例中,预设阈值为时间窗口的倍数,可以将一个或多个时间窗口的数据项进行合并,当任一合并数据项在缓存中的存储时长达到预设阈值时,才将该合并数据项存储到目标数据库中;此外,也可以当存储在缓存中的数据项的数量达到预设阈值时,将缓存中的数据项存储到目标数据库中,从而确保不会因为存储在缓存中的数据项过多或存储时间过长而导致内存溢出的情况出现。

进一步的,所述方法还包括:

步骤C1:若任一数据项的索引字段不包含在所述合并配置文件中,则将所述数据项存储到所述缓存中;

步骤C2:当所述数据项在所述缓存中的存储时长达到所述预设阈值时,将所述数据项存储到所述目标数据库中。

此外,在本实施例中,还可支持check point数据镜像恢复机制,即定期对缓存中的数据项进行备份,以当出现故障时,可恢复缓存中的数据项。

在现有技术中,虽然各个数据流之间的数据项会存在一定的关联,但也不会对不同数据流中的数据项进行合并处理,而是分别将各个数据流中的数据项直接存储到MySQL数据库中,因此,会在MySQL数据库中存储大量的冗余数据;此外,当数据流中的原数据项存储到MySQL数据库中时,会在MySQL数据库中产生一条原数据项存储记录,当在该数据流中产生撤回流操作时,会在MySQL数据库中产生一条原数据项撤回记录,当在该数据流中产生更新数据项时,会在MySQL数据库中产生一条更新数据项存储记录,以通过更新数据项替换原数据项;因此,在现有技术中,当出现撤回流操作时,需要在MySQL数据库中存储三条记录,不能做到将数据项exactly once的输出到MySQL数据库中,MySQL数据库的存储效率较低。通过本实施例提供的技术方案,可以在将不同数据流的数据项存储到MySQL数据库之前,通过包含在数据项中的索引字段对具有关联关系的数据项进行合并,且合并后的数据项会在缓存中存储一定时长,从而当出现撤回流操作时,也可直接对缓存中的数据项进行更新,而不需要作用于MySQL数据库,最后将合并后的数据项存储到MySQL数据库中,从而提高MySQL数据库的存储效率,以及减少MySQL数据库中的冗余数据。

如图5所示,为一种实时数据流处理方法的流程示意图,其中包括实时数据流A和实时数据流B;实时数据流A和实时数据流B类似于自来水管道,不断的向外提供数据;每个圆圈代表一个算子,且每个算子分布式运行在集群的各个服务器上,且每个算子具有一定的数据处理所里;合并节点会从上游算子获取到具有相同索引字段的数据项,并将数据项合并后存储到缓存中,当合并数据项在缓存中的存储时长达到预设阈值时,将缓存中的合并数据项发送至MySQL数据库。

此外,如图6所示,为在实际应用中实现实时数据流处理方法的流程示意图;其中,会预先设置合并配置文件,该合并配置文件可以为Json格式或sql语言,在合并配置文件中包括:目标索引字段、合并规则、预设缓存时长阈值;当需要实现实时数据流处理方法时,读取合并配置文件,并根据合并配置文件向实时流系统注册Sink MySQL数据库的输出算子、主件或者DAG;当启动分布式实时流任务时,多条数据流会进入Sink MySQL数据库算子;Sink MySQL数据库算子上游判断是否流入数据项或者有更新数据项,若是,则将具有相同索引字段的数据项发送至合并节点,以对数据项进行聚合操作,得到合并数据项,并将合并数据项存储到缓存中,当存储时长达到预设缓存时长阈值时,将合并数据项存储到MySQL数据库中。还需要说明的是,若一个数据流中数据项的索引字段不存在于其他数据流的数据项中,那么将该数据流的数据项直接存储到缓存中,并最终写入MySQL数据库中。

实施例二

本申请实施例提供了一种实时数据流处理装置,如图7所示,该装置具体包括以下组成部分:

获取模块701,用于获取多个数据流在预设时间窗口内产生的数据项;其中,每个数据项包括:一个索引字段;

合并模块702,用于将所述多个数据流中的包含同一索引字段的数据项进行合并,并将合并后的合并数据项存储到缓存中;

存储模块703,用于当所述合并数据项在所述缓存中的存储时长达到预设阈值时,将所述合并数据项存储到目标数据库中。

具体的,所述方法还包括:

更新模块,用于当任一数据流中有撤回流操作时,获取所述数据流中的更新数据项;

根据所述更新数据项中的索引字段,从所述缓存中查找到包含所述索引字段的合并数据项,并根据所述更新数据项对所述合并数据项进行更新。

所述方法还包括:

配置模块,用于获取预设的合并配置文件;其中,所述合并配置文件包括:多个目标索引字段、以及每个目标索引字段所对应的合并规则。

进一步的,合并模块702,具体用于:

针对所述合并配置文件中的一个目标索引字段,判断在所述多个数据流中是否存在包含所述目标索引字段的数据项;

若是,则将所有包含所述目标索引字段的数据项发送至与所述目标索引字段对应的合并节点上;

通过所述合并节点,按照所述合并配置文件中的与所述目标索引字段对应的合并规则,将所有包含所述目标索引字段的数据项合并为合并数据项;

将所述合并数据项、以及所有包含所述目标索引字段的数据项存储到所述合并节点上的缓存中。

进一步的,所述更新模块,具体用于:

将所述更新数据项存储到与所述索引字段对应的合并节点上的缓存中;

确定出所述更新数据项的数据流类型;

从所述缓存中删除包含所述索引字段、且与所述数据流类型一致的数据项,以及从所述缓存中删除包含所述索引字段的合并数据项;

通过所述合并节点,按照所述合并配置文件中的与所述索引字段对应的合并规则,将所述缓存中所有包含所述索引字段的数据项合并为新的合并数据项;

将所述新的合并数据项存储到所述缓存中。

更进一步的,存储模块703,还用于:

若任一数据项的索引字段不包含在所述合并配置文件中,则将所述数据项存储到所述缓存中;

当所述数据项在所述缓存中的存储时长达到所述预设阈值时,将所述数据项存储到所述目标数据库中。

实施例三

本实施例还提供一种计算机设备,如可以执行程序的智能手机、平板电脑、笔记本电脑、台式计算机、机架式服务器、刀片式服务器、塔式服务器或机柜式服务器(包括独立的服务器,或者多个服务器所组成的服务器集群)等。如图8所示,本实施例的计算机设备80至少包括但不限于:可通过系统总线相互通信连接的存储器801、处理器802。需要指出的是,图8仅示出了具有组件801-802的计算机设备80,但是应理解的是,并不要求实施所有示出的组件,可以替代的实施更多或者更少的组件。

本实施例中,存储器801(即可读存储介质)包括闪存、硬盘、多媒体卡、卡型存储器(例如,SD或DX存储器等)、随机访问存储器(RAM)、静态随机访问存储器(SRAM)、只读存储器(ROM)、电可擦除可编程只读存储器(EEPROM)、可编程只读存储器(PROM)、磁性存储器、磁盘、光盘等。在一些实施例中,存储器801可以是计算机设备80的内部存储单元,例如该计算机设备80的硬盘或内存。在另一些实施例中,存储器801也可以是计算机设备80的外部存储设备,例如该计算机设备80上配备的插接式硬盘,智能存储卡(Smart Media Card,SMC),安全数字(Secure Digital,SD)卡,闪存卡(Flash Card)等。当然,存储器801还可以既包括计算机设备80的内部存储单元也包括其外部存储设备。在本实施例中,存储器801通常用于存储安装于计算机设备80的操作系统和各类应用软件。此外,存储器801还可以用于暂时地存储已经输出或者将要输出的各类数据。

处理器802在一些实施例中可以是中央处理器(Central Processing Unit,CPU)、控制器、微控制器、微处理器、或其他数据处理芯片。该处理器802通常用于控制计算机设备80的总体操作。

具体的,在本实施例中,处理器802用于执行处理器802中存储的实时数据流处理方法的程序,所述实时数据流处理方法的程序被执行时实现如下步骤:

获取多个数据流在预设时间窗口内产生的数据项;其中,每个数据项包括:一个索引字段;

将所述多个数据流中的包含同一索引字段的数据项进行合并,并将合并后的合并数据项存储到缓存中;

当所述合并数据项在所述缓存中的存储时长达到预设阈值时,将所述合并数据项存储到目标数据库中。

上述方法步骤的具体实施例过程可参见第一实施例,本实施例在此不再重复赘述。

实施例四

本实施例还提供一种计算机可读存储介质,如闪存、硬盘、多媒体卡、卡型存储器(例如,SD或DX存储器等)、随机访问存储器(RAM)、静态随机访问存储器(SRAM)、只读存储器(ROM)、电可擦除可编程只读存储器(EEPROM)、可编程只读存储器(PROM)、磁性存储器、磁盘、光盘、服务器、App应用商城等等,其上存储有计算机程序,所述计算机程序被处理器执行时实现如下方法步骤:

获取多个数据流在预设时间窗口内产生的数据项;其中,每个数据项包括:一个索引字段;

将所述多个数据流中的包含同一索引字段的数据项进行合并,并将合并后的合并数据项存储到缓存中;

当所述合并数据项在所述缓存中的存储时长达到预设阈值时,将所述合并数据项存储到目标数据库中。

上述方法步骤的具体实施例过程可参见第一实施例,本实施例在此不再重复赘述。

需要说明的是,在本文中,术语“包括”、“包含”或者其任何其他变体意在涵盖非排他性的包含,从而使得包括一系列要素的过程、方法、物品或者装置不仅包括那些要素,而且还包括没有明确列出的其他要素,或者是还包括为这种过程、方法、物品或者装置所固有的要素。在没有更多限制的情况下,由语句“包括一个……”限定的要素,并不排除在包括该要素的过程、方法、物品或者装置中还存在另外的相同要素。

上述本申请实施例序号仅仅为了描述,不代表实施例的优劣。

通过以上的实施方式的描述,本领域的技术人员可以清楚地了解到上述实施例方法可借助软件加必需的通用硬件平台的方式来实现,当然也可以通过硬件,但很多情况下前者是更佳的实施方式。

以上仅为本申请的优选实施例,并非因此限制本申请的专利范围,凡是利用本申请说明书及附图内容所作的等效结构或等效流程变换,或直接或间接运用在其他相关的技术领域,均同理包括在本申请的专利保护范围内。

去获取专利,查看全文>

相似文献

  • 专利
  • 中文文献
  • 外文文献
获取专利

客服邮箱:kefu@zhangqiaokeyan.com

京公网安备:11010802029741号 ICP备案号:京ICP备15016152号-6 六维联合信息科技 (北京) 有限公司©版权所有
  • 客服微信

  • 服务号