首页> 中国专利> 针对kafka的Hbase数据库的入库方法和系统

针对kafka的Hbase数据库的入库方法和系统

摘要

本发明涉及一种针对kafka的Hbase数据库的入库方法,包括:S1:收集kafka集群内所有topic的数据,并保存在队列中;S2:在配置文件中配置topic和所述数据中的分隔符、过滤规则、入库规则的对应关系;S3:根据所述对应关系,将数据序列化,并对所述数据进行过滤;S4:读取所述配置文件中的配置信息,创建与Hbase数据库的连接,并将所述数据构建成put对象;S5:将所述put对象入库所述Hbase数据库。本发明解决了针对kafka中不同topic需要进行单独处理的问题,提供一种通用适配所有topic的方法,并构建了一种高性能的Hbase数据库入库方法,极大的提高了数据写入的效率,提升了输出带宽,最大限度利用了机器的网络和磁盘性能,通过双队列设计,最大限度保证了数据的安全,避免了数据的丢失。

著录项

  • 公开/公告号CN105608223A

    专利类型发明专利

  • 公开/公告日2016-05-25

    原文格式PDF

  • 申请/专利权人 北京中交兴路车联网科技有限公司;

    申请/专利号CN201610019054.1

  • 发明设计人 曹宇;余效伟;肖赞;李旭阳;

    申请日2016-01-12

  • 分类号G06F17/30(20060101);

  • 代理机构11002 北京路浩知识产权代理有限公司;

  • 代理人李相雨

  • 地址 100176 北京市大兴区经济技术开发区景园北街2号BDA国际企业大道园区56幢

  • 入库时间 2023-12-18 15:29:29

法律信息

  • 法律状态公告日

    法律状态信息

    法律状态

  • 2019-04-30

    授权

    授权

  • 2016-06-22

    实质审查的生效 IPC(主分类):G06F17/30 申请日:20160112

    实质审查的生效

  • 2016-05-25

    公开

    公开

说明书

技术领域

本发明涉及计算机数据处理的技术领域,特别涉及一种针对 kafka的Hbase数据库的入库方法和系统。

背景技术

Kafka(分布式消息队列)是一种高吞吐量的分布式发布订阅消 息系统,它可以处理消费者规模的网站中的所有动作数据流。这种 动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多 社会功能的一个关键因素。这些数据通常是由于吞吐量的要求而通 过处理日志和日志聚合来解决。对于像Hadoop(分布式系统框架) 一样的日志数据和离线分析系统,因为受到要求实时处理的限制, 采用kafka进行处理是一个可行的解决方案。kafka的目的是通过 Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通 过集群制来提供实时的消费。

Hbase数据库是一个分布式的、面向列的开源数据库,利用Hbase 数据库技术可在廉价PCServer(服务器)上搭建起大规模结构化存 储集群。Hbase数据库是GoogleBigtable(分布式数据存储系统)的 开源实现,类似GoogleBigtable利用GFS(可扩展的分布式文件系 统)作为其文件存储系统,Hbase数据库用HadoopHDFS(分布式 文件系统)作为其文件存储系统;Google运行MapReduc(编程模型) 来处理Bigtable中的海量数据,Hbase数据库同样利用Hadoop MapReduce来处理Hbase数据库中的海量数据;GoogleBigtable利用 Chubby作为协同服务,Hbase数据库利用Zookeeper(分布式应用程 序协调服务)作为对应。

在数据采集过程中,数据经过kafka中转是很常见的做法,kafka 设计许多主题(topic),针对不同的topic进行数据写入Hbase数据 库,通常需要有不同的做法来使得数据写入Hbase数据库,因为数 据在结构和内容上不同,topic是不同的,这样使得针对不同的topic 需要进行编写单独的逻辑代码进行数据写入,比较费时费力,效率 低下,且不利于后期维护,逻辑过多维护成本高。

发明内容

本发明所要解决的技术问题是如何提供一种通用的适用不同 kafkatopic的Hbase数据库入库方法和系统,能够针对不同的topic, 以最少的工作、最快的效率和最低的维护成本来完成数据写入的工 作。

为此目的,本发明提出了一种针对分布式消息队列的Hbase数据 库的入库方法,包括如下步骤:

一种针对kafka的Hbase数据库的入库方法,其特征在于,包括如下 步骤:

S1:收集kafka集群内所有主题的数据,并保存在队列中;

S2:在配置文件中配置主题和所述数据中的分隔符、过滤规则、 入库规则的对应关系;

S3:根据所述对应关系,将所述数据序列化,并对所述数据进行 过滤;

S4:读取所述配置文件中的配置信息,创建与Hbase数据库的连 接,并将所述数据构建成put对象;

S5:将所述put对象入库所述Hbase数据库。

其中较优的,如果个别主题不适用于通用入库方式,则该方法还 包括:通过反射获取针对对应主题的独立处理类来对所述对应主题的 数据进行处理。

其中较优的,所述put对象入库Hbase数据库包括:通过调用所述 Hbase数据库入库接口将所述数据写入所述Hbase数据库。

