update_engine: Pass Action ownership to ActionProcessor

Currently, an object that uses an ActionProcessor for processing one or more
actions has to own the Actions. This is problematic, because if we want to
create an action on the fly and use an ActionProcessor to perform it, we have to
own the Action until it is finished. Furthermore, if someone forgets to own the
action, there will be memory leaks because ActionProcessor does not delete the
Action.

This patch passes the ownership of the Actions to the ActionProcessor through
unique pointers. If an object wants to have access to the Action, it can get it
when ActionComplete() is called.

BUG=chromium:807976
TEST=unittests
TEST=cros flash
TEST=precq

Change-Id: I28f7e9fd3425f17cc51b4db4a4abc130a7d6ef8f
Reviewed-on: https://chromium-review.googlesource.com/1065113
Commit-Ready: Amin Hassani <[email protected]>
Tested-by: Amin Hassani <[email protected]>
Reviewed-by: Xiaochu Liu <[email protected]>
diff --git a/common/action_processor.cc b/common/action_processor.cc
index 3549e08..ead99c4 100644
--- a/common/action_processor.cc
+++ b/common/action_processor.cc
@@ -17,6 +17,7 @@
 #include "update_engine/common/action_processor.h"
 
 #include <string>
+#include <utility>
 
 #include <base/logging.h>
 
@@ -24,27 +25,30 @@
 #include "update_engine/common/error_code_utils.h"
 
 using std::string;
+using std::unique_ptr;
 
 namespace chromeos_update_engine {
 
 ActionProcessor::~ActionProcessor() {
   if (IsRunning())
     StopProcessing();
-  for (auto action : actions_)
-    action->SetProcessor(nullptr);
 }
 
-void ActionProcessor::EnqueueAction(AbstractAction* action) {
-  actions_.push_back(action);
+void ActionProcessor::EnqueueAction(unique_ptr<AbstractAction> action) {
   action->SetProcessor(this);
+  actions_.push_back(std::move(action));
+}
+
+bool ActionProcessor::IsRunning() const {
+  return current_action_ != nullptr || suspended_;
 }
 
 void ActionProcessor::StartProcessing() {
   CHECK(!IsRunning());
   if (!actions_.empty()) {
-    current_action_ = actions_.front();
-    LOG(INFO) << "ActionProcessor: starting " << current_action_->Type();
+    current_action_ = std::move(actions_.front());
     actions_.pop_front();
+    LOG(INFO) << "ActionProcessor: starting " << current_action_->Type();
     current_action_->PerformAction();
   }
 }
@@ -53,16 +57,13 @@
   CHECK(IsRunning());
   if (current_action_) {
     current_action_->TerminateProcessing();
-    current_action_->SetProcessor(nullptr);
   }
   LOG(INFO) << "ActionProcessor: aborted "
             << (current_action_ ? current_action_->Type() : "")
             << (suspended_ ? " while suspended" : "");
-  current_action_ = nullptr;
+  current_action_.reset();
   suspended_ = false;
   // Delete all the actions before calling the delegate.
-  for (auto action : actions_)
-    action->SetProcessor(nullptr);
   actions_.clear();
   if (delegate_)
     delegate_->ProcessingStopped(this);
@@ -106,13 +107,12 @@
 
 void ActionProcessor::ActionComplete(AbstractAction* actionptr,
                                      ErrorCode code) {
-  CHECK_EQ(actionptr, current_action_);
+  CHECK_EQ(actionptr, current_action_.get());
   if (delegate_)
     delegate_->ActionCompleted(this, actionptr, code);
   string old_type = current_action_->Type();
   current_action_->ActionCompleted(code);
-  current_action_->SetProcessor(nullptr);
-  current_action_ = nullptr;
+  current_action_.reset();
   LOG(INFO) << "ActionProcessor: finished "
             << (actions_.empty() ? "last action " : "") << old_type
             << (suspended_ ? " while suspended" : "")
@@ -138,7 +138,7 @@
     }
     return;
   }
-  current_action_ = actions_.front();
+  current_action_ = std::move(actions_.front());
   actions_.pop_front();
   LOG(INFO) << "ActionProcessor: starting " << current_action_->Type();
   current_action_->PerformAction();
diff --git a/common/action_processor.h b/common/action_processor.h
index c9c179e..f651b8e 100644
--- a/common/action_processor.h
+++ b/common/action_processor.h
@@ -18,6 +18,8 @@
 #define UPDATE_ENGINE_COMMON_ACTION_PROCESSOR_H_
 
 #include <deque>
+#include <memory>
+#include <vector>
 
 #include <base/macros.h>
 #include <brillo/errors/error.h>
@@ -69,10 +71,10 @@
 
   // Returns true iff the processing was started but not yet completed nor
   // stopped.
-  bool IsRunning() const { return current_action_ != nullptr || suspended_; }
+  bool IsRunning() const;
 
   // Adds another Action to the end of the queue.
-  virtual void EnqueueAction(AbstractAction* action);
+  virtual void EnqueueAction(std::unique_ptr<AbstractAction> action);
 
   // Sets/gets the current delegate. Set to null to remove a delegate.
   ActionProcessorDelegate* delegate() const { return delegate_; }
@@ -81,14 +83,17 @@
   }
 
   // Returns a pointer to the current Action that's processing.
