hard_disk_storage.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. import pymysql
  2. import traceback
  3. import time
  4. import datetime
  5. from log import OutPutLog
  6. class HardDiskStorage():
  7. def __init__(self, config, port=3306, charset='utf8'):
  8. self._log = OutPutLog()
  9. self.host = config['ip']
  10. self.user = config['username']
  11. self.passwd = config['password']
  12. self.db = config['dataBaseName']
  13. self.port = port
  14. self.charset = charset
  15. self.conn = None
  16. self._conn()
  17. def _conn(self):
  18. try:
  19. self.conn = pymysql.connect(host=self.host, user=self.user, password=self.passwd, database=self.db, port=self.port, autocommit=True)
  20. return True
  21. except:
  22. self._log.error(traceback.format_exc())
  23. return False
  24. def _reConn(self, num=28800, stime=3): # 重试连接总次数为1天,这里根据实际情况自己设置,如果服务器宕机1天都没发现就......
  25. _number = 0
  26. _status = True
  27. while _status and _number <= num:
  28. try:
  29. self.conn.ping() # cping 校验连接是否异常
  30. _status = False
  31. except Exception as e:
  32. self._log.error(e)
  33. if self._conn() == True: # 重新连接,成功退出
  34. print('haha')
  35. _status = False
  36. break
  37. _number += 1
  38. time.sleep(stime) # 连接不成功,休眠3秒钟,继续循环,知道成功或重试次数结束
  39. def get_station_info(self, station_name):
  40. sql = "SELECT * FROM data_point_tbl where station_name = '%s'" % (station_name)
  41. try:
  42. self._reConn()
  43. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  44. self.cursor.execute(sql)
  45. results = self.cursor.fetchall()
  46. self.cursor.close()
  47. return results
  48. except:
  49. self._log.error(traceback.format_exc())
  50. return False
  51. def get_point_info(self, point_tuple):
  52. if point_tuple:
  53. sql = "SELECT * FROM data_point_tbl where serial_number in %s" % (str(point_tuple))
  54. else:
  55. sql = "SELECT * FROM data_point_tbl"
  56. try:
  57. self._reConn()
  58. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  59. self.cursor.execute(sql)
  60. results = self.cursor.fetchall()
  61. self.cursor.close()
  62. return results
  63. except:
  64. self._log.error(traceback.format_exc())
  65. return False
  66. def get_table_data(self, select_info):
  67. table_name = "table_" + select_info['deviceName']
  68. time_begin = select_info['timeBegin']
  69. time_end = select_info['timeEnd']
  70. column = select_info['pointList']
  71. if len(column) > 0:
  72. sql = "SELECT times"
  73. for column_name in column:
  74. sql = sql + "," + column_name
  75. sql = sql + " FROM %s WHERE times > '%s' AND times < '%s'" % (table_name, time_begin, time_end)
  76. else:
  77. sql = "SELECT * FROM %s WHERE times > '%s' AND times < '%s';" % (table_name, time_begin, time_end)
  78. try:
  79. self._reConn()
  80. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  81. self.cursor.execute(sql)
  82. results = self.cursor.fetchall()
  83. self.cursor.close()
  84. return results
  85. except:
  86. self._log.error(traceback.format_exc())
  87. return None
  88. def get_connectors(self, read_write):
  89. sql = f"SELECT * FROM station_info_tbl WHERE status = 1 AND `read_write`={read_write};"
  90. try:
  91. self._reConn()
  92. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  93. self.cursor.execute(sql)
  94. results = self.cursor.fetchall()
  95. self.cursor.close()
  96. return results
  97. except:
  98. self._log.error(traceback.format_exc())
  99. return None
  100. # def create_delete_stale_data_event(self, eventName, table_name, day):
  101. # self.cursor = self.conn.cursor()
  102. # sql = "create event %s on SCHEDULE every 1 day do delete from %s where times<(CURRENT_TIMESTAMP() + INTERVAL -%s DAY);" % (
  103. # eventName, table_name, day)
  104. # self.cursor.execute(sql)
  105. # self.cursor.close()
  106. #
  107. # def create_many_column_table(self, table_name, list):
  108. # self.cursor = self.conn.cursor()
  109. # for index in range(len(list)):
  110. # dataType = list[index]['storageType']
  111. # columnName = "c" + str(list[index]['serialNumber'])
  112. # sql_c = "CREATE TABLE IF NOT EXISTS %s (times datetime NOT NULL,INDEX (times)) \
  113. # ENGINE=InnoDB DEFAULT CHARSET=utf8;" % (table_name)
  114. # sql_add = "ALTER TABLE %s ADD %s %s " % (table_name, columnName, dataType)
  115. # try:
  116. # self.cursor.execute(sql_c)
  117. # self.cursor.execute(sql_add)
  118. # except:
  119. # self._log.error(traceback.format_exc())
  120. # sql_send = "ALTER TABLE %s ADD 'is_send'tinyint NOT NULL DEFAULT '0'" % (table_name)
  121. # self.cursor.execute(sql_send)
  122. # self.cursor.close()
  123. def insert_column_many(self, table_name, timeNow, dict):
  124. try:
  125. self.cursor = self.conn.cursor()
  126. sql = "INSERT INTO %s (times" % (table_name)
  127. for key_name in dict.keys():
  128. sql = sql + "," + key_name
  129. sql = sql + ") VALUE ('" + str(timeNow) + "'"
  130. for key_name in dict.keys():
  131. data = "," + str(dict[key_name])
  132. sql = sql + data
  133. sql = sql + ")"
  134. try:
  135. self.cursor.execute(sql)
  136. # 提交到数据库执行
  137. self.conn.commit()
  138. except Exception as e:
  139. # 如果发生错误则回滚
  140. self.conn.rollback()
  141. self._log.error(e)
  142. except Exception as e:
  143. self._reConn()
  144. self._log.error(e)
  145. else:
  146. self.cursor.close()
  147. def close(self):
  148. self.conn.close()
  149. def get_command_info(self, station_name):
  150. sql = "SELECT command FROM command where station_name = %s" % ("'" + station_name + "'")
  151. try:
  152. self._reConn()
  153. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  154. self.cursor.execute(sql)
  155. results = self.cursor.fetchall()
  156. self.cursor.close()
  157. return results
  158. except:
  159. self._log.error(traceback.format_exc())
  160. return None
  161. # lee
  162. def get_device_name_by_station_name(self, station_name):
  163. sql = "SELECT DISTINCT device_name FROM data_point_tbl WHERE station_name = '%s' " % station_name
  164. try:
  165. self._reConn()
  166. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  167. self.cursor.execute(sql)
  168. results = self.cursor.fetchall()
  169. self.cursor.close()
  170. return results
  171. except:
  172. print(traceback.format_exc())
  173. return None
  174. def get_data_point_by_device_name(self, device_name):
  175. sql = "SELECT * FROM data_point_tbl WHERE device_name = '%s' AND hard_storage=1;" % device_name
  176. try:
  177. self._reConn()
  178. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  179. self.cursor.execute(sql)
  180. results = self.cursor.fetchall()
  181. self.cursor.close()
  182. return results
  183. except:
  184. print(traceback.format_exc())
  185. return None
  186. # ----------------------------------------------------------------------------------------------
  187. def execute_sql(self, sql):
  188. try:
  189. self._reConn()
  190. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  191. self.cursor.execute(sql)
  192. results = self.cursor.fetchall()
  193. self.cursor.close()
  194. return results
  195. except:
  196. print("execute_sql"+str(traceback.format_exc()))
  197. self._log.error("execute_sql"+str(traceback.format_exc()))
  198. return None
  199. def get_table_name(self):
  200. '''[{'device_name': 'zfdj_electric'}, {'device_name': 'jbd1_electric'}, {'device_name': 'yysbdlfdx_electric'}, ...]'''
  201. sql = "SELECT DISTINCT device_name FROM data_point_tbl;"
  202. res = self.execute_sql(sql)
  203. return res
  204. # def table_xxx_insert_data(self, format_data_dict, tables_name, save_time):
  205. # try:
  206. # for table in tables_name:
  207. # table_name = "table_" + table['device_name'] # zfdj_electric
  208. # sql_columm = "SHOW FIELDS FROM %s WHERE Field like \'c%%\';" % (table_name)
  209. # columns = [] # ['c1', 'c2',...,'c10', 'times', 'is_send']
  210. # value = [] # [1.401298464324817e-45, 0.0,...,1.401298464324817e-45, 'NOW()', 0]
  211. # for each in self.execute_sql(sql_columm):
  212. # columns.append(each['Field'])
  213. # value.append(format_data_dict[each['Field']])
  214. # columns.append('times')
  215. # value.append(save_time)
  216. # columns.append('is_send')
  217. # value.append(0)
  218. # sql_insert = "INSERT INTO %s %s VALUES %s;" % (table_name, format(tuple(columns)).replace('\'', ''), format(tuple(value)))
  219. # print(sql_insert)
  220. # # self.execute_sql(sql_insert)
  221. # except:
  222. # self._log.error(traceback.format_exc())
  223. def get_device_info(self, device_name):
  224. sql = "SELECT * FROM data_point_tbl where device_name = '%s'" % (device_name)
  225. try:
  226. self._reConn()
  227. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  228. self.cursor.execute(sql)
  229. results = self.cursor.fetchall()
  230. self.cursor.close()
  231. return results
  232. except:
  233. self._log.error(traceback.format_exc())
  234. return False
  235. def execute_update_sql(self, sql):
  236. """执行UPDATE、INSERT、DELETE语句"""
  237. try:
  238. self._reConn()
  239. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  240. self.cursor.execute(sql)
  241. self.conn.commit()
  242. self.cursor.close()
  243. return True
  244. except pymysql.Error as mysql_error:
  245. self._log.error(f"MYSQL执行失败!{sql}")
  246. self._log.error(mysql_error)
  247. return False
  248. def gather_fish_light_info(self, location):
  249. """水下聚鱼灯远程控制功能:查找指定位置的聚鱼灯的相关控制信息"""
  250. sql = f"SELECT control_mode,status,definite_time,conn_status FROM `gather_fish_light_control` WHERE location=\'{location}\';"
  251. data = self.execute_sql(sql)
  252. return data
  253. def get_gather_fish_light_command(self):
  254. """水下聚鱼灯远程控制功能:查看控制聚鱼灯的指令"""
  255. sql = "SELECT control_mode,status,command FROM gather_fish_light_command;"
  256. data = self.execute_sql(sql)
  257. return data