update_engine: Watch file descriptors using chromeos::MessageLoop.

This patch removes all the calls to g_io_*() functions used to create
io_channels from a file descriptor and watch for them in the main loop.
Instead, we use the chromeos::MessageLoop backed with the glib
implementation.

This patch also removes the duplicated process handling work done in
P2PManager and uses the common Subprocess class instead.

BUG=chromium:499886
TEST=Added and updated unittests.

Change-Id: Ia093b060d2396325fce69b2bbdb62957ba7bfbc6
Reviewed-on: https://chromium-review.googlesource.com/284593
Tested-by: Alex Deymo <[email protected]>
Reviewed-by: Alex Vakulenko <[email protected]>
Commit-Queue: Alex Deymo <[email protected]>
Trybot-Ready: Alex Deymo <[email protected]>
diff --git a/libcurl_http_fetcher.cc b/libcurl_http_fetcher.cc
index da69281..9d32293 100644
--- a/libcurl_http_fetcher.cc
+++ b/libcurl_http_fetcher.cc
@@ -18,7 +18,6 @@
 
 using base::TimeDelta;
 using chromeos::MessageLoop;
-using std::make_pair;
 using std::max;
 using std::string;
 
@@ -363,7 +362,7 @@
     }
   } else {
     // set up callback
-    SetupMainloopSources();
+    SetupMessageLoopSources();
   }
 }
 
@@ -410,8 +409,8 @@
   CHECK_EQ(curl_easy_pause(curl_handle_, CURLPAUSE_CONT), CURLE_OK);
 }
 