-  AbstractAction* current_action() const {
-    return current_action_;
-  }
+  AbstractAction* current_action() const { return current_action_.get(); }
 
   // Called by an action to notify processor that it's done. Caller passes self.
+  // But this call deletes the action if no other object has a reference
+  // to it, so in that case, the caller should not try to access any of its
+  // member variables after this call.
   void ActionComplete(AbstractAction* actionptr, ErrorCode code);
 
  private:
+  FRIEND_TEST(ActionProcessorTest, ChainActionsTest);
+
   // Continue processing actions (if any) after the last action terminated with
   // the passed error code. If there are no more actions to process, the
   // processing will terminate.
@@ -96,10 +101,10 @@
 
   // Actions that have not yet begun processing, in the order in which
   // they'll be processed.
-  std::deque<AbstractAction*> actions_;
+  std::deque<std::unique_ptr<AbstractAction>> actions_;
 
   // A pointer to the currently processing Action, if any.
-  AbstractAction* current_action_{nullptr};
+  std::unique_ptr<AbstractAction> current_action_;
 
   // The ErrorCode reported by an action that was suspended but finished while
   // being suspended. This error code is stored here to be reported back to the
diff --git a/common/action_processor_unittest.cc b/common/action_processor_unittest.cc
index 631e42d..b67eca9 100644
--- a/common/action_processor_unittest.cc
+++ b/common/action_processor_unittest.cc
@@ -17,6 +17,7 @@
 #include "update_engine/common/action_processor.h"
 
 #include <string>
+#include <utility>
 
 #include <gtest/gtest.h>
 
