123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127 |
- # -*- coding: UTF-8 -*-
- '''
- @Project :测试代码
- @File :wsceshi.py
- @Author :Fang
- @Date :2024/7/29
- '''
- import asyncio
- import cv2
- import websockets
- import time
- import json
- from multiprocessing import Process, Queue
- from util.hard_disk_storage import HardDiskStorage
- from util.memory_storage import MemoryStorage
- import redis, base64
- Q_web: Queue
- Q_masked_img: Queue
- Q_param: Queue
- USERS = set()
- r = redis.Redis(host='127.0.0.1', port=6379)
- hard_disk_db = HardDiskStorage()
- memory_storage = MemoryStorage()
- async def register(websocket) -> None:
- """注册websocket"""
- USERS.add(websocket)
- print("================【SUCCESS】客户端socket已连接================")
- async def unregister(websocket) -> None:
- """注销websocket"""
- USERS.remove(websocket)
- print("================【SUCCESS】客户端socket已注销================")
- async def send_msg(websocket) -> None:
- """向web前端发送实时视频及检测到鱼的图片和图片上鱼的信息"""
- global Q_web
- global Q_masked_img
- # 标记最近识别到的图片是否发送
- send_flag = 0
- while True:
- # 从队列中读取左右目图像,队列为空不等待
- try:
- image_L = Q_web.get_nowait()
- except Exception as e:
- time.sleep(0.1)
- continue
- # image_L = r.get("images")
- # if not image_L:
- # print('fang --> no images')
- # time.sleep(1)
- # continue
- # ======================================== 对获取的声呐图像进行编码处理 ========================================
- # 使用Base64对被编码的bytes-like对象进行 编码(加密) 并返回bytes对象
- image_L_base64 = base64.b64encode(image_L)
- # 对bytes字符串进行解码
- image_L_data = image_L_base64.decode()
- # ======================================== 构造json格式数据 ========================================
- json_data = {
- "sonar_img": "data:images/jpeg;base64,%s" % image_L_data,
- }
- json_data = json.dumps(json_data)
- # ======================================== 发送数据到websocket client ========================================
- try:
- await websocket.send(json_data)
- print(f'【SUCCESS】',print(r.get('save_image_time')))
- await asyncio.sleep(0.01) # websocket.send后必须加这一句才能实现和recv_msg异步执行
- # except Exception as e:#websockets.exceptions.ConnectionClosedOK or websockets.exceptions.ConnectionClosedError
- except Exception as e:
- print(f'【websocket client is leave】{e}')
- await unregister(websocket) # websocket断开连接时注销掉当前连接
- # while True搭配websocket使用时,注销websocket后一定要break退出循环,否则会在下一次循环时注销出异常而异常退出while True
- break
- async def main_logic(websocket) -> None:
- await register(websocket)
- task2 = asyncio.create_task(send_msg(websocket))
- await task2
- def push_to_web(queue_camera_to_web: Queue) -> None:
- global Q_web
- Q_web = queue_camera_to_web
- # 开启websocket服务
- start_server = websockets.serve(main_logic, '0.0.0.0', '8325')
- loop = asyncio.get_event_loop()
- loop.run_until_complete(start_server) # 运行事件循环,直到Future完成。
- loop.run_forever() # 运行事件循环,直到stop()被调用。
- def to_web(queue_cameraz_to_web):
- while True:
- try:
- var = r.get("images")
- if var:
- queue_cameraz_to_web.put_nowait(var)
- else:
- time.sleep(0.1)
- except Exception as e:
- time.sleep(0.1)
- continue
- if __name__ == '__main__':
- queue_camera_to_web = Queue(1) # 推送到前端的图像队列
- process1 = Process(target=to_web, args=(queue_camera_to_web,))
- process1.start()
- process2 = Process(target=push_to_web, args=(queue_camera_to_web,))
- process2.start()
- # push_to_web(queue_camera_to_web)
|