首页> 中国专利> 用于在分布式环境中执行任务的方法、装置及系统

用于在分布式环境中执行任务的方法、装置及系统

摘要

本申请公开了一种用于在分布式环境中执行任务的方法及装置、以及一种分布式任务执行系统。其中所述用于在分布式环境中执行任务的方法包括:接收根据当前系统配置执行数据处理任务的通知消息;获取当前系统配置信息,所述系统配置信息包括:数据处理设备和待处理数据单元的相关信息、以及任务分配算法;根据所述数据处理设备和待处理数据单元的相关信息,采用所述任务分配算法确定本数据处理设备负责处理的待处理数据单元;针对所述待处理数据单元,按照预先设定的方式执行数据处理任务。采用本申请提供的方法,可以避免人工配置导致的效率低下、容易出错等弊端;特别是在系统配置发生变更时,无须人工介入就能快速地进行数据处理任务的重新划分。

著录项

  • 公开/公告号CN105701099A

    专利类型发明专利

  • 公开/公告日2016-06-22

    原文格式PDF

  • 申请/专利权人 阿里巴巴集团控股有限公司;

    申请/专利号CN201410690593.9

  • 发明设计人 刘帅;

    申请日2014-11-25

  • 分类号G06F17/30(20060101);

  • 代理机构11441 北京市清华源律师事务所;

  • 代理人沈泳;李赞坚

  • 地址 英属开曼群岛大开曼资本大厦一座四层847号邮箱

  • 入库时间 2023-12-18 15:45:39

法律信息

  • 法律状态公告日

    法律状态信息

    法律状态

  • 2019-01-22

    授权

    授权

  • 2016-07-20

    实质审查的生效 IPC(主分类):G06F17/30 申请日:20141125

    实质审查的生效

  • 2016-06-22

    公开

    公开

说明书

技术领域

本申请涉及分布式数据处理领域,具体涉及一种用于在分布式环境中执行任务的方法。本申请同时提供一种用于在分布式环境中执行任务的装置、以及一种分布式任务执行系统。

背景技术

对于一些简单的涉及数据处理的应用,通常采用一台数据处理设备(通常是计算机)和一台数据存储设备(通常是数据库设备)就能够满足数据存储和应用的需求。但是随着计算机和网络技术的发展,各种应用越来越多,出现了很多大型的应用系统,一方面数据处理任务越来越复杂,计算工作量越来越大,一台计算机的资源(内存,CPU,网络)通常无法支撑其运行;另一方面,数据量的急剧增长也使得仅采用一台数据库设备无法满足数据存储和访问的需求。

在单机单库的系统架构已经无法满足需求的情况下,出现了基于分布式技术的多机多库架构:在数据处理层面由计算集群中的多台计算机负责执行数据处理任务,而数据存储则采用分库分表技术,将业务数据垂直分配到数据库集群的多台数据库上,也就是说,每台计算机只负责处理某一台或者几台数据库中的数据,由计算集群中的所有计算机共同协作完成整个数据处理任务。显然,采用最初的单机单库架构,不存在任务分配和调节的需求,然而发展到多机多库的架构方式,就必须要考虑如何为每台计算机分配数据处理任务。

现有的解决方案,通常采用固定配置的方式,即:由管理人员根据系统中每台计算机的硬件配置、数量以及存储待处理数据的数据库设备的数量等信息,采用特定的算法计算每台计算机负责处理哪些数据库数据,并将计算结果写入每台计算机的配置文件中,每台计算机在启动的时候读取所述配置文件,从而获知自己负责处理的数据,并进行相应的处理。

采用上述方式,在系统规模比较小、比较稳定的情况下可以满足应用需求,但是随着系统规模的扩展,计算集群中的计算机数量越来越多、数据库集群中的数据库设备也越来越多,采用上述配置方式的弊端日益明显:

(一)由于需要管理员在系统启动之前对每一台计算机逐一进行配置,过程繁琐、效率低,而且容易出错,可能出现某些数据被重复处理、而某些数据因为被遗漏而得不到处理的情况。

(二)特别是,当系统配置发生变化时,例如:为了提升系统能力进行扩容导致计算集群或者数据库集群中的设备数量增加,或者某台设备发生故障无法正常工作或者数据库设备发生了主备切换的情况,现有解决方案仍然只能依靠人工干预的方式,由管理员根据当前系统的最新状况,重新进行计算并根据计算结果更新每台计算机的配置文件,然后触发每台计算机重新启动并执行新的数据处理任务。由于需要人工介入,整个处理过程繁琐、效率也比较低、而且从系统配置发生变化到数据处理设备执行新的数据处理任务之间,通常存在比较长的滞后时间,可能会影响业务数据的正确处理。

发明内容

本申请提供一种用于在分布式环境中执行任务的方法,以解决现有技术通过人工方式为数据处理设备分配任务导致配置过程繁琐、效率低、以及对系统配置变化反应滞后的问题。本申请另外提供一种用于在分布式环境中执行任务的装置,以及一种分布式任务执行系统。

本申请提供一种用于在分布式环境中执行任务的方法,所述方法在执行数据处理任务的设备上实施,包括:

接收根据当前系统配置执行数据处理任务的通知消息;

获取当前系统配置信息,所述系统配置信息包括:数据处理设备和待处理数据单元的相关信息、以及任务分配算法;

根据所述数据处理设备和待处理数据单元的相关信息,采用所述任务分配算法确定本数据处理设备负责处理的待处理数据单元;

