首页> 中国专利> 一种GPU上数据流处理系统及其数据流处理方法

一种GPU上数据流处理系统及其数据流处理方法

摘要

本发明公开了一种GPU上数据流处理系统及其数据流处理方法,属于GPU上数据流处理的技术领域,一种GPU上数据流处理系统,数据源的数据流通过数据流处理系统至客户端,数据流处理系统包括CPU主机和GPU设备;CPU主机包括CPU端加载引擎模块、CPU端缓冲模块、数据流预处理模块、数据流减负模块和可视化模块;GPU设备包括GPU端加载引擎模块、GPU端缓冲模块、数据流概要抽取模块、数据流处理模型库和数据流处理模块;CPU端加载引擎模块的装载或存储单元通过互联网络与数据源、GPU端加载引擎模块的装载或存储单元以及客户端的交互。本发明具有显著的速度优势,很好地满足了高维数据流的实时性需求,可以作为通用的分析方法广泛应用于高维数据流挖掘领域。

著录项

  • 公开/公告号CN104317751A

    专利类型发明专利

  • 公开/公告日2015-01-28

    原文格式PDF

  • 申请/专利权人 浪潮电子信息产业股份有限公司;

    申请/专利号CN201410657243.2

  • 发明设计人 卢晓伟;沈铂;周勇;

    申请日2014-11-18

  • 分类号G06F13/20(20060101);

  • 代理机构37100 济南信达专利事务所有限公司;

  • 代理人姜明

  • 地址 250101 山东省济南市高新区舜雅路1036号

  • 入库时间 2023-12-17 04:14:53

法律信息

  • 法律状态公告日

    法律状态信息

    法律状态

  • 2017-03-01

    授权

    授权

  • 2017-02-08

    专利申请权的转移 IPC(主分类):G06F13/20 登记生效日:20170116 变更前: 变更后: 申请日:20141118

    专利申请权、专利权的转移

  • 2015-02-25

    实质审查的生效 IPC(主分类):G06F13/20 申请日:20141118

    实质审查的生效

  • 2015-01-28

    公开

    公开

说明书

技术领域

本发明涉及一种GPU上数据流处理的技术领域,具体地说是一种GPU上数据流处理系统及其数据流处理方法。 

背景技术

GPU(Graphic Processing Unit),中文翻译为“图形处理器”。GPU是显卡的“心脏”,也就相当于CPU在电脑中的作用。GPU具有相当高的内存带宽,以及大量的执行单元,它可帮助CPU进行一些复杂的计算工作,使显卡减少了对CPU的依赖。 

传统上,GPU的应用被局限于处理图形渲染计算任务,无疑是对计算资源的极大浪费。随着GPU可编程性的不断提高,利用GPU完成通用计算的研究渐渐活跃起来。将GPU用于图形渲染以外领域的计算成为GPGPU(General-purpose computing on graphics processing units,基于GPU的通用计算)。GPGPU计算通常采用CPU+GPU异构模式,由CPU负责执行复杂逻辑处理和事务管理等不适合数据并行的计算,由GPU负责计算密集型的大规模数据并行计算。这种利用GPU强大处理能力和高带宽弥补CPU性能不足的计算方式在发掘计算机潜在的性能,在成本和性价比方面有显著优势。但是传统的GPGPU受硬件可编程和开发方式的制约,应用领域受到了限制,开发难度也很大。 

2007年,由NVIDIA推出的CUDA(Compute Unified Device Architecture,统一计算设备架构), 这一编程接口弥补了传统GPGPU的不足。利用CUDA编程接口, 可以用C语言直接调用GPU资源, 而无需将其映射到图形API, 为GPU的非图形编程普及消除了障碍。 

CUDA模型将CPU作为主机(Host),GPU作为协处理器(co-processor)或设备(device),两者协同工作。CPU负责进行逻辑性强的事物处理和串行计算,GPU则专注于执行高度线程化的并行处理任务。CPU、GPU各自拥有相互独立的存储器地址空间:主机端内存和设备端显存。一旦确定了程序中的并行计算函数(kernel),就考虑把这部分计算交给GPU。 

(数据流的定义)数据流实际上就是连续移动的元素队伍,其中的元素是由相关数据的集合组成。令t表示任一时间戳,at表示在该时间戳到达的数据,流数据可以表示成{…,at1,at,at+1,…}.区别于传统应用模型,流数据模型具有以下4点共性:(1)数据实时到达;(2)数据到达次序独立,不受应用系统所控制;(3)数据规模宏大且不能预知其最大值;(4)数据一经处理,除非特意保存,否则不能被再次取出处理,或者再次提取数据代价昂贵。 

同时,流是以双重身份出现的:(1)作为一个软件可见的程序变量存在。(2)作为一个硬件可见的管理单位存在。实际应用中流往往具有很多属性,当流被映射到硬件中时,这些属性仍然被保持或者变个形式被硬件所见。 

现有技术的数据挖掘中,为了消除数据中的噪音、空值和异常值等错误数据,以保证结果的准确性,通常会在挖掘数据库中的静态数据集之前进行预处理操作;当然,数据流中也避免不了各种错误数据,为了提高挖掘结果的精确度,对其进行预处理也是十分必要的。然而数据流挖掘一般都是在线进行的,无法在挖掘前预处理数据。 

GPU并行计算如何在数据流挖掘领域应用?在计算资源受限的环境下,如何保证数据流处理的实时性和通用性。 

发明内容

本发明的技术任务是提供一种具有显著的速度优势,很好地满足了高维数据流的实时性需求,可以作为通用的分析方法广泛应用于高维数据流挖掘领域的一种GPU上数据流处理系统及其数据流处理方法。 

本发明的技术任务是按以下方式实现的, 

数据源输出的是高维的时间序列数据流,经数据流处理系统处理后,再输出给客户端的是数据流的频繁模式或查询结果。

一种GPU上数据流处理系统,数据源(data sources)的数据流通过数据流处理系统至客户端(client),数据流处理系统包括CPU主机(CPU-Host)和GPU设备(GPU-Device); 

