首页> 中国专利> 一种基于SOA大并发高性能工作流程服务器系统

一种基于SOA大并发高性能工作流程服务器系统

摘要

一种基于SOA大并发高性能工作流程服务器系统,由服务请求组包和分发模块、工作流数据准备模块、工作流服务执行模块、工作流处理结果缓存模块组成,其连接关系为:服务请求组包和分发模块,工作流服务执行模块,工作流处理结果缓存模块依序连接,工作流处理结果缓存模块经过数据库与工作流数据准备模块连接,工作流数据准备模块与工作流服务执行模块相连。本发明充分利用并发技术,阵列多处理器,创新的技术来实现高性能和大并发工作流服务器,大大降低了对服务器硬件自身性能的依赖性,减少了处理响应时间和等待周期。

著录项

法律信息

  • 法律状态公告日

    法律状态信息

    法律状态

  • 2016-07-06

    授权

    授权

  • 2015-02-04

    实质审查的生效 IPC(主分类):G06F9/44 申请日:20140923

    实质审查的生效

  • 2015-01-07

    公开

    公开

说明书

技术领域

本发明涉及面向服务架构(SOA)的大并发高性能工作流服务器技术,实现并行工作流程引擎,超大并发极限用户规模应用场景,并可以持续不停机的扩展服务器能力。

背景技术

企业级的工作流程服务系统,在高并发场景下,依赖于数据库的性能,这导致超大规模用户应用场景中,处理响应时间和等待周期都相当长。对于有对响应要求较高的企业应用业务场景,比如缴费,客户服务等,加快系统处理流程,减少排队时间,提高用户满意度至关重要。传统工作流系统和工作流引擎纯粹依赖软件优化,依赖于服务器硬件自身性能,扩展和提高非常有限。本发明充分利用并发技术,阵列多处理器,创新的技术来实现高性能和大并发工作流服务器,填补此项空白。

发明内容

传统服务器采用请求队列阻塞方式,多线程的处理请求,这样讲导致在多个任务切换的过程中,中处理器将从阻塞状态恢复到运行状态。这种切换虽然是纳秒级别的,当并发到10000个请求连接,也将使得操作系统在内核和用户态之间频繁切换,大大降低了处理器能够并发的上限响应能力。

本发明采用下列技术方案来实现。

一种基于SOA大并发高性能工作流程服务器系统,由服务请求组包和分发模块,工作流数据准备模块,工作流服务执行模块,工作流处理结果缓存模块组成,其连接关系为:服务请求组包和分发模块,工作流服务执行模块,工作流处理结果缓存模块依序连接,工作流处理结果缓存模块经过数据库与工作流数据准备模块连接,工作流数据准备模块与工作流服务执行模块相连;其中:

所述的服务请求组包和分发模块使用C语言在多个高速网络接口计算机或者路由器上编写,充分利用带宽和处理器缓存,容纳大量的请求,并使用非阻塞的组包处理方式,所有请求都在RAM中处理完毕;

工作流服务执行模块使用一个主处理的硬件CPU,配合多个核心的通用并行处理硬件组GPU,包括以下操作:

1)主处理器负责将已经就绪的流程依赖的数据对齐组装;

2)按照请求包的流程关系对应好并行处理器GPU;

3)控制GPU执行,多核心的并行可扩张GPU,保证执行速度高于数据库存储速度;

4)将返回结果整理插入到哈希表中。

数据对齐组装程序结构如下:

① 数据准备模块来的聚合数据,将根据GPU不同要求,进行对齐组装,以取得较高的GPU执行率,对齐操作根据操作系统和内存边界的要求进行,alignmentReadyBuffer函数完成对齐检查;

② 请求包的流程关系由缓存流程定义的workFlowDefHash中获得,并作为GPU执行kernel的参数传递;

③ GPU控制执行程序调用GPU,并将返回结果缓存到哈希表workFlowResultHash,以提供工作流处理结果缓存模块做数据缓存和持久化。

CPU部分的代码使用C编写,GPU并行处理部分的虚拟机代码使用OPENCL编写。

此模块程序结构和源代码摘要如下:

初始化GPU

#include "CL\cl.h"

#include <stdio.h>

#include <stdlib.h>

#include <malloc.h>

int InitOpenCL( const char *program_source,

                cl_uint* alignment,

                 const unsigned int map_size,

                 unsigned int vector_width,

                 const unsigned int gpu_algo,

                 const unsigned int platform_num,

                 const unsigned int dev_num);

int InitOpenCL(const char *program_source, cl_uint* alignment,

                 const unsigned int map_size,

                 const unsigned int gpu_algo,

                 const unsigned int platform_num, const unsigned int dev_num)

