玩蛇网提供最新Python编程技术信息以及Python资源下载!

Python MySQLdb模块简单封装的方法源码

python 培训

日常工作中可能会需要把 MySQLdb封装起来,方便查询等功能使用,今天就来给大家提供一份关于Python MySQLdb模块简单封装的方法源码,供大家参考使用。

Python MySQL数据库相关文章推荐:Python mysql数据库连接、判断、创建表的方法

Python MySQL封装方法

MySQLdb模块简单封装方法源码如下:

#-*- encoding: utf-8 -*-

__author__ = "wentianle"
__version__ = "$Revision: 1.0 $"
__date__ = "$Date: 2009/02/19 $"
__license__ = "Python"

import MySQLdb
from db.DBPool import DBPool
from util import func_ext
from conf.config import Config
import string

class DB:
    fields = None
    
    def open(self,db_name,autoCommit=True):
        pass
    
    def query(self,sql):
        pass

    def error(self,e):
        print "Error %d: %s" % (e.args[0], e.args[1])
        
    def close(self):
        """close the mysql connect"""
        self.cur.close()
        self.db_conn.close()
    
    def next_record(self):
        self.cur.nextset()
    
    def insert(self,p_table_name,p_data):

        for key in p_data:
            p_data[key] = func_ext.addslashes(func_ext.uni_str(p_data[key]))

        key   = string.join(p_data.keys(),"`,`")
        value = string.join(p_data.values(),"','")
        real_sql = "INSERT INTO " + p_table_name + " (`" + key + "`) VALUES ('" + value + "')"
        self.query("set names 'utf8'")
        return self.query(real_sql)

    def replace(self,p_table_name,p_data):

        for key in p_data:
            p_data[key] = func_ext.addslashes(func_ext.uni_str(p_data[key]))

        key   = string.join(p_data.keys(),"`,`")
        value = string.join(p_data.values(),"','")
        real_sql = "REPLACE INTO " + p_table_name + " (`" + key + "`) VALUES ('" + value + "')"
        self.query("set names 'utf8'")

        return self.query(real_sql)


    def update(self,p_table_name,p_data,p_where):

        for key in p_data:

            p_data[key] = func_ext.addslashes(func_ext.uni_str(p_data[key]))

        for key in p_where:
            p_where[key] = func_ext.addslashes(func_ext.uni_str(p_where[key]))
 
       edit_sql  = string.join([str(x[0])+"='"+str(x[1])+"'" for x in p_data.items()],",")

        where_sql = string.join([str(x[0])+"='"+str(x[1])+"'" for x in p_where.items()]," AND ")
        real_sql = "UPDATE "+p_table_name+" SET "+edit_sql+" WHERE "+where_sql
        self.query("set names 'utf8'")

        return self.query(real_sql)

    def delete(self,p_table_name,p_where):

        for key in p_where:
            p_where[key] = func_ext.addslashes(func_ext.uni_str(p_where[key]))

        where_sql = string.join([x[0]+"='"+x[1]+"'" for x in p_where.items()]," AND ")
        real_sql  = "DELETE FROM "+p_table_name+" WHERE "+where_sql
        self.query("set names 'utf8'")

        return self.query(real_sql)
    
    def isinstance(self,p_table_name,p_where):

        for key in p_where:
            p_where[key] = func_ext.addslashes(func_ext.uni_str(p_where[key]))

        where_sql = string.join([x[0]+"='"+x[1]+"'" for x in p_where.items()]," AND ")
        real_sql = "SELECT count(*) as cnt FROM "+p_table_name+" WHERE "+where_sql

        if self.query(real_sql):
            res = self.fetch_assoc()
            return res[0]
        else:
            return 0

    def select(self,sql):
        self.query("set names 'utf8'")
        self.query(sql)
        return self.fetch_all()


    def selectRow(self,sql):
        self.query("set names 'utf8'")
        self.query(sql)
        return self.fetch_assoc()

    def fetch_all(self,upper=0):

        if self.get_num_rows():
            d = []
            result=self.cur.fetchall()
            desc = self.cur.description

            for inv in result:
                _d = {}
                if upper:

                    for i in xrange(0,len(inv)):
                        if isinstance(inv[i],(unicode)):
                            _d[desc[i][0].upper()] = str(inv[i])
                        else:
                            _d[desc[i][0].upper()] = inv[i]
                elif not upper:

                    for i in xrange(0,len(inv)):
                        if isinstance(inv[i],(unicode)):
                            _d[desc[i][0].lower()] = str(inv[i])
                        else:
                            _d[desc[i][0].lower()] = inv[i]
                else:
                    for i in xrange(0,len(inv)):
                        if isinstance(inv[i],(unicode)):
                            _d[desc[i][0]] = str(inv[i])
                        else:
                            _d[desc[i][0]] = inv[i]

#www.iplaypython.com        
2000
        d.append(_d)
            return d
        else:
            return None
        
    
    def fetch_assoc(self,upper=0):
        if self.get_num_rows():
            d = {}
