Kafka低阶消费程序消息积压监控方法及相关设备与流程

    专利2022-07-08  101


    本说明书一个或多个实施例涉及kafka消费程序领域,尤其涉及kafka低阶消费程序中kafka低阶消费程序消息积压监控。



    背景技术:

    低阶的kafka消费程序由程序本身维护消费记录,通过获取消费信息记录和消息最大记录可以得知当前的消息积压量,通过最大消息偏移量减去消费信息记录的偏移量,得到的差值就是消费程序当前的消费积压量,用于监控程序运行状态,如何合理记录消费信息成为程序设计和监控消息积压的考虑因素。

    目前有些kafka低阶消费程序每处理一条消息就进行一次消费记录,消费记录过于频繁,当消息量每秒上千或上万时,对程序和记录设备都会造成非常大的压力。



    技术实现要素:

    有鉴于此,本说明书一个或多个实施例的目的在于提出一种kafka低阶消费程序消息积压监控方法及相关设备,以解决消费记录过于频繁,给程序和记录设备压力过大的问题。

    本说明书一个或多个实施例提供了一种kafka低阶消费程序消息积压监控方法,该方法包括如下:

    由kafka低阶消费程序分批获取kafka消息,并通过kafka公共接口从kafka集群获取最大消息偏移量;

    对于所获取的每一批kafka消息,由kafka低阶消费程序执行下列操作:在该批kafka消息被消费完毕后,记录该批kafka消息的最大偏移量;

    将所获取的最大消息偏移量与所记录的最大偏移量之差作为所述kafka低阶消费程序的实时消息积压量;

    当所述实时消息积压量超过预设的积压量阈值时,发出告警信息。

    本说明书一个或多个实施例提供了一种kafka低阶消费程序消息积压监控方法还包括记录处理完毕的消息的哈希值以进行判重:

    当所获取的每一批kafka消息中的消息被处理时,通过哈希算法计算得到该消息的哈希值,并将该哈希值作为该消息的处理标识予以记录;

    当要处理一个目标kafka消息时,通过哈希算法计算得到该目标kafka消息的目标哈希值;

    将该目标哈希值与各个已记录的所述处理标识进行比较,若所述比较的结果指示所述目标哈希值与至少一个已记录的所述处理标识一致,则确定不处理所述目标kafka消息。

    基于同一发明构思,本说明书一个或多个实施例还提供了一种kafka低阶消费程序消息积压监控装置,根据其实现的功能,可以分为以下模块:

    获取模块,分批获取kafka消息,并记录每一批消息的最大偏移量;

    监控模块,对于所述获取模块获取的每一批kafka消息,执行下列操作:在该批kafka消息被消费完毕后,记录该批kafka消息的最大偏移量;将所述获取模块获取的最大消息偏移量与所记录的最大偏移量之差作为所述kafka低阶消费程序的实时消息积压量;当所述实时消息积压量超过预设的积压量阈值时,发出告警信息;

    标识模块,当所述获取模块获取的每一批kafka消息中的消息被处理时,通过哈希算法计算得到该消息的哈希值,并将该哈希值作为该消息的处理标识予以记录;

    判重模块,当要处理一个目标kafka消息时,通过哈希算法计算得到该目标kafka消息的目标哈希值,并将该目标哈希值与所述标识模块已记录的各个所述处理标识进行比较,若所述比较的结果指示所述目标哈希值与至少一个已记录的所述处理标识一致,则确定不处理所述目标kafka消息。

    基于同一发明构思,本说明书一个或多个实施例还提出一种电子设备,包括存储器、处理器及存储在存储器上并可在处理器上运行的计算机程序,其特征在于,所述处理器执行所述程序时实现上述的kafka低阶消费程序消息积压监控方法。

    基于同一发明构思,本说明书一个或多个实施例还提供了一种存储介质,为一种非暂态计算机可读存储介质,所述非暂态计算机可读存储介质存储计算机指令,所述计算机指令用于使所述计算机执行上述kafka低阶消费程序消息积压监控方法。

    从上面所述可以看出,本说明书一个或多个实施例提供的一种kafka低阶消费程序消息积压监控方法及相关设备,在每一批次几十万条消息中只需进行一次消费记录,大大减轻了kafka消费程序和记录源的压力。

    附图说明

    为了更清楚地说明本说明书一个或多个实施例或现有技术中的技术方案,下面将对实施例或现有技术描述中所需要使用的附图作简单地介绍,显而易见地,下面描述中的附图仅仅是本说明书一个或多个实施例,对于本领域普通技术人员来讲,在不付出创造性劳动的前提下,还可以根据这些附图获得其他的附图。

    图1为本说明书一个或多个实施例实现kafka低阶消费程序消息积压监控方法的流程图;

    图2为本说明书一个或多个实施例中kafka低阶消费程序处理消息时判重的流程图;

    图3为本说明书一个或多个实施例中kafka低阶消费程序消息积压监控装置的示意图;

    图4为本说明书一个或多个实施例中实现kafka低阶消费程序消息积压监控的电子设备示意图。

    具体实施方式

    为使本公开的目的、技术方案和优点更加清楚明白,以下结合具体实施例,并参照附图,对本公开进一步详细说明。

    需要说明的是,除非另外定义,本说明书一个或多个实施例使用的技术术语或者科学术语应当为本公开所属领域内具有一般技能的人士所理解的通常意义。“包括”或者“包含”等类似的词语意指出现该词前面的元件或者物件涵盖出现在该词后面列举的元件或者物件及其等同,而不排除其他元件或者物件。

    如背景技术所述,在现有的kafka低阶消费程序中,每处理一条消息就会进行一次消费记录,在消息量每秒达到成千或上万时,对kafka消费程序本身和记录消费记录的消息库都会造成很大压力。

    为此,本说明书一个或多个实施例提出了一种kafka低阶消费程序监控方法,通过设置时间间隔分批获取kafka消息;当一批kafka消息消费完毕后,记录每一批次消息中最后一个消息的偏移量即为每一批次消息的最大偏移量;使用通过kafka公共接口获取的消息的最新偏移量减去每一批次消息的最大偏移量,得到的差值即为每一批次消息消费完毕后的积压量;同时由技术人员设置积压量阈值,当积压量超出积压量阈值时,由kafka消费程序主动向技术人员发送告警邮件。同时,由于需要处理的消息在每一批次的消息中的数量是不确定的,需要根据业务需求预设筛选条件进行筛选,并记录每一个处理完毕的消息的哈希值;消息处理过程中,计算待处理消息的哈希值与处理完毕的消息的哈希值对比,两者不一致即证明待处理消息还没有被处理,可以继续进行处理,否则,就跳过该待处理消息进行下一个待处理消息的验证和处理。这种方法大大降低程序和记录源的压力,也满足了对kafka消费程序的监控需求。此外,对于正在处理消息的哈希值与已处理过的消息的哈希值做比对,保证了每条需要处理的消息有且仅有一次被处理。

    参考图1,本说明书一个或多个实施例表达的技术方案的步骤如下所述。

    步骤s101、由kafka程序分批获取kafka消息,并通过kafka公共接口从kafka集群获取最大消息偏移量。

    本步骤中,分批获取kafka消息的时间间隔可设置为1-10秒,使每一批次的消息数量控制在几万至几十万条;在kafka消费程序中,消息的偏移量随着消息数量的增加也会累加,每一条消息都对应一个偏移量。获取的最大消息偏移量就是最新的消息偏移量。

    步骤s102、对于所获取的每一批kafka消息,由kafka低阶消费程序执行下列操作:在该批kafka消息被消费完毕后,记录该批kafka消息的最大偏移量。

    本步骤中,记录的最大偏移量就是该批消息中最后一个消息的偏移量。

    步骤s103、将所获取的最大消息偏移量与所记录的最大偏移量之差作为所述kafka低阶消费程序的实时消息积压量。

    步骤s104、当所述实时消息积压量超过预设的积压量阈值时,发出告警信息。

    本步骤中,由技术人员根据实际情况设置积压量阈值,当实时消息积压量超过阈值,就会向技术人员通过发送告警邮件的方式提醒技术人员进行处理,一般的处理方式就是为kafka低阶消费程序划分更多的计算资源。

    本说明书一个或多个实施例提供的kafka低阶消费程序监控方法中,新一批次的kafka消息的消费记录会覆盖上一批次的消费记录。

    本说明书一个或或多个实施例还提出了一种对待处理消息进行判重的方法,参考图2,该方法的步骤如下所述。

    步骤s201、当所获取的每一批kafka消息中的消息被处理时,通过哈希算法计算得到该消息的哈希值,并将该哈希值作为该消息的处理标识予以记录。

    本步骤中,根据业务需求由技术人员预设筛选条件从每一批次的kafka消息中筛选得到待处理消息,同时计算被消费消息的哈希值并存放在远程字典服务(redis)中的set类中。

    步骤s202、当要处理一个目标kafka消息时,通过哈希算法计算得到该目标kafka消息的目标哈希值。

    步骤s203、将该目标哈希值与各个已记录的所述处理标识进行比较,若所述比较的结果指示所述目标哈希值与至少一个已记录的所述处理标识一致,则确定不消费所述目标kafka消息。

    本方法的使用场景如下,当一个kafka低阶消费程序在消费某一批数据时突然停止,导致处理消息的过程被打断。重新启动该程序时,会重新筛选这一批次数据得到待处理消息;由于已经记录下了被处理消息的哈希值,通过将待处理消息的哈希值与记录在set类中的哈希值对比,从而避免对已经处理过的消息重复处理。

    需要说明的是,本说明书一个或多个实施例的方法可以由单个设备执行,例如一台计算机或服务器等。本实施例的方法也可以应用于分布式场景下,由多台设备相互配合来完成。在这种分布式场景的情况下,这多台设备中的一台设备可以只执行本说明书一个或多个实施例的方法中的某一个或多个步骤,这多台设备相互之间会进行交互以完成所述的方法。

    上述对本说明书特定实施例进行了描述。其它实施例在所附权利要求书的范围内。在一些情况下,在权利要求书中记载的动作或步骤可以按照不同于实施例中的顺序来执行并且仍然可以实现期望的结果。另外,在附图中描绘的过程不一定要求示出的特定顺序或者连续顺序才能实现期望的结果。在某些实施方式中,多任务处理和并行处理也是可以的或者可能是有利的。

    基于同一发明构思,与上述任意实施例方法相对应的,本说明书一个或多个实施例还提供了一种kafka低阶消费程序消息积压监控装置,参考图3,可根据其功能分为以下模块。

    获取模块301,分批获取kafka消息,并记录每一批消息的最大偏移量。

    监控模块302,对于所述获取模块获取的每一批kafka消息,执行下列操作:在该批kafka消息被消费完毕后,记录该批kafka消息的最大偏移量;将所述获取模块获取的最大消息偏移量与所记录的最大偏移量之差作为所述kafka低阶消费程序的实时消息积压量;当所述实时消息积压量超过预设的积压量阈值时,发出告警信息。

    标识模块303,当所述获取模块获取的每一批kafka消息中的消息被处理时,通过哈希算法计算得到该消息的哈希值,并将该哈希值作为该消息的处理标识予以记录。

    判重模块304,当要处理一个目标kafka消息时,通过哈希算法计算得到该目标kafka消息的目标哈希值,并将该目标哈希值与所述标识模块已记录的各个所述处理标识进行比较,若所述比较的结果指示所述目标哈希值与至少一个已记录的所述处理标识一致,则确定不处理所述目标kafka消息。

    为了描述的方便,描述以上装置时以功能分为各种模块分别描述。当然,在实施本说明书一个或多个实施例时可以把各模块的功能在同一个或多个软件和/或硬件中实现。

    上述实施例的装置用于实现前述实施例中相应的kafka低阶消费程序消息积压监控方法,并且具有相应的方法实施例的有益效果,在此不再赘述。

    基于同一发明构思,与上述任意实施例方法相对应的,本说明书一个或多个实施例还提供了一种电子设备,包括存储器、处理器及存储在存储器上并可在处理器上运行的计算机程序,所述处理器执行所述程序时能够实现kafka低阶消费程序消息积压监控方法。

    图4示出了本实施例所提供的一种更为具体的电子设备硬件结构示意图,该设备可以包括:处理器1010、存储器1020、输入/输出接口1030、通信接口1040和总线1050。其中处理器1010、存储器1020、输入/输出接口1030和通信接口1040通过总线1050实现彼此之间在设备内部的通信连接。

    处理器1010可以采用通用的cpu(centralprocessingunit,中央处理器)、微处理器、应用专用集成电路(applicationspecificintegratedcircuit,asic)、或者一个或多个集成电路等方式实现,用于执行相关程序,以实现本说明书实施例所提供的技术方案。

    存储器1020可以采用rom(readonlymemory,只读存储器)、ram(randomaccessmemory,随机存取存储器)、静态存储设备,动态存储设备等形式实现。存储器1020可以存储操作系统和其他应用程序,在通过软件或者固件来实现本说明书实施例所提供的技术方案时,相关的程序代码保存在存储器1020中,并由处理器1010来调用执行。

    输入/输出接口1030用于连接输入/输出模块,以实现信息输入及输出。输入输出/模块可以作为组件配置在设备中(图中未示出),也可以外接于设备以提供相应功能。其中输入设备可以包括键盘、鼠标、触摸屏、麦克风、各类传感器等,输出设备可以包括显示器、扬声器、振动器、指示灯等。

    通信接口1040用于连接通信模块(图中未示出),以实现本设备与其他设备的通信交互。其中通信模块可以通过有线方式(例如usb、网线等)实现通信,也可以通过无线方式(例如移动网络、wifi、蓝牙等)实现通信。

    总线1050包括一通路,在设备的各个组件(例如处理器1010、存储器1020、输入/输出接口1030和通信接口1040)之间传输信息。

    需要说明的是,尽管上述设备仅示出了处理器1010、存储器1020、输入/输出接口1030、通信接口1040以及总线1050,但是在具体实施过程中,该设备还可以包括实现正常运行所必需的其他组件。此外,本领域的技术人员可以理解的是,上述设备中也可以仅包含实现本说明书实施例方案所必需的组件,而不必包含图中所示的全部组件。

    上述实施例的电子设备用于实现前述任一实施例中相应的kafka低阶消费程序消息积压监控方法,并且具有相应的方法实施例的有益效果,在此不再赘述。

    基于同一发明构思,与上述任意实施例方法相对应的,本说明书一个或多个实施例还提供了一种存储介质,为一种非暂态计算机可读存储介质,所述非暂态计算机可读存储介质存储计算机指令,所述计算机指令用于使所述计算机执行上述kafka低阶消费程序消息积压监控方法。

    本实施例的计算机可读介质包括永久性和非永久性、可移动和非可移动媒体可以由任何方法或技术来实现信息存储。信息可以是计算机可读指令、消息结构、程序的模块或其他消息。计算机的存储介质的例子包括,但不限于相变内存(pram)、静态随机存取存储器(sram)、动态随机存取存储器(dram)、其他类型的随机存取存储器(ram)、只读存储器(rom)、电可擦除可编程只读存储器(eeprom)、快闪记忆体或其他内存技术、只读光盘只读存储器(cd-rom)、数字多功能光盘(dvd)或其他光学存储、磁盒式磁带,磁带磁磁盘存储或其他磁性存储设备或任何其他非传输介质,可用于存储可以被计算设备访问的信息。

    上述实施例的存储介质存储的计算机指令用于使所述计算机执行如上任一实施例所述的kafka低阶消费程序消息积压监控方法,并且具有相应的方法实施例的有益效果,在此不再赘述。

    所属领域的普通技术人员应当理解:以上任何实施例的讨论仅为示例性的,并非旨在暗示本公开的范围(包括权利要求)被限于这些例子;在本公开的思路下,以上实施例或者不同实施例中的技术特征之间也可以进行组合,步骤可以以任意顺序实现,并存在如上所述的本说明书一个或多个实施例的不同方面的许多其它变化,为了简明它们没有在细节中提供。

    另外,为简化说明和讨论,并且为了不会使本说明书一个或多个实施例难以理解,在所提供的附图中可以示出或可以不示出与集成电路(ic)芯片和其它部件的公知的电源/接地连接。此外,可以以框图的形式示出装置,以便避免使本说明书一个或多个实施例难以理解,并且这也考虑了以下事实,即关于这些框图装置的实施方式的细节是高度取决于将要实施本说明书一个或多个实施例的平台的(即,这些细节应当完全处于本领域技术人员的理解范围内)。在阐述了具体细节(例如,电路)以描述本公开的示例性实施例的情况下,对本领域技术人员来说显而易见的是,可以在没有这些具体细节的情况下或者这些具体细节有变化的情况下实施本说明书一个或多个实施例。因此,这些描述应被认为是说明性的而不是限制性的。

    尽管已经结合了本公开的具体实施例对本公开进行了描述,但是根据前面的描述,这些实施例的很多替换、修改和变型对本领域普通技术人员来说将是显而易见的。例如,其它存储器架构(例如,动态ram(dram))可以使用所讨论的实施例。

    本说明书一个或多个实施例旨在涵盖落入所附权利要求的宽泛范围之内的所有这样的替换、修改和变型。因此,凡在本说明书一个或多个实施例的精神和原则之内,所做的任何省略、修改、等同替换、改进等,均应包含在本公开的保护范围之内。


    技术特征:

    1.一种kafka低阶消费程序消息积压监控方法,包括:

    由kafka低阶消费程序分批获取kafka消息,并通过kafka公共接口从kafka集群获取最大消息偏移量;

    对于所获取的每一批kafka消息,由kafka低阶消费程序执行下列操作:在该批kafka消息被消费完毕后,记录该批kafka消息的最大偏移量;

    将所获取的最大消息偏移量与所记录的最大偏移量之差作为所述kafka低阶消费程序的实时消息积压量;

    当所述实时消息积压量超过预设的积压量阈值时,发出告警信息。

    2.根据权利要求1所述的方法,其中,

    当所获取的每一批kafka消息中的消息被处理时,通过哈希算法计算得到该消息的哈希值,并将该哈希值作为该消息的处理标识予以记录;

    当要处理一个目标kafka消息时,通过哈希算法计算得到该目标kafka消息的目标哈希值;

    将该目标哈希值与各个已记录的所述处理标识进行比较,若所述比较的结果指示所述目标哈希值与至少一个已记录的所述处理标识一致,则确定不处理所述目标kafka消息。

    3.根据权利要求1所述的方法,其中,分批获取kafka消息包括:以1至10秒的时间间隔分批获取所述kafka消息。

    4.根据权利要求1至3中任一项所述的方法,其中,该批kafka消息的最大偏移量为该批kafka消息中最后一个消息的偏移量。

    5.根据权利要求1至3中任一项所述的方法,其中,所述每一批kafka消息的最大偏移量被迭代记录。

    6.一种kafka低阶消费程序消息积压监控装置,包括:

    获取模块,被配置成分批获取kafka消息,并通过kafka公共接口从kafka集群获取最大消息偏移量;

    监控模块,被配置成对于所述获取模块获取的每一批kafka消息,执行下列操作:在该批kafka消息被消费完毕后,记录该批kafka消息的最大偏移量;将所述获取模块获取的最大消息偏移量与所记录的最大偏移量之差作为所述kafka低阶消费程序的实时消息积压量;当所述实时消息积压量超过预设的积压量阈值时,发出告警信息。

    7.根据权利要求6所述的装置,还包括:

    标识模块,被配置成:当所述获取模块获取的每一批kafka消息中的消息被处理时,通过哈希算法计算得到该消息的哈希值,并将该哈希值作为该消息的处理标识予以记录;

    判重模块,被配置成:当要处理一个目标kafka消息时,通过哈希算法计算得到该目标kafka消息的目标哈希值,并将该目标哈希值与所述标识模块已记录的各个所述处理标识进行比较,若所述比较的结果指示所述目标哈希值与至少一个已记录的所述处理标识一致,则确定不处理所述目标kafka消息。

    8.根据权利要求6或7所述的装置,其中,该批kafka消息的最大偏移量为该批kafka消息中最后一个消息的偏移量。

    9.一种电子设备,包括存储器、处理器及存储在所述存储器上并可由所述处理器执行的计算机程序,其中,所述处理器执行所述计算机程序时实现如权利要求1至5中任意一项所述的方法。

    10.一种非暂态计算机可读存储介质,所述存储介质上存储有计算机指令,所述计算机指令在被计算机执行时,使所述计算机实现如权利要求1至5中任一所述的方法。

    技术总结
    本说明书实施例提供了Kafka低阶消费程序消息积压监控方法及相关设备,用于在Kafka低阶消费程序中对消息积压的监控告警和处理消息过程中的判重。首先,在Kafka消费程序中根据预设的时间间隔分批获取消息,一个批次的消息消费完毕后记录这一批次消息的最大偏移量,并通过公共接口获取最新偏移量,由最新偏移量减去最大偏移量,得到的即为该批次消息消费完毕后消息的积压量;设置积压量阈值,当计算得出的积压量超出阈值时,消费程序向技术人员发出告警邮件。消息处理过程中,计算并保存处理完毕消息的哈希值;处理消息时,首先计算待处理消息的哈希值并与保存的哈希值对比,两者不一致时才会继续处理。

    技术研发人员:刘佳;安靖;胡潇涵;王毅;宋洋;崔贝贝
    受保护的技术使用者:中国人寿保险股份有限公司
    技术研发日:2020.11.16
    技术公布日:2021.03.12

    转载请注明原文地址:https://wp.8miu.com/read-17426.html

    最新回复(0)