{

    g_algo = AUTO;

    switch(gpu_algo){

    case 0:

        g_algo = WORK_FLOW_START;

        break;

    case 1:

        g_algo = WORK_FLOW_TRANSFER;

        break;

    case 2:

        g_algo = WORK_FLOW_SPAWN;

        break;

    case 3:

        g_algo = WORK_FLOW_END;

        break;

    }

    g_group_size = 4;

    g_device_num = dev_num;

    cl_device_id devices[MAX_GPU_NUM];

    //cl_uint size_ret = 0;

    cl_int err;

  cl_platform_id ocl_platform_id = GetOCLPlatform(platform_num/*dev_num*/);

    if( ocl_platform_id == NULL ){

        printf("ERROR: Failed to find available OpenCL platform.\n");

        return -1;

    }

    char ocl_version_info[256] = { 0 };

    err = clGetPlatformInfo(ocl_platform_id, CL_PLATFORM_VERSION,

                                sizeof(ocl_version_info)-1, ocl_version_info, NULL);

    if ( err != CL_SUCCESS ) {

        printf("ERROR[%d]: Failed to retreive platform %d's Opencl version info.\n", err, platform_num);

        return -1;

    }

    else{

        printf("Opencl version of platform %d: %s\n", platform_num, ocl_version_info);

    }

    cl_uint             numDevices = 0;

    cl_uint             numGPUDevices = 0;

    clGetDeviceIDs(ocl_platform_id, CL_DEVICE_TYPE_GPU, 0, NULL, &numDevices);

    if(numDevices == 0)    //no GPU available.

    {

        puts("Error: No any GPU devices available in platform! \n");

        return 1;

    }

    else{

        err = clGetDeviceIDs(ocl_platform_id, CL_DEVICE_TYPE_GPU, numDevices,

                             devices, &numGPUDevices);

        if (err != CL_SUCCESS) {

            printf("ERROR[%d]: Failed to get GPU device's ids . (%s) \n",

               err, getclErrString(err));

        return -1;

       }

       printf("The MAX number of available GPU devices is: %u\n", numGPUDevices);

    }

载入并行的CL代码

    char *sources = NULL;

    bool ret = false; //load bin

    if(!ret){

        char *sources = NULL;

        sources = ReadSources(program_source);

        if( NULL == sources ){

            printf("ERROR: Failed to read sources into memory ...\n");

            Cleanup_OpenCL();

            return -1;

        }

        g_program = clCreateProgramWithSource(g_context, 1, (const char**)&sources, NULL, &err);

        if (CL_SUCCESS != err)

        {

            printf("ERROR[%d]: Failed to create Program with source...\n", err);

            Cleanup_OpenCL();

            free(sources);

            return -1;

        }

            err = clBuildProgram(g_program, 1, &devices[dev_num], CompilerOptions, NULL, NULL);

            if (err != CL_SUCCESS){

                printf("ERROR[%d]: Failed to build program...\n", err);

                BuildFailLog(g_program, devices[dev_num]);

                Cleanup_OpenCL();

                free(sources);

                return -1;

            }

  存储编译好的kernel文件

        char **binaries = (char **)malloc( sizeof(char *) * 1 ); //只有一个设备

        size_t *binarySizes = (size_t*)malloc( sizeof(size_t) * 1 );

        err = clGetProgramInfo(g_program,

            CL_PROGRAM_BINARY_SIZES,

            sizeof(size_t) * 1,

            binarySizes, NULL);

        if (CL_SUCCESS != err){

            printf("WARNING[%d]: Failed to get program bin size ...(%s)\n",

                   err, getclErrString(err));

        }

        else{

            binaries[0] = (char *)malloc( sizeof(unsigned char) * binarySizes[0]);

            err = clGetProgramInfo(g_program,

                CL_PROGRAM_BINARIES,

                sizeof(char *) * 1,

                binaries,

                NULL);

            if (CL_SUCCESS != err){

                printf("WARNING[%d]: Failed to get program binaray ...(%s)\n",

                       err, getclErrString(err));

            }

            else{

                //kernelFile.writeBinaryToFile(binfilename, binaries[0], binarySizes[0]);

            }

        }

    }

创建工作流并行内核

    g_workflow_kernel = clCreateKernel(g_program, "workflow1", NULL);

    if (g_workflow_kernel == (cl_kernel)0)

    {

        printf("ERROR: Failed to create kernel momentum ...\n");

        Cleanup_OpenCL();

        free(sources);

        return -1;

    }

    g_match_kernel = clCreateKernel(g_program, "workflow2", NULL);

    if (g_match_kernel == (cl_kernel)0)

    {

        printf("ERROR: Failed to create kernel match...\n");

        Cleanup_OpenCL();

        free(sources);

        return -1;

    }

    free(sources);

    return 0; // success...

}

