12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824 |
- # Copyright (c) OpenMMLab. All rights reserved.
- import copy
- import json
- import os.path as osp
- import tempfile
- from collections import OrderedDict
- from multiprocessing import Process, Queue
- from typing import Dict, List, Optional, Sequence, Union
- import numpy as np
- from mmengine.evaluator import BaseMetric
- from mmengine.fileio import dump, get_text, load
- from mmengine.logging import MMLogger
- from scipy.sparse import csr_matrix
- from scipy.sparse.csgraph import maximum_bipartite_matching
- from mmdet.evaluation.functional.bbox_overlaps import bbox_overlaps
- from mmdet.registry import METRICS
# Class names of the CrowdHuman annotations. The position of a name in this
# list is used as the numeric tag of a ground-truth box ('person' -> 1);
# tags that are not listed are treated as ignore regions (-1).
PERSON_CLASSES = ['background', 'person']
@METRICS.register_module()
class CrowdHumanMetric(BaseMetric):
    """CrowdHuman evaluation metric.

    Evaluate Average Precision (AP), Miss Rate (MR) and Jaccard Index (JI)
    for detection tasks.

    Args:
        ann_file (str): Path to the annotation file.
        metric (str | List[str]): Metrics to be evaluated. Valid metrics
            include 'AP', 'MR' and 'JI'. Defaults to ['AP', 'MR', 'JI'].
        format_only (bool): Format the output results without perform
            evaluation. It is useful when you want to format the result
            to a specific format and submit it to the test server.
            Defaults to False.
        outfile_prefix (str, optional): The prefix of json files. It includes
            the file path and the prefix of filename, e.g., "a/b/prefix".
            If not specified, a temp file will be created. Defaults to None.
        file_client_args (dict, optional): Arguments to instantiate the
            corresponding backend in mmdet <= 3.0.0rc6. Deprecated: passing
            a value raises ``RuntimeError``. Defaults to None.
        backend_args (dict, optional): Arguments to instantiate the
            corresponding backend. Defaults to None.
        collect_device (str): Device name used for collecting results from
            different ranks during distributed training. Must be 'cpu' or
            'gpu'. Defaults to 'cpu'.
        prefix (str, optional): The prefix that will be added in the metric
            names to disambiguate homonymous metrics of different evaluators.
            If prefix is not provided in the argument, self.default_prefix
            will be used instead. Defaults to None.
        eval_mode (int): Select the mode of evaluate. Valid mode include
            0(just body box), 1(just head box) and 2(both of them).
            Defaults to 0.
        iou_thres (float): IoU threshold. Defaults to 0.5.
        compare_matching_method (str, optional): Matching method to compare
            the detection results with the ground_truth when compute 'AP'
            and 'MR'. Valid method include VOC and None(CALTECH). Default to
            None.
        mr_ref (str): Different parameter selection to calculate MR. Valid
            ref include CALTECH_-2 and CALTECH_-4. Defaults to CALTECH_-2.
        num_ji_process (int): The number of processes to evaluation JI.
            Defaults to 10.
    """
    default_prefix: Optional[str] = 'crowd_human'

    def __init__(self,
                 ann_file: str,
                 metric: Union[str, List[str]] = ['AP', 'MR', 'JI'],
                 format_only: bool = False,
                 outfile_prefix: Optional[str] = None,
                 file_client_args: dict = None,
                 backend_args: dict = None,
                 collect_device: str = 'cpu',
                 prefix: Optional[str] = None,
                 eval_mode: int = 0,
                 iou_thres: float = 0.5,
                 compare_matching_method: Optional[str] = None,
                 mr_ref: str = 'CALTECH_-2',
                 num_ji_process: int = 10) -> None:
        super().__init__(collect_device=collect_device, prefix=prefix)

        self.ann_file = ann_file
        # crowdhuman evaluation metrics
        # Copy the sequence so that neither the shared default list nor the
        # caller's list is aliased by ``self.metrics``.
        if isinstance(metric, (list, tuple)):
            self.metrics = list(metric)
        else:
            self.metrics = [metric]
        allowed_metrics = ['MR', 'AP', 'JI']
        for name in self.metrics:
            if name not in allowed_metrics:
                raise KeyError("metric should be one of 'MR', 'AP', 'JI', "
                               f'but got {name}.')

        self.format_only = format_only
        if self.format_only:
            # The message is parenthesised so that all three fragments belong
            # to the assert instead of being stand-alone string statements.
            assert outfile_prefix is not None, (
                'outfile_prefix must be not None when format_only is True, '
                'otherwise the result files will be saved to a temp '
                'directory which will be cleaned up at the end.')
        self.outfile_prefix = outfile_prefix

        self.backend_args = backend_args
        if file_client_args is not None:
            raise RuntimeError(
                'The `file_client_args` is deprecated, '
                'please use `backend_args` instead, please refer to '
                'https://github.com/open-mmlab/mmdetection/blob/main/configs/_base_/datasets/coco_detection.py'  # noqa: E501
            )

        assert eval_mode in [0, 1, 2], \
            'Unknown eval mode. eval_mode should be one of 0, 1, 2.'
        assert compare_matching_method is None or \
            compare_matching_method == 'VOC', \
            'The alternative compare_matching_method is VOC. ' \
            'This parameter defaults to CALTECH(None)'
        assert mr_ref == 'CALTECH_-2' or mr_ref == 'CALTECH_-4', \
            "mr_ref should be one of 'CALTECH_-2', 'CALTECH_-4'."
        self.eval_mode = eval_mode
        self.iou_thres = iou_thres
        self.compare_matching_method = compare_matching_method
        self.mr_ref = mr_ref
        self.num_ji_process = num_ji_process

    @staticmethod
    def results2json(results: Sequence[dict], outfile_prefix: str) -> str:
        """Dump the detection results to a json file.

        Args:
            results (Sequence): Items of ``(ann, pred)`` as stored by
                :meth:`process`; ``ann`` holds 'ID', 'width' and 'height',
                ``pred`` is an array with rows ``[x1, y1, x2, y2, score]``.
            outfile_prefix (str): Path prefix; '.json' is appended.

        Returns:
            str: Path of the written json file.
        """
        result_file_path = f'{outfile_prefix}.json'
        bbox_json_results = []
        for ann, pred in results:
            # Boxes are written as [x, y, w, h], the format read back by
            # ``Image.load_det_boxes``.
            dtboxes = [
                dict(box=[x1, y1, x2 - x1, y2 - y1], score=score, tag=1)
                for x1, y1, x2, y2, score in pred.tolist()
            ]
            bbox_json_results.append(
                dict(
                    ID=ann['ID'],
                    width=ann['width'],
                    height=ann['height'],
                    dtboxes=dtboxes))
        dump(bbox_json_results, result_file_path)
        return result_file_path

    def process(self, data_batch: Sequence[dict],
                data_samples: Sequence[dict]) -> None:
        """Process one batch of data samples and predictions. The processed
        results should be stored in ``self.results``, which will be used to
        compute the metrics when all batches have been processed.

        Args:
            data_batch (dict): A batch of data from the dataloader.
            data_samples (Sequence[dict]): A batch of data samples that
                contain annotations and predictions.
        """
        for data_sample in data_samples:
            ann = dict()
            ann['ID'] = data_sample['img_id']
            ann['width'] = data_sample['ori_shape'][1]
            ann['height'] = data_sample['ori_shape'][0]
            pred_bboxes = data_sample['pred_instances']['bboxes'].cpu().numpy()
            pred_scores = data_sample['pred_instances']['scores'].cpu().numpy()

            # One row per detection: [x1, y1, x2, y2, score].
            pred_bbox_scores = np.hstack(
                [pred_bboxes, pred_scores.reshape((-1, 1))])

            self.results.append((ann, pred_bbox_scores))

    def compute_metrics(self, results: list) -> Dict[str, float]:
        """Compute the metrics from processed results.

        Args:
            results (list): The processed results of each batch.

        Returns:
            eval_results(Dict[str, float]): The computed metrics.
            The keys are the names of the metrics, and the values
            are corresponding results.
        """
        logger: MMLogger = MMLogger.get_current_instance()

        tmp_dir = None
        if self.outfile_prefix is None:
            tmp_dir = tempfile.TemporaryDirectory()
            outfile_prefix = osp.join(tmp_dir.name, 'result')
        else:
            outfile_prefix = self.outfile_prefix

        eval_results = OrderedDict()
        try:
            # dump predictions to a json file in the CrowdHuman result format
            result_file = self.results2json(results, outfile_prefix)
            if self.format_only:
                logger.info(
                    f'results are saved in {osp.dirname(outfile_prefix)}')
                return eval_results

            # load evaluation samples
            eval_samples = self.load_eval_samples(result_file)

            if 'AP' in self.metrics or 'MR' in self.metrics:
                score_list = self.compare(eval_samples)
                gt_num = sum(s.gt_num for s in eval_samples.values())
                ign_num = sum(s.ign_num for s in eval_samples.values())
                # ignore regions do not count as ground truth
                gt_num = gt_num - ign_num
                img_num = len(eval_samples)

            for metric in self.metrics:
                logger.info(f'Evaluating {metric}...')
                if metric == 'AP':
                    AP = self.eval_ap(score_list, gt_num, img_num)
                    eval_results['mAP'] = round(float(AP), 4)
                if metric == 'MR':
                    MR = self.eval_mr(score_list, gt_num, img_num)
                    eval_results['mMR'] = round(float(MR), 4)
                if metric == 'JI':
                    JI = self.eval_ji(eval_samples)
                    eval_results['JI'] = round(float(JI), 4)
        finally:
            # Remove the temporary directory even if evaluation fails.
            if tmp_dir is not None:
                tmp_dir.cleanup()

        return eval_results

    def load_eval_samples(self, result_file):
        """Load data from annotations file and detection results.

        Args:
            result_file (str): The file path of the saved detection results.

        Returns:
            Dict[Image]: The detection result packaged by Image
        """
        # The annotation file holds one json record per line.
        gt_str = get_text(
            self.ann_file, backend_args=self.backend_args).strip().split('\n')
        gt_records = [json.loads(line) for line in gt_str]

        pred_records = load(result_file, backend_args=self.backend_args)
        eval_samples = dict()
        for gt_record, pred_record in zip(gt_records, pred_records):
            # Records are paired by position, so both must be in file order.
            assert gt_record['ID'] == pred_record['ID'], \
                'please set val_dataloader.sampler.shuffle=False and try again'
            sample = Image(self.eval_mode)
            sample.load(gt_record, 'box', None, PERSON_CLASSES, True)
            sample.load(pred_record, 'box', None, PERSON_CLASSES, False)
            sample.clip_all_boader()
            eval_samples[pred_record['ID']] = sample
        return eval_samples

    def compare(self, samples):
        """Match the detection results with the ground_truth.

        Args:
            samples (dict[Image]): The detection result packaged by Image.

        Returns:
            score_list(list[tuple[ndarray, int, str]]): Matching result.
            a list of tuples (dtbox, label, imgID) in the descending
            sort of dtbox.score.
        """
        score_list = list()
        for sample in samples.values():
            if self.compare_matching_method == 'VOC':
                result = sample.compare_voc(self.iou_thres)
            else:
                result = sample.compare_caltech(self.iou_thres)
            score_list.extend(result)
        # In the descending sort of dtbox score.
        score_list.sort(key=lambda x: x[0][-1], reverse=True)
        return score_list

    @staticmethod
    def eval_ap(score_list, gt_num, img_num):
        """Evaluate by average precision.

        Args:
            score_list(list[tuple[ndarray, int, str]]): Matching result.
                a list of tuples (dtbox, label, imgID) in the descending
                sort of dtbox.score.
            gt_num(int): The number of gt boxes in the entire dataset.
            img_num(int): The number of images in the entire dataset.
                Unused here; kept for a signature parallel to ``eval_mr``.

        Returns:
            ap(float): result of average precision.
        """
        tp, fp = 0.0, 0.0
        recalls, precisions = [], []
        for item in score_list:
            if item[1] == 1:
                tp += 1.0
            elif item[1] == 0:
                fp += 1.0
            fn = gt_num - tp
            recalls.append(tp / (tp + fn))
            precisions.append(tp / (tp + fp))

        # Trapezoidal area under the precision-recall curve.
        area = 0
        for k in range(1, len(recalls)):
            delta_h = (precisions[k - 1] + precisions[k]) / 2
            delta_w = recalls[k] - recalls[k - 1]
            area += delta_w * delta_h
        return area

    def eval_mr(self, score_list, gt_num, img_num):
        """Evaluate by Caltech-style log-average miss rate.

        Args:
            score_list(list[tuple[ndarray, int, str]]): Matching result.
                a list of tuples (dtbox, label, imgID) in the descending
                sort of dtbox.score.
            gt_num(int): The number of gt boxes in the entire dataset.
            img_num(int): The number of image in the entire dataset.

        Returns:
            mr(float): result of miss rate.
        """
        if self.mr_ref == 'CALTECH_-2':
            # CALTECH_MRREF_2: anchor points (from 10^-2 to 1) as in
            # P.Dollar's paper
            ref = [
                0.0100, 0.0178, 0.03160, 0.0562, 0.1000, 0.1778, 0.3162,
                0.5623, 1.000
            ]
        else:
            # CALTECH_MRREF_4: anchor points (from 10^-4 to 1) as in
            # S.Zhang's paper
            ref = [
                0.0001, 0.0003, 0.00100, 0.0032, 0.0100, 0.0316, 0.1000,
                0.3162, 1.000
            ]

        tp, fp = 0.0, 0.0
        fppi_x, miss_y = list(), list()
        for item in score_list:
            if item[1] == 1:
                tp += 1.0
            elif item[1] == 0:
                fp += 1.0

            fn = gt_num - tp
            recall = tp / (tp + fn)
            fppi_x.append(fp / img_num)
            miss_y.append(1.0 - recall)

        # ``fppi_x`` never decreases (fp is cumulative), so the first entry
        # >= an anchor can be found by binary search. When no entry reaches
        # the anchor, the last point of the curve is used; an empty curve
        # yields index -1 and contributes nothing.
        fppi_arr = np.asarray(fppi_x, dtype=float)
        last = len(fppi_x) - 1
        score = list()
        for pos in ref:
            idx = min(int(np.searchsorted(fppi_arr, pos, side='left')), last)
            if idx >= 0:
                score.append(miss_y[idx])
        score = np.array(score)
        # geometric mean of the sampled miss rates
        mr = np.exp(np.log(score).mean())
        return mr

    def eval_ji(self, samples):
        """Evaluate by JI using multi_process.

        The JI is computed at score thresholds 0.0, 0.1, ..., 0.9 and the
        best value is returned.

        Args:
            samples(Dict[str, Image]): The detection result packaged by Image.

        Returns:
            ji(float): result of jaccard index.
        """
        import math
        total = len(samples)
        stride = math.ceil(total / self.num_ji_process)
        records = list(samples.items())

        res_ji = []
        for step in range(10):
            score_thr = 1e-1 * step
            result_queue = Queue(10000)
            procs = []
            for rank in range(self.num_ji_process):
                start = rank * stride
                if start >= total:
                    # nothing left for this worker, do not spawn it
                    break
                end = min(start + stride, total)
                sample_data = dict(records[start:end])
                p = Process(
                    target=self.compute_ji_with_ignore,
                    args=(result_queue, sample_data, score_thr))
                p.start()
                procs.append(p)
            # Drain the queue before joining: every image yields exactly one
            # result, and a worker cannot exit while its items are unread.
            results = [result_queue.get() for _ in range(total)]
            for p in procs:
                p.join()
            _, mean_ratio = self.gather(results)
            res_ji.append(mean_ratio)
        return max(res_ji)

    def compute_ji_with_ignore(self, result_queue, dt_result, score_thr):
        """Compute JI with ignore.

        One result dict (keys 'ratio', 'recall', 'cover', 'noise', 'k', 'm',
        'n') per image is put into ``result_queue``; nothing is returned.

        Args:
            result_queue(Queue): The Queue for save compute result when
                multi_process.
            dt_result(dict[Image]): Detection result packaged by Image.
            score_thr(float): The threshold of detection score.
        """
        for record in dt_result.values():
            gt_boxes = record.gt_boxes
            dt_boxes = record.dt_boxes
            # keep confident detections and drop their score column
            keep = dt_boxes[:, -1] > score_thr
            dt_boxes = dt_boxes[keep][:, :-1]

            # a tag of -1 marks an ignore region
            gt_tag = np.array(gt_boxes[:, -1] != -1)
            valid_gt = gt_boxes[gt_tag]
            ignore_gt = gt_boxes[~gt_tag, :4]
            matches = self.compute_ji_matching(dt_boxes, valid_gt[:, :4])

            # unmatched detections lying in ignore regions are not counted
            dt_unmatched = np.ones(dt_boxes.shape[0], dtype=bool)
            dt_unmatched[np.array([d for d, _ in matches], dtype=int)] = False
            num_ignore_dt = self.get_ignores(dt_boxes[dt_unmatched],
                                             ignore_gt)

            # unmatched gt boxes lying in ignore regions are not counted
            gt_unmatched = np.ones(valid_gt.shape[0], dtype=bool)
            gt_unmatched[np.array([g for _, g in matches], dtype=int)] = False
            num_ignore_gt = self.get_ignores(valid_gt[gt_unmatched],
                                             ignore_gt)

            # compute results
            eps = 1e-6
            k = len(matches)
            m = gt_tag.sum() - num_ignore_gt
            n = dt_boxes.shape[0] - num_ignore_dt
            ratio = k / (m + n - k + eps)
            recall = k / (m + eps)
            cover = k / (n + eps)
            noise = 1 - cover
            result_dict = dict(
                ratio=ratio,
                recall=recall,
                cover=cover,
                noise=noise,
                k=k,
                m=m,
                n=n)
            # Blocking put: ``put_nowait`` would raise ``queue.Full`` when
            # the bounded queue fills up faster than the parent drains it.
            result_queue.put(result_dict)

    @staticmethod
    def gather(results):
        """Integrate test results.

        Args:
            results (list[dict]): Per-image dicts produced by
                :meth:`compute_ji_with_ignore`.

        Returns:
            tuple[str, float]: A summary line and the mean JI ratio over
            images that have at least one counted gt or detection box.
        """
        assert len(results)
        img_num = 0
        for result in results:
            if result['n'] != 0 or result['m'] != 0:
                img_num += 1
        mean_ratio = np.sum([rb['ratio'] for rb in results]) / img_num
        valids = np.sum([rb['k'] for rb in results])
        total = np.sum([rb['n'] for rb in results])
        gtn = np.sum([rb['m'] for rb in results])
        line = 'mean_ratio:{:.4f}, valids:{}, total:{}, gtn:{}'\
            .format(mean_ratio, valids, total, gtn)
        return line, mean_ratio

    def compute_ji_matching(self, dt_boxes, gt_boxes):
        """Match the annotation box for each detection box.

        Args:
            dt_boxes(ndarray): Detection boxes.
            gt_boxes(ndarray): Ground_truth boxes.

        Returns:
            matches_(list[tuple[int, int]]): Match result as
            (detection index, ground-truth index) pairs.
        """
        assert dt_boxes.shape[-1] > 3 and gt_boxes.shape[-1] > 3
        if dt_boxes.shape[0] < 1 or gt_boxes.shape[0] < 1:
            return list()

        ious = bbox_overlaps(dt_boxes, gt_boxes, mode='iou')
        # Pairs below the IoU threshold become zeros, i.e. missing edges of
        # the sparse bipartite graph.
        graph = np.where(ious < self.iou_thres, 0, ious)
        match_scipy = maximum_bipartite_matching(
            csr_matrix(graph), perm_type='column')
        # ``match_scipy[i]`` is the column matched to row i, or -1.
        return [(row, int(col)) for row, col in enumerate(match_scipy)
                if col != -1]

    def get_ignores(self, dt_boxes, gt_boxes):
        """Get the number of ignore bboxes.

        Args:
            dt_boxes(ndarray): Boxes to test.
            gt_boxes(ndarray): Ignore regions.

        Returns:
            int: Number of rows of ``dt_boxes`` whose largest 'iof' overlap
            with any ignore region exceeds ``self.iou_thres``.
        """
        if not gt_boxes.size or not dt_boxes.shape[0]:
            return 0
        ioas = bbox_overlaps(dt_boxes, gt_boxes, mode='iof')
        ioas = np.max(ioas, axis=1)
        return len(np.where(ioas > self.iou_thres)[0])
