# hard_disk_storage.py -- MySQL storage helper (HardDiskStorage) built on pymysql.

  import configparser
  import datetime
  import math
  import re
  import time
  import traceback
  from typing import Any, Optional

  import pymysql

  # import openpyxl
  # from AES_crypt import decrypt
  class HardDiskStorage:
      """Thin pymysql wrapper that keeps a single MySQL connection usable.

      Host, user, password and database name are read from the ``[db]``
      section of ``database.conf`` (keys ``db_host``, ``db_user``, ``db_pass``
      and ``db_name``); the path is relative, so it is resolved against the
      current working directory. The connection is opened with
      ``autocommit=True``.
      """

      def __init__(self, port: int = 3306, charset: str = 'utf8') -> None:
          """Load the connection settings and open the connection.

          If the first connection attempt fails, the constructor falls back to
          ``_reConn()`` with its default retry budget, which can block for
          about a day (28800 retries, 3 seconds apart).

          Args:
              port: TCP port of the MySQL server.
              charset: Stored on the instance as ``self.charset``.

          Raises:
              configparser.Error: If ``database.conf`` lacks the ``[db]``
                  section or one of its keys (raised by ``config.get``).
          """
          self.config = configparser.ConfigParser()
          self.config.read("database.conf", encoding="utf-8")
          self.host = self.config.get("db", "db_host")
          self.user = self.config.get("db", "db_user")
          self.passwd = self.config.get("db", "db_pass")
          self.db = self.config.get("db", "db_name")
          self.port = port
          # NOTE(review): charset is stored here but is not forwarded to
          # pymysql.connect() in _conn(); confirm whether it should be.
          self.charset = charset
          self.conn = None
          # Most recent cursor opened by execute_sql(); None until first use.
          self.cursor = None
          if not self._conn():
              self._reConn()

      def _conn(self) -> bool:
          """Open a new connection and store it in ``self.conn``.

          Returns:
              True when the connection was established, False otherwise. The
              failure reason is printed; the password is deliberately left
              out of that message so credentials do not end up in logs.
          """
          try:
              self.conn = pymysql.connect(
                  host=self.host,
                  user=self.user,
                  password=self.passwd,
                  database=self.db,
                  port=self.port,
                  autocommit=True,
              )
              return True
          except Exception as e:
              print(f'failed to connect to {self.host}:{self.port}:{self.db} by [{self.user}]:{e}')
              return False

      def _is_alive(self) -> bool:
          """Return True when a connection exists and answers ``ping()``."""
          if self.conn is None:
              return False
          try:
              self.conn.ping()
          except Exception:
              # Any ping failure is treated as "connection unusable".
              return False
          return True

      def _reConn(self, num: int = 28800, stime: float = 3) -> bool:
          """Make sure ``self.conn`` is usable, reconnecting if necessary.

          One initial check is made, followed by up to ``num`` retries. Each
          round pings the current connection and, if that fails, tries to
          open a fresh one through ``_conn()``. The defaults add up to
          roughly one day of retrying (28800 retries x 3 s); tune them to the
          deployment.

          Args:
              num: Maximum number of retries after the initial check.
              stime: Seconds to sleep between two consecutive attempts.

          Returns:
              True as soon as the connection is usable, False when every
              attempt failed.
          """
          for attempt in range(num + 1):
              if self._is_alive() or self._conn():
                  return True
              # Wait only when another attempt will follow.
              if attempt < num:
                  time.sleep(stime)
          return False

      def close(self) -> None:
          """Close the connection; do nothing if none was ever established."""
          if self.conn is not None:
              self.conn.close()

      def execute_sql(self, sql: str, var: Any = None) -> Optional[Any]:
          """Run one statement and return every row it produced.

          The connection is checked (and re-established if needed) via
          ``_reConn()`` before the statement runs. ``var`` is handed to
          ``cursor.execute`` as the statement's bound parameters, so callers
          should pass values there rather than formatting them into ``sql``.

          Args:
              sql: Statement text, with placeholders for any parameters.
              var: Parameters for the placeholders; may be omitted for
                  statements that take none.

          Returns:
              Whatever ``fetchall()`` yields for a ``DictCursor`` (dict-style
              rows), or None when anything went wrong. In the failure case the
              traceback is printed. ``KeyboardInterrupt`` and ``SystemExit``
              are not intercepted.
          """
          try:
              self._reConn()
              # The context manager closes the cursor on the error path too.
              with self.conn.cursor(cursor=pymysql.cursors.DictCursor) as cursor:
                  # Still published as an attribute, as the method has always
                  # assigned self.cursor.
                  self.cursor = cursor
                  cursor.execute(sql, var)
                  return cursor.fetchall()
          except Exception:
              print(traceback.format_exc())
              return None