CPU主机包括CPU端加载引擎模块(CPU-Side Load Engine Area)、CPU端缓冲模块(CPU-Side Buffer Area)、数据流预处理模块(Data Stream Preprocessing Area)、数据流减负模块(Data Stream Load Shedding Area)和可视化模块(Visual Area),CPU端加载引擎模块设置有装载或存储单元(Load/Store Unit),CPU端缓冲模块设置有内存(Main Memory, MM),CPU端加载引擎模块的装载或存储单元、数据流预处理模块、数据流减负模块和可视化模块均与CPU端缓冲模块的内存连接交互,CPU端加载引擎模块的装载或存储单元与可视化模块连接交互;

GPU设备包括GPU端加载引擎模块(GPU-Side Load Engine Area)、GPU端缓冲模块(GPU-Side Buffer Area)、数据流概要抽取模块(Data Stream Synopsis Extraction Area)、数据流处理模型库(Data Stream Processing Model Library)和数据流处理模块(Data Stream Processing Area),GPU端加载引擎模块设置有装载或存储单元(Load/Store Unit),GPU端缓冲模块设置有显存(Device Memory, DM),数据流概要抽取模块用于集成概要抽取方法供数据流处理模块调用,数据流处理模型库用于集成数据流处理算法供数据流处理模块调用,GPU端加载引擎模块的装载或存储单元、数据流处理模块均与GPU端缓冲模块的显存连接交互,数据流概要抽取模块、数据流处理模型库均与GPU端加载引擎模块的装载或存储单元连接,GPU端缓冲模块的显存中开辟有存储空间为滑动窗口;

CPU端加载引擎模块的装载或存储单元通过互联网络(Interconnection Network)与数据源、GPU端加载引擎模块的装载或存储单元以及客户端的交互。

CPU端缓冲模块还设置有用于管理内存的内存管理器,内存管理器内设置有输入监测器,输入监测器用于监视内存中临时存储未处理的数据流;CPU端加载引擎模块包括速度调节器(Speed Regulator)、装载或存储单元(Load/Store Unit)和初始化集成器(Initialization Integrator);速度调节器用于根据内存的缓存状态调整数据源的数据流流入CPU端加载引擎模块的装载或存储单元内的流速,速度调节器内设有反馈机制(Feedback Mechanism);初始化集成器用于集成CPU主机和GPU设备的初始化操作。 

一种GPU上数据流处理方法,数据源输出的数据流通过数据流处理系统处理后将数据结果传输至客户端;数据流的处理流程如下: 

(1)、加载数据流:数据源中的数据流流入CPU端加载引擎模块的装载或存储单元,由CPU端加载引擎模块的装载或存储单元将数据流存储到CPU端缓冲模块的内存中;

(2)、数据流预处理:数据流预处理模块将内存中的原始数据流进行预处理,并把预处理过的数据流存入内存;

(3)、传输数据流:预处理过的数据流由内存至CPU端加载引擎模块的装载或存储单元,由CPU端加载引擎模块的装载或存储单元至互联网络,经互联网络到达GPU端加载引擎模块的装载或存储单元,再由GPU端加载引擎模块的装载或存储单元将其加载到显存的滑动窗口中;

(4)、数据流概要抽取:由数据流处理模块调用数据流概要抽取模块中的概要抽取方法,对滑动窗口中的数据流进行概要抽取,并将最终形成的概要数据结构存储到显存中;

(5)、数据流处理:由数据流处理模块调用数据流处理模型库中的数据流处理算法对概要数据进行处理,并把处理的数据结果存储到显存中;

(6)、传输数据结果:数据结果由GPU端缓冲模块的显存至GPU端加载引擎模块的装载或存储单元,由GPU端加载引擎模块的装载或存储单元发送至互联网络,经互联网络到达CPU端加载引擎模块的装载或存储单元,再由CPU端加载引擎模块的装载或存储单元将数据结果加载到内存,或者是由CPU端加载引擎模块的装载或存储单元将数据结果发送给可视化模块;

(7)、结果可视化:可视化模块将数据结果规范化之后发送给CPU端加载引擎模块的装载或存储单元,由CPU端加载引擎模块的装载或存储单元将数据结果展示给客户端。

一种GPU上数据流处理方法,步骤(2)中,数据流预处理模块将内存中的原始数据流使用预处理方法进行预处理,预处理方法包括数据清洗方法、数据集成方法、数据变换方法和数据规约方法,使用的预处理方法可以为上述的一种方法,或者选择多种方法结合使用。 

㈠、数据清洗方法:填补缺失值、平滑噪声数据、去除异常值和解决数据不一致问题;数据清洗是数据预处理中非常重要的过程,但也是最耗时间的过程;缺失值、噪声和不一致性都将导致数据不准确,而数据清洗可以有效地避免这种情况; 

㈡、数据集成方法:模式集成和对象匹配、去除冗余数据、数据值冲突的检测与处理;数据集成是指把原本存储在多个数据源中的数据集成起来,形成一个数据源,并以某种统一的格式进行集中存储,以方便后续的数据处理工作;

㈢、数据变换方法:数据平滑、数据聚集、数据泛化、数据规范化和属性构造;数据变换是将数据转换成一种适合于数据挖掘的形式;比如数据项间的维度可能不一致,这时就需要消减高维数据项的维度,以减少它们之间的差异,方便处理;

㈣、数据规约方法:数据立方体聚集、属性子集选择、维度归约、数值归约、离散化和概念分层;数据规约又称为数据消减技术。 

一种GPU上数据流处理方法,步骤(2)后,当数据流超负荷时,经过数据流减负模块对数据流进行减负处理,具体步骤为: 

Ⅰ、CPU端缓冲模块的内存管理器的输入监测器监视内存中临时存储未处理的数据流,并决定在一个时间单位内,新来数据流的体积是否超过了GPU设备的数据流处理模块的处理能力;

Ⅱ、若GPU设备的数据流处理模块能够处理所有的数据流,则进行传输数据流;若新来数据流的体积超过了GPU设备的数据流处理模块的处理能力,即数据流超负荷,则将数据流转入数据流减负模块;

