起因
上线了2天的hunter-server在线上除了问题,最后看日志的时候发现出现了Lost connection to MySQL server during query。经过一番查询之后,知道是wait_timeout的问题,mysql默认的wait_timeout是8小时,也就是28800秒。从一次建立mysql连接开始,如果8小时之内不做任何操作,那么此次连接会自动过期销毁,导致后面无法进行正常的sql操作,出现Lost connection to MySQL server during query的异常。
mysql> show global variables like '%wait%_timeout';
+--------------------------+-------+
| Variable_name | Value |
+--------------------------+-------+
| innodb_lock_wait_timeout | 50 |
| table_lock_wait_timeout | 50 |
| wait_timeout | 3 |
+--------------------------+-------+
3 rows in set (0.00 sec)
我手动设置成了3秒,为了测试方便,设置命令为
set global wait_timeout=3;
按照官网的写法写了一个demo,
class Person(HunterModel):
"""
id : id
username: 用户名
password: 密码
"""
username = TextField(default='')
password = TextField(default='')
class Meta:
database = MysqlManage.get_database()
其中MysqlManage.get_database()是一个单例写的获取mysql连接池
class MysqlManage(object):
_instance_lock = threading.Lock()
__database = None
@classmethod
def get_database(cls, refresh=False):
"""
:param refresh:
:return:
"""
with MysqlManage._instance_lock:
mysql_config = json.loads(get_config()['mysql'])
if refresh or MysqlManage.__database is None:
MysqlManage.__database = PooledMySQLDatabase(database=mysql_config["database"], host=mysql_config['host'],
port=int(mysql_config['port']), user=mysql_config['user'],
passwd=mysql_config['password'], max_connections=1, stale_timeout=300)
return MysqlManage.__database
经过一番查询,在这里看到了解释 https://github.com/coleifer/peewee/issues/961,每次都要connection, close,wtf?感觉有点浪费资源,不过用连接池管理要稍微好些,看到issues最后的
kagan94 commented on 8 Dec 2016
I solved it in the following way:
def refresh_db_connection(fn):
# It's fix to avoid error "MySQL was gone away".
# Here we check whether our current db connection is accessible or not (if not, refresh it)
db.get_conn().ping(True)
@refresh_db_connection
def register_nickname(nickname=""):
pass
Just handled the decorator to refresh the connection if it was closed.
定期去维持连接,保证当前connection有sql操纵。尝试写了一个demo
schedule.every(1).seconds.do(ping)
while True:
schedule.run_pending()
def ping():
"""
为了mysql的wait_timeout不超时,可以执行一个定时任务为每个mysql连接
:return:
"""
# print("ping..")
data = MysqlManage.get_database()
print("ping" + str(data))
data.execute_sql("SELECT 1")
另外一个正常执行
def sqlTest(num):
for i in range(0, 20):
try:
print(i)
time.sleep(5)
Task.get(Task.id == 1)
print("success")
except Exception as e:
print(e)
将mysql的wait_timeout设置为3秒,ping函数为1秒维持连接,sqlTest模拟sql操作,间隔5秒执行。但是还是报一样错。两个连接不同
mysql> show full processlist;
+------+------+---------------------+------+---------+------+-------+-----------------------+
| Id | User | Host | db | Command | Time | State | Info |
+------+------+---------------------+------+---------+------+-------+-----------------------+
| 1042 | root | localhost | NULL | Query | 0 | NULL | show full processlist |
| 1115 | root | 10.10.134.151:63939 | test | Sleep | 1 | | NULL |
+------+------+---------------------+------+---------+------+-------+-----------------------+
mysql> show full processlist;
+------+------+---------------------+------+---------+------+-------+-----------------------+
| Id | User | Host | db | Command | Time | State | Info |
+------+------+---------------------+------+---------+------+-------+-----------------------+
| 1042 | root | localhost | NULL | Query | 0 | NULL | show full processlist |
| 1130 | root | 10.10.134.151:64006 | test | Sleep | 0 | | NULL |
+------+------+---------------------+------+---------+------+-------+-----------------------+
最后选择妥协,参考下官方connect,close用法,决定自己用装饰器实现一个在每次数据库操作都进行connect和close
@classmethod
def close_database(cls, func):
"""关闭连接
:param cls:
:param func:
:return:
"""
def wapper(*args, **kwargs):
try:
MysqlManage.get_database().connect()
return func(*args, **kwargs)
except Exception as e:
traceback.print_exc(e)
finally:
if not MysqlManage.get_database().is_closed():
MysqlManage.get_database().close()
return wapper
bow
后面还有坑,这里记录下,在每个执行sql的地方,这样使用
@MysqlManage.close_database
def get_tasks(cls, *args):
if not args: # 为空的情况返回所有
return Task.select()
return Task.select().where(*args)
这样是可以正常关闭的,假设调用代码如下,调用get_tasks
@classmethod
def test_tasks(cls, *args):
cls.get_tasks(*args)
执行完test_task,连接是closed的,但是很多场景下,必然都是要获取结果然后操作的
@classmethod
def test_tasks(cls, *args):
tasks = cls.get_tasks(*args)
for task in tasks:
do .....
执行完test_tasks,发现连接根本没有closed。这里有个坑爹的地方,我调试源码好久才发现,原来每次遍历或者从tasks取结果的时候,才会执行sql语句。什么意思,Task.select()没有去执行sql语句,当你将返回值进行遍历或者取值操作的时候才会执行,所以很好理解第二个test_tasks在cls.get_tasks(*args)之后是关闭的,但是遍历的时候要执行sql语句啊(类似这种),
('SELECT `t1`.`id`, `t1`.`url_id`, `t1`.`task_id`, `t1`.`info`, `t1`.`path`, `t1`.`payload`, `t1`.`imp_version`, `t1`.`error`, `t1`.`repair`, `t1`.`type`, `t1`.`chinese_type`, `t1`.`description`, `t1`.`level` FROM `vulnerability` AS `t1` WHERE (`t1`.`task_id` = %s)', [2])
所有连接状态又变成了open。知道这个之后只需要在每次遍历完之后再close连接即可。
对于需要遍历操作的地方修改代码如下
@classmethod
@MysqlManage.close_database
def test_tasks(cls, *args):
tasks = cls.get_tasks(*args)
for task in tasks:
do .....
当然最后再贴下完整的代码吧
class MysqlManage(object):
_instance_lock = threading.Lock()
__database = None
@classmethod
def get_database(cls, refresh=False):
"""
单例多线程模式获取db对象
:param refresh:
:return:
"""
with MysqlManage._instance_lock:
mysql_config = get_config()['mysql']
if refresh or MysqlManage.__database is None:
MysqlManage.__database = PooledMySQLDatabase(database=mysql_config["database"],
host=mysql_config['host'],
port=int(mysql_config['port']), user=mysql_config['user'],
passwd=mysql_config['password'], max_connections=mysql_config["max_connections"],
stale_timeout=mysql_config["stale_timeout"])
return MysqlManage.__database
@classmethod
def close_database(cls, func):
"""关闭连接
:param cls:
:param func:
:return:
"""
def wapper(*args, **kwargs):
try:
MysqlManage.get_database().connect()
except Exception:
traceback.print_exc(file=open(EXCEPTION_LOG_PATH, 'a'))
finally:
try:
return func(*args, **kwargs)
except Exception:
pass
finally:
MysqlManage.get_database().close()
return wapper
测试代码如下
for i in range(100):
test_tasks(Task.id == 2)
print(MysqlManage.get_database()._state.closed)