初始化结果哈希

ret = CreateHashTab(nWorkflowInstances, &ht);

if(ret)

检查已经准备好数据的流程实例

int iReadyInst = GetReadyInstInfo(...);

for(i=0; i<iReadyInst; i++){

if( insts[i].isReady){

//对齐数据到输入缓冲

inputBuffer[buf_cnt++]=unpackReq(insts[i].reqFrame);

}

}

控制GPU执行工作流引擎

bool ExecuteWorkflowKernel(cl_long** inputReadyBuffer, const int param_buffer)

{

    cl_int err = CL_SUCCESS;

    if(g_offset==NULL){

        g_offset = clCreateBuffer(g_context, CL_MEM_READ_ONLY | CL_MEM_COPY_HOST_PTR, sizeof(uint32),

                              (void*)&param_buffer, &err);

    }

    if (CL_SUCCESS != err){

        printf("ERROR[%d]: Failed to create input param buffer size(0x%x)Bytes, (%s)\n",

               err, sizeof(uint32), getclErrString(err));

        return false;

    }

    err = clSetKernelArg(g_workflow_kernel, 0, sizeof(cl_mem), (void *) &g_param);

    if (err != CL_SUCCESS){

        printf("ERROR[%d]: Failed to set  kernel arguments. (%s)\n",

                err, getclErrString(err));

        return false;

    }

    err = clSetKernelArg(g_workflow_kernel, 1, sizeof(cl_mem), (void *) &g_inputBuffer);

    if (err != CL_SUCCESS)

    {

        printf("ERROR[%d]: Failed to set input buffer 1 kernel arguments. (%s) \n",

               err, getclErrString(err));

        return false;

    }

    err = clSetKernelArg(g_workflow_kernel, 3, sizeof(cl_mem), (void *) &g_offset);

    if (err != CL_SUCCESS){

        printf("ERROR: Failed to create kernel. \n"

                /*err, getclErrString(err)*/);

        return false;

    }

工作流数据准备模块,完成对所有流程定义的数据预取和缓存,流程定义和流程状态数据相对都是稳定的,为了提速处理过程,这些数据只有在发生改变的时候才从数据库读取,所有工做流依赖的数据将缓存在哈希表中,保证工作流服务执行模块随机访问的速度;包括以下操作:

1)对未就绪的数据加锁,将所有未在哈希表中的请求流程的定义数据读取并存储到哈希表;

2)将有外界依赖的数据单独抽取处理;

3)将已经就绪的数据解锁;

4)将超时的数据和请求返回客户端。

工作流数据准备模块程序结构如下建立:

① 大量流程实例信息对应的数据使用原子变量加锁标记,instReadyLockMap保证CPU在访问时,机器指令的原子性,即在一个指令周期中不会中断;

② 加锁后的实例流程定义信息首先从缓存哈希表中查找,此哈希表workFlowDefHash将不断缓存来自于数据库的流程定义,此处缓存是异步机制,在数据库的流程定义发生变更之后,流程定义的服务将发出异步通知,刷新缓存;

③ 数据准备程序启动多个线程并发对需要准备数据的流程定义进行解析,需要外部数据的流程实例则访问外部文件,外部系统或者数据库,将可能阻塞此线程。多数准备好流程流转数据的线程将对instReadyLockMap对应的实例解锁;

④ 准备好的数据将批量被采集聚合,以备流程执行模块调用。对于超时的线程,将异常通知会客户端,并重置流程实例状态,释放锁资源。

工作流处理结果缓存模块,完成将工作流结果缓存在高速的SSD整列中,当处理器空闲的时候,将数据解包,按照不同流程实例回写到数据库,并将请求结果传递会客户端;包括以下操作:

1)对流程执行结果以包的形式批量写入SSD固态硬盘存储;

2)判断主处理器是否已经调度完GPU,空闲后将流程结果解包;

3)判断主处理器是否空闲,将流程结果提交到数据库;

4)提交成功,将流程结果返回到客户端;

5)将超时的数据和请求返回客户端。

工作流处理结果缓存模块的程序结构如下:

① 流程结果在GPU处理完成后也是聚合状态的,直接批量写入带有SSD磁盘整列,其数据带宽高达2000MB/s以上;

② 主处理器如果已经存在空闲调度,派生新的工作线程,将处理结果陆续读回到Ram,对结果进行解包,提交到数据库;

③ 对于超时的线程,将异常通知会客户端,并重置流程实例状态。

本发明的服务器采用以下技术来提升系统并发极限:

