"""MySQL helper built on top of PyMySQL.

Quick reference for the DB-API objects used below.

Connection methods:
    close()     -- close the connection
    commit()    -- commit the pending transaction (when transactions are supported)
    rollback()  -- roll back the pending transaction
    cursor()    -- return a cursor bound to the connection

Cursor methods:
    callproc(name, [params])  -- run a stored procedure; returns the affected row count
    close()                   -- close the cursor
    execute(sql, [params])    -- run one statement; use %s placeholders for params
    executemany(sql, params)  -- run one statement once per parameter set
    fetchone()                -- return the next row of the result
    fetchall()                -- return all rows of the result
    fetchmany(size)           -- return up to ``size`` rows
    nextset()                 -- skip to the next result set
    setinputsizes(size)       -- DB-API hook for predefining parameter sizes

Cursor attributes:
    description -- read-only description of the result columns
    rowcount    -- read-only number of rows in the result
    arraysize   -- number of rows fetchmany() returns by default (1)
"""
import logging
import re
import time
import traceback
from datetime import datetime, timedelta

import pymysql

import LogOut


class MysqldbOperational(object):
    """Convenience wrapper around one PyMySQL connection.

    The generic helpers (``select``, ``insert``, ``update`` ...) share the
    long-lived cursor ``self.cur``; the data-upload helpers open a short-lived
    cursor per call.

    Values are always sent to the server as query parameters. Table and column
    names cannot be parameterized and are interpolated into the SQL text, so
    they must come from trusted code, never from user input.
    """

    # Example logger: LogOut.Log("mysql")

    def __init__(self, host="localhost", username="root", password="", port=3306, database="", logger=None,
                 charset='utf-8'):
        """Store the connection settings and open the connection.

        Args:
            host: MySQL server host name.
            username: account name.
            password: account password.
            port: TCP port of the server.
            database: default schema.
            logger: object exposing ``error``/``info``/``debug``; a standard
                ``logging`` logger is used when omitted.
            charset: stored on the instance only; it is not forwarded to
                ``pymysql.connect``.
        """
        self.host = host
        self.username = username
        self.password = password
        self.database = database
        self.port = port
        self.charset = charset
        # Every error path logs, so a missing logger must not crash them.
        self.logger = logger if logger is not None else logging.getLogger(__name__)
        self.con = None
        self.cur = None
        self._conn()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _log_error(self, where, exc):
        """Log ``exc`` in the module's ``in function <name>.<repr>`` format."""
        self.logger.error('in function %s.' % where + repr(exc))

    @staticmethod
    def _esc(fragment):
        """Double ``%`` in raw SQL text so it survives PyMySQL's %-substitution."""
        return str(fragment).replace('%', '%%')

    @classmethod
    def _build_where(cls, tablename, cond_dict, backticks):
        """Return ``(where_sql, values)`` for an equality-only AND filter.

        An empty/blank ``cond_dict`` yields the always-true filter ``1=1``.
        """
        if not cond_dict:
            return '1=1', []
        template = '`%s`.`%s`=%%s' if backticks else '%s.%s=%%s'
        table = cls._esc(tablename)
        clauses = [template % (table, cls._esc(column)) for column in cond_dict]
        clauses.append('1=1')
        return ' and '.join(clauses), list(cond_dict.values())

    def _safe_rollback(self):
        """Roll back without letting a rollback failure mask the original error."""
        try:
            self.con.rollback()
        except Exception as e:
            self._log_error('rollback', e)

    def _query(self, sql, params=None, as_dict=False):
        """Check the connection, run one query on a fresh cursor, return all rows.

        The cursor is closed even when ``execute`` raises.
        """
        self._reConn()
        cursor_class = pymysql.cursors.DictCursor if as_dict else None
        with self.con.cursor(cursor=cursor_class) as cursor:
            cursor.execute(sql, params)
            return cursor.fetchall()

    def _run_and_commit(self, statements):
        """Check the connection, run ``(sql, params)`` pairs in order, then commit."""
        self._reConn()
        with self.con.cursor() as cursor:
            for sql, params in statements:
                cursor.execute(sql, params)
            self.con.commit()

    # ------------------------------------------------------------------
    # Connection management
    # ------------------------------------------------------------------
    def _conn(self):
        """Open the connection and the shared cursor.

        Returns:
            True on success, False when connecting failed (the error is logged).
        """
        try:
            self.con = pymysql.connect(host=self.host, user=self.username, passwd=self.password, port=self.port,
                                       db=self.database, autocommit=True)
            # All generic queries run on this cursor of the connection.
            self.cur = self.con.cursor()
            return True
        except Exception as e:
            self._log_error('_conn', e)
            return False

    def close(self):
        """Close the database connection."""
        try:
            self.con.close()
            self.logger.info("Database connection close.")
        except Exception as e:
            self._log_error('close', e)

    def _reConn(self, num=28800, stime=3):
        """Make sure the connection is alive, reconnecting if necessary.

        Args:
            num: maximum number of retries (28800 * 3 s is about one day;
                tune to your deployment).
            stime: seconds to sleep between retries.
        """
        attempts = 0
        while attempts <= num:
            try:
                self.con.ping()  # raises when the connection is broken
                return
            except Exception as e:
                self._log_error('_reConn', e)
                if self._conn():  # reconnected successfully
                    return
                attempts += 1
                time.sleep(stime)  # wait, then retry until success or retries run out

    # ------------------------------------------------------------------
    # Frame-extraction related helpers
    # ------------------------------------------------------------------
    def getOneData(self):
        """Return the next row of the last query run on the shared cursor."""
        return self.cur.fetchone()

    def creatTable(self, tablename, attrdict, constraint):
        """Create a table unless it already exists.

        An auto-increment ``id`` column is always added first.

        Args:
            tablename: table name.
            attrdict: column name -> column definition,
                e.g. ``{'book_name': 'varchar(200) NOT NULL'}``.
            constraint: key constraint clause, e.g. ``PRIMARY KEY(`id`)``.
        """
        self._reConn()
        if self.isExistTable(tablename):
            return
        columns = ['`id` bigint(11) NOT NULL AUTO_INCREMENT']
        columns.extend('`%s` %s' % (attr, value) for attr, value in attrdict.items())
        if constraint:
            columns.append(constraint)
        sql = 'CREATE TABLE IF NOT EXISTS %s (%s) ENGINE=InnoDB DEFAULT CHARSET=utf8' % (
            tablename, ','.join(columns))
        self.logger.debug('creatTable:' + sql)
        self.executeCommit(sql)

    def executeSql(self, sql='', params=None):
        """Run a read statement on the shared cursor.

        Args:
            sql: SQL text; use ``%s`` placeholders when ``params`` is given.
            params: optional sequence/dict of values for the placeholders.

        Returns:
            All result rows, or None when the statement failed (error is logged).
        """
        try:
            self.cur.execute(sql, params)
            return self.cur.fetchall()
        except pymysql.Error as e:
            self._log_error('executeSql', e)
            return None

    def executeCommit(self, sql='', params=None):
        """Run a write statement and commit; roll back on failure.

        Args:
            sql: SQL text; use ``%s`` placeholders when ``params`` is given.
            params: optional sequence/dict of values for the placeholders.

        Returns:
            True on success, False on failure (error is logged).
        """
        try:
            self.cur.execute(sql, params)
            self.con.commit()
            return True
        except pymysql.Error as e:
            self._safe_rollback()
            self._log_error('executeCommit', e)
            return False

    def insert(self, tablename, params):
        """Insert one row.

        Args:
            tablename: table name.
            params: column name -> value.
        """
        self._reConn()
        columns = ','.join(self._esc(column) for column in params)
        placeholders = ','.join(['%s'] * len(params))
        sql = 'insert into %s(%s) values(%s)' % (self._esc(tablename), columns, placeholders)
        self.logger.debug('_insert:' + sql)
        self.executeCommit(sql, list(params.values()))

    def select(self, tablename, cond_dict='', order='', fields='*'):
        """Query rows with an optional equality filter.

        Args:
            tablename: table name.
            cond_dict: column name -> required value ('' means no filter).
            order: raw SQL appended after the WHERE clause (e.g. ORDER BY ...).
            fields: ``"*"`` or a list/tuple of column names.

        Returns:
            The rows from ``executeSql`` (None on a database error).

        Raises:
            Exception: ``fields`` is neither ``"*"`` nor a list/tuple.

        Example:
            mydb.select(table)
            mydb.select(table, fields=["name"])
            mydb.select(table, fields=["name", "age"])
        """
        if fields != "*":
            if not isinstance(fields, (list, tuple)):
                self.logger.error("fields input error, please input list fields.")
                raise Exception("fields input error, please input list fields.")
            fields = ",".join(fields)
        self._reConn()
        where_sql, values = self._build_where(tablename, cond_dict, backticks=True)
        sql = 'select %s from %s where %s %s' % (
            self._esc(fields), self._esc(tablename), where_sql, self._esc(order))
        return self.executeSql(sql, values)

    def select_last_one(self, tablename, fields='*'):
        """Return the newest row (ordered by the ``time`` column, descending).

        Args:
            tablename: table name.
            fields: ``"*"`` or a single column expression.

        Returns:
            For ``"*"`` the raw result of ``executeSql``; otherwise the first
            column of the newest row, or None when there is no row.
        """
        sql = 'select %s from %s order by time desc limit 1 ' % (fields, tablename)
        rows = self.executeSql(sql)
        if fields == "*":
            return rows
        if not rows:
            return None
        return rows[0][0]

    def insertMany(self, table, attrs, values):
        """Insert many rows in batches of 20000.

        Args:
            table: table name.
            attrs: list of column names.
            values: list of rows, each a sequence matching ``attrs``.

        Example:
            table = 'test_mysqldb'
            key = ["id", "name", "age"]
            value = [[101, "liuqiao", "25"], [102, "liuqiao1", "26"]]
            mydb.insertMany(table, key, value)
        """
        self._reConn()
        columns = ','.join(self._esc(attr) for attr in attrs)
        placeholders = ','.join(['%s'] * len(attrs))
        sql = 'insert into %s(%s) values(%s)' % (self._esc(table), columns, placeholders)
        self.logger.debug('insertMany:' + sql)
        batch_size = 20000
        try:
            for start in range(0, len(values), batch_size):
                self.cur.executemany(sql, values[start:start + batch_size])
                self.con.commit()
        except pymysql.Error as e:
            self._safe_rollback()
            self._log_error('insertMany', e)

    def insert_sql(self, table_name, keys, values):
        """Insert several rows, one statement per row.

        Args:
            table_name: table name.
            keys: column list as SQL text, e.g. ``"(device_name, code)"``.
            values: iterable of rows; each row is a tuple/list of values, e.g.
                ``('sm140', '006001001WAVE')``. A row that is not a tuple/list
                is interpolated as pre-formatted SQL text.

        Returns:
            True when every row was inserted, False when any insert failed.
        """
        all_ok = True
        for each in values:
            if isinstance(each, (tuple, list)):
                placeholders = ','.join(['%s'] * len(each))
                in_sql = "INSERT INTO %s %s VALUES (%s);" % (
                    self._esc(table_name), self._esc(keys), placeholders)
                row_params = tuple(each)
            else:
                in_sql = "INSERT INTO %s %s VALUES %s;" % (table_name, keys, each)
                row_params = None
            try:
                self._reConn()
                self.cur.execute(in_sql, row_params)
                self.con.commit()
            except pymysql.Error as e:
                self._safe_rollback()
                self._log_error('insert_sql', e)
                all_ok = False
        return all_ok

    def delete(self, tablename, cond_dict):
        """Delete rows matching an equality filter.

        Args:
            tablename: table name.
            cond_dict: column name -> required value ('' deletes every row).

        Returns:
            True on success, False on failure.

        Example:
            params = {"name": "caixinglong", "age": "38"}
            mydb.delete(table, params)
        """
        self._reConn()
        where_sql, values = self._build_where(tablename, cond_dict, backticks=False)
        sql = "DELETE FROM %s where %s" % (self._esc(tablename), where_sql)
        self.logger.debug(sql)
        return self.executeCommit(sql, values)

    def update(self, tablename, attrs_dict, cond_dict):
        """Update rows matching an equality filter.

        Non-string values are converted with ``str()`` before being sent; the
        caller's dictionaries are left untouched.

        Args:
            tablename: table name.
            attrs_dict: column name -> new value.
            cond_dict: column name -> required value ('' updates every row).

        Returns:
            True on success, False on failure.

        Example:
            params = {"name": "caixinglong", "age": "38"}
            cond_dict = {"name": "liuqiao", "age": "18"}
            mydb.update(table, params, cond_dict)
        """
        self._reConn()

        def as_text(value):
            return value if isinstance(value, str) else str(value)

        assignments = ','.join('`%s`=%%s' % self._esc(column) for column in attrs_dict)
        set_values = [as_text(value) for value in attrs_dict.values()]
        where_sql, cond_values = self._build_where(tablename, cond_dict, backticks=True)
        cond_values = [as_text(value) for value in cond_values]
        sql = "UPDATE %s SET %s where %s" % (self._esc(tablename), assignments, where_sql)
        return self.executeCommit(sql, set_values + cond_values)

    def dropTable(self, tablename):
        """Drop a table.

        Args:
            tablename: table name.
        """
        sql = "DROP TABLE %s" % tablename
        self.executeCommit(sql)

    def deleteTable(self, tablename):
        """Delete every row of a table.

        Args:
            tablename: table name.
        """
        sql = "DELETE FROM %s" % tablename
        self.executeCommit(sql)

    def isExistTable(self, tablename):
        """Tell whether a table exists.

        The table is probed with a zero-row SELECT. Only a "table doesn't
        exist" error yields False; any other database error is logged and
        True is returned.

        Args:
            tablename: table name.

        Returns:
            True when the table exists, False when it does not.
        """
        sql = "select 1 from %s limit 0" % tablename
        try:
            self.cur.execute(sql)
            self.cur.fetchall()
            return True
        except pymysql.Error as e:
            # 1146 is MySQL's ER_NO_SUCH_TABLE.
            if (e.args and e.args[0] == 1146) or re.search("doesn't exist", str(e)):
                return False
            self._log_error('isExistTable', e)
            return True

    # ------------------------------------------------------------------
    # Data-upload related helpers
    # ------------------------------------------------------------------
    def get_devices_name(self):
        """Return the distinct device names of ``data_point_tbl``.

        Rows whose ``mqtt_code`` equals the string 'NULL' are excluded by the
        query.

        Returns:
            List of names, or False on error.
        """
        sql = "SELECT DISTINCT device_name FROM data_point_tbl WHERE mqtt_code != 'NULL'"
        try:
            rows = self._query(sql)
            return [row[0] for row in rows if row[0] is not None]
        except Exception as e:
            self._log_error('get_devices_name', e)
            return False

    def get_mqtt_devices_count(self):
        """Return the number of distinct device codes in ``data_point_tbl``.

        Returns:
            The count (0 when no value was returned), or False on error.
        """
        sql = "SELECT COUNT(DISTINCT device_code) FROM data_point_tbl"
        try:
            count = 0
            for row in self._query(sql):
                if row[0] is not None:
                    count = row[0]
            return count
        except Exception as e:
            self._log_error('get_mqtt_devices_count', e)
            return False

    def get_mqtt_devices_from_name(self, device_name):
        """Return the distinct device codes that belong to ``device_name``.

        Returns:
            List of codes, or False on error.
        """
        sql = "SELECT DISTINCT device_code FROM data_point_tbl WHERE device_name = %s"
        try:
            rows = self._query(sql, (device_name,))
            return [row[0] for row in rows if row[0] is not None]
        except Exception as e:
            self._log_error('get_mqtt_devices_from_name', e)
            return False

    def get_mqtt_point(self, deviceCode):
        """Return the data points configured for one device code.

        Returns:
            List of dicts with keys ``deviceName``, ``serialNumber``,
            ``storageType``, ``mqttCode``, ``lowLimit`` and ``upLimit``;
            False on error.
        """
        sql = ("SELECT device_name,serial_number,storage_type,mqtt_code,low_limit,up_limit "
               "FROM data_point_tbl where device_code = %s")
        keys = ('deviceName', 'serialNumber', 'storageType', 'mqttCode', 'lowLimit', 'upLimit')
        try:
            rows = self._query(sql, (deviceCode,))
            return [dict(zip(keys, row)) for row in rows]
        except Exception as e:
            self._log_error('get_mqtt_point', e)
            return False

    def get_breakpoint_last_time_datetime(self, tableName):
        """Return the newest unsent row older than one hour (``times`` column).

        Returns:
            ``{'times': ...}`` for the matching row with the highest id, or
            None when there is none or on error.
        """
        sql = "SELECT times FROM %s WHERE is_send = 0 AND times < date_sub(now(), interval 1 hour) " \
              "ORDER BY id DESC LIMIT 1;" % (tableName)
        try:
            rows = self._query(sql, as_dict=True)
            return rows[0] if rows else None
        except Exception as e:
            self._log_error('get_breakpoint_last_time_datetime', e)
            return None

    def get_breakpoint_last_time_date_and_time(self, tableName):
        """Return the oldest unsent row's timestamp for Date/Time split tables.

        Returns:
            ``{'times': '<Date> <Time>'}`` for the unsent row with the lowest
            id, or None when there is none or on error.
        """
        sql = "SELECT Date,Time FROM %s WHERE is_send = 0 ORDER BY id LIMIT 1;" % (tableName)
        try:
            rows = self._query(sql)
            if not rows:
                return None
            return {'times': str(rows[0][0]) + " " + str(rows[0][1])}
        except Exception as e:
            self._log_error('get_breakpoint_last_time_date_and_time', e)
            return None

    def get_hour_data_datetime(self, tableName, begin, end):
        """Return the rows whose ``times`` lies in ``[begin, end]``, ordered by id.

        Returns:
            Rows as dicts, or None when there are none or on error.
        """
        sql = "SELECT * FROM %s WHERE times >= %%s And times <= %%s ORDER BY id ASC;" % self._esc(tableName)
        try:
            rows = self._query(sql, (begin, end), as_dict=True)
            return rows if rows else None
        except Exception as e:
            self._log_error('get_hour_data_datetime', e)
            return None

    def get_hour_data_date_and_time(self, tableName, begin, end):
        """Like ``get_hour_data_datetime`` for tables with separate Date/Time columns.

        Returns:
            Rows as dicts, or None when there are none or on error.
        """
        sql = "SELECT * FROM %s WHERE CONCAT(Date,' ', Time) >= %%s And CONCAT(Date,' ', Time) <= %%s " \
              "ORDER BY id ASC;" % self._esc(tableName)
        try:
            rows = self._query(sql, (begin, end), as_dict=True)
            return rows if rows else None
        except Exception as e:
            self._log_error('get_hour_data_date_and_time', e)
            return None

    def set_many_send_status(self, tableName, id_list):
        """Mark several rows as sent (``is_send = 1``).

        Args:
            tableName: table name.
            id_list: list of dicts that each carry an ``'id'`` key; an empty
                list is a no-op.
        """
        if not id_list:
            return
        row_ids = [item['id'] for item in id_list]
        placeholders = ','.join(['%s'] * len(row_ids))
        sql = "UPDATE %s SET is_send = %%s WHERE id IN (%s)" % (self._esc(tableName), placeholders)
        try:
            self._run_and_commit([(sql, [1] + row_ids)])
        except Exception as e:
            self._log_error('set_many_send_status', e)

    def get_mqtt_devices(self):
        """Return the distinct device codes of ``data_point_tbl``.

        Returns:
            List of codes, or False on error.
        """
        sql = "SELECT DISTINCT device_code FROM data_point_tbl"
        try:
            rows = self._query(sql)
            return [row[0] for row in rows if row[0] is not None]
        except Exception as e:
            self._log_error('get_mqtt_devices', e)
            return False

    def get_newest_data(self, tableName):
        """Return the row with the highest id.

        Returns:
            The row as a dict; the empty result when the table has no rows;
            None on error.
        """
        sql = "SELECT * FROM %s ORDER BY id DESC LIMIT 1;" % (tableName)
        try:
            rows = self._query(sql, as_dict=True)
            if len(rows) > 0:
                return rows[0]
            self.logger.debug(rows)
            return rows
        except Exception as e:
            self._log_error('get_newest_data', e)
            return None

    def set_send_status(self, tableName, id, value):
        """Set ``is_send`` to ``value`` for the row with the given id."""
        sql = "UPDATE %s SET is_send = %%s WHERE id = %%s;" % self._esc(tableName)
        try:
            self._run_and_commit([(sql, (value, id))])
        except Exception as e:
            self._log_error('set_send_status', e)

    def binocular_data(self):
        """Biological monitoring: fetch the binocular reports not yet uploaded.

        First marks rows whose ``size_average`` is NULL as sent, then selects
        the unsent rows dated before today.

        Returns:
            Rows as dicts, e.g. ``[{'datetime': datetime.date(2021, 6, 10),
            'fish_species': '...', 'size_average': 288.0, 'size_median': 286.6,
            'weight_average': 458.3, 'weight_median': 432.6}]``; None on error.
        """
        today = datetime.now().strftime("%Y-%m-%d")
        sql1 = "UPDATE binocular_dailyreport SET is_send = 1 WHERE size_average IS NULL;"
        sql2 = "SELECT datetime,fish_species,size_average,size_median,weight_average,weight_median " \
               "FROM binocular_dailyreport WHERE datetime<%s AND is_send=0 AND size_average IS NOT NULL;"
        try:
            self._run_and_commit([(sql1, None)])
            return self._query(sql2, (today,), as_dict=True)
        except Exception as e:
            self._log_error('binocular_data', e)
            return None

    def sonar_data_dailyReport(self):
        """Biological monitoring: fetch the daily sonar totals not yet uploaded.

        Returns:
            Rows as dicts, e.g. ``[{'date': '2021-07-06',
            'assess_total_number': 699718}]``; None on error.
        """
        today = datetime.now().strftime("%Y-%m-%d")
        # '%%' becomes a literal '%' once PyMySQL substitutes the parameters.
        sql = "SELECT date_format(datetime,'%%Y-%%m-%%d') AS date, assess_total_number " \
              "FROM fish_distribution_dailyReport " \
              "WHERE datetime<%s AND is_send=0 AND assess_total_number IS NOT NULL;"
        try:
            return self._query(sql, (today,), as_dict=True)
        except Exception as e:
            self._log_error('sonar_data_dailyReport', e)
            return None

    def sonar_data_avg(self):
        """Biological monitoring: daily top/middle/bottom averages of unsent sonar data.

        Returns:
            Rows as dicts, e.g. ``[{'date': '2022-08-02',
            'assess_top_number': 6855, 'assess_middle_number': 91480,
            'assess_bottom_number': 251176}]``; None on error.
        """
        today = datetime.now().strftime("%Y-%m-%d")
        # '%%' becomes a literal '%' once PyMySQL substitutes the parameters.
        sql = "SELECT date_format(datetime,'%%Y-%%m-%%d') AS date, " \
              "CAST(AVG(assess_top_number) AS signed) AS assess_top_number," \
              "CAST(AVG(assess_middle_number) AS signed) AS assess_middle_number," \
              "CAST(AVG(assess_bottom_number) AS signed) AS assess_bottom_number " \
              "FROM fish_distribution_data WHERE datetime<%s AND is_send=0 " \
              "GROUP BY date_format(datetime,'%%Y-%%m-%%d');"
        try:
            return self._query(sql, (today,), as_dict=True)
        except Exception as e:
            self._log_error('sonar_data_avg', e)
            return None

    def binocular_send_status(self, dates):
        """Biological monitoring: mark uploaded binocular reports as sent.

        Args:
            dates: sequence of ``datetime`` column values; empty is a no-op.
        """
        if not dates:
            return
        sql = "UPDATE binocular_dailyreport SET is_send = 1 WHERE datetime in %s;"
        try:
            self._run_and_commit([(sql, (dates,))])
        except Exception as e:
            self._log_error('binocular_send_status', e)

    def sonar_send_status(self, dates):
        """Biological monitoring: mark uploaded sonar data as sent.

        Args:
            dates: sequence of 'YYYY-MM-DD' strings; empty is a no-op.
        """
        if not dates:
            return
        sql1 = "UPDATE fish_distribution_dailyReport SET is_send = 1 " \
               "WHERE date_format(datetime,'%%Y-%%m-%%d') in %s;"
        sql2 = "UPDATE fish_distribution_data SET is_send = 1 " \
               "WHERE date_format(datetime,'%%Y-%%m-%%d') in %s;"
        try:
            self._run_and_commit([(sql1, (dates,)), (sql2, (dates,))])
        except Exception as e:
            self._log_error('sonar_send_status', e)

    def commit_sql(self, sql, params):
        """Run one delete/insert/update statement and commit.

        Args:
            sql: SQL text containing a single ``%s`` placeholder.
            params: value bound to that placeholder (it is wrapped in a
                one-element tuple before being passed to the driver).

        Returns:
            True on success, False when the driver raised ``pymysql.Error``.
        """
        try:
            self._run_and_commit([(sql, (params,))])
            return True
        except pymysql.Error as err:
            self._log_error('commit_sql', err)
            return False