2018-7-24-peewee 连接池踩坑日记

起因

上线了2天的hunter-server在线上除了问题,最后看日志的时候发现出现了Lost connection to MySQL server during query。经过一番查询之后,知道是wait_timeout的问题,mysql默认的wait_timeout是8小时,也就是28800秒。从一次建立mysql连接开始,如果8小时之内不做任何操作,那么此次连接会自动过期销毁,导致后面无法进行正常的sql操作,出现Lost connection to MySQL server during query的异常。

mysql> show global variables like '%wait%_timeout';
+--------------------------+-------+
| Variable_name            | Value |
+--------------------------+-------+
| innodb_lock_wait_timeout | 50    |
| table_lock_wait_timeout  | 50    |
| wait_timeout             | 3     |
+--------------------------+-------+
3 rows in set (0.00 sec)

我手动设置成了3秒,为了测试方便,设置命令为

set global wait_timeout=3;

按照官网的写法写了一个demo,

class Person(HunterModel):
    """
    id : id
    username: 用户名
    password: 密码
    """
    username = TextField(default='')
    password = TextField(default='')

    class Meta:
        database = MysqlManage.get_database()

其中MysqlManage.get_database()是一个单例写的获取mysql连接池

class MysqlManage(object):
    _instance_lock = threading.Lock()

    __database = None

    @classmethod
    def get_database(cls, refresh=False):
        """
        :param refresh: 
        :return: 
        """
        with MysqlManage._instance_lock:
            mysql_config = json.loads(get_config()['mysql'])
            if refresh or MysqlManage.__database is None:
                MysqlManage.__database = PooledMySQLDatabase(database=mysql_config["database"], host=mysql_config['host'],
                                               port=int(mysql_config['port']), user=mysql_config['user'],
                                               passwd=mysql_config['password'], max_connections=1, stale_timeout=300)
            return MysqlManage.__database

经过一番查询,在这里看到了解释 https://github.com/coleifer/peewee/issues/961,每次都要connection, close,wtf?感觉有点浪费资源,不过用连接池管理要稍微好些,看到issues最后的

kagan94 commented on 8 Dec 2016
I solved it in the following way:

def refresh_db_connection(fn):
    # It's fix to avoid error "MySQL was gone away".
    # Here we check whether our current db connection is accessible or not (if not, refresh it)
    db.get_conn().ping(True)

@refresh_db_connection
def register_nickname(nickname=""):
    pass
Just handled the decorator to refresh the connection if it was closed.

定期去维持连接,保证当前connection有sql操纵。尝试写了一个demo

schedule.every(1).seconds.do(ping)
while True:
    schedule.run_pending()
def ping():
    """
    为了mysql的wait_timeout不超时,可以执行一个定时任务为每个mysql连接
    :return: 
    """
    # print("ping..")
    data = MysqlManage.get_database()
    print("ping" + str(data))
    data.execute_sql("SELECT 1")

另外一个正常执行

def sqlTest(num):
    for i in range(0, 20):
        try:
            print(i)
            time.sleep(5)
            Task.get(Task.id == 1)
            print("success")
        except Exception as e:
            print(e)

将mysql的wait_timeout设置为3秒,ping函数为1秒维持连接,sqlTest模拟sql操作,间隔5秒执行。但是还是报一样错。两个连接不同

mysql> show full processlist;
+------+------+---------------------+------+---------+------+-------+-----------------------+
| Id   | User | Host                | db   | Command | Time | State | Info                  |
+------+------+---------------------+------+---------+------+-------+-----------------------+
| 1042 | root | localhost           | NULL | Query   |    0 | NULL  | show full processlist |
| 1115 | root | 10.10.134.151:63939 | test | Sleep   |    1 |       | NULL                  |
+------+------+---------------------+------+---------+------+-------+-----------------------+

