import logging
import shutil
import tempfile
import unittest
from multiprocessing import Manager, Process

import numpy as np

from hypothesis import given, settings
import hypothesis.strategies as st

from caffe2.python import workspace

log = logging.getLogger("parallelize_bmuf_distributed_test")
log.setLevel(logging.INFO)


def bmuf_process(filestore_dir, process_id, shared_results,
                 cpu_device=False, nesterov=False):
    """Run one BMUF shard on two devices and record the blobs needed for
    verification in shared_results, keyed by process_id."""
    # We need to import caffe2 in every process to initialize CUDA independently.
    from caffe2.python import core, cnn, data_parallel_model, dyndep
    from caffe2.proto import caffe2_pb2
    dyndep.InitOpsLibrary("@/caffe2/caffe2/distributed:file_store_handler_ops")

    if not cpu_device:
        if not workspace.has_gpu_support:
            log.info('No GPU support; test ignored.')
            return
        if workspace.NumGpuDevices() < 4:
            log.info('Not enough GPUs (at least 4 required); test ignored.')
            return

    model = cnn.CNNModelHelper(
        order="NHWC",
        name="test"
    )
    if cpu_device:
        device_type = caffe2_pb2.CPU
        device_prefix = "cpu"
    else:
        device_type = workspace.GpuDeviceType
        device_prefix = "gpu"

    # Two shards with two devices each: process 0 drives devices 0-1,
    # process 1 drives devices 2-3.
    devices = [0, 1] if process_id == 0 else [2, 3]

    def _model_build_fun(model, loss_scale):
        fc = model.FC(
            "data", "fc", 16, 1, ("ConstantFill", {}), ("ConstantFill", {})
        )
        fc_fl = model.FlattenToVec(fc, "fc_fl")
        sigm = model.Sigmoid(fc_fl, "sigm")
        sq = model.SquaredL2Distance([sigm, "label"], "sq")
        loss = model.AveragedLoss(sq, "loss")
        loss = model.Scale(loss, scale=loss_scale)

        # For testing explicit sync
        model.param_init_net.UniformFill([], ["sync_num"], shape=[1])
        return [loss]

    def _input_builder_fun(model):
        # No-op: _generate_data feeds the input blobs directly.
        return None

    def _param_update_fun(model):
        ITER = model.Iter("ITER")
        LR = model.net.LearningRate(
            [ITER],
            "LR",
            base_lr=(-0.1),  # Negative, so WeightedSum steps down the gradient.
            policy="fixed",
        )
        ONE = model.param_init_net.ConstantFill(
            [], "ONE", shape=[1], value=1.0,
        )
        for param in model.GetParams():
            grad = model.param_to_grad[param]
            model.WeightedSum([param, ONE, grad, LR], param)

    def _generate_data(devices, process_id, device_type, device_prefix):
        np.random.seed(26 + process_id * 10)
        # Each run gets the same input, independent of the number of devices.
        batch_size = 64
        for _ in range(0, 10):
            full_data = np.random.rand(batch_size, 16)
            full_labels = np.round(full_data[:, 0])
            batch_per_device = batch_size // len(devices)

            for (j, g) in enumerate(devices):
                start = j * batch_per_device
                end = start + batch_per_device
                data = full_data[start:end, :].astype(np.float32)
                labels = full_labels[start:end].astype(np.float32)
                with core.DeviceScope(core.DeviceOption(device_type, g)):
                    workspace.FeedBlob("{}_{}/data".format(device_prefix, g), data)
                    workspace.FeedBlob("{}_{}/label".format(device_prefix, g), labels)

    _generate_data(devices, process_id, device_type, device_prefix)

    workspace.RunOperatorOnce(
        core.CreateOperator(
            "FileStoreHandlerCreate", [], ["store_handler"],
            path=filestore_dir
        )
    )
    rendezvous = dict(
        kv_handler="store_handler",
        shard_id=process_id,
        num_shards=2,
        engine="GLOO",
        exit_nets=None
    )

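    # The two shards rendezvous through the shared file-store handler: Gloo
    # uses it as a key/value store to bootstrap its communication context,
    # so no MPI or TCP store is needed for this single-host test.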
    data_parallel_model.Parallelize_BMUF(
        model,
        _input_builder_fun,
        _model_build_fun,
        _param_update_fun,
        devices=devices,
        rendezvous=rendezvous,
        nesterov=nesterov,
        add_blobs_to_sync=["sync_num"],
        cpu_device=cpu_device
    )

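    # Each shard now runs local SGD on its own devices;
    # model._global_model_param_updates_net later combines the shard models
    # and applies the block-momentum update to the global params
    # (fc_w_g / fc_b_g) and momenta (fc_w_v / fc_b_v).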
    data_parallel_model.RunInitNet(model)

    def _device_pid(device, pid):
        # Map a local device index to its global id; process 1 owns devices 2-3.
        if pid == 1:
            return device + 2
        return device

    # The momentum blob should start out at zero.
    np.testing.assert_equal(
        workspace.FetchBlob("{}_{}/fc_w_v".format(
            device_prefix, _device_pid(0, process_id))),
        np.zeros(16).astype(np.float32).reshape(1, 16)
    )

    # Run the algorithm for one iteration to get non-zero params.
    data_parallel_model.RunNet(model, 1)

    # Save the iteration momentum and the post-local-update params.
    results = {}
    v_b_ = workspace.FetchBlob(
        "{}_{}/fc_b_v".format(device_prefix, _device_pid(0, process_id)))
    v_w_ = workspace.FetchBlob(
        "{}_{}/fc_w_v".format(device_prefix, _device_pid(0, process_id)))

    results['v_b_'] = v_b_
    results['v_w_'] = v_w_

    # Run one more local update before the explicit global sync below.
    workspace.RunNetOnce(model.net)

    b_0_ = workspace.FetchBlob(
        "{}_{}/fc_b".format(device_prefix, _device_pid(0, process_id)))
    w_0_ = workspace.FetchBlob(
        "{}_{}/fc_w".format(device_prefix, _device_pid(0, process_id)))
    b_1_ = workspace.FetchBlob(
        "{}_{}/fc_b".format(device_prefix, _device_pid(1, process_id)))
    w_1_ = workspace.FetchBlob(
        "{}_{}/fc_w".format(device_prefix, _device_pid(1, process_id)))

    results['b_0_'] = b_0_
    results['w_0_'] = w_0_
    results['b_1_'] = b_1_
    results['w_1_'] = w_1_

    # Test explicit sync: only process 0 writes sync_num; the blob must reach
    # every device of both shards via add_blobs_to_sync.
    if process_id == 0:
        workspace.FeedBlob(
            device_prefix + "_0/sync_num",
            np.array([2603]).astype(np.float32),
            device_option=core.DeviceOption(device_type, 0))

    # Fetch the global model params as of the last sync, then run the global
    # update net, which computes the block gradients and applies the
    # block-momentum update.
    b_g_ = workspace.FetchBlob(
        "{}_{}/fc_b_g".format(device_prefix, _device_pid(0, process_id)))
    w_g_ = workspace.FetchBlob(
        "{}_{}/fc_w_g".format(device_prefix, _device_pid(0, process_id)))
    results['b_g_'] = b_g_
    results['w_g_'] = w_g_
    workspace.RunNetOnce(model._global_model_param_updates_net)

    # Per shard: g_b = (b_0_ + b_1_) / 2 - b_g_
    #            g_w = (w_0_ + w_1_) / 2 - w_g_
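    # Full BMUF update verified by the assertions in DistributedTest, using
    # all four devices across both shards:
    #   g = mean(local params) - global params
    #   v = block_momentum * v_prev + g    (block_momentum = 0.75 here)
    #   x = x_prev + v                     (or the Nesterov variant)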
    v_b = workspace.FetchBlob(
        "{}_{}/fc_b_v".format(device_prefix, _device_pid(0, process_id)))
    v_w = workspace.FetchBlob(
        "{}_{}/fc_w_v".format(device_prefix, _device_pid(0, process_id)))
    w_g = workspace.FetchBlob(
        "{}_{}/fc_w_g".format(device_prefix, _device_pid(0, process_id)))
    b_g = workspace.FetchBlob(
        "{}_{}/fc_b_g".format(device_prefix, _device_pid(0, process_id)))
    w_0 = workspace.FetchBlob(
        "{}_{}/fc_w".format(device_prefix, _device_pid(0, process_id)))
    b_0 = workspace.FetchBlob(
        "{}_{}/fc_b".format(device_prefix, _device_pid(0, process_id)))
    w_1 = workspace.FetchBlob(
        "{}_{}/fc_w".format(device_prefix, _device_pid(1, process_id)))
    b_1 = workspace.FetchBlob(
        "{}_{}/fc_b".format(device_prefix, _device_pid(1, process_id)))
    results['v_b'] = v_b
    results['v_w'] = v_w
    results['w_g'] = w_g
    results['b_g'] = b_g
    results['w_0'] = w_0
    results['b_0'] = b_0
    results['w_1'] = w_1
    results['b_1'] = b_1

    # Test add_blobs_to_sync: sync_num must now be identical on every device.
    for j in devices:
        sync = workspace.FetchBlob(
            device_prefix + "_{}/sync_num".format(j))[0]
        results['sync_{}'.format(j)] = sync

    shared_results[process_id] = results


class DistributedTest(unittest.TestCase):

    @given(
        cpu_device=st.booleans(),
        nesterov=st.booleans()
    )
    @settings(deadline=10000)
    def test_bmuf_distributed(self, cpu_device, nesterov):
        if (not cpu_device) and workspace.has_hip_support:
            log.info('Skipping the test on ROCm due to a regression in ROCm 3.5.')
            return
        self._test_bmuf_distributed(cpu_device=cpu_device, nesterov=nesterov)

    def _test_bmuf_distributed(self, cpu_device=False, nesterov=False):
        processes = []
        filestore_dir = tempfile.mkdtemp()
        results = Manager().dict()
        for idx in range(0, 2):
            process = Process(
                target=bmuf_process,
                args=(filestore_dir, idx, results, cpu_device, nesterov)
            )
            processes.append(process)
            process.start()

        while len(processes) > 0:
            process = processes.pop()
            process.join()
        shutil.rmtree(filestore_dir)

        # The processes may have returned early (e.g. without enough GPUs),
        # in which case there is nothing to verify.
        if len(results) == 0:
            return

        w_0 = results[0]['w_0']
        w_1 = results[0]['w_1']
        b_0 = results[0]['b_0']
        b_1 = results[0]['b_1']
        # Check parameters are in sync across all devices and shards.
        np.testing.assert_equal(w_0, w_1)
        np.testing.assert_equal(w_0, results[1]['w_0'])
        np.testing.assert_equal(w_0, results[1]['w_1'])
        np.testing.assert_equal(b_0, b_1)
        np.testing.assert_equal(b_0, results[1]['b_0'])
        np.testing.assert_equal(b_0, results[1]['b_1'])

        w_g_ = results[0]['w_g_']
        b_g_ = results[0]['b_g_']

        # Block gradient: average of the post-local-update params over all
        # four devices (both shards) minus the previous global params.
        g_b = (results[0]['b_0_'] + results[1]['b_0_'] + results[0]['b_1_'] +
               results[1]['b_1_']) / 4 - b_g_
        g_w = (results[0]['w_0_'] + results[1]['w_0_'] + results[0]['w_1_'] +
               results[1]['w_1_']) / 4 - w_g_
        v_b_ = results[0]['v_b_']
        v_b = results[0]['v_b']
        v_w_ = results[0]['v_w_']
        v_w = results[0]['v_w']

        for pid in results.keys():
            for k in results[pid].keys():
                if k.startswith("sync_num"):
                    self.assertEqual(2603, results[pid][k])

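        # Block momentum defaults to 1 - 1/(total device count) = 1 - 1/4
        # = 0.75 for this two-shard, two-devices-per-shard setup.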
        # Check block gradients are correct.
        np.testing.assert_almost_equal(v_b, 0.75 * v_b_ + g_b)
        np.testing.assert_almost_equal(v_w, 0.75 * v_w_ + g_w)

        # Check the params update step.
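        # Classical BMUF sets x = x_g + v; the Nesterov variant backs off by
        # the momentum-scaled change: x = x_g + v - 0.75 * (v - v_prev).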
        if nesterov:
            np.testing.assert_equal(w_0, w_g_ + v_w - 0.75 * (v_w - v_w_))
            np.testing.assert_equal(b_0, b_g_ + v_b - 0.75 * (v_b - v_b_))
        else:
            np.testing.assert_equal(w_0, w_g_ + v_w)
            np.testing.assert_equal(b_0, b_g_ + v_b)
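

# Allow running the tests in this file directly.
if __name__ == "__main__":
    unittest.main()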