> 数据库 > MySQL >

python的mysql连接池并加入缓存过期

前段时间在看高级连接池的实现,所以高级连接池就是一次构建实例的时候,他会一次性创建出指定个数的链接对象,然后会把这些链接对象放到队列里面,然后会开一个线程专门去维护他们,好让他们别怪了,或者提前知道他们怪了,会在生成一个可用的链接。 该线程也会定时的去给服务端发送hello,在redis里面是ping …. 另外,连接池最好加入一个过期时间的概念,比如10分钟内没人用,或者是不管有没有人已经用过了,我为了各种情况会把链接时间过长的链接给kill掉,关于这样的数据类型不能简单的用简单的列表队列了,需要用有序队列。

刚才已经把连接池的原理说清楚了,我跟同事以前看过360开源的atlas的连接池的那段代码,atlas在mysql集群环境中是处理proxy的角色,他做代理中间人角色中,他是会维持两个大的连接池,一个是针对用户的,还有一个是新鲜的代理。他针对队列里面的连接判断活期是相当的灵巧的,他的算法是IP Hash的算法,根据来源的IP会指定一组小连接池,所有客户端的连接池又所属于更大的集合里面,有个主线程会做存活的判断,但是主动的频率不是太高,主要还是靠被动去访问的时候捕获异常,迅速的把一个可用的连接塞入。

那咱们这里实现个简单的连接池。   另外标题中说要加入缓存过期,这怎么理解,其实就是在mysql每次执行查询的时候,我这边会往redis存入这查询的数据作为缓存,过期的时间是你指定的。  这样每次第二次时间内访问,只要时间不过期,那么数据就可以从redis里面获取。

  1. import MySQLdb
  2. import time
  3. import string
  4. import redis
  5.  
  6. class PooledConnection:
  7.     #构建连接池实例
  8.     def __init__(self, maxconnections, connstr,dbtype):
  9.         from Queue import Queue
  10.         self._pool = Queue(maxconnections) # create the queue
  11.         self.connstr = connstr
  12.         self.dbtype=dbtype
  13.         self.maxconnections=maxconnections
  14.         #根据你给数目来创建链接,并且写入刚才创建的队列里面。
  15.         try:
  16.             for i in range(maxconnections):
  17.                 self.fillConnection(self.CreateConnection(connstr,dbtype))
  18.         except Exception,e:
  19.             raise e
  20.  
  21.     def fillConnection(self,conn):
  22.         try:
  23.             self._pool.put(conn)
  24.  
  25.         except Exception,e:
  26.             raise "fillConnection error:"+str(e)
  27.  
  28.     def returnConnection(self, conn):
  29.         try:
  30.             self._pool.put(conn)
  31.         except Exception,e:
  32.             raise "returnConnection error:"+str(e)
  33.  
  34.     def getConnection(self):
  35.         try:
  36.             return self._pool.get()
  37.         except Exception,e:
  38.             raise "getConnection error:"+str(e)
  39.  
  40.     def ColseConnection(self,conn):
  41.         try:
  42.             self._pool.get().close()
  43.             self.fillConnection(self.CreateConnection(connstr,dbtype))
  44.         except Exception,e:
  45.             raise "CloseConnection error:"+str(e)
  46.  
  47.     def CreateConnection(self,connstr,dbtype):
  48.         if dbtype=='xxx':
  49.             pass
  50.         elif dbtype=='mysql':
  51.             try:
  52.                 db_conn = connstr.split("#");
  53.                 #conndb=MySQLdb.connect(db=conf.mydb,host=conf.dbip,user=conf.myuser,passwd=conf.mypasswd);
  54.                 conndb=MySQLdb.connect(user=db_conn[0],passwd=db_conn[1],host=db_conn[2],port=string.atoi(db_conn[3]),db=db_conn[4]);
  55.                 conndb.clientinfo = 'datasync connection pool from datasync.py';
  56.                 conndb.ping();
  57.             except Exception, e:
  58.                 raise 'conn targetdb datasource Excepts,%s!!!(%s).'%(db_conn[2],str(e))
  59.                 return None
  60.  
  61. #mysql如下创建连接池:
  62. connstring="xiaorui.cc#xiaoru.cc#xiaorui.cc#3306#dbname";
  63. mysqlpool=PooledConnection(10,connstring,"mysql");
  64. #获取连接:
  65. mysqlpool.getConnection()
import MySQLdb
import time
import string
import redis

class PooledConnection:
    #构建连接池实例
    def __init__(self, maxconnections, connstr,dbtype):
        from Queue import Queue
        self._pool = Queue(maxconnections) # create the queue
        self.connstr = connstr
        self.dbtype=dbtype
        self.maxconnections=maxconnections
        #根据你给数目来创建链接,并且写入刚才创建的队列里面。 
        try:
            for i in range(maxconnections):
                self.fillConnection(self.CreateConnection(connstr,dbtype))
        except Exception,e:
            raise e

    def fillConnection(self,conn):
        try:
            self._pool.put(conn)

        except Exception,e:
            raise "fillConnection error:"+str(e)

    def returnConnection(self, conn):
        try:
            self._pool.put(conn)
        except Exception,e:
            raise "returnConnection error:"+str(e)

    def getConnection(self):
        try:
            return self._pool.get()
        except Exception,e:
            raise "getConnection error:"+str(e)

    def ColseConnection(self,conn):
        try:
            self._pool.get().close()
            self.fillConnection(self.CreateConnection(connstr,dbtype))
        except Exception,e:
            raise "CloseConnection error:"+str(e)

    def CreateConnection(self,connstr,dbtype):
        if dbtype=='xxx':
            pass
        elif dbtype=='mysql':
            try:
                db_conn = connstr.split("#");
                #conndb=MySQLdb.connect(db=conf.mydb,host=conf.dbip,user=conf.myuser,passwd=conf.mypasswd);
                conndb=MySQLdb.connect(user=db_conn[0],passwd=db_conn[1],host=db_conn[2],port=string.atoi(db_conn[3]),db=db_conn[4]);
                conndb.clientinfo = 'datasync connection pool from datasync.py';
                conndb.ping();
            except Exception, e:
                raise 'conn targetdb datasource Excepts,%s!!!(%s).'%(db_conn[2],str(e))
                return None

#mysql如下创建连接池:
connstring="xiaorui.cc#xiaoru.cc#xiaorui.cc#3306#dbname";
mysqlpool=PooledConnection(10,connstring,"mysql");
#获取连接:
mysqlpool.getConnection()

上面是针对mysql的python mysqldb驱动做的连接池客户端,我曾经尝试过用gevent调用mysqldb模块,会发现你用异步的模式调用MysqlDB还是会io堵塞的,现在开源社区还没有太合理的合理的针对gevent Mysqldb非堵塞异步的解决方式。  如果考虑用另一个Mysql python模块 pymysql,这是个纯python实现的mysql驱动模块,底层就是socket调用,而不是myqldb那样,参杂着一些C语言的实现。  我曾经写过一篇关于gevent PyMysql的文章,有兴趣的朋友可以看看, 我也会抽时间把Mysql链接池改用Gevent Pymysql模式。



(责任编辑:IT)