import ast
import asyncio
import base64
import json
import logging
import os
import time

import cv2
import numpy as np
import pymysql
import websockets

from video_capture import LOG_LEVEL, BINOCULAR_ID

WEBSOCKET_SERVER_IP = '0.0.0.0'
WEBSOCKET_SERVER_PORT = 8324

# MySQL connection settings
# Data from binocular units No. 1-5 is all stored in this MySQL instance.
MYSQL_HOST = '192.168.7.68'
MYSQL_PORT = 3306
MYSQL_USER = 'root'
# NOTE(review): credentials are hard-coded in source; consider loading them
# from configuration or the environment.
MYSQL_PASSWORD = '123456'
MYSQL_DATABASE = 'binocular_database'

# Directory where images are stored after detection
IMAGE_PATH = '/data/image_path'


def logger_init(LOG_LEVEL):
    """Return the 'mylogger.b' logger with its level set to LOG_LEVEL."""
    logger = logging.getLogger('mylogger.b')
    logger.setLevel(LOG_LEVEL)
    return logger


logger = logger_init(LOG_LEVEL)


def get_last_info():
    """Load the most recent detection stored for this binocular unit.

    Reads the newest ``binocular_data`` row whose ``binocular_id`` equals
    BINOCULAR_ID, loads the matching image from IMAGE_PATH and re-encodes it
    as JPEG.

    Returns:
        A ``(image_base64, fish_info)`` tuple. ``image_base64`` is the
        base64 text of the JPEG image and ``fish_info`` is a JSON string
        holding a one-element list with the keys ``fish_id``, ``species``,
        ``size`` and ``weight``. On any failure (database error, no row,
        missing or unreadable image) the error is logged as a warning and
        ``("", "")`` is returned.
    """
    # BINOCULAR_ID is a project constant, not user input, so it is formatted
    # into the statement directly. Passing it as a query argument instead
    # would require escaping every '%' in the DATE_FORMAT pattern.
    sql = """select DATE_FORMAT(datetime, '%Y-%m-%d-%H-%i-%s.%f'),
                    fish_id, fish_species, fish_size, fish_weight
             from binocular_data
             where binocular_id={}
             order by id desc limit 1""".format(BINOCULAR_ID)
    try:
        db = pymysql.connect(host=MYSQL_HOST, user=MYSQL_USER,
                             password=MYSQL_PASSWORD,
                             database=MYSQL_DATABASE, port=MYSQL_PORT)
        try:
            with db.cursor() as cursor:
                cursor.execute(sql)
                row = cursor.fetchone()
        finally:
            # Always release the connection, even when the query fails.
            db.close()

        if row is None:
            logger.warning('no detection record found for binocular %s'
                           % BINOCULAR_ID)
            return "", ""

        timestamp, fish_id, fish_species, fish_size, fish_weight = row

        # The datetime read back looks like '2021-06-16-10-10-10.200000';
        # the image file is named after its first 21 characters, e.g.
        # 2021-06-16-10-10-10.2.jpg
        img_name = timestamp[:21] + '.jpg'
        print(img_name)

        # Build fish_info
        item = {
            'fish_id': fish_id,
            'species': fish_species,
            'size': fish_size,
            'weight': fish_weight,
        }
        fish_info = json.dumps([item])

        # Image encoding
        img_path = os.path.join(IMAGE_PATH, img_name)
        img = cv2.imread(img_path)
        if img is None:
            # cv2.imread signals a missing/unreadable file by returning None.
            logger.warning('cannot read image %s' % img_path)
            return "", ""
        encoded_ok, image_masked_encode = cv2.imencode('.jpg', img)
        if not encoded_ok:
            logger.warning('cannot encode image %s' % img_path)
            return "", ""
        image_masked_data = base64.b64encode(image_masked_encode).decode()
        return image_masked_data, fish_info
    except Exception as e:
        # Best effort: the caller treats ("", "") as "nothing to show".
        logger.warning(e)
        return "", ""


# The websocket objects kept in memory tell us which clients are connected.
USERS = set()


async def register(websocket):
    """Record websocket as a connected client."""
    USERS.add(websocket)


async def unregister(websocket):
    """Forget websocket; calling this for an unknown client is a no-op."""
    # discard() instead of remove(): unregistering twice must not raise.
    USERS.discard(websocket)
def _parse_param_message(message):
    """Decode one parameter message from the web front end.

    JSON is tried first; a Python literal (the format the original
    ``ast.literal_eval`` call accepted) is used as a fallback.

    Raises:
        ValueError: if the message is neither valid JSON nor a literal.
    """
    try:
        return json.loads(message)
    except (TypeError, ValueError):
        pass
    try:
        return ast.literal_eval(message)
    except (ValueError, SyntaxError, TypeError,
            MemoryError, RecursionError) as err:
        raise ValueError('not valid JSON or a Python literal') from err


# Receive brightness-adjustment parameters from the web front end
async def recv_msg(websocket):
    """Forward every parameter message from websocket into Q_param.

    Messages look like ``{"left":-1,"right":130}`` (text). A message that
    cannot be decoded is logged and skipped so that one bad frame does not
    end the receive loop. Returns when the client connection ends.
    """
    try:
        async for message in websocket:
            try:
                recv_dict = _parse_param_message(message)
            except ValueError as e:
                logger.warning('ignoring malformed parameter message %r: %s'
                               % (message, e))
                continue
            Q_param.put_nowait(recv_dict)
    finally:
        logger.warning('websocket client is leave')
        print(websocket, ' is leave')
        # The client is unregistered by main_logic.


