ws_server.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. # -*- coding: UTF-8 -*-
  2. '''
  3. @Project :测试代码
  4. @File :wsceshi.py
  5. @Author :Fang
  6. @Date :2024/7/29
  7. '''
  8. import asyncio
  9. import cv2
  10. import websockets
  11. import time
  12. import json
  13. from multiprocessing import Process, Queue
  14. from util.hard_disk_storage import HardDiskStorage
  15. from util.memory_storage import MemoryStorage
  16. import redis, base64
  17. Q_web: Queue
  18. Q_masked_img: Queue
  19. Q_param: Queue
  20. USERS = set()
  21. r = redis.Redis(host='127.0.0.1', port=6379)
  22. hard_disk_db = HardDiskStorage()
  23. memory_storage = MemoryStorage()
  24. async def register(websocket) -> None:
  25. """注册websocket"""
  26. USERS.add(websocket)
  27. print("================【SUCCESS】客户端socket已连接================")
  28. async def unregister(websocket) -> None:
  29. """注销websocket"""
  30. USERS.remove(websocket)
  31. print("================【SUCCESS】客户端socket已注销================")
  32. async def send_msg(websocket) -> None:
  33. """向web前端发送实时视频及检测到鱼的图片和图片上鱼的信息"""
  34. global Q_web
  35. global Q_masked_img
  36. # 标记最近识别到的图片是否发送
  37. send_flag = 0
  38. while True:
  39. # 从队列中读取左右目图像,队列为空不等待
  40. try:
  41. image_L = Q_web.get_nowait()
  42. except Exception as e:
  43. time.sleep(0.1)
  44. continue
  45. # image_L = r.get("images")
  46. # if not image_L:
  47. # print('fang --> no images')
  48. # time.sleep(1)
  49. # continue
  50. # ======================================== 对获取的声呐图像进行编码处理 ========================================
  51. # 使用Base64对被编码的bytes-like对象进行 编码(加密) 并返回bytes对象
  52. image_L_base64 = base64.b64encode(image_L)
  53. # 对bytes字符串进行解码
  54. image_L_data = image_L_base64.decode()
  55. # ======================================== 构造json格式数据 ========================================
  56. json_data = {
  57. "sonar_img": "data:images/jpeg;base64,%s" % image_L_data,
  58. }
  59. json_data = json.dumps(json_data)
  60. # ======================================== 发送数据到websocket client ========================================
  61. try:
  62. await websocket.send(json_data)
  63. print(f'【SUCCESS】',print(r.get('save_image_time')))
  64. await asyncio.sleep(0.01) # websocket.send后必须加这一句才能实现和recv_msg异步执行
  65. # except Exception as e:#websockets.exceptions.ConnectionClosedOK or websockets.exceptions.ConnectionClosedError
  66. except Exception as e:
  67. print(f'【websocket client is leave】{e}')
  68. await unregister(websocket) # websocket断开连接时注销掉当前连接
  69. # while True搭配websocket使用时,注销websocket后一定要break退出循环,否则会在下一次循环时注销出异常而异常退出while True
  70. break
  71. async def main_logic(websocket) -> None:
  72. await register(websocket)
  73. task2 = asyncio.create_task(send_msg(websocket))
  74. await task2
  75. def push_to_web(queue_camera_to_web: Queue) -> None:
  76. global Q_web
  77. Q_web = queue_camera_to_web
  78. # 开启websocket服务
  79. start_server = websockets.serve(main_logic, '0.0.0.0', '8325')
  80. loop = asyncio.get_event_loop()
  81. loop.run_until_complete(start_server) # 运行事件循环,直到Future完成。
  82. loop.run_forever() # 运行事件循环,直到stop()被调用。
  83. def to_web(queue_cameraz_to_web):
  84. while True:
  85. try:
  86. var = r.get("images")
  87. if var:
  88. queue_cameraz_to_web.put_nowait(var)
  89. else:
  90. time.sleep(0.1)
  91. except Exception as e:
  92. time.sleep(0.1)
  93. continue
  94. if __name__ == '__main__':
  95. queue_camera_to_web = Queue(1) # 推送到前端的图像队列
  96. process1 = Process(target=to_web, args=(queue_camera_to_web,))
  97. process1.start()
  98. process2 = Process(target=push_to_web, args=(queue_camera_to_web,))
  99. process2.start()
  100. # push_to_web(queue_camera_to_web)