123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- import asyncio
- import cv2
- import base64
- import numpy as np
- import websockets
- import time
- import logging
- import json
- import ast
- import os
- import pymysql
- from video_capture import LOG_LEVEL, BINOCULAR_ID
# Address and port the WebSocket server binds to.
WEBSOCKET_SERVER_IP, WEBSOCKET_SERVER_PORT = '0.0.0.0', 8324

# MySQL connection settings. Data from binocular cameras No. 1-5 is all
# stored in this MySQL instance.
MYSQL_HOST, MYSQL_PORT = '192.168.7.68', 3306
MYSQL_USER, MYSQL_PASSWORD = 'root', '123456'
MYSQL_DATABASE = 'binocular_database'

# Directory where images are stored after detection.
IMAGE_PATH = '/data/image_path'
def logger_init(LOG_LEVEL):
    """Return the 'mylogger.b' logger with its level set to *LOG_LEVEL*.

    Args:
        LOG_LEVEL: Level passed unchanged to ``Logger.setLevel``.

    Returns:
        The ``logging.Logger`` registered under the name 'mylogger.b'.
    """
    module_logger = logging.getLogger('mylogger.b')
    module_logger.setLevel(LOG_LEVEL)
    return module_logger
# Module-wide logger shared by every function in this file.
logger = logger_init(LOG_LEVEL)
# Read the most recently recognized image of this binocular camera from the database.
def get_last_info():
    """Return the latest detected-fish image and its metadata for BINOCULAR_ID.

    Queries ``binocular_data`` for the newest row (highest ``id``) whose
    ``binocular_id`` equals ``BINOCULAR_ID``, loads the matching JPEG from
    ``IMAGE_PATH`` and re-encodes it as base64 text.

    Returns:
        A ``(image_masked_data, fish_info)`` tuple of strings.
        ``image_masked_data`` is the base64-encoded JPEG and ``fish_info`` is
        a JSON array holding one object with the keys ``fish_id``,
        ``species``, ``size`` and ``weight``. ``("", "")`` is returned when
        no row exists, the image cannot be read or encoded, or any other
        error occurs (the error is logged as a warning).
    """
    # '%%' yields a literal '%' for DATE_FORMAT: PyMySQL applies %-formatting
    # to the query text whenever arguments are passed to execute().
    sql = (
        "select DATE_FORMAT(datetime, '%%Y-%%m-%%d-%%H-%%i-%%s.%%f'), "
        "fish_id, fish_species, fish_size, fish_weight "
        "from binocular_data where binocular_id=%s "
        "order by id desc limit 1"
    )
    try:
        db = pymysql.connect(host=MYSQL_HOST, user=MYSQL_USER,
                             password=MYSQL_PASSWORD, database=MYSQL_DATABASE,
                             port=MYSQL_PORT)
        # Close cursor and connection even when the query itself fails.
        try:
            cursor = db.cursor()
            try:
                cursor.execute(sql, (BINOCULAR_ID,))
                row = cursor.fetchone()
            finally:
                cursor.close()
        finally:
            db.close()

        if row is None:
            logger.warning('no record found for binocular_id=%s', BINOCULAR_ID)
            return "", ""

        timestamp, fish_id, fish_species, fish_size, fish_weight = row

        # The timestamp reads like '2021-06-16-10-10-10.200000'; the image file
        # is named after its first 21 characters: '2021-06-16-10-10-10.2.jpg'.
        img_name = timestamp[:21] + '.jpg'
        print(img_name)

        # Build fish_info.
        fish_info = json.dumps([{
            'fish_id': fish_id,
            'species': fish_species,
            'size': fish_size,
            'weight': fish_weight,
        }])

        # Image encoding. cv2.imread returns None instead of raising when the
        # file is missing or unreadable.
        img_path = os.path.join(IMAGE_PATH, img_name)
        img = cv2.imread(img_path)
        if img is None:
            logger.warning('cannot read image %s', img_path)
            return "", ""
        encoded_ok, image_masked_encode = cv2.imencode('.jpg', img)
        if not encoded_ok:
            logger.warning('cannot encode image %s', img_path)
            return "", ""
        image_masked_data = base64.b64encode(image_masked_encode).decode()
        return image_masked_data, fish_info
    except Exception as e:
        # Callers poll this function in a loop, so report and fall back to
        # empty strings rather than propagate.
        logger.warning(e)
        return "", ""