#            i = 0
            desc = self.cur.description
            self.fields = self.cur.fetchone()

            if upper:
                for i in xrange(0,len(self.fields)):
                    if isinstance(self.fields[i],(unicode)):
                        d[desc[i][0].upper()] = str(self.fields[i])
                    else:
                        d[desc[i][0].upper()] = self.fields[i]
            elif not upper:
                for i in xrange(0,len(self.fields)):
                    if isinstance(self.fields[i],(unicode)):
                        d[desc[i][0].lower()] = str(self.fields[i])
                    else:
                        d[desc[i][0].lower()] = self.fields[i]
            else:
                for i in xrange(0,len(self.fields)):
                    if isinstance(self.fields[i],(unicode)):
                        d[desc[i][0]] = str(self.fields[i])
                    else:
                        d[desc[i][0]] = self.fields[i]
            return d
        else:
            return None
        

    def get_num_rows(self):
        return self.cur.rowcount
    
    def autoCommit(self,flag):
        self.query("Set AUTOCOMMIT = "+str(flag))
    
    def debug_sql(self,sql):
        if Config.getConf("public","debug_sql") == "True":
            pass
            
#数据库操作类配合(dbpool)使用

class Mysql(DB):
    def get_mysql_version(self):
        """return the mysql version"""
        return MySQLdb.get_client_info()

    def open(self,db_name,autoCommit=True):
        """mysql connect"""
        pool = DBPool.getInstance(db_name)
        if pool:
            self.db_conn = pool.connection(0)
            self.cur = self.db_conn.cursor()
            if autoCommit == True:
                self.autoCommit(1)
            self.charset = DBPool.getInstance(db_name).charset
            self.set_name()
            return True
        else:
            raise Exception,"open database %s Failure" % (db_name)
    
    def set_name(self):
        self.query("SET NAMES "+self.charset)
        self.query("SET CHARACTER SET "+self.charset)

    def query(self,sql):
        if isinstance(sql, unicode):
            sql = sql.encode("utf8")
        self.debug_sql(sql);
        
        return self.cur.execute(sql)


    def getInsertId(self):
        self.query("SELECT LAST_INSERT_ID() AS lid")
        rs = self.cur.fetchone()
        return rs[0]
    
    def commit(self):
        self.db_conn.commit()

#if __name__ == '__main__':
#    myTest = Mysql()
#    if myTest.open("b2b_platform"):
#        sql = "select * from sc_goods"
#        myTest.query(sql)
#        aa = myTest.fetch_assoc()
#        print aa

关于Python MySQLdb模块简单封装的方法:第二段源码(数据库连接池句柄)

#-*- encoding: utf-8 -*-

__author__ = "wentianle (wenzaile@163.com)"
__version__ = "$Revision: 1.0 $"
__date__ = "$Date: 2009/02/19 $"
__license__ = "Python"


from DBUtils.PooledDB import PooledDB

import MySQLdb
import time
from conf.config import Config
from util import func_ext



#数据库连接池句柄
class DBPool:
    pool = {};
    @staticmethod

    def getInstance(db_name):
        try:
            if isinstance(db_name, unicode):
                db_name = db_name.encode("utf8")
            if DBPool.pool.get(db_name):
                return DBPool.pool.get(db_name)
            else:
#                print Config.getConf('db_'+db_name,"dbhost")
                
                tmp_pool = PooledDB(MySQLdb, \
                                    int(Config.getConf("dbpool_config","db_mincached")),\
                                    int(Config.getConf("dbpool_config","db_maxcached")),\
                                    int(Config.getConf("dbpool_config","db_maxshared")),\
                                    int(Config.getConf("dbpool_config","db_maxconnections")),\
                                    int(Config.getConf("dbpool_config","db_blocking")),\
                                    int(Config.getConf("dbpool_config","db_maxusage")),\
                                    ["SET NAMES utf8","SET CHARACTER SET utf8"],\
                                    host = Config.getConf('db_'+db_name,"dbhost"), \
                                    user = Config.getConf('db_'+db_name,"dbuser"), \
                                    passwd = Config.getConf('db_'+db_name,"dbpass"), \
                                    db = Config.getConf('db_'+db_name,"dbname"),\
                                    charset = Config.getConf('db_'+db_name,"charset"),\
                                    port = int(Config.getConf('db_'+db_name,"dbport")))
                tmp_pool.charset = Config.getConf('db_'+db_name,"charset")
                DBPool.pool[db_name] = tmp_pool

                return DBPool.pool[db_name]

        except Exception,e:
            func_ext.error_log(e)
            time.sleep(5)

            return DBPool.getInstance(db_name)
            

if __name__ == '__main__':
    while True:
        pool = DBPool.getInstance("plat")
        time.sleep(2)
        db_conn = pool.connection()
        cur = db_conn.cursor()
        cur.execute("select * from sc_supplier")
        res = cur.fetchone()
        print res
        time.sleep(2)

玩蛇网原创,转载请注明文章出处和来源网址:http://www.iplaypython.com/code/other/o2702.html



微信公众号搜索"玩蛇网Python之家"加关注,每日最新的Python资讯、图文视频教程可以让你一手全掌握。强烈推荐关注!

微信扫描下图可直接关注

玩蛇网Python新手QQ群,欢迎加入: ① 240764603 玩蛇网Python新手群
文章发布日期:2016-01-05 10:12 玩蛇网 www.iplaypython.com

评论列表(网友评论仅供网友表达个人看法,并不表明本站同意其观点或证实其描述)
相关文章推荐
别人正在看
特别推荐
去顶部去底部