Ⅲ、数据流减负模块对数据流进行减负处理;

Ⅳ、数据流减负模块将剩余数据流转入内存中,进行下一个步骤的传输数据流。

一种GPU上数据流处理方法,数据流减负模块对数据流进行减负处理采用如下策略的一种或多种结合: 

ⅰ、基于数据的丢弃:在接收到的和未处理的时间序列数据流中的数据,找到长度最长的数据,并把它们丢弃;

ⅱ、基于属性的修整:数据流中每个数据都有个属性,在数据中,将拥有最低频数的属性除去,以此对数据的属性修整;

ⅲ、基于优先级的丢弃:每个新到的数据流被分配一个优先级,在接收到的和未处理的数据流中的数据,选择那些拥有最低优先级的数据并丢弃它们。三种策略各有各的目的;基于数据的丢弃策略,丢弃了那些系统需要花费很多时间去处理的长数据,试图尽可能快的减少系统负载,它是面向效率的;基于属性的修整的策略,从数据中删除了那些对处理结果影响不显著的、最不频发的属性;基于优先级的丢弃的策略,从数据中删除了那些拥有最低优先级的数据;相比而言,后面这两个策略是面向精度的,因为它们试图保留高精度的挖掘结果。

一种GPU上数据流处理方法,显存使用全局存储器(Global Memory)去存储各种数据(如概要数据、中间数据、结果数据等),显存中开辟的滑动窗口(Sliding Window, SW)用于把从CPU主机到达GPU设备的数据流保存起来(由于数据流无限性和存储空间的有限性,为了减少内存和显存间的数据拷贝,获得更有效的挖掘结果,所以在显存中划分出一部分空间作为滑动窗口暂存数据),滑动窗口使用基于元组个数定义的滑动窗口,即是窗口大小固定的滑动窗口,用于保存最近到达的K个数据流; 

滑动窗口的数据流的处理方法使用可重写循环的滑动窗口方法,更新时将新数据直接覆盖了要过期的数据,并提供了格局变换函数来维护滑动窗口中数据的逻辑格局状态。

一种GPU上数据流处理方法,步骤(4)的数据流概要抽取,将数据流通过概要抽取方法获得概要数据结构,概要抽取方法包括抽样(Sampling)方法,小波(Wavelet)方法,略图(Sketch)方法和直方图(Histogram)方法。对数据流进行压缩,构造一个比整个数据流的数据规模小得多的数据结构来保存数据流的主要特征,称之为概要数据结构,通过概要数据结构所获得的近似值是在用户可接受范围之内的。 

一种GPU上数据流处理方法,数据流处理模型库集成了数据流处理时的各种数据流处理算法,包括查询处理算法(query processing algorithma)、聚类算法(clustering algorithma)、分类算法(classification algorithma)、频繁项集挖掘算法(frequent itemsets mining algorithma)和多条数据流间的相关性分析算法(correlation analysis algorithma)。 

一种GPU上数据流处理方法,数据流处理模块的任务是调用数据流概要抽取模块的概要抽取方法对数据流进行概要抽取,以及调用数据流处理模型库中的数据流处理算法对概要数据进行并行计算;数据流处理模块包括数据流输入装配器(Data Stream Input Assembler)、全局线程块调度器(Global Block Scheduler)和计算阵列(Compute Array),计算阵列内设置有共享存储器、装载或存储单元;数据流输入装配器负责把显存中的数据读入到数据流处理模块的共享存储器中,全局线程块调度器负责对共享存储器中的线程块、线程和指令进行调度分配管理,计算阵列用于线程的计算;计算阵列中的装载或存储单元在计算时,从显存中加载数据到共享存储器。 

本发明的一种GPU上数据流处理系统及其数据流处理方法具有以下优点: 

1、通用性:以往的使用GPU加速的数据流处理系统仅仅局限于数据流处理时的某一种任务,要么是聚类,要么是分类或者其它;然而,本发明的数据流处理系统,适合于各个应用领域的多条高维时间序列数据流,它涵盖了数据流的预处理、减负、概要抽取和挖掘处理等多项功能,GPU设备部分包含的数据流处理模型库集成了各种数据流处理算法,如查询处理算法、聚类算法、分类算法、频繁项集挖掘算法、相关性分析算法,能完成数据流处理时的多项任务,从而为本发明赋予了通用性;

2、高效性:本发明的概要抽取方法以及所有数据流处理算法的并行部分都使用GPU进行加速,充分利用了GPU强大的处理能力和流水线特性,进一步提高了执行效率;

3、额外I/O开销的控制:数据流处理系统中把滑动窗口开辟在了显存中,这样在进行数据流概要抽取时就避免了数据在内存和显存间的频繁拷贝;另外,在进行概要抽取和数据流处理时,使用了显存,大大减少了从内存中读写数据的次数;

4、因为初始的数据流量过大,如果直接将数据流传送到GPU设备中进行数据流预处理,会大大增加I/O开销,所以本发明把数据流预处理过程设计在了CPU主机的数据流预处理模块中,即进行了数据流预处理消除数据中的噪音、空值和异常值等错误数据,又减少了I/O开销;

5、现有技术中,在滑动窗口未满阶段,新数据直接填充窗口,在窗口已满阶段,随着窗口的滑动,新数据进入窗口内将导致其它已在窗口内的数据前移,覆盖前面的数据;然而,本发明采用的可重写循环的滑动窗口方法,在滑动窗口已满阶段并不需要移动数据,它是将新数据直接覆盖(重写)了要过期的数据,并提供了格局变换函数来维护滑动窗口中数据的逻辑格局状态,节省了大量时间。

附图说明

    下面结合附图对本发明进一步说明。 

附图1为一种GPU上数据流处理系统的框架图。 

具体实施方式

  参照说明书附图和具体实施例对本发明的一种GPU上数据流处理系统及其数据流处理方法作以下详细地说明。 

实施例1: 