# The server knows which clients are connected by keeping their websocket
# objects in this in-memory set.
USERS = set()
async def register(websocket):
    """Add *websocket* to the ``USERS`` set of connected clients."""
    USERS.add(websocket)
async def unregister(websocket):
    """Remove *websocket* from the ``USERS`` set of connected clients.

    Unregistering a websocket that is not (or no longer) registered is a
    no-op, so a repeated call for the same client cannot raise ``KeyError``.
    """
    USERS.discard(websocket)
# Receive brightness-adjustment parameters from the web front end.
async def recv_msg(websocket):
    """Forward every parameter message received on *websocket* to ``Q_param``.

    Each message is expected to be the text of a dict literal such as
    ``{"left":-1,"right":130}``. It is parsed with ``ast.literal_eval`` and
    the result is queued with ``Q_param.put_nowait``. A message that cannot
    be parsed is logged and skipped, so a single malformed frame does not
    end the receive loop.

    The client is not unregistered here; ``send_msg`` does that when its
    send fails.
    """
    try:
        async for message in websocket:
            # Convert the str message into a Python object. literal_eval only
            # evaluates literals, never arbitrary expressions.
            try:
                recv_dict = ast.literal_eval(message)
            except (ValueError, TypeError, SyntaxError,
                    MemoryError, RecursionError) as e:
                logger.warning('ignoring malformed message %r: %s', message, e)
                continue
            Q_param.put_nowait(recv_dict)
    finally:
        logger.warning('websocket client is leave')
        print(websocket, ' is leave')
# Encode one image as JPEG and return it as base64 text.
def _encode_jpeg_base64(image, encode_param=None):
    """Return *image* JPEG-encoded and then base64-encoded as a ``str``.

    Args:
        image: Image array accepted by ``cv2.imencode``.
        encode_param: Optional ``cv2.imencode`` parameter list; when omitted
            OpenCV's default JPEG settings are used.
    """
    if encode_param is None:
        encoded = cv2.imencode('.jpg', image)[1]
    else:
        encoded = cv2.imencode('.jpg', image, encode_param)[1]
    return base64.b64encode(encoded).decode()


# Push the live video, the image of the detected fish and the fish info to the web front end.
async def send_msg(websocket):
    """Stream frames to *websocket* until a send fails.

    Every iteration takes one ``(left, right)`` frame pair from ``Q_web``,
    resizes both frames to 320x240 and sends them as base64 JPEG data URIs in
    a JSON object with the keys ``img_l``, ``img_r``, ``img_contour`` and
    ``fish_info``. ``img_contour`` and ``fish_info`` come from
    ``Q_masked_img`` when it has an entry; otherwise the latest record from
    ``get_last_info`` is sent once and empty strings are sent afterwards, so
    the front end keeps showing the previous detection.

    When reading ``Q_web`` fails (normally because it is empty) the coroutine
    waits one second with ``asyncio.sleep``, which lets other coroutines on
    the event loop run in the meantime. When sending raises, the client is
    unregistered and the loop ends.
    """
    encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 50]
    # Whether the most recently detected image has already been sent.
    send_flag = 0
    while True:
        # Read the left/right frames without waiting on an empty queue.
        try:
            image_L, image_R = Q_web.get_nowait()
        except Exception as e:
            print('no image for web')
            # This happens frequently, so log at a low level to avoid
            # flooding the log.
            logger.info('no image in queue for web, %s', e)
            # A blocking time.sleep here would freeze the whole event loop.
            await asyncio.sleep(1)
            continue

        # Read the image with the fish contour drawn on it, again without
        # waiting.
        try:
            image_masked, fish_info = Q_masked_img.get_nowait()
            queue_masked_img_flag = 1
        except Exception:
            queue_masked_img_flag = 0
            print('no image in masked_image queue')
            logger.info('no image in masked_image queue')

        image_L = cv2.resize(image_L, (320, 240), interpolation=cv2.INTER_LINEAR)
        image_R = cv2.resize(image_R, (320, 240), interpolation=cv2.INTER_LINEAR)
        image_L_data = _encode_jpeg_base64(image_L, encode_param)
        image_R_data = _encode_jpeg_base64(image_R, encode_param)

        if queue_masked_img_flag:
            # The detection image is sent at full size and default quality.
            image_masked_data = _encode_jpeg_base64(image_masked)
        elif not send_flag:
            # No new detection and the latest stored one has not been sent yet.
            image_masked_data, fish_info = get_last_info()
            send_flag = 1
        else:
            # No new detection and the latest stored one was already sent:
            # send empty strings.
            image_masked_data = ""
            fish_info = ""

        # Build the JSON payload.
        json_data = json.dumps({
            "img_l": "data:image/jpeg;base64,%s" % image_L_data,
            "img_r": "data:image/jpeg;base64,%s" % image_R_data,
            "img_contour": "data:image/jpeg;base64,%s" % image_masked_data,
            "fish_info": "data:%s" % fish_info,
        })

        # Send to the websocket client.
        try:
            await websocket.send(json_data)
            print('pushed to web')
            # This sleep after websocket.send is required for recv_msg to get
            # a chance to run.
            await asyncio.sleep(0.1)
        except Exception as e:
            print('websocket client closed.', e)
            logger.warning('websocket client closed.')
            await unregister(websocket)
            # Leave the loop after unregistering; otherwise the next
            # iteration would fail on the closed connection and try to
            # unregister again.
            break
