首页> 中国专利> 自适应的实时消息订阅与发布系统及方法

自适应的实时消息订阅与发布系统及方法

摘要

本发明涉及一种自适应的实时消息订阅与发布系统及方法,该系统包括客户服务接口、原语解析器、消息管理器、订阅管理器、事件监视器、条件评估器、消息接收线程、消息发布线程、消息传送线程。本发明克服现有技术的不足,通过原语解析转换为订阅对象与触发规则从而实现有条件的消息传输与发布;通过采用事件-条件-发布形式的发布规则,实现自适应消息通信机制,且本发明在消息发布过程中针对实时发布的每条消息进行过滤,考虑客户要求的发送条件、消息的优先级等对消息进行排序后发布,节约了网络资源并提高了消息发布及订阅的灵活性、可靠性。

著录项

  • 公开/公告号CN101159711A

    专利类型发明专利

  • 公开/公告日2008-04-09

    原文格式PDF

  • 申请/专利权人 航天东方红卫星有限公司;

    申请/专利号CN200710178135.7

  • 申请日2007-11-27

  • 分类号H04L12/58(20060101);H04L12/18(20060101);

  • 代理机构11009 中国航天科技专利中心;

  • 代理人安丽

  • 地址 100094 北京市海淀区北京5616信箱

  • 入库时间 2023-12-17 19:58:27

法律信息

  • 法律状态公告日

    法律状态信息

    法律状态

  • 2022-11-04

    未缴年费专利权终止 IPC(主分类):H04L12/58 专利号:ZL2007101781357 申请日:20071127 授权公告日:20100602

    专利权的终止

  • 2010-06-02

    授权

    授权

  • 2008-06-04

    实质审查的生效

    实质审查的生效

  • 2008-04-09

    公开

    公开

说明书

技术领域

本发明涉及一种消息订阅与发布的系统及方法,属计算机通信领域。

背景技术

分布式实时应用所要求的通信模式不同于传统的对象引用模式,通过网络连接的多个控制系统或者结点需要实时地进行消息的通信,从而了解应用环境中的状态变化并及时做出反应。传统的同步对象接口调用如CORBA或者RMI是点对点的通信方式,而分布式应用需要周期性地数据通信以及时了解环境的变化,因此基于订阅/发布模型的消息通信机制非常适合这类应用系统。但是,实时消息订阅/发布系统必须解决:消息的过滤、消息传输的实时性与可靠性的均衡,即实时消息通信在满足消息的实时性的同时,必须能够根据消息的类型自动调整传输参数,来适应数据流的多种需求以及不同需求之间的均衡。

消息的订阅与发布机制是经过多年研究、已经成熟的消息通讯方式。这种传统的消息订阅与发布机制可以大大提高消息通讯的可靠性,并能很好地适应大型分布式网络体系结构。文章《基于WS-N的发布订阅消息模式的研究》(卢传富,钱兴华。计算机与数字工程,第34卷)中详细描述了的一种传统的消息订阅与发布机制,这种机制首先要保证的是消息的必达性——即首先要保证一定要把消息送达,而不考虑消息本身的实时性——即消息从其产生开始,持续一定时间间隔后,是否还具有实际意义;这就造成了以下问题:在实时系统中,一旦某个消息在传输过程中因为种种原因造成了延迟,其所在队列中的所有后续消息都会在这个延迟的基础上继续发生延迟,直到此队列的注销才结束。这些发生了延迟的消息,往往在未被送达前就已经失去了实际意义。宝贵的网络资源和消息缓冲区就这样被大量无意义的数据所占据,进入了“在延迟的基础上继续延迟”的恶性循环。

中国专利申请200410077269.6、发明名称:一种基于即时通讯平台的消息订阅方法和系统,申请人:腾讯科技(深圳)有限公司,公开了一种即时通讯的消息订阅方法。该方法也只考虑了消息的必达性,而没有考虑消息本身的实时性。