针对所述待处理数据单元,按照预先设定的方式执行数据处理任务。

可选的,所述待处理数据单元是指,存储待处理数据的数据库;相应的,所述待处理数据单元的相关信息包括:存储待处理数据的数据库的数目、和存储待处理数据的数据库标识;

所述数据处理设备的相关信息包括:所述数据处理设备的数目、和数据处理设备的标识。

可选的,所述接收根据当前系统配置执行数据处理任务的通知消息包括:

接收本数据处理设备的启动消息;或者,

接收当前系统配置发生变更的通知消息。

可选的,当接收所述当前系统配置发生变更的通知消息后,在执行所述获取当前系统配置信息的步骤前,执行下述操作:

判断当前是否正在执行数据处理任务,若是,结束当前正在执行的数据处理任务。

可选的,所述任务分配算法是负载均衡算法。

可选的,所述根据所述数据处理设备和待处理数据单元的相关信息,采用所述任务分配算法确定本数据处理设备负责处理的待处理数据单元,包括:

用存储待处理数据的每个数据库的标识与所述数据处理设备的数目执行取模操作,将得到的结果作为负责处理相应数据库的数据处理设备编号,得到数据库标识与数据处理设备编号之间的对应关系;

根据每个数据处理设备的标识进行排序,并将本数据处理设备对应的排序号作为本数据处理设备的编号;

根据本数据处理设备的编号以及所述数据库标识与数据处理设备编号之间的对应关系,获取本数据处理设备负责处理的数据库标识。

可选的,所述数据库标识是指,数据库的IP地址;所述数据处理设备标识是指,所述数据处理设备的IP地址。

可选的,所述按照预先设定的方式执行数据处理任务是指,采用定时启动的方式执行数据处理任务。

可选的,所述针对所述待处理数据单元,按照预先设定的方式执行数据处理任务包括:

根据本数据处理设备负责处理的数据库的IP地址,与所述数据库建立连接;

通过已建立的连接,访问所述数据库中的待处理数据并进行处理。

可选的,当所述接收根据当前系统配置执行数据处理任务的通知消息是指接收当前系统配置发生变更的通知消息时,所述方法包括:

向负责监听数据处理设备相关信息变更的管理设备定期上报本数据处理设备的运行状况;或者,

针对接收到的来自所述管理设备的请求上报本数据处理设备的运行状况;

相应的,所述接收当前系统配置发生变更的通知消息是指,接收所述管理设备发送的所述通知消息。

可选的,所述管理设备是Zookeeper服务器;或者,

Zookeeper服务器或配置中心服务器,其中Zookeeper服务器负责监听数据处理设备的相关信息的变更,配置中心服务器负责监听存储待处理数据的数据库的相关信息的变更以及任务分配算法的变更。

相应的,本申请还提供一种用于在分布式环境中执行任务的装置,所述装置部署在执行数据处理任务的设备上,包括:

消息接收单元,用于接收根据当前系统配置执行数据处理任务的通知消息;

信息获取单元,用于获取当前系统配置信息,所述系统配置信息包括:数据处理设备和待处理数据单元的相关信息、以及任务分配算法;

数据确定单元,用于根据所述数据处理设备和待处理数据单元的相关信息,采用所述任务分配算法确定本数据处理设备负责处理的待处理数据单元;

任务执行单元,用于针对所述待处理数据单元,按照预先设定的方式执行数据处理任务。

可选的,所述消息接收单元具体用于,接收本数据处理设备的启动消息;或者,接收当前系统配置发生变更的通知消息。

可选的,所述装置还包括:

任务结束单元,用于在所述消息接收单元接收到当前系统配置发生变更的通知消息后,以及触发所述信息获取单元工作之前,判断当前是否正在执行数据处理任务,若是,结束当前正在执行的数据处理任务。

可选的,所述数据确定单元采用的任务分配算法是负载均衡算法。

可选的,所述数据确定单元包括:

对应关系获取子单元,用于用存储待处理数据的每个数据库的标识与所述数据处理设备的数目执行取模操作,将得到的结果作为负责处理相应数据库的数据处理设备编号,得到数据库标识与数据处理设备编号之间的对应关系;

设备编号获取子单元,用于根据每个数据处理设备的标识进行排序,并将本数据处理设备对应的排序号作为本数据处理设备的编号;

数据库确定子单元,用于根据本数据处理设备的编号以及所述数据库标识与数据处理设备编号之间的对应关系,获取本数据处理设备负责处理的数据库标识。

可选的,所述任务执行单元具体用于,针对所述待处理数据单元,采用定时启动的方式执行数据处理任务。

可选的,所述任务执行单元包括:

连接建立子单元,用于根据本数据处理设备负责处理的数据库的IP地址,与所述数据库建立连接;

数据处理子单元,用于通过已建立的连接,访问所述数据库中的待处理数据并进行处理。

可选的,所述消息接收单元具体用于接收当前系统配置发生变更的通知消息时,所述装置包括:

定期上报子单元,用于向负责监听数据处理设备相关信息变更的管理设备定期上报本数据处理设备的运行状况;或者,

按需上报子单元,用于针对接收到的来自所述管理设备的请求上报本数据处理设备的运行状况;

相应的,所述消息接收单元具体用于接收所述管理设备发送的所述通知消息。

