import time
import traceback

import pymysql
import redis

import LogOut
#import mysql.connector
#import MySQLdb


class Mysql:
    """Small pymysql wrapper used to read point configuration and store samples.

    The connection is opened in ``__init__`` and kept on ``self.conn``.
    Query methods keep their historical error contract: readers return
    ``False`` (or ``None`` for ``get_newest_data``) and print the error
    instead of raising; writers print the error and return ``None``.

    Values are always sent as bound parameters.  Table, column and event
    names cannot be bound by the driver, so they are interpolated into the
    statement text and must come from trusted configuration only.
    """

    _logger = LogOut.Log('mysql')

    # Result keys for get_data_point(), in SELECT column order.
    _DATA_POINT_KEYS = (
        'serialNumber',
        'ioPointName',
        'address',
        'read_function_code',
        'write_function_code',
        'dataType',
        'offset',
        'storageType',
        'protocol',
        'dividend',
    )

    # Result keys for get_mqtt_point(), in SELECT column order.
    _MQTT_POINT_KEYS = (
        'deviceName',
        'serialNumber',
        'storageType',
        'mqttCode',
        'lowLimit',
        'upLimit',
    )

    def __init__(self, host='', user='', passwd='', db='', port=3306,
                 charset='utf8'):
        """Store the connection settings and try to connect once.

        A failed first connection is logged, not raised; ``self.conn`` then
        stays ``None`` until ``_reConn`` manages to connect.
        """
        self.host = host
        self.user = user
        self.passwd = passwd
        self.db = db
        self.port = port
        self.charset = charset
        self.conn = None
        self._conn()

    def _conn(self):
        """Open a new autocommit connection.

        Returns:
            True when the connection was established, False otherwise.
        """
        try:
            # Keyword arguments only: positional connect() arguments are not
            # accepted by current PyMySQL releases.
            self.conn = pymysql.connect(
                host=self.host,
                user=self.user,
                password=self.passwd,
                database=self.db,
                port=self.port,
                charset=self.charset,
                autocommit=True
            )
            return True
        except Exception:
            self._logger.error('in line 51:mysql connect is unsuccessful!')
            return False

    def _reConn(self, num=28800, stime=3):
        """Make sure the connection is alive, reconnecting if necessary.

        With the defaults (28800 tries, 3 s apart) the retry budget is about
        one day; tune it for the deployment.

        Args:
            num: highest retry index; ``num + 1`` attempts are made at most.
            stime: seconds to sleep after each failed attempt.
        """
        attempts = 0
        while attempts <= num:
            try:
                # ping() raises when the connection is unusable (or when
                # self.conn is still None after a failed initial connect).
                self.conn.ping()
                return
            except Exception:
                self._logger.error('in line 62:mysql connect is unsuccessful!')
                if self._conn():
                    # Reconnected successfully.
                    return
                attempts += 1
                # Still down: wait, then retry until success or the retry
                # budget is used up.
                time.sleep(stime)

    def _fetch_all(self, sql, params=None):
        """Check the connection, run a SELECT and return every row.

        The cursor is closed even when execute()/fetchall() raises.
        """
        self._reConn()
        with self.conn.cursor() as cursor:
            cursor.execute(sql, params)
            return cursor.fetchall()

    def _write(self, sql, params):
        """Run one data-modifying statement and commit it.

        A failing statement is rolled back and printed.  If the cursor cannot
        be created or the rollback itself fails, the error is printed and the
        connection is re-established through ``_reConn``.
        """
        try:
            with self.conn.cursor() as cursor:
                try:
                    cursor.execute(sql, params)
                    # Commit to the database.
                    self.conn.commit()
                except Exception as e:
                    # Roll back on error.
                    self.conn.rollback()
                    print(e)
        except Exception as e:
            print(e)
            self._reConn()

    def get_data_point(self, deviceName):
        """Return the point definitions of one device.

        Returns:
            A list of dicts keyed by ``_DATA_POINT_KEYS`` (one per row), or
            False when the query fails (the traceback is printed).
        """
        # NOTE(review): "dividend analytical_way" selects ``dividend`` under
        # the alias ``analytical_way``.  If ``analytical_way`` is meant to be
        # a separate column, a comma is missing - confirm against the schema.
        sql = (
            "SELECT serial_number,io_point_name,address,read_function_code,"
            "write_function_code,data_type,offset,storage_type,protocol,"
            "dividend analytical_way "
            "FROM data_point_tbl where device_name = %s"
        )
        try:
            # str() keeps the comparison a string comparison, as with the
            # quoted literal the statement used before.
            rows = self._fetch_all(sql, (str(deviceName),))
            return [dict(zip(self._DATA_POINT_KEYS, row)) for row in rows]
        except Exception:
            print(traceback.format_exc())
            return False

    def get_mqtt_point(self, deviceCode):
        """Return the MQTT-related point settings for one device code.

        Returns:
            A list of dicts keyed by ``_MQTT_POINT_KEYS`` (one per row), or
            False when the query fails (the error is printed).
        """
        sql = (
            "SELECT device_name,serial_number,storage_type,mqtt_code,"
            "low_limit,up_limit FROM data_point_tbl where device_code = %s"
        )
        try:
            rows = self._fetch_all(sql, (str(deviceCode),))
            return [dict(zip(self._MQTT_POINT_KEYS, row)) for row in rows]
        except Exception as e:
            print(e)
            return False

    def get_mqtt_devices(self):
        """Return the distinct non-NULL ``device_code`` values.

        Returns:
            A list of device codes, or False when the query fails.
        """
        sql = "SELECT DISTINCT device_code FROM data_point_tbl"
        try:
            rows = self._fetch_all(sql)
            return [row[0] for row in rows if row[0] is not None]
        except Exception:
            print(traceback.format_exc())
            return False

    def set_send_status(self, tableName, id, value):
        """Set ``is_send`` to ``value`` on the row ``id`` of ``tableName``.

        Errors are printed, not raised.  ``tableName`` is interpolated into
        the statement and must be a trusted identifier.
        """
        sql = "UPDATE {} SET is_send = %s WHERE id = %s;".format(tableName)
        try:
            with self.conn.cursor() as cursor:
                cursor.execute(sql, (value, id))
            # Commit to the database.
            self.conn.commit()
        except Exception as e:
            print(e)

    def get_newest_data(self, tableName):
        """Return the row with the highest ``id`` in ``tableName``.

        Returns:
            The row as a dict, the empty result set when the table has no
            rows, or None when the query fails (the error is printed).
        """
        sql = "SELECT * FROM {} ORDER BY id DESC LIMIT 1;".format(tableName)
        try:
            with self.conn.cursor(cursor=pymysql.cursors.DictCursor) as cursor:
                cursor.execute(sql)
                results = cursor.fetchall()
        except Exception as e:
            print(e)
            return None
        return results[0] if results else results

    def get_rainfall_data(self):
        """Return ``c462`` of one ``table_qxy`` row dated today or later.

        "Today" is local midnight of the current day.

        Returns:
            A list with at most one value (NULL values are skipped), or
            False when the query fails.
        """
        sql = "SELECT c462, times FROM table_qxy WHERE times>=%s limit 1"
        midnight = time.strftime("%Y-%m-%d 00:00:00", time.localtime())
        try:
            rows = self._fetch_all(sql, (midnight,))
            return [row[0] for row in rows if row[0] is not None]
        except Exception:
            print(traceback.format_exc())
            return False

    def create_point_table(self, list):
        """Create one ``table_<serialNumber>`` table per point if missing.

        Each entry of ``list`` must provide ``serialNumber`` and
        ``storageType`` (used as the SQL type of the ``data`` column).  A
        failing CREATE is printed and the remaining points are still
        processed.
        """
        points = list
        with self.conn.cursor() as cursor:
            for point in points:
                tableName = "table_" + str(point['serialNumber'])
                dataType = point['storageType']
                sql = (
                    "CREATE TABLE IF NOT EXISTS %s (times datetime NOT NULL "
                    "DEFAULT CURRENT_TIMESTAMP,data %s not null,INDEX (times)) "
                    "ENGINE=InnoDB DEFAULT CHARSET=utf8;" % (tableName, dataType)
                )
                try:
                    cursor.execute(sql)
                except Exception:
                    print(traceback.format_exc())

    def create_delete_stale_data_event(self, eventName, tableName, day):
        """Create a daily MySQL event that deletes rows older than ``day`` days.

        Errors raised by the driver propagate to the caller.  All three
        arguments are interpolated into the statement and must be trusted.
        """
        sql = (
            "create event %s on SCHEDULE every 1 day do delete from %s "
            "where times<(CURRENT_TIMESTAMP() + INTERVAL -%s DAY);"
            % (eventName, tableName, day)
        )
        with self.conn.cursor() as cursor:
            cursor.execute(sql)

    def _create_many_column_table(self, tableName, points, column_suffix):
        """Create ``tableName`` if missing and add one ``c<serialNumber>`` column per point.

        ``column_suffix`` is appended to every column definition (for example
        ``" NOT NULL"``).  CREATE and ALTER are issued together for each
        point; a failure is printed and the next point is still tried.
        """
        create_sql = (
            "CREATE TABLE IF NOT EXISTS %s (times datetime NOT NULL,"
            "INDEX (times)) ENGINE=InnoDB DEFAULT CHARSET=utf8;" % (tableName)
        )
        with self.conn.cursor() as cursor:
            for point in points:
                dataType = point['storageType']
                columnName = "c" + str(point['serialNumber'])
                add_sql = "ALTER TABLE %s ADD %s %s%s" % (
                    tableName, columnName, dataType, column_suffix)
                try:
                    cursor.execute(create_sql)
                    cursor.execute(add_sql)
                except Exception:
                    print(traceback.format_exc())

    def create_many_column_table_NOTNULL(self, tableName, list):
        """Like ``create_many_column_table`` but every added column is NOT NULL."""
        self._create_many_column_table(tableName, list, " NOT NULL")

    def create_many_column_table(self, tableName, list):
        """Create ``tableName`` with a ``times`` column plus one column per point.

        Each entry of ``list`` must provide ``serialNumber`` (column name
        ``c<serialNumber>``) and ``storageType`` (column SQL type).
        """
        self._create_many_column_table(tableName, list, "")

    def save_point_data(self, tableName, data):
        """Insert ``data`` into the ``data`` column of ``tableName``.

        The statement is rolled back and the error printed on failure.
        """
        sql = "INSERT INTO {} (data)VALUES(%s);".format(tableName)
        try:
            # Use a cursor of our own: this method must not depend on a
            # cursor left behind (and usually already closed) by another call.
            with self.conn.cursor() as cursor:
                cursor.execute(sql, (data,))
            # Commit to the database.
            self.conn.commit()
        except Exception as e:
            print(e)
            # Roll back on error.
            self.conn.rollback()

    def insert_column_many(self, tableName, timeNow, list):
        """Insert one row holding ``timeNow`` and the ``data`` of every point.

        Each entry of ``list`` must provide ``serialNumber`` (column
        ``c<serialNumber>``) and ``data``.  An empty ``list`` is a no-op.
        Errors are printed, not raised.
        """
        points = list
        if not points:
            # Nothing to store.
            return
        try:
            columns = ["times"]
            values = [str(timeNow)]
            for point in points:
                columns.append("c" + str(point['serialNumber']))
                values.append(point['data'])
        except Exception as e:
            print(e)
            return
        sql = "INSERT INTO {}({}) VALUES ({})".format(
            tableName, ",".join(columns), ",".join(["%s"] * len(values)))
        self._write(sql, values)

    def onLineState_insert(self, point, name, state):
        """Record a connection-state sample in ``table_connection_status``.

        The row is stamped with the current local time.  Errors are printed,
        not raised.
        """
        times = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        sql = (
            "INSERT INTO table_connection_status(io_point, "
            "driver_name, data_time, state) "
            "VALUES (%s, %s, %s, %s)"
        )
        self._write(sql, (str(point), str(name), times, state))

    def close(self):
        """Close the connection if one was ever opened."""
        if self.conn is not None:
            self.conn.close()


