12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182 |
- import pymysql
- import configparser
- from dbutils.pooled_db import PooledDB
- class UseDatabase:
- # 连接池对象
- __pool = None
- def __init__(self):
- config = configparser.ConfigParser()
- config.read("database.conf", encoding="utf-8")
- self.pool = UseDatabase.__get_conn(config)
- @staticmethod
- def __get_conn(config):
- """
- @summary: 静态方法,从连接池中取出连接
- @return MySQLdb.connection
- """
- db_host = config.get("db", "db_host")
- db_user = config.get("db", "db_user")
- db_pass = config.get("db", "db_pass")
- db_name = config.get("db", "db_name")
- if UseDatabase.__pool is None:
- __pool = PooledDB(pymysql, mincached=1, maxcached=10, maxconnections=10,
- host=db_host, port=3306, user=db_user, passwd=db_pass,
- db=db_name, use_unicode=True, blocking=False, charset="utf8")
- return __pool
- def get_all(self, sql):
- try:
- con = self.pool.connection()
- # cur = con.cursor(cursor=pymysql.cursors.DictCursor)
- cur = con.cursor()
- count = cur.execute(sql)
- if count > 0:
- result = cur.fetchall()
- else:
- result = False
- return result
- except Exception as err:
- raise type(err)(err)
- finally:
- cur.close()
- con.close()
- def get_all_dict(self, sql):
- try:
- con = self.pool.connection()
- cur = con.cursor(cursor=pymysql.cursors.DictCursor)
- count = cur.execute(sql)
- if count > 0:
- result = cur.fetchall()
- else:
- result = False
- return result
- except Exception as err:
- raise type(err)(err)
- finally:
- cur.close()
- con.close()
- def update(self, sql, val):
- try:
- con = self.pool.connection()
- cur = con.cursor()
- cur.execute(sql, val)
- con.commit()
- except Exception as err:
- con.rollback() # 事务回滚
- raise type(err)(err)
- finally:
- cur.close()
- con.close()
- if __name__ == '__main__':
- conn = UseDatabase()
- sql = "show databases;"
- results = conn.get_all(sql)
- print(results)
|