@@ -96,7 +97,11 @@
   void SetUp() override {
     action_processor_.set_delegate(&delegate_);
     // Silence Type() calls used for logging.
-    EXPECT_CALL(mock_action_, Type()).Times(testing::AnyNumber());
+    mock_action_.reset(new testing::StrictMock<MockAction>());
+    mock_action_ptr_ = mock_action_.get();
+    action_.reset(new ActionProcessorTestAction());
+    action_ptr_ = action_.get();
+    EXPECT_CALL(*mock_action_, Type()).Times(testing::AnyNumber());
   }
 
   void TearDown() override {
@@ -110,34 +115,35 @@
   MyActionProcessorDelegate delegate_{&action_processor_};
 
   // Common actions used during most tests.
-  testing::StrictMock<MockAction> mock_action_;
-  ActionProcessorTestAction action_;
+  std::unique_ptr<testing::StrictMock<MockAction>> mock_action_;
+  testing::StrictMock<MockAction>* mock_action_ptr_;
+  std::unique_ptr<ActionProcessorTestAction> action_;
+  ActionProcessorTestAction* action_ptr_;
 };
 
 TEST_F(ActionProcessorTest, SimpleTest) {
   EXPECT_FALSE(action_processor_.IsRunning());
-  action_processor_.EnqueueAction(&action_);
+  action_processor_.EnqueueAction(std::move(action_));
   EXPECT_FALSE(action_processor_.IsRunning());
-  EXPECT_FALSE(action_.IsRunning());
+  EXPECT_FALSE(action_ptr_->IsRunning());
   action_processor_.StartProcessing();
   EXPECT_TRUE(action_processor_.IsRunning());
-  EXPECT_TRUE(action_.IsRunning());
-  EXPECT_EQ(action_processor_.current_action(), &action_);
-  action_.CompleteAction();
+  EXPECT_TRUE(action_ptr_->IsRunning());
+  action_ptr_->CompleteAction();
   EXPECT_FALSE(action_processor_.IsRunning());
-  EXPECT_FALSE(action_.IsRunning());
+  EXPECT_EQ(action_processor_.current_action(), nullptr);
 }
 
 TEST_F(ActionProcessorTest, DelegateTest) {
-  action_processor_.EnqueueAction(&action_);
+  action_processor_.EnqueueAction(std::move(action_));
   action_processor_.StartProcessing();
-  action_.CompleteAction();
+  action_ptr_->CompleteAction();
   EXPECT_TRUE(delegate_.processing_done_called_);
   EXPECT_TRUE(delegate_.action_completed_called_);
 }
 
 TEST_F(ActionProcessorTest, StopProcessingTest) {
-  action_processor_.EnqueueAction(&action_);
+  action_processor_.EnqueueAction(std::move(action_));
   action_processor_.StartProcessing();
   action_processor_.StopProcessing();
   EXPECT_TRUE(delegate_.processing_stopped_called_);
@@ -150,54 +156,58 @@
   // This test doesn't use a delegate since it terminates several actions.
   action_processor_.set_delegate(nullptr);
 
-  ActionProcessorTestAction action1, action2;
-  action_processor_.EnqueueAction(&action1);
-  action_processor_.EnqueueAction(&action2);
+  auto action0 = std::make_unique<ActionProcessorTestAction>();
+  auto action1 = std::make_unique<ActionProcessorTestAction>();
+  auto action2 = std::make_unique<ActionProcessorTestAction>();
+  auto action0_ptr = action0.get();
+  auto action1_ptr = action1.get();
+  auto action2_ptr = action2.get();
+  action_processor_.EnqueueAction(std::move(action0));
+  action_processor_.EnqueueAction(std::move(action1));
+  action_processor_.EnqueueAction(std::move(action2));
+
+  EXPECT_EQ(action_processor_.actions_.size(), 3);
+  EXPECT_EQ(action_processor_.actions_[0].get(), action0_ptr);
+  EXPECT_EQ(action_processor_.actions_[1].get(), action1_ptr);
+  EXPECT_EQ(action_processor_.actions_[2].get(), action2_ptr);
+
   action_processor_.StartProcessing();
-  EXPECT_EQ(&action1, action_processor_.current_action());
+  EXPECT_EQ(action0_ptr, action_processor_.current_action());
   EXPECT_TRUE(action_processor_.IsRunning());
-  action1.CompleteAction();
-  EXPECT_EQ(&action2, action_processor_.current_action());
+  action0_ptr->CompleteAction();
+  EXPECT_EQ(action1_ptr, action_processor_.current_action());
   EXPECT_TRUE(action_processor_.IsRunning());
-  action2.CompleteAction();
+  action1_ptr->CompleteAction();
+  EXPECT_EQ(action2_ptr, action_processor_.current_action());
+  EXPECT_TRUE(action_processor_.actions_.empty());
+  EXPECT_TRUE(action_processor_.IsRunning());
+  action2_ptr->CompleteAction();
   EXPECT_EQ(nullptr, action_processor_.current_action());
+  EXPECT_TRUE(action_processor_.actions_.empty());
   EXPECT_FALSE(action_processor_.IsRunning());
 }
 
-TEST_F(ActionProcessorTest, DtorTest) {
-  ActionProcessorTestAction action1, action2;
-  {
-    ActionProcessor action_processor;
-    action_processor.EnqueueAction(&action1);
-    action_processor.EnqueueAction(&action2);
-    action_processor.StartProcessing();
-  }
-  EXPECT_EQ(nullptr, action1.processor());
-  EXPECT_FALSE(action1.IsRunning());
-  EXPECT_EQ(nullptr, action2.processor());
-  EXPECT_FALSE(action2.IsRunning());
-}
-
 TEST_F(ActionProcessorTest, DefaultDelegateTest) {
-  // Just make sure it doesn't crash
-  action_processor_.EnqueueAction(&action_);
+  // Just make sure it doesn't crash.
+  action_processor_.EnqueueAction(std::move(action_));
   action_processor_.StartProcessing();
-  action_.CompleteAction();
+  action_ptr_->CompleteAction();
 
-  action_processor_.EnqueueAction(&action_);
+  action_.reset(new ActionProcessorTestAction());
+  action_processor_.EnqueueAction(std::move(action_));
   action_processor_.StartProcessing();
   action_processor_.StopProcessing();
 }
 