其中较优的,所述配置文件的配置信息包括数据过滤规则、数据 入库规则、主题与独立处理类的对应关系以及主题与Hbase数据库表 的对应关系。

其中较优的,所述put对象入库Hbase数据库包括:通过调用所述 Hbase数据库的入库框架提供的接口将所述数据通过所述入库框架完 成数据写入Hbase数据库。

其中较优的,所述构建put对象的过程包括:根据所述配置文件 的规则,对所述数据再次进行序列化,构建成入库所述Hbase数据库 所需的对象类型。

其中较优的,所述将put对象入库Hbase数据库包括如下步骤:

S51:根据所述队列得到表名,根据所述表名分配操作线程组、 表对象组以及和所述操作线程组、表对象组一一对应的缓冲区;

S52:所述操作线程组处理接收到的数据,并将接收到的数据写 入对应缓冲区中;

S53:获取所述缓冲区中的数据以及对应的表对象组,按顺序将 所述数据分配给对应的表对象组进行数据写入。

其中较优的,所述将数据写入对应缓冲区中是采用队列的形式进 行写入。

其中较优的,所述表对象组进行数据写入采用并行多队列的形 式。

另一方面,本发明还提供了一种针对kafka的Hbase数据库的入库 系统,包括:

数据采集单元,用于收集kafka集群内所有主题的数据,并保存 在队列中;

配置单元,用于在所述配置文件中配置主题和所述数据中的分隔 符、过滤规则、入库规则的对应关系;

序列化过滤单元,用于根据所述对应关系,将所述数据序列化, 并对所述数据进行过滤;

对象构建单元,用于读取所述配置文件中的配置信息,创建与 Hbase数据库的连接,并将所述数据构建成put对象;

数据入库单元,用于将所述put对象入库Hbase数据库。

其中较优的,如果个别主题不适用于通用入库方式,则该系统还 包括:独立处理单元,所述独立处理单元通过反射获取针对对应主题 的独立处理类来对所述主题的数据进行处理。

通过采用本发明所提供的针对分布式消息队列的Hbase数据库的 入库方法和系统,解决了针对kafka中不同topic需要进行单独处理的 问题,通过通用的方式对kafka集群内的不同类型数据进行消费并统 一存储入Hbase数据库,另外,采用多线程多Htable对象以及生产者消 费者模式极大的提高了数据写入的效率,提升了输出带宽,最大限度 利用了机器的网络和磁盘性能。通过双队列设计,最大限度保证了数 据的安全,避免了数据的丢失。

附图说明

通过参考附图会更加清楚的理解本发明的特征和优点,附图是示 意性的而不应理解为对本发明进行任何限制,在附图中:

图1示出了本发明针对kafka的Hbase数据库的入库方法流程示意 图;

图2示出了本发明针对kafka的Hbase数据库的详细入库流程示意 图。

具体实施方式

下面将结合附图对本发明的实施例进行详细描述。

如图1所示,本发明提供了一种针对kafka(分布式消息队列)的 Hbase数据库的入库方法,其特征在于,包括如下步骤:

一种针对kafka的Hbase数据库的入库方法,其特征在于,包括如下 步骤:

S1:收集kafka集群内所有topic(主题)的数据,并保存在队列 中;

S2:在配置文件中配置topic和所述数据中的分隔符、过滤规则、 入库规则的对应关系;

S3:根据所述对应关系,将所述数据序列化,并对所述数据进行 过滤;

S4:读取所述配置文件中的配置信息,创建与Hbase数据库的连 接,并将所述数据构建成put对象;

S5:将所述put对象入库所述Hbase数据库。

如图2所示,具体的,S1:通过统一的数据接收端,统一收集kafka 集群内所有topic(主题)的数据,配置文件制定topic列表,将数据保 存在队列中。统一数据采集端,采用队列的形式收集kafka集群内所 有topic的数据,用以保证数据顺畅的生产和消费,并在Hbase数据库 压力过高或者其他情况下,缓存一部分数据。

S2:在配置文件中配置topic和所述数据中的分隔符、过滤规则、 入库规则的对应关系。

S3:然后数据经过序列化程序,根据步骤S2中的对应关系,将步 骤S1收集到的数据序列化为LIST(数组),读取配置文件中topic和相 关过滤规则的对应关系,对收集的数据进行过滤。例如,数据开头有 标识符的,不存入数据库,则要进行过滤,每种topic不一样,也要进 行过滤。

S4:读取配置文件中的配置信息,topic与表名,rowkey(行键) 规则,列族相关配置信息,创建与Hbase数据库的连接,将过滤后的 数据构建成put对象。其中,基于配置文件定义整个入库流程,通过 配置文件可以配置包括数据过滤规则、数据入库规则、topic与单独处 理类对应关系、topic与Hbase表对应关系,入库框架的生产者线程组 内线程个数,Htable对象个数等。采用自定义语法定义配置文件中过 滤规则和入库规则,保证了kafkatopic写入数据库的通用性。配置文 件示例:

