123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369 |
- import pymysql
- import traceback
- import time
- import redis
- import LogOut
- #import mysql.connector
- #import MySQLdb
class Mysql:
    """Small PyMySQL helper for the data-point tables used by this project.

    The instance owns one connection (``self.conn``) opened with
    ``autocommit=True``.  Read helpers return ``False`` on failure (or
    ``None`` for :meth:`get_newest_data`) and write helpers swallow database
    errors after reporting them, so callers written against the previous
    version keep working.

    Values are always bound as query parameters.  Table, column and event
    names and column types cannot be bound by the driver and are still
    interpolated into the statement text, so they must come from trusted
    configuration.
    """

    _logger = LogOut.Log('mysql')

    def __init__(self,
                 host='',
                 user='',
                 passwd='',
                 db='',
                 port=3306,
                 charset='utf8'):
        """Store the connection settings and try to connect once.

        A failed first connection is only logged; ``self.conn`` then stays
        ``None`` until :meth:`_reConn` manages to connect.
        """
        self.host = host
        self.user = user
        self.passwd = passwd
        self.db = db
        self.port = port
        self.charset = charset
        self.conn = None
        self._conn()

    # ------------------------------------------------------------------
    # Connection handling
    # ------------------------------------------------------------------
    def _conn(self):
        """Open a new connection; return True on success, False otherwise."""
        try:
            # Keyword arguments: PyMySQL >= 1.0 no longer accepts positional
            # connect() arguments.  The configured charset is now passed on
            # (it used to be stored but never used).
            self.conn = pymysql.connect(
                host=self.host,
                user=self.user,
                password=self.passwd,
                database=self.db,
                port=self.port,
                charset=self.charset,
                autocommit=True,
            )
            return True
        except Exception:
            # ``Exception`` rather than a bare ``except`` so Ctrl-C and
            # SystemExit still propagate.
            self._logger.error('in line 51:mysql connect is unsuccessful!')
            self._logger.error(traceback.format_exc())
            return False

    def _reConn(self, num=28800, stime=3):
        """Make sure the connection is alive, reconnecting if necessary.

        Pings the server; on failure tries to reconnect, sleeping ``stime``
        seconds between attempts, for at most ``num + 1`` attempts (the
        defaults add up to roughly one day of retries).

        Returns True once the connection is usable, False when every attempt
        failed.
        """
        attempts = 0
        while attempts <= num:
            try:
                # Also raises (AttributeError) while self.conn is still None.
                self.conn.ping()
                return True
            except Exception:
                self._logger.error('in line 62:mysql connect is unsuccessful!')
                if self._conn():
                    return True
                attempts += 1
                # Connection still down: wait before the next attempt.
                time.sleep(stime)
        return False

    def close(self):
        """Close the connection if one was ever opened."""
        if self.conn is not None:
            self.conn.close()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _fetch_rows(self, sql, params=None):
        """Check the connection, run a SELECT and return all rows."""
        self._reConn()
        with self.conn.cursor() as cursor:
            cursor.execute(sql, params)
            return cursor.fetchall()

    @staticmethod
    def _first_column(rows):
        """Return the non-NULL values of the first column of ``rows``."""
        return [row[0] for row in rows if row[0] is not None]

    def _add_columns(self, tableName, points, column_suffix):
        """Create ``tableName`` if missing and add one column per point.

        ``column_suffix`` is appended verbatim to each ``ALTER TABLE``
        statement (e.g. ``" NOT NULL"``).  Nothing is executed for an empty
        ``points`` sequence.  Failures are printed and do not stop the
        remaining columns from being processed.
        """
        if not points:
            return
        create_sql = (
            "CREATE TABLE IF NOT EXISTS %s (times datetime NOT NULL,INDEX (times)) "
            "ENGINE=InnoDB DEFAULT CHARSET=utf8;" % (tableName)
        )
        with self.conn.cursor() as cursor:
            # The CREATE statement does not depend on the point, so it runs
            # once instead of once per column.
            try:
                cursor.execute(create_sql)
            except Exception:
                print(traceback.format_exc())
                return
            for point in points:
                dataType = point['storageType']
                columnName = "c" + str(point['serialNumber'])
                alter_sql = "ALTER TABLE %s ADD %s %s%s" % (
                    tableName, columnName, dataType, column_suffix)
                try:
                    cursor.execute(alter_sql)
                except Exception:
                    # Typically the column already exists; keep going.
                    print(traceback.format_exc())

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------
    def get_data_point(self, deviceName):
        """Return the data-point definitions of ``deviceName``.

        Returns a list with one dict per row of ``data_point_tbl``, or
        ``False`` when the query fails.
        """
        # NOTE(review): "dividend analytical_way" has no comma, so SQL treats
        # analytical_way as an alias of dividend.  Kept as-is because row[9]
        # is consumed as 'dividend'; confirm whether a separate
        # analytical_way column was intended.
        sql = (
            "SELECT serial_number,io_point_name,address,read_function_code,"
            "write_function_code,data_type,offset,storage_type,protocol,"
            "dividend analytical_way "
            "FROM data_point_tbl where device_name = %s"
        )
        keys = (
            'serialNumber',
            'ioPointName',
            'address',
            'read_function_code',
            'write_function_code',
            'dataType',
            'offset',
            'storageType',
            'protocol',
            'dividend',
        )
        try:
            rows = self._fetch_rows(sql, (deviceName,))
            return [dict(zip(keys, row)) for row in rows]
        except Exception:
            print(traceback.format_exc())
            return False

    def get_mqtt_point(self, deviceCode):
        """Return the MQTT-related point settings of ``deviceCode``.

        Returns a list with one dict per row of ``data_point_tbl``, or
        ``False`` when the query fails.
        """
        sql = (
            "SELECT device_name,serial_number,storage_type,mqtt_code,"
            "low_limit,up_limit FROM data_point_tbl where device_code = %s"
        )
        keys = (
            'deviceName',
            'serialNumber',
            'storageType',
            'mqttCode',
            'lowLimit',
            'upLimit',
        )
        try:
            rows = self._fetch_rows(sql, (deviceCode,))
            return [dict(zip(keys, row)) for row in rows]
        except Exception as e:
            print(e)
            return False

    def get_mqtt_devices(self):
        """Return the distinct non-NULL device codes, or False on failure."""
        sql = "SELECT DISTINCT device_code FROM data_point_tbl"
        try:
            return self._first_column(self._fetch_rows(sql))
        except Exception:
            print(traceback.format_exc())
            return False

    def get_newest_data(self, tableName):
        """Return the row of ``tableName`` with the highest ``id``.

        The row is returned as a dict (column name -> value).  When the table
        is empty the empty result of ``fetchall()`` is returned; ``None`` is
        returned when the query fails.
        """
        sql = "SELECT * FROM %s ORDER BY id DESC LIMIT 1;" % (tableName)
        try:
            with self.conn.cursor(cursor=pymysql.cursors.DictCursor) as cursor:
                cursor.execute(sql)
                rows = cursor.fetchall()
        except Exception as e:
            print(e)
            return None
        return rows[0] if rows else rows

    def get_rainfall_data(self):
        """Return column ``c462`` of one ``table_qxy`` row dated today.

        Selects at most one row whose ``times`` is at or after local
        midnight and returns its non-NULL ``c462`` value in a list (empty
        when there is none), or ``False`` when the query fails.
        """
        sql = "SELECT c462, times FROM table_qxy WHERE times>=%s limit 1"
        today_start = time.strftime("%Y-%m-%d 00:00:00", time.localtime())
        try:
            return self._first_column(self._fetch_rows(sql, (today_start,)))
        except Exception:
            print(traceback.format_exc())
            return False

    # ------------------------------------------------------------------
    # Updates and inserts
    # ------------------------------------------------------------------
    def set_send_status(self, tableName, id, value):
        """Set ``is_send`` to ``value`` on the row ``id`` of ``tableName``.

        Errors are printed, not raised.
        """
        sql = "UPDATE {} SET is_send = %s WHERE id = %s;".format(tableName)
        try:
            with self.conn.cursor() as cursor:
                cursor.execute(sql, (value, id))
                # Commit the change to the database.
                self.conn.commit()
        except Exception as e:
            print(e)

    def save_point_data(self, tableName, data):
        """Insert ``data`` into the ``data`` column of ``tableName``.

        On failure the error is printed and the transaction rolled back.
        """
        sql = "INSERT INTO {} (data)VALUES(%s);".format(tableName)
        try:
            # A dedicated cursor: this method used to depend on whatever
            # cursor another method happened to leave in self.cursor.
            with self.conn.cursor() as cursor:
                cursor.execute(sql, (data,))
            # Commit the change to the database.
            self.conn.commit()
        except Exception:
            print(traceback.format_exc())
            # Roll back if anything went wrong.
            self.conn.rollback()

    def insert_column_many(self, tableName, timeNow, list):
        """Insert one row holding the current value of every point.

        ``list`` is a sequence of dicts with ``'serialNumber'`` and
        ``'data'`` keys; each value goes into column ``c<serialNumber>`` and
        ``timeNow`` into ``times``.  An empty sequence is a no-op.  Database
        errors are printed and rolled back; any other failure triggers a
        reconnect attempt.
        """
        points = list
        if not points:
            return
        try:
            columns = ["times"] + ["c" + str(p['serialNumber']) for p in points]
            values = [str(timeNow)] + [p['data'] for p in points]
            # Built with join so a single-point row also yields valid SQL.
            sql = "INSERT INTO {} ({}) VALUES ({})".format(
                tableName,
                ",".join(columns),
                ",".join(["%s"] * len(values)),
            )
            with self.conn.cursor() as cursor:
                try:
                    cursor.execute(sql, values)
                    # Commit the change to the database.
                    self.conn.commit()
                except Exception as e:
                    # Roll back if anything went wrong.
                    self.conn.rollback()
                    print(e)
        except Exception as e:
            self._reConn()
            print(e)

    def onLineState_insert(self, point, name, state):
        """Record a connection-state change in ``table_connection_status``.

        The row is stamped with the current local time.  Database errors are
        printed and rolled back; any other failure triggers a reconnect
        attempt.
        """
        sql = (
            "INSERT INTO table_connection_status(io_point, "
            "driver_name, data_time, state) "
            "VALUES (%s, %s, %s, %s)"
        )
        try:
            times = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(time.time())))
            with self.conn.cursor() as cursor:
                try:
                    cursor.execute(sql, (point, name, times, state))
                    # Commit the change to the database.
                    self.conn.commit()
                except Exception as e:
                    # Roll back if anything went wrong.
                    self.conn.rollback()
                    print(e)
        except Exception as e:
            self._reConn()
            print(e)

    # ------------------------------------------------------------------
    # Schema management
    # ------------------------------------------------------------------
    def create_point_table(self, list):
        """Create one ``table_<serialNumber>`` table per point if missing.

        ``list`` is a sequence of dicts with ``'serialNumber'`` and
        ``'storageType'`` keys; ``storageType`` is used verbatim as the SQL
        type of the ``data`` column.  Failures are printed per table.
        """
        with self.conn.cursor() as cursor:
            for point in list:
                tableName = "table_" + str(point['serialNumber'])
                dataType = point['storageType']
                sql = (
                    "CREATE TABLE IF NOT EXISTS %s (times datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,"
                    "data %s not null,INDEX (times)) "
                    "ENGINE=InnoDB DEFAULT CHARSET=utf8;" % (tableName, dataType)
                )
                try:
                    cursor.execute(sql)
                except Exception:
                    print(traceback.format_exc())

    def create_delete_stale_data_event(self, eventName, tableName, day):
        """Create a daily MySQL event purging rows older than ``day`` days.

        Database errors (for example when the event already exists)
        propagate to the caller, as before.
        """
        sql = (
            "create event %s on SCHEDULE every 1 day do delete from %s "
            "where times<(CURRENT_TIMESTAMP() + INTERVAL -%s DAY);"
            % (eventName, tableName, day)
        )
        # The context manager closes the cursor even when execute() raises.
        with self.conn.cursor() as cursor:
            cursor.execute(sql)

    def create_many_column_table_NOTNULL(self, tableName, list):
        """Ensure ``tableName`` has a NOT NULL ``c<serialNumber>`` column per point."""
        self._add_columns(tableName, list, " NOT NULL")

    def create_many_column_table(self, tableName, list):
        """Ensure ``tableName`` has a nullable ``c<serialNumber>`` column per point."""
        self._add_columns(tableName, list, " ")
