hard_disk_storage.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. import datetime
  2. import math
  3. import re
  4. import openpyxl
  5. import pymysql
  6. import traceback
  7. import time
  8. from AES_crypt import decrypt
  9. class HardDiskStorage:
  10. def __init__(self, config, port=3306, charset='utf8'):
  11. self.host = config['ip']
  12. self.user = config['username']
  13. self.passwd = decrypt(config['password'])
  14. self.db = config['dataBaseName']
  15. self.port = port
  16. self.charset = charset
  17. self.conn = None
  18. if not self._conn():
  19. self._reConn()
  20. def _conn(self):
  21. try:
  22. self.conn = pymysql.connect(host=self.host, user=self.user, password=self.passwd, database=self.db, port=self.port, autocommit=True)
  23. return True
  24. except Exception as e:
  25. print(f'failed to connect to {self.host}:{self.port}:{self.db} by [{self.user}:{self.passwd}]:{e}')
  26. return False
  27. def _reConn(self, num=28800, stime=3): # 重试连接总次数为1天,这里根据实际情况自己设置,如果服务器宕机1天都没发现就......
  28. _number = 0
  29. _status = True
  30. while _status and _number <= num:
  31. try:
  32. self.conn.ping() # cping 校验连接是否异常
  33. _status = False
  34. except Exception as e:
  35. if self._conn(): # 重新连接,成功退出
  36. _status = False
  37. break
  38. _number += 1
  39. time.sleep(stime) # 连接不成功,休眠3秒钟,继续循环,知道成功或重试次数结束
  40. def get_station_info(self, station_name):
  41. sql = "SELECT * FROM data_point_tbl where station_name = '%s'" % station_name
  42. try:
  43. self._reConn()
  44. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  45. self.cursor.execute(sql)
  46. results = self.cursor.fetchall()
  47. self.cursor.close()
  48. return results
  49. except:
  50. print(traceback.format_exc())
  51. return False
  52. def get_point_info(self, point_tuple):
  53. if point_tuple:
  54. sql = "SELECT * FROM data_point_tbl where serial_number in %s" % (str(point_tuple))
  55. else:
  56. sql = "SELECT * FROM data_point_tbl"
  57. try:
  58. self._reConn()
  59. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  60. self.cursor.execute(sql)
  61. results = self.cursor.fetchall()
  62. self.cursor.close()
  63. return results
  64. except:
  65. print(traceback.format_exc())
  66. return False
  67. def get_table_data(self, senect_info):
  68. table_name = "table_" + senect_info['deviceName']
  69. time_begin = senect_info['timeBegin']
  70. time_end = senect_info['timeEnd']
  71. column = senect_info['pointList']
  72. if len(column) > 0:
  73. sql = "SELECT times"
  74. for column_name in column:
  75. sql = sql + "," + column_name
  76. sql = sql + " FROM %s WHERE times > '%s' AND times < '%s'" % (table_name, time_begin, time_end)
  77. else:
  78. sql = "SELECT * FROM %s WHERE times > '%s' AND times < '%s';" % (table_name, time_begin, time_end)
  79. try:
  80. self._reConn()
  81. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  82. self.cursor.execute(sql)
  83. results = self.cursor.fetchall()
  84. self.cursor.close()
  85. return results
  86. except:
  87. print(traceback.format_exc())
  88. return None
  89. # 历史查询接口(new)--------------------------------------------\\
  90. def get_total_count_and_first_id(self, search_info):
  91. table_name = "table_" + search_info['deviceName']
  92. time_begin = search_info['timeBegin']
  93. time_end = search_info['timeEnd']
  94. sql = "select count(*) from %s where times >= '%s' and times <= '%s';" % (table_name, time_begin, time_end)
  95. sql_1 = "select id from %s where times >= '%s' limit 1;" % (table_name, time_begin)
  96. try:
  97. self._reConn()
  98. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  99. self.cursor.execute(sql)
  100. count = self.cursor.fetchall()
  101. self.cursor.execute(sql_1)
  102. first_id = self.cursor.fetchall()
  103. if isinstance(first_id, tuple):
  104. first_id = list(first_id)
  105. result = count + first_id
  106. return result
  107. except:
  108. print(traceback.format_exc())
  109. return None
  110. def get_item_by_id_offset(self, search_info):
  111. table_name = "table_" + search_info['deviceName']
  112. point_list = search_info['pointList']
  113. id_offset = search_info['idOffset']
  114. quantity = search_info['quantity']
  115. sql = "select times, %s from %s where id >= %s limit %s" % (','.join(point_list), table_name, id_offset, quantity)
  116. try:
  117. self._reConn()
  118. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  119. self.cursor.execute(sql)
  120. results = self.cursor.fetchall()
  121. self.cursor.close()
  122. return results
  123. except:
  124. print(traceback.format_exc())
  125. return None
  126. # --------------------------------------------//
  127. # 数据导出接口------------------------------------------------\\
  128. def quary_table_data(self, search_info):
  129. table_name = "table_" + search_info['deviceName']
  130. time_begin = search_info['timeBegin']
  131. time_end = search_info['timeEnd']
  132. point_list = search_info['pointList']
  133. point_list_1 = str([i[1:] for i in point_list])[1:-1]
  134. sql = "select times, %s from %s where times >= '%s' and times <= '%s';" % (','.join(point_list), table_name, time_begin, time_end)
  135. sql1 = "select io_point_name from data_point_tbl where serial_number in ( %s );" % point_list_1
  136. try:
  137. self._reConn()
  138. self.cursor = self.conn.cursor()
  139. self.cursor.execute(sql)
  140. res = self.cursor.fetchall()
  141. self.cursor.execute(sql1)
  142. res1 = self.cursor.fetchall()
  143. title = [item[0] for item in res1]
  144. title.insert(0, '日期')
  145. self.cursor.close()
  146. except:
  147. print(traceback.format_exc())
  148. return None
  149. book = openpyxl.Workbook()
  150. sheet = book.create_sheet(index=0)
  151. # 循环将表头写入到sheet页
  152. for i in range(len(title)):
  153. if search_info['deviceName'] == 'adcp': # adcp需要重新计算垂直方向表头
  154. name = re.findall(r"\d+\.?\d*", title[i]) # 提取数字字符
  155. if name:
  156. title[i] = re.sub(r"\d+\.?\d*", str(round((int(name[0]) * math.sin(14 * math.pi / 180)), 2)), title[i]) # 14为adcp安装倾斜角度,2保留小数的位数
  157. sheet.cell(1, i + 1).value = title[i]
  158. # 写数据
  159. for row in range(0, len(res)):
  160. for col in range(0, len(res[row])):
  161. cell_val = res[row][col]
  162. if isinstance(cell_val, datetime.datetime):
  163. times = cell_val.strftime("%Y-%m-%d %H:%M:%S")
  164. sheet.cell(row + 2, col + 1).value = times
  165. else:
  166. sheet.cell(row + 2, col + 1).value = cell_val
  167. file_path = (table_name + '.xlsx')
  168. savepath = file_path
  169. book.save(savepath)
  170. return savepath
  171. # ------------------------------------------------//
  172. # 获取insitu指令接口
  173. def get_in_situ_command(self):
  174. sql = "select * from shuizhi_insitu_instruct;"
  175. try:
  176. self._reConn()
  177. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  178. self.cursor.execute(sql)
  179. results = self.cursor.fetchall()
  180. self.cursor.close()
  181. return results
  182. except:
  183. print(traceback.format_exc())
  184. return None
  185. def get_connectors(self):
  186. sql = "SELECT * FROM station_info_tbl WHERE status = 1"
  187. try:
  188. self._reConn()
  189. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  190. self.cursor.execute(sql)
  191. results = self.cursor.fetchall()
  192. self.cursor.close()
  193. return results
  194. except:
  195. print(traceback.format_exc())
  196. return None
  197. def create_delete_stale_data_event(self, eventName, table_name, day):
  198. self.cursor = self.conn.cursor()
  199. sql = "create event %s on SCHEDULE every 1 day do delete from %s where times<(CURRENT_TIMESTAMP() + INTERVAL -%s DAY);" % (
  200. eventName, table_name, day)
  201. self.cursor.execute(sql)
  202. self.cursor.close()
  203. def create_many_column_table(self, table_name, list):
  204. self.cursor = self.conn.cursor()
  205. for index in range(len(list)):
  206. dataType = list[index]['storageType']
  207. columnName = "c" + str(list[index]['serialNumber'])
  208. sql_c = "CREATE TABLE IF NOT EXISTS %s (times datetime NOT NULL,INDEX (times)) \
  209. ENGINE=InnoDB DEFAULT CHARSET=utf8;" % (table_name)
  210. sql_add = "ALTER TABLE %s ADD %s %s " % (table_name, columnName, dataType)
  211. try:
  212. self.cursor.execute(sql_c)
  213. self.cursor.execute(sql_add)
  214. except:
  215. print(traceback.format_exc())
  216. sql_send = "ALTER TABLE %s ADD 'is_send'tinyint NOT NULL DEFAULT '0'" % (table_name)
  217. self.cursor.execute(sql_send)
  218. self.cursor.close()
  219. def insert_column_many(self, table_name, timeNow, dict):
  220. try:
  221. self.cursor = self.conn.cursor()
  222. sql = "INSERT INTO %s (times" % (table_name)
  223. for key_name in dict.keys():
  224. sql = sql + "," + key_name
  225. sql = sql + ") VALUE ('" + str(timeNow) + "'"
  226. for key_name in dict.keys():
  227. data = "," + str(dict[key_name])
  228. sql = sql + data
  229. sql = sql + ")"
  230. try:
  231. self.cursor.execute(sql)
  232. # 提交到数据库执行
  233. self.conn.commit()
  234. except Exception as e:
  235. # 如果发生错误则回滚
  236. self.conn.rollback()
  237. print(e)
  238. except Exception as e:
  239. self._reConn()
  240. print(e)
  241. else:
  242. self.cursor.close()
  243. def close(self):
  244. self.conn.close()
  245. def get_command_info(self, station_name):
  246. sql = "SELECT command FROM command where station_name = '%s' " % station_name
  247. try:
  248. self._reConn()
  249. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  250. self.cursor.execute(sql)
  251. results = self.cursor.fetchall()
  252. self.cursor.close()
  253. return results
  254. except:
  255. print(traceback.format_exc())
  256. return None
  257. # lee
  258. def get_device_name_by_station_name(self, station_name):
  259. sql = "SELECT DISTINCT device_name FROM data_point_tbl WHERE station_name = '%s' " % station_name
  260. try:
  261. self._reConn()
  262. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  263. self.cursor.execute(sql)
  264. results = self.cursor.fetchall()
  265. self.cursor.close()
  266. return results
  267. except:
  268. print(traceback.format_exc())
  269. return None
  270. def get_data_point_by_device_name(self, device_name):
  271. sql = "SELECT * FROM data_point_tbl WHERE device_name = '%s'" % device_name
  272. try:
  273. self._reConn()
  274. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  275. self.cursor.execute(sql)
  276. results = self.cursor.fetchall()
  277. self.cursor.close()
  278. return results
  279. except:
  280. print(traceback.format_exc())
  281. return None
  282. def execute_sql(self, sql):
  283. try:
  284. self._reConn()
  285. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  286. self.cursor.execute(sql)
  287. results = self.cursor.fetchall()
  288. self.cursor.close()
  289. return results
  290. except:
  291. print(traceback.format_exc())
  292. return None