首页> 中国专利> 一种基于中断重入机制的协程处理及管理方法

一种基于中断重入机制的协程处理及管理方法

摘要

本发明公开了一种基于中断重入机制的协程处理及管理方法,处理方法中,所述协程包括主协程和子协程,中断重入步骤包括:S1:所述主协程抛出中断请求,所述主协程的底层框架响应所述中断请求并异步发送任务请求;S2:切换至外部系统处理逻辑,所述主协程释放线程资源;S3:所述任务请求应答通过回调接口执行重入所述子协程,并恢复至中断时上下文状态;S4:所述子协程继续处理后续逻辑,直到执行结束,释放线程资源;或者子协程作为新的主协程抛出中断请求,重复上述步骤S1至S4;管理方法中,上述协程组合为应用,所述应用程序入口OnProcess()内包括处理方法中的CallBatch()接口。本发明大幅降低了有大量并行分支的调度程序开发、维护的复杂度和工作量。

著录项

  • 公开/公告号CN112162840A

    专利类型发明专利

  • 公开/公告日2021-01-01

    原文格式PDF

  • 申请/专利权人 曹蕤;

    申请/专利号CN202011047310.0

  • 发明设计人 曹蕤;

    申请日2020-09-29

  • 分类号G06F9/48(20060101);G06F9/46(20060101);G06F9/50(20060101);G06F9/52(20060101);

  • 代理机构32266 苏州中合知识产权代理事务所(普通合伙);

  • 代理人刘奇

  • 地址 100020 北京市朝阳区朝阳北路108楼3单元1107号

  • 入库时间 2023-06-19 09:24:30

说明书

技术领域

本发明属于计算机技术领域,尤其涉及一种基于中断重入机制的协程处理及管理方法。

背景技术

业务应用开发中经常需要开发一些调度程序,这些程序需要同时推进很多分支流程,并且很多分支流程会由于与外部的协作而需要很长的执行时间,例如图1所示的调度过程(其中,B、C、D都表示提交一个批处理任务并等待该任务的完成结果)。采用常见的编程语言开发上述调度程序,通常都以同步阻塞方式实现与外部的协作,因此只能采用多线程实现多分支并行推进,其工作机制如图2所示。很明显,这样的程序存在的问题是,一旦需要大规模并行推进(例如并行分支超过1000个),并且很多分支都存在长时间同步阻塞的场景,则运行时线程资源消耗巨大。

为解决线程资源消耗过大问题,业界常见做法是采用异步回调方式编程,其工作机制如图3所示。很明显,线程资源的消耗被降到最低,但这样的程序存在的问题是编程模型复杂,不符合人的思维习惯,代码的可维护性差,甚至无法支持组件化开发,不能把Process方法封装成一个在E动作完成时返回完成结果的方法。

在较新的编程语言Go、C#中,开发人员可以基于协程编程模型来开发这些调度程序,其工作机制如图4所示。这种工作机制在线程模型层面和异步回调编程的程序几乎一样:所有等待动作都会导致协程挂起,全部协程挂起后则主线程终止。收到应答后,会在新线程中继续推进被唤醒的协程。显然,这样的程序不仅能将线程资源的消耗降到最低,且编程模型符合人的思维习惯,代码可维护性好,并能够支持组件化开发。遗憾的是,支持这种协程编程模型的语言较少,目前行业运用最广泛的Java、C++等语言均不支持;而即使语言本身支持这种协程编程模型,大多数语言一般也不支持程序自动具备一些高可用特性,如同一调度程序可换机运行、系统宕机恢复后程序能够继续执行。

因此,目前急需一种基于中断重入机制的协程处理及管理方法,以弥补上述方法中的缺陷。

发明内容

针对上述问题,本发明提供了一种基于中断重入机制的协程处理及管理方法,采用可维护性更好的同步阻塞编程模型开发有大量并行分支的调度程序,大幅降低了此类程序开发、维护的复杂度和工作量,且能够支持程序进行组件化开发,不受编程语言的约束。

为了实现上述目的,本发明采用的技术方案为:

一种基于中断重入机制的协程处理方法,所述协程包括主协程和子协程,中断重入步骤包括:

S1:所述主协程抛出中断请求,所述主协程的底层框架响应所述中断请求并异步发送任务请求;

S2:切换至外部系统处理逻辑,所述主协程释放线程资源;

S3:所述任务请求应答通过回调接口执行重入所述子协程,并恢复至中断时上下文状态;

S4:所述子协程继续处理后续逻辑,直到执行结束,释放线程资源;或者子协程作为新的主协程抛出中断请求,重复上述步骤S1至S4。