此外,本申请还提供一种分布式任务执行系统,包括:根据上述任意一项所述的用于在分布式环境中执行任务的数据处理装置;用于存储待处理数据的数据库、和用于提供系统配置信息、监听系统配置信息的变更以及向系统内的所述数据处理装置发送系统配置变更的通知消息的管理设备,所述系统配置信息包括:系统内的所述数据处理装置和所述数据库的相关信息以及任务分配算法。

可选的,所述管理设备包括:第一管理设备和第二管理设备,所述第一管理设备用于提供所述数据处理装置的相关信息、监听所述相关信息的变更、并在监测到所述变更时向系统内的数据处理装置广播所述通知消息;所述第二管理设备用于设置任务分配算法、提供所述数据库的相关信息、监听所述相关信息的变更、并在监测到所述变更或者所述任务分配算法发生变更时向系统内的数据处理装置广播所述通知消息。

可选的,所述第一管理设备是Zookeeper服务器,所述第二管理设备是配置中心服务器。

与现有技术相比,本申请具有以下优点:

本申请提供的用于在分布式环境中执行任务的方法,在接收根据当前系统配置执行数据处理任务的通知消息后,先获取当前系统配置信息,并采用所述配置信息中的任务分配算法确定本数据处理设备负责的待处理数据单元,并按照预先设定的方式处理所述待处理数据单元。通过在数据处理设备上实施上述方法,数据处理设备自身具备了动态地根据系统当前配置进行任务划分的能力,可以自动获知自己应该处理哪些数据并执行相应的数据处理任务,从而避免了由于人工配置导致的过程繁琐、效率低下以及容易出错等弊端;特别是在系统配置发生变更的情况下,无须人工介入就能快速地进行数据处理任务的重新划分,使得正常的数据处理过程不受影响,且不会发生遗漏数据或者重复处理数据的情况。

附图说明

图1是本申请的一种用于在分布式环境中执行任务的方法实施例的流程图;

图2是本申请实施例提供的采用负载均衡算法确定本数据处理设备负责处理的待处理数据单元的处理流程图;

图3是本申请实施例提供的数据处理设备与负责处理的数据库之间的对应关系的示意图;

图4是本申请的一种用于在分布式环境中执行任务的装置实施例的示意图;

图5是本申请的一种用于在分布式环境中执行任务的方法的另一种实施例的流程图;

图6是本申请实施例提供的系统配置变更后,数据处理设备与负责处理的数据库之间的对应关系的示意图;

图7是本申请的一种用于在分布式环境中执行任务的装置的另一种实施例的示意图;

图8是本申请的一种分布式任务执行系统的示意图。

具体实施方式

在下面的描述中阐述了很多具体细节以便于充分理解本申请。但是本申请能够以很多不同于在此描述的其它方式来实施,本领域技术人员可以在不违背本申请内涵的情况下做类似推广,因此本申请不受下面公开的具体实施的限制。

在本申请中,分别提供了一种用于在分布式环境中执行任务的方法和装置、以及一种分布式任务执行系统,在后续的实施例中逐一进行详细说明。为了便于理解,先对本申请的技术方案作简要说明。

在分布式计算环境中,待处理数据通常采用分库分表技术存储在多个数据库中,而多个数据处理设备(通常是指计算机)各自负责处理其中一部分待处理数据,最终协同完成整个数据处理任务。实施了本申请所提供方法的数据处理设备,能够根据系统配置信息中指定的任务分配算法,自动计算出自己负责处理的待处理数据单元,从而不必通过人工配置进行数据处理任务的指派。

本申请所述的数据处理设备,通常是指计算机,也包含其他具备数据处理能力的设备,所述数据处理设备通过对数据的访问,执行必要的处理操作,例如:对于在20天内未完成支付的订单,修改其状态为关闭,根据从数据中提取的信息,从银行下载文件并处理,以及其他一些更为复杂的计算操作等。上述访问数据以及在访问数据的基础上执行的处理或者操作,在本申请中统称为数据处理任务。

本申请的技术方案基于待处理数据进行数据处理任务的划分,进行所述划分的基本单位称为待处理数据单元,所述待处理数据单元通常是指存储待处理数据的数据库,也可以是数据库中的表、或者是普通的数据文件等。

实施了本申请技术方案的数据处理设备,在启动的时候、以及接收到当前系统配置发生变更的消息后,都会根据当前的系统配置确定自己负责处理的待处理数据单元,为了便于描述,下文会通过不同的实施例分别针对上述两种应用场景进行说明。

请参考图1,其为本申请的一种用于在分布式环境中执行任务的方法实施例的流程图,在本实施例中,数据处理设备在启动时根据当前的系统配置确定其负责处理的待处理数据单元,并按照预先设定的方式执行数据处理任务。所述方法包括如下步骤:

步骤101:接收本数据处理设备的启动消息。

数据处理设备通常会根据接收到的启动消息执行启动操作,所述启动消息可以是管理设备给本数据处理设备所在系统内的所有数据处理设备发送的统一的消息,也可以是本数据处理设备硬件触发的启动消息。所述启动消息对于数据处理设备来说,相当于通知其根据当前系统配置执行数据处理任务的通知消息,数据处理设备接收到该消息并执行必要的初始化操作后,就执行后续的步骤102-104确定其负责的待处理数据单元并执行相应的数据处理任务。

步骤102:获取当前系统配置信息。

所述系统配置信息,即:反应当前系统配置的信息,包括:数据处理设备和待处理数据单元的相关信息、以及任务分配算法。