中国专利申请200510116641.4、发明名称:一种呈现信息的通知方法和系统,申请人:华为技术有限公司,公开了一种观察体的订阅策略处理流程。这个流程中每个订阅者都具有一个订阅有效期,这个有效期是指订阅者在特定时间区间内接收特定种类消息的属性,如果消息产生时间已经超越了这个特定的时间区间,该订阅者就不会收到该类消息了。这种订阅的有效性只是保证在非有效期限内,订阅者不会收到特定的消息,没有考虑实际消息发布过程中消息是否过期等情况。

中国专利申请200610106654.8、发明名称:基于会话发起协议的订阅方法及其系统和装置,申请人:华为技术有限公司,公开了另一种实现订阅者有效期的方法。这种订阅的有效性只是保证在非有效期限内,订阅者不会收到特定的消息,没有考虑实际消息发布过程中消息是否过期等情况。

中国专利申请200610034191.9、发明名称:对通讯网络中的SIP消息进行过路的方法、设备及系统,申请人:华为技术有限公司,公开了一种使用消息过滤服务器的消息过滤方法。该方法中消息过滤器的位置集中位于消息过滤服务器上,消息过滤器的这种位置需要将所有的消息都在本地代理中发布后才进行过滤,没有在消息的发布过程中对消息进行过滤,占用了大量的网络资源,且该方法中公布的过滤条件为全局统一的,灵活性不高。

中国专利申请200510135949.3、发明名称:一种具有信息消息过滤功能的移动通信系统及其方法,申请人:乐金电子(中国)研究开发中心有限公司,公开了一种使用密码标识订阅者的消息过滤方法。该方法中通过发送密码标识实现特定条件的消息过滤,即将未经订阅者确认来源的消息进行过滤,对于确定来源的消息全部进行发布,没有考虑确定来源消息中消息的过期、实时性等问题,占用了大量的网络资源,灵活性不高。

通过检索未发现国外与本发明相似的公开出版物及专利。

发明内容

本发明的技术解决问题是:克服现有技术的不足,提供一种自适应的实时消息订阅与发布系统及方法,该系统及方法通过原语解析转换为订阅对象与触发规则从而实现有条件的消息传输与发布;通过采用事件-条件-发布形式的发布规则,实现自适应消息通信机制。

本发明的技术解决方案是:自适应的实时消息订阅与发布系统,包括客户服务接口、原语解析器、消息管理器、订阅管理器、事件监视器、条件评估器、消息接收线程、消息发布线程、消息传送线程,其中:

原语解析器,接收客户发送的消息注册原语、注销原语、订阅原语和发布原语,将接收的原语解析后,将注册消息、注销消息和发布消息发送给消息管理器,将订阅消息发送给订阅管理器;

消息管理器,将接收的注册消息进行注册;将接收的注销消息进行注销;根据接收的发布消息更新发布消息列表,触发事件监测器;

消息接收线程,接收外部的订阅或发布消息,并将接收的订阅或发布消息发送给订阅管理器;

订阅管理器,根据接收的原语解析器发送的订阅消息更新订阅消息列表,将订阅消息发送给消息发布线程;接收消息接收线程发送的订阅消息,根据接收的消息接收线程发送的订阅消息更新订阅消息列表,并触发事件监测器;接收消息接收线程发送的发布消息,并根据订阅消息列表查询本地订阅者,通过客户服务接口发送相应本地订阅者;

事件监测器,根据消息管理器或订阅管理器触发的事件,将与所述事件相关的条件发送给条件评估器;

条件评估器,根据接收的事件监测器发送的条件进行评估,若条件为真,触发消息发布线程;若条件为假,中止消息发布;

消息发布线程,接收通过条件评估器评估的发布消息,或订阅管理器发送的订阅消息;给消息设定优先级,并按优先级将消息放入消息队列;

消息传送线程,从消息队列中读取消息,并发送。

所述的消息接收线程与消息传送线程之间可以进行消息确认,消息接收线程接收外部代理中消息传送线程发送的确认信息,经该确认信息发送给消息发布线程,由消息发布线程从消息队列中将确认的消息删除,若指定时间内,消息接收线程没有接收到确认信息,本地消息发布线程将消息队列中的该消息重新发送,直至消息确认或失效。

