get_historical_alarm.py 3.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. from abstract_api import AbstractApi
  2. from apis.operate_mysql import OperateMysql
  3. import time
  4. import datetime
  5. class Get_historical_alarm(AbstractApi):
  6. '''查询历史报警(包含已读和未读),可根据设备名、时间来进行部分查询'''
  7. def operation(self, request):
  8. operate_mysql = OperateMysql()
  9. device_name = request['device']
  10. time1 = request['begin_time']
  11. # 查报警
  12. if device_name is None and time1 is None:
  13. sql = "SELECT id,CAST(times AS CHAR) as times,name,data,is_cancel FROM alarm_data_tbl order by times desc;"
  14. res = operate_mysql.execute_sql(sql)
  15. elif device_name is not None and time1 is None:
  16. sql1 = "SELECT (serial_number) FROM data_point_tbl WHERE device_name=\'%s\';" % (device_name)
  17. res1 = operate_mysql.execute_sql(sql1)
  18. basic_datas = []
  19. for each in res1:
  20. basic_datas.append('c' + str(each['serial_number']))
  21. sql = "SELECT id, CAST(times AS CHAR) as times,name,data,is_cancel FROM alarm_data_tbl WHERE name in %s order by times desc;" % (format(tuple(basic_datas)))
  22. res = operate_mysql.execute_sql(sql)
  23. elif device_name is None and time1 is not None:
  24. begin_time = datetime.datetime.fromtimestamp(int(request['begin_time']))
  25. end_time = datetime.datetime.fromtimestamp(int(request['end_time']))
  26. sql = "SELECT id,CAST(times AS CHAR) as times,name,data,is_cancel FROM alarm_data_tbl WHERE times > \'%s\' and times < \'%s\' order by times desc;" % (begin_time, end_time)
  27. res = operate_mysql.execute_sql(sql)
  28. elif device_name is not None and time1 is not None:
  29. begin_time = datetime.datetime.fromtimestamp(int(request['begin_time']))
  30. end_time = datetime.datetime.fromtimestamp(int(request['end_time']))
  31. sql1 = "SELECT (serial_number) FROM data_point_tbl WHERE device_name=\'%s\';" % (device_name)
  32. res1 = operate_mysql.execute_sql(sql1)
  33. basic_datas = []
  34. for each in res1:
  35. basic_datas.append('c' + str(each['serial_number']))
  36. sql = "SELECT id,CAST(times AS CHAR) as times,name,data,is_cancel FROM alarm_data_tbl WHERE times > \'%s\' and times < \'%s\' and name in %s order by times desc;" % (begin_time, end_time, format(tuple(basic_datas)))
  37. res = operate_mysql.execute_sql(sql)
  38. if len(res) != 0:
  39. # 查报警对应的点和上下限
  40. list_alarm = []
  41. for each in res:
  42. serial_number = each['name'].replace('c', '')
  43. sql2 = "SELECT io_point_name, alarm_low_limit, alarm_up_limit FROM data_point_tbl WHERE serial_number=%s;" % (serial_number)
  44. res2 = operate_mysql.execute_sql(sql2)
  45. if len(res2) == 0:
  46. io_point_name = "相关信息关联失败"
  47. alarm_low_limit = "相关信息关联失败"
  48. alarm_up_limit = "相关信息关联失败"
  49. else:
  50. io_point_name = res2[0]['io_point_name']
  51. alarm_low_limit = res2[0]['alarm_low_limit']
  52. alarm_up_limit = res2[0]['alarm_up_limit']
  53. dict_alarm = {'id': each['id'], 'times': each['times'], 'io_point_name': io_point_name, 'data': each['data'], 'is_cancel': each['is_cancel'], 'alarm_low_limit': alarm_low_limit, 'alarm_up_limit': alarm_up_limit}
  54. list_alarm.append(dict_alarm)
  55. return list_alarm
  56. else:
  57. return None