本发明涉及kafka持久化技术领域,具体是一种kafka中新增topic的消费数据自动持久化方法。
背景技术:
kafka是一种支持分布式消息存储和消费系统,其构架如图1所示。kafkacluster(集群)包含多个服务器,服务器节点称为broker。kafka以topic(类别)作为基础逻辑单元对消息进行存储、消费等操作。为了线性提高吞吐率,kafka在物理上将topic分成多个partition(分区),每个partition通过创建一定数目的副本并分散存储在不同broker上以支持容错。kafka通过集成分布式协调服务zookeeper实现元数据存储以及集群管理等功能,从而支持集群的动态水平扩展。基于kafka的消息定发布订阅服务系统,注册通过的发布方创建topic,订阅方订阅该topic,发布方往该topic中发布消息。
现有的kafka数据持久化,是在项目配置文件中预先设定topic列表,并在数据库中创建好对应的存储表。producer向列表中的某一topic中发布消息后,consumer消费该消息的消费数据能够保存到对应的存储表中。
针对新增的topic,只能人工修改项目配置文件并创建存储表,才能使得consumer消费该topic中消息的消费数据得以保存到对应的存储表中,即不能灵活控制topic及其对应的数据库存储表。
技术实现要素:
针对现有kafka持久化技术不能自动对新增topic进行持久化的技术缺陷,本发明提供一种kafka中新增topic的消费数据自动持久化方法。
本发明保护一种kafka中新增topic的消费数据自动持久化方法,包括以下步骤:
步骤1,人工在kafka服务后台提供的管理页面上创建表及配置表中字段;
步骤2,kafka服务后台根据管理页面上填写的表名向kafka集群发布topic创建指令;
步骤3,kafka集群根据topic创建指令创建topic,数据库中自动生成对应存储表;
步骤4,kafka服务后台针对新增topic创建对应的consumer;
步骤5,当kafka服务后台接收归属于该新增topic的消息时,根据消息来源创建对应的producer,并将消息发布到其该新增topic中;
步骤6,该新增topic的consumer监听到消息后,消费消息,产生消费数据;
步骤7,kafka服务后台保存该消费数据,并将其存储到该新增topic对应的存储表中。
优选的,管理页面提供除提供创建topic、创建表及配置表中字段的接口外,还提供删除topic及其对应数据库存储表的接口。
本发明还保护一种基于上述kafka中新增topic的消费数据自动持久化方法的消息定发布订阅服务系统。
本发明在管理页面提供新增/删除topic对应的数据库存储表的功能,实现topic及其对应的数据库存储表的灵活控制;新增topic的consumer在消费该topic中的消息时,可以自动将消费数据保存至数据库对应的存储表中。
附图说明
图1为kafka中新增topic的消费数据自动持久化方法流程图。
具体实施方式
下面结合附图和具体实施方式对本发明作进一步详细的说明。本发明的实施例是为了示例和描述起见而给出的,而并不是无遗漏的或者将本发明限于所公开的形式。很多修改和变化对于本领域的普通技术人员而言是显而易见的。选择和描述实施例是为了更好说明本发明的原理和实际应用,并且使本领域的普通技术人员能够理解本发明从而设计适于特定用途的带有各种修改的各种实施例。
实施例1
一种kafka中新增topic的消费数据自动持久化方法,如图1所示,包括以下步骤:
步骤1,人工在kafka服务后台提供的管理页面上创建表及配置表中字段;
步骤2,kafka服务后台根据管理页面上填写的表名向kafka集群发布topic创建指令;
步骤3,kafka集群根据topic创建指令创建topic,数据库中自动生成对应存储表;
步骤4,kafka服务后台针对新增topic创建对应的consumer;
步骤5,当kafka服务后台接收归属于该新增topic的消息时,根据消息来源创建对应的producer,并将消息发布到其该新增topic中;
步骤6,该新增topic的consumer监听到消息后,消费消息,产生消费数据;
步骤7,kafka服务后台保存该消费数据,并将其存储到该新增topic对应的存储表中。
为了更加灵活地控制topic及其对应的数据库存储表,可以在管理页面提供新增和删除topic和对应数据库存储表的接口。
显然,所描述的实施例仅仅是本发明的一部分实施例,而不是全部的实施例。基于本发明中的实施例,本领域及相关领域的普通技术人员在没有作出创造性劳动的前提下所获得的所有其他实施例,都应属于本发明保护的范围。
1.一种kafka中新增topic的消费数据自动持久化方法,其特征在于,包括以下步骤:
步骤1,人工在kafka服务后台提供的管理页面上创建表及配置表中字段;
步骤2,kafka服务后台根据管理页面上填写的表名向kafka集群发布topic创建指令;
步骤3,kafka集群根据topic创建指令创建topic,数据库中自动生成对应存储表;
步骤4,kafka服务后台针对新增topic创建对应的consumer;
步骤5,当kafka服务后台接收归属于该新增topic的消息时,根据消息来源创建对应的producer,并将消息发布到其该新增topic中;
步骤6,该新增topic的consumer监听到消息后,消费消息,产生消费数据;
步骤7,kafka服务后台保存该消费数据,并将其存储到该新增topic对应的存储表中。
2.根据权利要求1所述的kafka中新增topic的消费数据自动持久化方法,其特征在于,管理页面提供删除topic及其对应数据库存储表的接口。
3.一种基于权利要求1或2所述的kafka中新增topic的消费数据自动持久化方法的消息定发布订阅服务系统。
技术总结