hard_disk_storage.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. import pymysql
  2. import traceback
  3. import time
  4. from log import Log
  5. class HardDiskStorage():
  6. def __init__(self, config, port=3306, charset='utf8'):
  7. self.host = config['ip']
  8. self.user = config['username']
  9. self.passwd = config['password']
  10. self.db = config['dataBaseName']
  11. self.port = port
  12. self.charset = charset
  13. self.conn = None
  14. self._conn()
  15. self.log = Log()
  16. def _conn(self):
  17. try:
  18. self.conn = pymysql.connect(host=self.host, user=self.user, password=self.passwd, database=self.db, port=self.port, autocommit=True)
  19. return True
  20. except Exception as e:
  21. return False
  22. def _reConn(self, num=28800, stime=3): # 重试连接总次数为1天,这里根据实际情况自己设置,如果服务器宕机1天都没发现就......
  23. _number = 0
  24. _status = True
  25. while _status and _number <= num:
  26. try:
  27. self.conn.ping() # cping 校验连接是否异常
  28. _status = False
  29. except:
  30. if self._conn() == True: # 重新连接,成功退出
  31. _status = False
  32. break
  33. _number += 1
  34. time.sleep(stime) # 连接不成功,休眠3秒钟,继续循环,知道成功或重试次数结束
  35. def get_station_info(self, station_name):
  36. sql = "SELECT * FROM data_point_tbl where station_name = '%s'" % station_name
  37. try:
  38. self._reConn()
  39. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  40. self.cursor.execute(sql)
  41. results = self.cursor.fetchall()
  42. self.cursor.close()
  43. return results
  44. except:
  45. print(traceback.format_exc())
  46. return False
  47. def get_point_info(self, point_tuple):
  48. if point_tuple:
  49. sql = "SELECT * FROM data_point_tbl where serial_number in %s" % (str(point_tuple))
  50. else:
  51. sql = "SELECT * FROM data_point_tbl"
  52. try:
  53. self._reConn()
  54. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  55. self.cursor.execute(sql)
  56. results = self.cursor.fetchall()
  57. self.cursor.close()
  58. return results
  59. except:
  60. print(traceback.format_exc())
  61. return False
  62. def get_table_data(self, senect_info):
  63. table_name = "table_" + senect_info['deviceName']
  64. time_begin = senect_info['timeBegin']
  65. time_end = senect_info['timeEnd']
  66. column = senect_info['pointList']
  67. if len(column) > 0:
  68. sql = "SELECT times"
  69. for column_name in column:
  70. sql = sql + "," + column_name
  71. sql = sql + " FROM %s WHERE times > '%s' AND times < '%s'" % (table_name, time_begin, time_end)
  72. else:
  73. sql = "SELECT * FROM %s WHERE times > '%s' AND times < '%s';" % (table_name, time_begin, time_end)
  74. try:
  75. self._reConn()
  76. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  77. self.cursor.execute(sql)
  78. results = self.cursor.fetchall()
  79. self.cursor.close()
  80. return results
  81. except:
  82. print(traceback.format_exc())
  83. return None
  84. def get_connectors(self):
  85. sql = "SELECT * FROM station_info_tbl WHERE status = 1"
  86. try:
  87. self._reConn()
  88. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  89. self.cursor.execute(sql)
  90. results = self.cursor.fetchall()
  91. self.cursor.close()
  92. return results
  93. except:
  94. print(traceback.format_exc())
  95. return None
  96. def create_delete_stale_data_event(self, eventName, table_name, day):
  97. self.cursor = self.conn.cursor()
  98. sql = "create event %s on SCHEDULE every 1 day do delete from %s where times<(CURRENT_TIMESTAMP() + INTERVAL -%s DAY);" % (
  99. eventName, table_name, day)
  100. self.cursor.execute(sql)
  101. self.cursor.close()
  102. def create_many_column_table(self, table_name, list):
  103. self.cursor = self.conn.cursor()
  104. for index in range(len(list)):
  105. dataType = list[index]['storageType']
  106. columnName = "c" + str(list[index]['serialNumber'])
  107. sql_c = "CREATE TABLE IF NOT EXISTS %s (times datetime NOT NULL,INDEX (times)) \
  108. ENGINE=InnoDB DEFAULT CHARSET=utf8;" % (table_name)
  109. sql_add = "ALTER TABLE %s ADD %s %s " % (table_name, columnName, dataType)
  110. try:
  111. self.cursor.execute(sql_c)
  112. self.cursor.execute(sql_add)
  113. except:
  114. print(traceback.format_exc())
  115. sql_send = "ALTER TABLE %s ADD 'is_send'tinyint NOT NULL DEFAULT '0'" % (table_name)
  116. self.cursor.execute(sql_send)
  117. self.cursor.close()
  118. def insert_column_many(self, table_name, timeNow, dict):
  119. try:
  120. self.cursor = self.conn.cursor()
  121. sql = "INSERT INTO %s (times" % (table_name)
  122. for key_name in dict.keys():
  123. sql = sql + "," + key_name
  124. sql = sql + ") VALUE ('" + str(timeNow) + "'"
  125. for key_name in dict.keys():
  126. data = "," + str(dict[key_name])
  127. sql = sql + data
  128. sql = sql + ")"
  129. try:
  130. self.cursor.execute(sql)
  131. # 提交到数据库执行
  132. self.conn.commit()
  133. except Exception as e:
  134. # 如果发生错误则回滚
  135. self.conn.rollback()
  136. print(e)
  137. except Exception as e:
  138. self._reConn()
  139. print(e)
  140. else:
  141. self.cursor.close()
  142. def close(self):
  143. self.conn.close()
  144. def get_command_info(self, station_name):
  145. sql = "SELECT command FROM command where station_name = '%s' " % station_name
  146. try:
  147. self._reConn()
  148. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  149. self.cursor.execute(sql)
  150. results = self.cursor.fetchall()
  151. self.cursor.close()
  152. return results
  153. except:
  154. print(traceback.format_exc())
  155. return None
  156. # lee
  157. def get_device_name_by_station_name(self, station_name):
  158. sql = "SELECT DISTINCT device_name FROM data_point_tbl WHERE station_name = '%s' " % station_name
  159. try:
  160. self._reConn()
  161. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  162. self.cursor.execute(sql)
  163. results = self.cursor.fetchall()
  164. self.cursor.close()
  165. return results
  166. except:
  167. print(traceback.format_exc())
  168. return None
  169. def get_data_point_by_device_name(self, device_name):
  170. sql = "SELECT * FROM data_point_tbl WHERE device_name = '%s'" % device_name
  171. try:
  172. self._reConn()
  173. self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
  174. self.cursor.execute(sql)
  175. results = self.cursor.fetchall()
  176. self.cursor.close()
  177. return results
  178. except:
  179. print(traceback.format_exc())
  180. return None