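"""Tests for AvoidOOM and cast_tensor_type from mmdet's memory utilities."""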
import numpy as np
import pytest
import torch

from mmdet.utils import AvoidOOM
from mmdet.utils.memory import cast_tensor_type


def test_avoidoom():
    """Check that AvoidOOM.retry_if_cuda_oom keeps the wrapped op's output
    on the same device and dtype as the default call, on both the normal
    path and the fp16/cpu fallback paths."""
    tensor = torch.from_numpy(np.random.random((20, 20)))
    if torch.cuda.is_available():
        tensor = tensor.cuda()
        # get default result
        default_result = torch.mm(tensor, tensor.transpose(1, 0))

        # when no OOM error occurs
        AvoidCudaOOM = AvoidOOM()
        result = AvoidCudaOOM.retry_if_cuda_oom(torch.mm)(tensor,
                                                          tensor.transpose(
                                                              1, 0))
        assert default_result.device == result.device and \
               default_result.dtype == result.dtype and \
               torch.equal(default_result, result)

        # calculate with fp16 and convert back to the source dtype
        AvoidCudaOOM = AvoidOOM(test=True)
        result = AvoidCudaOOM.retry_if_cuda_oom(torch.mm)(tensor,
                                                          tensor.transpose(
                                                              1, 0))
        assert default_result.device == result.device and \
               default_result.dtype == result.dtype and \
               torch.allclose(default_result, result, 1e-3)

        # calculate on cpu and convert back to the source device
        AvoidCudaOOM = AvoidOOM(test=True)
        result = AvoidCudaOOM.retry_if_cuda_oom(torch.mm)(tensor,
                                                          tensor.transpose(
                                                              1, 0))
        assert result.dtype == default_result.dtype and \
               result.device == default_result.device and \
               torch.allclose(default_result, result)

        # do not fall back to cpu; the output keeps the input's device
        # and dtype
        AvoidCudaOOM = AvoidOOM(test=True, to_cpu=False)
        result = AvoidCudaOOM.retry_if_cuda_oom(torch.mm)(tensor,
                                                          tensor.transpose(
                                                              1, 0))
        assert result.dtype == default_result.dtype and \
               result.device == default_result.device

    else:
        # without CUDA the wrapped op simply runs on cpu
        default_result = torch.mm(tensor, tensor.transpose(1, 0))
        AvoidCudaOOM = AvoidOOM()
        result = AvoidCudaOOM.retry_if_cuda_oom(torch.mm)(tensor,
                                                          tensor.transpose(
                                                              1, 0))
        assert default_result.device == result.device and \
               default_result.dtype == result.dtype and \
               torch.equal(default_result, result)
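# A minimal usage sketch (not collected by pytest, and an addition rather
# than part of the original tests): in real code, retry_if_cuda_oom wraps a
# memory-hungry op so that a CUDA OOM error triggers an fp16 retry and then
# a cpu retry, with the result cast back to the input's dtype and device.
# The matrix sizes below are illustrative placeholders.
def _demo_retry_if_cuda_oom():
    a = torch.randn(4096, 4096)
    b = torch.randn(4096, 4096)
    if torch.cuda.is_available():
        a, b = a.cuda(), b.cuda()
    safe_mm = AvoidOOM().retry_if_cuda_oom(torch.mm)
    # behaves exactly like torch.mm unless memory runs out
    return safe_mm(a, b)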
def test_cast_tensor_type():
    """Check cast_tensor_type on plain floats, tensors, lists, dicts and
    device conversions."""
    inputs = torch.rand(10)
    if torch.cuda.is_available():
        inputs = inputs.cuda()
    with pytest.raises(AssertionError):
        cast_tensor_type(inputs, src_type=None, dst_type=None)

    # input is a float
    out = cast_tensor_type(10., dst_type=torch.half)
    assert out == 10. and isinstance(out, float)

    # convert Tensor to fp16 and re-convert to fp32
    fp16_out = cast_tensor_type(inputs, dst_type=torch.half)
    assert fp16_out.dtype == torch.half
    fp32_out = cast_tensor_type(fp16_out, dst_type=torch.float32)
    assert fp32_out.dtype == torch.float32

    # input is a list
    list_input = [inputs, inputs]
    list_outs = cast_tensor_type(list_input, dst_type=torch.half)
    assert len(list_outs) == len(list_input) and \
           isinstance(list_outs, list)
    for out in list_outs:
        assert out.dtype == torch.half

    # input is a dict
    dict_input = {'test1': inputs, 'test2': inputs}
    dict_outs = cast_tensor_type(dict_input, dst_type=torch.half)
    assert len(dict_outs) == len(dict_input) and \
           isinstance(dict_outs, dict)

    # convert the input tensor to cpu and re-convert to gpu
    if torch.cuda.is_available():
        cpu_device = torch.empty(0).device
        gpu_device = inputs.device
        cpu_out = cast_tensor_type(inputs, dst_type=cpu_device)
        assert cpu_out.device == cpu_device
        gpu_out = cast_tensor_type(inputs, dst_type=gpu_device)
        assert gpu_out.device == gpu_device
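# A small illustrative sketch (an addition, not part of the original tests,
# and not collected by pytest): cast_tensor_type recurses through nested
# containers, so a dict holding a list of tensors is converted while the
# container structure is preserved.
def _demo_cast_nested():
    nested = {'feats': [torch.rand(2), torch.rand(3)]}
    half = cast_tensor_type(nested, dst_type=torch.half)
    # every leaf tensor is now fp16; keys and list layout are unchanged
    assert all(t.dtype == torch.half for t in half['feats'])
    return half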