消息总线扩展之集成Thrift-RPC

本文主要探讨了消息总线支持Thrift RPC的实现过程。鉴于RabbitMQ官方的Java
Client提供了基于RabbitMQ的JSON-RPC,消息总线也顺道提供了JSON-RPC的API。然后也尝试了为消息总线增加对Thrift-RPC的扩展支持,希望此举能让消息总线同时为SOA提供基础设施。

Thrift简介

Thrift是一个跨语言的服务部署框架,最初由Facebook于2007年开发,2008年进入Apache开源项目。Thrift通过一个中间语言(IDL,
接口定义语言)来定义RPC的接口和数据类型,然后通过一个编译器生成不同语言的代码(目前支持C++,Java, Python,PHP, Ruby, Erlang,
Perl, Haskell, C#, Cocoa, Smalltalk和OCaml),并由生成的代码负责RPC协议层和传输层的实现。

初衷

做这件事情的初衷是RabbitMQ可以用于模拟request/response这样的通信模型,而这个通信模型就是通常C/S以及B/S架构的通信模型。并且因为RPC的流行,其官方java
client已经提供了对基于JSON(文本协议)的RPC的实现。而Thrift本身是个RPC框架,提供跨语言、多协议、多传输通信机制的实现。如果能将两者衔接起来,消息总线对RPC的支持无疑更加完善。

思路

Thrift的实现是基于类似TCP/IP的多层协议栈模型。它的特点是对等通信,逻辑分离,分层解耦。如下图:

在协议层,目前Thrift支持众多的协议,这些协议大致分为两类:

  1. 二进制协议
  2. 文本协议(以XML、JSON为代表)

Thrift同样支持众多的通信传输机制:Socket,IOStream,HttpClient,file,memory-input,Nonblocking等,由于实现类太多,此处就不上类图了。

由于其实现是分层解耦并面向接口的,因此任意的通信协议都可以与任意的通信传输机制结合起来形成一种特定的RPC实现!也正是得益于这种设计,使得我们可以实现一种新的通信传输机制而不用对整个库大动干戈。

因此,如果我们需要实现基于RabbitMQ的通信机制,是否意味着我们只需要在C/S两端简单得实现通信传输层就可以了呢。准确地说,RPC的客户端实现(服务消费者)是这样的,但RPC的服务端(服务提供者)的实现却不仅仅只是实现一个通信传输层就可以的。因为服务器作为一个应答者(Responser),其本身必须被构建为一个Process
Loop(简单地理解就是个while(true))。因此在服务端的实现,还需要维护基于MQ Message的处理循环。大致示意图如下:

现状

思路有了,但现状是我们需要接入的不仅仅是RabbitMQ而是消息总线(消息总线是基于RabbitMQ封装而成)。所以我们在验证可行性的时候还是要分两步走:第一用RabbitMQ的消息机制,是否能够实现Thrift的C/S两端通信;第二是否能够很好得适配消息总线的Proxy的机制并提供合适的API。

实现

客户端实现

RPC的客户端相对来说比较简单,因此我们先从客户端下手。客户端这边主要任务就是将方法的调用信息根据在协议层转换后得到的二进制请求数据发送到目标队列,然后阻塞等待获取响应后,将二进制响应数据再根据客户端所用的协议进行还原,这是两个相反的过程,一个是自上向下,一个是自底向上,如果你理解TCP/IP协议栈,那么这里应该很容易理解。

得益于RabbitMQ官方提供的Java
Client,它提供了一个RPClient类,可以直接以阻塞的方式发起一个request请求,并获取数据。这是我们在客户端通过RabbitMQ的java
client收发数据的主要接口。现在我们需要适配Thrift,只要重新实现Thrift的通信层的客户端接口:TTransport即可。
Thrift默认提供了很多客户端通信技术的实现,而这些实现类都必须实现TTransport这一基接口,该接口的类图如下:

该接口定义了输入、输出、打开、关闭等接口方法。而衔接输入输出的关键方法是flush,其大致的实现如下:

public void flush() throws TTransportException {
        byte[] data = this.reqMsgStream.toByteArray();
        this.reqMsgStream.reset();

        byte[] responseData = new byte[0];
        try {
            responseData = this.client.primitiveRequest(secret, target, data, token, timeout);
        } catch (Exception e) {
            ExceptionHelper.logException(logger, e, "[flush]");
        }
        this.respMsgStream = new ByteArrayInputStream(responseData);
    }

可以看到它先从输出流中获取请求数据,然后调用总线的消息发送请求,并阻塞等待响应数据,最后基于响应数据构建输入流。

就RPC的客户端而言,我们只需要实现基于消息总线的通信传输机制即可。下面我们来看一下客户端以消息总线作为通信机制后的使用代码:

public void testThriftRpc() throws Exception {
        TTransport transport = new TAMQPClientTransport(this.client,
                                                        "kliwhiduhaiucvarkjajksdbfkjabw",
                                                        "emapDemoRpcResponse",
                                                        "klasehnfkljashdnflhkjahwlekdjf",
                                                        10000);
        transport.open();
        TProtocol protocol = new TJSONProtocol(transport);
        CalcService.Client client = new CalcService.Client(protocol);
        int result = client.calcSum();
        logger.info(result);
        transport.close();
    }

大致来看层次还是比较清晰的,构建的流程是从低层向上层的构建方式:

首先构建一个客户端的传输对象,这里我们注入消息总线的客户端,用于在内部处理客户端跟服务器端的通信。

然后open客户端的传输对象,如果客户端确实需要在通信之前或之后做些事情,可以override
open/close这一方法对,因为确实有些通信机制是需要打开或回收一些资源的,不过如果你不需要在open/close方法里做额外操作,也请在编码时写上这两句,就当作它是模式代码吧。

最后构建一个协议处理器,接着将协议处理器传递给IDL生成的客户端代码。在使用完之后可以关闭传输对象。在这里作为额外的一个步骤:如果你不再需要使用消息总线的客户端,请归还该对象。

服务器端实现

通过之前对实现机制探讨,我们已经知道对于服务端的实现,我们除了需要给出以消息总线作为通信传输机制的实现,还需要一个基于消息总线构建的处理循环。如客户端一样,
RabbitMQ 官方java
client已经提供了一个RpcServer的基础实现,我们干脆以此作为RPC服务端的实现原型(没必要重复造轮子),只沿用thrift的协议处理相关的组件。也就是说我们需要构建一个listen某个队列的consumer,并hlod住它,然后再嵌入Thrift处理器以及协议处理的代码。

这次我们先来看看服务端的使用代码,然后再去看它的实现方式:

MessagebusSinglePool singlePool = new MessagebusSinglePool(host, port);;
        Messagebus client = singlePool.getResource();

        //server code
        WrappedRpcServer rpcServer = null;
        try {
            TProcessor processor = new CalcService.Processor(new CalcServiceImpl());
            TProtocolFactory inProtocolFactory = new TJSONProtocol.Factory();
            TProtocolFactory outProtocolFactory = new TJSONProtocol.Factory();
            rpcServer = client.buildRpcServer("mshdfjbqwejhfgasdfbjqkygaksdfa",
                new ThriftMessageHandler(processor, inProtocolFactory, outProtocolFactory));

            rpcServer.mainLoop();
        } finally {
            rpcServer.close();
            singlePool.returnResource(client);
            singlePool.destroy();
        }

它直接替换了Thrift提供的Server实现(跟客户端通信一样thrift同样提供了多个Server的实现),而是采用了基于RabbitMQ构建的一个RPCServer,然后启动一个event
loop将其block住,等待客户端的请求,并进行处理。

消息总线提供了一个API:buildRpcServer,它构建出一个封装后的WrappedRpcServer。该WrappedRpcServer封装了上文提到的RabbitMQ
java
client自带的RpcServer。为什么要封装?主要的原因还是信息隐藏。其实,如果只接入RabbitMQ,不封装是没有问题的。但现在的目的是接入消息总线,而消息总线在RabbitMQjava
client之上又封装了一层,屏蔽了一些不必要的设置,而这些设置恰好又是构建这个RpcServer类的实例所必备的参数(比如 queue
name,channel等)。

因此这里我们基于组合的方式将RpcServer从WrappedRpcServer类的构造器注入进来,使得WrappedRpcServer成为RpcServer的代理,WrappedRpcServer的构造器访问标识符被设置为private,这是因为我们在消息总线内部构建了RpcServer的实例,然后通过反射机制来构造WrappedRpcServer的实例。看代码:

public WrappedRpcServer buildRpcServer(String secret, final IRpcMessageProcessor rpcMsgProcessor) {
        Node source = this.getContext().getConfigManager().getNodeView(secret).getCurrentQueue();
        try {
            RpcServer aServer = new RpcServer(this.getContext().getChannel(), source.getValue()) {

                @Override
                public byte[] handleCall(QueueingConsumer.Delivery request, AMQP.BasicProperties replyProperties) {
                    return rpcMsgProcessor.onRpcMessage(request.getBody());
                }

            };

            Constructor<WrappedRpcServer> rpcServerConstructor = WrappedRpcServer.class.getDeclaredConstructor(RpcServer.class);
            rpcServerConstructor.setAccessible(true);
            WrappedRpcServer wrappedRpcServer = rpcServerConstructor.newInstance(aServer);
            rpcServerConstructor.setAccessible(false);

            return wrappedRpcServer;

buildRpcServerAPI需要的参数除了有一个用于自身标识的secret,还有一个rpc消息处理器接口:IRpcMessageProcessor。它的定义如下:

public interface IRpcMessageProcessor {

    public byte[] onRpcMessage(byte[] in);

}

它的主要作用是给Thrift RPC(或后续可能接入的其他RPC框架)提供一个跟消息总线衔接的入口。它拥有一个输入作为参数,每个RPC
Server在其内部处理请求并将结构作为输出参数返回。可以看到,这个接口没有对第三方产生任何依赖,而且输入参数跟输出参数都是byte[],因此它可以适配任何类似Thrift这样的RPC框架(当然前提是这些RPC框架都能像thrift具有这么好的扩展性)。

有了输入跟输出,就类似有了一个“管道”,中间的拦截处理过程就可以嵌入thrift
RPC的第三方处理代码。这里针对thrift,我们实现了一个ThriftMessageHandler,它展示了thrift如何无缝衔接消息总线:

public byte[] onRpcMessage(byte[] inMsg) {
        InputStream in = new ByteArrayInputStream(inMsg);
        OutputStream out = new ByteArrayOutputStream();
        TTransport transport = new TIOStreamTransport(in, out);
        TProtocol inProtocol = inProtocolFactory.getProtocol(transport);
        TProtocol outProtocol = outProtocolFactory.getProtocol(transport);

        try {
            processor.process(inProtocol, outProtocol);
            return ((ByteArrayOutputStream) out).toByteArray();
        } catch (TException e) {
            ExceptionHelper.logException(logger, e, "onRpcMessage");
            throw new RuntimeException(e);
        } finally {
            transport.close();
        }
    }

首先它将方法参数作为输入,基于该输入构建一个输入流,然后构造一个空的输出流。通过输入、输出流可以构建出服务端的TTransport对象(它是thrift通信传输的基础也是,thrift协议栈的最底层)。接下来构造用于对输入解码以及对输出编码的协议处理对象。然后传入thrift的核心处理对象:processor进行处理,产生的原始输出作为方法返回。

上面讲解了thrift的服务端处理逻辑以及thrift通过IRpcMessageProcessor无缝接入了消息总线,那么我们还没有提到这段代码是在什么时候以及怎么被触发调用的(也就是IRpcMessageProcessor怎么衔接消息总线这端的消息事件的)?这是通过覆盖RpcServer的消息处理代码来实现的。见代码:

RpcServer aServer = new RpcServer(this.getContext().getChannel(), source.getValue()) {

                @Override
                public byte[] handleCall(QueueingConsumer.Delivery request, AMQP.BasicProperties replyProperties) {
                    return rpcMsgProcessor.onRpcMessage(request.getBody());
                }

            };

我们在构建RpcServer实例的时候,覆盖了消息的处理方式,使其触发对IRpcMessageProcessor的onRpcMessage方法的调用。该方法的调用就衔接起了thrift的代码逻辑,如果这里的RPC框架不是thrift,那么它也可以用于衔接其他的RPC框架。

写在最后

如同thrift自身提供的多种通信传输方式,将消息总线作为thrift的通信机制使得基于thrift的RPC的通信方式多了一种可能。

排队处理&负载均衡

基于MQ的RPC传输机制的好处是,由于队列本身具有的缓冲性质,可以在应对高并发的时候保证请求不丢失(事实上针对web请求,在面对高并发时常用的一种机制也是排队处理);另外MQ提供的分布式集群前面加个HAProxy就能提供很好的负载均衡。

吞吐量低

当然可以肯定是协议越轻(数据量越少),通信的处理方式越简单,RPCServer处理的请求越快,单位时间处理的请求数就越多,吞吐量也就越高。最终的测试结果显示也表明了这一点,在单线程的情况下,基于消息总线的Thrift-RPC比基于Socket的Thrift-RPC慢了一个数量级(一台非专用的普通PC机,QPS为300左右)。通过对基于消息总线的Thrift的JSON-RPC与基于消息总线的原生提供的JSON-RPC测试对比后,发现两者的测试结果类似,因此可以推测主要的瓶颈还是在基于RabbitMQ的传输上。因为首先AMQP协议面向的场景主要是高可用性的场景,所以其服务端的实现略复杂,并且协议数据比直接socket的裸数据大不少,另外无论是服务端还是客户端都存在着成对的拆包和封包的过程,这些都拉低了基于消息总线的RPC的性能表现。

平衡点

性能是否是唯一的考虑因素?这是一个好问题,或者说性能是否是最关键的因素。其实很多事情都是权衡的过程,你要做的是看看能否在你关注的所有面上找到一个平衡点。一切面向通用的解决方案都不是最精简以及性能最好的。这一点就像那些ESB一样,它们的性能表现也同样比一台提供restful
API的web server差了很多。

原文发布时间为:2015-04-24

本文作者:vinoYang

本文来自合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

时间: 2017-11-21

消息总线扩展之集成Thrift-RPC的相关文章

消息总线扩展之面向消息的数据集成

最近一段时间,我在琢磨消息总线除了能进行受管控的消息通信之外,还有哪些可以扩展的方向.这篇文章我们来探讨一下面向消息的数据集成是否可以作为一种尝试方向. 相关技术简介 XML 谈到XML我们的第一映像就是用它来做各种配置,当然如果你是Javaer,那么可能你印象最深的就是Spring的bena配置了.其实,XML的用途远不止充当配置文件这一方面.它还被广泛应用于异构系统集成.数据集成.语义/协议转换等等方面,甚至成为构建平台非常重要的基石.虽然XML一直以来被人诟病其解析效率低下以及数据量太冗余

消息总线扩展之主动转发

问题简述 消息总线目前为Java编程语言提供了SDK,同时针对其他语言提供了一个称之为httpBridge的http代理.这基本可以满足大部分主流编程语言对消息总线的使用需求,但这也仅仅是对技术层面上的需求的满足.在业务层面上,尤其是面对老的业务系统的适配一直都是个难题,这篇文章谈谈面对一个在线上运行的业务系统,如何使得引入消息总线的总体成本尽可能得低. 就消息总线的两种使用方式而言,无论是SDK的方式还是httpBridge的方式,都需要往第三方系统引入对消息总线的依赖,这些依赖包括但不仅限于

消息总线重构之简化客户端

这段时间对消息总线进行了再次重构.本次重构主要针对消息总线的pubsub组件以及对client的简化,同时谈谈对消息总线的一些想法. 简化client的复杂度 之前的client需要同时连接两个分布式组件.消息总线的访问需要用户提供pubsuberHost,pubsuberPort参数,因此它首先连接的就是pubsuber.而消息总线是基于RabbitMQ构建的,因此它必然还需要连接RabbitMQ.而之所以没有需要用户程序提供RabbitMQ Server的地址信息,是因为它是通过pubsub

消息总线优化之PubSub

近段时间,暂缓了消息总线feature的开发,花了部分时间对原先的pubsub机制进行了针对性的优化与重构.这里记录一下优化的过程以及相比原来的设计有哪些改观. PubSub在消息总线内部的作用 PubSub在消息总线内部主要用于对所有在线客户端进行实时管控的作用.每个客户端在使用消息总线时,都"被迫"注册到PubSub上,并"被迫"订阅了一些Channel,以便消息总线管控台实时下发一些管控指令及时生效. 之前的设计回顾 这里有必要回顾一下之前的设计.消息总线内部

消息总线重构之EventBus

最近花了不少时间对消息总线进行了重构.重构的重点是在消息总线中加入了Guava的EventBus,并应用于以下两个场景: (1)改进广播通知 (2)业务逻辑串联,用事件驱动替代责任链模式 EventBus简介 EventBus是Google的开源项目Guava里的一个组件,有兴趣的人可以看我前不久的一篇博文解读.总得来说,EventBus是观察者模型的实现,利用它你既可以实现观察者模型的业务场景,还可以基于它的事件驱动机制来实现应用程序内组件之间的解耦与通信. 改进广播通知 广播通知是消息总线提

谈消息总线的路由模型

最近在写一个基于RabbitMQ的消息总线.虽然RabbitMQ提供了plugin的机制可以实现对其进行扩展,但是由于对erlang语言不熟,考虑到上手成本高,因此放弃实现plugin,转而基于Smart client + 树形拓扑路由的模型.当然这也大大降低了我们实现功能的灵活性,后面我会找个时间开篇新文章,谈谈Smart Client的限制. 预备知识 RabbitMQ对于消息的通信只提供了几个非常简单的API:Channel#basicPublish:Channel#basicConsum

消息总线授权设计

我曾在之前的一篇文章中对比过消息队列跟消息总线.它们其中的一个不同点就是:消息总线更关注通信安全,消息总线可以管控通信双方,对通信的管控是建立在授权的基础上.因此授权模型的设计是消息总线必须考虑的问题.所谓的授权,就是校验通信双方有没有建立可信任的通信关系.这篇文章我们来谈谈消息总线的权限设计. 消息总线使用场景及RabbitMQ通信简介 在介绍授权设计之前,我们先了解一些必要信息.通常我们将消息总线应用于以下这些场景: 缓冲类--自生产自消费 解耦类.异步类--生产者消费者模型 服务调用类(R

springcloud(九):配置中心和消息总线(配置中心终结版)

我们在springcloud(七):配置中心svn示例和refresh中讲到,如果需要客户端获取到最新的配置信息需要执行refresh,我们可以利用webhook的机制每次提交代码发送请求来刷新客户端,当客户端越来越多的时候,需要每个客户端都执行一遍,这种方案就不太适合了.使用Spring Cloud Bus可以完美解决这一问题. Spring Cloud Bus Spring cloud bus通过轻量消息代理连接各个分布的节点.这会用在广播状态的变化(例如配置变化)或者其他的消息指令.Spr

分享一个分布式消息总线,基于.NET Socket Tcp的发布-订阅框架,附代码下载

一.分布式消息总线      在很多MIS项目之中都有这样的需求,需要一个及时.高效的的通知机制,即比如当使用者A完成了任务X,就需要立即告知使用者B任务X已经完成,在通常的情况下,开发人中都是在使用者B所使用的程序之中写数据库轮循代码,这样就会产品一个很严重的两个问题,第一个问题是延迟,轮循机制要定时执行,必须会引起延迟,第二个问题是数据库压力过大,当进行高频度的轮循会生产大量的数据库查询,并且如果有大量的使用者进行轮循,那数据库的压力就更大了.      那么在这个时间,就需要一套能支持发布