自适应的实时消息订阅与发布方法,包括消息订阅和消息发布两部分,其中:

消息发布实现过程为:

(1)将客户发送的消息发布原语进行解析,解析后更新消息发布列表,触发消息发布事件,查询该消息是否存在订阅者,若存在订阅者,转步骤(2),否则,中止该消息发布;

(2)按照订阅者定义的条件对消息进行评估,将满足条件的消息设定优先级,并缓存到消息队列,从消息队列中依顺序发布给步骤(1)中订阅者所在的代理;将不满足条件的消息中止发布;

(3)订阅者所在代理接收消息,判断消息是否需要确认,若需要确认,转步骤(4),否则,通知所述代理中的消息订阅者;

(4)订阅者所在代理对消息进行确认,并将确认消息发送给消息发布代理,消息发布代理将确认的消息从消息队列中删除;若订阅者所在代理对消息在指定时间内没有进行确认,消息发布代理将该消息重新发布,直至消息确认或失效;

消息订阅实现过程为:

(5)将客户发送的消息订阅原语进行解析,解析后更新消息订阅列表,同时将该订阅消息传送给其他结点;

(6)所有结点触发消息订阅事件,查询是否存在有效消息,若存在有效消息,转步骤(2),否则,等待步骤(1)中的消息发布。

本发明与现有技术相比有益效果为:

(1)本发明通过原语规范消息订阅与发布,并通过原语解析转换为订阅对象与触发规则,与现有技术直接进行消息订阅与发布流程控制相比,本发明可以灵活的改进、扩展消息订阅与发布机制。

(2)本发明在消息发布过程中针对实时发布的每条消息进行过滤,考虑客户要求的发送条件、消息的优先级等对消息进行排序后发布,节约了网络资源并提高了消息发布及订阅的灵活性、可靠性。

(3)本发明在对消息进行优先级设定的时候考虑消息的截止期、消息的可信度、消息的关键程度进行优先级设定,与现有技术只考虑消息的截止期或关键程度相比,更全面的考虑了各个消息发布过程中的各个方面,增强系统的过载处理能力,从而保证关键的消息在任何负载情况下都能够及时送达。

(4)本发明将较大的数据包处理成具有统一大小的数据报文,可以防止一个较长的数据包的发送影响紧急的数据包发送,出现优先级反转的现象。

附图说明

图1为本发明物理环境示意图;

图2为本发明客户与代理连接示意图;

图3为本发明系统组成示意图;

图4为本发明消息发布流程图;

图5为本发明消息订阅流程图。

具体实施方式

实施例:

如图1所示,为本发明物理环境示意图。假设物理环境中有多个结点通过网络联结,每个结点可能包含若干个传感器与致动器,这些结点之间需要互相了解环境相关的信息,即结点A通过传感器获得的环境状态信息s可能是结点B进行控制动作a所需要的,而结点B所进行的控制动作a又需要反馈到结点A,从而协调两者之间的动作。A需要发布消息s,而订阅者是结点B,同样结点B需要发布消息a,而订阅者是结点A;另外系统可能还存在其它一些相关结点如C、D等需要及时了解来自A与B的所有信息。图2为上面所述的物理环境中的单个节点客户与本地代理之间的连接示意图。