本发明的一种GPU上数据流处理系统,数据源(data sources)的数据流通过数据流处理系统至客户端(client),数据流处理系统包括CPU主机(CPU-Host)和GPU设备(GPU-Device);

CPU主机包括CPU端加载引擎模块(CPU-Side Load Engine Area)、CPU端缓冲模块(CPU-Side Buffer Area)、数据流预处理模块(Data Stream Preprocessing Area)、数据流减负模块(Data Stream Load Shedding Area)和可视化模块(Visual Area),CPU端加载引擎模块设置有装载或存储单元(Load/Store Unit),CPU端缓冲模块设置有内存(Main Memory, MM),CPU端加载引擎模块的装载或存储单元、数据流预处理模块、数据流减负模块和可视化模块均与CPU端缓冲模块的内存连接交互,CPU端加载引擎模块的装载或存储单元与可视化模块连接交互;

GPU设备包括GPU端加载引擎模块(GPU-Side Load Engine Area)、GPU端缓冲模块(GPU-Side Buffer Area)、数据流概要抽取模块(Data Stream Synopsis Extraction Area)、数据流处理模型库(Data Stream Processing Model Library)和数据流处理模块(Data Stream Processing Area),GPU端加载引擎模块设置有装载或存储单元(Load/Store Unit),GPU端缓冲模块设置有显存(Device Memory, DM),数据流概要抽取模块用于集成概要抽取方法供数据流处理模块调用,数据流处理模型库用于集成数据流处理算法供数据流处理模块调用,GPU端加载引擎模块的装载或存储单元、数据流处理模块均与GPU端缓冲模块的显存连接交互,数据流概要抽取模块、数据流处理模型库均与GPU端加载引擎模块的装载或存储单元连接,GPU端缓冲模块的显存中开辟有存储空间为滑动窗口;

CPU端加载引擎模块的装载或存储单元通过互联网络(Interconnection Network)与数据源、GPU端加载引擎模块的装载或存储单元以及客户端的交互。

CPU端缓冲模块还设置有用于管理内存的内存管理器,内存管理器内设置有输入监测器,输入监测器用于监视内存中临时存储未处理的数据流;CPU端加载引擎模块包括速度调节器(Speed Regulator)、装载或存储单元(Load/Store Unit)和初始化集成器(Initialization Integrator);速度调节器用于根据内存的缓存状态调整数据源的数据流流入CPU端加载引擎模块的装载或存储单元内的流速,速度调节器内设有反馈机制(Feedback Mechanism);初始化集成器用于集成CPU主机和GPU设备的初始化操作。 

实施例2: 

本发明的一种GPU上数据流处理方法,数据源输出的数据流通过数据流处理系统处理后将数据结果传输至客户端;数据流的处理流程如下:

(1)、加载数据流:数据源中的数据流流入CPU端加载引擎模块的装载或存储单元(数据流流向如图1中①所示),由CPU端加载引擎模块的装载或存储单元将数据流存储到CPU端缓冲模块的内存中(数据流流向如图1中②所示);

(2)、数据流预处理:数据流预处理模块将内存中的原始数据流进行预处理(数据流流向如图1中③所示),并把预处理过的数据流存入内存(数据流流向如图1中④所示);

(3)、传输数据流:预处理过的数据流由内存至CPU端加载引擎模块的装载或存储单元(数据流流向如图1中⑦所示),由CPU端加载引擎模块的装载或存储单元至互联网络(数据流流向如图1中⑧所示),经互联网络到达GPU端加载引擎模块的装载或存储单元(数据流流向如图1中⑨所示),再由GPU端加载引擎模块的装载或存储单元将其加载到显存的滑动窗口中(数据流流向如图1中⑩所示);

(4)、数据流概要抽取:由数据流处理模块调用数据流概要抽取模块中的概要抽取方法,对滑动窗口中的数据流进行概要抽取(数据流流向如图1中所示),并将最终形成的概要数据结构存储到显存中(数据流流向如图1中所示);

(5)、数据流处理:由数据流处理模块调用数据流处理模型库中的数据流处理算法对概要数据进行处理(数据流流向如图1中所示),并把处理的数据结果存储到显存中(数据流流向如图1中所示);

(6)、传输数据结果:数据结果由GPU端缓冲模块的显存至GPU端加载引擎模块的装载或存储单元(数据流流向如图1中所示),由GPU端加载引擎模块的装载或存储单元发送至互联网络(数据流流向如图1中所示),经互联网络到达CPU端加载引擎模块的装载或存储单元(数据流流向如图1中所示),再由CPU端加载引擎模块的装载或存储单元将数据结果加载到内存(数据流流向如图1中所示),或者是由CPU端加载引擎模块的装载或存储单元将数据结果发送给可视化模块(数据流流向如图1中所示);

(7)、结果可视化:可视化模块将数据结果规范化之后发送给CPU端加载引擎模块的装载或存储单元(数据流流向如图1中所示),由CPU端加载引擎模块的装载或存储单元将数据结果展示给客户端(数据流流向如图1中所示)。

实施例3: 

本发明的一种GPU上数据流处理方法,数据源输出的数据流通过数据流处理系统处理后将数据结果传输至客户端;数据流的处理流程如下:

(1)、加载数据流:数据源中的数据流流入CPU端加载引擎模块的装载或存储单元(数据流流向如图1中①所示),由CPU端加载引擎模块的装载或存储单元将数据流存储到CPU端缓冲模块的内存中(数据流流向如图1中②所示);

(2)、数据流预处理:数据流预处理模块将内存中的原始数据流进行预处理(数据流流向如图1中③所示),并把预处理过的数据流存入内存(数据流流向如图1中④所示);

(3)、传输数据流:预处理过的数据流由内存至CPU端加载引擎模块的装载或存储单元(数据流流向如图1中⑦所示),由CPU端加载引擎模块的装载或存储单元至互联网络(数据流流向如图1中⑧所示),经互联网络到达GPU端加载引擎模块的装载或存储单元(数据流流向如图1中⑨所示),再由GPU端加载引擎模块的装载或存储单元将其加载到显存的滑动窗口中(数据流流向如图1中⑩所示);

