公开/公告号CN105608223A
专利类型发明专利
公开/公告日2016-05-25
原文格式PDF
申请/专利权人 北京中交兴路车联网科技有限公司;
申请/专利号CN201610019054.1
申请日2016-01-12
分类号G06F17/30(20060101);
代理机构11002 北京路浩知识产权代理有限公司;
代理人李相雨
地址 100176 北京市大兴区经济技术开发区景园北街2号BDA国际企业大道园区56幢
入库时间 2023-12-18 15:29:29
法律状态公告日
法律状态信息
法律状态
2019-04-30
授权
授权
2016-06-22
实质审查的生效 IPC(主分类):G06F17/30 申请日:20160112
实质审查的生效
2016-05-25
公开
公开
技术领域
本发明涉及计算机数据处理的技术领域,特别涉及一种针对 kafka的Hbase数据库的入库方法和系统。
背景技术
Kafka(分布式消息队列)是一种高吞吐量的分布式发布订阅消 息系统,它可以处理消费者规模的网站中的所有动作数据流。这种 动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多 社会功能的一个关键因素。这些数据通常是由于吞吐量的要求而通 过处理日志和日志聚合来解决。对于像Hadoop(分布式系统框架) 一样的日志数据和离线分析系统,因为受到要求实时处理的限制, 采用kafka进行处理是一个可行的解决方案。kafka的目的是通过 Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通 过集群制来提供实时的消费。
Hbase数据库是一个分布式的、面向列的开源数据库,利用Hbase 数据库技术可在廉价PCServer(服务器)上搭建起大规模结构化存 储集群。Hbase数据库是GoogleBigtable(分布式数据存储系统)的 开源实现,类似GoogleBigtable利用GFS(可扩展的分布式文件系 统)作为其文件存储系统,Hbase数据库用HadoopHDFS(分布式 文件系统)作为其文件存储系统;Google运行MapReduc(编程模型) 来处理Bigtable中的海量数据,Hbase数据库同样利用Hadoop MapReduce来处理Hbase数据库中的海量数据;GoogleBigtable利用 Chubby作为协同服务,Hbase数据库利用Zookeeper(分布式应用程 序协调服务)作为对应。
在数据采集过程中,数据经过kafka中转是很常见的做法,kafka 设计许多主题(topic),针对不同的topic进行数据写入Hbase数据 库,通常需要有不同的做法来使得数据写入Hbase数据库,因为数 据在结构和内容上不同,topic是不同的,这样使得针对不同的topic 需要进行编写单独的逻辑代码进行数据写入,比较费时费力,效率 低下,且不利于后期维护,逻辑过多维护成本高。
发明内容
本发明所要解决的技术问题是如何提供一种通用的适用不同 kafkatopic的Hbase数据库入库方法和系统,能够针对不同的topic, 以最少的工作、最快的效率和最低的维护成本来完成数据写入的工 作。
为此目的,本发明提出了一种针对分布式消息队列的Hbase数据 库的入库方法,包括如下步骤:
一种针对kafka的Hbase数据库的入库方法,其特征在于,包括如下 步骤:
S1:收集kafka集群内所有主题的数据,并保存在队列中;
S2:在配置文件中配置主题和所述数据中的分隔符、过滤规则、 入库规则的对应关系;
S3:根据所述对应关系,将所述数据序列化,并对所述数据进行 过滤;
S4:读取所述配置文件中的配置信息,创建与Hbase数据库的连 接,并将所述数据构建成put对象;
S5:将所述put对象入库所述Hbase数据库。
其中较优的,如果个别主题不适用于通用入库方式,则该方法还 包括:通过反射获取针对对应主题的独立处理类来对所述对应主题的 数据进行处理。
其中较优的,所述put对象入库Hbase数据库包括:通过调用所述 Hbase数据库入库接口将所述数据写入所述Hbase数据库。
其中较优的,所述配置文件的配置信息包括数据过滤规则、数据 入库规则、主题与独立处理类的对应关系以及主题与Hbase数据库表 的对应关系。
其中较优的,所述put对象入库Hbase数据库包括:通过调用所述 Hbase数据库的入库框架提供的接口将所述数据通过所述入库框架完 成数据写入Hbase数据库。
其中较优的,所述构建put对象的过程包括:根据所述配置文件 的规则,对所述数据再次进行序列化,构建成入库所述Hbase数据库 所需的对象类型。
其中较优的,所述将put对象入库Hbase数据库包括如下步骤:
S51:根据所述队列得到表名,根据所述表名分配操作线程组、 表对象组以及和所述操作线程组、表对象组一一对应的缓冲区;
S52:所述操作线程组处理接收到的数据,并将接收到的数据写 入对应缓冲区中;
S53:获取所述缓冲区中的数据以及对应的表对象组,按顺序将 所述数据分配给对应的表对象组进行数据写入。
其中较优的,所述将数据写入对应缓冲区中是采用队列的形式进 行写入。
其中较优的,所述表对象组进行数据写入采用并行多队列的形 式。
另一方面,本发明还提供了一种针对kafka的Hbase数据库的入库 系统,包括:
数据采集单元,用于收集kafka集群内所有主题的数据,并保存 在队列中;
配置单元,用于在所述配置文件中配置主题和所述数据中的分隔 符、过滤规则、入库规则的对应关系;
序列化过滤单元,用于根据所述对应关系,将所述数据序列化, 并对所述数据进行过滤;
对象构建单元,用于读取所述配置文件中的配置信息,创建与 Hbase数据库的连接,并将所述数据构建成put对象;
数据入库单元,用于将所述put对象入库Hbase数据库。
其中较优的,如果个别主题不适用于通用入库方式,则该系统还 包括:独立处理单元,所述独立处理单元通过反射获取针对对应主题 的独立处理类来对所述主题的数据进行处理。
通过采用本发明所提供的针对分布式消息队列的Hbase数据库的 入库方法和系统,解决了针对kafka中不同topic需要进行单独处理的 问题,通过通用的方式对kafka集群内的不同类型数据进行消费并统 一存储入Hbase数据库,另外,采用多线程多Htable对象以及生产者消 费者模式极大的提高了数据写入的效率,提升了输出带宽,最大限度 利用了机器的网络和磁盘性能。通过双队列设计,最大限度保证了数 据的安全,避免了数据的丢失。
附图说明
通过参考附图会更加清楚的理解本发明的特征和优点,附图是示 意性的而不应理解为对本发明进行任何限制,在附图中:
图1示出了本发明针对kafka的Hbase数据库的入库方法流程示意 图;
图2示出了本发明针对kafka的Hbase数据库的详细入库流程示意 图。
具体实施方式
下面将结合附图对本发明的实施例进行详细描述。
如图1所示,本发明提供了一种针对kafka(分布式消息队列)的 Hbase数据库的入库方法,其特征在于,包括如下步骤:
一种针对kafka的Hbase数据库的入库方法,其特征在于,包括如下 步骤:
S1:收集kafka集群内所有topic(主题)的数据,并保存在队列 中;
S2:在配置文件中配置topic和所述数据中的分隔符、过滤规则、 入库规则的对应关系;
S3:根据所述对应关系,将所述数据序列化,并对所述数据进行 过滤;
S4:读取所述配置文件中的配置信息,创建与Hbase数据库的连 接,并将所述数据构建成put对象;
S5:将所述put对象入库所述Hbase数据库。
如图2所示,具体的,S1:通过统一的数据接收端,统一收集kafka 集群内所有topic(主题)的数据,配置文件制定topic列表,将数据保 存在队列中。统一数据采集端,采用队列的形式收集kafka集群内所 有topic的数据,用以保证数据顺畅的生产和消费,并在Hbase数据库 压力过高或者其他情况下,缓存一部分数据。
S2:在配置文件中配置topic和所述数据中的分隔符、过滤规则、 入库规则的对应关系。
S3:然后数据经过序列化程序,根据步骤S2中的对应关系,将步 骤S1收集到的数据序列化为LIST(数组),读取配置文件中topic和相 关过滤规则的对应关系,对收集的数据进行过滤。例如,数据开头有 标识符的,不存入数据库,则要进行过滤,每种topic不一样,也要进 行过滤。
S4:读取配置文件中的配置信息,topic与表名,rowkey(行键) 规则,列族相关配置信息,创建与Hbase数据库的连接,将过滤后的 数据构建成put对象。其中,基于配置文件定义整个入库流程,通过 配置文件可以配置包括数据过滤规则、数据入库规则、topic与单独处 理类对应关系、topic与Hbase表对应关系,入库框架的生产者线程组 内线程个数,Htable对象个数等。采用自定义语法定义配置文件中过 滤规则和入库规则,保证了kafkatopic写入数据库的通用性。配置文 件示例:
数据过滤:topicname:up,‘CAPACITY’,g
Topicname:3,r
数据入库:topicname:test,self,[1,2,3:family,column], [4,5,6:family,column]
规则解读:数据过滤语法topicname:数据位置,具体字符串, 规则(g过滤r作为rowkey,c作为column名)
入库语法:topicname:表名,rowkey规则(self自行生产,by 数据中提取),数据位置与family,column对应关系。
其中,构建put对象的过程就是通过上述配置文件的规则第二次 对数据进行序列化,将数据构建成入库Hbase数据库所需的对象类型。
S5:将数据构建成put对象后,将put对象批量入库Hbase数据库。 具体的,采用通用数据写入逻辑,通过调用所述Hbase数据库高性能 入库框架接口将所述数据写入所述Hbase数据库。
其中,如果在步骤S3中过滤得到个别topic不适用于上述通用入库 方式或者在步骤S1中收集的个别topic不适用于上述通用入库方式,还 可以通过提供一个工厂类,通过反射能够获取针对对应topic的独立处 理类对数据进行处理。独立处理类必须能够实现程序中提供的顶级接 口,该接口定义了独立处理类必须包含的数据处理方法。
其中,所述将put对象入库Hbase数据库包括如下步骤:
S51:根据步骤S1的数据列表得到表名,从线程池中领取线程, 根据所述表名分配操作线程组、Htable对象组(表对象组)以及和所 述操作线程组、Htable对象组一一对应的缓冲区;Htable对象组为消 费者线程组,消费者线程组和操作线程组为一一对应关系,每个消费 者线程组中每个线程有若干个Htable对象。其中,高性能入库框架中, 分为多个线程组,每个线程组和要写入的表名是一一对应关系,也就 是说,如果有5个topic要写入3个表,就有3个线程组,每个线程组中 包含若干个线程,每个线程处理一部分数。
S52:所述操作线程组处理接收到的数据,并将接收到的数据写 入对应缓冲区中;其中,所述将数据写入对应缓冲区中可以是采用队 列的形式进行写入。队列数量和线程是一一对应的。使用队列保障了 Hbase数据库压力过大写不进去或者其他情况下的数据安全性,是本 发明中的第二个队列。本发明采用双重队列设计,在流程开始以及流 程结束的时候都有队列的存在,以保证本发明在数据输入以及数据输 出中对数据进行缓存,以防止异常情况的数据丢失,最大限度保障了 数据安全,
S53:获取所述缓冲区中的数据以及对应的Htable对象组,具体 的,从入库处理线程池中获取线程,从Htable对象池中获得Htable对 象,按顺序将所述数据分配给对应的Htable对象组进行数据写入,通 过将表名与线程组、对象组、缓冲区一一对应,保证了数据快速写入, 极大的提高了输出带宽,最大限度利用服务器网络和硬盘IO带宽,提 高了写入速度。其中,所述Htable对象组进行数据写入可以采用并行 多队列的形式。在最后输出端使用并行多队列,匹配多线程以及多 Htable模式,保证了并行最大化输出数据,提升性能。
另一方面,采用上述的入库方法,本发明还提供了一种kafka的 Hbase数据库的入库系统,包括:
数据采集单元,用于收集kafka集群内所有主题的数据,并保存 在队列中;
配置单元,用于在所述配置文件中配置主题和所述数据中的分隔 符、过滤规则、入库规则的对应关系;
序列化过滤单元,用于根据所述对应关系,将所述数据序列化, 并对所述数据进行过滤;
对象构建单元,用于读取所述配置文件中的配置信息,创建与 Hbase数据库的连接,并将所述数据构建成put对象;
数据入库单元,用于将所述put对象入库Hbase数据库。
其中,如果个别主题不适用于通用入库方式,则该系统还包括: 独立处理单元,所述独立处理单元通过反射获取针对对应主题的独立 处理类来对所述主题的数据进行处理。
通过采用本发明所提供的针对分布式消息队列的Hbase数据库的 入库方法和系统,解决了针对kafka中不同topic需要进行单独处理的 问题,通过通用的方式对kafka集群内的不同类型数据进行消费并统 一存储入Hbase数据库,另外,采用多线程多Htable对象以及生产者消 费者模式极大的提高了数据写入的效率,提升了输出带宽,最大限度 利用了机器的网络和磁盘性能。通过双队列设计,最大限度保证了数 据的安全,避免了数据的丢失。
虽然结合附图描述了本发明的实施方式,但是本领域技术人员可 以在不脱离本发明的精神和范围的情况下做出各种修改和变型,这样 的修改和变型均落入由所附权利要求所限定的范围之内。
机译: 使用多个KAFKA实时传输数据的KAFKA系统和方法
机译: 在HBase数据库中存储遥感大数据的方法
机译: 在HBASE数据库中存储遥感大数据的方法