async def main_logic(websocket, path=None):
    """Handle one client connection: receive parameters and push frames.

    Registers *websocket*, then runs ``recv_msg`` and ``send_msg``
    concurrently. As soon as either of them finishes, the other one is
    cancelled, so no task outlives the connection, and the websocket is
    removed from ``USERS``. An exception raised by the task that finished
    first is re-raised to the caller.

    Args:
        websocket: The connection object handed over by the websockets server.
        path: Request path supplied by websockets versions that call the
            handler with two arguments. Unused; optional so that versions
            calling the handler with the connection only also work.
    """
    await register(websocket)
    tasks = [
        asyncio.create_task(recv_msg(websocket)),
        asyncio.create_task(send_msg(websocket)),
    ]
    try:
        done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    finally:
        # cancel() is a no-op on a task that has already finished.
        for task in tasks:
            task.cancel()
        # Wait for the cancellations to complete and collect any exceptions
        # so none is reported as "never retrieved".
        await asyncio.gather(*tasks, return_exceptions=True)
        # discard() is safe even when send_msg already unregistered the client.
        USERS.discard(websocket)
    for task in done:
        task.result()
def push_to_web(Q_web_p, Q_masked_img_p, Q_param_p):
    """Start the WebSocket server and serve clients until interrupted.

    Publishes the three queues as the module globals ``Q_web``,
    ``Q_masked_img`` and ``Q_param`` (read by ``send_msg`` and ``recv_msg``),
    then listens on ``WEBSOCKET_SERVER_IP:WEBSOCKET_SERVER_PORT`` with
    ``main_logic`` as the connection handler. This call blocks.

    Args:
        Q_web_p: Queue read by ``send_msg`` for ``(left, right)`` frame pairs.
        Q_masked_img_p: Queue read by ``send_msg`` for
            ``(image, fish_info)`` pairs.
        Q_param_p: Queue that ``recv_msg`` fills with parameters received
            from the front end.
    """
    global Q_web
    global Q_masked_img
    global Q_param
    Q_web = Q_web_p
    Q_masked_img = Q_masked_img_p
    Q_param = Q_param_p

    async def _serve_forever():
        # The server must be created with websockets.serve(); a bare tuple of
        # (handler, host, port) is not awaitable and starts nothing.
        async with websockets.serve(main_logic, WEBSOCKET_SERVER_IP,
                                    WEBSOCKET_SERVER_PORT):
            # This future never completes: keep serving until cancelled.
            await asyncio.Future()

    asyncio.run(_serve_forever())
|