所述待处理数据单元通常是指,存储待处理数据的数据库,相应的,所述待处理数据单元的相关信息包括:存储待处理数据的数据库的数目、和存储待处理数据的数据库标识;所述数据处理设备的相关信息包括:所述数据处理设备的数目、和数据处理设备的标识。

在本实施例的一个具体例子中,待处理数据采用分库分表技术存储在多个数据库中,所述存储待处理数据的数据库标识是指数据库的IP地址,所述数据处理设备标识是指,所述数据处理设备(即:计算机)的IP地址。此外,除了获取上述信息外,还可以获取与执行数据处理任务相关的其他信息,例如:数据库名称、数据库中表的名称等信息。

在具体实施中,数据处理设备可以在启动过程中在本系统内以广播的形式发送信息查询请求、并接收数据处理设备和待处理数据单元所在设备返回的应答,从而获取本系统内数据处理设备以及待处理数据单元的相关信息,而任务分配算法则可以采用预先设定的固定算法。

为了简化数据处理设备在启动过程中的上述处理过程,便于对数据处理设备和待处理数据单元进行集中管理、同时也便于对任务分配算法进行动态设置或调整,通常会引入统一的管理设备。

在本实施例的上述具体例子中,采用Zookeeper服务器作为所述管理设备,管理员可以在Zookeeper服务器中预先设置好任务分配算法。每个数据处理设备在启动阶段采用HTTP协议或者是Socket接口主动向Zookeeper服务器上报本设备信息,例如:本设备IP地址,本设备目前已启动等状态信息;此外,每个数据库也采用类似的方式向Zookeeper上报自己的IP地址,以及本数据库是主库还是备库等信息,从而管理设备就获知了本系统内的每个数据处理设备以及数据库的相关信息。

随后,数据处理设备向Zookeeper服务器发送获取数据处理设备和数据库的相关信息、以及任务分配算法的请求,Zookeeper服务器将已获取的数据处理设备的数目、数据处理设备的IP地址,主数据库的IP地址、主数据库的数目、以及任务分配算法返回给每个发送请求的数据处理设备。

在本实施例的上述具体例子中,系统中总共有3个数据处理设备,4个数据库设备,其中数据处理设备通过上述方式获取了如下所示的信息:

1)数据处理设备的数目为3,其IP地址分别为:192.168.0.10、192.168.0.11、192.168.0.12;

2)存储待处理数据的数据库的数目为4,其IP地址分别为:192.168.0.1、192.168.0.2、192.168.0.3和192.168.0.4;

3)具体的任务分配算法。

在上述具体例子中,为了简化系统的实现,采用Zookeeper服务器作为集中管理设备,在其他实施方式中,也可以不采用Zookeeper,而采用自行设计的软件模块实现上述管理功能。为了功能划分更为明确,也可以采用两台管理设备实现上述管理功能,例如:采用Zookeeper服务器管理与数据处理设备相关的信息,采用配置中心服务器管理与数据库相关的信息、以及进行任务分配算法的设置与调整。这些都是具体实施方式的变更,都不偏离本申请的核心。

步骤103:根据所述数据处理设备和待处理数据单元的相关信息,采用所述任务分配算法确定本数据处理设备负责处理的待处理数据单元。

所述任务分配算法是定义如何进行任务划分的具体算法,即:如何为每个数据处理设备分配待处理数据单元的算法。在本实施例的上述具体例子中,采用负载均衡算法,即:尽量将存储待处理数据的数据库均衡地分配给每个数据处理设备。

在本实施例的上述具体例子中,采用负载均衡算法确定本数据处理设备负责处理的数据库的过程,具体包括如下所示的步骤103-1至步骤103-3,下面结合附图2作进一步说明。

步骤103-1:用存储待处理数据的每个数据库的标识与所述数据处理设备的数目执行取模操作,将得到的结果作为负责处理相应数据库的数据处理设备编号,得到数据库标识与数据处理设备编号之间的对应关系。

在本步骤中建立存储待处理数据的数据库的IP地址与数据处理设备编号之间的对应关系,采用如下所示的公式:

数据处理设备编号=(数据库的IP地址)mod(数据处理设备的数目);

在步骤102中已经获取了每个数据库的IP地址,由于IP地址通常采用由小数点分隔的四段表示方式,为了执行上述计算,先将每个数据库的IP地址转换为对应的整数形式,具体说,将每个分段中的十进制数值以字符串的形式拼接在一起,然后将得到的字符串采用atio()或者类似函数转换为十进制整数,例如:IP地址192.168.0.1转换为对应的整数形式就是:19216801。

然后用数据库IP地址的整数形式与数据处理设备的数目执行取模运算,即:获取两者执行整除运算得到的余数,该余数就是负责处理该数据库数据的数据处理设备的编号。本步骤的运算结果如下表所示:

表一、计算数据库IP地址与数据处理设备编号之间的对应关系

数据库(IP)(数据库IP)mod(数据处理设备数目)数据处理设备编号DB1(192.168.0.1)19216801 mod 31DB2(192.168.0.2)19216802 mod 32DB3(192.168.0.3)19216803 mod 30DB4(192.168.0.4)19216804 mod 31

步骤103-2:根据每个数据处理设备的标识进行排序,并将本数据处理设备对应的排序号作为本数据处理设备的编号。

在步骤102中已经获取了每个数据处理设备的IP地址,在本步骤中根据所述IP地址对数据处理设备进行排序,每个数据处理设备对应的排序号就是该设备的编号。在本例子中,各个数据处理设备的编号如下表所示:

表二、数据处理设备的编号

数据处理设备(IP)数据处理设备编号Worker1(192.168.0.10)0Worker2(192.168.0.11)1Worker3(192.168.0.12)2

为了便于理解,表二给出了本例子中三个数据处理设备的编号,在具体实施中,每个数据处理设备在本步骤只需要按照上述方式获取本数据处理设备的编号就可以了。例如:数据处理设备Worker1获知自己的数据处理设备编号为0。

步骤103-3:根据本数据处理设备的编号以及所述数据库标识与数据处理设备编号之间的对应关系,获取本数据处理设备负责处理的数据库标识。

综合步骤103-1和103-2获取的计算结果,每个数据处理设备就可以获知自己负责处理的数据库的IP地址了。例如:数据处理设备Worker1的编号为0,根据步骤103-1中生成的对应关系,编号为0的数据处理设备与DB3相对应,因此Worker1获知自己负责处理IP地址为192.168.0.3的DB3数据库中的数据。在本例子中,各数据处理设备采用上述算法得到自己负责处理的数据库的IP地址,如下表所示:

表三、每个数据处理设备负责处理的数据库的IP地址

数据处理设备数据处理设备编号负责处理的数据库的IP地址worker10DB3(192.168.0.3)worker21DB1(192.168.0.1)和DB4(192.168.0.4)worker32DB2(192.168.0.2)

至此,每个数据处理设备按照获取的任务分配算法,通过上述计算过程,获知了自己负责处理的数据库的IP地址,请参见附图3,其为数据处理设备与负责处理的数据库之间的对应关系的示意图。

上面通过本实施例的一个具体例子,描述了数据处理设备如何根据任务分配算法确定自己负责的待处理数据单元的过程。在该例子中采用IP地址作为数据处理设备和数据库的标识,并基于IP地址取模的方式实现负载均衡算法,在其他实施方式中,也可以采用其他信息作为设备标识,或者采用其他的负载均衡算法。当然,根据系统实际的配置情况,还可以采用其他算法,例如,可以按照数据处理设备的硬件配置情况设置不同的权重,并按照权重分配待处理数据单元,从而使得处理能力强的数据处理设备分担与其能力相匹配的数据处理任务。在具体实施中,管理人员可以在管理设备上根据具体情况和应用需求,设置相应的任务分配算法,只要数据处理设备根据管理设备提供的任务分配算法,能够获知自己负责处理的待处理数据单元,就都可以实现本申请的技术方案。

步骤104:针对所述待处理数据单元,按照预先设定的方式执行数据处理任务。

根据在步骤103中确定的待处理数据单元,数据处理设备就可以执行相应的数据处理任务了。在具体实施中,数据处理设备可以采用预先设定的定时启动的方式执行数据处理任务,例如:在网络支付业务中,每天上午10:00定时根据数据库中的记录从银行下载文件并进行处理。数据处理设备具体采用何种方式执行任务,不是本申请的核心,本申请不作具体的限定。

在本实施例的上述具体例子中,待处理数据存储在数据库中,数据处理设备按照预先设定的方式执行数据处理任务时,首先根据数据库的IP地址,与数据库建立连接,例如:采用JDBC提供的getConnection()接口建立连接;然后通过已建立的连接,访问所述数据库中的待处理数据并进行处理。

通过上面的描述可以看出,在数据处理设备上实施本申请提供的用于在分布式环境中执行任务的方法,数据处理设备在接收启动消息后,根据获取的系统配置相关信息以及任务分配算法,可以自行计算出本数据处理设备负责的待处理数据单元,并按照预先设定的方式执行数据处理任务。采用这种方式,数据处理设备具备了自动根据当前配系统置进行任务划分的能力,可以自动获知自己应该处理哪些数据并执行相应的处理,从而避免了由于人工进行任务分配导致的过程繁琐、效率低下以及容易出错等弊端。

在上述的实施例中,提供了一种用于在分布式环境中执行任务的方法,与之相对应的,本申请还提供一种用于在分布式环境中执行任务的装置。请参看图4,其为本申请的一种用于在分布式环境中执行任务的装置实施例的示意图。由于装置实施例基本相似于方法实施例,所以描述得比较简单,相关之处参见方法实施例的部分说明即可。下述描述的装置实施例仅仅是示意性的。

本实施例的一种用于在分布式环境中执行任务的装置,包括:启动消息接收单元401,用于接收本数据处理设备的启动消息;信息获取单元402,用于获取当前系统配置信息,所述系统配置信息包括:数据处理设备和待处理数据单元的相关信息、以及任务分配算法;数据确定单元403,用于根据所述数据处理设备和待处理数据单元的相关信息,采用所述任务分配算法确定本数据处理设备负责处理的待处理数据单元;任务执行单元404,用于针对所述待处理数据单元,按照预先设定的方式执行数据处理任务。

可选的,所述数据确定单元采用的任务分配算法是负载均衡算法。

可选的,所述数据确定单元包括:

对应关系获取子单元,用于用存储待处理数据的每个数据库的标识与所述数据处理设备的数目执行取模操作,将得到的结果作为负责处理相应数据库的数据处理设备编号,得到数据库标识与数据处理设备编号之间的对应关系;

设备编号获取子单元,用于根据每个数据处理设备的标识进行排序,并将本数据处理设备对应的排序号作为本数据处理设备的编号;

数据库确定子单元,用于根据本数据处理设备的编号以及所述数据库标识与数据处理设备编号之间的对应关系,获取本数据处理设备负责处理的数据库标识。