(4)、数据流概要抽取:由数据流处理模块调用数据流概要抽取模块中的概要抽取方法,对滑动窗口中的数据流进行概要抽取(数据流流向如图1中所示),并将最终形成的概要数据结构存储到显存中(数据流流向如图1中所示);

(5)、数据流处理:由数据流处理模块调用数据流处理模型库中的数据流处理算法对概要数据进行处理(数据流流向如图1中所示),并把处理的数据结果存储到显存中(数据流流向如图1中所示);

(6)、传输数据结果:数据结果由GPU端缓冲模块的显存至GPU端加载引擎模块的装载或存储单元(数据流流向如图1中所示),由GPU端加载引擎模块的装载或存储单元发送至互联网络(数据流流向如图1中所示),经互联网络到达CPU端加载引擎模块的装载或存储单元(数据流流向如图1中所示),再由CPU端加载引擎模块的装载或存储单元将数据结果加载到内存(数据流流向如图1中所示),或者是由CPU端加载引擎模块的装载或存储单元将数据结果发送给可视化模块(数据流流向如图1中所示);

(7)、结果可视化:可视化模块将数据结果规范化之后发送给CPU端加载引擎模块的装载或存储单元(数据流流向如图1中所示),由CPU端加载引擎模块的装载或存储单元将数据结果展示给客户端(数据流流向如图1中所示)。

步骤(2)中,数据流预处理模块将内存中的原始数据流使用预处理方法进行预处理,预处理方法包括数据清洗方法、数据集成方法、数据变换方法和数据规约方法,使用的预处理方法可以为上述的一种方法,或者选择多种方法结合使用。 

㈠、数据清洗方法:填补缺失值、平滑噪声数据、去除异常值和解决数据不一致问题;数据清洗是数据预处理中非常重要的过程,但也是最耗时间的过程;缺失值、噪声和不一致性都将导致数据不准确,而数据清洗可以有效地避免这种情况; 

㈡、数据集成方法:模式集成和对象匹配、去除冗余数据、数据值冲突的检测与处理;数据集成是指把原本存储在多个数据源中的数据集成起来,形成一个数据源,并以某种统一的格式进行集中存储,以方便后续的数据处理工作;

㈢、数据变换方法:数据平滑、数据聚集、数据泛化、数据规范化和属性构造;数据变换是将数据转换成一种适合于数据挖掘的形式;比如数据项间的维度可能不一致,这时就需要消减高维数据项的维度,以减少它们之间的差异,方便处理;

㈣、数据规约方法:数据立方体聚集、属性子集选择、维度归约、数值归约、离散化和概念分层;数据规约又称为数据消减技术。 

步骤(2)后,当数据流超负荷时,经过数据流减负模块对数据流进行减负处理,具体步骤为: 

Ⅰ、CPU端缓冲模块的内存管理器的输入监测器监视内存中临时存储未处理的数据流,并决定在一个时间单位内,新来数据流的体积是否超过了GPU设备的数据流处理模块的处理能力;

Ⅱ、若GPU设备的数据流处理模块能够处理所有的数据流,则进行传输数据流;若新来数据流的体积超过了GPU设备的数据流处理模块的处理能力,即数据流超负荷,则将数据流转入数据流减负模块(数据流流向如图1中⑤所示);

Ⅲ、数据流减负模块对数据流进行减负处理;

Ⅳ、数据流减负模块将剩余数据流转入内存中(数据流流向如图1中⑥所示),进行下一个步骤的传输数据流。

数据流减负模块对数据流进行减负处理采用如下策略的一种或多种结合: 

ⅰ、基于数据的丢弃:在接收到的和未处理的时间序列数据流中的数据,找到长度最长的数据,并把它们丢弃;

ⅱ、基于属性的修整:数据流中每个数据都有个属性,在数据中,将拥有最低频数的属性除去,以此对数据的属性修整;

ⅲ、基于优先级的丢弃:每个新到的数据流被分配一个优先级,在接收到的和未处理的数据流中的数据,选择那些拥有最低优先级的数据并丢弃它们。三种策略各有各的目的;基于数据的丢弃策略,丢弃了那些系统需要花费很多时间去处理的长数据,试图尽可能快的减少系统负载,它是面向效率的;基于属性的修整的策略,从数据中删除了那些对处理结果影响不显著的、最不频发的属性;基于优先级的丢弃的策略,从数据中删除了那些拥有最低优先级的数据;相比而言,后面这两个策略是面向精度的,因为它们试图保留高精度的挖掘结果。

显存使用全局存储器(Global Memory)去存储各种数据(如概要数据、中间数据、结果数据等),显存中开辟的滑动窗口(Sliding Window, SW)用于把从CPU主机到达GPU设备的数据流保存起来(由于数据流无限性和存储空间的有限性,为了减少内存和显存间的数据拷贝,获得更有效的挖掘结果,所以在显存中划分出一部分空间作为滑动窗口暂存数据),滑动窗口使用基于元组个数定义的滑动窗口,即是窗口大小固定的滑动窗口,用于保存最近到达的K个数据流; 

滑动窗口的数据流的处理方法使用可重写循环的滑动窗口方法,更新时将新数据直接覆盖了要过期的数据,并提供了格局变换函数来维护滑动窗口中数据的逻辑格局状态。

步骤(4)的数据流概要抽取,将数据流通过概要抽取方法获得概要数据结构,概要抽取方法包括抽样(Sampling)方法,小波(Wavelet)方法,略图(Sketch)方法和直方图(Histogram)方法。对数据流进行压缩,构造一个比整个数据流的数据规模小得多的数据结构来保存数据流的主要特征,称之为概要数据结构,通过概要数据结构所获得的近似值是在用户可接受范围之内的。 

数据流处理模型库集成了数据流处理时的各种数据流处理算法,包括查询处理算法(query processing algorithma)、聚类算法(clustering algorithma)、分类算法(classification algorithma)、频繁项集挖掘算法(frequent itemsets mining algorithma)和多条数据流间的相关性分析算法(correlation analysis algorithma)。 