class Image(object):
    """Data structure for evaluation of CrowdHuman.

    Note:
        This implementation is modified from https://github.com/Purkialo/
        CrowdDet/blob/master/lib/evaluate/APMRToolkits/image.py

    Args:
        mode (int): Select the mode of evaluate. Valid mode include
            0(just body box), 1(just head box) and 2(both of them).
            Defaults to 0.
    """

    def __init__(self, mode):
        self.ID = None
        self.width = None
        self.height = None
        self.dt_boxes = None
        self.gt_boxes = None
        self.eval_mode = mode

        self.ign_num = None
        self.gt_num = None
        self.dt_num = None

    def load(self, record, body_key, head_key, class_names, gt_flag):
        """Loading information for evaluation.

        Args:
            record (dict): Label information or test results.
                The format might look something like this:
                {
                    'ID': '273271,c9db000d5146c15',
                    'gtboxes': [
                        {'fbox': [72, 202, 163, 503], 'tag': 'person', ...},
                        {'fbox': [199, 180, 144, 499], 'tag': 'person', ...},
                        ...
                    ]
                }
                or:
                {
                    'ID': '273271,c9db000d5146c15',
                    'width': 800,
                    'height': 1067,
                    'dtboxes': [
                        {
                            'box': [306.22, 205.95, 164.05, 394.04],
                            'score': 0.99,
                            'tag': 1
                        },
                        {
                            'box': [403.60, 178.66, 157.15, 421.33],
                            'score': 0.99,
                            'tag': 1
                        },
                        ...
                    ]
                }
            body_key (str, None): key of detection body box.
                Valid when loading detection results and self.eval_mode!=1.
            head_key (str, None): key of detection head box.
                Valid when loading detection results and self.eval_mode!=0.
            class_names (list[str]):class names of data set.
                Defaults to ['background', 'person'].
            gt_flag (bool): Indicate whether record is ground truth
                or predicting the outcome.
        """
        # Image-level fields are taken from the first record providing them.
        if 'ID' in record and self.ID is None:
            self.ID = record['ID']
        if 'width' in record and self.width is None:
            self.width = record['width']
        if 'height' in record and self.height is None:
            self.height = record['height']

        if gt_flag:
            self.gt_num = len(record['gtboxes'])
            body_bbox, head_bbox = self.load_gt_boxes(record, 'gtboxes',
                                                      class_names)
            if self.eval_mode == 0:
                self.gt_boxes = body_bbox
                self.ign_num = (body_bbox[:, -1] == -1).sum()
            elif self.eval_mode == 1:
                self.gt_boxes = head_bbox
                self.ign_num = (head_bbox[:, -1] == -1).sum()
            else:
                # valid only when neither the body nor the head is ignored
                gt_tag = (body_bbox[:, -1] != -1) & (head_bbox[:, -1] != -1)
                self.ign_num = (gt_tag == 0).sum()
                # columns: body box (4), head box (4), validity flag (1)
                self.gt_boxes = np.hstack(
                    (body_bbox[:, :-1], head_bbox[:, :-1],
                     gt_tag.reshape(-1, 1)))
        else:
            self.dt_num = len(record['dtboxes'])
            if self.eval_mode == 0:
                self.dt_boxes = self.load_det_boxes(record, 'dtboxes',
                                                    body_key, 'score')
            elif self.eval_mode == 1:
                self.dt_boxes = self.load_det_boxes(record, 'dtboxes',
                                                    head_key, 'score')
            else:
                body_dtboxes = self.load_det_boxes(record, 'dtboxes',
                                                   body_key, 'score')
                head_dtboxes = self.load_det_boxes(record, 'dtboxes',
                                                   head_key, 'score')
                # columns: body box (4), body score (1), head box (4),
                # head score (1)
                self.dt_boxes = np.hstack((body_dtboxes, head_dtboxes))

    @staticmethod
    def load_gt_boxes(dict_input, key_name, class_names):
        """load ground_truth and transform [x, y, w, h] to [x1, y1, x2, y2]

        Args:
            dict_input (dict): Annotation record of one image.
            key_name (str): Key of the list of gt boxes in ``dict_input``.
            class_names (list[str]): Class names; the index of a box's 'tag'
                in this list becomes its numeric tag.

        Returns:
            tuple[ndarray, ndarray]: ``(body_bbox, head_bbox)``, each with
            rows ``[x1, y1, x2, y2, tag]`` where a tag of -1 marks an ignored
            box. Both have shape (0, 5) when the image has no gt boxes.
        """
        assert key_name in dict_input
        if len(dict_input[key_name]) < 1:
            # The caller unpacks two arrays, so return one for each.
            return np.empty([0, 5]), np.empty([0, 5])
        head_bbox = []
        body_bbox = []
        for rb in dict_input[key_name]:
            if rb['tag'] in class_names:
                body_tag = class_names.index(rb['tag'])
            else:
                body_tag = -1
            head_tag = body_tag
            # an ignored instance invalidates both its body and its head
            if 'extra' in rb and 'ignore' in rb['extra'] \
                    and rb['extra']['ignore'] != 0:
                body_tag = -1
                head_tag = -1
            # an ignored head invalidates the head only
            if 'head_attr' in rb and 'ignore' in rb['head_attr'] \
                    and rb['head_attr']['ignore'] != 0:
                head_tag = -1
            head_bbox.append(np.hstack((rb['hbox'], head_tag)))
            body_bbox.append(np.hstack((rb['fbox'], body_tag)))
        head_bbox = np.array(head_bbox)
        head_bbox[:, 2:4] += head_bbox[:, :2]
        body_bbox = np.array(body_bbox)
        body_bbox[:, 2:4] += body_bbox[:, :2]
        return body_bbox, head_bbox

    @staticmethod
    def load_det_boxes(dict_input, key_name, key_box, key_score, key_tag=None):
        """load detection boxes.

        Args:
            dict_input (dict): Detection record of one image.
            key_name (str): Key of the list of detections in ``dict_input``.
            key_box (str): Key of the ``[x, y, w, h]`` box of a detection.
            key_score (str, None): Key of the score; no score column is
                added when it is falsy.
            key_tag (str, None): Key of the tag; no tag column is added
                when it is falsy. Defaults to None.

        Returns:
            ndarray: Rows ``[x1, y1, x2, y2(, score)(, tag)]``; an empty
            array of shape (0, 5) when there are no detections.
        """
        assert key_name in dict_input
        detections = dict_input[key_name]
        if len(detections) < 1:
            return np.empty([0, 5])

        # box first, then the optional score and tag columns
        columns = [key_box] + [key for key in (key_score, key_tag) if key]
        for key in columns:
            assert key in detections[0]
        bboxes = np.vstack(
            [np.hstack([rb[key] for key in columns]) for rb in detections])
        # [x, y, w, h] -> [x1, y1, x2, y2]
        bboxes[:, 2:4] += bboxes[:, :2]
        return bboxes

    def clip_all_boader(self):
        """Make sure boxes are within the image range."""

        def _clip_boundary(boxes, height, width):
            # ``boxes`` is modified in place; columns 0-3 are x1, y1, x2, y2.
            assert boxes.shape[-1] >= 4
            boxes[:, 0] = np.minimum(np.maximum(boxes[:, 0], 0), width - 1)
            boxes[:, 1] = np.minimum(np.maximum(boxes[:, 1], 0), height - 1)
            boxes[:, 2] = np.maximum(np.minimum(boxes[:, 2], width), 0)
            boxes[:, 3] = np.maximum(np.minimum(boxes[:, 3], height), 0)
            return boxes

        assert self.dt_boxes.shape[-1] >= 4
        assert self.gt_boxes.shape[-1] >= 4
        assert self.width is not None and self.height is not None
        if self.eval_mode == 2:
            self.dt_boxes[:, :4] = _clip_boundary(self.dt_boxes[:, :4],
                                                  self.height, self.width)
            self.gt_boxes[:, :4] = _clip_boundary(self.gt_boxes[:, :4],
                                                  self.height, self.width)
            # In mode 2 ``load`` lays detections out as body box, body score,
            # head box, head score, so the head box is in columns 5-8
            # (column 4 is the body score and must not be clipped).
            self.dt_boxes[:, 5:9] = _clip_boundary(self.dt_boxes[:, 5:9],
                                                   self.height, self.width)
            # Ground truth has no score columns: head box is in columns 4-7.
            self.gt_boxes[:, 4:8] = _clip_boundary(self.gt_boxes[:, 4:8],
                                                   self.height, self.width)
        else:
            self.dt_boxes = _clip_boundary(self.dt_boxes, self.height,
                                           self.width)
            self.gt_boxes = _clip_boundary(self.gt_boxes, self.height,
                                           self.width)

    def compare_voc(self, thres):
        """Match the detection results with the ground_truth by VOC.

        Every detection is assigned to the gt box with the largest IoU above
        ``thres``. Gt boxes are not consumed, so several detections may match
        the same gt box. A detection whose best match is an ignore region is
        dropped from the result.

        Args:
            thres (float): IOU threshold.

        Returns:
            score_list(list[tuple[ndarray, int, str]]): Matching result.
            a list of tuples (dtbox, label, imgID) in the descending
            sort of dtbox.score.
        """
        if self.dt_boxes is None or len(self.dt_boxes) == 0:
            return list()

        # highest score first; equal scores keep their original order
        dtboxes = self.dt_boxes
        dtboxes = dtboxes[np.argsort(-dtboxes[:, -1], kind='stable')]

        gtboxes = self.gt_boxes
        if gtboxes is not None and len(gtboxes):
            # Valid boxes (tag > 0) first, so that on an IoU tie a valid box
            # wins over an ignore region.
            gtboxes = gtboxes[np.argsort(-gtboxes[:, -1], kind='stable')]
            overlap_iou = bbox_overlaps(dtboxes, gtboxes, mode='iou')
        else:
            overlap_iou = None

        score_list = list()
        for i, dt in enumerate(dtboxes):
            maxpos = -1
            if overlap_iou is not None:
                # argmax returns the first maximum
                best = int(np.argmax(overlap_iou[i]))
                if overlap_iou[i][best] > thres:
                    maxpos = best
            if maxpos < 0:
                score_list.append((dt, 0, self.ID))
            elif gtboxes[maxpos, -1] > 0:
                score_list.append((dt, 1, self.ID))
            # else: matched to an ignore region, neither TP nor FP
        return score_list

    def compare_caltech(self, thres):
        """Match the detection results with the ground_truth by Caltech
        matching strategy.

        Args:
            thres (float): IOU threshold.

        Returns:
            score_list(list[tuple[ndarray, int, str]]): Matching result.
            a list of tuples (dtbox, label, imgID) in the descending
            sort of dtbox.score.
        """
        if self.dt_boxes is None or self.gt_boxes is None:
            return list()
        if len(self.dt_boxes) == 0:
            return list()

        # Detections by descending score; gt boxes by descending tag so that
        # valid boxes (tag > 0) come before ignore regions. Stable sorts keep
        # the original order among equal keys.
        dtboxes = self.dt_boxes
        dtboxes = dtboxes[np.argsort(-dtboxes[:, -1], kind='stable')]
        gtboxes = self.gt_boxes
        gtboxes = gtboxes[np.argsort(-gtboxes[:, -1], kind='stable')]
        gt_matched = np.zeros(gtboxes.shape[0])

        overlap_iou = bbox_overlaps(dtboxes, gtboxes, mode='iou')
        overlap_ioa = bbox_overlaps(dtboxes, gtboxes, mode='iof')

        score_list = list()
        for i, dt in enumerate(dtboxes):
            maxpos = -1
            maxiou = thres
            for j, gt in enumerate(gtboxes):
                # each valid gt box can be claimed by one detection only
                if gt_matched[j] == 1:
                    continue
                if gt[-1] > 0:
                    overlap = overlap_iou[i][j]
                    if overlap > maxiou:
                        maxiou = overlap
                        maxpos = j
                else:
                    # Ignore regions are visited last: stop as soon as a
                    # valid match exists, otherwise test the overlap
                    # relative to the detection area ('iof').
                    if maxpos >= 0:
                        break
                    overlap = overlap_ioa[i][j]
                    if overlap > thres:
                        maxiou = overlap
                        maxpos = j
            if maxpos < 0:
                score_list.append((dt, 0, self.ID))
            elif gtboxes[maxpos, -1] > 0:
                gt_matched[maxpos] = 1
                score_list.append((dt, 1, self.ID))
            # else: matched to an ignore region, neither TP nor FP
        return score_list
|