-// This test suspends and resume the action processor while running one action_.
+// This test suspends and resume the action processor while running one action.
 TEST_F(ActionProcessorTest, SuspendResumeTest) {
-  action_processor_.EnqueueAction(&mock_action_);
+  action_processor_.EnqueueAction(std::move(mock_action_));
 
   testing::InSequence s;
-  EXPECT_CALL(mock_action_, PerformAction());
+  EXPECT_CALL(*mock_action_ptr_, PerformAction());
   action_processor_.StartProcessing();
 
-  EXPECT_CALL(mock_action_, SuspendAction());
+  EXPECT_CALL(*mock_action_ptr_, SuspendAction());
   action_processor_.SuspendProcessing();
   // Suspending the processor twice should not suspend the action twice.
   action_processor_.SuspendProcessing();
@@ -205,32 +215,31 @@
   // IsRunning should return whether there's is an action doing some work, even
   // if it is suspended.
   EXPECT_TRUE(action_processor_.IsRunning());
-  EXPECT_EQ(&mock_action_, action_processor_.current_action());
+  EXPECT_EQ(mock_action_ptr_, action_processor_.current_action());
 
-  EXPECT_CALL(mock_action_, ResumeAction());
+  EXPECT_CALL(*mock_action_ptr_, ResumeAction());
   action_processor_.ResumeProcessing();
 
   // Calling ResumeProcessing twice should not affect the action_.
   action_processor_.ResumeProcessing();
-
-  action_processor_.ActionComplete(&mock_action_, ErrorCode::kSuccess);
+  action_processor_.ActionComplete(mock_action_ptr_, ErrorCode::kSuccess);
 }
 
 // This test suspends an action that presumably doesn't support suspend/resume
 // and it finished before being resumed.
 TEST_F(ActionProcessorTest, ActionCompletedWhileSuspendedTest) {
-  action_processor_.EnqueueAction(&mock_action_);
+  action_processor_.EnqueueAction(std::move(mock_action_));
 
   testing::InSequence s;
-  EXPECT_CALL(mock_action_, PerformAction());
+  EXPECT_CALL(*mock_action_ptr_, PerformAction());
   action_processor_.StartProcessing();
 
-  EXPECT_CALL(mock_action_, SuspendAction());
+  EXPECT_CALL(*mock_action_ptr_, SuspendAction());
   action_processor_.SuspendProcessing();
 
   // Simulate the action completion while suspended. No other call to
   // |mock_action_| is expected at this point.
-  action_processor_.ActionComplete(&mock_action_, ErrorCode::kSuccess);
+  action_processor_.ActionComplete(mock_action_ptr_, ErrorCode::kSuccess);
 
   // The processing should not be done since the ActionProcessor is suspended
   // and the processing is considered to be still running until resumed.
@@ -243,15 +252,15 @@
 }
 
 TEST_F(ActionProcessorTest, StoppedWhileSuspendedTest) {
-  action_processor_.EnqueueAction(&mock_action_);
+  action_processor_.EnqueueAction(std::move(mock_action_));
 
   testing::InSequence s;
-  EXPECT_CALL(mock_action_, PerformAction());
+  EXPECT_CALL(*mock_action_ptr_, PerformAction());
   action_processor_.StartProcessing();
-  EXPECT_CALL(mock_action_, SuspendAction());
+  EXPECT_CALL(*mock_action_ptr_, SuspendAction());
   action_processor_.SuspendProcessing();
 
-  EXPECT_CALL(mock_action_, TerminateProcessing());
+  EXPECT_CALL(*mock_action_ptr_, TerminateProcessing());
   action_processor_.StopProcessing();
   // Stopping the processing should abort the current execution no matter what.
   EXPECT_TRUE(delegate_.processing_stopped_called_);
diff --git a/common/action_unittest.cc b/common/action_unittest.cc
index dcdce17..b2f9ba4 100644
--- a/common/action_unittest.cc
+++ b/common/action_unittest.cc
@@ -16,8 +16,11 @@
 
 #include "update_engine/common/action.h"
 
-#include <gtest/gtest.h>
 #include <string>
+#include <utility>
+
+#include <gtest/gtest.h>
+
 #include "update_engine/common/action_processor.h"
 
 using std::string;
@@ -56,21 +59,19 @@
 // This test creates two simple Actions and sends a message via an ActionPipe
 // from one to the other.
 TEST(ActionTest, SimpleTest) {
-  ActionTestAction action;
-
-  EXPECT_FALSE(action.in_pipe());
-  EXPECT_FALSE(action.out_pipe());
-  EXPECT_FALSE(action.processor());
-  EXPECT_FALSE(action.IsRunning());
+  auto action = std::make_unique<ActionTestAction>();
+  auto action_ptr = action.get();
+  EXPECT_FALSE(action->in_pipe());
+  EXPECT_FALSE(action->out_pipe());
+  EXPECT_FALSE(action->processor());
+  EXPECT_FALSE(action->IsRunning());
 
   ActionProcessor action_processor;
-  action_processor.EnqueueAction(&action);
-  EXPECT_EQ(&action_processor, action.processor());
-
+  action_processor.EnqueueAction(std::move(action));
+  EXPECT_EQ(&action_processor, action_ptr->processor());
   action_processor.StartProcessing();
-  EXPECT_TRUE(action.IsRunning());
-  action.CompleteAction();
-  EXPECT_FALSE(action.IsRunning());
+  EXPECT_TRUE(action_ptr->IsRunning());
+  action_ptr->CompleteAction();
 }
 
 }  // namespace chromeos_update_engine
