1.介绍
使用多Tornado的都知道,在Tornado中所有的操作都建议采用异步的形式,
但是实际情况是很多的操作并没有异步版本, 比如现在要解决的就是MySQL的同步阻塞问题.
通过调查别家大牛的处理方案, 个人觉得最简单,对代码修改程度最小的当属:
线程池+MySQL连接池的方案.
2.方案概述
使用这种方案,核心有下面几点:
- 借助tornado.concurrent.run_on_executor装饰器将需要丢入到线程池的任务装饰起来
- 实现一个线程池,必须提供提交任务的submit方法
- 实现一个MySQL连接池,保证各个MySQL请求之间数据互不干涉
- 在handler中使用yield和tornado.web.asynchronous保证异步执行
3.run_on_executor代码解析
直接看代码:
def run_on_executor(args, **kwargs):
def run_on_executor_decorator(fn):
executor = kwargs.get(“executor”, “executor”)
io_loop = kwargs.get(“io_loop”, “io_loop”)
@functools.wraps(fn)
def wrapper(self, args, kwargs):
callback = kwargs.pop(“callback”, None)
future = getattr(self, executor).submit(fn, self, *args, kwargs)
if callback:
getattr(self, io_loop).add_future(
future, lambda future: callback(future.result()))
return future
return wrapper
# 省去无关代码
return run_on_executor_decorator
我觉得重要的几个点:
- 装饰器会从装饰的方法的实例中寻找线程池实例(也就是获取executor)
- 装饰器会将装饰的方法提交到线程池中运行, 而submit方法会返回一个Future对象
- 如果装饰的方法提供了callback参数, 那么会在任务执行结束后执行callback函数,
具体如何执行,下面会介绍 - 在调用被装饰的方法时, 会返回一个Future对象,
这满足了tornado的tornado.web.asynchronous装饰器要求 - 当需要在任务处理完成时处理一些额外逻辑, 必须在参数中传入callback,
但是确保自己已经在类中设置了io_loop,如果需要在程序结束时处理返回的值,
则必须要在传入的参数中增加callback参数.
当callback参数存在时执行的逻辑为:将future加入到io_loop中去,io_loop中add_future的用法是:
在future的完成回调中增加一个方法,该方法会将callback增加到io_loop的主循环待处理callback队列中.
4.ThreadPoolExecutor的submit方法
使用线程池时,不用重复造轮子,可以直接使用ThreadPoolExecutor来构造线程池,
需要注意的是Python2.7下需要使用 pip install futures 来安装.
接下来看submit的代码:
def submit(self, fn, args, *kwargs):
with self._shutdown_lock:
if self._shutdown:
raise RuntimeError(‘cannot schedule new futures after shutdown’)
f = _base.Future()
w = _WorkItem(f, fn, args, kwargs)
self._work_queue.put(w)
self._adjust_thread_count()
return f
功能很简单,就是将任务包装成一个_WorkItem, 然后放入到工作队列中去, 值得一提的是,
这个工作队列来自Queue.Queue,本身就是线程安全的,
这里主要关注的是_base.Future()构造出来的对象,需要关注它里面的一个方法
(当传入了callback时会调用到它):
def add_done_callback(self, fn):
with self._condition:
if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
self._done_callbacks.append(fn)
return
fn(self)
也就是如果这个Future已经完成/取消了,则会执行传入的回调函数,否则加入到完成回调函数列表中.
5.MySQL连接池
由于项目使用的是torndb, 因此写一个轻量级的MySQL连接池逻辑很简单, 代码如下:
class MysqlConnPool(object):
def init(self, host, database, user, pwd, max_conns=30):
self.idle_conn = Queue.Queue()
self.pool_size = 0
self.max_conns = max_conns
self.conn_params = (host, database, user, pwd)
self.poll_size_mutex = threading.Lock()
def _get_conn_from_pool(self):
if self.idle_conn.empty() and self.pool_size < self.max_conns:
conn = torndb.Connection(*self.conn_params, time_zone="+8:00")
self.poll_size_mutex.acquire()
self.pool_size += 1
self.poll_size_mutex.release()
return conn
return self.idle_conn.get()
def query(self, *args, **kwargs):
conn = self._get_conn_from_pool()
res = conn.query(*args, **kwargs)
self.idle_conn.put(conn)
return res
# 封装其它方法
使用时只要在系统初始化的时候把连接池初始化好, 后面数据库的一些处理接口都封装好了,
直接调用就可以了.
- 关于异常处理:交由外层逻辑判断,内部不做判断
- 关于析构:可以在线程池退出时断开所有的连接
使用
使用run_on_executor装饰后,就可以将被修饰的方法当做普通的异步方法使用,
使用时可以使用yield tornado.gen.Task() 结合 tornado.gen.coroutine的方法实现同步代码异步效果.