# Copyright (c) OpenMMLab. All rights reserved.
import datetime
import itertools
import os.path as osp
import tempfile
from typing import Dict, Optional, Sequence, Tuple, Union

import mmcv
import numpy as np
from mmengine.evaluator import BaseMetric
from mmengine.fileio import dump, get_local_path, load
from mmengine.logging import MMLogger, print_log
from terminaltables import AsciiTable

from mmdet.datasets.api_wrappers import COCOPanoptic
from mmdet.registry import METRICS
from ..functional import (INSTANCE_OFFSET, pq_compute_multi_core,
                          pq_compute_single_core)

try:
    import panopticapi
    from panopticapi.evaluation import VOID, PQStat
    from panopticapi.utils import id2rgb, rgb2id
except ImportError:
    panopticapi = None
    id2rgb = None
    rgb2id = None
    VOID = None
    PQStat = None


@METRICS.register_module()
class CocoPanopticMetric(BaseMetric):
    """COCO panoptic segmentation evaluation metric.

    Evaluate PQ, SQ and RQ for panoptic segmentation tasks. Please refer to
    https://cocodataset.org/#panoptic-eval for more details.

    Args:
        ann_file (str, optional): Path to the coco format annotation file.
            If not specified, ground truth annotations from the dataset will
            be converted to coco format. Defaults to None.
        seg_prefix (str, optional): Path to the directory which contains the
            coco panoptic segmentation masks. It should be specified when
            evaluating. Defaults to None.
        classwise (bool): Whether to evaluate the metric class-wise.
            Defaults to False.
        outfile_prefix (str, optional): The prefix of json files. It includes
            the file path and the prefix of filename, e.g., "a/b/prefix".
            If not specified, a temp file will be created.
            It should be specified when ``format_only`` is True.
            Defaults to None.
        format_only (bool): Format the output results without performing
            evaluation. It is useful when you want to format the result
            to a specific format and submit it to the test server.
            Defaults to False.
        nproc (int): Number of processes for panoptic quality computing.
            Defaults to 32. When ``nproc`` exceeds the number of cpu cores,
            the number of cpu cores is used.
        file_client_args (dict, optional): Arguments to instantiate the
            corresponding backend in mmdet <= 3.0.0rc6. Defaults to None.
        backend_args (dict, optional): Arguments to instantiate the
            corresponding backend. Defaults to None.
        collect_device (str): Device name used for collecting results from
            different ranks during distributed training. Must be 'cpu' or
            'gpu'. Defaults to 'cpu'.
        prefix (str, optional): The prefix that will be added in the metric
            names to disambiguate homonymous metrics of different evaluators.
            If prefix is not provided in the argument, self.default_prefix
            will be used instead. Defaults to None.
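
    Examples:
        An illustrative evaluator config (a sketch only; the annotation file
        and mask folder below are placeholders for your own data)::

            val_evaluator = dict(
                type='CocoPanopticMetric',
                ann_file='annotations/panoptic_val2017.json',
                seg_prefix='annotations/panoptic_val2017/',
                backend_args=None)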
- """
- default_prefix: Optional[str] = 'coco_panoptic'

    def __init__(self,
                 ann_file: Optional[str] = None,
                 seg_prefix: Optional[str] = None,
                 classwise: bool = False,
                 format_only: bool = False,
                 outfile_prefix: Optional[str] = None,
                 nproc: int = 32,
                 file_client_args: dict = None,
                 backend_args: dict = None,
                 collect_device: str = 'cpu',
                 prefix: Optional[str] = None) -> None:
        if panopticapi is None:
            raise RuntimeError(
                'panopticapi is not installed, please install it by: '
                'pip install git+https://github.com/cocodataset/'
                'panopticapi.git.')

        super().__init__(collect_device=collect_device, prefix=prefix)
        self.classwise = classwise
        self.format_only = format_only
        if self.format_only:
            assert outfile_prefix is not None, \
                'outfile_prefix must not be None when format_only is True, ' \
                'otherwise the result files will be saved to a temp ' \
                'directory which will be cleaned up at the end.'

        self.tmp_dir = None
        # outfile_prefix should be a prefix of a path which points to a shared
        # storage when train or test with multi nodes.
        self.outfile_prefix = outfile_prefix
        if outfile_prefix is None:
            self.tmp_dir = tempfile.TemporaryDirectory()
            self.outfile_prefix = osp.join(self.tmp_dir.name, 'results')
        # the directory to save predicted panoptic segmentation mask
        self.seg_out_dir = f'{self.outfile_prefix}.panoptic'
        self.nproc = nproc
        self.seg_prefix = seg_prefix

        self.cat_ids = None
        self.cat2label = None

        self.backend_args = backend_args
        if file_client_args is not None:
            raise RuntimeError(
                'The `file_client_args` is deprecated, '
                'please use `backend_args` instead, please refer to '
                'https://github.com/open-mmlab/mmdetection/blob/main/configs/_base_/datasets/coco_detection.py'  # noqa: E501
            )

        if ann_file:
            with get_local_path(
                    ann_file, backend_args=self.backend_args) as local_path:
                self._coco_api = COCOPanoptic(local_path)
            self.categories = self._coco_api.cats
        else:
            self._coco_api = None
            self.categories = None

    def __del__(self) -> None:
        """Clean up."""
        if self.tmp_dir is not None:
            self.tmp_dir.cleanup()

    def gt_to_coco_json(self, gt_dicts: Sequence[dict],
                        outfile_prefix: str) -> Tuple[str, str]:
        """Convert ground truth to coco panoptic segmentation format json
        file.

        Args:
            gt_dicts (Sequence[dict]): Ground truth of the dataset.
            outfile_prefix (str): The filename prefix of the json file. If the
                prefix is "somepath/xxx", the json file will be named
                "somepath/xxx.gt.json".

        Returns:
            Tuple[str, str]: The filename of the json file and the name of the\
                directory which contains panoptic segmentation masks.
        """
        assert len(gt_dicts) > 0, 'gt_dicts is empty.'
        gt_folder = osp.dirname(gt_dicts[0]['seg_map_path'])
        converted_json_path = f'{outfile_prefix}.gt.json'

        categories = []
        for id, name in enumerate(self.dataset_meta['classes']):
            isthing = 1 if name in self.dataset_meta['thing_classes'] else 0
            categories.append({'id': id, 'name': name, 'isthing': isthing})

        image_infos = []
        annotations = []
        for gt_dict in gt_dicts:
            img_id = gt_dict['image_id']
            image_info = {
                'id': img_id,
                'width': gt_dict['width'],
                'height': gt_dict['height'],
                'file_name': osp.split(gt_dict['seg_map_path'])[-1]
            }
            image_infos.append(image_info)

            pan_png = mmcv.imread(gt_dict['seg_map_path']).squeeze()
            pan_png = pan_png[:, :, ::-1]
            pan_png = rgb2id(pan_png)
            segments_info = []
            for segment_info in gt_dict['segments_info']:
                id = segment_info['id']
                label = segment_info['category']
                mask = pan_png == id
                isthing = categories[label]['isthing']
                if isthing:
                    iscrowd = 1 if not segment_info['is_thing'] else 0
                else:
                    iscrowd = 0

                new_segment_info = {
                    'id': id,
                    'category_id': label,
                    'isthing': isthing,
                    'iscrowd': iscrowd,
                    'area': mask.sum()
                }
                segments_info.append(new_segment_info)

            segm_file = image_info['file_name'].replace('jpg', 'png')
            annotation = dict(
                image_id=img_id,
                segments_info=segments_info,
                file_name=segm_file)
            annotations.append(annotation)
            pan_png = id2rgb(pan_png)

        info = dict(
            date_created=str(datetime.datetime.now()),
            description='Coco json file converted by mmdet CocoPanopticMetric.'
        )
        coco_json = dict(
            info=info,
            images=image_infos,
            categories=categories,
            licenses=None,
        )
        if len(annotations) > 0:
            coco_json['annotations'] = annotations
        dump(coco_json, converted_json_path)
        return converted_json_path, gt_folder

    def result2json(self, results: Sequence[dict],
                    outfile_prefix: str) -> Tuple[str, str]:
        """Dump the panoptic results to a COCO style json file and a
        directory.

        Args:
            results (Sequence[dict]): Testing results of the dataset.
            outfile_prefix (str): The filename prefix of the json files and
                the directory.

        Returns:
            Tuple[str, str]: The json file and the directory which contains \
                panoptic segmentation masks. The filename of the json is
                "somepath/xxx.panoptic.json" and name of the directory is
                "somepath/xxx.panoptic".
        """
        label2cat = dict((v, k) for (k, v) in self.cat2label.items())
        pred_annotations = []
        for idx in range(len(results)):
            result = results[idx]
            for segment_info in result['segments_info']:
                sem_label = segment_info['category_id']
                # convert sem_label to json label
                cat_id = label2cat[sem_label]
                segment_info['category_id'] = label2cat[sem_label]
                is_thing = self.categories[cat_id]['isthing']
                segment_info['isthing'] = is_thing
            pred_annotations.append(result)
        pan_json_results = dict(annotations=pred_annotations)
        json_filename = f'{outfile_prefix}.panoptic.json'
        dump(pan_json_results, json_filename)
        return json_filename, (
            self.seg_out_dir
            if self.tmp_dir is None else tempfile.gettempdir())

    def _parse_predictions(self,
                           pred: dict,
                           img_id: int,
                           segm_file: str,
                           label2cat=None) -> dict:
        """Parse panoptic segmentation predictions.

        Args:
            pred (dict): Panoptic segmentation predictions.
            img_id (int): Image id.
            segm_file (str): Segmentation file name.
            label2cat (dict): Mapping from label to category id.
                Defaults to None.

        Returns:
            dict: Parsed predictions.
        """
        result = dict()
        result['img_id'] = img_id
        # shape (1, H, W) -> (H, W)
        pan = pred['pred_panoptic_seg']['sem_seg'].cpu().numpy()[0]
        pan_labels = np.unique(pan)
        segments_info = []
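        # Each value in ``pan`` encodes a panoptic id as
        # ``category_label + instance_id * INSTANCE_OFFSET`` (the convention
        # used by the panoptic fusion heads), so taking it modulo
        # ``INSTANCE_OFFSET`` below recovers the semantic label.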
        for pan_label in pan_labels:
            sem_label = pan_label % INSTANCE_OFFSET
            # We reserve the length of dataset_meta['classes'] for VOID label
            if sem_label == len(self.dataset_meta['classes']):
                continue
            mask = pan == pan_label
            area = mask.sum()
            segments_info.append({
                'id':
                int(pan_label),
                # when ann_file provided, sem_label should be cat_id, otherwise
                # sem_label should be a continuous id, not the cat_id
                # defined in dataset
                'category_id':
                label2cat[sem_label] if label2cat else sem_label,
                'area':
                int(area)
            })
        # evaluation script uses 0 for VOID label.
        pan[pan % INSTANCE_OFFSET == len(self.dataset_meta['classes'])] = VOID
        pan = id2rgb(pan).astype(np.uint8)
        mmcv.imwrite(pan[:, :, ::-1], osp.join(self.seg_out_dir, segm_file))
        result = {
            'image_id': img_id,
            'segments_info': segments_info,
            'file_name': segm_file
        }
        return result

    def _compute_batch_pq_stats(self, data_samples: Sequence[dict]):
        """Process gts and predictions when ``outfile_prefix`` is not set;
        gts come from the dataset or from the json file defined by
        ``ann_file``.

        Intermediate results, ``pq_stats``, are computed here and put into
        ``self.results``.
        """
        if self._coco_api is None:
            categories = dict()
            for id, name in enumerate(self.dataset_meta['classes']):
                isthing = 1 if name in self.dataset_meta['thing_classes']\
                    else 0
                categories[id] = {'id': id, 'name': name, 'isthing': isthing}
            label2cat = None
        else:
            categories = self.categories
            cat_ids = self._coco_api.get_cat_ids(
                cat_names=self.dataset_meta['classes'])
            label2cat = {i: cat_id for i, cat_id in enumerate(cat_ids)}

        for data_sample in data_samples:
            # parse pred
            img_id = data_sample['img_id']
            segm_file = osp.basename(data_sample['img_path']).replace(
                'jpg', 'png')
            result = self._parse_predictions(
                pred=data_sample,
                img_id=img_id,
                segm_file=segm_file,
                label2cat=label2cat)

            # parse gt
            gt = dict()
            gt['image_id'] = img_id
            gt['width'] = data_sample['ori_shape'][1]
            gt['height'] = data_sample['ori_shape'][0]
            gt['file_name'] = segm_file

            if self._coco_api is None:
                # get segments_info from data_sample
                seg_map_path = osp.join(self.seg_prefix, segm_file)
                pan_png = mmcv.imread(seg_map_path).squeeze()
                pan_png = pan_png[:, :, ::-1]
                pan_png = rgb2id(pan_png)
                segments_info = []
                for segment_info in data_sample['segments_info']:
                    id = segment_info['id']
                    label = segment_info['category']
                    mask = pan_png == id
                    isthing = categories[label]['isthing']
                    if isthing:
                        iscrowd = 1 if not segment_info['is_thing'] else 0
                    else:
                        iscrowd = 0

                    new_segment_info = {
                        'id': id,
                        'category_id': label,
                        'isthing': isthing,
                        'iscrowd': iscrowd,
                        'area': mask.sum()
                    }
                    segments_info.append(new_segment_info)
            else:
                # get segments_info from annotation file
                segments_info = self._coco_api.imgToAnns[img_id]

            gt['segments_info'] = segments_info

            pq_stats = pq_compute_single_core(
                proc_id=0,
                annotation_set=[(gt, result)],
                gt_folder=self.seg_prefix,
                pred_folder=self.seg_out_dir,
                categories=categories,
                backend_args=self.backend_args)

            self.results.append(pq_stats)

    def _process_gt_and_predictions(self, data_samples: Sequence[dict]):
        """Process gts and predictions when ``outfile_prefix`` is set.

        The predictions will be saved to the directory specified by
        ``outfile_prefix``. The matched pair (gt, result) will be put into
        ``self.results``.
        """
        for data_sample in data_samples:
            # parse pred
            img_id = data_sample['img_id']
            segm_file = osp.basename(data_sample['img_path']).replace(
                'jpg', 'png')
            result = self._parse_predictions(
                pred=data_sample, img_id=img_id, segm_file=segm_file)

            # parse gt
            gt = dict()
            gt['image_id'] = img_id
            gt['width'] = data_sample['ori_shape'][1]
            gt['height'] = data_sample['ori_shape'][0]

            if self._coco_api is None:
                # get segments_info from dataset
                gt['segments_info'] = data_sample['segments_info']
                gt['seg_map_path'] = data_sample['seg_map_path']

            self.results.append((gt, result))

    # TODO: data_batch is no longer needed, consider adjusting the
    # parameter position
    def process(self, data_batch: dict, data_samples: Sequence[dict]) -> None:
        """Process one batch of data samples and predictions. The processed
        results should be stored in ``self.results``, which will be used to
        compute the metrics when all batches have been processed.

        Args:
            data_batch (dict): A batch of data from the dataloader.
            data_samples (Sequence[dict]): A batch of data samples that
                contain annotations and predictions.
        """
        # If ``self.tmp_dir`` is None, gts and predictions are saved to
        # ``self.results``; otherwise, pq_stats are computed here directly.
        if self.tmp_dir is None:
            self._process_gt_and_predictions(data_samples)
        else:
            self._compute_batch_pq_stats(data_samples)

    def compute_metrics(self, results: list) -> Dict[str, float]:
        """Compute the metrics from processed results.

        Args:
            results (list): The processed results of each batch. There
                are two cases:

                - When ``outfile_prefix`` is not provided, the elements in
                  results are pq_stats which can be summed directly to get PQ.
                - When ``outfile_prefix`` is provided, the elements in
                  results are tuples like (gt, pred).

        Returns:
            Dict[str, float]: The computed metrics. The keys are the names of
            the metrics, and the values are corresponding results.
        """
        logger: MMLogger = MMLogger.get_current_instance()

        if self.tmp_dir is None:
            # do evaluation after collecting all the results

            # split gt and prediction list
            gts, preds = zip(*results)

            if self._coco_api is None:
                # use converted gt json file to initialize coco api
                logger.info('Converting ground truth to coco format...')
                coco_json_path, gt_folder = self.gt_to_coco_json(
                    gt_dicts=gts, outfile_prefix=self.outfile_prefix)
                self._coco_api = COCOPanoptic(coco_json_path)
            else:
                gt_folder = self.seg_prefix

            self.cat_ids = self._coco_api.get_cat_ids(
                cat_names=self.dataset_meta['classes'])
            self.cat2label = {
                cat_id: i
                for i, cat_id in enumerate(self.cat_ids)
            }
            self.img_ids = self._coco_api.get_img_ids()
            self.categories = self._coco_api.cats

            # convert predictions to coco format and dump to json file
            json_filename, pred_folder = self.result2json(
                results=preds, outfile_prefix=self.outfile_prefix)

            if self.format_only:
                logger.info('results are saved in '
                            f'{osp.dirname(self.outfile_prefix)}')
                return dict()

            imgs = self._coco_api.imgs
            gt_json = self._coco_api.img_ann_map
            gt_json = [{
                'image_id': k,
                'segments_info': v,
                'file_name': imgs[k]['segm_file']
            } for k, v in gt_json.items()]
            pred_json = load(json_filename)
            pred_json = dict(
                (el['image_id'], el) for el in pred_json['annotations'])

            # match the gt_anns and pred_anns in the same image
            matched_annotations_list = []
            for gt_ann in gt_json:
                img_id = gt_ann['image_id']
                if img_id not in pred_json.keys():
                    raise Exception('no prediction for the image'
                                    ' with id: {}'.format(img_id))
                matched_annotations_list.append((gt_ann, pred_json[img_id]))

            pq_stat = pq_compute_multi_core(
                matched_annotations_list,
                gt_folder,
                pred_folder,
                self.categories,
                backend_args=self.backend_args,
                nproc=self.nproc)
        else:
            # aggregate the results generated in process
            if self._coco_api is None:
                categories = dict()
                for id, name in enumerate(self.dataset_meta['classes']):
                    isthing = 1 if name in self.dataset_meta[
                        'thing_classes'] else 0
                    categories[id] = {
                        'id': id,
                        'name': name,
                        'isthing': isthing
                    }
                self.categories = categories

            pq_stat = PQStat()
            for result in results:
                pq_stat += result

        metrics = [('All', None), ('Things', True), ('Stuff', False)]
        pq_results = {}

        for name, isthing in metrics:
            pq_results[name], classwise_results = pq_stat.pq_average(
                self.categories, isthing=isthing)
            if name == 'All':
                pq_results['classwise'] = classwise_results

        classwise_results = None
        if self.classwise:
            classwise_results = {
                k: v
                for k, v in zip(self.dataset_meta['classes'],
                                pq_results['classwise'].values())
            }

        print_panoptic_table(pq_results, classwise_results, logger=logger)
        results = parse_pq_results(pq_results)

        return results


def parse_pq_results(pq_results: dict) -> dict:
    """Parse the Panoptic Quality results.

    Args:
        pq_results (dict): Panoptic Quality results.

    Returns:
        dict: Panoptic Quality results parsed.
    """
    result = dict()
    result['PQ'] = 100 * pq_results['All']['pq']
    result['SQ'] = 100 * pq_results['All']['sq']
    result['RQ'] = 100 * pq_results['All']['rq']
    result['PQ_th'] = 100 * pq_results['Things']['pq']
    result['SQ_th'] = 100 * pq_results['Things']['sq']
    result['RQ_th'] = 100 * pq_results['Things']['rq']
    result['PQ_st'] = 100 * pq_results['Stuff']['pq']
    result['SQ_st'] = 100 * pq_results['Stuff']['sq']
    result['RQ_st'] = 100 * pq_results['Stuff']['rq']
    return result


def print_panoptic_table(
        pq_results: dict,
        classwise_results: Optional[dict] = None,
        logger: Optional[Union['MMLogger', str]] = None) -> None:
    """Print the panoptic evaluation results table.

    Args:
        pq_results (dict): The Panoptic Quality results.
        classwise_results (dict, optional): The classwise Panoptic Quality
            results. The keys are class names and the values are metrics.
            Defaults to None.
        logger (:obj:`MMLogger` | str, optional): Logger used for printing
            related information during evaluation. Defaults to None.
    """

    headers = ['', 'PQ', 'SQ', 'RQ', 'categories']
    data = [headers]
    for name in ['All', 'Things', 'Stuff']:
        numbers = [
            f'{(pq_results[name][k] * 100):0.3f}' for k in ['pq', 'sq', 'rq']
        ]
        row = [name] + numbers + [pq_results[name]['n']]
        data.append(row)
    table = AsciiTable(data)
    print_log('Panoptic Evaluation Results:\n' + table.table, logger=logger)

    if classwise_results is not None:
        class_metrics = [(name, ) + tuple(f'{(metrics[k] * 100):0.3f}'
                                          for k in ['pq', 'sq', 'rq'])
                         for name, metrics in classwise_results.items()]
        num_columns = min(8, len(class_metrics) * 4)
        results_flatten = list(itertools.chain(*class_metrics))
        headers = ['category', 'PQ', 'SQ', 'RQ'] * (num_columns // 4)
        results_2d = itertools.zip_longest(
            *[results_flatten[i::num_columns] for i in range(num_columns)])
        data = [headers]
        data += [result for result in results_2d]
        table = AsciiTable(data)
        print_log(
            'Classwise Panoptic Evaluation Results:\n' + table.table,
            logger=logger)
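

# Illustrative standalone usage sketch (not part of the upstream module; the
# paths and class tuples below are placeholders). In a config the metric is
# normally registered via ``val_evaluator``/``test_evaluator``, but it can
# also be driven directly through the mmengine BaseMetric interface:
#
#     from mmdet.evaluation import CocoPanopticMetric
#
#     metric = CocoPanopticMetric(
#         ann_file='annotations/panoptic_val2017.json',
#         seg_prefix='annotations/panoptic_val2017/',
#         classwise=True)
#     metric.dataset_meta = dict(
#         classes=('person', 'car', ...),
#         thing_classes=('person', 'car', ...),
#         stuff_classes=(...))
#     # ``process`` is called once per batch by the evaluation loop and
#     # ``evaluate`` aggregates everything into the PQ/SQ/RQ dict:
#     # metric.process(data_batch, data_samples)
#     # metrics = metric.evaluate(size=len(dataset))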