dataBase_api.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. import pymysql
  2. import traceback
  3. import time
  4. import redis
  5. import LogOut
  6. #import mysql.connector
  7. #import MySQLdb
  8. class Mysql:
  9. _logger = LogOut.Log('mysql')
  10. def __init__ (self,
  11. host = '',
  12. user = '',
  13. passwd = '',
  14. db = '',
  15. port = 3306,
  16. charset= 'utf8'
  17. ):
  18. self.host = host
  19. self.user = user
  20. self.passwd = passwd
  21. self.db = db
  22. self.port = port
  23. self.charset= charset
  24. self.conn = None
  25. self._conn()
  26. def _conn (self):
  27. try:
  28. self.conn = pymysql.connect(self.host, self.user, self.passwd, self.db, self.port,autocommit = True)
  29. '''
  30. self.conn = mysql.connector.connect(
  31. host=self.host,
  32. user=self.user,
  33. passwd=self.passwd,
  34. database=self.db
  35. )
  36. '''
  37. '''
  38. self.conn= MySQLdb.connect(
  39. host='127.0.0.1',
  40. port = 3306,
  41. user='root',
  42. passwd='',
  43. db ='centralized_control_db',
  44. )
  45. '''
  46. return True
  47. except :
  48. self._logger.error('in line 51:mysql connect is unsuccessful!')
  49. return False
  50. def _reConn (self,num = 28800,stime = 3): #重试连接总次数为1天,这里根据实际情况自己设置,如果服务器宕机1天都没发现就......
  51. _number = 0
  52. _status = True
  53. while _status and _number <= num:
  54. try:
  55. self.conn.ping() #cping 校验连接是否异常
  56. _status = False
  57. except:
  58. self._logger.error('in line 62:mysql connect is unsuccessful!')
  59. if self._conn()==True: #重新连接,成功退出
  60. _status = False
  61. break
  62. _number +=1
  63. time.sleep(stime) #连接不成功,休眠3秒钟,继续循环,知道成功或重试次数结束
  64. def get_data_point(self,deviceName):
  65. list = []
  66. dict = {}
  67. sql = "SELECT serial_number,io_point_name,address,read_function_code,write_function_code,data_type,offset,storage_type,protocol,dividend analytical_way \
  68. FROM data_point_tbl where device_name = '%s'"%(deviceName)
  69. try:
  70. self._reConn()
  71. self.cursor = self.conn.cursor()
  72. self.cursor.execute(sql)
  73. results = self.cursor.fetchall()
  74. for row in results:
  75. dict = {}
  76. dict['serialNumber'] = row[0]
  77. dict['ioPointName'] = row[1]
  78. dict['address'] = row[2]
  79. dict['read_function_code'] = row[3]
  80. dict['write_function_code'] = row[4]
  81. dict['dataType'] = row[5]
  82. dict['offset'] = row[6]
  83. dict['storageType'] = row[7]
  84. dict['protocol'] = row[8]
  85. dict['dividend'] = row[9]
  86. list.append(dict)
  87. self.cursor.close()
  88. return list
  89. except:
  90. print(traceback.format_exc())
  91. return False
  92. def get_mqtt_point(self,deviceCode):
  93. list = []
  94. dict = {}
  95. sql = "SELECT device_name,serial_number,storage_type,mqtt_code,low_limit,up_limit FROM data_point_tbl where device_code = '%s'"%(deviceCode)
  96. try:
  97. self._reConn()
  98. self.cursor = self.conn.cursor()
  99. self.cursor.execute(sql)
  100. results = self.cursor.fetchall()
  101. for row in results:
  102. dict = {}
  103. dict['deviceName'] = row[0]
  104. dict['serialNumber'] = row[1]
  105. dict['storageType'] = row[2]
  106. dict['mqttCode'] = row[3]
  107. dict['lowLimit'] = row[4]
  108. dict['upLimit'] = row[5]
  109. list.append(dict)
  110. self.cursor.close()
  111. return list
  112. except Exception as e:
  113. print(e)
  114. return False
  115. def get_mqtt_devices(self):
  116. list = []
  117. sql = "SELECT DISTINCT device_code FROM data_point_tbl"
  118. try:
  119. self._reConn()
  120. self.cursor = self.conn.cursor()
  121. self.cursor.execute(sql)
  122. results = self.cursor.fetchall()
  123. for row in results:
  124. if row[0] != None:
  125. list.append(row[0])
  126. self.cursor.close()
  127. return list
  128. except:
  129. print(traceback.format_exc())
  130. return False
  131. def set_send_status(self,tableName,id,value):
  132. sql = "UPDATE %s SET is_send = %s WHERE id = %s;"%(tableName,value,id)
  133. try:
  134. self.cursor = self.conn.cursor()
  135. self.cursor.execute(sql)
  136. # 提交到数据库执行
  137. self.conn.commit()
  138. self.cursor.close()
  139. except Exception as e:
  140. print(e)
  141. def get_newest_data(self,tableName):
  142. sql = "SELECT * FROM %s ORDER BY id DESC LIMIT 1;"%(tableName)
  143. results = []
  144. try:
  145. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  146. self.cursor.execute(sql)
  147. results = self.cursor.fetchall()
  148. self.cursor.close()
  149. if len(results) > 0:
  150. return results[0]
  151. else:
  152. return results
  153. except Exception as e:
  154. print(e)
  155. return None
  156. def get_rainfall_data(self):
  157. list = []
  158. sql = "SELECT c462, times FROM table_qxy WHERE times>='%s' limit 1"%(time.strftime("%Y-%m-%d 00:00:00", time.localtime()))
  159. try:
  160. self._reConn()
  161. self.cursor = self.conn.cursor()
  162. self.cursor.execute(sql)
  163. results = self.cursor.fetchall()
  164. for row in results:
  165. if row[0] != None:
  166. list.append(row[0])
  167. self.cursor.close()
  168. return list
  169. except:
  170. print(traceback.format_exc())
  171. return False
  172. def create_point_table(self,list):
  173. self.cursor = self.conn.cursor()
  174. for index in range(len(list)):
  175. tableName = "table_"+str(list[index]['serialNumber'])
  176. dataType = list[index]['storageType']
  177. sql = "CREATE TABLE IF NOT EXISTS %s (times datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,data %s not null,INDEX (times)) \
  178. ENGINE=InnoDB DEFAULT CHARSET=utf8;"%(tableName,dataType)
  179. try:
  180. self.cursor.execute(sql)
  181. except:
  182. print(traceback.format_exc())
  183. self.cursor.close()
  184. def create_delete_stale_data_event(self,eventName,tableName,day):
  185. self.cursor = self.conn.cursor()
  186. sql = "create event %s on SCHEDULE every 1 day do delete from %s where times<(CURRENT_TIMESTAMP() + INTERVAL -%s DAY);"%(eventName,tableName,day)
  187. self.cursor.execute(sql)
  188. self.cursor.close()
  189. def create_many_column_table_NOTNULL(self,tableName,list):
  190. self.cursor = self.conn.cursor()
  191. for index in range(len(list)):
  192. dataType = list[index]['storageType']
  193. columnName = "c"+str(list[index]['serialNumber'])
  194. sql_c = "CREATE TABLE IF NOT EXISTS %s (times datetime NOT NULL,INDEX (times)) \
  195. ENGINE=InnoDB DEFAULT CHARSET=utf8;"%(tableName)
  196. sql_add = "ALTER TABLE %s ADD %s %s NOT NULL"%(tableName,columnName,dataType)
  197. try:
  198. self.cursor.execute(sql_c)
  199. self.cursor.execute(sql_add)
  200. except:
  201. print(traceback.format_exc())
  202. self.cursor.close()
  203. def create_many_column_table(self,tableName,list):
  204. self.cursor = self.conn.cursor()
  205. for index in range(len(list)):
  206. dataType = list[index]['storageType']
  207. columnName = "c"+str(list[index]['serialNumber'])
  208. sql_c = "CREATE TABLE IF NOT EXISTS %s (times datetime NOT NULL,INDEX (times)) \
  209. ENGINE=InnoDB DEFAULT CHARSET=utf8;"%(tableName)
  210. sql_add = "ALTER TABLE %s ADD %s %s "%(tableName,columnName,dataType)
  211. try:
  212. self.cursor.execute(sql_c)
  213. self.cursor.execute(sql_add)
  214. except:
  215. print(traceback.format_exc())
  216. self.cursor.close()
  217. def save_point_data(self,tableName,data):
  218. sql = "INSERT INTO %s (data)VALUES(%s);"%(tableName,data)
  219. try:
  220. self.cursor.execute(sql)
  221. # 提交到数据库执行
  222. self.conn.commit()
  223. except:
  224. # 如果发生错误则回滚
  225. self.conn.rollback()
  226. def insert_column_many(self,tableName,timeNow,list):
  227. try:
  228. self.cursor = self.conn.cursor()
  229. sql = "INSERT INTO %s"%(tableName)
  230. for index in range(len(list)):
  231. if index == 0:
  232. columnName = "c"+str(list[index]['serialNumber'])
  233. sql = sql + "(times,"+columnName+","
  234. elif index == len(list)-1:
  235. columnName = "c"+str(list[index]['serialNumber'])
  236. sql = sql + columnName+")"
  237. else:
  238. columnName = "c"+str(list[index]['serialNumber'])
  239. sql = sql + columnName + ","
  240. for index in range(len(list)):
  241. if index == 0:
  242. data = "VALUE ('"+str(timeNow)+"'," + str(list[index]['data']) + ","
  243. sql = sql + data
  244. elif index == len(list)-1:
  245. data = str(list[index]['data']) + ")"
  246. sql = sql + data
  247. else:
  248. data = str(list[index]['data']) + ","
  249. sql = sql + data
  250. #print(sql)
  251. try:
  252. self.cursor.execute(sql)
  253. # 提交到数据库执行
  254. self.conn.commit()
  255. except Exception as e:
  256. # 如果发生错误则回滚
  257. self.conn.rollback()
  258. print(e)
  259. except Exception as e:
  260. self._reConn()
  261. print(e)
  262. else:
  263. self.cursor.close()
  264. def onLineState_insert(self, point, name, state):
  265. try:
  266. self.cursor = self.conn.cursor()
  267. times = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(time.time())))
  268. sql = "INSERT INTO table_connection_status(io_point, \
  269. driver_name, data_time, state) \
  270. VALUES ('%s', '%s', '%s', %s)" % \
  271. (point, name, times, state)
  272. try:
  273. self.cursor.execute(sql)
  274. # 提交到数据库执行
  275. self.conn.commit()
  276. except Exception as e:
  277. # 如果发生错误则回滚
  278. self.conn.rollback()
  279. print(e)
  280. except Exception as e:
  281. self._reConn()
  282. print(e)
  283. else:
  284. self.cursor.close()
  285. def close (self):
  286. self.conn.close()
  287. class RedisCtr:
  288. def __init__ (self,
  289. _host = '',
  290. _port = 6379,
  291. _db = '',
  292. _decode_responses=True):
  293. self.host = _host
  294. self.port = _port
  295. self.db = _db
  296. self.decode_responses = _decode_responses
  297. self.r = None
  298. self._conn()
  299. def _conn(self):
  300. self.r = redis.StrictRedis(host=self.host, port=self.port, db=self.db, decode_responses=self.decode_responses)
  301. def redisSave(self, dataList):
  302. try:
  303. pipe = self.r.pipeline(transaction=True)
  304. for index in range(len(dataList)):
  305. key = "c"+str(dataList[index]['serialNumber'])
  306. data = dataList[index]['data']
  307. pipe.set(key,data)
  308. pipe.execute()
  309. except Exception as e:
  310. return e
  311. else:
  312. return True
  313. def connectStatusRedisSave(self, key, data):
  314. try:
  315. pipe = self.r.pipeline(transaction=True)
  316. pipe.set(key,data)
  317. pipe.execute()
  318. except Exception as e:
  319. return e
  320. else:
  321. return True
  322. def readLastStatus(self, key):
  323. try:
  324. pipe = self.r.pipeline(transaction=True)
  325. pipe.get(key)
  326. result = pipe.execute()
  327. return int(result[0])
  328. except Exception as e:
  329. print(e)
  330. else:
  331. print('e')
  332. def redisSave(dataList,conn):
  333. try:
  334. pipe = conn.pipeline(transaction=True)
  335. for index in range(len(dataList)):
  336. key = "c"+str(dataList[index]['serialNumber'])
  337. data = dataList[index]['data']
  338. pipe.set(key,data)
  339. pipe.execute()
  340. except Exception as e:
  341. print(e)
  342. return e
  343. else:
  344. return True