优选地,步骤S1还包括,所述主协程首次抛出中断请求前,建立中断记忆对象并登记到记忆存储器中,所述记忆对象用于储存不可重复动作的执行结果,所述记忆存储器用于储存中断处上下文;

步骤S3还包括,重入所述子协程后,所述子协程执行到中断处并将上下文恢复到中断时的状态,并将所述任务的应答结果登记到所述记忆对象。

更优选地,所述任务为批处理任务,所述主协程的底层框架通过调用CallBatch()接口执行批处理任务。

更优选地,所述CallBatch()接口首次执行时包括以下步骤:

创建BatchCall记忆对象,并登记到记忆存储器中,此时所述BatchCall记忆对象的isInitial标识为true;

执行BatchCall记忆对象的Execute()接口抛出中断类型异常;

调用方捕获所述中断类型异常,并执行BatchCall记忆对象的ExecuteAfterInterruption()接口;

所述ExecuteAfterInterruption()接口调起所述BatchCall记忆对象的OnExecuteAfterInterruption()接口,异步发送批处理任务的请求并在内存中注册回调接口;

动作执行完后,所述BatchCall记忆对象的isInitial标识被置为false,程序执行结束,释放出线程资源。

更优选地,任务应答通过回调接口切换至所述主协程包括以下步骤:

底层框架使用新线程调起回调接口HandleResponse(),并将任务应答传给所述回调接口HandleResponse();

所述回调接口HandleResponse()调起Resume()接口,所述Resume()接口则调起OnProcess()接口重新开始执行所述主协程处理逻辑;

所述主协程再次执行到CallBatch()接口时,查验所述记忆存储器中登记的所述BatchCall记忆对象,此时所述BatchCall记忆对象的isInitial标识为false,不再执行Execute()接口,将任务应答登记到所述BatchCall记忆对象中,使得后续可以从所述BatchCall记忆对象中获取任务应答继续后续处理。

更优选地,任务应答内容存储于所述BatchCall记忆对象中。

一种基于中断重入机制的协程管理方法,其特征在于,利用上述的一种基于中断重入机制的协程处理方法,所述协程组合为应用,所述协程包括协程标识、中断标识isInterrupted和记忆存储器,所述应用运行包括一次开始处理接口Process()和多次重入处理接口Resume()的调用,还包括应用程序入口OnProcess(),所述开始处理接口Process()和重入处理接口Resume()均通过调用所述应用程序入口OnProcess()执行处理逻辑,所述应用程序入口OnProcess()内包括所述CallBatch()接口;在调用所述应用程序入口OnProcess()前通过循环遍历应用的协程集合调起应用程序入口OnProcess()。

优选地,循环遍历应用的协程集合包括以下步骤:

执行while循环遍历协程集合,当通过遍历获取到一个协程时,将此协程设为当前协程,查验所述当前协程的中断标识isInterrupted,若中断标识isInterrupted为true,表示处于中断状态,while循环继续遍历下一个协程;若中断标识isInterrupted为false,表示处于非中断状态,将当前协程的记忆存储器中所有记忆的isInitial标识置为false;

调用OnProcess()接口执行应用处理逻辑,应用执行中断时,while循环捕获中断并执行ExecuteAfterInterruption()接口并将当前协程的IsInterrupted标识置为true,执行结束,while循环继续遍历下一个协程;若应用从OnProcess()接口执行出来未抛出中断异常,当前协程执行结束,while循环从应用的协程集合中删除当前协程,while循环继续遍历下一个协程;

重复上述步骤至while循环遍历结束。

更优选地,所述线程通过开始处理接口Process()首次开始执行时,将创建默认的协程ApplicationThread对象,所述协程ApplicationThread对象的中断标识isInterrupted为false。

更优选地,所述应用程序入口OnProcess()包括Fork()接口,所述Fork()接口第一次执行时包括以下步骤:

新生成ForkMemory记忆对象,并登记到当前协程的记忆存储器中,此时ForkMemory记忆对象的isInitial标识为true;

根据传入的协程数在当前协程的基础上生成多个新协程,所述新协程与所述当前协程的记忆一致,且所述新协程的中断标识isInterrupted为false,并将所述新协程登记到应用的协程集合中,所述ForkMemory记忆对象中存储着当前协程以及所有新生成的协程的标识;

重入执行时,从原协程的记忆存储器中获得ForkMemory记忆对象,此时ForkMemory记忆对象的isInitial标识为false,不再新创建协程,且应用可以从ForkMemory记忆对象中获取先前生成的协程标识。