mysql> show full processlist;
+------+------+---------------------+------+---------+------+-------+-----------------------+
| Id   | User | Host                | db   | Command | Time | State | Info                  |
+------+------+---------------------+------+---------+------+-------+-----------------------+
| 1042 | root | localhost           | NULL | Query   |    0 | NULL  | show full processlist |
| 1130 | root | 10.10.134.151:64006 | test | Sleep   |    0 |       | NULL                  |
+------+------+---------------------+------+---------+------+-------+-----------------------+

最后选择妥协,参考下官方connect,close用法,决定自己用装饰器实现一个在每次数据库操作都进行connect和close

@classmethod
def close_database(cls, func):
    """关闭连接
    :param cls: 
    :param func: 
    :return: 
    """

    def wapper(*args, **kwargs):
        try:
            MysqlManage.get_database().connect()
            return func(*args, **kwargs)
        except Exception as e:
            traceback.print_exc(e)
        finally:
            if not MysqlManage.get_database().is_closed():
                MysqlManage.get_database().close()

    return wapper

bow

后面还有坑,这里记录下,在每个执行sql的地方,这样使用

@MysqlManage.close_database
def get_tasks(cls, *args):
    if not args:  # 为空的情况返回所有
        return Task.select()
    return Task.select().where(*args)

这样是可以正常关闭的,假设调用代码如下,调用get_tasks

@classmethod
def test_tasks(cls, *args):
	cls.get_tasks(*args)

执行完test_task,连接是closed的,但是很多场景下,必然都是要获取结果然后操作的

@classmethod
def test_tasks(cls, *args):
	tasks = cls.get_tasks(*args)
	for task in tasks:
		do .....

执行完test_tasks,发现连接根本没有closed。这里有个坑爹的地方,我调试源码好久才发现,原来每次遍历或者从tasks取结果的时候,才会执行sql语句。什么意思,Task.select()没有去执行sql语句,当你将返回值进行遍历或者取值操作的时候才会执行,所以很好理解第二个test_tasks在cls.get_tasks(*args)之后是关闭的,但是遍历的时候要执行sql语句啊(类似这种),

('SELECT `t1`.`id`, `t1`.`url_id`, `t1`.`task_id`, `t1`.`info`, `t1`.`path`, `t1`.`payload`, `t1`.`imp_version`, `t1`.`error`, `t1`.`repair`, `t1`.`type`, `t1`.`chinese_type`, `t1`.`description`, `t1`.`level` FROM `vulnerability` AS `t1` WHERE (`t1`.`task_id` = %s)', [2])

所有连接状态又变成了open。知道这个之后只需要在每次遍历完之后再close连接即可。

对于需要遍历操作的地方修改代码如下

@classmethod
@MysqlManage.close_database
def test_tasks(cls, *args):
	tasks = cls.get_tasks(*args)
	for task in tasks:
		do .....

当然最后再贴下完整的代码吧

class MysqlManage(object):
    _instance_lock = threading.Lock()

    __database = None

    @classmethod
    def get_database(cls, refresh=False):
        """
        单例多线程模式获取db对象
        :param refresh: 
        :return: 
        """
        with MysqlManage._instance_lock:
            mysql_config = get_config()['mysql']
            if refresh or MysqlManage.__database is None:
                MysqlManage.__database = PooledMySQLDatabase(database=mysql_config["database"],
                                                             host=mysql_config['host'],
                                                             port=int(mysql_config['port']), user=mysql_config['user'],
                                                             passwd=mysql_config['password'], max_connections=mysql_config["max_connections"],
                                                             stale_timeout=mysql_config["stale_timeout"])
            return MysqlManage.__database

    @classmethod
    def close_database(cls, func):
        """关闭连接
        :param cls: 
        :param func: 
        :return: 
        """

        def wapper(*args, **kwargs):
            try:
                MysqlManage.get_database().connect()
            except Exception:
                traceback.print_exc(file=open(EXCEPTION_LOG_PATH, 'a'))
            finally:
                try:
                    return func(*args, **kwargs)
                except Exception:
                    pass
                finally:
                        MysqlManage.get_database().close()

        return wapper

测试代码如下

for i in range(100):
	test_tasks(Task.id == 2)
	
print(MysqlManage.get_database()._state.closed)