所述的本地代理即本发明的实时消息订阅与发布系统组成框图如图3所示,本发明系统包括实时消息订阅与发布服务(以下简记为RTPS)连接点、客户服务接口、原语解析器、消息管理器、订阅管理器、事件监视器、条件评估器、消息接收线程、消息发布线程、消息传送线程;RTPS连接点允许客户建立与本地代理之间的连接,从而能够实现RTPS客户与本地代理之间的双向交互,实现消息的订阅与发布。RTPS连接点支持以事件方式通知RTPS客户新的消息与相关的原语执行结果。客户服务接口负责管理本地RTPS客户与代理之间的连接,区分不同客户订阅的消息并通过相应的连接点通知客户。原语解析器,接收客户发送的消息注册原语,将接收的注册原语解析后,向消息管理器注册该消息;或接收客户发送的消息注销原语,将接收的注销原语解析后,向消息管理器注销该消息;或接收客户发送的订阅原语,将订阅原语进行解析,解析后将该订阅消息发送给订阅管理器;订阅管理器,接收原语解析器发送的订阅消息,更新本地订阅信息表,生成相应的订阅消息,并将生成的消息发送给消息发布线程,由消息发布线程将该消息送入消息队列,由消息传送线程发送给消息源结点;消息源结点处消息接收线程接收该消息,并将该消息发送给订阅管理器,由订阅管理器更新订阅消息列表,并触发事件监测器,由事件监测器根据订阅消息中的事件与条件触发消息发布,将满足事件与条件的消息发送给条件评估器,由条件评估器将满足发布条件的消息发送给消息发布线程,由消息发布线程将该消息送入消息队列,由消息传送线程发送给订阅者;当原语解析器接收的是客户发送的消息发布原语,将消息发布原语进行解析,解析后将发布消息发送给消息管理器,消息管理器更新消息发布列表后触发事件监测器,事件监测器调用条件评估器进行条件评估,将满足条件的消息发送到消息发布线程,由消息发布线程设定优先级,并将消息按优先级缓存到消息队列中,由消息传送线程从消息队列中读取消息进行发送。

消息接收线程与消息传送线程之间可以进行消息确认,消息接收线程接收外部代理中消息传送线程发送的确认信息,经该确认信息发送给消息发布线程,由消息发布线程从消息队列中将确认的消息删除,若指定时间内,消息接收线程没有接收到确认信息,本地消息发布线程将消息队列中的该消息重新发送,直至消息确认或失效。

如图4、5所示,为本发明消息发布与订阅方法流程图,方法具体实现步骤如下:

消息发布流程实现过程如下:

(1)将客户发送的消息发布原语进行解析,解析后更新消息发布列表,触发消息发布事件,查询该消息是否存在订阅者,若存在订阅者,转步骤(2),否则,中止该消息发布;

(2)按照订阅者定义的条件对消息进行评估,将满足条件的消息设定优先级,并缓存到消息队列,从消息队列中依顺序发布给步骤(1)中订阅者所在的代理;将不满足条件的消息中止发布;

(3)订阅者所在代理接收消息,判断消息是否需要确认,若需要确认,转步骤(4),否则,通知所述代理中的消息订阅者;

(4)订阅者所在代理对消息进行确认,并将确认消息发送给消息发布代理,消息发布代理将确认的消息从消息队列中删除;若订阅者所在代理对消息在指定时间内没有进行确认,消息发布代理将该消息重新发布,直至消息确认或失效;

消息订阅实现过程为:

(5)将客户发送的消息订阅原语进行解析,解析后更新消息订阅列表,同时将该订阅消息传送给其他结点;

(6)所有结点触发消息订阅事件,查询是否存在有效消息,若存在有效消息,转步骤(2),否则,等待步骤(1)中的消息发布

上面所述的本发明系统与方法中设定优先级考虑消息的截止期、消息的可信度、消息的关键程度采用优先级表方法进行设定,设定方法可参照软件学报,2004年第15卷第3期,名称“基于优先级表的实时调度算法及其实现”。

下面具体介绍本发明实现实时消息订阅与发布服务(简记为RTPS)的基础原语。

在实时消息订阅与发布系统中,每个客户程序都可以成为消息的生产者,也可以成为消息的消费者。对于消息的生产者来说,必须使用消息注册原语向系统注册消息,该原语语法如下:

Register msg_name(Type,Title,Source,Deadline,Confidence,[attribute1,attribute2,...]);

同一个消息可以由多个发布者注册,每次注册导致消息的参考计数适当增加。所有消息都具有下面的缺省属性,包括消息名称(msg_name)、消息类型(Type)、消息标题(Title)、消息的来源(Source)、消息的截止期(Deadline)、消息的可信度(Confidence)。而消息的其它属性都是用户定义的,包括消息内容及其各个子内容。