数据流处理模块的任务是调用数据流概要抽取模块的概要抽取方法对数据流进行概要抽取,以及调用数据流处理模型库中的数据流处理算法对概要数据进行并行计算;数据流处理模块包括数据流输入装配器(Data Stream Input Assembler)、全局线程块调度器(Global Block Scheduler)和计算阵列(Compute Array),计算阵列内设置有共享存储器、装载或存储单元;数据流输入装配器负责把显存中的数据读入到数据流处理模块的共享存储器中,全局线程块调度器负责对共享存储器中的线程块、线程和指令进行调度分配管理,计算阵列用于线程的计算;计算阵列中的装载或存储单元在计算时,从显存中加载数据到共享存储器。 

在数据流处理系统中,对数据流进行处理,其中CPU主机端的处理步骤如下: 

1. 启动CUDA(数据流处理系统的简称);

2. 为输入数据分配MM(内存);

3. CPU端加载引擎模块从数据源获取输入数据并进行初始化;

4. 数据流预处理模块对输入数据流进行数据清洗、集成等的预处理;

5. 当系统超负荷时,数据流减负模块对数据流进行减负处理;

6. 为GPU设备分配显存的滑动窗口,用于存放输入数据;

7. 初始化集成器对概要抽取方法及数据流处理算法初始化;

8. 将内存中的数据拷贝到显存的滑动窗口中;

9. 为GPU设备分配显存,用于存放数据流处理模块抽取出来的概要数据;

10. 调用GPU设备端的数据流处理算法的并行计算函数(kernel)进行并行计算,获得概要数据,并将其写到显存中的对应区域;

11. 为GPU设备分配显存,用于存放传回来的输出数据;

12. 将显存中的结果回读到内存中;

13. 使用可视化模块对数据进行后续处理,如规范化、可视化等;

14. 释放内存和显存空间;

15. 退出CUDA。

GPU设备端的处理步骤如下: 

1. 分配共享存储器(Shared Memory);

2. 将显存的全局存储器中的数据读入共享存储器;

3. 进行计算,将结果写到共享存储器;

4. 将共享存储器中的结果写到全局存储器。

以下介绍数据流处理系统中各模块的代码: 

一、初始化集成器的各个功能代码:

首先,启动和退出数据流处理系统(简称CUDA)环境:

CUT_DEVICE_INIT(argc, argv);//启动CUDA

CUT_EXIT(argc, argv);//退出CUDA;

接着,分配内存和显存:

在CPU主机分配内存,h_表示CPU主机端,i表示input,o表示output,mem_size表示为数据分配的存储器大小,

float* h_idata = (float*) malloc(mem_size);

float* h_odata = (float*) malloc(mem_size);

在GPU设备分配显存,d_表示GPU设备端,

float* d_idata;  CUDA_SAFE_CALL(cudaMalloc((void**) &d_idata, mem_size));

float* d_odata;  CUDA_SAFE_CALL(cudaMalloc((void**) &d_odata, mem_size));

接着,任务划分,即二维grid和block的维度设计:

dim3 grid(gridDim.x, gridDim.y, 1);//第三维恒为1;

dim3 block(blockDim.x, blockDim.y, 1);//第三维可以不为1,因block是二维,所以置1;

testKernel<<<grid, block>>>(d_idata, d_odata);//调用内核函数,进行并行计算;

然后,内存和显存间的数据拷贝:

将内存中的值读入显存,

CUDA_SAFE_CALL(cudaMemcpy(d_idata, h_idata, mem_size, cudaMemcpyHostToDevice));

将结果从显存写入内存,

CUDA_SAFE_CALL(cudaMemcpy(h_odata, d_odata, mem_size, cudaMemcpyDeviceToHost));

下面的代码用于释放内存、显存的存储空间。

free(h_idata);//释放内存; 

free(h_odata);

CUDA_SAFE_CALL(cudaFree(d_idata));//释放显存;

CUDA_SAFE_CALL(cudaFree(d_odata));

除了上面提到的这些功能,初始化集成器还有选择数据流处理算法(具体到某类算法中的某种算法),选择概要抽取方法,各种数据流处理算法、概要抽取方法的初始化(例如Haar Wavelet是获得执行完全分解时的分解层数,k-means是初始化聚类中心点)的功能。

二、滑动窗口的形式化表示为:CircularSW = <w, num, front, fun> ; 

其中,w为滑动窗口(SW)的宽度;num为当前滑动窗口中数据流的数据量;front为滑动窗口中末端数据的标记,新到的数据放置在这个位置;fun为滑动窗口的格局变换函数,它决定了新数据到来时滑动窗口中已存在数据的格局变化。fun的定义如下表1所示:

表1  可重写循环滑动窗口的格局变换函数

可重写循环滑动窗口的新颖之处就在于它直接计算出了过期数据(即将被移出数据)的位置,新到达的数据则放置到该位置,直接覆盖窗口中原来的数据即可,另外还需要修改front值,使其指向窗口中数据的末端,即front永远指向新数据。与以往的滑动窗口相比,可重写循环滑动窗口能够提升数据流处理系统的效率,它使用同样的存储空间,又避免了窗口内数据的移动,并且允许更细粒度的并发控制。

三、为了提高处理效率,概要抽取方法均需要GPU加速,比如小波方法中的小波分解、略图方法中的哈希映射等。下面以小波方法中的哈尔小波(Haar Wavelet)为例进行详细介绍。小波方法是一种重要的数据压缩方法,通过对原始数据集进行小波变换,保存部分重要的小波系数,能够近似地还原出原始数据集合。Haar Wavelet是小波中最简单的一种,二维或三维(2D或3D)的小波变换均可分解为2个或3个一维(1D)的小波变换。一维Haar Wavelet分解是将向量变换为个小波系数。例如:设,表2演示了该序列的Haar Wavelet变换。 

Resolution lAveragesDetail Coefficientsl=3(6,2,5,3,4,4,4,8)-l=2(4,4,4,6)(2,1,0,-2)l=1(4,5)(0,-1)l=0(4.5)(-0.5)