def _write_points(conn, dataList):
    """Queue ``SET c<serialNumber> data`` for every point and run them in one transaction."""
    pipe = conn.pipeline(transaction=True)
    for point in dataList:
        pipe.set("c" + str(point['serialNumber']), point['data'])
    pipe.execute()


class RedisCtr:
    """Minimal Redis client wrapper storing the latest point values."""

    def __init__(self, _host='', _port=6379, _db='', _decode_responses=True):
        """Store the settings and build the ``redis.StrictRedis`` client."""
        self.host = _host
        self.port = _port
        self.db = _db
        self.decode_responses = _decode_responses
        self.r = None
        self._conn()

    def _conn(self):
        """(Re)create the client object on ``self.r``."""
        self.r = redis.StrictRedis(host=self.host, port=self.port, db=self.db,
                                   decode_responses=self.decode_responses)

    def redisSave(self, dataList):
        """Store every point of ``dataList`` under the key ``c<serialNumber>``.

        Returns:
            True on success, otherwise the exception object that was caught.
        """
        try:
            _write_points(self.r, dataList)
        except Exception as e:
            return e
        return True

    def connectStatusRedisSave(self, key, data):
        """Store ``data`` under ``key``.

        Returns:
            True on success, otherwise the exception object that was caught.
        """
        try:
            self.r.set(key, data)
        except Exception as e:
            return e
        return True

    def readLastStatus(self, key):
        """Return the value stored under ``key`` converted to ``int``.

        Returns:
            The integer value, or None when the key is missing, the value is
            not an integer, or Redis fails (the error is printed).
        """
        try:
            return int(self.r.get(key))
        except Exception as e:
            print(e)
            return None


def redisSave(dataList, conn):
    """Store every point of ``dataList`` under ``c<serialNumber>`` using ``conn``.

    Returns:
        True on success, otherwise the exception object that was caught
        (it is also printed).
    """
    try:
        _write_points(conn, dataList)
    except Exception as e:
        print(e)
        return e
    return True