可选的,所述任务执行单元具体用于,针对所述待处理数据单元,采用定时启动的方式执行数据处理任务。

可选的,所述任务执行单元包括:

连接建立子单元,用于根据本数据处理设备负责处理的数据库的IP地址,与所述数据库建立连接;

数据处理子单元,用于通过已建立的连接,访问所述数据库中的待处理数据并进行处理。

与上述的一种用于在分布式环境中执行任务的方法实施例相对应的,本申请还提供了所述方法的另一种实施例。请参考图5,其为本申请提供的一种用于在分布式环境中执行任务的方法的另一种实施例的流程图,在本实施例中,所述数据处理设备在运行过程中,一旦接收到当前系统配置发生变更的通知消息,则重新计算并确定本数据处理设备负责处理的数据,并执行相应的数据处理任务,从而实现任务划分的自适应调整机制。本实施例与第一实施例步骤相同的部分不再赘述,下面重点描述不同之处。

本申请提供的一种用于在分布式环境中执行任务的方法包括:

步骤501:接收当前系统配置发生变更的通知消息。

在具体实施过程中,通常会在系统内部引入管理设备,由管理设备负责监测系统配置是否发生变更,并在监测到发生变更时,向系统内的数据处理设备发送所述当前系统配置发生变更的通知消息,数据处理设备接收该消息后,执行步骤502。

仍以第一实施例中具体例子为例,所述管理设备是Zookeeper服务器,该服务器不仅能够在设备启动阶段提供数据处理设备和数据库的相关信息、以及任务分配算法,还负责监测系统配置是否发生变更并发送相应的变更消息,数据处理设备则监听由Zookeeper发送的系统配置发生变更的通知消息。

具体说,数据处理设备和数据库在启动时,主动向Zookeeper服务器上报自己的信息(参见第一实施例步骤102中的相关说明),从而Zookeeper服务器维护了本系统内部所有数据处理设备列表,以及数据库设备列表。

在系统的正常运行期间,数据处理设备以及数据库设备会定期向Zookeeper服务器发送心跳数据,或者,Zookeeper服务器定期向上述设备发送状态查询请求、并接收这些设备返回的应答,如果Zookeeper在预先设定的时间段内没有接收到某个设备发送的心跳数据或者没有接收到某个设备返回的应答,则可以认为该设备发生了故障;如果根据扩容的需要在系统中增加了新的数据处理设备或者数据库设备,新增加的设备在启动时,主动向Zookeeper上报本设备的信息;数据库设备发生主备切换后,也主动向Zookeeper报告,告知Zookeeper自己的IP地址以及自己是主库还是备库等相关信息。

通过上述机制,Zookeeper服务器就可以获知当前系统配置是否发生了变更,并维护最新的数据处理设备列表和数据库设备列表、以及任务分配算法。当Zookeeper服务器发现系统配置发生变更时(包括数据处理设备或者数据库设备数目变化、主数据库IP地址变化、管理人员重新设置了任务分配算法变化等),向其维护的数据处理设备列表中的每个数据处理设备发送系统配置变更消息,即:向所有数据处理设备进行广播,数据处理设备的监听线程接收到该消息后,执行步骤502以及后续步骤重新确定自己负责处理的数据库。

在本实施例的上述具体例子中,采用Zookeeper作为集中管理设备,在其他实施方式中,也可以采用两个管理设备实现上述功能,例如:采用Zookeeper服务器监测数据处理设备相关信息的变更,采用配置中心服务器监测数据库设备相关信息的变更以及任务分配算法的变更,相应的每个数据处理设备则需要监听来自上述两个管理设备的系统配置发生变更的通知消息。

步骤502:判断当前是否正在执行数据处理任务,若是,结束当前正在执行的数据处理任务。

系统配置发生变更后,由于要对数据处理任务重新进行划分,每个数据处理设备负责处理的待处理数据单元通常会发生变化,因此应该先停止当前正在执行的数据处理任务。

在本实施例的一个具体例子中,数据处理设备的监听线程接收到系统配置变更消息后,首先判断负责执行数据处理任务的工作线程当前是否处于工作状态,如果是,则设置停止处理的标志位为'1',工作线程在执行数据处理任务的过程中,会定期轮询该标志位,当发现该标志被置位时,结束当前的数据处理任务。

步骤503:获取当前系统配置信息,所述系统配置信息包括:数据处理设备和待处理数据单元的相关信息、以及任务分配算法。

请参见第一实施例中关于步骤102的相关说明,此处不再赘述,为了便于理解,本实施例在后续的步骤中,仍采用第一实施例中的具体例子进行说明。如果数据处理设备Worker3发生故障(例如:该设备无响应,不能继续执行数据处理任务),那么其他数据处理设备接收系统配置变更的通知消息后,执行本步骤重新获取如下所示的系统配置信息:

1)数据处理设备的数目为2,其IP地址分别为192.168.0.10和192.168.0.11;

2)存储待处理数据的数据库的数目为4,其IP地址分别为:192.168.0.1、192.168.0.2、192.168.0.3和192.168.0.4;

3)具体的任务分配算法。

在具体实施中,如果在系统配置变更的通知消息中明确包含具体的变更内容,而不仅仅是简单的通知,那么数据处理设备也可以直接根据该变更内容,对最近一次获取的数据处理设备和待处理数据单元的相关信息以及任务分配算法,进行相应更新,从而也实现了本步骤获取当前系统配置信息的功能。

