前段时间在看高级连接池的实现,所以高级连接池就是一次构建实例的时候,他会一次性创建出指定个数的链接对象,然后会把这些链接对象放到队列里面,然后会开一个线程专门去维护他们,好让他们别怪了,或者提前知道他们怪了,会在生成一个可用的链接。 该线程也会定时的去给服务端发送hello,在redis里面是ping …. 另外,连接池最好加入一个过期时间的概念,比如10分钟内没人用,或者是不管有没有人已经用过了,我为了各种情况会把链接时间过长的链接给kill掉,关于这样的数据类型不能简单的用简单的列表队列了,需要用有序队列。
刚才已经把连接池的原理说清楚了,我跟同事以前看过360开源的atlas的连接池的那段代码,atlas在mysql集群环境中是处理proxy的角色,他做代理中间人角色中,他是会维持两个大的连接池,一个是针对用户的,还有一个是新鲜的代理。他针对队列里面的连接判断活期是相当的灵巧的,他的算法是IP Hash的算法,根据来源的IP会指定一组小连接池,所有客户端的连接池又所属于更大的集合里面,有个主线程会做存活的判断,但是主动的频率不是太高,主要还是靠被动去访问的时候捕获异常,迅速的把一个可用的连接塞入。
那咱们这里实现个简单的连接池。 另外标题中说要加入缓存过期,这怎么理解,其实就是在mysql每次执行查询的时候,我这边会往redis存入这查询的数据作为缓存,过期的时间是你指定的。 这样每次第二次时间内访问,只要时间不过期,那么数据就可以从redis里面获取。
import MySQLdb
import time
import string
import redis
class PooledConnection:
#构建连接池实例
def __init__(self, maxconnections, connstr,dbtype):
from Queue import Queue
self._pool = Queue(maxconnections) # create the queue
self.connstr = connstr
self.dbtype=dbtype
self.maxconnections=maxconnections
#根据你给数目来创建链接,并且写入刚才创建的队列里面。
try:
for i in range(maxconnections):
self.fillConnection(self.CreateConnection(connstr,dbtype))
except Exception,e:
raise e
def fillConnection(self,conn):
try:
self._pool.put(conn)
except Exception,e:
raise "fillConnection error:"+str(e)
def returnConnection(self, conn):
try:
self._pool.put(conn)
except Exception,e:
raise "returnConnection error:"+str(e)
def getConnection(self):
try:
return self._pool.get()
except Exception,e:
raise "getConnection error:"+str(e)
def ColseConnection(self,conn):
try:
self._pool.get().close()
self.fillConnection(self.CreateConnection(connstr,dbtype))
except Exception,e:
raise "CloseConnection error:"+str(e)
def CreateConnection(self,connstr,dbtype):
if dbtype=='xxx':
pass
elif dbtype=='mysql':
try:
db_conn = connstr.split("#");
#conndb=MySQLdb.connect(db=conf.mydb,host=conf.dbip,user=conf.myuser,passwd=conf.mypasswd);
conndb=MySQLdb.connect(user=db_conn[0],passwd=db_conn[1],host=db_conn[2],port=string.atoi(db_conn[3]),db=db_conn[4]);
conndb.clientinfo = 'datasync connection pool from datasync.py';
conndb.ping();
except Exception, e:
raise 'conn targetdb datasource Excepts,%s!!!(%s).'%(db_conn[2],str(e))
return None
#mysql如下创建连接池:
connstring="xiaorui.cc#xiaoru.cc#xiaorui.cc#3306#dbname";
mysqlpool=PooledConnection(10,connstring,"mysql");
#获取连接:
mysqlpool.getConnection()
-
import MySQLdb
-
import time
-
import string
-
import redis
-
-
class PooledConnection:
-
#构建连接池实例
-
def __init__(self, maxconnections, connstr,dbtype):
-
from Queue import Queue
-
self._pool = Queue(maxconnections) # create the queue
-
self.connstr = connstr
-
self.dbtype=dbtype
-
self.maxconnections=maxconnections
-
#根据你给数目来创建链接,并且写入刚才创建的队列里面。
-
try:
-
for i in range(maxconnections):
-
self.fillConnection(self.CreateConnection(connstr,dbtype))
-
except Exception,e:
-
raise e
-
-
def fillConnection(self,conn):
-
try:
-
self._pool.put(conn)
-
-
except Exception,e:
-
raise "fillConnection error:"+str(e)
-
-
def returnConnection(self, conn):
-
try:
-
self._pool.put(conn)
-
except Exception,e:
-
raise "returnConnection error:"+str(e)
-
-
def getConnection(self):
-
try:
-
return self._pool.get()
-
except Exception,e:
-
raise "getConnection error:"+str(e)
-
-
def ColseConnection(self,conn):
-
try:
-
self._pool.get().close()
-
self.fillConnection(self.CreateConnection(connstr,dbtype))
-
except Exception,e:
-
raise "CloseConnection error:"+str(e)
-
-
def CreateConnection(self,connstr,dbtype):
-
if dbtype=='xxx':
-
pass
-
elif dbtype=='mysql':
-
try:
-
db_conn = connstr.split("#");
-
#conndb=MySQLdb.connect(db=conf.mydb,host=conf.dbip,user=conf.myuser,passwd=conf.mypasswd);
-
conndb=MySQLdb.connect(user=db_conn[0],passwd=db_conn[1],host=db_conn[2],port=string.atoi(db_conn[3]),db=db_conn[4]);
-
conndb.clientinfo = 'datasync connection pool from datasync.py';
-
conndb.ping();
-
except Exception, e:
-
raise 'conn targetdb datasource Excepts,%s!!!(%s).'%(db_conn[2],str(e))
-
return None
-
-
#mysql如下创建连接池:
-
connstring="xiaorui.cc#xiaoru.cc#xiaorui.cc#3306#dbname";
-
mysqlpool=PooledConnection(10,connstring,"mysql");
-
#获取连接:
-
mysqlpool.getConnection()
import MySQLdb
import time
import string
import redis
class PooledConnection:
#构建连接池实例
def __init__(self, maxconnections, connstr,dbtype):
from Queue import Queue
self._pool = Queue(maxconnections) # create the queue
self.connstr = connstr
self.dbtype=dbtype
self.maxconnections=maxconnections
#根据你给数目来创建链接,并且写入刚才创建的队列里面。
try:
for i in range(maxconnections):
self.fillConnection(self.CreateConnection(connstr,dbtype))
except Exception,e:
raise e
def fillConnection(self,conn):
try:
self._pool.put(conn)
except Exception,e:
raise "fillConnection error:"+str(e)
def returnConnection(self, conn):
try:
self._pool.put(conn)
except Exception,e:
raise "returnConnection error:"+str(e)
def getConnection(self):
try:
return self._pool.get()
except Exception,e:
raise "getConnection error:"+str(e)
def ColseConnection(self,conn):
try:
self._pool.get().close()
self.fillConnection(self.CreateConnection(connstr,dbtype))
except Exception,e:
raise "CloseConnection error:"+str(e)
def CreateConnection(self,connstr,dbtype):
if dbtype=='xxx':
pass
elif dbtype=='mysql':
try:
db_conn = connstr.split("#");
#conndb=MySQLdb.connect(db=conf.mydb,host=conf.dbip,user=conf.myuser,passwd=conf.mypasswd);
conndb=MySQLdb.connect(user=db_conn[0],passwd=db_conn[1],host=db_conn[2],port=string.atoi(db_conn[3]),db=db_conn[4]);
conndb.clientinfo = 'datasync connection pool from datasync.py';
conndb.ping();
except Exception, e:
raise 'conn targetdb datasource Excepts,%s!!!(%s).'%(db_conn[2],str(e))
return None
#mysql如下创建连接池:
connstring="xiaorui.cc#xiaoru.cc#xiaorui.cc#3306#dbname";
mysqlpool=PooledConnection(10,connstring,"mysql");
#获取连接:
mysqlpool.getConnection()
上面是针对mysql的python mysqldb驱动做的连接池客户端,我曾经尝试过用gevent调用mysqldb模块,会发现你用异步的模式调用MysqlDB还是会io堵塞的,现在开源社区还没有太合理的合理的针对gevent Mysqldb非堵塞异步的解决方式。 如果考虑用另一个Mysql python模块 pymysql,这是个纯python实现的mysql驱动模块,底层就是socket调用,而不是myqldb那样,参杂着一些C语言的实现。 我曾经写过一篇关于gevent PyMysql的文章,有兴趣的朋友可以看看, 我也会抽时间把Mysql链接池改用Gevent Pymysql模式。
(责任编辑:IT) |