数据过滤:topicname:up,‘CAPACITY’,g

Topicname:3,r

数据入库:topicname:test,self,[1,2,3:family,column], [4,5,6:family,column]

规则解读:数据过滤语法topicname:数据位置,具体字符串, 规则(g过滤r作为rowkey,c作为column名)

入库语法:topicname:表名,rowkey规则(self自行生产,by 数据中提取),数据位置与family,column对应关系。

其中,构建put对象的过程就是通过上述配置文件的规则第二次 对数据进行序列化,将数据构建成入库Hbase数据库所需的对象类型。

S5:将数据构建成put对象后,将put对象批量入库Hbase数据库。 具体的,采用通用数据写入逻辑,通过调用所述Hbase数据库高性能 入库框架接口将所述数据写入所述Hbase数据库。

其中,如果在步骤S3中过滤得到个别topic不适用于上述通用入库 方式或者在步骤S1中收集的个别topic不适用于上述通用入库方式,还 可以通过提供一个工厂类,通过反射能够获取针对对应topic的独立处 理类对数据进行处理。独立处理类必须能够实现程序中提供的顶级接 口,该接口定义了独立处理类必须包含的数据处理方法。

其中,所述将put对象入库Hbase数据库包括如下步骤:

S51:根据步骤S1的数据列表得到表名,从线程池中领取线程, 根据所述表名分配操作线程组、Htable对象组(表对象组)以及和所 述操作线程组、Htable对象组一一对应的缓冲区;Htable对象组为消 费者线程组,消费者线程组和操作线程组为一一对应关系,每个消费 者线程组中每个线程有若干个Htable对象。其中,高性能入库框架中, 分为多个线程组,每个线程组和要写入的表名是一一对应关系,也就 是说,如果有5个topic要写入3个表,就有3个线程组,每个线程组中 包含若干个线程,每个线程处理一部分数。

S52:所述操作线程组处理接收到的数据,并将接收到的数据写 入对应缓冲区中;其中,所述将数据写入对应缓冲区中可以是采用队 列的形式进行写入。队列数量和线程是一一对应的。使用队列保障了 Hbase数据库压力过大写不进去或者其他情况下的数据安全性,是本 发明中的第二个队列。本发明采用双重队列设计,在流程开始以及流 程结束的时候都有队列的存在,以保证本发明在数据输入以及数据输 出中对数据进行缓存,以防止异常情况的数据丢失,最大限度保障了 数据安全,

S53:获取所述缓冲区中的数据以及对应的Htable对象组,具体 的,从入库处理线程池中获取线程,从Htable对象池中获得Htable对 象,按顺序将所述数据分配给对应的Htable对象组进行数据写入,通 过将表名与线程组、对象组、缓冲区一一对应,保证了数据快速写入, 极大的提高了输出带宽,最大限度利用服务器网络和硬盘IO带宽,提 高了写入速度。其中,所述Htable对象组进行数据写入可以采用并行 多队列的形式。在最后输出端使用并行多队列,匹配多线程以及多 Htable模式,保证了并行最大化输出数据,提升性能。

另一方面,采用上述的入库方法,本发明还提供了一种kafka的 Hbase数据库的入库系统,包括:

数据采集单元,用于收集kafka集群内所有主题的数据,并保存 在队列中;

配置单元,用于在所述配置文件中配置主题和所述数据中的分隔 符、过滤规则、入库规则的对应关系;

序列化过滤单元,用于根据所述对应关系,将所述数据序列化, 并对所述数据进行过滤;

对象构建单元,用于读取所述配置文件中的配置信息,创建与 Hbase数据库的连接,并将所述数据构建成put对象;

数据入库单元,用于将所述put对象入库Hbase数据库。

其中,如果个别主题不适用于通用入库方式,则该系统还包括: 独立处理单元,所述独立处理单元通过反射获取针对对应主题的独立 处理类来对所述主题的数据进行处理。

通过采用本发明所提供的针对分布式消息队列的Hbase数据库的 入库方法和系统,解决了针对kafka中不同topic需要进行单独处理的 问题,通过通用的方式对kafka集群内的不同类型数据进行消费并统 一存储入Hbase数据库,另外,采用多线程多Htable对象以及生产者消 费者模式极大的提高了数据写入的效率,提升了输出带宽,最大限度 利用了机器的网络和磁盘性能。通过双队列设计,最大限度保证了数 据的安全,避免了数据的丢失。

虽然结合附图描述了本发明的实施方式,但是本领域技术人员可 以在不脱离本发明的精神和范围的情况下做出各种修改和变型,这样 的修改和变型均落入由所附权利要求所限定的范围之内。

去获取专利,查看全文>

相似文献

  • 专利
  • 中文文献
  • 外文文献
获取专利

客服邮箱:kefu@zhangqiaokeyan.com

京公网安备:11010802029741号 ICP备案号:京ICP备15016152号-6 六维联合信息科技 (北京) 有限公司©版权所有
  • 客服微信

  • 服务号