异步RPC框架探究

在进行服务拆分的时候,不同服务之间使用HTTP协议来进行数据交互是最常见的一种解决方案。

之前我在做数据层和业务层分离时,采取的方案是双层Tornado+异步HTTP请求的方式进行数据交互,今天心血来潮,写了下脚本测试服务之间的异步HTTP调用,使用netstat 命令发现TIME_WAIT状态特别多。

原因很简单,异步HTTP调用使用的是tornado的AsyncHTTPClient,它会在每次请求的时候重新建立一个链接,而不是复用之前的连接。

就此开启我的优化之旅…

异步优化

在查看了网上一些讨论之后,发现有个大牛提出在针对同一个地址进行频繁的HTTP请求时的解决方案:使用长连接,同时还给出了Tornado下的使用方式。

之前一直以为Tornado中的异步HTTP只有AsyncHTTPClient,经过大牛指点,发现还有一个叫做tornado.curl_httpclient.CurlAsyncHTTPClient的异步HTTP库,这个库的功能就很强大了,可以在请求时保持长连接,极大地减少TIME_WAIT状态。

在正式使用时候要注意,max_clients参数不要太小,也别太大,在我的机器上测试,max_clients=128是比较中肯的配置。

简易RPC封装

之前服务对外暴露自己接口的方式完全跟写API接口一样:

  1. 实现一个基于RequestHandler的业务Handler
  2. 设置路径映射

这样暴露出来的接口使用很不方便,使用时必须知道该服务这个接口的具体路径,扩展性也很差,今天在写代码的过程中想到了celery处理task的方式,啊哈,装饰器!!仔细想了下,通过合理的设计,完全可以记得减少业务开发方和业务调用方的工作量。

服务提供端设计

本着半吊子Pythonic精神,我设想的完成后的RPC框架这样工作:

  1. 用户调用装饰器装饰需要提供成服务的方法,不需要额外定义路径映射
  2. 服务提供端定义一个唯一的RPC接口,所有的服务都通过该接口访问
  3. 框架内部进行方法映射

废话不多说,上代码:

RpcHandler(RequestHandler):
    _methods = {}

    def post(self):
        obj = json.loads(self.request.body)
        func_name = obj['fun']
        args = obj['args']
        kwargs = obj['kwargs']
        f = self._methods.get(func_name, None)
        if not f:
            self.set_status(502, 'no func found err')
            return
        result = f(*args, **kwargs)
        self.write(json.dumps({'ret': result}))

    @classmethod
    def inject_rpc(cls, func):
        '''used for inject a function in class to rpc framework.'''
        cls._methods[func.__name__] = partial(func)

    @classmethod
    def rpc(cls, func):
        cls._methods[func.__name__] = func

        @wraps(func)
        def y(*args, **kwargs):
            print args, kwargs
            return func(*args, **kwargs)

        return y

使用方法:

@RpcHandler.rpc
def pure_function():
    pass

class X():
    def function_in_class(self):
        pass
x = X()
RpcHandler.inject_rpc(x.function_in_class)

有瑕疵的地方就是这个装饰器无法作用于类方法,因为类方法需要将self也传入,但是在调用装饰器时并没有实例创建,因此增加了一个inject_rpc方法曲线救国…

这里有意思的是partial函数,使用partial函数时,它会自动将实例绑定成方法的第一个参数,具体原理还需要研究下。

最后,若要服务生效,一定要将其加到application的handlers配置中去。

客户端设计

客户端设计成只要知道方法名就可以调用的方式,上代码:

class RpcClient(object):
    def __init__(self, host):
        self.host = host

    def remote_call(self, func, callback, *args, **kwargs):
        req = HTTPRequest(url=self.host, method='POST', body=json.dumps({
            'fun': func.__name__,
            'args': args,
            'kwargs': kwargs
        }))
        cli = tornado.curl_httpclient.CurlAsyncHTTPClient(max_clients=128)
        cli.fetch(req, callback=partial(self.process_rpc_ret, callback))

    def process_rpc_ret(self, cb, response):
        if not response.error:
            obj = json.loads(response.body)
            cb(obj['ret'])
        else:
            print response.code, response.reason
            cb(None)

使用方法:

cli = RpcClient("localhost:8080/rpc")
def mycallback(ret):
    pass
cli.remote_call(X.function_in_class, mycallback)