步骤504:根据所述数据处理设备和待处理数据单元的相关信息,采用所述任务分配算法确定本数据处理设备负责处理的待处理数据单元。

请参见第一实施例中关于步骤103的相关说明,此处不再赘述。下面的表四、表五和表六是本步骤的计算结果:

表四、计算数据库IP地址与数据处理设备编号之间的对应关系

数据库(IP)(数据库IP)mod(数据处理设备数目)数据处理设备编号DB1(192.168.0.1)19216801 mod 21DB2(192.168.0.2)19216802 mod 20DB3(192.168.0.3)19216803 mod 21DB4(192.168.0.4)19216804 mod 20

表五、数据处理设备的编号

数据处理设备(IP)数据处理设备编号Worker1(192.168.0.10)0Worker2(192.168.0.11)1

表六、每个数据处理设备负责处理的数据库的IP地址

数据处理设备数据处理设备编号负责处理的数据库的IP地址Worker10DB2(192.168.0.2)和DB4(192.168.0.4)Worker21DB1(192.168.0.1)和DB3(192.168.0.3)

至此,本例子中的数据处理设备按照任务分配算法,通过上述计算过程,重新确定了自己负责处理的数据所在的数据库的IP地址,请参见附图6,其为Worker3发生故障后,数据处理设备与负责处理的数据库之间的对应关系的示意图。

步骤505:针对所述待处理数据单元,按照预先设定的方式执行数据处理任务。

请参见第一实施例中关于步骤104的相关说明,此处不再赘述。

至此,本实施例描述了在某个数据处理设备发生故障的情况下,采用本申请提供的用于在分布式环境中执行任务的方法,重新进行任务划分的过程。当系统配置发生其他变更的情况下,数据处理设备执行的基本处理流程与本实施例描述的上述流程是相同的。

需要说明的是,本实施例和第一实施例都以数据库作为进行任务划分的基本单位,在其他实施方式中可以采用更为小粒度的划分方式,例如:以数据库中的表为单位进行任务的划分,在这种情况下,本申请所述的待处理数据的相关信息不仅包括:数据库数目、数据库IP地址,还可以包含每个数据库中包含的表的数量、每个表的名称等(也可以称作数据库的拓扑结构),数据处理设备采用相应的任务分配算法确定本设备负责处理哪些数据库中的哪些表,并按照预先设定的方式执行相应的数据处理任务;相应的,监测待处理数据相关信息的变更时,不仅要监测数据库级别的变更,还要监测数据库中数据表的增减变更。采用上述实施方式,只是对待处理数据单元进行了更为细粒度的定义,其本质并不偏离本申请的核心,也在本申请的保护范围之内。

此外,在本实施例和第一实施例中,待处理数据都存储在数据库中,在其他实施方式中,待处理数据也可以存储在数据文件中,采用本申请提供的方法,数据处理设备可以自动获知本设备负责处理哪个或者哪些数据文件中的数据,并通过访问相应的数据文件执行数据处理任务。采用这种实施方式,只是待处理数据的存储方式发生了变更,其本质同样也不偏离本申请的核心,也在本申请的保护范围之内。

综上所述,本申请提供的用于在分布式环境中执行任务的方法,在系统配置信息发生变更时,可以根据当前的数据处理设备和待处理数据单元的相关信息以及任务分配算法,重新确定本数据处理设备负责处理的待处理数据单元,并按照预先设定的方式执行数据处理任务,从而实现了在系统配置发生变更时无须人工介入就能快速地进行数据处理任务的重新划分,使得正常的数据处理过程不受影响,且不会发生遗漏数据或者重复处理数据的情况。

与上述一种用于在分布式环境中执行任务的方法的另一种实施例相对应,本申请还提供一种用于在分布式环境中执行任务的装置。请参看图7,其为本申请的一种用于在分布式环境中执行任务的装置的另一种实施例示意图。由于装置实施例基本相似于方法实施例,所以描述得比较简单,相关之处参见方法实施例的部分说明即可。下述描述的装置实施例仅仅是示意性的。

本实施例的一种用于在分布式环境中执行任务的装置,包括:变更消息接收单元701,用于接收当前系统配置发生变更的通知消息;任务结束单元702,用于判断当前是否正在执行数据处理任务,若是,结束当前正在执行的数据处理任务;信息获取单元703,用于获取当前系统配置信息,所述系统配置信息包括:数据处理设备和待处理数据单元的相关信息、以及任务分配算法;数据确定单元704,用于根据所述数据处理设备和待处理数据单元的相关信息,采用所述任务分配算法确定本数据处理设备负责处理的待处理数据单元;任务执行单元705,用于针对所述待处理数据单元,按照预先设定的方式执行数据处理任务。

可选的,所述数据确定单元采用的任务分配算法是负载均衡算法。

可选的,所述数据确定单元包括:

对应关系获取子单元,用于用存储待处理数据的每个数据库的标识与所述数据处理设备的数目执行取模操作,将得到的结果作为负责处理相应数据库的数据处理设备编号,得到数据库标识与数据处理设备编号之间的对应关系;

设备编号获取子单元,用于根据每个数据处理设备的标识进行排序,并将本数据处理设备对应的排序号作为本数据处理设备的编号;

数据库确定子单元,用于根据本数据处理设备的编号以及所述数据库标识与数据处理设备编号之间的对应关系,获取本数据处理设备负责处理的数据库标识。