-// This method sets up callbacks with the glib main loop.
-void LibcurlHttpFetcher::SetupMainloopSources() {
+// This method sets up callbacks with the MessageLoop.
+void LibcurlHttpFetcher::SetupMessageLoopSources() {
   fd_set fd_read;
   fd_set fd_write;
   fd_set fd_exc;
@@ -429,14 +428,14 @@
 
   // We should iterate through all file descriptors up to libcurl's fd_max or
   // the highest one we're tracking, whichever is larger.
-  for (size_t t = 0; t < arraysize(io_channels_); ++t) {
-    if (!io_channels_[t].empty())
-      fd_max = max(fd_max, io_channels_[t].rbegin()->first);
+  for (size_t t = 0; t < arraysize(fd_task_maps_); ++t) {
+    if (!fd_task_maps_[t].empty())
+      fd_max = max(fd_max, fd_task_maps_[t].rbegin()->first);
   }
 
   // For each fd, if we're not tracking it, track it. If we are tracking it, but
   // libcurl doesn't care about it anymore, stop tracking it. After this loop,
-  // there should be exactly as many GIOChannel objects in io_channels_[0|1] as
+  // there should be exactly as many tasks scheduled in fd_task_maps_[0|1] as
   // there are read/write fds that we're tracking.
   for (int fd = 0; fd <= fd_max; ++fd) {
     // Note that fd_exc is unused in the current version of libcurl so is_exc
@@ -446,16 +445,20 @@
       is_exc || (FD_ISSET(fd, &fd_read) != 0),  // track 0 -- read
       is_exc || (FD_ISSET(fd, &fd_write) != 0)  // track 1 -- write
     };
+    MessageLoop::WatchMode watch_modes[2] = {
+      MessageLoop::WatchMode::kWatchRead,
+      MessageLoop::WatchMode::kWatchWrite,
+    };
 
-    for (size_t t = 0; t < arraysize(io_channels_); ++t) {
-      bool tracked = io_channels_[t].find(fd) != io_channels_[t].end();
+    for (size_t t = 0; t < arraysize(fd_task_maps_); ++t) {
+      auto fd_task_it = fd_task_maps_[t].find(fd);
+      bool tracked = fd_task_it != fd_task_maps_[t].end();
 
       if (!must_track[t]) {
         // If we have an outstanding io_channel, remove it.
         if (tracked) {
-          g_source_remove(io_channels_[t][fd].second);
-          g_io_channel_unref(io_channels_[t][fd].first);
-          io_channels_[t].erase(io_channels_[t].find(fd));
+          MessageLoop::current()->CancelTask(fd_task_it->second);
+          fd_task_maps_[t].erase(fd_task_it);
         }
         continue;
       }
@@ -464,16 +467,15 @@
       if (tracked)
         continue;
 
-      // Set conditions appropriately -- read for track 0, write for track 1.
-      GIOCondition condition = static_cast<GIOCondition>(
-          ((t == 0) ? (G_IO_IN | G_IO_PRI) : G_IO_OUT) | G_IO_ERR | G_IO_HUP);
-
       // Track a new fd.
-      GIOChannel* io_channel = g_io_channel_unix_new(fd);
-      guint tag =
-          g_io_add_watch(io_channel, condition, &StaticFDCallback, this);
+      fd_task_maps_[t][fd] = MessageLoop::current()->WatchFileDescriptor(
+          FROM_HERE,
+          fd,
+          watch_modes[t],
+          true,  // persistent
+          base::Bind(&LibcurlHttpFetcher::CurlPerformOnce,
+                     base::Unretained(this)));
 
-      io_channels_[t][fd] = make_pair(io_channel, tag);
       static int io_counter = 0;
       io_counter++;
       if (io_counter % 50 == 0) {
@@ -493,16 +495,6 @@
   }
 }
 
-bool LibcurlHttpFetcher::FDCallback(GIOChannel *source,
-                                    GIOCondition condition) {
-  CurlPerformOnce();
-  // We handle removing of this source elsewhere, so we always return true.
-  // The docs say, "the function should return FALSE if the event source
-  // should be removed."
-  // http://www.gtk.org/api/2.6/glib/glib-IO-Channels.html#GIOFunc
-  return true;
-}
-
 void LibcurlHttpFetcher::RetryTimeoutCallback() {
   ResumeTransfer(url_);
   CurlPerformOnce();
@@ -525,13 +517,16 @@
   MessageLoop::current()->CancelTask(timeout_id_);
   timeout_id_ = MessageLoop::kTaskIdNull;
 
-  for (size_t t = 0; t < arraysize(io_channels_); ++t) {
-    for (IOChannels::iterator it = io_channels_[t].begin();
-         it != io_channels_[t].end(); ++it) {
-      g_source_remove(it->second.second);
-      g_io_channel_unref(it->second.first);
+  for (size_t t = 0; t < arraysize(fd_task_maps_); ++t) {
+    for (const auto& fd_taks_pair : fd_task_maps_[t]) {
+      if (!MessageLoop::current()->CancelTask(fd_taks_pair.second)) {
+        LOG(WARNING) << "Error canceling the watch task "
+                     << fd_taks_pair.second << " for "
+                     << (t ? "writing" : "reading") << " the fd "
+                     << fd_taks_pair.first;
+      }
     }
-    io_channels_[t].clear();
+    fd_task_maps_[t].clear();
   }
 
   if (curl_http_headers_) {
diff --git a/libcurl_http_fetcher.h b/libcurl_http_fetcher.h
index d52b783..4f68a9a 100644
--- a/libcurl_http_fetcher.h
+++ b/libcurl_http_fetcher.h
@@ -10,7 +10,6 @@
 #include <utility>
 
 #include <curl/curl.h>
-#include <glib.h>
 
 #include <base/logging.h>
 #include <base/macros.h>
@@ -125,16 +124,6 @@
   // left off.
   virtual void ResumeTransfer(const std::string& url);
 
-  // These two methods are for glib main loop callbacks. They are called
-  // when either a file descriptor is ready for work or when a timer
-  // has fired. The static versions are shims for libcurl which has a C API.
-  bool FDCallback(GIOChannel *source, GIOCondition condition);
-  static gboolean StaticFDCallback(GIOChannel *source,
-                                   GIOCondition condition,
-                                   gpointer data) {
-    return reinterpret_cast<LibcurlHttpFetcher*>(data)->FDCallback(source,
-                                                                   condition);
-  }
   void TimeoutCallback();
   void RetryTimeoutCallback();
 
@@ -146,10 +135,10 @@
   // Returns true if we should resume immediately after this call.
   void CurlPerformOnce();
 
-  // Sets up glib main loop sources as needed by libcurl. This is generally
+  // Sets up message loop sources as needed by libcurl. This is generally
   // the file descriptor of the socket and a timer in case nothing happens
   // on the fds.
-  void SetupMainloopSources();
+  void SetupMessageLoopSources();
 
   // Callback called by libcurl when new data has arrived on the transfer
   size_t LibcurlWrite(void *ptr, size_t size, size_t nmemb);
@@ -160,7 +149,7 @@
   }
 
   // Cleans up the following if they are non-null:
-  // curl(m) handles, io_channels_, timeout_source_.
+  // curl(m) handles, fd_task_maps_, timeout_id_.
   void CleanUp();
 
   // Force terminate the transfer. This will invoke the delegate's (if any)
@@ -185,17 +174,16 @@
   struct curl_slist* curl_http_headers_{nullptr};
 
   // Lists of all read(0)/write(1) file descriptors that we're waiting on from
-  // the glib main loop. libcurl may open/close descriptors and switch their
+  // the message loop. libcurl may open/close descriptors and switch their
   // directions so maintain two separate lists so that watch conditions can be
   // set appropriately.
-  typedef std::map<int, std::pair<GIOChannel*, guint>> IOChannels;
-  IOChannels io_channels_[2];
+  std::map<int, chromeos::MessageLoop::TaskId> fd_task_maps_[2];
 
   // The TaskId of the timer we're waiting on. kTaskIdNull if we are not waiting
   // on it.
   chromeos::MessageLoop::TaskId timeout_id_{chromeos::MessageLoop::kTaskIdNull};
 
-  bool transfer_in_progress_ = false;
+  bool transfer_in_progress_{false};
 
   // The transfer size. -1 if not known.
   off_t transfer_size_{0};
diff --git a/p2p_manager.cc b/p2p_manager.cc
index c1a2738..ee0f583 100644
--- a/p2p_manager.cc
+++ b/p2p_manager.cc
@@ -35,6 +35,7 @@
 #include <base/strings/stringprintf.h>
 
 #include "update_engine/glib_utils.h"
+#include "update_engine/subprocess.h"
 #include "update_engine/update_manager/policy.h"
 #include "update_engine/update_manager/update_manager.h"
 #include "update_engine/utils.h"
@@ -45,6 +46,7 @@
 using base::StringPrintf;
 using base::Time;
 using base::TimeDelta;
+using chromeos::MessageLoop;
 using chromeos_update_manager::EvalStatus;
 using chromeos_update_manager::Policy;
 using chromeos_update_manager::UpdateManager;
@@ -105,7 +107,7 @@
                  UpdateManager* update_manager,
                  const string& file_extension,
                  const int num_files_to_keep,
-                 const base::TimeDelta& max_file_age);
+                 const TimeDelta& max_file_age);
 
   // P2PManager methods.
   void SetDevicePolicy(const policy::DevicePolicy* device_policy) override;
@@ -147,7 +149,7 @@
 
   // Utility function to delete a file given by |path| and log the
   // path as well as |reason|. Returns false on failure.
-  bool DeleteP2PFile(const FilePath& path, const std::string& reason);
+  bool DeleteP2PFile(const FilePath& path, const string& reason);
 
   // Schedules an async request for tracking changes in P2P enabled status.
   void ScheduleEnabledStatusChange();
@@ -178,7 +180,7 @@
 
   // If non-zero, files older than this will not be kept after
   // performing housekeeping.
-  const base::TimeDelta max_file_age_;
+  const TimeDelta max_file_age_;
 
   // The string ".p2p".
   static const char kP2PExtension[];
@@ -206,7 +208,7 @@
                                UpdateManager* update_manager,
                                const string& file_extension,
                                const int num_files_to_keep,
-                               const base::TimeDelta& max_file_age)
+                               const TimeDelta& max_file_age)
   : clock_(clock),
     update_manager_(update_manager),
     file_extension_(file_extension),
@@ -238,34 +240,14 @@
 }
 
 bool P2PManagerImpl::EnsureP2P(bool should_be_running) {
-  gchar *standard_error = nullptr;
-  GError *error = nullptr;
-  gint exit_status = 0;
+  int return_code = 0;
+  string output;
 
   may_be_running_ = true;  // Unless successful, we must be conservative.
 
   vector<string> args = configuration_->GetInitctlArgs(should_be_running);
-  unique_ptr<gchar*, utils::GLibStrvFreeDeleter> argv(
-      utils::StringVectorToGStrv(args));
-  if (!g_spawn_sync(nullptr,  // working_directory
-                    argv.get(),
-                    nullptr,  // envp
-                    static_cast<GSpawnFlags>(G_SPAWN_SEARCH_PATH),
-                    nullptr, nullptr,  // child_setup, user_data
-                    nullptr,  // standard_output
-                    &standard_error,
-                    &exit_status,
-                    &error)) {
-    LOG(ERROR) << "Error spawning " << utils::StringVectorToString(args)
-               << ": " << utils::GetAndFreeGError(&error);
-    return false;
-  }
-  unique_ptr<gchar, utils::GLibFreeDeleter> standard_error_deleter(
-      standard_error);
-
-  if (!WIFEXITED(exit_status)) {
-    LOG(ERROR) << "Error spawning '" << utils::StringVectorToString(args)
-               << "': WIFEXITED is false";
+  if (!Subprocess::SynchronousExec(args, &return_code, &output)) {
+    LOG(ERROR) << "Error spawning " << utils::StringVectorToString(args);
     return false;
   }
 
@@ -274,11 +256,11 @@
   // necessity because initctl does not offer actions such as "start if not
   // running" or "stop if running".
   // TODO(zeuthen,chromium:277051): Avoid doing this.
-  if (WEXITSTATUS(exit_status) != 0) {
-    const gchar *expected_error_message = should_be_running ?
+  if (return_code != 0) {
+    const char *expected_error_message = should_be_running ?
       "initctl: Job is already running: p2p\n" :
       "initctl: Unknown instance \n";
-    if (g_strcmp0(standard_error, expected_error_message) != 0)
+    if (output != expected_error_message)
       return false;
   }
 
@@ -322,7 +304,7 @@
 }
 
 bool P2PManagerImpl::DeleteP2PFile(const FilePath& path,
-                                   const std::string& reason) {
+                                   const string& reason) {
   LOG(INFO) << "Deleting p2p file " << path.value()
             << " (reason: " << reason << ")";
   if (unlink(path.value().c_str()) != 0) {
@@ -367,7 +349,7 @@
     // If instructed to keep only files younger than a given age
     // (|max_file_age_| != 0), delete files satisfying this criteria
     // right now. Otherwise add it to a list we'll consider for later.
-    if (clock_ != nullptr && max_file_age_ != base::TimeDelta() &&
+    if (clock_ != nullptr && max_file_age_ != TimeDelta() &&
         clock_->GetWallclockTime() - time > max_file_age_) {
       if (!DeleteP2PFile(file, "file too old"))
         deletion_failed = true;
@@ -396,81 +378,50 @@
 class LookupData {
  public:
   explicit LookupData(P2PManager::LookupCallback callback)
-    : callback_(callback),
-      pid_(0),
-      stdout_fd_(-1),
-      stdout_channel_source_id_(0),
-      child_watch_source_id_(0),
-      timeout_source_id_(0),
-      reported_(false) {}
+    : callback_(callback) {}
 
   ~LookupData() {
-    if (child_watch_source_id_ != 0)
-      g_source_remove(child_watch_source_id_);
-    if (stdout_channel_source_id_ != 0)
-      g_source_remove(stdout_channel_source_id_);
-    if (timeout_source_id_ != 0)
-      g_source_remove(timeout_source_id_);
-    if (stdout_fd_ != -1)
-      close(stdout_fd_);
-    if (pid_ != 0)
-      kill(pid_, SIGTERM);
+    if (timeout_task_ != MessageLoop::kTaskIdNull)
+      MessageLoop::current()->CancelTask(timeout_task_);
+    if (child_tag_)
+      Subprocess::Get().KillExec(child_tag_);
   }
 
-  void InitiateLookup(gchar **argv, TimeDelta timeout) {
+  void InitiateLookup(const vector<string>& cmd, TimeDelta timeout) {
     // NOTE: if we fail early (i.e. in this method), we need to schedule
     // an idle to report the error. This is because we guarantee that
-    // the callback is always called from the GLib mainloop (this
+    // the callback is always called from the message loop (this
     // guarantee is useful for testing).
 
-    GError *error = nullptr;
-    if (!g_spawn_async_with_pipes(nullptr,  // working_directory
-                                  argv,
-                                  nullptr,  // envp
-                                  static_cast<GSpawnFlags>(G_SPAWN_SEARCH_PATH |
-                                                     G_SPAWN_DO_NOT_REAP_CHILD),
-                                  nullptr,  // child_setup
-                                  this,
-                                  &pid_,
-                                  nullptr,  // standard_input
-                                  &stdout_fd_,
-                                  nullptr,  // standard_error
-                                  &error)) {
-      LOG(ERROR) << "Error spawning p2p-client: "
-                 << utils::GetAndFreeGError(&error);
+    // We expect to run just "p2p-client" and find it in the path.
+    child_tag_ = Subprocess::Get().ExecFlags(
+        cmd, G_SPAWN_SEARCH_PATH, false /* redirect stderr */, OnLookupDone,
+        this);
+
+    if (!child_tag_) {
+      LOG(ERROR) << "Error spawning " << utils::StringVectorToString(cmd);
       ReportErrorAndDeleteInIdle();
       return;
     }
 
-    GIOChannel* io_channel = g_io_channel_unix_new(stdout_fd_);
-    stdout_channel_source_id_ = g_io_add_watch(
-        io_channel,
-        static_cast<GIOCondition>(G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP),
-        OnIOChannelActivity, this);
-    CHECK_NE(stdout_channel_source_id_, 0u);
-    g_io_channel_unref(io_channel);
-
-    child_watch_source_id_ = g_child_watch_add(pid_, OnChildWatchActivity,
-                                               this);
-    CHECK_NE(child_watch_source_id_, 0u);
-
-    if (timeout.ToInternalValue() > 0) {
-      timeout_source_id_ = g_timeout_add(timeout.InMilliseconds(),
-                                         OnTimeout, this);
-      CHECK_NE(timeout_source_id_, 0u);
+    if (timeout > TimeDelta()) {
+      timeout_task_ = MessageLoop::current()->PostDelayedTask(
+          FROM_HERE,
+          Bind(&LookupData::OnTimeout, base::Unretained(this)),
+          timeout);
     }
   }
 
  private:
   void ReportErrorAndDeleteInIdle() {
-    g_idle_add(static_cast<GSourceFunc>(OnIdleForReportErrorAndDelete), this);
+    MessageLoop::current()->PostTask(FROM_HERE, Bind(
+        &LookupData::OnIdleForReportErrorAndDelete,
+        base::Unretained(this)));
   }
 
-  static gboolean OnIdleForReportErrorAndDelete(gpointer user_data) {
-    LookupData *lookup_data = reinterpret_cast<LookupData*>(user_data);
-    lookup_data->ReportError();
-    delete lookup_data;
-    return FALSE;  // Remove source.
+  void OnIdleForReportErrorAndDelete() {
+    ReportError();
+    delete this;
   }
 
   void IssueCallback(const string& url) {
@@ -485,11 +436,10 @@
     reported_ = true;
   }
 
-  void ReportSuccess() {
+  void ReportSuccess(const string& output) {
     if (reported_)
       return;
-
-    string url = stdout_;
+    string url = output;
     size_t newline_pos = url.find('\n');
     if (newline_pos != string::npos)
       url.resize(newline_pos);
@@ -503,72 +453,40 @@
       LOG(ERROR) << "p2p URL '" << url << "' does not look right. Ignoring.";
       ReportError();
     }
-
     reported_ = true;
   }
 
-  static gboolean OnIOChannelActivity(GIOChannel *source,
-                                      GIOCondition condition,
-                                      gpointer user_data) {
+  static void OnLookupDone(int return_code,
+                           const string& output,
+                           void *user_data) {
     LookupData *lookup_data = reinterpret_cast<LookupData*>(user_data);
-    gchar* str = nullptr;
-    GError* error = nullptr;
-    GIOStatus status = g_io_channel_read_line(source,
-                                              &str,
-                                              nullptr,  // len
-                                              nullptr,  // line_terminator
-                                              &error);
-    if (status != G_IO_STATUS_NORMAL) {
-      // Ignore EOF since we usually get that before SIGCHLD and we
-      // need to examine exit status there.
-      if (status != G_IO_STATUS_EOF) {
-        LOG(ERROR) << "Error reading a line from p2p-client: "
-                   << utils::GetAndFreeGError(&error);
-        lookup_data->ReportError();
-        delete lookup_data;
-      }
-    } else {
-      if (str != nullptr) {
-        lookup_data->stdout_ += str;
-        g_free(str);
-      }
-    }
-    return TRUE;  // Don't remove source.
-  }
-
-  static void OnChildWatchActivity(GPid pid,
-                                   gint status,
-                                   gpointer user_data) {
-    LookupData *lookup_data = reinterpret_cast<LookupData*>(user_data);
-
-    if (!WIFEXITED(status)) {
-      LOG(ERROR) << "Child didn't exit normally";
-      lookup_data->ReportError();
-    } else if (WEXITSTATUS(status) != 0) {
+    lookup_data->child_tag_ = 0;
+    if (return_code != 0) {
       LOG(INFO) << "Child exited with non-zero exit code "
-                << WEXITSTATUS(status);
+                << return_code;
       lookup_data->ReportError();
     } else {
-      lookup_data->ReportSuccess();
+      lookup_data->ReportSuccess(output);
     }
     delete lookup_data;
   }
 
-  static gboolean OnTimeout(gpointer user_data) {
-    LookupData *lookup_data = reinterpret_cast<LookupData*>(user_data);
-    lookup_data->ReportError();
-    delete lookup_data;
-    return TRUE;  // Don't remove source.
+  void OnTimeout() {
+    timeout_task_ = MessageLoop::kTaskIdNull;
+    ReportError();
+    delete this;
   }
 
   P2PManager::LookupCallback callback_;
-  GPid pid_;
-  gint stdout_fd_;
-  guint stdout_channel_source_id_;
-  guint child_watch_source_id_;
-  guint timeout_source_id_;
-  string stdout_;
-  bool reported_;
+
+  // The Subprocess tag of the running process. A value of 0 means that the
+  // process is not running.
+  uint32_t child_tag_{0};
+
+  // The timeout task_id we are waiting on, if any.
+  MessageLoop::TaskId timeout_task_{MessageLoop::kTaskIdNull};
+
+  bool reported_{false};
 };
 
 void P2PManagerImpl::LookupUrlForFile(const string& file_id,
@@ -579,9 +497,7 @@
   string file_id_with_ext = file_id + "." + file_extension_;
   vector<string> args = configuration_->GetP2PClientArgs(file_id_with_ext,
                                                          minimum_size);
-  gchar **argv = utils::StringVectorToGStrv(args);
-  lookup_data->InitiateLookup(argv, max_time_to_wait);
-  g_strfreev(argv);
+  lookup_data->InitiateLookup(args, max_time_to_wait);
 }
 
 bool P2PManagerImpl::FileShare(const string& file_id,
@@ -823,7 +739,7 @@
     UpdateManager* update_manager,
     const string& file_extension,
     const int num_files_to_keep,
-    const base::TimeDelta& max_file_age) {
+    const TimeDelta& max_file_age) {
   return new P2PManagerImpl(configuration,
                             clock,
                             update_manager,
diff --git a/postinstall_runner_action_unittest.cc b/postinstall_runner_action_unittest.cc
index 0460c6c..4549272 100644
--- a/postinstall_runner_action_unittest.cc
+++ b/postinstall_runner_action_unittest.cc
@@ -15,12 +15,16 @@
 #include <base/files/file_util.h>
 #include <base/strings/string_util.h>
 #include <base/strings/stringprintf.h>
+#include <chromeos/bind_lambda.h>
+#include <chromeos/message_loops/glib_message_loop.h>
+#include <chromeos/message_loops/message_loop_utils.h>
 #include <gtest/gtest.h>
 
 #include "update_engine/constants.h"
 #include "update_engine/test_utils.h"
 #include "update_engine/utils.h"
 
+using chromeos::MessageLoop;
 using chromeos_update_engine::test_utils::System;
 using chromeos_update_engine::test_utils::WriteFileString;
 using std::string;
@@ -29,34 +33,34 @@
 
 namespace chromeos_update_engine {
 
-namespace {
-gboolean StartProcessorInRunLoop(gpointer data) {
-  ActionProcessor *processor = reinterpret_cast<ActionProcessor*>(data);
-  processor->StartProcessing();
-  return FALSE;
-}
-}  // namespace
-
 class PostinstallRunnerActionTest : public ::testing::Test {
- public:
+ protected:
+  void SetUp() override {
+    loop_.SetAsCurrent();
+  }
+
+  void TearDown() override {
+    EXPECT_EQ(0, chromeos::MessageLoopRunMaxIterations(&loop_, 1));
+  }
+
   // DoTest with various combinations of do_losetup, err_code and
   // powerwash_required.
   void DoTest(bool do_losetup, int err_code, bool powerwash_required);
 
  private:
   static const char* kImageMountPointTemplate;
+
+  chromeos::GlibMessageLoop loop_;
 };
 
 class PostinstActionProcessorDelegate : public ActionProcessorDelegate {
  public:
   PostinstActionProcessorDelegate()
-      : loop_(nullptr),
-        code_(ErrorCode::kError),
+      : code_(ErrorCode::kError),
         code_set_(false) {}
   void ProcessingDone(const ActionProcessor* processor,
                       ErrorCode code) {
-    ASSERT_TRUE(loop_);
-    g_main_loop_quit(loop_);
+    MessageLoop::current()->BreakLoop();
   }
   void ActionCompleted(ActionProcessor* processor,
                        AbstractAction* action,
@@ -66,38 +70,31 @@
       code_set_ = true;
     }
   }
-  GMainLoop* loop_;
   ErrorCode code_;
   bool code_set_;
 };
 
 TEST_F(PostinstallRunnerActionTest, RunAsRootSimpleTest) {
-  ASSERT_EQ(0, getuid());
   DoTest(true, 0, false);
 }
 
 TEST_F(PostinstallRunnerActionTest, RunAsRootPowerwashRequiredTest) {
-  ASSERT_EQ(0, getuid());
   DoTest(true, 0, true);
 }
 
 TEST_F(PostinstallRunnerActionTest, RunAsRootCantMountTest) {
-  ASSERT_EQ(0, getuid());
   DoTest(false, 0, true);
 }
 
 TEST_F(PostinstallRunnerActionTest, RunAsRootErrScriptTest) {
-  ASSERT_EQ(0, getuid());
   DoTest(true, 1, false);
 }
 
 TEST_F(PostinstallRunnerActionTest, RunAsRootFirmwareBErrScriptTest) {
-  ASSERT_EQ(0, getuid());
   DoTest(true, 3, false);
 }
 
 TEST_F(PostinstallRunnerActionTest, RunAsRootFirmwareROErrScriptTest) {
-  ASSERT_EQ(0, getuid());
   DoTest(true, 4, false);
 }
 
@@ -189,11 +186,9 @@
   processor.EnqueueAction(&collector_action);
   processor.set_delegate(&delegate);
 
-  GMainLoop* loop = g_main_loop_new(g_main_context_default(), FALSE);
-  delegate.loop_ = loop;
-  g_timeout_add(0, &StartProcessorInRunLoop, &processor);
-  g_main_loop_run(loop);
-  g_main_loop_unref(loop);
+  loop_.PostTask(FROM_HERE,
+                 base::Bind([&processor] { processor.StartProcessing(); }));
+  loop_.Run();
   ASSERT_FALSE(processor.IsRunning());
 
   EXPECT_TRUE(delegate.code_set_);
diff --git a/subprocess.cc b/subprocess.cc
index 9b90920..6a34641 100644
--- a/subprocess.cc
+++ b/subprocess.cc
@@ -4,6 +4,7 @@
 
 #include "update_engine/subprocess.h"
 
+#include <fcntl.h>
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
@@ -12,12 +13,15 @@
 #include <string>
 #include <vector>
 
+#include <base/bind.h>
 #include <base/logging.h>
+#include <base/posix/eintr_wrapper.h>
 #include <base/strings/string_util.h>
 #include <base/strings/stringprintf.h>
 
 #include "update_engine/glib_utils.h"
 
+using chromeos::MessageLoop;
 using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
@@ -28,13 +32,14 @@
 void Subprocess::GChildExitedCallback(GPid pid, gint status, gpointer data) {
   SubprocessRecord* record = reinterpret_cast<SubprocessRecord*>(data);
 
-  // Make sure we read any remaining process output. Then close the pipe.
-  GStdoutWatchCallback(record->gioout, G_IO_IN, &record->stdout);
-  int fd = g_io_channel_unix_get_fd(record->gioout);
-  g_source_remove(record->gioout_tag);
-  g_io_channel_unref(record->gioout);
-  close(fd);
+  // Make sure we read any remaining process output and then close the pipe.
+  OnStdoutReady(record);
 
+  MessageLoop::current()->CancelTask(record->task_id);
+  record->task_id = MessageLoop::kTaskIdNull;
+  if (IGNORE_EINTR(close(record->stdout_fd)) != 0) {
+    PLOG(ERROR) << "Error closing fd " << record->stdout_fd;
+  }
   g_spawn_close_pid(pid);
   gint use_status = status;
   if (WIFEXITED(status))
@@ -56,21 +61,21 @@
   dup2(1, 2);
 }
 
-gboolean Subprocess::GStdoutWatchCallback(GIOChannel* source,
-                                          GIOCondition condition,
-                                          gpointer data) {
-  string* stdout = reinterpret_cast<string*>(data);
+void Subprocess::OnStdoutReady(SubprocessRecord* record) {
   char buf[1024];
-  gsize bytes_read;
-  while (g_io_channel_read_chars(source,
-                                 buf,
-                                 arraysize(buf),
-                                 &bytes_read,
-                                 nullptr) == G_IO_STATUS_NORMAL &&
-         bytes_read > 0) {
-    stdout->append(buf, bytes_read);
-  }
-  return TRUE;  // Keep the callback source. It's freed in GChilExitedCallback.
+  ssize_t rc = 0;
+  do {
+    rc = HANDLE_EINTR(read(record->stdout_fd, buf, arraysize(buf)));
+    if (rc < 0) {
+      // EAGAIN and EWOULDBLOCK are normal return values when there's no more
+      // input as we are in non-blocking mode.
+      if (errno != EWOULDBLOCK && errno != EAGAIN) {
+        PLOG(ERROR) << "Error reading fd " << record->stdout_fd;
+      }
+    } else {
+      record->stdout.append(buf, rc);
+    }
+  } while (rc > 0);
 }
 
 namespace {
@@ -127,16 +132,16 @@
 uint32_t Subprocess::Exec(const vector<string>& cmd,
                           ExecCallback callback,
                           void* p) {
-  GPid child_pid;
-  unique_ptr<char*[]> argv(new char*[cmd.size() + 1]);
-  for (unsigned int i = 0; i < cmd.size(); i++) {
-    argv[i] = strdup(cmd[i].c_str());
-    if (!argv[i]) {
-      FreeArgvInError(argv.get());  // null in argv[i] terminates argv.
-      return 0;
-    }
-  }
-  argv[cmd.size()] = nullptr;
+  return ExecFlags(cmd, static_cast<GSpawnFlags>(0), true, callback, p);
+}
+
+uint32_t Subprocess::ExecFlags(const vector<string>& cmd,
+                               GSpawnFlags flags,
+                               bool redirect_stderr_to_stdout,
+                               ExecCallback callback,
+                               void* p) {
+  unique_ptr<gchar*, utils::GLibStrvFreeDeleter> argv(
+       utils::StringVectorToGStrv(cmd));
 
   char** argp = ArgPointer();
   if (!argp) {
@@ -154,39 +159,47 @@
       nullptr,  // working directory
       argv.get(),
       argp,
-      G_SPAWN_DO_NOT_REAP_CHILD,  // flags
-      GRedirectStderrToStdout,  // child setup function
+      static_cast<GSpawnFlags>(flags | G_SPAWN_DO_NOT_REAP_CHILD),  // flags
+      // child setup function:
+      redirect_stderr_to_stdout ? GRedirectStderrToStdout : nullptr,
       nullptr,  // child setup data pointer
-      &child_pid,
+      &record->pid,
       nullptr,
       &stdout_fd,
       nullptr,
       &error);
-  FreeArgv(argv.get());
   if (!success) {
     LOG(ERROR) << "g_spawn_async failed: " << utils::GetAndFreeGError(&error);
     return 0;
   }
   record->tag =
-      g_child_watch_add(child_pid, GChildExitedCallback, record.get());
+      g_child_watch_add(record->pid, GChildExitedCallback, record.get());
+  record->stdout_fd = stdout_fd;
   subprocess_records_[record->tag] = record;
 
-  // Capture the subprocess output.
-  record->gioout = g_io_channel_unix_new(stdout_fd);
-  g_io_channel_set_encoding(record->gioout, nullptr, nullptr);
-  LOG_IF(WARNING,
-         g_io_channel_set_flags(record->gioout, G_IO_FLAG_NONBLOCK, nullptr) !=
-         G_IO_STATUS_NORMAL) << "Unable to set non-blocking I/O mode.";
-  record->gioout_tag = g_io_add_watch(
-      record->gioout,
-      static_cast<GIOCondition>(G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP),
-      GStdoutWatchCallback,
-      &record->stdout);
+  // Capture the subprocess output. Make our end of the pipe non-blocking.
+  int fd_flags = fcntl(stdout_fd, F_GETFL, 0) | O_NONBLOCK;
+  if (HANDLE_EINTR(fcntl(record->stdout_fd, F_SETFL, fd_flags)) < 0) {
+    LOG(ERROR) << "Unable to set non-blocking I/O mode on fd "
+               << record->stdout_fd << ".";
+  }
+
+  record->task_id = MessageLoop::current()->WatchFileDescriptor(
+      FROM_HERE,
+      record->stdout_fd,
+      MessageLoop::WatchMode::kWatchRead,
+      true,
+      base::Bind(&Subprocess::OnStdoutReady, record.get()));
+
   return record->tag;
 }
 
-void Subprocess::CancelExec(uint32_t tag) {
-  subprocess_records_[tag]->callback = nullptr;
+void Subprocess::KillExec(uint32_t tag) {
+  const auto& record = subprocess_records_.find(tag);
+  if (record == subprocess_records_.end())
+    return;
+  record->second->callback = nullptr;
+  kill(record->second->pid, SIGTERM);
 }
 
 bool Subprocess::SynchronousExecFlags(const vector<string>& cmd,
@@ -250,10 +263,8 @@
 }
 
 bool Subprocess::SubprocessInFlight() {
-  for (std::map<int, shared_ptr<SubprocessRecord>>::iterator it =
-           subprocess_records_.begin();
-       it != subprocess_records_.end(); ++it) {
-    if (it->second->callback)
+  for (const auto& tag_record_pair : subprocess_records_) {
+    if (tag_record_pair.second->callback)
       return true;
   }
   return false;
diff --git a/subprocess.h b/subprocess.h
index 02746fe..13f7eb7 100644
--- a/subprocess.h
+++ b/subprocess.h
@@ -14,12 +14,14 @@
 
 #include <base/logging.h>
 #include <base/macros.h>
+#include <chromeos/message_loops/message_loop.h>
 #include <gtest/gtest_prod.h>  // for FRIEND_TEST
 
 // The Subprocess class is a singleton. It's used to spawn off a subprocess
 // and get notified when the subprocess exits. The result of Exec() can
-// be saved and used to cancel the callback request. If you know you won't
-// call CancelExec(), you may safely lose the return value from Exec().
+// be saved and used to cancel the callback request and kill your process. If
+// you know you won't call KillExec(), you may safely lose the return value
+// from Exec().
 
 namespace chromeos_update_engine {
 
@@ -38,9 +40,14 @@
   uint32_t Exec(const std::vector<std::string>& cmd,
                 ExecCallback callback,
                 void* p);
+  uint32_t ExecFlags(const std::vector<std::string>& cmd,
+                     GSpawnFlags flags,
+                     bool redirect_stderr_to_stdout,
+                     ExecCallback callback,
+                     void* p);
 
-  // Used to cancel the callback. The process will still run to completion.
-  void CancelExec(uint32_t tag);
+  // Kills the running process with SIGTERM and ignores the callback.
+  void KillExec(uint32_t tag);
 
   // Executes a command synchronously. Returns true on success. If |stdout| is
   // non-null, the process output is stored in it, otherwise the output is
@@ -53,7 +60,7 @@
                               int* return_code,
                               std::string* stdout);
 
-  // Gets the one instance
+  // Gets the one instance.
   static Subprocess& Get() {
     return *subprocess_singleton_;
   }
@@ -65,17 +72,17 @@
   FRIEND_TEST(SubprocessTest, CancelTest);
 
   struct SubprocessRecord {
-    SubprocessRecord()
-        : tag(0),
-          callback(nullptr),
-          callback_data(nullptr),
-          gioout(nullptr),
-          gioout_tag(0) {}
-    uint32_t tag;
-    ExecCallback callback;
-    void* callback_data;
-    GIOChannel* gioout;
-    guint gioout_tag;
+    SubprocessRecord() = default;
+
+    uint32_t tag{0};
+    chromeos::MessageLoop::TaskId task_id{chromeos::MessageLoop::kTaskIdNull};
+
+    ExecCallback callback{nullptr};
+    void* callback_data{nullptr};
+
+    GPid pid;
+
+    int stdout_fd{-1};
     std::string stdout;
   };
 
@@ -91,16 +98,14 @@
 
   // Callback which runs whenever there is input available on the subprocess
   // stdout pipe.
-  static gboolean GStdoutWatchCallback(GIOChannel* source,
-                                       GIOCondition condition,
-                                       gpointer data);
+  static void OnStdoutReady(SubprocessRecord* record);
 
   // The global instance.
   static Subprocess* subprocess_singleton_;
 
   // A map from the asynchronous subprocess tag (see Exec) to the subprocess
   // record structure for all active asynchronous subprocesses.
-  std::map<int, std::shared_ptr<SubprocessRecord>> subprocess_records_;
+  std::map<uint32_t, std::shared_ptr<SubprocessRecord>> subprocess_records_;
 
   DISALLOW_COPY_AND_ASSIGN(Subprocess);
 };
diff --git a/subprocess_unittest.cc b/subprocess_unittest.cc
index 37dea05..dba9e6c 100644
--- a/subprocess_unittest.cc
+++ b/subprocess_unittest.cc
@@ -68,29 +68,40 @@
   MessageLoop::current()->BreakLoop();
 }
 
+void CallbackStdoutOnlyEcho(int return_code,
+                            const string& output,
+                            void* /* unused */) {
+  EXPECT_EQ(0, return_code);
+  EXPECT_NE(string::npos, output.find("on stdout"));
+  EXPECT_EQ(string::npos, output.find("on stderr"));
+  MessageLoop::current()->BreakLoop();
+}
+
 }  // namespace
 
 TEST_F(SubprocessTest, SimpleTest) {
-  loop_.PostTask(
-      FROM_HERE,
-      base::Bind([] {
-        Subprocess::Get().Exec(vector<string>{"/bin/false"}, Callback, nullptr);
-      }));
+  Subprocess::Get().Exec(vector<string>{"/bin/false"}, Callback, nullptr);
   loop_.Run();
 }
 
 TEST_F(SubprocessTest, EchoTest) {
-  loop_.PostTask(
-      FROM_HERE,
-      base::Bind([] {
-        Subprocess::Get().Exec(
-            vector<string>{
-                "/bin/sh",
-                "-c",
-                "echo this is stdout; echo this is stderr > /dev/stderr"},
-            CallbackEcho,
-            nullptr);
-      }));
+  Subprocess::Get().Exec(
+      vector<string>{
+          "/bin/sh",
+          "-c",
+          "echo this is stdout; echo this is stderr > /dev/stderr"},
+      CallbackEcho,
+      nullptr);
+  loop_.Run();
+}
+
+TEST_F(SubprocessTest, StderrNotIncludedInOutputTest) {
+  Subprocess::Get().ExecFlags(
+      vector<string>{"/bin/sh", "-c", "echo on stdout; echo on stderr >&2"},
+      static_cast<GSpawnFlags>(0),
+      false,  // don't redirect stderr
+      CallbackStdoutOnlyEcho,
+      nullptr);
   loop_.Run();
 }
 
@@ -107,10 +118,7 @@
 }
 
 TEST_F(SubprocessTest, SynchronousEchoNoOutputTest) {
-  vector<string> cmd = {
-      "/bin/sh",
-      "-c",
-      "echo test"};
+  vector<string> cmd = {"/bin/sh", "-c", "echo test"};
   int rc = -1;
   ASSERT_TRUE(Subprocess::SynchronousExec(cmd, &rc, nullptr));
   EXPECT_EQ(0, rc);
@@ -176,7 +184,7 @@
   remove(temp_file_name);
   CHECK_GT(local_server_port, 0);
   LOG(INFO) << "server listening on port " << local_server_port;
-  Subprocess::Get().CancelExec(tag);
+  Subprocess::Get().KillExec(tag);
 }
 
 void ExitWhenDone(bool* spawned) {