import configparser

import pymysql
from dbutils.pooled_db import PooledDB


class UseDatabase:
    """Small MySQL access helper backed by a shared DBUtils connection pool.

    Connection settings are read from the ``[db]`` section of
    ``database.conf`` in the current working directory. Required keys:
    ``db_host``, ``db_user``, ``db_pass``, ``db_name``. Optional key:
    ``db_port`` (defaults to 3306).
    """

    # Connection pool shared by all instances; created on first use.
    __pool = None

    def __init__(self):
        """Load ``database.conf`` and attach the shared connection pool."""
        config = configparser.ConfigParser()
        config.read("database.conf", encoding="utf-8")
        self.pool = UseDatabase.__get_conn(config)

    @staticmethod
    def __get_conn(config):
        """Return the shared connection pool, creating it on first call.

        The configuration is only consulted when the pool does not exist
        yet; later calls return the already-created pool unchanged.

        Args:
            config: A ``ConfigParser`` holding a ``[db]`` section.

        Returns:
            The class-wide ``PooledDB`` instance.

        Raises:
            configparser.Error: If the ``[db]`` section or a required key
                is missing when the pool has to be created.
        """
        if UseDatabase.__pool is None:
            db_host = config.get("db", "db_host")
            db_user = config.get("db", "db_user")
            db_pass = config.get("db", "db_pass")
            db_name = config.get("db", "db_name")
            # Optional key; 3306 keeps existing config files working.
            db_port = config.getint("db", "db_port", fallback=3306)
            # Assign to the class attribute (not a local) so the pool is
            # actually reused by subsequent instances.
            UseDatabase.__pool = PooledDB(
                pymysql,
                mincached=1,
                maxcached=10,
                maxconnections=10,
                host=db_host,
                port=db_port,
                user=db_user,
                passwd=db_pass,
                db=db_name,
                use_unicode=True,
                blocking=False,
                charset="utf8",
            )
        return UseDatabase.__pool

    def _fetch_all(self, sql, val=None, cursor_class=None):
        """Run a query and return all rows, or ``False`` when none matched.

        The cursor and the pooled connection are always released, and any
        exception from the driver propagates unchanged.
        """
        con = self.pool.connection()
        try:
            if cursor_class is None:
                cur = con.cursor()
            else:
                cur = con.cursor(cursor=cursor_class)
            try:
                count = cur.execute(sql, val)
                if count > 0:
                    return cur.fetchall()
                return False
            finally:
                cur.close()
        finally:
            # Closing a pooled connection hands it back to the pool.
            con.close()

    def get_all(self, sql, val=None):
        """Execute ``sql`` and return every row using the default cursor.

        Args:
            sql: The SQL statement; may contain ``%s`` placeholders.
            val: Optional parameters bound to the placeholders. Prefer this
                over string formatting when values come from outside.

        Returns:
            The fetched rows, or ``False`` if the statement reported no rows.
        """
        return self._fetch_all(sql, val)

    def get_all_dict(self, sql, val=None):
        """Execute ``sql`` and return every row as a dict keyed by column.

        Args:
            sql: The SQL statement; may contain ``%s`` placeholders.
            val: Optional parameters bound to the placeholders.

        Returns:
            The fetched rows, or ``False`` if the statement reported no rows.
        """
        return self._fetch_all(sql, val, cursor_class=pymysql.cursors.DictCursor)

    def update(self, sql, val):
        """Execute a data-modifying statement and commit it.

        Args:
            sql: The SQL statement with ``%s`` placeholders.
            val: Parameters bound to the placeholders.

        Raises:
            Exception: Whatever the driver raised; the transaction is rolled
                back first and the original exception is re-raised as-is.
        """
        con = self.pool.connection()
        try:
            cur = con.cursor()
            try:
                cur.execute(sql, val)
                con.commit()
            except Exception:
                con.rollback()  # undo the partial transaction
                raise
            finally:
                cur.close()
        finally:
            con.close()


if __name__ == '__main__':
    conn = UseDatabase()
    sql = "show databases;"
    results = conn.get_all(sql)
    print(results)