# -*- coding: UTF-8 -*- ''' @Project :测试代码 @File :wsceshi.py @Author :Fang @Date :2024/7/29 ''' import asyncio import cv2 import websockets import time import json from multiprocessing import Process, Queue from util.hard_disk_storage import HardDiskStorage from util.memory_storage import MemoryStorage import redis, base64 Q_web: Queue Q_masked_img: Queue Q_param: Queue USERS = set() r = redis.Redis(host='127.0.0.1', port=6379) hard_disk_db = HardDiskStorage() memory_storage = MemoryStorage() async def register(websocket) -> None: """注册websocket""" USERS.add(websocket) print("================【SUCCESS】客户端socket已连接================") async def unregister(websocket) -> None: """注销websocket""" USERS.remove(websocket) print("================【SUCCESS】客户端socket已注销================") async def send_msg(websocket) -> None: """向web前端发送实时视频及检测到鱼的图片和图片上鱼的信息""" global Q_web global Q_masked_img # 标记最近识别到的图片是否发送 send_flag = 0 while True: # 从队列中读取左右目图像,队列为空不等待 try: image_L = Q_web.get_nowait() except Exception as e: time.sleep(0.1) continue # image_L = r.get("images") # if not image_L: # print('fang --> no images') # time.sleep(1) # continue # ======================================== 对获取的声呐图像进行编码处理 ======================================== # 使用Base64对被编码的bytes-like对象进行 编码(加密) 并返回bytes对象 image_L_base64 = base64.b64encode(image_L) # 对bytes字符串进行解码 image_L_data = image_L_base64.decode() # ======================================== 构造json格式数据 ======================================== json_data = { "sonar_img": "data:images/jpeg;base64,%s" % image_L_data, } json_data = json.dumps(json_data) # ======================================== 发送数据到websocket client ======================================== try: await websocket.send(json_data) print(f'【SUCCESS】',print(r.get('save_image_time'))) await asyncio.sleep(0.01) # websocket.send后必须加这一句才能实现和recv_msg异步执行 # except Exception as e:#websockets.exceptions.ConnectionClosedOK or websockets.exceptions.ConnectionClosedError except Exception as e: print(f'【websocket client is leave】{e}') await unregister(websocket) # websocket断开连接时注销掉当前连接 # while True搭配websocket使用时,注销websocket后一定要break退出循环,否则会在下一次循环时注销出异常而异常退出while True break async def main_logic(websocket) -> None: await register(websocket) task2 = asyncio.create_task(send_msg(websocket)) await task2 def push_to_web(queue_camera_to_web: Queue) -> None: global Q_web Q_web = queue_camera_to_web # 开启websocket服务 start_server = websockets.serve(main_logic, '0.0.0.0', '8325') loop = asyncio.get_event_loop() loop.run_until_complete(start_server) # 运行事件循环,直到Future完成。 loop.run_forever() # 运行事件循环,直到stop()被调用。 def to_web(queue_cameraz_to_web): while True: try: var = r.get("images") if var: queue_cameraz_to_web.put_nowait(var) else: time.sleep(0.1) except Exception as e: time.sleep(0.1) continue if __name__ == '__main__': queue_camera_to_web = Queue(1) # 推送到前端的图像队列 process1 = Process(target=to_web, args=(queue_camera_to_web,)) process1.start() process2 = Process(target=push_to_web, args=(queue_camera_to_web,)) process2.start() # push_to_web(queue_camera_to_web)