| # TODO(jiayq): as more and more tests are moving to hypothesis test, we |
| # can gradually remove this test script. DO NOT ADD MORE TESTS TO THIS |
| # FILE. |
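
"""Legacy single-operator tests driven by DeviceChecker and GradientChecker.

Each test builds one operator, runs it on every available device (CPU, plus
GPU when present) via DeviceChecker, and compares the analytic gradient
against a finite-difference estimate via GradientChecker.

Per the note above, new tests belong in the hypothesis-based operator tests.
A minimal sketch of the hypothesis-style equivalent (assuming the standard
caffe2.python.hypothesis_test_util helpers, imported as hu):

    from hypothesis import given
    import caffe2.python.hypothesis_test_util as hu

    class TestTanhHypothesis(hu.HypothesisTestCase):
        @given(X=hu.tensor(), **hu.gcs)
        def test_tanh(self, X, gc, dc):
            op = core.CreateOperator("Tanh", ["X"], ["Y"])
            self.assertDeviceChecks(dc, op, [X], [0])
            self.assertGradientChecks(gc, op, [X], 0, [0])
"""
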
| import numpy as np |
| from caffe2.python import ( |
| brew, |
| core, |
| device_checker, |
| gradient_checker, |
| model_helper, |
| test_util, |
| workspace, |
| ) |
| from caffe2.python.gradient_checker import NetGradientChecker |
| from caffe2.python.net_builder import ops, NetBuilder |
| from caffe2.proto import caffe2_pb2 |
| |
| import unittest |
| from typing import Optional |
| |
| |
| if workspace.has_gpu_support and workspace.NumGpuDevices() > 0: |
| _gpu_dev_option = caffe2_pb2.DeviceOption() |
| _gpu_dev_option.device_type = workspace.GpuDeviceType |
| cpu_device_option = caffe2_pb2.DeviceOption() |
| gpu_device_checker = device_checker.DeviceChecker( |
| 0.01, [_gpu_dev_option] |
| ) |
| device_checker = device_checker.DeviceChecker( |
| 0.01, [_gpu_dev_option, cpu_device_option] |
| ) |
| gpu_gradient_checkers = [ |
| gradient_checker.GradientChecker( |
| 0.005, 0.05, _gpu_dev_option, "gpu_checker_ws" |
| ), |
| ] |
| gradient_checkers = [ |
| gradient_checker.GradientChecker( |
| 0.005, 0.05, _gpu_dev_option, "gpu_checker_ws" |
| ), |
| gradient_checker.GradientChecker( |
| 0.01, 0.05, cpu_device_option, "cpu_checker_ws" |
| ), |
| ] |
| gpu_device_option: Optional[caffe2_pb2.DeviceOption] = _gpu_dev_option |
| else: |
| cpu_device_option = caffe2_pb2.DeviceOption() |
| gpu_device_option = None |
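    # with no GPU available, the "gpu" checker gets an empty device list and
    # effectively checks nothing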
| gpu_device_checker = device_checker.DeviceChecker( |
| 0.01, [] |
| ) |
| device_checker = device_checker.DeviceChecker(0.01, [cpu_device_option]) |
| |
| gradient_checkers = [ |
| gradient_checker.GradientChecker( |
| 0.01, 0.05, cpu_device_option, "cpu_checker_ws" |
| ) |
| ] |
| gpu_gradient_checkers = [] |
| |
| |
| class TestLRN(test_util.TestCase): |
| |
| def setUp(self): |
        self.test_configs = [(6, 10), (3, 13)]
| |
| def testLRN(self): |
| for input_size, depth in self.test_configs: |
| op = core.CreateOperator("LRN", |
| ["X"], |
| ["Y", "Y_scale"], |
| size=11, |
| alpha=0.001, |
| beta=0.5, |
| bias=2.0, |
| order="NHWC" |
| ) |
| X = np.random.rand(2, input_size, input_size, |
| depth).astype(np.float32) |
| res = device_checker.CheckSimple(op, [X], [0]) |
| self.assertTrue(res) |
| for checker in gradient_checkers: |
| res, grad, grad_estimated = checker.CheckSimple(op, [X], 0, [0]) |
| self.assertTrue(res) |
| |
| |
| class TestFlatten(test_util.TestCase): |
| |
| def testFlatten(self): |
| op = core.CreateOperator("Flatten", ["X"], ["Y"]) |
| X = np.random.rand(2, 3, 4, 5).astype(np.float32) |
| res = device_checker.CheckSimple(op, [X], [0]) |
| self.assertTrue(res) |
| for checker in gradient_checkers: |
| res, grad, grad_estimated = checker.CheckSimple(op, [X], 0, [0]) |
| self.assertTrue(res) |
| |
| |
| class TestConcat(test_util.TestCase): |
| |
| def setUp(self): |
| self.test_configs = [ |
| # input_size, depth1, depth2, depth3, depth4 |
| (3, 2, 3, 4, 5), |
| (4, 5, 4, 3, 2), |
| ] |
| |
| def testConcatNHWC(self): |
| for input_size, d1, d2, d3, d4 in self.test_configs: |
| op = core.CreateOperator("Concat", |
| ["X1", "X2", "X3", "X4"], |
| ["Y", "Y_dims"], |
| order="NHWC" |
| ) |
| Xs = [ |
| np.random.rand(2, input_size, input_size, |
| d1).astype(np.float32), |
| np.random.rand(2, input_size, input_size, |
| d2).astype(np.float32), |
| np.random.rand(2, input_size, input_size, |
| d3).astype(np.float32), |
| np.random.rand(2, input_size, input_size, d4).astype(np.float32) |
| ] |
            res = device_checker.CheckSimple(op, Xs, [0])
            self.assertTrue(res)
            # check the gradient w.r.t. each of the four inputs in turn
            for i in range(4):
                for checker in gradient_checkers:
                    res, grad, grad_estimated = checker.CheckSimple(
                        op, Xs, i, [0])
                    self.assertTrue(res)
| |
| def testConcatNCHW(self): |
| for input_size, d1, d2, d3, d4 in self.test_configs: |
| op = core.CreateOperator("Concat", |
| ["X1", "X2", "X3", "X4"], |
| ["Y", "Y_dims"], |
| order="NCHW" |
| ) |
| Xs = [ |
| np.random.rand(2, d1, input_size, |
| input_size).astype(np.float32), |
| np.random.rand(2, d2, input_size, |
| input_size).astype(np.float32), |
| np.random.rand(2, d3, input_size, |
| input_size).astype(np.float32), |
| np.random.rand(2, d4, input_size, input_size).astype(np.float32) |
| ] |
            res = device_checker.CheckSimple(op, Xs, [0])
            self.assertTrue(res)
            # check the gradient w.r.t. each of the four inputs in turn
            for i in range(4):
                for checker in gradient_checkers:
                    res, grad, grad_estimated = checker.CheckSimple(
                        op, Xs, i, [0])
                    self.assertTrue(res)
| |
| |
| class TestRelu(test_util.TestCase): |
| |
| def setUp(self): |
| self.test_configs = [ |
| # input size |
| # (0, 1), |
| (1, 1), |
| (2, 1), |
| (1, 3, 3, 1), |
| (2, 3, 3, 1), |
| (1, 5, 5, 3), |
| (2, 5, 5, 3), |
| ] |
| |
| def testRelu(self): |
| for input_size in self.test_configs: |
| op = core.CreateOperator("Relu", ["X"], ["Y"]) |
| X = np.random.rand(*input_size).astype(np.float32) |
            # shift values away from zero: the finite-difference gradient
            # check is unreliable at ReLU's kink at the origin
            X += 0.01 * np.sign(X)
            X[X == 0] = 0.01
| res = device_checker.CheckSimple(op, [X], [0]) |
| self.assertTrue(res) |
| for checker in gradient_checkers: |
| res, grad, grad_estimated = checker.CheckSimple(op, [X], 0, [0]) |
| self.assertTrue(res) |
| |
| |
| class TestTanh(test_util.TestCase): |
| |
| def setUp(self): |
| self.test_configs = [ |
| # (0, 1), |
| (1, 1), |
| (2, 1), |
| (1, 2, 3, 4), |
| ] |
| |
| def testTanh(self): |
| for input_size in self.test_configs: |
| op = core.CreateOperator("Tanh", ["X"], ["Y"]) |
| X = np.random.rand(*input_size).astype(np.float32) - 0.5 |
| res = device_checker.CheckSimple(op, [X], [0]) |
| self.assertTrue(res) |
| for checker in gradient_checkers: |
| res, grad, grad_estimated = checker.CheckSimple(op, [X], 0, [0]) |
| self.assertTrue(res) |
| |
| |
| class TestAbs(test_util.TestCase): |
| |
| def setUp(self): |
| self.test_configs = [ |
| (1, 1), |
| (2, 3), |
| (2, 3, 4), |
| (2, 3, 4, 5), |
| ] |
| |
| def testAbs(self): |
| for input_size in self.test_configs: |
| op = core.CreateOperator("Abs", ["X"], ["Y"]) |
| X = np.random.rand(*input_size).astype(np.float32) |
            # shift values away from zero: the finite-difference gradient
            # check is unreliable at Abs's kink at the origin
            X += 0.01 * np.sign(X)
            X[X == 0] = 0.01
| res = device_checker.CheckSimple(op, [X], [0]) |
| self.assertTrue(res) |
| for checker in gradient_checkers: |
| res, grad, grad_estimated = checker.CheckSimple(op, [X], 0, [0]) |
| self.assertTrue(res) |
| |

class TestExp(test_util.TestCase):
| |
| def setUp(self): |
| self.test_configs = [ |
| # (0, 1), |
| (1, 1), |
| (2, 1), |
| (1, 2, 3, 4), |
| ] |
| |
| def testExp(self): |
| for input_size in self.test_configs: |
| op = core.CreateOperator("Exp", ["X"], ["Y"]) |
| X = np.random.rand(*input_size).astype(np.float32) - 0.5 |
| res = device_checker.CheckSimple(op, [X], [0]) |
| self.assertTrue(res) |
| for checker in gradient_checkers: |
| res, grad, grad_estimated = checker.CheckSimple(op, [X], 0, [0]) |
| self.assertTrue(res) |
| |

class TestCos(test_util.TestCase):
| |
| def setUp(self): |
| self.test_configs = [ |
| (1, 1), |
| (2, 3), |
| (2, 3, 4), |
| (2, 3, 4, 5), |
| ] |
| |
| def testCos(self): |
| for input_size in self.test_configs: |
| op = core.CreateOperator("Cos", ["X"], ["Y"]) |
| X = np.random.rand(*input_size).astype(np.float32) - 0.5 |
| res = device_checker.CheckSimple(op, [X], [0]) |
| self.assertTrue(res) |
| for checker in gradient_checkers: |
| res, grad, grad_estimated = checker.CheckSimple(op, [X], 0, [0]) |
| self.assertTrue(res) |
| |

class TestSin(test_util.TestCase):
| |
| def setUp(self): |
| self.test_configs = [ |
| (1, 1), |
| (2, 3), |
| (2, 3, 4), |
| (2, 3, 4, 5), |
| ] |
| |
| def testSin(self): |
| for input_size in self.test_configs: |
| op = core.CreateOperator("Sin", ["X"], ["Y"]) |
| X = np.random.rand(*input_size).astype(np.float32) - 0.5 |
| res = device_checker.CheckSimple(op, [X], [0]) |
| self.assertTrue(res) |
| for checker in gradient_checkers: |
| res, grad, grad_estimated = checker.CheckSimple(op, [X], 0, [0]) |
| self.assertTrue(res) |
| |

class TestSigmoid(test_util.TestCase):
| |
| def setUp(self): |
| self.test_configs = [ |
| # (0, 1), |
| (1, 1), |
| (2, 1), |
| (1, 2, 3, 4), |
| ] |
| |
| def testSigmoid(self): |
| for input_size in self.test_configs: |
| op = core.CreateOperator("Sigmoid", ["X"], ["Y"]) |
| X = np.random.rand(*input_size).astype(np.float32) - 0.5 |
| res = device_checker.CheckSimple(op, [X], [0]) |
| self.assertTrue(res) |
| for checker in gradient_checkers: |
| res, grad, grad_estimated = checker.CheckSimple(op, [X], 0, [0]) |
| self.assertTrue(res) |
| |
| |
| class TestSum(test_util.TestCase): |
| |
| def setUp(self): |
| self.test_configs = [ |
| # ((0, 1), False), |
| ((1, 2, 3, 4), True), |
| ((1, 2, 3, 4), False)] |
| |
| def testSum(self): |
| for (input_size, in_place) in self.test_configs: |
| op = core.CreateOperator("Sum", ["X1", "X2"], |
| ["Y" if not in_place else "X1"]) |
| X1 = np.random.rand(*input_size).astype(np.float32) - 0.5 |
| X2 = np.random.rand(*input_size).astype(np.float32) - 0.5 |
| res = device_checker.CheckSimple(op, [X1, X2], [0]) |
| self.assertTrue(res) |
| for checker in gradient_checkers: |
| res, grad, grad_estimated = checker.CheckSimple( |
| op, [X1, X2], 0, [0]) |
| self.assertTrue(res) |
| res, grad, grad_estimated = checker.CheckSimple( |
| op, [X1, X2], 1, [0]) |
| self.assertTrue(res) |
| |
| |
| class TestMakeTwoClass(test_util.TestCase): |
| |
| def setUp(self): |
| self.test_configs = [ |
| # input size |
| # (0, 1), |
| (1,), |
| (7,), |
| (1, 3), |
| (2, 5), |
| ] |
| |
| def testMakeTwoClass(self): |
| for input_size in self.test_configs: |
| op = core.CreateOperator("MakeTwoClass", ["X"], ["Y"]) |
| X = np.random.rand(*input_size).astype(np.float32) |
            # nudge values away from 0 and 1, where the finite-difference
            # gradient check becomes unreliable
            X[X < 0.01] += 0.01
            X[X > 0.99] -= 0.01
| res = device_checker.CheckSimple(op, [X], [0]) |
| self.assertTrue(res) |
| for checker in gradient_checkers: |
| res, grad, grad_estimated = checker.CheckSimple(op, [X], 0, [0]) |
| self.assertTrue(res) |
| |
| |
| class TestNetGradientChecker(test_util.TestCase): |
| def test_net_gradient_checker(self): |
| model = model_helper.ModelHelper(name="test") |
| const = model.net.AddExternalInputs("const1", "const2") |
| fc = brew.fc(model, dim_in=3, dim_out=4, blob_in="X", blob_out="Y", axis=0) |
| dist = [model.net.SquaredL2Distance([fc, c]) for c in const] |
| losses = [model.net.AveragedLoss(d) for d in dist] # using two losses here |
| |
| workspace.RunNetOnce(model.param_init_net) |
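        # numerically verify d(loss)/dX for the whole net, with gradients
        # flowing into X from both losses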
| NetGradientChecker.Check( |
| model.net, |
| outputs_with_grad=losses, |
| input_values={"X": np.array([1, 2, 3], dtype="float32"), |
| const[0]: np.array([1, 1, 1, 1], dtype="float32"), |
| const[1]: np.array([2, 2, 2, 2], dtype="float32")}, |
| input_to_check="X", |
| ) |
| |
| def test_net_comparison(self): |
| # (a + b) * (c + d) == a * c + a * d + b * c + b * d |
| net1 = core.Net("net1") |
| a, b, c, d = net1.AddExternalInputs("a", "b", "c", "d") |
| a_b = net1.Sum([a, b], "a+b") |
| c_d = net1.Sum([c, d], "c+d") |
| x = net1.Mul([a_b, c_d], "x") |
| |
| net2 = core.Net("net2") |
| ac = net2.Mul([a, c], "ac") |
| ad = net2.Mul([a, d], "ad") |
| bc = net2.Mul([b, c], "bc") |
| bd = net2.Mul([b, d], "bd") |
| y = net2.Sum([ac, ad, bc, bd], "y") |
| |
| input_values = {blob: np.array([i], dtype=np.float32) |
| for i, blob in enumerate([a, b, c, d])} |
| |
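        # CompareNets runs both nets and asserts that their outputs and their
        # gradients w.r.t. the listed inputs agree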
| NetGradientChecker.CompareNets( |
| [net1, net2], [[x], [y]], [0], |
| inputs_with_grads=[a, b, c, d], |
| input_values=input_values, |
| ) |
| |
| |
| class TestIf(test_util.TestCase): |
| def testIf(self): |
| W_a_values = [2.0, 1.5] |
| B_a_values = [0.5] |
| W_b_values = [7.0, 3.5] |
| B_b_values = [1.5] |
| |
| with NetBuilder(_use_control_ops=True) as init_nb: |
| W_a = ops.UniformFill([], "W_a", shape=[1, 2], min=-1., max=1.) |
| B_a = ops.ConstantFill([], "B_a", shape=[1], value=0.0) |
| W_b = ops.UniformFill([], "W_b", shape=[1, 2], min=-1., max=1.) |
| B_b = ops.ConstantFill([], "B_b", shape=[1], value=0.0) |
| |
| W_gt_a = ops.GivenTensorFill( |
| [], "W_gt_a", shape=[1, 2], values=W_a_values) |
| B_gt_a = ops.GivenTensorFill([], "B_gt_a", shape=[1], values=B_a_values) |
| W_gt_b = ops.GivenTensorFill( |
| [], "W_gt_b", shape=[1, 2], values=W_b_values) |
| B_gt_b = ops.GivenTensorFill([], "B_gt_b", shape=[1], values=B_b_values) |
| |
| params = [W_gt_a, B_gt_a, W_a, B_a, W_gt_b, B_gt_b, W_b, B_b] |
| |
| with NetBuilder(_use_control_ops=True, initial_scope=params) as train_nb: |
| Y_pred = ops.ConstantFill([], "Y_pred", shape=[1], value=0.0) |
| Y_noise = ops.ConstantFill([], "Y_noise", shape=[1], value=0.0) |
| |
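            # run_once=0 makes the fills below re-sample on every run of the
            # net, so each training iteration sees a fresh switch, batch and
            # noise sample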
| switch = ops.UniformFill( |
| [], "switch", shape=[1], min=-1., max=1., run_once=0) |
| zero = ops.ConstantFill([], "zero", shape=[1], value=0.0) |
| X = ops.GaussianFill( |
| [], "X", shape=[4096, 2], mean=0.0, std=1.0, run_once=0) |
| noise = ops.GaussianFill( |
| [], "noise", shape=[4096, 1], mean=0.0, std=1.0, run_once=0) |
| |
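            # depending on the sign of `switch`, the ground truth comes from
            # (W_gt_a, B_gt_a) or (W_gt_b, B_gt_b), and the matching learnable
            # pair (W_a, B_a) or (W_b, B_b) is trained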
| with ops.IfNet(ops.LT([switch, zero])): |
| Y_gt = ops.FC([X, W_gt_a, B_gt_a], "Y_gt") |
| ops.Add([Y_gt, noise], Y_noise) |
| ops.FC([X, W_a, B_a], Y_pred) |
| with ops.Else(): |
| Y_gt = ops.FC([X, W_gt_b, B_gt_b], "Y_gt") |
| ops.Add([Y_gt, noise], Y_noise) |
| ops.FC([X, W_b, B_b], Y_pred) |
| |
| dist = ops.SquaredL2Distance([Y_noise, Y_pred], "dist") |
| loss = dist.AveragedLoss([], ["loss"]) |
| |
| assert len(init_nb.get()) == 1, "Expected a single init net produced" |
| assert len(train_nb.get()) == 1, "Expected a single train net produced" |
| |
| train_net = train_nb.get()[0] |
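        # maps each blob, including the parameters, to its gradient blob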
| gradient_map = train_net.AddGradientOperators([loss]) |
| |
| init_net = init_nb.get()[0] |
| ITER = init_net.ConstantFill( |
| [], "ITER", shape=[1], value=0, dtype=core.DataType.INT64) |
| train_net.Iter(ITER, ITER) |
| LR = train_net.LearningRate(ITER, "LR", base_lr=-0.1, |
| policy="step", stepsize=20, gamma=0.9) |
| ONE = init_net.ConstantFill([], "ONE", shape=[1], value=1.) |
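        # plain SGD: param <- 1.0 * param + LR * grad (base_lr is negative,
        # so the updates descend)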
| train_net.WeightedSum([W_a, ONE, gradient_map[W_a], LR], W_a) |
| train_net.WeightedSum([B_a, ONE, gradient_map[B_a], LR], B_a) |
| train_net.WeightedSum([W_b, ONE, gradient_map[W_b], LR], W_b) |
| train_net.WeightedSum([B_b, ONE, gradient_map[B_b], LR], B_b) |
| |
| workspace.RunNetOnce(init_net) |
| workspace.CreateNet(train_net) |
| # print("Before training, W_a is: {}".format(workspace.FetchBlob("W_a"))) |
| # print("Before training, B_a is: {}".format(workspace.FetchBlob("B_a"))) |
| # print("Before training, W_b is: {}".format(workspace.FetchBlob("W_b"))) |
| # print("Before training, B_b is: {}".format(workspace.FetchBlob("B_b"))) |
| |
| for _epoch in range(1000): |
| workspace.RunNet(train_net.Proto().name) |
| |
| # print("After training, W_a is: {}".format(workspace.FetchBlob("W_a"))) |
| # print("After training, B_a is: {}".format(workspace.FetchBlob("B_a"))) |
| # print("After training, W_b is: {}".format(workspace.FetchBlob("W_b"))) |
| # print("After training, B_b is: {}".format(workspace.FetchBlob("B_b"))) |
| # print("Ground truth W_a is: {}".format(workspace.FetchBlob("W_gt_a"))) |
| # print("Ground truth B_a is: {}".format(workspace.FetchBlob("B_gt_a"))) |
| # print("Ground truth W_b is: {}".format(workspace.FetchBlob("W_gt_b"))) |
| # print("Ground truth B_b is: {}".format(workspace.FetchBlob("B_gt_b"))) |
| |
| values_map = { |
| "W_a": W_a_values, |
| "B_a": B_a_values, |
| "W_b": W_b_values, |
| "B_b": B_b_values, |
| } |
| |
| train_eps = 0.01 |
| |
| for blob_name, values in values_map.items(): |
| trained_values = workspace.FetchBlob(blob_name) |
| if trained_values.ndim == 2: |
| self.assertEqual(trained_values.shape[0], 1) |
| trained_values = trained_values[0][:] |
| else: |
| self.assertEqual(trained_values.ndim, 1) |
| |
| self.assertEqual(trained_values.size, len(values)) |
| for idx in range(len(trained_values)): |
| self.assertTrue(abs(trained_values[idx] - values[idx]) < train_eps) |
| |
| |
| class TestWhile(test_util.TestCase): |
| @unittest.skip("Skip flaky test.") |
| def testWhile(self): |
| with NetBuilder(_use_control_ops=True) as nb: |
| ops.Copy(ops.Const(0), "i") |
| ops.Copy(ops.Const(1), "one") |
| ops.Copy(ops.Const(2), "two") |
| ops.Copy(ops.Const(2.0), "x") |
| ops.Copy(ops.Const(3.0), "y") |
| ops.Copy(ops.Const(2.0), "z") |
            # the loop body runs twice (i = 1, 2): x is squared on both
            # passes (2 -> 16), y is squared only on the first (3 -> 9),
            # and z is cubed only on the second (2 -> 8)
| with ops.WhileNet(): |
| with ops.Condition(): |
| ops.Add(["i", "one"], "i") |
| ops.LE(["i", "two"]) |
| ops.Pow("x", "x", exponent=2.0) |
| with ops.IfNet(ops.LT(["i", "two"])): |
| ops.Pow("y", "y", exponent=2.0) |
| with ops.Else(): |
| ops.Pow("z", "z", exponent=3.0) |
| |
| ops.Add(["x", "y"], "x_plus_y") |
| ops.Add(["x_plus_y", "z"], "s") |
| |
| assert len(nb.get()) == 1, "Expected a single net produced" |
| net = nb.get()[0] |
| |
| net.AddGradientOperators(["s"]) |
| workspace.RunNetOnce(net) |
        # d(x^4)/dx = 4x^3 = 32 at the initial x = 2
        self.assertAlmostEqual(workspace.FetchBlob("x_grad"), 32)
        self.assertAlmostEqual(workspace.FetchBlob("x"), 16)
        # d(y^2)/dy = 2y = 6 at the initial y = 3
        self.assertAlmostEqual(workspace.FetchBlob("y_grad"), 6)
        self.assertAlmostEqual(workspace.FetchBlob("y"), 9)
        # d(z^3)/dz = 3z^2 = 12 at the initial z = 2
        self.assertAlmostEqual(workspace.FetchBlob("z_grad"), 12)
        self.assertAlmostEqual(workspace.FetchBlob("z"), 8)
| |
| |
| if __name__ == '__main__': |
| workspace.GlobalInit(["python"]) |
| unittest.main() |