同样,当消息的生产者不再发布消息时,应该向系统注销消息,以免影响订阅者有效地获取消息。消息注销的原语语法如下:

Unregister msg_name;

每当一个消息被注销,则消息的参考计数适当减少,当参考计数为0时,则消息被注销并从消息列表中删除,即该消息已经没有来源。

实时消息订阅与发布系统采用命名方式发布数据,每个订阅对象(Tag)具有一个全局唯一的名称,标识了消息发布者与订阅者之间的一类消息。实时消息的特点是内容随着时间过去会失效,因此要求周期性地刷新,例如工厂过程控制中的反应炉温度、股市行情等等。为了实现自适应的消息订阅,首先定义两个原语:Subscribe与Publish。

订阅原语的语法如下:

Subscribe tag_name=msg_name(attribute1,attribute2,...)

On{event}

If{condition}

With{criticalness,[multicast address],[port]}

[ACK|NOACK]

[DEACTIVATE];

订阅原语表明了一个订阅对象可以订阅消息对象的部分属性,而不是全部属性,例如订阅者只对消息的标题感兴趣,则只需在属性列表中指定标题属性。订阅原语的On子句定义了消息发布的触发事件,这些事件包括MSG_REGISTER(消息被注册时)、MSG_UNREGISTER(消息被注销时)、MSG_UPDATE(消息被更新时)、MSG_SUBSCRIBED(当消息被订阅时)、MSG_LOST_VALIDITY(当消息失去有效性时)等,并且这些事件的参数可以是任意的消息。订阅原语的If子句定义了消息的过滤条件,采用逻辑表达式的形式,例如订阅者指定只有当消息的可信度大于0.8是才对这个消息感兴趣,则表达式为msg_name.Confidence>0.8,这样保证用户只是得到自己想要的消息而不是所有的消息。消息订阅原语中的With子句定义了一些与消息发布相关的属性,包括消息的重要程度criticalness表达了订阅者对这个消息的期望程度,可选参数多点播送地址与端口适用于一组订阅者,能够提高一对多与多对多的消息传输效率。ACK与NOACK选项用于设置消息发布是是否需要接收方确认,从而提供消息传输的可靠性。当一个客户对所订阅的消息不再感兴趣时,可以通过DEACTIVATE选项取消订阅,语法如下:

Subscribe tag_name DEACTIVATE;

消息发布原语用于消息发布者发布最新的消息版本,其语法如下:

Publish msg;

消息发布原语促使系统更新消息内容,产生新的消息版本,并根据定义的订阅对象进行消息发布。一个消息可以具有多个发布者,但是发布的消息可能具有不同的可信度与截止期,系统总是在使用有效截止期内可信度最高的消息。

本发明消息订阅通过解析把原语转换为订阅对象与触发规则从而实现有条件的消息传输与发布。消息的订阅与发布原语转换为下面所述的订阅对象与规则关键代码如下:

class stag_object{//订阅对象的定义

private:

//来自于Subscribe原语的对象属性定义

...attribute1;

...attribute2;

int criticality;

Char*multicastadd;

int port;

//订阅管理相关的属性

int ref_counter;//订阅的客户数

LinkedList<Subscriber>subscriberlist;//订阅客户列表

public:

publish(int parameter=ACK|NOACK);//用于消息发布的方法

subscribe(optional int parameter=DEACTIVATE);//用于消息订阅的方法

};

Define rule stag_rule//订阅规则的定义

On...

If...

Then stag_object.publish(...);

本发明未公开技术属本领域技术人员公知常识。

去获取专利,查看全文>

相似文献

  • 专利
  • 中文文献
  • 外文文献
获取专利

客服邮箱:kefu@zhangqiaokeyan.com

京公网安备:11010802029741号 ICP备案号:京ICP备15016152号-6 六维联合信息科技 (北京) 有限公司©版权所有
  • 客服微信

  • 服务号