更优选地,所述应用程序入口OnProcess()还包括Merge()接口,应用将所述Fork()接口产生的协程标识集合传给所述Merge()接口,所述Merge()接口的执行步骤如下:

执行while循环遍历协程标识集合,当通过遍历获取到一个协程标识时,将此协程标识设为当前标识,如果遍历到的标识与当前协程的标识一样,while循环继续遍历下一个协程标识;如果遍历到的标识与当前协程的标识不一样,while循环将遍历应用的协程集合;

如果应用的协程集合中存在与当前标识一致的协程,将当前协程从协程集合中删除,并抛出中断异常;

如果应用的协程集合中不存在与当前标识一致的协程,所述Merge()接口正常返回true。

与现有技术相比,本发明的有益效果是:

1、本发明所述的一种基于中断重入机制的协程处理及管理方法使得开发者能够采用可维护性更好的同步阻塞编程模型开发有大量并行分支的调度程序,大幅降低了此类程序开发、维护的复杂度和工作量,且能够支持程序进行组件化开发。

2、本发明所述的一种基于中断重入机制的协程处理及管理方法使得让程序的同步阻塞设计运行时自动转换为异步回调/非阻塞方式执行,确保线程资源消耗被降到最低。

3、本发明所述的一种基于中断重入机制的协程处理及管理方法使得程序状态在线程终止时刻被自动持久化,在新线程接手继续执行时被自动恢复,由此获得可换机运行,宕机恢复后继续执行等高可用特性。

4、本发明所述的一种基于中断重入机制的协程处理及管理方法不受编程语言的约束,支持C、C++、Java等多种语言。

附图说明

图1是背景技术中所述的一种调度过程示意图。

图2是背景技术中所述的一种系统工作机制示意图。

图3是背景技术中所述的另一种系统工作机制示意图。

图4是背景技术中所述的另一种系统工作机制示意图。

图5是本发明所述的一种基于中断重入机制的协程处理方法的工作机制示意图。

图6是实施例二所述的协程案例。

图7是实施例二所述的协程案例运行过程。

图8是实施例二所述的协程案例运行过程。

图9是实施例二所述的协程案例运行过程。

图10是实施例二所述的协程案例运行过程。

图11本发明所述的一种基于中断重入机制的协程管理方法工作机制示意图。

图12本发明所述的一种基于中断重入机制的协程管理方法另一工作机制示意图。

具体实施方式

为了更好的理解本发明,下面结合附图和实施例进一步阐明本发明的内容,但本发明不仅仅局限于下面的实施例。

实施例一

一种基于中断重入机制的协程处理方法,协程包括主协程和子协程,中断重入步骤包括:

S1:主协程抛出中断请求,主协程的底层框架响应中断请求并异步发送任务请求;

S2:切换至外部系统处理逻辑,主协程释放线程资源;

S3:任务请求应答通过回调接口执行重入子协程,并恢复至中断时上下文状态;

S4:子协程继续处理后续逻辑,直到执行结束,释放线程资源;或者子协程作为新的主协程抛出中断请求,重复上述步骤S1至S4。

如图5所示,主协程执行完PartA动作抛出中断请求,底层框架捕获该中断后立即执行动作并释放出线程资源,而后在回调函数中使用新线程跳到先前的中断处,并且上下文恢复到先前中断时状态,取得异步非阻塞接口执行动作的结果信息,并继续后续处理。这一过程中,PartA动作存在重复执行。

由图5可知,重入会使得同一个动作被反复执行,而有些动作是不允许重做的,例如,给账户增加金额。因此,在某些场合,必须保证重入时执行的第N个动作,其输入输出均与重入前某次运行时执行的第N个动作的输入输出完全一致,且对外部环境(例如数据库、文件系统、本系统的环境信息、外部系统的环境信息和数据等)不会产生新的影响。

因此,步骤S1还包括,主协程首次抛出中断请求前,建立中断记忆对象并登记到记忆存储器中,记忆对象用于储存不可重复动作的执行结果,记忆存储器用于储存中断处上下文。步骤S3还包括,重入子协程后,子协程执行到中断处并将上下文恢复到中断时的状态,并将任务的应答结果登记到记忆对象。这里的上下文指的是这个过程中所有产生的记忆对象。

任务以批处理任务为例,主协程的底层框架通过调用CallBatch()接口执行批处理任务。

CallBatch()接口首次执行时包括以下步骤:

1、创建BatchCall记忆对象,并登记到记忆存储器中,此时BatchCall记忆对象的isInitial标识为true;