表2  序列的Haar Wavelet变换 

具体计算如下:对Resolution列中层次,Averages列中是原始序列。对原始序列数据两两分对求其均值,得到层次中的Averages,即

显然,在求Averages的过程中,我们丢掉了原始序列中的某些信息,单靠平均值是无法重构原始序列的,平均值只是近似值,所以为了重构原始数据,还需要保存细节系数,即将每对数据中的平均值和第2 个数据的差保存在Detail Coefficients列中,即;

依次进行下去,直到层次。该序列的小波系数由第0层的平均值和全部细节系数组成,即的小波系数为。

以一维哈尔小波变换为例,给出GPU设备的核心代码如下: 

/*--------------------------------------

 * 一维哈尔小波变换

 *--------------------------------------*/

////////////////////////////////////////////////////

//! @param id 输入数据

//! @param od 输出数据

//! @param approx_final 存储最终的近似系数,即平均值

//! @param dlevels 变换的分解层数

//! @param slength_step_half 当前分解层的一半信号长度(全局存储器中细节系数的偏移量)

//! @param bdim 线程块的维度

///////////////////////////////////////////////////

__global__ void 

dwtHaar1D( float* id, float* od, float* approx_final, 

           const unsigned int dlevels,

           const unsigned int slength_step_half,

           const int bdim ) 

{   //用extern声明表示动态分配共享存储器空间,数组大小由host端的Ns参数确定

    extern __shared__ float shared[];

    //线程运行环境,一维参数化

const int gdim = gridDim.x; 

const int bdim = blockDim.x;

const int bid = blockIdx.x;

const int tid = threadIdx.x;  

    //全局的线程id

    const int tid_global = (bid * bdim) + tid;    

    unsigned int idata = (bid * (2 * bdim)) + tid;

    //从global memory中读入数据

    shared[tid] = id[idata];

    shared[tid + bdim] = id[idata + bdim];

    __syncthreads();

    //从共享内存中读出数据

    float data0 = shared[2*tid];

    float data1 = shared[(2*tid) + 1];

__syncthreads();

// Detail Coefficients,没有进一步引用所以直接存储在global memory中

od[tid_global + slength_step_half] = (data0 - data1) * INV_SQRT_2;

 //线程块的偏移量(为了避免产生bank conflicts)

unsigned int atid = tid + (tid >> LOG_NUM_BANKS);

// Averages,为了进一步分解而存储在shared memory中

 shared[atid] = (data0 + data1) * INV_SQRT_2;

__syncthreads();

if( dlevels > 1) 

{   unsigned int offset_neighbor = 1;

unsigned int num_threads = bdim >> 1;

        unsigned int idata0 = tid * 2;

        for( unsigned int i = 1; i < dlevels; ++i) 

        {

            if( tid < num_threads) 

            {   //更新步长,每分解一层,步长就增大2倍 

                unsigned int idata1 = idata0 + offset_neighbor;

                //写入global memory中的位置

unsigned int g_wpos = (num_threads * gdim) + (bid * num_threads) + tid;

                //-----小波分解步骤如下-----

                //为了避免bank conflicts而进行偏移

unsigned int c_idata0 = idata0 + (idata0 >> LOG_NUM_BANKS);

unsigned int c_idata1 = idata1 + (idata1 >> LOG_NUM_BANKS);

// Detail Coefficients,没有进一步修改而直接存储在global memory中

od[g_wpos] = (shared[c_idata0] - shared[c_idata1]) * INV_SQRT_2;

// Averages,请注意shared memory中的表示变得相当稀疏

shared[c_idata0] = (shared[c_idata0] + shared[c_idata1]) * INV_SQRT_2;

//更新存储偏移

                num_threads = num_threads >> 1;//除以2

                offset_neighbor <<= 1;//乘以2 

                idata0 = idata0 << 1;//乘以2     

            }

            __syncthreads();//在每个分解步骤之后进行同步

        }

        if( 0 == tid) //为将要在主机端执行的下一个分解步骤写最上层的元素

        {  approx_final[bid] = shared[0];  }

    }

}。

四、数据流处理算法中的聚类算法的k-means算法: 

k-means算法是最具有代表性的聚类算法,它的主要目的是对具有相同数据类型的样本数据按距离最短规则进行集合的划分,最终获取各等价类。本文用距离表示数据间的相似程度。然而,由于数据流的特殊性,传统聚类算法很难在数据流上实现,所以我们在数据流处理模型库中以哈尔小波为例进行了概要抽取。基于这种小波概要,能够快速计算数据流与聚类中心之间的近似距离。这样,k-means算法实现起来就容易多了。

在聚类中,欧几里德距离(即欧式距离)有着非常直观的意义。本文中,数据项(数据点)和数据项之间的欧式距离计算如下:       

一个数据项与一个数据集之间的距离定义为该数据项与该数据集中所有数据项当中距离最小值。数据项和数据集合之间的欧式距离计算如下:           

该问题所对应的是计算密集型的任务(即求解大量的“距离”),因此在算法上并没有太大的优化空间。而在“增强并行性”上做文章无疑会带来显著的加速效果,因为不同距离之间的计算是完全不存在依赖性的。可以说,这是一个GPU拥有绝对优势的任务场合。 

高维数据流中每个数据项的维数通常很高,所以,实现时我们用自定义的矩阵来存储数据。聚类时,需要GPU频繁计算矩阵的转置、数据项间距离的平方和等极为耗时的操作。设备端k-means算法的核心代码如下。其中,每个thread对应一个距离。 

/*-------------------------------------- 

 * 矩阵定义

 *--------------------------------------*/

/*----- CPU上结构体-----*/

typedef struct {

    int width;//column

    int height;//row

    //int stride;//不间隔访问时stride=width

    float* elements;//矩阵元素的头指针

} CPU_Matrix;

/*-----GPU上结构体-----*/

typedef struct {

    int width;//column

    int height;//row

    float* elements;//矩阵元素的头指针

} GPU_Matrix;