可选的,所述任务执行单元具体用于,针对所述待处理数据单元,采用定时启动的方式执行数据处理任务。

可选的,所述任务执行单元包括:

连接建立子单元,用于根据本数据处理设备负责处理的数据库的IP地址,与所述数据库建立连接;

数据处理子单元,用于通过已建立的连接,访问所述数据库中的待处理数据并进行处理。

可选的,所述装置包括:

定期上报子单元,用于向负责监听数据处理设备相关信息变更的管理设备定期上报本数据处理设备的运行状况;或者,

按需上报子单元,用于针对接收到的来自所述管理设备的请求上报本数据处理设备的运行状况;

相应的,所述消息接收单元具体用于接收所述管理设备发送的所述通知消息。

此外,本申请实施例还提供了一种分布式任务执行系统,该系统包括根据上述实施例所述的用于在分布式环境中执行任务的数据处理装置、用于存储待处理数据的数据库以及管理设备,所述管理设备用于提供系统配置信息、监听系统配置信息的变更以及向系统内的所述数据处理装置发送系统配置变更的通知消息。所述系统配置信息包括:系统内的数据处理装置和数据库的相关信息以及任务分配算法。

如图8所示,其为本申请提供的分布式任务执行系统的示意图。在图示系统中,所述数据处理装置部署在计算机上,3台计算机组成了计算集群;所述用于存储待处理数据的数据库设备有4台,组成了数据库集群;该系统中还包括一台Zookeeper服务器,用于提供计算集群中各台计算机的相关信息、监听计算集群内计算机数量的变更、并在监听到变更后向计算集群广播系统配置变更的通知消息;该系统还包括一台配置中心服务器,用于设置任务分配算法、提供数据库集群中各数据库设备的相关信息、监听数据库集群中数据库设备数量的变更、主备库切换的变更、以及任务分配算法的变更、并在监听到变更时向计算集群广播系统配置变更的通知消息。

管理员在配置中心服务器上预先设置好任务分配算法,数据库集群中的数据库设备和计算集群中的计算机,在启动阶段分别向配置中心服务器和Zookeeper服务器上报自己的相关信息,此后,计算集群中的计算机从Zookeeper服务器和配置中心获取本系统内的计算机和数据库的相关信息以及任务分配算法,采用所述任务分配算法确定本计算机负责处理的数据库并执行相应的数据处理任务;计算集群中的计算机在接收到来自Zookeeper或者配置中心的系统配置变更消息时,停止当前执行的数据处理任务,重新获取上述相关信息以及任务分配算法,重新确定本计算机负责处理的数据库并执行相应的数据处理任务。

由此可见,本申请提供的分布式任务执行系统,在计算集群或者数据库集群中的设备数量发生变更时、或者数据库集群中主数据库IP地址发生变化时(例如:发生主备切换)、以及任务分配算法发生变更时,可以自动进行数据处理任务的重新划分,实现了在系统配置发生变化时的自适应机制。

本实施例给出的上述分布式任务执行系统是示意性的,在具体实施中,数据处理装置的数目和用于存储待处理数据的数据库的数目是没有具体限制的;而且也可以采用单一的管理设备替代Zookeeper服务器和配置中心服务器,同样可以实现本申请提供的分布式任务执行系统的功能。

本申请虽然以较佳实施例公开如上,但其并不是用来限定本申请,任何本领域技术人员在不脱离本申请的精神和范围内,都可以做出可能的变动和修改,因此本申请的保护范围应当以本申请权利要求所界定的范围为准。

在一个典型的配置中,计算设备包括一个或多个处理器(CPU)、输入/输出接口、网络接口和内存。

内存可能包括计算机可读介质中的非永久性存储器,随机存取存储器(RAM)和/或非易失性内存等形式,如只读存储器(ROM)或闪存(flashRAM)。内存是计算机可读介质的示例。

1、计算机可读介质包括永久性和非永久性、可移动和非可移动媒体可以由任何方法或技术来实现信息存储。信息可以是计算机可读指令、数据结构、程序的模块或其他数据。计算机的存储介质的例子包括,但不限于相变内存(PRAM)、静态随机存取存储器(SRAM)、动态随机存取存储器(DRAM)、其他类型的随机存取存储器(RAM)、只读存储器(ROM)、电可擦除可编程只读存储器(EEPROM)、快闪记忆体或其他内存技术、只读光盘只读存储器(CD-ROM)、数字多功能光盘(DVD)或其他光学存储、磁盒式磁带,磁带磁磁盘存储或其他磁性存储设备或任何其他非传输介质,可用于存储可以被计算设备访问的信息。按照本文中的界定,计算机可读介质不包括非暂存电脑可读媒体(transitorymedia),如调制的数据信号和载波。

2、本领域技术人员应明白,本申请的实施例可提供为方法、系统或计算机程序产品。因此,本申请可采用完全硬件实施例、完全软件实施例或结合软件和硬件方面的实施例的形式。而且,本申请可采用在一个或多个其中包含有计算机可用程序代码的计算机可用存储介质(包括但不限于磁盘存储器、CD-ROM、光学存储器等)上实施的计算机程序产品的形式。

去获取专利,查看全文>

相似文献

  • 专利
  • 中文文献
  • 外文文献
获取专利

客服邮箱:kefu@zhangqiaokeyan.com

京公网安备:11010802029741号 ICP备案号:京ICP备15016152号-6 六维联合信息科技 (北京) 有限公司©版权所有
  • 客服微信

  • 服务号