-
-
class RedisCtr:
    """Thin wrapper around a redis client used to cache point values."""

    def __init__(self,
                 _host='',
                 _port=6379,
                 _db='',
                 _decode_responses=True):
        """Store the connection settings and create the client."""
        self.host = _host
        self.port = _port
        self.db = _db
        self.decode_responses = _decode_responses
        self.r = None
        self._conn()

    def _conn(self):
        """Create the ``redis.StrictRedis`` client from the stored settings."""
        self.r = redis.StrictRedis(host=self.host, port=self.port, db=self.db,
                                   decode_responses=self.decode_responses)

    def redisSave(self, dataList):
        """Store every point value under the key ``c<serialNumber>``.

        ``dataList`` is an iterable of dicts with ``'serialNumber'`` and
        ``'data'`` keys.  All writes are queued on one transactional pipeline
        and sent together.

        Returns True on success; on failure the caught exception object is
        returned (not raised).
        """
        try:
            pipe = self.r.pipeline(transaction=True)
            for item in dataList:
                pipe.set("c" + str(item['serialNumber']), item['data'])
            pipe.execute()
        except Exception as e:
            return e
        return True

    def connectStatusRedisSave(self, key, data):
        """Store ``data`` under ``key``.

        Returns True on success; on failure the caught exception object is
        returned (not raised).
        """
        try:
            pipe = self.r.pipeline(transaction=True)
            pipe.set(key, data)
            pipe.execute()
        except Exception as e:
            return e
        return True

    def readLastStatus(self, key):
        """Return the value stored under ``key`` converted to ``int``.

        Returns None (after printing the error) when the read fails or the
        stored value cannot be converted, e.g. because the key is missing.
        """
        try:
            pipe = self.r.pipeline(transaction=True)
            pipe.get(key)
            result = pipe.execute()
            return int(result[0])
        except Exception as e:
            print(e)
            return None
def redisSave(dataList, conn):
    """Store every point value under the key ``c<serialNumber>`` via ``conn``.

    ``dataList`` is an iterable of dicts with ``'serialNumber'`` and
    ``'data'`` keys; ``conn`` is a redis client (anything offering
    ``pipeline(transaction=True)``).  All writes are queued on one
    transactional pipeline and sent together.

    Returns True on success; on failure the exception is printed and the
    exception object is returned (not raised).
    """
    try:
        pipe = conn.pipeline(transaction=True)
        for item in dataList:
            pipe.set("c" + str(item['serialNumber']), item['data'])
        pipe.execute()
    except Exception as e:
        print(e)
        return e
    return True
|