diff --git a/common/mock_action_processor.h b/common/mock_action_processor.h
index 04275c1..4c62109 100644
--- a/common/mock_action_processor.h
+++ b/common/mock_action_processor.h
@@ -17,6 +17,10 @@
 #ifndef UPDATE_ENGINE_COMMON_MOCK_ACTION_PROCESSOR_H_
 #define UPDATE_ENGINE_COMMON_MOCK_ACTION_PROCESSOR_H_
 
+#include <deque>
+#include <memory>
+#include <utility>
+
 #include <gmock/gmock.h>
 
 #include "update_engine/common/action.h"
@@ -27,6 +31,12 @@
  public:
   MOCK_METHOD0(StartProcessing, void());
   MOCK_METHOD1(EnqueueAction, void(AbstractAction* action));
+
+  // This is a legacy workaround described in:
+  // https://github.com/google/googletest/blob/master/googlemock/docs/CookBook.md#legacy-workarounds-for-move-only-types-legacymoveonly
+  void EnqueueAction(std::unique_ptr<AbstractAction> action) override {
+    EnqueueAction(action.get());
+  }
 };
 
 }  // namespace chromeos_update_engine
diff --git a/common/mock_http_fetcher.cc b/common/mock_http_fetcher.cc
index f1ae72a..9507c9d 100644
--- a/common/mock_http_fetcher.cc
+++ b/common/mock_http_fetcher.cc
@@ -47,71 +47,49 @@
     SendData(true);
 }
 