def _encode_jpeg_base64(image, encode_param=None):
    """Return image JPEG-encoded and base64-encoded as text."""
    if encode_param is None:
        encoded = cv2.imencode('.jpg', image)[1]
    else:
        encoded = cv2.imencode('.jpg', image, encode_param)[1]
    return base64.b64encode(encoded).decode()


# Send the live video, the image of the detected fish and the fish info
# to the web front end
async def send_msg(websocket):
    """Push frames and detection results to websocket until sending fails.

    Each payload is a JSON object with the keys ``img_l``, ``img_r``,
    ``img_contour`` and ``fish_info``. Left/right frames come from Q_web and
    are downscaled to 320x240; the contour image comes from Q_masked_img.
    When Q_masked_img is empty, the latest stored detection is sent once
    (via get_last_info) and empty strings are sent afterwards.
    """
    encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 50]
    # Whether the most recent stored detection has already been sent
    send_flag = 0
    while True:
        # Read the left/right frames without waiting on an empty queue.
        # The concrete queue type is supplied by the caller of push_to_web,
        # so its "empty" exception class is not known here.
        try:
            image_L, image_R = Q_web.get_nowait()
        except Exception as e:
            print('no image for web')
            # This happens often; keep the log level low to avoid
            # flooding the log.
            logger.info('no image in queue for web, %s' % str(e))
            # asyncio.sleep, not time.sleep: a blocking sleep would stall
            # the event loop, including recv_msg and every other client.
            await asyncio.sleep(1)
            continue

        # Read the image with the fish contour drawn, again without waiting;
        # when there is none the front end keeps showing the previous one.
        try:
            image_masked, fish_info = Q_masked_img.get_nowait()
            queue_masked_img_flag = 1
        except Exception:
            queue_masked_img_flag = 0
            print('no image in masked_image queue')
            logger.info('no image in masked_image queue')

        image_L = cv2.resize(image_L, (320, 240),
                             interpolation=cv2.INTER_LINEAR)
        image_R = cv2.resize(image_R, (320, 240),
                             interpolation=cv2.INTER_LINEAR)
        image_L_data = _encode_jpeg_base64(image_L, encode_param)
        image_R_data = _encode_jpeg_base64(image_R, encode_param)

        if queue_masked_img_flag:
            # Encode the detection image at full size / default quality.
            image_masked_data = _encode_jpeg_base64(image_masked)
        elif not send_flag:
            # No fish detected and the latest stored detection has not been
            # sent yet. get_last_info does blocking database and disk I/O,
            # so run it in the default executor instead of on the event loop.
            loop = asyncio.get_running_loop()
            image_masked_data, fish_info = await loop.run_in_executor(
                None, get_last_info)
            send_flag = 1
        else:
            # No fish detected and the latest detection was already sent:
            # send empty strings.
            image_masked_data = ""
            fish_info = ""

        # Build the JSON payload
        json_data = {"img_l": "data:image/jpeg;base64,%s" % image_L_data,
                     "img_r": "data:image/jpeg;base64,%s" % image_R_data,
                     "img_contour": "data:image/jpeg;base64,%s" % image_masked_data,
                     "fish_info": "data:%s" % fish_info}
        json_data = json.dumps(json_data)

        # Send to the websocket client
        try:
            await websocket.send(json_data)
            print('pushed to web')
            # Yield to the event loop so recv_msg can run between sends.
            await asyncio.sleep(0.1)
        except Exception as e:
            # Typically a websockets ConnectionClosed error.
            print('websocket client closed.', e)
            logger.warning('websocket client closed.')
            # Leave the loop; main_logic unregisters the client.
            return


async def main_logic(websocket, path=None):
    """Connection handler: run recv_msg and send_msg for one client.

    ``path`` is optional so the handler works both with websockets releases
    that call ``handler(websocket, path)`` and with those that call
    ``handler(websocket)``.
    """
    await register(websocket)
    tasks = [asyncio.create_task(recv_msg(websocket)),
             asyncio.create_task(send_msg(websocket))]
    try:
        # Either task ending means the connection is finished.
        done, _ = await asyncio.wait(tasks,
                                     return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            if not task.cancelled() and task.exception() is not None:
                logger.warning('websocket task failed: %r' % task.exception())
    finally:
        # Stop whichever task is still running (cancel() is a no-op on a
        # finished task) and wait for it so nothing is left orphaned.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        await unregister(websocket)


def push_to_web(Q_web_p, Q_masked_img_p, Q_param_p):
    """Publish the three queues as module globals and serve forever.

    Args:
        Q_web_p: queue yielding ``(image_L, image_R)`` frame pairs.
        Q_masked_img_p: queue yielding ``(image_masked, fish_info)`` pairs.
        Q_param_p: queue receiving parameters sent by the web front end.

    Blocks the calling thread running the websocket server on
    WEBSOCKET_SERVER_IP:WEBSOCKET_SERVER_PORT.
    """
    global Q_web
    global Q_masked_img
    global Q_param
    Q_web = Q_web_p
    Q_masked_img = Q_masked_img_p
    Q_param = Q_param_p

    async def _serve_forever():
        # The original passed a bare (handler, host, port) tuple to
        # run_until_complete, which is not awaitable; the server has to be
        # created with websockets.serve.
        async with websockets.serve(main_logic, WEBSOCKET_SERVER_IP,
                                    WEBSOCKET_SERVER_PORT):
            await asyncio.Future()  # never completes: run until killed

    asyncio.run(_serve_forever())