1.非阻塞机制的处理服务并发请求

为了达到系统IO最大吞吐量,非阻塞式的调度对请求进行处理,工作流引擎的独立虚拟机运行机制,保证流程的每一步处理都是非阻塞式的,每个处理都即刻返回,如图1。

通过这种机制调度,每个物理CPU核心仅仅使用一个进程处理请求,不再需要传统CPU从多个线程切换,对CPU的利用率将更高,同等性能处理器的吞吐量将会提高。

2.聚合请求的网络请求传递分发处理机制

对同类请求进行聚合,整包分发,提高网络的利用率,请求队列将是整包多个请求为单位,不再单个处理,提升处理器缓存的命中率如图2。

通过这种机制,以连续打包传递网络数据,充分利用网络带宽,同等带宽下能承载的数据请求数量增加,不同于传统请求负载均衡机制,每类请求使用专用的计算机服务程序进行批量处理解决。

3. 服务程序的并行处理机制

通过并行程序语言编制服务处理程序,服务处理程序性能一个专用的虚拟机,并运行在多核心硬件上,如专用集成电路。虽然单个请求服务程序的处理速度下降,但并发数量大大增加。

4. 缓存数据和推迟存储的机制

并行程序处理前,将同一类流程的定义数据、依赖数据都一次性加载,处理完成请求后,第一级缓存,将批量的大块数据直接写入存储器,如磁盘。这样将连续数据读写速度将非常快。

缓存程序将在IO使用空闲率降低到一定程度之后(根据用户需要服务质量设置),延迟将服务请求的结果写入数据库。

本发明的有益效果:本发明充分利用并发技术,阵列多处理器,创新的技术来实现高性能和大并发工作流服务器,大大降低了对服务器硬件自身性能的依赖性,减少了处理响应时间和等待周期。

附图说明

图1是本发明的结构示意图;

图2是本发明非阻塞机制的处理服务并发请求示意图;

图3 是本发明聚合请求的网络分发到服务专用的节点示意图。

具体实施方式

一种基于SOA大并发高性能工作流程服务器系统,由服务请求组包和分发模块,工作流数据准备模块,工作流服务执行模块,工作流处理结果缓存模块组成,其连接关系为:服务请求组包和分发模块,工作流服务执行模块,工作流处理结果缓存模块依序连接,工作流处理结果缓存模块经过数据库与工作流数据准备模块连接,工作流数据准备模块与工作流服务执行模块相连;其中:

所述的服务请求组包和分发模块使用C语言在多个高速网络接口计算机或者路由器上编写,充分利用带宽和处理器缓存,容纳大量的请求,并使用非阻塞的组包处理方式,所有请求都在RAM中处理完毕;

工作流服务执行模块使用一个主处理的硬件CPU,配合多个核心的通用并行处理硬件组GPU,包括以下操作:

1)主处理器负责将已经就绪的流程依赖的数据对齐组装;

2)按照请求包的流程关系对应好并行处理器GPU;

3)控制GPU执行,多核心的并行可扩张GPU,保证执行速度高于数据库存储速度;

4)将返回结果整理插入到哈希表中;

CPU部分的代码使用C编写,GPU并行处理部分的虚拟机代码使用OPENCL编写;

工作流数据准备模块,完成对所有流程定义的数据预取和缓存,流程定义和流程状态数据相对都是稳定的,为了提速处理过程,这些数据只有在发生改变的时候才从数据库读取,所有工做流依赖的数据将缓存在哈希表中,保证工作流服务执行模块随机访问的速度;包括以下操作:

1)对未就绪的数据加锁,将所有未在哈希表中的请求流程的定义数据读取并存储到哈希表;

2)将有外界依赖的数据单独抽取处理;

3)将已经就绪的数据解锁;

4)将超时的数据和请求返回客户端;

工作流处理结果缓存模块,完成将工作流结果缓存在高速的SSD整列中,当处理器空闲的时候,将数据解包,按照不同流程实例回写到数据库,并将请求结果传递会客户端;包括以下操作:

1)对流程执行结果以包的形式批量写入SSD固态硬盘存储;

2)判断主处理器是否已经调度完GPU,空闲后将流程结果解包;

3)判断主处理器是否空闲,将流程结果提交到数据库;

4)提交成功,将流程结果返回到客户端;

5)将超时的数据和请求返回客户端。

去获取专利,查看全文>

相似文献

  • 专利
  • 中文文献
  • 外文文献
获取专利

客服邮箱:kefu@zhangqiaokeyan.com

京公网安备:11010802029741号 ICP备案号:京ICP备15016152号-6 六维联合信息科技 (北京) 有限公司©版权所有
  • 客服微信

  • 服务号