在进行服务拆分的时候,不同服务之间使用HTTP协议来进行数据交互是最常见的一种解决方案。
之前我在做数据层和业务层分离时,采取的方案是双层Tornado+异步HTTP请求的方式进行数据交互,今天心血来潮,写了下脚本测试服务之间的异步HTTP调用,使用netstat 命令发现TIME_WAIT状态特别多。
原因很简单,异步HTTP调用使用的是tornado的AsyncHTTPClient,它会在每次请求的时候重新建立一个链接,而不是复用之前的连接。
就此开启我的优化之旅…
异步优化
在查看了网上一些讨论之后,发现有个大牛提出在针对同一个地址进行频繁的HTTP请求时的解决方案:使用长连接,同时还给出了Tornado下的使用方式。
之前一直以为Tornado中的异步HTTP只有AsyncHTTPClient,经过大牛指点,发现还有一个叫做tornado.curl_httpclient.CurlAsyncHTTPClient的异步HTTP库,这个库的功能就很强大了,可以在请求时保持长连接,极大地减少TIME_WAIT状态。
在正式使用时候要注意,max_clients参数不要太小,也别太大,在我的机器上测试,max_clients=128是比较中肯的配置。
简易RPC封装
之前服务对外暴露自己接口的方式完全跟写API接口一样:
- 实现一个基于RequestHandler的业务Handler
- 设置路径映射
这样暴露出来的接口使用很不方便,使用时必须知道该服务这个接口的具体路径,扩展性也很差,今天在写代码的过程中想到了celery处理task的方式,啊哈,装饰器!!仔细想了下,通过合理的设计,完全可以记得减少业务开发方和业务调用方的工作量。
服务提供端设计
本着半吊子Pythonic精神,我设想的完成后的RPC框架这样工作:
- 用户调用装饰器装饰需要提供成服务的方法,不需要额外定义路径映射
- 服务提供端定义一个唯一的RPC接口,所有的服务都通过该接口访问
- 框架内部进行方法映射
废话不多说,上代码:
RpcHandler(RequestHandler): _methods = {} def post(self): obj = json.loads(self.request.body) func_name = obj['fun'] args = obj['args'] kwargs = obj['kwargs'] f = self._methods.get(func_name, None) if not f: self.set_status(502, 'no func found err') return result = f(*args, **kwargs) self.write(json.dumps({'ret': result})) @classmethod def inject_rpc(cls, func): '''used for inject a function in class to rpc framework.''' cls._methods[func.__name__] = partial(func) @classmethod def rpc(cls, func): cls._methods[func.__name__] = func @wraps(func) def y(*args, **kwargs): print args, kwargs return func(*args, **kwargs) return y
使用方法:
@RpcHandler.rpc def pure_function(): pass class X(): def function_in_class(self): pass x = X() RpcHandler.inject_rpc(x.function_in_class)
有瑕疵的地方就是这个装饰器无法作用于类方法,因为类方法需要将self也传入,但是在调用装饰器时并没有实例创建,因此增加了一个inject_rpc方法曲线救国…
这里有意思的是partial函数,使用partial函数时,它会自动将实例绑定成方法的第一个参数,具体原理还需要研究下。
最后,若要服务生效,一定要将其加到application的handlers配置中去。
客户端设计
客户端设计成只要知道方法名就可以调用的方式,上代码:
class RpcClient(object): def __init__(self, host): self.host = host def remote_call(self, func, callback, *args, **kwargs): req = HTTPRequest(url=self.host, method='POST', body=json.dumps({ 'fun': func.__name__, 'args': args, 'kwargs': kwargs })) cli = tornado.curl_httpclient.CurlAsyncHTTPClient(max_clients=128) cli.fetch(req, callback=partial(self.process_rpc_ret, callback)) def process_rpc_ret(self, cb, response): if not response.error: obj = json.loads(response.body) cb(obj['ret']) else: print response.code, response.reason cb(None)
使用方法:
cli = RpcClient("localhost:8080/rpc") def mycallback(ret): pass cli.remote_call(X.function_in_class, mycallback)