hard_disk_storage.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. import datetime
  2. import logging
  3. import openpyxl
  4. import pymysql
  5. import traceback
  6. import time
  7. from AES_crypt import decrypt
  8. class HardDiskStorage:
  9. def __init__(self, config, port=3306, charset='utf8'):
  10. self.host = config['ip']
  11. self.user = config['username']
  12. self.passwd = decrypt(config['password'])
  13. self.db = config['dataBaseName']
  14. self.port = port
  15. self.charset = charset
  16. self.conn = None
  17. self.logger = logging.getLogger()
  18. if not self._conn():
  19. self._reConn()
  20. def _conn(self):
  21. try:
  22. self.conn = pymysql.connect(host=self.host, user=self.user, password=self.passwd, database=self.db, port=self.port, autocommit=True)
  23. return True
  24. except Exception as e:
  25. self.logger.error(f'failed to connect to {self.host}:{self.port}:{self.db} by [{self.user}:{self.passwd}]:{e}')
  26. return False
  27. def _reConn(self, num=28800, stime=3): # 重试连接总次数为1天,这里根据实际情况自己设置,如果服务器宕机1天都没发现就......
  28. _number = 0
  29. _status = True
  30. while _status and _number <= num:
  31. try:
  32. self.conn.ping() # cping 校验连接是否异常
  33. _status = False
  34. except Exception as e:
  35. if self._conn(): # 重新连接,成功退出
  36. _status = False
  37. break
  38. _number += 1
  39. time.sleep(stime) # 连接不成功,休眠3秒钟,继续循环,知道成功或重试次数结束
  40. def get_station_info(self, station_name):
  41. sql = "SELECT * FROM data_point_tbl where station_name = '%s'" % station_name
  42. try:
  43. self._reConn()
  44. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  45. self.cursor.execute(sql)
  46. results = self.cursor.fetchall()
  47. self.cursor.close()
  48. return results
  49. except:
  50. print(traceback.format_exc())
  51. return False
  52. def get_point_info(self, point_tuple):
  53. if point_tuple:
  54. sql = "SELECT * FROM data_point_tbl where serial_number in %s" % (str(point_tuple))
  55. else:
  56. sql = "SELECT * FROM data_point_tbl"
  57. try:
  58. self._reConn()
  59. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  60. self.cursor.execute(sql)
  61. results = self.cursor.fetchall()
  62. self.cursor.close()
  63. return results
  64. except:
  65. print(traceback.format_exc())
  66. return False
  67. def get_table_data(self, senect_info):
  68. table_name = "table_" + senect_info['deviceName']
  69. time_begin = senect_info['timeBegin']
  70. time_end = senect_info['timeEnd']
  71. column = senect_info['pointList']
  72. if len(column) > 0:
  73. sql = "SELECT times"
  74. for column_name in column:
  75. sql = sql + "," + column_name
  76. sql = sql + " FROM %s WHERE times > '%s' AND times < '%s'" % (table_name, time_begin, time_end)
  77. else:
  78. sql = "SELECT * FROM %s WHERE times > '%s' AND times < '%s';" % (table_name, time_begin, time_end)
  79. try:
  80. self._reConn()
  81. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  82. self.cursor.execute(sql)
  83. results = self.cursor.fetchall()
  84. self.cursor.close()
  85. return results
  86. except:
  87. print(traceback.format_exc())
  88. return None
  89. # 历史查询接口(new)--------------------------------------------\\
  90. def get_total_count_and_first_id(self, search_info):
  91. table_name = "table_" + search_info['deviceName']
  92. time_begin = search_info['timeBegin']
  93. time_end = search_info['timeEnd']
  94. sql = "select count(*) from %s where times >= '%s' and times <= '%s';" % (table_name, time_begin, time_end)
  95. sql_1 = "select id from %s where times >= '%s' limit 1;" % (table_name, time_begin)
  96. try:
  97. self._reConn()
  98. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  99. self.cursor.execute(sql)
  100. count = self.cursor.fetchall()
  101. self.cursor.execute(sql_1)
  102. first_id = self.cursor.fetchall()
  103. if isinstance(first_id, tuple):
  104. first_id = list(first_id)
  105. result = count + first_id
  106. return result
  107. except:
  108. print(traceback.format_exc())
  109. return None
  110. def get_item_by_id_offset(self, search_info):
  111. table_name = "table_" + search_info['deviceName']
  112. point_list = search_info['pointList']
  113. id_offset = search_info['idOffset']
  114. quantity = search_info['quantity']
  115. sql = "select times, %s from %s where id >= %s limit %s" % (','.join(point_list), table_name, id_offset, quantity)
  116. try:
  117. self._reConn()
  118. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  119. self.cursor.execute(sql)
  120. results = self.cursor.fetchall()
  121. self.cursor.close()
  122. return results
  123. except:
  124. print(traceback.format_exc())
  125. return None
  126. # --------------------------------------------//
  127. # 数据导出接口------------------------------------------------\\
  128. def quary_table_data(self, search_info):
  129. table_name = "table_" + search_info['deviceName']
  130. time_begin = search_info['timeBegin']
  131. time_end = search_info['timeEnd']
  132. point_list = search_info['pointList']
  133. point_list_1 = str([i[1:] for i in point_list])[1:-1]
  134. sql = "select times, %s from %s where times >= '%s' and times <= '%s';" % (','.join(point_list), table_name, time_begin, time_end)
  135. sql1 = "select io_point_name from data_point_tbl where serial_number in ( %s );" % point_list_1
  136. try:
  137. self._reConn()
  138. self.cursor = self.conn.cursor()
  139. self.cursor.execute(sql)
  140. res = self.cursor.fetchall()
  141. self.cursor.execute(sql1)
  142. res1 = self.cursor.fetchall()
  143. title = [item[0] for item in res1]
  144. title.insert(0, '日期')
  145. self.cursor.close()
  146. except:
  147. print(traceback.format_exc())
  148. return None
  149. book = openpyxl.Workbook()
  150. sheet = book.create_sheet(index=0)
  151. # 循环将表头写入到sheet页
  152. for i in range(len(title)):
  153. sheet.cell(1, i + 1).value = title[i]
  154. # 写数据
  155. for row in range(0, len(res)):
  156. for col in range(0, len(res[row])):
  157. cell_val = res[row][col]
  158. if isinstance(cell_val, datetime.datetime):
  159. times = cell_val.strftime("%Y-%m-%d %H:%M:%S")
  160. sheet.cell(row + 2, col + 1).value = times
  161. else:
  162. sheet.cell(row + 2, col + 1).value = cell_val
  163. file_path = (table_name + '.xlsx')
  164. savepath = file_path
  165. book.save(savepath)
  166. return savepath
  167. # ------------------------------------------------//
  168. # 获取insitu指令接口
  169. def get_in_situ_command(self):
  170. sql = "select * from shuizhi_insitu_instruct;"
  171. try:
  172. self._reConn()
  173. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  174. self.cursor.execute(sql)
  175. results = self.cursor.fetchall()
  176. self.cursor.close()
  177. return results
  178. except:
  179. print(traceback.format_exc())
  180. return None
  181. def get_connectors(self):
  182. sql = "SELECT * FROM station_info_tbl WHERE status = 1"
  183. try:
  184. self._reConn()
  185. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  186. self.cursor.execute(sql)
  187. results = self.cursor.fetchall()
  188. self.cursor.close()
  189. return results
  190. except:
  191. print(traceback.format_exc())
  192. return None
  193. def create_delete_stale_data_event(self, eventName, table_name, day):
  194. self.cursor = self.conn.cursor()
  195. sql = "create event %s on SCHEDULE every 1 day do delete from %s where times<(CURRENT_TIMESTAMP() + INTERVAL -%s DAY);" % (
  196. eventName, table_name, day)
  197. self.cursor.execute(sql)
  198. self.cursor.close()
  199. def create_many_column_table(self, table_name, list):
  200. self.cursor = self.conn.cursor()
  201. for index in range(len(list)):
  202. dataType = list[index]['storageType']
  203. columnName = "c" + str(list[index]['serialNumber'])
  204. sql_c = "CREATE TABLE IF NOT EXISTS %s (times datetime NOT NULL,INDEX (times)) \
  205. ENGINE=InnoDB DEFAULT CHARSET=utf8;" % (table_name)
  206. sql_add = "ALTER TABLE %s ADD %s %s " % (table_name, columnName, dataType)
  207. try:
  208. self.cursor.execute(sql_c)
  209. self.cursor.execute(sql_add)
  210. except:
  211. print(traceback.format_exc())
  212. sql_send = "ALTER TABLE %s ADD 'is_send'tinyint NOT NULL DEFAULT '0'" % (table_name)
  213. self.cursor.execute(sql_send)
  214. self.cursor.close()
  215. def insert_column_many(self, table_name, timeNow, dict):
  216. try:
  217. self.cursor = self.conn.cursor()
  218. sql = "INSERT INTO %s (times" % (table_name)
  219. for key_name in dict.keys():
  220. sql = sql + "," + key_name
  221. sql = sql + ") VALUE ('" + str(timeNow) + "'"
  222. for key_name in dict.keys():
  223. data = "," + str(dict[key_name])
  224. sql = sql + data
  225. sql = sql + ")"
  226. try:
  227. self.cursor.execute(sql)
  228. # 提交到数据库执行
  229. self.conn.commit()
  230. except Exception as e:
  231. # 如果发生错误则回滚
  232. self.conn.rollback()
  233. print(e)
  234. except Exception as e:
  235. self._reConn()
  236. print(e)
  237. else:
  238. self.cursor.close()
  239. def close(self):
  240. self.conn.close()
  241. def get_command_info(self, station_name):
  242. sql = "SELECT command FROM command where station_name = '%s' " % station_name
  243. try:
  244. self._reConn()
  245. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  246. self.cursor.execute(sql)
  247. results = self.cursor.fetchall()
  248. self.cursor.close()
  249. return results
  250. except:
  251. print(traceback.format_exc())
  252. return None
  253. # lee
  254. def get_device_name_by_station_name(self, station_name):
  255. sql = "SELECT DISTINCT device_name FROM data_point_tbl WHERE station_name = '%s' " % station_name
  256. try:
  257. self._reConn()
  258. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  259. self.cursor.execute(sql)
  260. results = self.cursor.fetchall()
  261. self.cursor.close()
  262. return results
  263. except:
  264. print(traceback.format_exc())
  265. return None
  266. def get_data_point_by_device_name(self, device_name):
  267. sql = "SELECT * FROM data_point_tbl WHERE device_name = '%s'" % device_name
  268. try:
  269. self._reConn()
  270. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  271. self.cursor.execute(sql)
  272. results = self.cursor.fetchall()
  273. self.cursor.close()
  274. return results
  275. except:
  276. print(traceback.format_exc())
  277. return None