-// Returns false on one condition: If timeout_id_ was already set
-// and it needs to be deleted by the caller. If timeout_id_ is null
-// when this function is called, this function will always return true.
-bool MockHttpFetcher::SendData(bool skip_delivery) {
-  if (fail_transfer_) {
+void MockHttpFetcher::SendData(bool skip_delivery) {
+  if (fail_transfer_ || sent_size_ == data_.size()) {
     SignalTransferComplete();
-    return timeout_id_ != MessageLoop::kTaskIdNull;
-  }
-
-  CHECK_LT(sent_size_, data_.size());
-  if (!skip_delivery) {
-    const size_t chunk_size = min(kMockHttpFetcherChunkSize,
-                                  data_.size() - sent_size_);
-    CHECK(delegate_);
-    delegate_->ReceivedBytes(this, &data_[sent_size_], chunk_size);
-    // We may get terminated in the callback.
-    if (sent_size_ == data_.size()) {
-      LOG(INFO) << "Terminated in the ReceivedBytes callback.";
-      return timeout_id_ != MessageLoop::kTaskIdNull;
-    }
-    sent_size_ += chunk_size;
-    CHECK_LE(sent_size_, data_.size());
-    if (sent_size_ == data_.size()) {
-      // We've sent all the data. Notify of success.
-      SignalTransferComplete();
-    }
+    return;
   }
 
   if (paused_) {
-    // If we're paused, we should return true if timeout_id_ is set,
-    // since we need the caller to delete it.
-    return timeout_id_ != MessageLoop::kTaskIdNull;
+    // If we're paused, we should return so no callback is scheduled.
+    return;
   }
 
-  if (timeout_id_ != MessageLoop::kTaskIdNull) {
-    // we still need a timeout if there's more data to send
-    return sent_size_ < data_.size();
-  } else if (sent_size_ < data_.size()) {
-    // we don't have a timeout source and we need one
+  // Set up timeout callback even if the transfer is about to be completed in
+  // order to get a call to |TransferComplete|.
+  if (timeout_id_ == MessageLoop::kTaskIdNull) {
     timeout_id_ = MessageLoop::current()->PostDelayedTask(
         FROM_HERE,
         base::Bind(&MockHttpFetcher::TimeoutCallback, base::Unretained(this)),
         base::TimeDelta::FromMilliseconds(10));
   }
-  return true;
+
+  if (!skip_delivery) {
+    const size_t chunk_size =
+        min(kMockHttpFetcherChunkSize, data_.size() - sent_size_);
+    sent_size_ += chunk_size;
+    CHECK(delegate_);
+    delegate_->ReceivedBytes(this, &data_[sent_size_ - chunk_size], chunk_size);
+  }
+  // We may get terminated and deleted right after |ReceivedBytes| call, so we
+  // should not access any class member variable after this call.
 }
 
 void MockHttpFetcher::TimeoutCallback() {
   CHECK(!paused_);
-  if (SendData(false)) {
-    // We need to re-schedule the timeout.
-    timeout_id_ = MessageLoop::current()->PostDelayedTask(
-        FROM_HERE,
-        base::Bind(&MockHttpFetcher::TimeoutCallback, base::Unretained(this)),
-        base::TimeDelta::FromMilliseconds(10));
-  } else {
-    timeout_id_ = MessageLoop::kTaskIdNull;
-  }
+  timeout_id_ = MessageLoop::kTaskIdNull;
+  CHECK_LE(sent_size_, data_.size());
+  // Same here, we should not access any member variable after this call.
+  SendData(false);
 }
 
 // If the transfer is in progress, aborts the transfer early.
 // The transfer cannot be resumed.
 void MockHttpFetcher::TerminateTransfer() {
   LOG(INFO) << "Terminating transfer.";
-  sent_size_ = data_.size();
   // Kill any timeout, it is ok to call with kTaskIdNull.
   MessageLoop::current()->CancelTask(timeout_id_);
   timeout_id_ = MessageLoop::kTaskIdNull;
@@ -140,9 +118,7 @@
 void MockHttpFetcher::Unpause() {
   CHECK(paused_) << "You must pause before unpause.";
   paused_ = false;
-  if (sent_size_ < data_.size()) {
-    SendData(false);
-  }
+  SendData(false);
 }
 
 void MockHttpFetcher::FailTransfer(int http_response_code) {
diff --git a/common/mock_http_fetcher.h b/common/mock_http_fetcher.h
index 367802e..00f4e2b 100644
--- a/common/mock_http_fetcher.h
+++ b/common/mock_http_fetcher.h
@@ -112,13 +112,10 @@
   }
 
  private:
-  // Sends data to the delegate and sets up a timeout callback if needed.
-  // There must be a delegate and there must be data to send. If there is
-  // already a timeout callback, and it should be deleted by the caller,
-  // this will return false; otherwise true is returned.
-  // If skip_delivery is true, no bytes will be delivered, but the callbacks
-  // still be set if needed.
-  bool SendData(bool skip_delivery);
+  // Sends data to the delegate and sets up a timeout callback if needed. There
+  // must be a delegate. If |skip_delivery| is true, no bytes will be delivered,
+  // but the callbacks will still be set if needed.
+  void SendData(bool skip_delivery);
 
   // Callback for when our message loop timeout expires.
   void TimeoutCallback();