| |
| |
| |
| |
| |
| import unittest |
| |
| from caffe2.python import core |
| from hypothesis import given |
| import hypothesis.strategies as st |
| import caffe2.python.hypothesis_test_util as hu |
| from caffe2.python import workspace |
| from caffe2.python.functional import Functional |
| import numpy as np |
| |
| |
| @st.composite |
| def _tensor_splits(draw, add_axis=False): |
| """Generates (axis, split_info, tensor_splits) tuples.""" |
| tensor = draw(hu.tensor(min_value=4)) # Each dim has at least 4 elements. |
| axis = draw(st.integers(0, len(tensor.shape) - 1)) |
| if add_axis: |
| # Simple case: get individual slices along one axis, where each of them |
| # is (N-1)-dimensional. The axis will be added back upon concatenation. |
| return ( |
| axis, np.ones(tensor.shape[axis], dtype=np.int32), [ |
| np.array(tensor.take(i, axis=axis)) |
| for i in range(tensor.shape[axis]) |
| ] |
| ) |
| else: |
| # General case: pick some (possibly consecutive, even non-unique) |
| # indices at which we will split the tensor, along the given axis. |
| splits = sorted( |
| draw( |
| st. |
| lists(elements=st.integers(0, tensor.shape[axis]), max_size=4) |
| ) + [0, tensor.shape[axis]] |
| ) |
| return ( |
| axis, np.array(np.diff(splits), dtype=np.int32), [ |
| tensor.take(range(splits[i], splits[i + 1]), axis=axis) |
| for i in range(len(splits) - 1) |
| ], |
| ) |
| |
| |
| class TestFunctional(hu.HypothesisTestCase): |
| @given(X=hu.tensor(), engine=st.sampled_from(["", "CUDNN"]), **hu.gcs) |
| def test_relu(self, X, engine, gc, dc): |
| X += 0.02 * np.sign(X) |
| X[X == 0.0] += 0.02 |
| output = Functional.Relu(X, device_option=gc) |
| Y_l = output[0] |
| Y_d = output["output_0"] |
| |
| with workspace.WorkspaceGuard("tmp_workspace"): |
| op = core.CreateOperator("Relu", ["X"], ["Y"], engine=engine) |
| workspace.FeedBlob("X", X) |
| workspace.RunOperatorOnce(op) |
| Y_ref = workspace.FetchBlob("Y") |
| |
| np.testing.assert_array_equal( |
| Y_l, Y_ref, err_msg='Functional Relu result mismatch' |
| ) |
| |
| np.testing.assert_array_equal( |
| Y_d, Y_ref, err_msg='Functional Relu result mismatch' |
| ) |
| |
| @given(tensor_splits=_tensor_splits(), **hu.gcs) |
| def test_concat(self, tensor_splits, gc, dc): |
| # Input Size: 1 -> inf |
| axis, _, splits = tensor_splits |
| concat_result, split_info = Functional.Concat(*splits, axis=axis, device_option=gc) |
| |
| concat_result_ref = np.concatenate(splits, axis=axis) |
| split_info_ref = np.array([a.shape[axis] for a in splits]) |
| |
| np.testing.assert_array_equal( |
| concat_result, |
| concat_result_ref, |
| err_msg='Functional Concat result mismatch' |
| ) |
| |
| np.testing.assert_array_equal( |
| split_info, |
| split_info_ref, |
| err_msg='Functional Concat split info mismatch' |
| ) |
| |
| @given(tensor_splits=_tensor_splits(), split_as_arg=st.booleans(), **hu.gcs) |
| def test_split(self, tensor_splits, split_as_arg, gc, dc): |
| # Output Size: 1 - inf |
| axis, split_info, splits = tensor_splits |
| |
| split_as_arg = True |
| |
| if split_as_arg: |
| input_tensors = [np.concatenate(splits, axis=axis)] |
| kwargs = dict(axis=axis, split=split_info, num_output=len(splits)) |
| else: |
| input_tensors = [np.concatenate(splits, axis=axis), split_info] |
| kwargs = dict(axis=axis, num_output=len(splits)) |
| result = Functional.Split(*input_tensors, device_option=gc, **kwargs) |
| |
| def split_ref(input, split=split_info): |
| s = np.cumsum([0] + list(split)) |
| return [ |
| np.array(input.take(np.arange(s[i], s[i + 1]), axis=axis)) |
| for i in range(len(split)) |
| ] |
| |
| result_ref = split_ref(*input_tensors) |
| for i, ref in enumerate(result_ref): |
| np.testing.assert_array_equal( |
| result[i], ref, err_msg='Functional Relu result mismatch' |
| ) |
| |
| |
| if __name__ == '__main__': |
| unittest.main() |