| |
| |
| |
| |
| |
| import unittest |
| import hypothesis.strategies as st |
| from hypothesis import given |
| import numpy as np |
| from caffe2.proto import caffe2_pb2 |
| from caffe2.python import ( |
| brew, |
| core, |
| model_helper, |
| workspace, |
| ) |
| from caffe2.python.transformations import optimizeForMKLDNN |
| import caffe2.python.hypothesis_test_util as hu |
| |
| |
| @unittest.skipIf(not workspace.C.use_mkldnn, "No MKLDNN support.") |
| class PreConvertTest(hu.HypothesisTestCase): |
| @given(input_channels=st.integers(15, 16), |
| batch_size=st.integers(1, 3)) |
| def test_preConvert(self, input_channels, batch_size): |
| def AddModel(model, data): |
| conv1 = brew.conv(model, data, 'conv1', dim_in=input_channels, |
| dim_out=10, kernel=3, stride=1, pad=1, training_mode=1) |
| deconv1 = brew.conv_transpose(model, conv1, 'deconv1', dim_in=10, dim_out=10, |
| kernel=2, stride=2, pad=0, training_mode=1) |
| fc1 = brew.fc(model, deconv1, 'fc1', dim_in=10 * 56 * 56, dim_out=3) |
| softmax = brew.softmax(model, fc1, 'softmax') |
| |
| return softmax |
| |
| def AddTrainingOperators(model, softmax, label): |
| """Adds training operators to the model.""" |
| # Compute cross entropy between softmax scores and labels |
| xent = model.LabelCrossEntropy([softmax, label], 'xent') |
| # Compute the expected loss |
| loss = model.AveragedLoss(xent, "loss") |
| # Use the average loss we just computed to add gradient operators to the model |
| model.AddGradientOperators([loss]) |
| |
| arg_scope = {"order": "NCHW", 'no_bias': False} |
| # Create the model helper for the train model |
| device_opt = core.DeviceOption(caffe2_pb2.IDEEP, 0) |
| with core.DeviceScope(device_opt): |
| train_model = model_helper.ModelHelper(name="test_train", arg_scope=arg_scope) |
| # Add the model definition (fc layers, conv layers, softmax, etc.) |
| softmax = AddModel(train_model, "X") |
| AddTrainingOperators(train_model, softmax, "label") |
| |
| X = np.random.rand( |
| batch_size, input_channels, 28, 28).astype(np.float32) - 0.5 |
| label = np.random.randint(3, size=batch_size).astype(np.int32) |
| blob_dict = {} |
| output_dict = {} |
| output_dict_cosim = {} |
| old_ws_name = workspace.CurrentWorkspace() |
| workspace.FeedBlob('X', X) |
| workspace.FeedBlob('label', label) |
| workspace.RunNetOnce(train_model.param_init_net) |
| for op in train_model.net.Proto().op: |
| if op.type == "Softmax": |
| break |
| for j in range(1, len(op.input)): |
| blob_dict[op.input[j]] = workspace.FetchBlob(op.input[j]) |
| |
| workspace.CreateNet(train_model.net, overwrite=True) |
| optimizeForMKLDNN(train_model.net, training_mode=True) |
| workspace.RunNet(train_model.net) |
| for op in train_model.net.Proto().op: |
| for blob in op.output: |
| output_dict[blob] = workspace.FetchBlob(blob) |
| |
| workspace.SwitchWorkspace("_device_check_", True) |
| workspace.FeedBlob('X', X) |
| workspace.FeedBlob('label', label) |
| for blob in blob_dict.keys(): |
| workspace.FeedBlob(blob, blob_dict[blob]) |
| workspace.CreateNet(train_model.net, overwrite=True) |
| workspace.RunNet(train_model.net) |
| for blob in output_dict.keys(): |
| output_dict_cosim[blob] = workspace.FetchBlob(blob) |
| |
| for blob in output_dict.keys(): |
| if not np.allclose(output_dict[blob], output_dict_cosim[blob], atol=0.001, rtol=0.0001): |
| print("blob {} error".format(blob)) |
| print(np.max(np.abs(output_dict[blob] - output_dict_cosim[blob]))) |
| self.assertTrue(False) |
| |
| workspace.ResetWorkspace() |
| workspace.SwitchWorkspace(old_ws_name) |
| |
| if __name__ == "__main__": |
| unittest.main() |