/*--------------------------------------

 * 矩阵转置

 *--------------------------------------*/

__global__ void transpose(float *odata, float *idata, int width, int height) {   

    //静态分配shared memory

    __shared__ float block[BLOCK_SIZE][BLOCK_SIZE+1];

   //把矩阵块读入共享存储器

   unsigned int xIndex = blockIdx.x * BLOCK_SIZE + threadIdx.x;

    unsigned int yIndex = blockIdx.y * BLOCK_SIZE + threadIdx.y;

   if((xIndex<width)&&(yIndex<height)) {

        unsigned int index_in = yIndex * width + xIndex;

      block[threadIdx.y][threadIdx.x] = idata[index_in];

   }

   __syncthreads();

   //将转置后的矩阵写回全局存储器

   xIndex = blockIdx.y * BLOCK_SIZE + threadIdx.x;

   yIndex = blockIdx.x * BLOCK_SIZE + threadIdx.y;

   if((xIndex<height)&&(yIndex<width)){

      unsigned int index_out = yIndex * height + xIndex;

        odata[index_out] = block[threadIdx.x][threadIdx.y];

   }

}

/*--------------------------------------

 * 距离计算,求的是距离平方和

 *--------------------------------------*/

__global__ void 

DistanceOfSquareKernel(GPU_Matrix A, GPU_Matrix B, GPU_Matrix C, size_t pitch) {

   int blockRow = blockIdx.y;  int blockCol = blockIdx.x;

    GPU_Matrix subC = GetSubMatrix(C, blockRow, blockCol);

    float Cvalue = 0;  int row = threadIdx.y;  int col = threadIdx.x;

    for (int m = 0; m < (A.width / BLOCK_SIZE); ++m){

        GPU_Matrix subA = GetSubMatrix(A, blockRow, m);

        GPU_Matrix subB = GetSubMatrix(B, m, blockCol);

        //声明用于存储A、B子块的共享存储器数组

        __shared__ float As[BLOCK_SIZE][BLOCK_SIZE];

        __shared__ float Bs[BLOCK_SIZE][BLOCK_SIZE];

      //完成数据从全局存储器到共享存储器的拷贝,每个线程负责一个元素

        As[row][col] = GetGPUElement(&subA, row, col, pitch);        

        Bs[row][col] = GetGPUElement(&subB, row, col, pitch);

        __syncthreads();

      //并行计算,每个线程负责C中一个元素值的计算

        for (int n = 0; n < BLOCK_SIZE; ++n)

        Cvalue += powf((As[row][n] - Bs[n][col]),2.0f);

        __syncthreads();

    }

    SetGPUElement(&subC, row, col, Cvalue, pitch);

}。

五、计算阵列中的线程网格(Grid)、线程块(Block)和线程(Thread)分别被加载到计算阵列的流处理器阵列(SPA)、流多处理器(SM)和流处理器(SP)上执行。线程网格之间通过显存交换数据,而各个线程块是并行执行的,不能相互通信,只能通过显存共享数据;同一线程块内的线程可以通过共享存储器(Shared Memory)和同步实现通信。 

由于数据流的高维特性,在进行任务划分时,我们将线程网格(Grid)和线程块(Block)的维度都设计成是二维的。下面这个是CPU主机端代码,用于设置运行参数,即线程网格的形状和线程块的形状。其中,gridDim, blockDim, blockIdx, threadIdx是CUDA C中的内建变量。 

dim3 grid(gridDim.x, gridDim.y, 1);//第三维恒为1 

dim3 block(blockDim.x, blockDim.y, 1);//第三维可以不为1,因block是二维,所以置1

其中,blockIdx.x∈[0, gridDim.x-1],blockIdx.y∈[0, gridDim.y-1],threadIdx.x∈[0, blockDim.x-1],threadIdx.y∈[0, blockDim.y-1]。

图中存在两个层次的并行,即线程网格(Grid)中的线程块(Block)间并行和线程块(Block)中的线程(Thread)间并行。(N+1)表示线程块(block)总数,(M+1)表示线程(thread)总数。 

(N+1)=gridDim.x* gridDim.y≤65535*65535,其中gridDim.x≤65535,gridDim.y≤65535。 

(M+1)=blockDim.x* blockDim.y≤1024,其中blockDim.x≤512,blockDim.y≤512。 

由于线程网格(Grid)和线程块(Block)是二维的,所以在内核函数中也要使用二维的索引。下面是设备端代码,用于计算线程索引,即确定线程(Thread)在整个线程网格(Grid)中的位置。 

unsigned int bid_in_grid = blockIdx.x + blockIdx.y*gridDim.x; 

unsigned int tid_in_block = threadIdx.x + threadIdx.y*blockDim.x;

unsigned int tid_in_grid_x = threadIdx.x + blockIdx.x*blockDim.x;//x

unsigned int tid_in_grid_y = threadIdx.y + blockIdx.y*blockDim.y;//y

unsigned int tid_in_grid = tid_in_grid_x + tid_in_grid_y*blockDim.x*gridDim.x;//offset。

另外,为了有效利用执行单元,设计线程块(Block)时,应该尽量使每个block中的线程数量是32的整数倍,最好是保持在64~256之间。为了充分利用GPU的资源,提高其执行效率,代码实现时我们必须要对线程网格(Grid)和线程块(Block)的维度设计格外重视。 

以上说明仅为本发明较佳的具体实施方式,但本发明的保护范围并不局限于此,任何熟悉本技术领域的技术人员在本发明揭露的技术范围内,可轻易想到的变化或替换,都应涵盖在本发明的保护范围之内。因此,本发明的保护范围应该以权利要求书的保护范围为准。 

去获取专利,查看全文>

相似文献

  • 专利
  • 中文文献
  • 外文文献
获取专利

客服邮箱:kefu@zhangqiaokeyan.com

京公网安备:11010802029741号 ICP备案号:京ICP备15016152号-6 六维联合信息科技 (北京) 有限公司©版权所有
  • 客服微信

  • 服务号