hard_disk_storage.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. import datetime
  2. from logging_config import logger
  3. import openpyxl
  4. import pymysql
  5. import traceback
  6. import time
  7. from AES_crypt import decrypt
  8. class HardDiskStorage:
  9. def __init__(self, config, port=3306, charset='utf8'):
  10. self.host = config['ip']
  11. self.user = config['username']
  12. self.passwd = decrypt(config['password'])
  13. self.db = config['dataBaseName']
  14. self.port = port
  15. self.charset = charset
  16. self.conn = None
  17. if not self._conn():
  18. self._reConn()
  19. def _conn(self):
  20. try:
  21. self.conn = pymysql.connect(host=self.host, user=self.user, password=self.passwd, database=self.db, port=self.port, autocommit=True)
  22. return True
  23. except Exception as e:
  24. logger.error(f'failed to connect to {self.host}:{self.port}:{self.db} by [{self.user}:{self.passwd}]:{e}')
  25. return False
  26. def _reConn(self, num=28800, stime=3): # 重试连接总次数为1天,这里根据实际情况自己设置,如果服务器宕机1天都没发现就......
  27. _number = 0
  28. _status = True
  29. while _status and _number <= num:
  30. try:
  31. self.conn.ping() # cping 校验连接是否异常
  32. _status = False
  33. except Exception as e:
  34. if self._conn(): # 重新连接,成功退出
  35. _status = False
  36. break
  37. _number += 1
  38. time.sleep(stime) # 连接不成功,休眠3秒钟,继续循环,知道成功或重试次数结束
  39. def get_station_info(self, station_name):
  40. sql = "SELECT * FROM data_point_tbl where station_name = '%s'" % station_name
  41. try:
  42. self._reConn()
  43. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  44. self.cursor.execute(sql)
  45. results = self.cursor.fetchall()
  46. self.cursor.close()
  47. return results
  48. except:
  49. print(traceback.format_exc())
  50. return False
  51. def get_point_info(self, point_tuple):
  52. if point_tuple:
  53. sql = "SELECT * FROM data_point_tbl where serial_number in %s" % (str(point_tuple))
  54. else:
  55. sql = "SELECT * FROM data_point_tbl"
  56. try:
  57. self._reConn()
  58. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  59. self.cursor.execute(sql)
  60. results = self.cursor.fetchall()
  61. self.cursor.close()
  62. return results
  63. except:
  64. print(traceback.format_exc())
  65. return False
  66. def get_table_data(self, senect_info):
  67. table_name = "table_" + senect_info['deviceName']
  68. time_begin = senect_info['timeBegin']
  69. time_end = senect_info['timeEnd']
  70. column = senect_info['pointList']
  71. if len(column) > 0:
  72. sql = "SELECT times"
  73. for column_name in column:
  74. sql = sql + "," + column_name
  75. sql = sql + " FROM %s WHERE times > '%s' AND times < '%s'" % (table_name, time_begin, time_end)
  76. else:
  77. sql = "SELECT * FROM %s WHERE times > '%s' AND times < '%s';" % (table_name, time_begin, time_end)
  78. try:
  79. self._reConn()
  80. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  81. self.cursor.execute(sql)
  82. results = self.cursor.fetchall()
  83. self.cursor.close()
  84. return results
  85. except:
  86. print(traceback.format_exc())
  87. return None
  88. # 历史查询接口(new)--------------------------------------------\\
  89. def get_total_count_and_first_id(self, search_info):
  90. table_name = "table_" + search_info['deviceName']
  91. time_begin = search_info['timeBegin']
  92. time_end = search_info['timeEnd']
  93. sql = "select count(*) from %s where times >= '%s' and times <= '%s';" % (table_name, time_begin, time_end)
  94. sql_1 = "select id from %s where times >= '%s' limit 1;" % (table_name, time_begin)
  95. try:
  96. self._reConn()
  97. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  98. self.cursor.execute(sql)
  99. count = self.cursor.fetchall()
  100. self.cursor.execute(sql_1)
  101. first_id = self.cursor.fetchall()
  102. if isinstance(first_id, tuple):
  103. first_id = list(first_id)
  104. result = count + first_id
  105. return result
  106. except:
  107. print(traceback.format_exc())
  108. return None
  109. def get_item_by_id_offset(self, search_info):
  110. table_name = "table_" + search_info['deviceName']
  111. point_list = search_info['pointList']
  112. id_offset = search_info['idOffset']
  113. quantity = search_info['quantity']
  114. sql = "select times, %s from %s where id >= %s limit %s" % (','.join(point_list), table_name, id_offset, quantity)
  115. try:
  116. self._reConn()
  117. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  118. self.cursor.execute(sql)
  119. results = self.cursor.fetchall()
  120. self.cursor.close()
  121. return results
  122. except:
  123. print(traceback.format_exc())
  124. return None
  125. # --------------------------------------------//
  126. # 数据导出接口------------------------------------------------\\
  127. def quary_table_data(self, search_info):
  128. table_name = "table_" + search_info['deviceName']
  129. time_begin = search_info['timeBegin']
  130. time_end = search_info['timeEnd']
  131. point_list = search_info['pointList']
  132. point_list_1 = str([i[1:] for i in point_list])[1:-1]
  133. sql = "select times, %s from %s where times >= '%s' and times <= '%s';" % (','.join(point_list), table_name, time_begin, time_end)
  134. sql1 = "select io_point_name from data_point_tbl where serial_number in ( %s );" % point_list_1
  135. try:
  136. self._reConn()
  137. self.cursor = self.conn.cursor()
  138. self.cursor.execute(sql)
  139. res = self.cursor.fetchall()
  140. self.cursor.execute(sql1)
  141. res1 = self.cursor.fetchall()
  142. title = [item[0] for item in res1]
  143. title.insert(0, '日期')
  144. self.cursor.close()
  145. except:
  146. print(traceback.format_exc())
  147. return None
  148. book = openpyxl.Workbook()
  149. sheet = book.create_sheet(index=0)
  150. # 循环将表头写入到sheet页
  151. for i in range(len(title)):
  152. sheet.cell(1, i + 1).value = title[i]
  153. # 写数据
  154. for row in range(0, len(res)):
  155. for col in range(0, len(res[row])):
  156. cell_val = res[row][col]
  157. if isinstance(cell_val, datetime.datetime):
  158. times = cell_val.strftime("%Y-%m-%d %H:%M:%S")
  159. sheet.cell(row + 2, col + 1).value = times
  160. else:
  161. sheet.cell(row + 2, col + 1).value = cell_val
  162. file_path = (table_name + '.xlsx')
  163. savepath = file_path
  164. book.save(savepath)
  165. return savepath
  166. # ------------------------------------------------//
  167. # 获取insitu指令接口
  168. def get_in_situ_command(self):
  169. sql = "select * from shuizhi_insitu_instruct;"
  170. try:
  171. self._reConn()
  172. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  173. self.cursor.execute(sql)
  174. results = self.cursor.fetchall()
  175. self.cursor.close()
  176. return results
  177. except:
  178. print(traceback.format_exc())
  179. return None
  180. def get_connectors(self):
  181. sql = "SELECT * FROM station_info_tbl WHERE status = 1"
  182. try:
  183. self._reConn()
  184. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  185. self.cursor.execute(sql)
  186. results = self.cursor.fetchall()
  187. self.cursor.close()
  188. return results
  189. except:
  190. print(traceback.format_exc())
  191. return None
  192. def create_delete_stale_data_event(self, eventName, table_name, day):
  193. self.cursor = self.conn.cursor()
  194. sql = "create event %s on SCHEDULE every 1 day do delete from %s where times<(CURRENT_TIMESTAMP() + INTERVAL -%s DAY);" % (
  195. eventName, table_name, day)
  196. self.cursor.execute(sql)
  197. self.cursor.close()
  198. def create_many_column_table(self, table_name, list):
  199. self.cursor = self.conn.cursor()
  200. for index in range(len(list)):
  201. dataType = list[index]['storageType']
  202. columnName = "c" + str(list[index]['serialNumber'])
  203. sql_c = "CREATE TABLE IF NOT EXISTS %s (times datetime NOT NULL,INDEX (times)) \
  204. ENGINE=InnoDB DEFAULT CHARSET=utf8;" % (table_name)
  205. sql_add = "ALTER TABLE %s ADD %s %s " % (table_name, columnName, dataType)
  206. try:
  207. self.cursor.execute(sql_c)
  208. self.cursor.execute(sql_add)
  209. except:
  210. print(traceback.format_exc())
  211. sql_send = "ALTER TABLE %s ADD 'is_send'tinyint NOT NULL DEFAULT '0'" % (table_name)
  212. self.cursor.execute(sql_send)
  213. self.cursor.close()
  214. def insert_column_many(self, table_name, timeNow, dict):
  215. try:
  216. self.cursor = self.conn.cursor()
  217. sql = "INSERT INTO %s (times" % (table_name)
  218. for key_name in dict.keys():
  219. sql = sql + "," + key_name
  220. sql = sql + ") VALUE ('" + str(timeNow) + "'"
  221. for key_name in dict.keys():
  222. data = "," + str(dict[key_name])
  223. sql = sql + data
  224. sql = sql + ")"
  225. try:
  226. self.cursor.execute(sql)
  227. # 提交到数据库执行
  228. self.conn.commit()
  229. except Exception as e:
  230. # 如果发生错误则回滚
  231. self.conn.rollback()
  232. print(e)
  233. except Exception as e:
  234. self._reConn()
  235. print(e)
  236. else:
  237. self.cursor.close()
  238. def close(self):
  239. self.conn.close()
  240. def get_command_info(self, station_name):
  241. sql = "SELECT command FROM command where station_name = '%s' " % station_name
  242. try:
  243. self._reConn()
  244. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  245. self.cursor.execute(sql)
  246. results = self.cursor.fetchall()
  247. self.cursor.close()
  248. return results
  249. except:
  250. print(traceback.format_exc())
  251. return None
  252. # lee
  253. def get_device_name_by_station_name(self, station_name):
  254. sql = "SELECT DISTINCT device_name FROM data_point_tbl WHERE station_name = '%s' " % station_name
  255. try:
  256. self._reConn()
  257. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  258. self.cursor.execute(sql)
  259. results = self.cursor.fetchall()
  260. self.cursor.close()
  261. return results
  262. except:
  263. print(traceback.format_exc())
  264. return None
  265. def get_data_point_by_device_name(self, device_name):
  266. sql = "SELECT * FROM data_point_tbl WHERE device_name = '%s'" % device_name
  267. try:
  268. self._reConn()
  269. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  270. self.cursor.execute(sql)
  271. results = self.cursor.fetchall()
  272. self.cursor.close()
  273. return results
  274. except:
  275. print(traceback.format_exc())
  276. return None