# websocket_server.py
  1. import asyncio
  2. import cv2
  3. import base64
  4. import numpy as np
  5. import websockets
  6. import time
  7. import logging
  8. import json
  9. import ast
  10. import os
  11. import pymysql
  12. from video_capture import LOG_LEVEL, BINOCULAR_ID
  13. WEBSOCKET_SERVER_IP = '0.0.0.0'
  14. WEBSOCKET_SERVER_PORT = 8324
  15. #Mysql
  16. MYSQL_HOST = '192.168.7.68'#1~5号双目数据均存至该mysql
  17. MYSQL_PORT = 3306
  18. MYSQL_USER = 'root'
  19. MYSQL_PASSWORD = '123456'
  20. MYSQL_DATABASE = 'binocular_database'
  21. #检测后的图像存储路径
  22. IMAGE_PATH = '/data/image_path'
  23. def logger_init(LOG_LEVEL):
  24. logger = logging.getLogger('mylogger.b')
  25. #set log level WARNING
  26. logger.setLevel(LOG_LEVEL)
  27. return logger
# Module-wide logger, configured with the LOG_LEVEL imported from video_capture.
logger = logger_init(LOG_LEVEL)
  29. #读取数据库中5号双目最新识别的图像的信息
  30. def get_last_info():
  31. try:
  32. db = pymysql.connect(host=MYSQL_HOST, user=MYSQL_USER, password=MYSQL_PASSWORD, database=MYSQL_DATABASE, port=MYSQL_PORT)
  33. cursor = db.cursor()
  34. sql = """select DATE_FORMAT(datetime, '%Y-%m-%d-%H-%i-%s.%f'), fish_id, fish_species, fish_size, fish_weight from binocular_data where binocular_id={} order by id desc limit 1""".format(BINOCULAR_ID)
  35. count = cursor.execute(sql)
  36. result = cursor.fetchall()
  37. cursor.close()
  38. db.close()
  39. #读出的datetime数据如:'2021-06-16-10-10-10.200000',需要构造出图像的名称:2021-06-16-10-10-10.2.jpg
  40. img_name = result[0][0][:21] + '.jpg'
  41. print(img_name)
  42. fish_id = result[0][1]
  43. fish_species = result[0][2]
  44. fish_size = result[0][3]
  45. fish_weight = result[0][4]
  46. #构造fish_info
  47. item = {}
  48. item['fish_id'] = fish_id
  49. item['species'] = fish_species
  50. item['size'] = fish_size
  51. item['weight'] = fish_weight
  52. fish_info = json.dumps([item])
  53. #image encoding
  54. img = cv2.imread(os.path.join('/data/image_path',img_name))
  55. image_masked_encode = cv2.imencode('.jpg', img)[1]
  56. image_masked_base64 = base64.b64encode(image_masked_encode)
  57. image_masked_data = image_masked_base64.decode()
  58. return image_masked_data, fish_info
  59. except Exception as e:
  60. logger.warning(e)
  61. return "",""
  62. #websocket通过内存中存在的websocket对象来获知那些client端已连接
  63. USERS = set()
  64. async def register(websocket):
  65. USERS.add(websocket)
  66. async def unregister(websocket):
  67. USERS.remove(websocket)
  68. #接收web前端亮度调节参数
  69. async def recv_msg(websocket):
  70. global Q_param
  71. try:
  72. async for message in websocket:
  73. #message形如:{"left":-1,"right":130},str类型
  74. #将message转换成dict
  75. recv_dict = ast.literal_eval(message)
  76. Q_param.put_nowait(recv_dict)
  77. finally:
  78. logger.warning('websocket client is leave')
  79. print(websocket,' is leave')
  80. #在send_msg函数里unregister
  81. #await unregister(websocket)
  82. #向web前端发送实时视频及检测到鱼的图片和图片上鱼的信息
  83. async def send_msg(websocket):
  84. global Q_web
  85. global Q_masked_img
  86. encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 50]
  87. #标记最近识别到的图片是否发送
  88. send_flag = 0
  89. while True:
  90. #从队列中读取左右目图像,队列为空不等待
  91. try:
  92. image_L, image_R = Q_web.get_nowait()
  93. except Exception as e:
  94. print('no image for web')
  95. #该异常出现的次数较频繁,log level设置低一些,防止频繁写log
  96. logger.info('no image in queue for web, %s' % str(e))
  97. time.sleep(1)
  98. continue
  99. #从队列中读取画上鱼轮廓的图像,队列为空不等待,前端显示上一次检测到鱼的图像
  100. try:
  101. image_masked, fish_info = Q_masked_img.get_nowait()
  102. queue_masked_img_flag = 1
  103. except Exception as e:
  104. queue_masked_img_flag = 0
  105. print('no image in masked_image queue')
  106. logger.info('no image in masked_image queue')
  107. image_L = cv2.resize(image_L, (320,240), interpolation = cv2.INTER_LINEAR)
  108. image_R = cv2.resize(image_R, (320,240), interpolation = cv2.INTER_LINEAR)
  109. image_L_encode = cv2.imencode('.jpg', image_L, encode_param)[1]
  110. image_R_encode = cv2.imencode('.jpg', image_R, encode_param)[1]
  111. image_L_base64 = base64.b64encode(image_L_encode)
  112. image_R_base64 = base64.b64encode(image_R_encode)
  113. image_L_data = image_L_base64.decode()
  114. image_R_data = image_R_base64.decode()
  115. #result, imgencode = cv2.imencode('.jpg', frame, encode_param)
  116. #data = np.array(imgencode)
  117. #img = data.tostring()
  118. #检测后的图像编码
  119. if queue_masked_img_flag:
  120. #image_masked = cv2.resize(image_masked, (320,240), interpolation = cv2.INTER_LINEAR)
  121. image_masked_encode = cv2.imencode('.jpg', image_masked,)[1]
  122. #image_masked_encode = cv2.imencode('.jpg', image_masked, encode_param)[1]
  123. image_masked_base64 = base64.b64encode(image_masked_encode)
  124. image_masked_data = image_masked_base64.decode()
  125. else:
  126. #没检测到鱼,最近检测到的图片未发送
  127. if not send_flag:
  128. image_masked_data, fish_info = get_last_info()
  129. send_flag = 1
  130. #没检测到鱼,最近检测到的图片已发送
  131. else:
  132. #发送空字符串
  133. image_masked_data = ""
  134. fish_info = ""
  135. #构造json格式数据
  136. json_data ={"img_l":"data:image/jpeg;base64,%s" % image_L_data, "img_r":"data:image/jpeg;base64,%s" % image_R_data, "img_contour":"data:image/jpeg;base64,%s" % image_masked_data, "fish_info":"data:%s" % fish_info}
  137. json_data = json.dumps(json_data)
  138. #发送到websocket client
  139. try:
  140. await websocket.send(json_data)
  141. print('pushed to web')
  142. await asyncio.sleep(0.1)#websocket.send后必须加这一句才能实现和recv_msg异步执行
  143. #except Exception as e:#websockets.exceptions.ConnectionClosedOK or websockets.exceptions.ConnectionClosedError
  144. except Exception as e:
  145. print('websocket client closed.',e)
  146. logger.warning('websocket client closed.')
  147. await unregister(websocket)
  148. #while True搭配websocket使用时,注销websocket后一定要break退出循环,否则会在下一次循环时注销出异常而异常退出while True
  149. break
  150. async def main_logic(websocket,path):
  151. await register(websocket)
  152. #task_list = []
  153. #async with websockets.connect('wss://localhost:8324') as websocket:
  154. task1 = asyncio.create_task(recv_msg(websocket))
  155. task2 = asyncio.create_task(send_msg(websocket))
  156. await task1
  157. await task2
  158. #task_list.append(asyncio.ensure_future(recv_msg(websocket)))
  159. #task_list.append(asyncio.ensure_future(send_msg(websocket)))
  160. #result = await asyncio.gather(*task_list)
  161. #result = await asyncio.wait(task_list)
  162. #print('result:*****************************',result)
  163. def push_to_web(Q_web_p,Q_masked_img_p,Q_param_p):
  164. global Q_web
  165. global Q_masked_img
  166. global Q_param
  167. global websocket_users
  168. Q_web = Q_web_p
  169. Q_masked_img = Q_masked_img_p
  170. Q_param = Q_param_p
  171. start_server = (main_logic, WEBSOCKET_SERVER_IP, WEBSOCKET_SERVER_PORT)
  172. asyncio.get_event_loop().run_until_complete(start_server)
  173. asyncio.get_event_loop().run_forever()