| # Owner(s): ["module: unknown"] |
| |
| import io |
| import os |
| import shutil |
| import sys |
| import tempfile |
| import unittest |
| from pathlib import Path |
| |
| import expecttest |
| import numpy as np |
| |
| |
| TEST_TENSORBOARD = True |
| try: |
| import tensorboard.summary.writer.event_file_writer # noqa: F401 |
| from tensorboard.compat.proto.summary_pb2 import Summary |
| except ImportError: |
| TEST_TENSORBOARD = False |
| |
| HAS_TORCHVISION = True |
| try: |
| import torchvision |
| except ImportError: |
| HAS_TORCHVISION = False |
| skipIfNoTorchVision = unittest.skipIf(not HAS_TORCHVISION, "no torchvision") |
| |
| TEST_MATPLOTLIB = True |
| try: |
| import matplotlib |
| if os.environ.get('DISPLAY', '') == '': |
| matplotlib.use('Agg') |
| import matplotlib.pyplot as plt |
| except ImportError: |
| TEST_MATPLOTLIB = False |
| skipIfNoMatplotlib = unittest.skipIf(not TEST_MATPLOTLIB, "no matplotlib") |
| |
| import torch |
| from torch.testing._internal.common_utils import ( |
| instantiate_parametrized_tests, |
| IS_MACOS, |
| IS_WINDOWS, |
| parametrize, |
| run_tests, |
| TEST_WITH_CROSSREF, |
| TestCase, |
| skipIfTorchDynamo, |
| ) |
| |
| |
| def tensor_N(shape, dtype=float): |
| numel = np.prod(shape) |
| x = (np.arange(numel, dtype=dtype)).reshape(shape) |
| return x |
| |
| class BaseTestCase(TestCase): |
| """ Base class used for all TensorBoard tests """ |
| def setUp(self): |
| super().setUp() |
| if not TEST_TENSORBOARD: |
| return self.skipTest("Skip the test since TensorBoard is not installed") |
| if TEST_WITH_CROSSREF: |
| return self.skipTest("Don't run TensorBoard tests with crossref") |
| self.temp_dirs = [] |
| |
| def createSummaryWriter(self): |
| # Just to get the name of the directory in a writable place. tearDown() |
| # is responsible for clean-ups. |
| temp_dir = tempfile.TemporaryDirectory(prefix="test_tensorboard").name |
| self.temp_dirs.append(temp_dir) |
| return SummaryWriter(temp_dir) |
| |
| def tearDown(self): |
| super().tearDown() |
| # Remove directories created by SummaryWriter |
| for temp_dir in self.temp_dirs: |
| if os.path.exists(temp_dir): |
| shutil.rmtree(temp_dir) |
| |
| |
| if TEST_TENSORBOARD: |
| from google.protobuf import text_format |
| from PIL import Image |
| from tensorboard.compat.proto.graph_pb2 import GraphDef |
| from tensorboard.compat.proto.types_pb2 import DataType |
| |
| from torch.utils.tensorboard import summary, SummaryWriter |
| from torch.utils.tensorboard._convert_np import make_np |
| from torch.utils.tensorboard._pytorch_graph import graph |
| from torch.utils.tensorboard._utils import _prepare_video, convert_to_HWC |
| from torch.utils.tensorboard.summary import int_to_half, tensor_proto |
| |
| class TestTensorBoardPyTorchNumpy(BaseTestCase): |
| def test_pytorch_np(self): |
| tensors = [torch.rand(3, 10, 10), torch.rand(1), torch.rand(1, 2, 3, 4, 5)] |
| for tensor in tensors: |
| # regular tensor |
| self.assertIsInstance(make_np(tensor), np.ndarray) |
| |
| # CUDA tensor |
| if torch.cuda.is_available(): |
| self.assertIsInstance(make_np(tensor.cuda()), np.ndarray) |
| |
| # regular variable |
| self.assertIsInstance(make_np(torch.autograd.Variable(tensor)), np.ndarray) |
| |
| # CUDA variable |
| if torch.cuda.is_available(): |
| self.assertIsInstance(make_np(torch.autograd.Variable(tensor).cuda()), np.ndarray) |
| |
| # python primitive type |
| self.assertIsInstance(make_np(0), np.ndarray) |
| self.assertIsInstance(make_np(0.1), np.ndarray) |
| |
| def test_pytorch_autograd_np(self): |
| x = torch.autograd.Variable(torch.empty(1)) |
| self.assertIsInstance(make_np(x), np.ndarray) |
| |
| def test_pytorch_write(self): |
| with self.createSummaryWriter() as w: |
| w.add_scalar('scalar', torch.autograd.Variable(torch.rand(1)), 0) |
| |
| def test_pytorch_histogram(self): |
| with self.createSummaryWriter() as w: |
| w.add_histogram('float histogram', torch.rand((50,))) |
| w.add_histogram('int histogram', torch.randint(0, 100, (50,))) |
| w.add_histogram('bfloat16 histogram', torch.rand(50, dtype=torch.bfloat16)) |
| |
| def test_pytorch_histogram_raw(self): |
| with self.createSummaryWriter() as w: |
| num = 50 |
| floats = make_np(torch.rand((num,))) |
| bins = [0.0, 0.25, 0.5, 0.75, 1.0] |
| counts, limits = np.histogram(floats, bins) |
| sum_sq = floats.dot(floats).item() |
| w.add_histogram_raw('float histogram raw', |
| min=floats.min().item(), |
| max=floats.max().item(), |
| num=num, |
| sum=floats.sum().item(), |
| sum_squares=sum_sq, |
| bucket_limits=limits[1:].tolist(), |
| bucket_counts=counts.tolist()) |
| |
| ints = make_np(torch.randint(0, 100, (num,))) |
| bins = [0, 25, 50, 75, 100] |
| counts, limits = np.histogram(ints, bins) |
| sum_sq = ints.dot(ints).item() |
| w.add_histogram_raw('int histogram raw', |
| min=ints.min().item(), |
| max=ints.max().item(), |
| num=num, |
| sum=ints.sum().item(), |
| sum_squares=sum_sq, |
| bucket_limits=limits[1:].tolist(), |
| bucket_counts=counts.tolist()) |
| |
| ints = torch.tensor(range(0, 100)).float() |
| nbins = 100 |
| counts = torch.histc(ints, bins=nbins, min=0, max=99) |
| limits = torch.tensor(range(nbins)) |
| sum_sq = ints.dot(ints).item() |
| w.add_histogram_raw('int histogram raw', |
| min=ints.min().item(), |
| max=ints.max().item(), |
| num=num, |
| sum=ints.sum().item(), |
| sum_squares=sum_sq, |
| bucket_limits=limits.tolist(), |
| bucket_counts=counts.tolist()) |
| |
| class TestTensorBoardUtils(BaseTestCase): |
| def test_to_HWC(self): |
| test_image = np.random.randint(0, 256, size=(3, 32, 32), dtype=np.uint8) |
| converted = convert_to_HWC(test_image, 'chw') |
| self.assertEqual(converted.shape, (32, 32, 3)) |
| test_image = np.random.randint(0, 256, size=(16, 3, 32, 32), dtype=np.uint8) |
| converted = convert_to_HWC(test_image, 'nchw') |
| self.assertEqual(converted.shape, (64, 256, 3)) |
| test_image = np.random.randint(0, 256, size=(32, 32), dtype=np.uint8) |
| converted = convert_to_HWC(test_image, 'hw') |
| self.assertEqual(converted.shape, (32, 32, 3)) |
| |
| def test_convert_to_HWC_dtype_remains_same(self): |
| # test to ensure convert_to_HWC restores the dtype of input np array and |
| # thus the scale_factor calculated for the image is 1 |
| test_image = torch.tensor([[[[1, 2, 3], [4, 5, 6]]]], dtype=torch.uint8) |
| tensor = make_np(test_image) |
| tensor = convert_to_HWC(tensor, 'NCHW') |
| scale_factor = summary._calc_scale_factor(tensor) |
| self.assertEqual(scale_factor, 1, msg='Values are already in [0, 255], scale factor should be 1') |
| |
| |
| def test_prepare_video(self): |
| # At each timeframe, the sum over all other |
| # dimensions of the video should be the same. |
| shapes = [ |
| (16, 30, 3, 28, 28), |
| (36, 30, 3, 28, 28), |
| (19, 29, 3, 23, 19), |
| (3, 3, 3, 3, 3) |
| ] |
| for s in shapes: |
| V_input = np.random.random(s) |
| V_after = _prepare_video(np.copy(V_input)) |
| total_frame = s[1] |
| V_input = np.swapaxes(V_input, 0, 1) |
| for f in range(total_frame): |
| x = np.reshape(V_input[f], newshape=(-1)) |
| y = np.reshape(V_after[f], newshape=(-1)) |
| np.testing.assert_array_almost_equal(np.sum(x), np.sum(y)) |
| |
| def test_numpy_vid_uint8(self): |
| V_input = np.random.randint(0, 256, (16, 30, 3, 28, 28)).astype(np.uint8) |
| V_after = _prepare_video(np.copy(V_input)) * 255 |
| total_frame = V_input.shape[1] |
| V_input = np.swapaxes(V_input, 0, 1) |
| for f in range(total_frame): |
| x = np.reshape(V_input[f], newshape=(-1)) |
| y = np.reshape(V_after[f], newshape=(-1)) |
| np.testing.assert_array_almost_equal(np.sum(x), np.sum(y)) |
| |
| freqs = [262, 294, 330, 349, 392, 440, 440, 440, 440, 440, 440] |
| |
| true_positive_counts = [75, 64, 21, 5, 0] |
| false_positive_counts = [150, 105, 18, 0, 0] |
| true_negative_counts = [0, 45, 132, 150, 150] |
| false_negative_counts = [0, 11, 54, 70, 75] |
| precision = [0.3333333, 0.3786982, 0.5384616, 1.0, 0.0] |
| recall = [1.0, 0.8533334, 0.28, 0.0666667, 0.0] |
| |
| class TestTensorBoardWriter(BaseTestCase): |
| def test_writer(self): |
| with self.createSummaryWriter() as writer: |
| sample_rate = 44100 |
| |
| n_iter = 0 |
| writer.add_hparams( |
| {'lr': 0.1, 'bsize': 1}, |
| {'hparam/accuracy': 10, 'hparam/loss': 10} |
| ) |
| writer.add_scalar('data/scalar_systemtime', 0.1, n_iter) |
| writer.add_scalar('data/scalar_customtime', 0.2, n_iter, walltime=n_iter) |
| writer.add_scalar('data/new_style', 0.2, n_iter, new_style=True) |
| writer.add_scalars('data/scalar_group', { |
| "xsinx": n_iter * np.sin(n_iter), |
| "xcosx": n_iter * np.cos(n_iter), |
| "arctanx": np.arctan(n_iter) |
| }, n_iter) |
| x = np.zeros((32, 3, 64, 64)) # output from network |
| writer.add_images('Image', x, n_iter) # Tensor |
| writer.add_image_with_boxes('imagebox', |
| np.zeros((3, 64, 64)), |
| np.array([[10, 10, 40, 40], [40, 40, 60, 60]]), |
| n_iter) |
| x = np.zeros(sample_rate * 2) |
| |
| writer.add_audio('myAudio', x, n_iter) |
| writer.add_video('myVideo', np.random.rand(16, 48, 1, 28, 28).astype(np.float32), n_iter) |
| writer.add_text('Text', 'text logged at step:' + str(n_iter), n_iter) |
| writer.add_text('markdown Text', '''a|b\n-|-\nc|d''', n_iter) |
| writer.add_histogram('hist', np.random.rand(100, 100), n_iter) |
| writer.add_pr_curve('xoxo', np.random.randint(2, size=100), np.random.rand( |
| 100), n_iter) # needs tensorboard 0.4RC or later |
| writer.add_pr_curve_raw('prcurve with raw data', true_positive_counts, |
| false_positive_counts, |
| true_negative_counts, |
| false_negative_counts, |
| precision, |
| recall, n_iter) |
| |
| v = np.array([[[1, 1, 1], [-1, -1, 1], [1, -1, -1], [-1, 1, -1]]], dtype=float) |
| c = np.array([[[255, 0, 0], [0, 255, 0], [0, 0, 255], [255, 0, 255]]], dtype=int) |
| f = np.array([[[0, 2, 3], [0, 3, 1], [0, 1, 2], [1, 3, 2]]], dtype=int) |
| writer.add_mesh('my_mesh', vertices=v, colors=c, faces=f) |
| |
| class TestTensorBoardSummaryWriter(BaseTestCase): |
| def test_summary_writer_ctx(self): |
| # after using a SummaryWriter as a ctx it should be closed |
| with self.createSummaryWriter() as writer: |
| writer.add_scalar('test', 1) |
| self.assertIs(writer.file_writer, None) |
| |
| def test_summary_writer_close(self): |
| # Opening and closing SummaryWriter a lot should not run into |
| # OSError: [Errno 24] Too many open files |
| passed = True |
| try: |
| writer = self.createSummaryWriter() |
| writer.close() |
| except OSError: |
| passed = False |
| |
| self.assertTrue(passed) |
| |
| def test_pathlib(self): |
| with tempfile.TemporaryDirectory(prefix="test_tensorboard_pathlib") as d: |
| p = Path(d) |
| with SummaryWriter(p) as writer: |
| writer.add_scalar('test', 1) |
| |
| class TestTensorBoardEmbedding(BaseTestCase): |
| def test_embedding(self): |
| w = self.createSummaryWriter() |
| all_features = torch.tensor([[1., 2., 3.], [5., 4., 1.], [3., 7., 7.]]) |
| all_labels = torch.tensor([33., 44., 55.]) |
| all_images = torch.zeros(3, 3, 5, 5) |
| |
| w.add_embedding(all_features, |
| metadata=all_labels, |
| label_img=all_images, |
| global_step=2) |
| |
| dataset_label = ['test'] * 2 + ['train'] * 2 |
| all_labels = list(zip(all_labels, dataset_label)) |
| w.add_embedding(all_features, |
| metadata=all_labels, |
| label_img=all_images, |
| metadata_header=['digit', 'dataset'], |
| global_step=2) |
| # assert... |
| |
| def test_embedding_64(self): |
| w = self.createSummaryWriter() |
| all_features = torch.tensor([[1., 2., 3.], [5., 4., 1.], [3., 7., 7.]]) |
| all_labels = torch.tensor([33., 44., 55.]) |
| all_images = torch.zeros((3, 3, 5, 5), dtype=torch.float64) |
| |
| w.add_embedding(all_features, |
| metadata=all_labels, |
| label_img=all_images, |
| global_step=2) |
| |
| dataset_label = ['test'] * 2 + ['train'] * 2 |
| all_labels = list(zip(all_labels, dataset_label)) |
| w.add_embedding(all_features, |
| metadata=all_labels, |
| label_img=all_images, |
| metadata_header=['digit', 'dataset'], |
| global_step=2) |
| |
| class TestTensorBoardSummary(BaseTestCase): |
| def test_uint8_image(self): |
| ''' |
| Tests that uint8 image (pixel values in [0, 255]) is not changed |
| ''' |
| test_image = np.random.randint(0, 256, size=(3, 32, 32), dtype=np.uint8) |
| scale_factor = summary._calc_scale_factor(test_image) |
| self.assertEqual(scale_factor, 1, msg='Values are already in [0, 255], scale factor should be 1') |
| |
| def test_float32_image(self): |
| ''' |
| Tests that float32 image (pixel values in [0, 1]) are scaled correctly |
| to [0, 255] |
| ''' |
| test_image = np.random.rand(3, 32, 32).astype(np.float32) |
| scale_factor = summary._calc_scale_factor(test_image) |
| self.assertEqual(scale_factor, 255, msg='Values are in [0, 1], scale factor should be 255') |
| |
| def test_list_input(self): |
| with self.assertRaises(Exception) as e_info: |
| summary.histogram('dummy', [1, 3, 4, 5, 6], 'tensorflow') |
| |
| def test_empty_input(self): |
| with self.assertRaises(Exception) as e_info: |
| summary.histogram('dummy', np.ndarray(0), 'tensorflow') |
| |
| def test_image_with_boxes(self): |
| self.assertTrue(compare_image_proto(summary.image_boxes('dummy', |
| tensor_N(shape=(3, 32, 32)), |
| np.array([[10, 10, 40, 40]])), |
| self)) |
| |
| def test_image_with_one_channel(self): |
| self.assertTrue(compare_image_proto( |
| summary.image('dummy', |
| tensor_N(shape=(1, 8, 8)), |
| dataformats='CHW'), |
| self)) # noqa: E131 |
| |
| def test_image_with_one_channel_batched(self): |
| self.assertTrue(compare_image_proto( |
| summary.image('dummy', |
| tensor_N(shape=(2, 1, 8, 8)), |
| dataformats='NCHW'), |
| self)) # noqa: E131 |
| |
| def test_image_with_3_channel_batched(self): |
| self.assertTrue(compare_image_proto( |
| summary.image('dummy', |
| tensor_N(shape=(2, 3, 8, 8)), |
| dataformats='NCHW'), |
| self)) # noqa: E131 |
| |
| def test_image_without_channel(self): |
| self.assertTrue(compare_image_proto( |
| summary.image('dummy', |
| tensor_N(shape=(8, 8)), |
| dataformats='HW'), |
| self)) # noqa: E131 |
| |
| def test_video(self): |
| try: |
| import moviepy # noqa: F401 |
| except ImportError: |
| return |
| self.assertTrue(compare_proto(summary.video('dummy', tensor_N(shape=(4, 3, 1, 8, 8))), self)) |
| summary.video('dummy', np.random.rand(16, 48, 1, 28, 28)) |
| summary.video('dummy', np.random.rand(20, 7, 1, 8, 8)) |
| |
| @unittest.skipIf(IS_MACOS, "Skipping on mac, see https://github.com/pytorch/pytorch/pull/109349 ") |
| def test_audio(self): |
| self.assertTrue(compare_proto(summary.audio('dummy', tensor_N(shape=(42,))), self)) |
| |
| @unittest.skipIf(IS_MACOS, "Skipping on mac, see https://github.com/pytorch/pytorch/pull/109349 ") |
| def test_text(self): |
| self.assertTrue(compare_proto(summary.text('dummy', 'text 123'), self)) |
| |
| @unittest.skipIf(IS_MACOS, "Skipping on mac, see https://github.com/pytorch/pytorch/pull/109349 ") |
| def test_histogram_auto(self): |
| self.assertTrue(compare_proto(summary.histogram('dummy', tensor_N(shape=(1024,)), bins='auto', max_bins=5), self)) |
| |
| @unittest.skipIf(IS_MACOS, "Skipping on mac, see https://github.com/pytorch/pytorch/pull/109349 ") |
| def test_histogram_fd(self): |
| self.assertTrue(compare_proto(summary.histogram('dummy', tensor_N(shape=(1024,)), bins='fd', max_bins=5), self)) |
| |
| @unittest.skipIf(IS_MACOS, "Skipping on mac, see https://github.com/pytorch/pytorch/pull/109349 ") |
| def test_histogram_doane(self): |
| self.assertTrue(compare_proto(summary.histogram('dummy', tensor_N(shape=(1024,)), bins='doane', max_bins=5), self)) |
| |
| def test_custom_scalars(self): |
| layout = { |
| 'Taiwan': { |
| 'twse': ['Multiline', ['twse/0050', 'twse/2330']] |
| }, |
| 'USA': { |
| 'dow': ['Margin', ['dow/aaa', 'dow/bbb', 'dow/ccc']], |
| 'nasdaq': ['Margin', ['nasdaq/aaa', 'nasdaq/bbb', 'nasdaq/ccc']] |
| } |
| } |
| summary.custom_scalars(layout) # only smoke test. Because protobuf in python2/3 serialize dictionary differently. |
| |
| |
| @unittest.skipIf(IS_MACOS, "Skipping on mac, see https://github.com/pytorch/pytorch/pull/109349 ") |
| def test_mesh(self): |
| v = np.array([[[1, 1, 1], [-1, -1, 1], [1, -1, -1], [-1, 1, -1]]], dtype=float) |
| c = np.array([[[255, 0, 0], [0, 255, 0], [0, 0, 255], [255, 0, 255]]], dtype=int) |
| f = np.array([[[0, 2, 3], [0, 3, 1], [0, 1, 2], [1, 3, 2]]], dtype=int) |
| mesh = summary.mesh('my_mesh', vertices=v, colors=c, faces=f, config_dict=None) |
| self.assertTrue(compare_proto(mesh, self)) |
| |
| @unittest.skipIf(IS_MACOS, "Skipping on mac, see https://github.com/pytorch/pytorch/pull/109349 ") |
| def test_scalar_new_style(self): |
| scalar = summary.scalar('test_scalar', 1.0, new_style=True) |
| self.assertTrue(compare_proto(scalar, self)) |
| with self.assertRaises(AssertionError): |
| summary.scalar('test_scalar2', torch.Tensor([1, 2, 3]), new_style=True) |
| |
| |
| def remove_whitespace(string): |
| return string.replace(' ', '').replace('\t', '').replace('\n', '') |
| |
| def get_expected_file(function_ptr): |
| module_id = function_ptr.__class__.__module__ |
| test_file = sys.modules[module_id].__file__ |
| # Look for the .py file (since __file__ could be pyc). |
| test_file = ".".join(test_file.split('.')[:-1]) + '.py' |
| |
| # Use realpath to follow symlinks appropriately. |
| test_dir = os.path.dirname(os.path.realpath(test_file)) |
| functionName = function_ptr.id().split('.')[-1] |
| return os.path.join(test_dir, |
| "expect", |
| 'TestTensorBoard.' + functionName + ".expect") |
| |
| def read_expected_content(function_ptr): |
| expected_file = get_expected_file(function_ptr) |
| assert os.path.exists(expected_file), expected_file |
| with open(expected_file) as f: |
| return f.read() |
| |
| def compare_image_proto(actual_proto, function_ptr): |
| if expecttest.ACCEPT: |
| expected_file = get_expected_file(function_ptr) |
| with open(expected_file, 'w') as f: |
| f.write(text_format.MessageToString(actual_proto)) |
| return True |
| expected_str = read_expected_content(function_ptr) |
| expected_proto = Summary() |
| text_format.Parse(expected_str, expected_proto) |
| |
| [actual, expected] = [actual_proto.value[0], expected_proto.value[0]] |
| actual_img = Image.open(io.BytesIO(actual.image.encoded_image_string)) |
| expected_img = Image.open(io.BytesIO(expected.image.encoded_image_string)) |
| |
| return ( |
| actual.tag == expected.tag and |
| actual.image.height == expected.image.height and |
| actual.image.width == expected.image.width and |
| actual.image.colorspace == expected.image.colorspace and |
| actual_img == expected_img |
| ) |
| |
| def compare_proto(str_to_compare, function_ptr): |
| if expecttest.ACCEPT: |
| write_proto(str_to_compare, function_ptr) |
| return True |
| expected = read_expected_content(function_ptr) |
| str_to_compare = str(str_to_compare) |
| return remove_whitespace(str_to_compare) == remove_whitespace(expected) |
| |
| def write_proto(str_to_compare, function_ptr): |
| expected_file = get_expected_file(function_ptr) |
| with open(expected_file, 'w') as f: |
| f.write(str(str_to_compare)) |
| |
| class TestTensorBoardPytorchGraph(BaseTestCase): |
| def test_pytorch_graph(self): |
| dummy_input = (torch.zeros(1, 3),) |
| |
| class myLinear(torch.nn.Module): |
| def __init__(self) -> None: |
| super().__init__() |
| self.l = torch.nn.Linear(3, 5) |
| |
| def forward(self, x): |
| return self.l(x) |
| |
| with self.createSummaryWriter() as w: |
| w.add_graph(myLinear(), dummy_input) |
| |
| actual_proto, _ = graph(myLinear(), dummy_input) |
| |
| expected_str = read_expected_content(self) |
| expected_proto = GraphDef() |
| text_format.Parse(expected_str, expected_proto) |
| |
| self.assertEqual(len(expected_proto.node), len(actual_proto.node)) |
| for i in range(len(expected_proto.node)): |
| expected_node = expected_proto.node[i] |
| actual_node = actual_proto.node[i] |
| self.assertEqual(expected_node.name, actual_node.name) |
| self.assertEqual(expected_node.op, actual_node.op) |
| self.assertEqual(expected_node.input, actual_node.input) |
| self.assertEqual(expected_node.device, actual_node.device) |
| self.assertEqual( |
| sorted(expected_node.attr.keys()), sorted(actual_node.attr.keys())) |
| |
| def test_nested_nn_squential(self): |
| |
| dummy_input = torch.randn(2, 3) |
| |
| class InnerNNSquential(torch.nn.Module): |
| def __init__(self, dim1, dim2): |
| super().__init__() |
| self.inner_nn_squential = torch.nn.Sequential( |
| torch.nn.Linear(dim1, dim2), |
| torch.nn.Linear(dim2, dim1), |
| ) |
| |
| def forward(self, x): |
| x = self.inner_nn_squential(x) |
| return x |
| |
| class OuterNNSquential(torch.nn.Module): |
| def __init__(self, dim1=3, dim2=4, depth=2): |
| super().__init__() |
| layers = [] |
| for _ in range(depth): |
| layers.append(InnerNNSquential(dim1, dim2)) |
| self.outer_nn_squential = torch.nn.Sequential(*layers) |
| |
| def forward(self, x): |
| x = self.outer_nn_squential(x) |
| return x |
| |
| with self.createSummaryWriter() as w: |
| w.add_graph(OuterNNSquential(), dummy_input) |
| |
| actual_proto, _ = graph(OuterNNSquential(), dummy_input) |
| |
| expected_str = read_expected_content(self) |
| expected_proto = GraphDef() |
| text_format.Parse(expected_str, expected_proto) |
| |
| self.assertEqual(len(expected_proto.node), len(actual_proto.node)) |
| for i in range(len(expected_proto.node)): |
| expected_node = expected_proto.node[i] |
| actual_node = actual_proto.node[i] |
| self.assertEqual(expected_node.name, actual_node.name) |
| self.assertEqual(expected_node.op, actual_node.op) |
| self.assertEqual(expected_node.input, actual_node.input) |
| self.assertEqual(expected_node.device, actual_node.device) |
| self.assertEqual( |
| sorted(expected_node.attr.keys()), sorted(actual_node.attr.keys())) |
| |
| def test_pytorch_graph_dict_input(self): |
| class Model(torch.nn.Module): |
| def __init__(self) -> None: |
| super().__init__() |
| self.l = torch.nn.Linear(3, 5) |
| |
| def forward(self, x): |
| return self.l(x) |
| |
| class ModelDict(torch.nn.Module): |
| def __init__(self) -> None: |
| super().__init__() |
| self.l = torch.nn.Linear(3, 5) |
| |
| def forward(self, x): |
| return {"out": self.l(x)} |
| |
| |
| dummy_input = torch.zeros(1, 3) |
| |
| with self.createSummaryWriter() as w: |
| w.add_graph(Model(), dummy_input) |
| |
| with self.createSummaryWriter() as w: |
| w.add_graph(Model(), dummy_input, use_strict_trace=True) |
| |
| # expect error: Encountering a dict at the output of the tracer... |
| with self.assertRaises(RuntimeError): |
| with self.createSummaryWriter() as w: |
| w.add_graph(ModelDict(), dummy_input, use_strict_trace=True) |
| |
| with self.createSummaryWriter() as w: |
| w.add_graph(ModelDict(), dummy_input, use_strict_trace=False) |
| |
| |
| def test_mlp_graph(self): |
| dummy_input = (torch.zeros(2, 1, 28, 28),) |
| |
| # This MLP class with the above input is expected |
| # to fail JIT optimizations as seen at |
| # https://github.com/pytorch/pytorch/issues/18903 |
| # |
| # However, it should not raise an error during |
| # the add_graph call and still continue. |
| class myMLP(torch.nn.Module): |
| def __init__(self) -> None: |
| super().__init__() |
| self.input_len = 1 * 28 * 28 |
| self.fc1 = torch.nn.Linear(self.input_len, 1200) |
| self.fc2 = torch.nn.Linear(1200, 1200) |
| self.fc3 = torch.nn.Linear(1200, 10) |
| |
| def forward(self, x, update_batch_stats=True): |
| h = torch.nn.functional.relu( |
| self.fc1(x.view(-1, self.input_len))) |
| h = self.fc2(h) |
| h = torch.nn.functional.relu(h) |
| h = self.fc3(h) |
| return h |
| |
| with self.createSummaryWriter() as w: |
| w.add_graph(myMLP(), dummy_input) |
| |
| def test_wrong_input_size(self): |
| with self.assertRaises(RuntimeError) as e_info: |
| dummy_input = torch.rand(1, 9) |
| model = torch.nn.Linear(3, 5) |
| with self.createSummaryWriter() as w: |
| w.add_graph(model, dummy_input) # error |
| |
| @skipIfNoTorchVision |
| def test_torchvision_smoke(self): |
| model_input_shapes = { |
| 'alexnet': (2, 3, 224, 224), |
| 'resnet34': (2, 3, 224, 224), |
| 'resnet152': (2, 3, 224, 224), |
| 'densenet121': (2, 3, 224, 224), |
| 'vgg16': (2, 3, 224, 224), |
| 'vgg19': (2, 3, 224, 224), |
| 'vgg16_bn': (2, 3, 224, 224), |
| 'vgg19_bn': (2, 3, 224, 224), |
| 'mobilenet_v2': (2, 3, 224, 224), |
| } |
| for model_name, input_shape in model_input_shapes.items(): |
| with self.createSummaryWriter() as w: |
| model = getattr(torchvision.models, model_name)() |
| w.add_graph(model, torch.zeros(input_shape)) |
| |
| class TestTensorBoardFigure(BaseTestCase): |
| @skipIfNoMatplotlib |
| def test_figure(self): |
| writer = self.createSummaryWriter() |
| |
| figure, axes = plt.figure(), plt.gca() |
| circle1 = plt.Circle((0.2, 0.5), 0.2, color='r') |
| circle2 = plt.Circle((0.8, 0.5), 0.2, color='g') |
| axes.add_patch(circle1) |
| axes.add_patch(circle2) |
| plt.axis('scaled') |
| plt.tight_layout() |
| |
| writer.add_figure("add_figure/figure", figure, 0, close=False) |
| self.assertTrue(plt.fignum_exists(figure.number)) |
| |
| writer.add_figure("add_figure/figure", figure, 1) |
| if matplotlib.__version__ != '3.3.0': |
| self.assertFalse(plt.fignum_exists(figure.number)) |
| else: |
| print("Skipping fignum_exists, see https://github.com/matplotlib/matplotlib/issues/18163") |
| |
| writer.close() |
| |
| @skipIfNoMatplotlib |
| def test_figure_list(self): |
| writer = self.createSummaryWriter() |
| |
| figures = [] |
| for i in range(5): |
| figure = plt.figure() |
| plt.plot([i * 1, i * 2, i * 3], label="Plot " + str(i)) |
| plt.xlabel("X") |
| plt.xlabel("Y") |
| plt.legend() |
| plt.tight_layout() |
| figures.append(figure) |
| |
| writer.add_figure("add_figure/figure_list", figures, 0, close=False) |
| self.assertTrue(all(plt.fignum_exists(figure.number) is True for figure in figures)) # noqa: F812 |
| |
| writer.add_figure("add_figure/figure_list", figures, 1) |
| if matplotlib.__version__ != '3.3.0': |
| self.assertTrue(all(plt.fignum_exists(figure.number) is False for figure in figures)) # noqa: F812 |
| else: |
| print("Skipping fignum_exists, see https://github.com/matplotlib/matplotlib/issues/18163") |
| |
| writer.close() |
| |
| class TestTensorBoardNumpy(BaseTestCase): |
| @unittest.skipIf(IS_WINDOWS, "Skipping on windows, see https://github.com/pytorch/pytorch/pull/109349 ") |
| @unittest.skipIf(IS_MACOS, "Skipping on mac, see https://github.com/pytorch/pytorch/pull/109349 ") |
| def test_scalar(self): |
| res = make_np(1.1) |
| self.assertIsInstance(res, np.ndarray) and self.assertEqual(res.shape, (1,)) |
| res = make_np(1 << 64 - 1) # uint64_max |
| self.assertIsInstance(res, np.ndarray) and self.assertEqual(res.shape, (1,)) |
| res = make_np(np.float16(1.00000087)) |
| self.assertIsInstance(res, np.ndarray) and self.assertEqual(res.shape, (1,)) |
| res = make_np(np.float128(1.00008 + 9)) |
| self.assertIsInstance(res, np.ndarray) and self.assertEqual(res.shape, (1,)) |
| res = make_np(np.int64(100000000000)) |
| self.assertIsInstance(res, np.ndarray) and self.assertEqual(res.shape, (1,)) |
| |
| def test_pytorch_np_expect_fail(self): |
| with self.assertRaises(NotImplementedError): |
| res = make_np({'pytorch': 1.0}) |
| |
| |
| |
| class TestTensorProtoSummary(BaseTestCase): |
| @parametrize( |
| "tensor_type,proto_type", |
| [ |
| (torch.float16, DataType.DT_HALF), |
| (torch.bfloat16, DataType.DT_BFLOAT16), |
| ], |
| ) |
| @skipIfTorchDynamo("Unsuitable test for Dynamo, behavior changes with version") |
| def test_half_tensor_proto(self, tensor_type, proto_type): |
| float_values = [1.0, 2.0, 3.0] |
| actual_proto = tensor_proto( |
| "dummy", |
| torch.tensor(float_values, dtype=tensor_type), |
| ).value[0].tensor |
| self.assertSequenceEqual( |
| [int_to_half(x) for x in actual_proto.half_val], |
| float_values, |
| ) |
| self.assertTrue(actual_proto.dtype == proto_type) |
| |
| def test_float_tensor_proto(self): |
| float_values = [1.0, 2.0, 3.0] |
| actual_proto = ( |
| tensor_proto("dummy", torch.tensor(float_values)).value[0].tensor |
| ) |
| self.assertEqual(actual_proto.float_val, float_values) |
| self.assertTrue(actual_proto.dtype == DataType.DT_FLOAT) |
| |
| def test_int_tensor_proto(self): |
| int_values = [1, 2, 3] |
| actual_proto = ( |
| tensor_proto("dummy", torch.tensor(int_values, dtype=torch.int32)) |
| .value[0] |
| .tensor |
| ) |
| self.assertEqual(actual_proto.int_val, int_values) |
| self.assertTrue(actual_proto.dtype == DataType.DT_INT32) |
| |
| def test_scalar_tensor_proto(self): |
| scalar_value = 0.1 |
| actual_proto = ( |
| tensor_proto("dummy", torch.tensor(scalar_value)).value[0].tensor |
| ) |
| self.assertAlmostEqual(actual_proto.float_val[0], scalar_value) |
| |
| def test_complex_tensor_proto(self): |
| real = torch.tensor([1.0, 2.0]) |
| imag = torch.tensor([3.0, 4.0]) |
| actual_proto = ( |
| tensor_proto("dummy", torch.complex(real, imag)).value[0].tensor |
| ) |
| self.assertEqual(actual_proto.scomplex_val, [1.0, 3.0, 2.0, 4.0]) |
| |
| def test_empty_tensor_proto(self): |
| actual_proto = tensor_proto("dummy", torch.empty(0)).value[0].tensor |
| self.assertEqual(actual_proto.float_val, []) |
| |
| instantiate_parametrized_tests(TestTensorProtoSummary) |
| |
| if __name__ == '__main__': |
| run_tests() |