2、执行BatchCall记忆对象的Execute()接口抛出中断类型异常;

3、调用方捕获中断类型异常,并执行BatchCall记忆对象的ExecuteAfterInterruption()接口;

4、ExecuteAfterInterruption()接口调起BatchCall记忆对象的OnExecuteAfterInterruption()接口,异步发送批处理任务的请求并在内存中注册回调接口;

5、动作执行完后,BatchCall记忆对象的isInitial标识被置为false,程序执行结束,释放出线程资源。

批处理任务通过回调接口切换至主协程包括以下步骤:

1、底层框架使用新线程调起回调接口HandleResponse(),并将任务应答传给回调接口HandleResponse();

2、回调接口调起Resume()接口,Resume()接口则调起OnProcess()接口重新开始执行主协程处理逻辑;

3、主协程再次执行到CallBatch()接口时,查验记忆存储器中登记的BatchCall记忆对象,此时BatchCall记忆对象的isInitial标识为false,不再执行Execute()接口,将任务应答登记到BatchCall记忆对象中,使得后续可以并从BatchCall记忆对象中获取任务应答继续后续处理。

其中,任务应答内容存储于BatchCall记忆对象中。

实施例二

应用实施例一所述的一种基于中断重入机制的协程处理方法,运行如图6所示的协程。其中,标识符以F开头的圆框代表Fork动作(重入时不可重复执行),以M开头的圆框代表Merge动作(重入时可重复执行)。B、G、H为重入时不可重复执行的动作,A、C、E为重入时可重复执行的动作,D、F、I均为中断动作。

其代码实现如下:

其完整运行过程如下:

如图7所示,应用最初基于T1协程执行,执行F0动作Fork出T11、T12协程。基于T11协程执行时,执行F1动作Fork出T111协程,T1、T11和T12在中断前的动作,以及T111的全部动作都会在一个线程内完成。其中,T11中F0动作、T12中F0动作、T111中F0、B、F1动作在当前线程以及重入时均不再重复执行。

如图8所示,D动作应答回来基于T11协程重入,直到T11协程终止(或再次中断),注意这个处理过程运行在一个新的线程上。其中,T11中F0、B、F1、D动作在重入时均不再重复执行。

如图9所示,F动作应答回来基于T1协程重入,直到T1协程终止(或再次中断)。其中,T1中F0、F动作在重入时均不再重复执行。

如图10所示,I动作应答回来基于T12协程重入,直到T12协程终止(或再次中断)。其中,T12中F0、G、H、I动作在重入时均不再重复执行。

实施例三

如图11所示,一种基于中断重入机制的协程管理方法,由实施例一所述的协程组合为应用,协程包括协程标识、中断标识isInterrupted和记忆存储器,应用运行包括一次开始处理接口Process()和多次重入处理接口Resume()的调用,还包括应用程序入口OnProcess(),开始处理接口Process()和重入处理接口Resume()均通过调用应用程序入口OnProcess()执行处理逻辑,应用程序入口OnProcess()内包括实施例一所述的CallBatch()接口;在调用应用程序入口OnProcess()前通过循环遍历应用的协程集合调起应用程序入口OnProcess()。

循环遍历应用的协程集合包括以下步骤:

1、执行while循环遍历协程集合,当通过遍历获取到一个协程时,将此协程设为当前协程,查验当前协程的中断标识isInterrupted,若中断标识isInterrupted为true,表示处于中断状态,while循环继续遍历下一个协程;若中断标识isInterrupted为false,表示处于非中断状态,将当前协程的记忆存储器中所有记忆的isInitial标识置为false。

2、调用OnProcess()接口执行应用处理逻辑,应用执行中断时,while循环捕获中断并执行ExecuteAfterInterruption()接口并将当前协程的IsInterrupted标识置为true,执行结束,while循环继续遍历下一个协程;若应用从OnProcess()接口执行出来未抛出中断异常,当前协程执行结束,while循环从应用的协程集合中删除当前协程,while循环继续遍历下一个协程。其中,while循环捕获到中断异常后,发现这个中断异常对应的协程已经被删除,则不会再执行中断后处理逻辑接口ExecuteAfterInterruption(),而是继续遍历并执行下一个未中断协程。

3、重复上述步骤至while循环遍历结束。

线程通过开始处理接口Process()首次开始执行时,将创建默认的协程ApplicationThread对象,协程ApplicationThread对象的中断标识isInterrupted为false。应用开始执行时就是基于该协程执行的。

如图12所示,应用程序入口OnProcess()包括Fork()接口,通过调用底层框架提供的Fork()接口,应用就能产生多个协程从而实现多分支并行推进。Fork()接口第一次执行时包括以下步骤:

1、新生成ForkMemory记忆对象,并登记到当前协程的记忆存储器中,此时ForkMemory记忆对象的isInitial标识为true;

2、根据传入的协程数在当前协程的基础上生成多个新协程,新协程与当前协程的记忆一致,且新协程的中断标识isInterrupted为false,并将新协程登记到应用的协程集合中,ForkMemory记忆对象中存储着当前协程以及所有新生成的协程的标识;

3、重入执行时,从原协程中获得ForkMemory记忆对象,此时ForkMemory记忆对象的isInitial标识为false,不再新创建协程,且应用可以从ForkMemory记忆对象中获取先前生成的协程标识。

应用程序入口OnProcess()还包括Merge()接口,Merge()接口与Fork()接口相匹配,Fork()接口将产生的若干协程的标识传给Merge()接口;Merge()接口的执行步骤如下:

1、执行while循环遍历协程标识集合,当通过遍历获取到一个协程标识时,将此协程标识设为当前标识,如果遍历到的标识与当前协程的标识一样,while循环继续遍历下一个协程标识;如果遍历到的标识与当前协程的标识不一样,while循环将遍历应用的协程集合。这里的Merge()接口内部包括两层控制逻辑。

2、如果应用的协程集合中存在与当前标识一致的协程,则说明还有其他协程未执行结束,将当前协程从协程集合中删除,并抛出中断异常。

如果应用的协程集合中不存在与当前标识一致的协程,while循环继续遍历协程标识集合的下一个协程标识,如此往复,最终,如果应用的协程集合中除当前协程外的其他协程的标识都不存在于协程标识集合中,则说明所有协程都执行结束,Merge()接口正常返回true,应用就能在当前协程基础上继续执行后续逻辑,从而实现一个协程推进。

通过调用底层框架提供的Merge()接口,应用的多个协程(都执行到Merge点后)就可“合并”为一个协程从而实现一个分支推进。某个协程(非最后一个)执行到Merge点后仍存在其他未执行结束的协程时,应用将继续中断,同时底层框架清除该协程,而最后一个协程执行到Merge点后将继续后续处理。

应用运行一旦进入中断(释放出线程资源),底层框架就会将应用对象的状态序列化成可存储的格式(如XML报文、二进制数据…),并持久化到存储设备(如数据库、文件…)中,外部系统应答到来时,在新线程重入时,底层框架就从存储设备中取得持久化的应用状态并将其反序列化成应用对象进行处理。

基于状态持久化,应用就能很容易实现换机运行以及宕机恢复后继续执行——即在外部系统应答负载到任一可用的系统上、或者在系统宕机恢复后,外部系统应答到来时,均可通过一种应答消息处理应用使用新线程重入,底层框架继而从存储设备中获得应用状态并将其反序列化成应用对象完成后续处理。

实施例四

应用实施例一所述的一种基于中断重入机制的协程处理方法和实施例三所述的一种基于中断重入机制的协程管理方法,处理日间批量代发工资交易的案例。

交易处理流程包含3个步骤:文件读取和入库—>代发工资—>结果文件生成。其中文件中要处理的数据规模可能较大(几万笔到上百万笔不等)。为在高效利用线程资源的基础上提高交易处理性能,将这3个步骤编排在批量代发工资调度应用(基于调度系统执行)中,步骤2在执行时,我们以1000笔为单位将文件中的数据进行分块,每个块对应一个批量代发工资批处理任务(基于批处理系统执行),这样,就可在一个线程中利用协程管理框架以及CallBatch接口将所有批处理任务请求分发到批处理系统集群中去执行并中断释放出线程资源,批处理系统应答到来时再使用新线程重入处理,直到处理完成或再次中断。

代码示例如下:

对于本领域技术人员而言,显然本发明不限于上述示范性实施例的细节,而且在不背离本发明的精神或基本特征的情况下,能够以其他的具体形式实现本发明。因此,无论从哪一点来看,均应将实施例看作是示范性的,而且是非限制性的,本发明的范围由所附权利要求而不是上述说明限定,因此旨在将落在权利要求的等同要件的含义和范围内的所有变化囊括在本发明内。

去获取专利,查看全文>

相似文献

  • 专利
  • 中文文献
  • 外文文献
获取专利

客服邮箱:kefu@zhangqiaokeyan.com

京公网安备:11010802029741号 ICP备案号:京ICP备15016152号-6 六维联合信息科技 (北京) 有限公司©版权所有
  • 客服微信

  • 服务号