update_engine: Watch file descriptors using chromeos::MessageLoop.
This patch removes all the calls to g_io_*() functions used to create
io_channels from a file descriptor and watch for them in the main loop.
Instead, we use the chromeos::MessageLoop backed with the glib
implementation.
This patch also removes the duplicated process handling work done in
P2PManager and uses the common Subprocess class instead.
BUG=chromium:499886
TEST=Added and updated unittests.
Change-Id: Ia093b060d2396325fce69b2bbdb62957ba7bfbc6
Reviewed-on: https://chromium-review.googlesource.com/284593
Tested-by: Alex Deymo <[email protected]>
Reviewed-by: Alex Vakulenko <[email protected]>
Commit-Queue: Alex Deymo <[email protected]>
Trybot-Ready: Alex Deymo <[email protected]>
diff --git a/libcurl_http_fetcher.cc b/libcurl_http_fetcher.cc
index da69281..9d32293 100644
--- a/libcurl_http_fetcher.cc
+++ b/libcurl_http_fetcher.cc
@@ -18,7 +18,6 @@
using base::TimeDelta;
using chromeos::MessageLoop;
-using std::make_pair;
using std::max;
using std::string;
@@ -363,7 +362,7 @@
}
} else {
// set up callback
- SetupMainloopSources();
+ SetupMessageLoopSources();
}
}
@@ -410,8 +409,8 @@
CHECK_EQ(curl_easy_pause(curl_handle_, CURLPAUSE_CONT), CURLE_OK);
}
-// This method sets up callbacks with the glib main loop.
-void LibcurlHttpFetcher::SetupMainloopSources() {
+// This method sets up callbacks with the MessageLoop.
+void LibcurlHttpFetcher::SetupMessageLoopSources() {
fd_set fd_read;
fd_set fd_write;
fd_set fd_exc;
@@ -429,14 +428,14 @@
// We should iterate through all file descriptors up to libcurl's fd_max or
// the highest one we're tracking, whichever is larger.
- for (size_t t = 0; t < arraysize(io_channels_); ++t) {
- if (!io_channels_[t].empty())
- fd_max = max(fd_max, io_channels_[t].rbegin()->first);
+ for (size_t t = 0; t < arraysize(fd_task_maps_); ++t) {
+ if (!fd_task_maps_[t].empty())
+ fd_max = max(fd_max, fd_task_maps_[t].rbegin()->first);
}
// For each fd, if we're not tracking it, track it. If we are tracking it, but
// libcurl doesn't care about it anymore, stop tracking it. After this loop,
- // there should be exactly as many GIOChannel objects in io_channels_[0|1] as
+ // there should be exactly as many tasks scheduled in fd_task_maps_[0|1] as
// there are read/write fds that we're tracking.
for (int fd = 0; fd <= fd_max; ++fd) {
// Note that fd_exc is unused in the current version of libcurl so is_exc
@@ -446,16 +445,20 @@
is_exc || (FD_ISSET(fd, &fd_read) != 0), // track 0 -- read
is_exc || (FD_ISSET(fd, &fd_write) != 0) // track 1 -- write
};
+ MessageLoop::WatchMode watch_modes[2] = {
+ MessageLoop::WatchMode::kWatchRead,
+ MessageLoop::WatchMode::kWatchWrite,
+ };
- for (size_t t = 0; t < arraysize(io_channels_); ++t) {
- bool tracked = io_channels_[t].find(fd) != io_channels_[t].end();
+ for (size_t t = 0; t < arraysize(fd_task_maps_); ++t) {
+ auto fd_task_it = fd_task_maps_[t].find(fd);
+ bool tracked = fd_task_it != fd_task_maps_[t].end();
if (!must_track[t]) {
// If we have an outstanding io_channel, remove it.
if (tracked) {
- g_source_remove(io_channels_[t][fd].second);
- g_io_channel_unref(io_channels_[t][fd].first);
- io_channels_[t].erase(io_channels_[t].find(fd));
+ MessageLoop::current()->CancelTask(fd_task_it->second);
+ fd_task_maps_[t].erase(fd_task_it);
}
continue;
}
@@ -464,16 +467,15 @@
if (tracked)
continue;
- // Set conditions appropriately -- read for track 0, write for track 1.
- GIOCondition condition = static_cast<GIOCondition>(
- ((t == 0) ? (G_IO_IN | G_IO_PRI) : G_IO_OUT) | G_IO_ERR | G_IO_HUP);
-
// Track a new fd.
- GIOChannel* io_channel = g_io_channel_unix_new(fd);
- guint tag =
- g_io_add_watch(io_channel, condition, &StaticFDCallback, this);
+ fd_task_maps_[t][fd] = MessageLoop::current()->WatchFileDescriptor(
+ FROM_HERE,
+ fd,
+ watch_modes[t],
+ true, // persistent
+ base::Bind(&LibcurlHttpFetcher::CurlPerformOnce,
+ base::Unretained(this)));
- io_channels_[t][fd] = make_pair(io_channel, tag);
static int io_counter = 0;
io_counter++;
if (io_counter % 50 == 0) {
@@ -493,16 +495,6 @@
}
}
-bool LibcurlHttpFetcher::FDCallback(GIOChannel *source,
- GIOCondition condition) {
- CurlPerformOnce();
- // We handle removing of this source elsewhere, so we always return true.
- // The docs say, "the function should return FALSE if the event source
- // should be removed."
- // http://www.gtk.org/api/2.6/glib/glib-IO-Channels.html#GIOFunc
- return true;
-}
-
void LibcurlHttpFetcher::RetryTimeoutCallback() {
ResumeTransfer(url_);
CurlPerformOnce();
@@ -525,13 +517,16 @@
MessageLoop::current()->CancelTask(timeout_id_);
timeout_id_ = MessageLoop::kTaskIdNull;
- for (size_t t = 0; t < arraysize(io_channels_); ++t) {
- for (IOChannels::iterator it = io_channels_[t].begin();
- it != io_channels_[t].end(); ++it) {
- g_source_remove(it->second.second);
- g_io_channel_unref(it->second.first);
+ for (size_t t = 0; t < arraysize(fd_task_maps_); ++t) {
+ for (const auto& fd_taks_pair : fd_task_maps_[t]) {
+ if (!MessageLoop::current()->CancelTask(fd_taks_pair.second)) {
+ LOG(WARNING) << "Error canceling the watch task "
+ << fd_taks_pair.second << " for "
+ << (t ? "writing" : "reading") << " the fd "
+ << fd_taks_pair.first;
+ }
}
- io_channels_[t].clear();
+ fd_task_maps_[t].clear();
}
if (curl_http_headers_) {
diff --git a/libcurl_http_fetcher.h b/libcurl_http_fetcher.h
index d52b783..4f68a9a 100644
--- a/libcurl_http_fetcher.h
+++ b/libcurl_http_fetcher.h
@@ -10,7 +10,6 @@
#include <utility>
#include <curl/curl.h>
-#include <glib.h>
#include <base/logging.h>
#include <base/macros.h>
@@ -125,16 +124,6 @@
// left off.
virtual void ResumeTransfer(const std::string& url);
- // These two methods are for glib main loop callbacks. They are called
- // when either a file descriptor is ready for work or when a timer
- // has fired. The static versions are shims for libcurl which has a C API.
- bool FDCallback(GIOChannel *source, GIOCondition condition);
- static gboolean StaticFDCallback(GIOChannel *source,
- GIOCondition condition,
- gpointer data) {
- return reinterpret_cast<LibcurlHttpFetcher*>(data)->FDCallback(source,
- condition);
- }
void TimeoutCallback();
void RetryTimeoutCallback();
@@ -146,10 +135,10 @@
// Returns true if we should resume immediately after this call.
void CurlPerformOnce();
- // Sets up glib main loop sources as needed by libcurl. This is generally
+ // Sets up message loop sources as needed by libcurl. This is generally
// the file descriptor of the socket and a timer in case nothing happens
// on the fds.
- void SetupMainloopSources();
+ void SetupMessageLoopSources();
// Callback called by libcurl when new data has arrived on the transfer
size_t LibcurlWrite(void *ptr, size_t size, size_t nmemb);
@@ -160,7 +149,7 @@
}
// Cleans up the following if they are non-null:
- // curl(m) handles, io_channels_, timeout_source_.
+ // curl(m) handles, fd_task_maps_, timeout_id_.
void CleanUp();
// Force terminate the transfer. This will invoke the delegate's (if any)
@@ -185,17 +174,16 @@
struct curl_slist* curl_http_headers_{nullptr};
// Lists of all read(0)/write(1) file descriptors that we're waiting on from
- // the glib main loop. libcurl may open/close descriptors and switch their
+ // the message loop. libcurl may open/close descriptors and switch their
// directions so maintain two separate lists so that watch conditions can be
// set appropriately.
- typedef std::map<int, std::pair<GIOChannel*, guint>> IOChannels;
- IOChannels io_channels_[2];
+ std::map<int, chromeos::MessageLoop::TaskId> fd_task_maps_[2];
// The TaskId of the timer we're waiting on. kTaskIdNull if we are not waiting
// on it.
chromeos::MessageLoop::TaskId timeout_id_{chromeos::MessageLoop::kTaskIdNull};
- bool transfer_in_progress_ = false;
+ bool transfer_in_progress_{false};
// The transfer size. -1 if not known.
off_t transfer_size_{0};
diff --git a/p2p_manager.cc b/p2p_manager.cc
index c1a2738..ee0f583 100644
--- a/p2p_manager.cc
+++ b/p2p_manager.cc
@@ -35,6 +35,7 @@
#include <base/strings/stringprintf.h>
#include "update_engine/glib_utils.h"
+#include "update_engine/subprocess.h"
#include "update_engine/update_manager/policy.h"
#include "update_engine/update_manager/update_manager.h"
#include "update_engine/utils.h"
@@ -45,6 +46,7 @@
using base::StringPrintf;
using base::Time;
using base::TimeDelta;
+using chromeos::MessageLoop;
using chromeos_update_manager::EvalStatus;
using chromeos_update_manager::Policy;
using chromeos_update_manager::UpdateManager;
@@ -105,7 +107,7 @@
UpdateManager* update_manager,
const string& file_extension,
const int num_files_to_keep,
- const base::TimeDelta& max_file_age);
+ const TimeDelta& max_file_age);
// P2PManager methods.
void SetDevicePolicy(const policy::DevicePolicy* device_policy) override;
@@ -147,7 +149,7 @@
// Utility function to delete a file given by |path| and log the
// path as well as |reason|. Returns false on failure.
- bool DeleteP2PFile(const FilePath& path, const std::string& reason);
+ bool DeleteP2PFile(const FilePath& path, const string& reason);
// Schedules an async request for tracking changes in P2P enabled status.
void ScheduleEnabledStatusChange();
@@ -178,7 +180,7 @@
// If non-zero, files older than this will not be kept after
// performing housekeeping.
- const base::TimeDelta max_file_age_;
+ const TimeDelta max_file_age_;
// The string ".p2p".
static const char kP2PExtension[];
@@ -206,7 +208,7 @@
UpdateManager* update_manager,
const string& file_extension,
const int num_files_to_keep,
- const base::TimeDelta& max_file_age)
+ const TimeDelta& max_file_age)
: clock_(clock),
update_manager_(update_manager),
file_extension_(file_extension),
@@ -238,34 +240,14 @@
}
bool P2PManagerImpl::EnsureP2P(bool should_be_running) {
- gchar *standard_error = nullptr;
- GError *error = nullptr;
- gint exit_status = 0;
+ int return_code = 0;
+ string output;
may_be_running_ = true; // Unless successful, we must be conservative.
vector<string> args = configuration_->GetInitctlArgs(should_be_running);
- unique_ptr<gchar*, utils::GLibStrvFreeDeleter> argv(
- utils::StringVectorToGStrv(args));
- if (!g_spawn_sync(nullptr, // working_directory
- argv.get(),
- nullptr, // envp
- static_cast<GSpawnFlags>(G_SPAWN_SEARCH_PATH),
- nullptr, nullptr, // child_setup, user_data
- nullptr, // standard_output
- &standard_error,
- &exit_status,
- &error)) {
- LOG(ERROR) << "Error spawning " << utils::StringVectorToString(args)
- << ": " << utils::GetAndFreeGError(&error);
- return false;
- }
- unique_ptr<gchar, utils::GLibFreeDeleter> standard_error_deleter(
- standard_error);
-
- if (!WIFEXITED(exit_status)) {
- LOG(ERROR) << "Error spawning '" << utils::StringVectorToString(args)
- << "': WIFEXITED is false";
+ if (!Subprocess::SynchronousExec(args, &return_code, &output)) {
+ LOG(ERROR) << "Error spawning " << utils::StringVectorToString(args);
return false;
}
@@ -274,11 +256,11 @@
// necessity because initctl does not offer actions such as "start if not
// running" or "stop if running".
// TODO(zeuthen,chromium:277051): Avoid doing this.
- if (WEXITSTATUS(exit_status) != 0) {
- const gchar *expected_error_message = should_be_running ?
+ if (return_code != 0) {
+ const char *expected_error_message = should_be_running ?
"initctl: Job is already running: p2p\n" :
"initctl: Unknown instance \n";
- if (g_strcmp0(standard_error, expected_error_message) != 0)
+ if (output != expected_error_message)
return false;
}
@@ -322,7 +304,7 @@
}
bool P2PManagerImpl::DeleteP2PFile(const FilePath& path,
- const std::string& reason) {
+ const string& reason) {
LOG(INFO) << "Deleting p2p file " << path.value()
<< " (reason: " << reason << ")";
if (unlink(path.value().c_str()) != 0) {
@@ -367,7 +349,7 @@
// If instructed to keep only files younger than a given age
// (|max_file_age_| != 0), delete files satisfying this criteria
// right now. Otherwise add it to a list we'll consider for later.
- if (clock_ != nullptr && max_file_age_ != base::TimeDelta() &&
+ if (clock_ != nullptr && max_file_age_ != TimeDelta() &&
clock_->GetWallclockTime() - time > max_file_age_) {
if (!DeleteP2PFile(file, "file too old"))
deletion_failed = true;
@@ -396,81 +378,50 @@
class LookupData {
public:
explicit LookupData(P2PManager::LookupCallback callback)
- : callback_(callback),
- pid_(0),
- stdout_fd_(-1),
- stdout_channel_source_id_(0),
- child_watch_source_id_(0),
- timeout_source_id_(0),
- reported_(false) {}
+ : callback_(callback) {}
~LookupData() {
- if (child_watch_source_id_ != 0)
- g_source_remove(child_watch_source_id_);
- if (stdout_channel_source_id_ != 0)
- g_source_remove(stdout_channel_source_id_);
- if (timeout_source_id_ != 0)
- g_source_remove(timeout_source_id_);
- if (stdout_fd_ != -1)
- close(stdout_fd_);
- if (pid_ != 0)
- kill(pid_, SIGTERM);
+ if (timeout_task_ != MessageLoop::kTaskIdNull)
+ MessageLoop::current()->CancelTask(timeout_task_);
+ if (child_tag_)
+ Subprocess::Get().KillExec(child_tag_);
}
- void InitiateLookup(gchar **argv, TimeDelta timeout) {
+ void InitiateLookup(const vector<string>& cmd, TimeDelta timeout) {
// NOTE: if we fail early (i.e. in this method), we need to schedule
// an idle to report the error. This is because we guarantee that
- // the callback is always called from the GLib mainloop (this
+ // the callback is always called from the message loop (this
// guarantee is useful for testing).
- GError *error = nullptr;
- if (!g_spawn_async_with_pipes(nullptr, // working_directory
- argv,
- nullptr, // envp
- static_cast<GSpawnFlags>(G_SPAWN_SEARCH_PATH |
- G_SPAWN_DO_NOT_REAP_CHILD),
- nullptr, // child_setup
- this,
- &pid_,
- nullptr, // standard_input
- &stdout_fd_,
- nullptr, // standard_error
- &error)) {
- LOG(ERROR) << "Error spawning p2p-client: "
- << utils::GetAndFreeGError(&error);
+ // We expect to run just "p2p-client" and find it in the path.
+ child_tag_ = Subprocess::Get().ExecFlags(
+ cmd, G_SPAWN_SEARCH_PATH, false /* redirect stderr */, OnLookupDone,
+ this);
+
+ if (!child_tag_) {
+ LOG(ERROR) << "Error spawning " << utils::StringVectorToString(cmd);
ReportErrorAndDeleteInIdle();
return;
}
- GIOChannel* io_channel = g_io_channel_unix_new(stdout_fd_);
- stdout_channel_source_id_ = g_io_add_watch(
- io_channel,
- static_cast<GIOCondition>(G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP),
- OnIOChannelActivity, this);
- CHECK_NE(stdout_channel_source_id_, 0u);
- g_io_channel_unref(io_channel);
-
- child_watch_source_id_ = g_child_watch_add(pid_, OnChildWatchActivity,
- this);
- CHECK_NE(child_watch_source_id_, 0u);
-
- if (timeout.ToInternalValue() > 0) {
- timeout_source_id_ = g_timeout_add(timeout.InMilliseconds(),
- OnTimeout, this);
- CHECK_NE(timeout_source_id_, 0u);
+ if (timeout > TimeDelta()) {
+ timeout_task_ = MessageLoop::current()->PostDelayedTask(
+ FROM_HERE,
+ Bind(&LookupData::OnTimeout, base::Unretained(this)),
+ timeout);
}
}
private:
void ReportErrorAndDeleteInIdle() {
- g_idle_add(static_cast<GSourceFunc>(OnIdleForReportErrorAndDelete), this);
+ MessageLoop::current()->PostTask(FROM_HERE, Bind(
+ &LookupData::OnIdleForReportErrorAndDelete,
+ base::Unretained(this)));
}
- static gboolean OnIdleForReportErrorAndDelete(gpointer user_data) {
- LookupData *lookup_data = reinterpret_cast<LookupData*>(user_data);
- lookup_data->ReportError();
- delete lookup_data;
- return FALSE; // Remove source.
+ void OnIdleForReportErrorAndDelete() {
+ ReportError();
+ delete this;
}
void IssueCallback(const string& url) {
@@ -485,11 +436,10 @@
reported_ = true;
}
- void ReportSuccess() {
+ void ReportSuccess(const string& output) {
if (reported_)
return;
-
- string url = stdout_;
+ string url = output;
size_t newline_pos = url.find('\n');
if (newline_pos != string::npos)
url.resize(newline_pos);
@@ -503,72 +453,40 @@
LOG(ERROR) << "p2p URL '" << url << "' does not look right. Ignoring.";
ReportError();
}
-
reported_ = true;
}
- static gboolean OnIOChannelActivity(GIOChannel *source,
- GIOCondition condition,
- gpointer user_data) {
+ static void OnLookupDone(int return_code,
+ const string& output,
+ void *user_data) {
LookupData *lookup_data = reinterpret_cast<LookupData*>(user_data);
- gchar* str = nullptr;
- GError* error = nullptr;
- GIOStatus status = g_io_channel_read_line(source,
- &str,
- nullptr, // len
- nullptr, // line_terminator
- &error);
- if (status != G_IO_STATUS_NORMAL) {
- // Ignore EOF since we usually get that before SIGCHLD and we
- // need to examine exit status there.
- if (status != G_IO_STATUS_EOF) {
- LOG(ERROR) << "Error reading a line from p2p-client: "
- << utils::GetAndFreeGError(&error);
- lookup_data->ReportError();
- delete lookup_data;
- }
- } else {
- if (str != nullptr) {
- lookup_data->stdout_ += str;
- g_free(str);
- }
- }
- return TRUE; // Don't remove source.
- }
-
- static void OnChildWatchActivity(GPid pid,
- gint status,
- gpointer user_data) {
- LookupData *lookup_data = reinterpret_cast<LookupData*>(user_data);
-
- if (!WIFEXITED(status)) {
- LOG(ERROR) << "Child didn't exit normally";
- lookup_data->ReportError();
- } else if (WEXITSTATUS(status) != 0) {
+ lookup_data->child_tag_ = 0;
+ if (return_code != 0) {
LOG(INFO) << "Child exited with non-zero exit code "
- << WEXITSTATUS(status);
+ << return_code;
lookup_data->ReportError();
} else {
- lookup_data->ReportSuccess();
+ lookup_data->ReportSuccess(output);
}
delete lookup_data;
}
- static gboolean OnTimeout(gpointer user_data) {
- LookupData *lookup_data = reinterpret_cast<LookupData*>(user_data);
- lookup_data->ReportError();
- delete lookup_data;
- return TRUE; // Don't remove source.
+ void OnTimeout() {
+ timeout_task_ = MessageLoop::kTaskIdNull;
+ ReportError();
+ delete this;
}
P2PManager::LookupCallback callback_;
- GPid pid_;
- gint stdout_fd_;
- guint stdout_channel_source_id_;
- guint child_watch_source_id_;
- guint timeout_source_id_;
- string stdout_;
- bool reported_;
+
+ // The Subprocess tag of the running process. A value of 0 means that the
+ // process is not running.
+ uint32_t child_tag_{0};
+
+ // The timeout task_id we are waiting on, if any.
+ MessageLoop::TaskId timeout_task_{MessageLoop::kTaskIdNull};
+
+ bool reported_{false};
};
void P2PManagerImpl::LookupUrlForFile(const string& file_id,
@@ -579,9 +497,7 @@
string file_id_with_ext = file_id + "." + file_extension_;
vector<string> args = configuration_->GetP2PClientArgs(file_id_with_ext,
minimum_size);
- gchar **argv = utils::StringVectorToGStrv(args);
- lookup_data->InitiateLookup(argv, max_time_to_wait);
- g_strfreev(argv);
+ lookup_data->InitiateLookup(args, max_time_to_wait);
}
bool P2PManagerImpl::FileShare(const string& file_id,
@@ -823,7 +739,7 @@
UpdateManager* update_manager,
const string& file_extension,
const int num_files_to_keep,
- const base::TimeDelta& max_file_age) {
+ const TimeDelta& max_file_age) {
return new P2PManagerImpl(configuration,
clock,
update_manager,
diff --git a/postinstall_runner_action_unittest.cc b/postinstall_runner_action_unittest.cc
index 0460c6c..4549272 100644
--- a/postinstall_runner_action_unittest.cc
+++ b/postinstall_runner_action_unittest.cc
@@ -15,12 +15,16 @@
#include <base/files/file_util.h>
#include <base/strings/string_util.h>
#include <base/strings/stringprintf.h>
+#include <chromeos/bind_lambda.h>
+#include <chromeos/message_loops/glib_message_loop.h>
+#include <chromeos/message_loops/message_loop_utils.h>
#include <gtest/gtest.h>
#include "update_engine/constants.h"
#include "update_engine/test_utils.h"
#include "update_engine/utils.h"
+using chromeos::MessageLoop;
using chromeos_update_engine::test_utils::System;
using chromeos_update_engine::test_utils::WriteFileString;
using std::string;
@@ -29,34 +33,34 @@
namespace chromeos_update_engine {
-namespace {
-gboolean StartProcessorInRunLoop(gpointer data) {
- ActionProcessor *processor = reinterpret_cast<ActionProcessor*>(data);
- processor->StartProcessing();
- return FALSE;
-}
-} // namespace
-
class PostinstallRunnerActionTest : public ::testing::Test {
- public:
+ protected:
+ void SetUp() override {
+ loop_.SetAsCurrent();
+ }
+
+ void TearDown() override {
+ EXPECT_EQ(0, chromeos::MessageLoopRunMaxIterations(&loop_, 1));
+ }
+
// DoTest with various combinations of do_losetup, err_code and
// powerwash_required.
void DoTest(bool do_losetup, int err_code, bool powerwash_required);
private:
static const char* kImageMountPointTemplate;
+
+ chromeos::GlibMessageLoop loop_;
};
class PostinstActionProcessorDelegate : public ActionProcessorDelegate {
public:
PostinstActionProcessorDelegate()
- : loop_(nullptr),
- code_(ErrorCode::kError),
+ : code_(ErrorCode::kError),
code_set_(false) {}
void ProcessingDone(const ActionProcessor* processor,
ErrorCode code) {
- ASSERT_TRUE(loop_);
- g_main_loop_quit(loop_);
+ MessageLoop::current()->BreakLoop();
}
void ActionCompleted(ActionProcessor* processor,
AbstractAction* action,
@@ -66,38 +70,31 @@
code_set_ = true;
}
}
- GMainLoop* loop_;
ErrorCode code_;
bool code_set_;
};
TEST_F(PostinstallRunnerActionTest, RunAsRootSimpleTest) {
- ASSERT_EQ(0, getuid());
DoTest(true, 0, false);
}
TEST_F(PostinstallRunnerActionTest, RunAsRootPowerwashRequiredTest) {
- ASSERT_EQ(0, getuid());
DoTest(true, 0, true);
}
TEST_F(PostinstallRunnerActionTest, RunAsRootCantMountTest) {
- ASSERT_EQ(0, getuid());
DoTest(false, 0, true);
}
TEST_F(PostinstallRunnerActionTest, RunAsRootErrScriptTest) {
- ASSERT_EQ(0, getuid());
DoTest(true, 1, false);
}
TEST_F(PostinstallRunnerActionTest, RunAsRootFirmwareBErrScriptTest) {
- ASSERT_EQ(0, getuid());
DoTest(true, 3, false);
}
TEST_F(PostinstallRunnerActionTest, RunAsRootFirmwareROErrScriptTest) {
- ASSERT_EQ(0, getuid());
DoTest(true, 4, false);
}
@@ -189,11 +186,9 @@
processor.EnqueueAction(&collector_action);
processor.set_delegate(&delegate);
- GMainLoop* loop = g_main_loop_new(g_main_context_default(), FALSE);
- delegate.loop_ = loop;
- g_timeout_add(0, &StartProcessorInRunLoop, &processor);
- g_main_loop_run(loop);
- g_main_loop_unref(loop);
+ loop_.PostTask(FROM_HERE,
+ base::Bind([&processor] { processor.StartProcessing(); }));
+ loop_.Run();
ASSERT_FALSE(processor.IsRunning());
EXPECT_TRUE(delegate.code_set_);
diff --git a/subprocess.cc b/subprocess.cc
index 9b90920..6a34641 100644
--- a/subprocess.cc
+++ b/subprocess.cc
@@ -4,6 +4,7 @@
#include "update_engine/subprocess.h"
+#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
@@ -12,12 +13,15 @@
#include <string>
#include <vector>
+#include <base/bind.h>
#include <base/logging.h>
+#include <base/posix/eintr_wrapper.h>
#include <base/strings/string_util.h>
#include <base/strings/stringprintf.h>
#include "update_engine/glib_utils.h"
+using chromeos::MessageLoop;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
@@ -28,13 +32,14 @@
void Subprocess::GChildExitedCallback(GPid pid, gint status, gpointer data) {
SubprocessRecord* record = reinterpret_cast<SubprocessRecord*>(data);
- // Make sure we read any remaining process output. Then close the pipe.
- GStdoutWatchCallback(record->gioout, G_IO_IN, &record->stdout);
- int fd = g_io_channel_unix_get_fd(record->gioout);
- g_source_remove(record->gioout_tag);
- g_io_channel_unref(record->gioout);
- close(fd);
+ // Make sure we read any remaining process output and then close the pipe.
+ OnStdoutReady(record);
+ MessageLoop::current()->CancelTask(record->task_id);
+ record->task_id = MessageLoop::kTaskIdNull;
+ if (IGNORE_EINTR(close(record->stdout_fd)) != 0) {
+ PLOG(ERROR) << "Error closing fd " << record->stdout_fd;
+ }
g_spawn_close_pid(pid);
gint use_status = status;
if (WIFEXITED(status))
@@ -56,21 +61,21 @@
dup2(1, 2);
}
-gboolean Subprocess::GStdoutWatchCallback(GIOChannel* source,
- GIOCondition condition,
- gpointer data) {
- string* stdout = reinterpret_cast<string*>(data);
+void Subprocess::OnStdoutReady(SubprocessRecord* record) {
char buf[1024];
- gsize bytes_read;
- while (g_io_channel_read_chars(source,
- buf,
- arraysize(buf),
- &bytes_read,
- nullptr) == G_IO_STATUS_NORMAL &&
- bytes_read > 0) {
- stdout->append(buf, bytes_read);
- }
- return TRUE; // Keep the callback source. It's freed in GChilExitedCallback.
+ ssize_t rc = 0;
+ do {
+ rc = HANDLE_EINTR(read(record->stdout_fd, buf, arraysize(buf)));
+ if (rc < 0) {
+ // EAGAIN and EWOULDBLOCK are normal return values when there's no more
+ // input as we are in non-blocking mode.
+ if (errno != EWOULDBLOCK && errno != EAGAIN) {
+ PLOG(ERROR) << "Error reading fd " << record->stdout_fd;
+ }
+ } else {
+ record->stdout.append(buf, rc);
+ }
+ } while (rc > 0);
}
namespace {
@@ -127,16 +132,16 @@
uint32_t Subprocess::Exec(const vector<string>& cmd,
ExecCallback callback,
void* p) {
- GPid child_pid;
- unique_ptr<char*[]> argv(new char*[cmd.size() + 1]);
- for (unsigned int i = 0; i < cmd.size(); i++) {
- argv[i] = strdup(cmd[i].c_str());
- if (!argv[i]) {
- FreeArgvInError(argv.get()); // null in argv[i] terminates argv.
- return 0;
- }
- }
- argv[cmd.size()] = nullptr;
+ return ExecFlags(cmd, static_cast<GSpawnFlags>(0), true, callback, p);
+}
+
+uint32_t Subprocess::ExecFlags(const vector<string>& cmd,
+ GSpawnFlags flags,
+ bool redirect_stderr_to_stdout,
+ ExecCallback callback,
+ void* p) {
+ unique_ptr<gchar*, utils::GLibStrvFreeDeleter> argv(
+ utils::StringVectorToGStrv(cmd));
char** argp = ArgPointer();
if (!argp) {
@@ -154,39 +159,47 @@
nullptr, // working directory
argv.get(),
argp,
- G_SPAWN_DO_NOT_REAP_CHILD, // flags
- GRedirectStderrToStdout, // child setup function
+ static_cast<GSpawnFlags>(flags | G_SPAWN_DO_NOT_REAP_CHILD), // flags
+ // child setup function:
+ redirect_stderr_to_stdout ? GRedirectStderrToStdout : nullptr,
nullptr, // child setup data pointer
- &child_pid,
+ &record->pid,
nullptr,
&stdout_fd,
nullptr,
&error);
- FreeArgv(argv.get());
if (!success) {
LOG(ERROR) << "g_spawn_async failed: " << utils::GetAndFreeGError(&error);
return 0;
}
record->tag =
- g_child_watch_add(child_pid, GChildExitedCallback, record.get());
+ g_child_watch_add(record->pid, GChildExitedCallback, record.get());
+ record->stdout_fd = stdout_fd;
subprocess_records_[record->tag] = record;
- // Capture the subprocess output.
- record->gioout = g_io_channel_unix_new(stdout_fd);
- g_io_channel_set_encoding(record->gioout, nullptr, nullptr);
- LOG_IF(WARNING,
- g_io_channel_set_flags(record->gioout, G_IO_FLAG_NONBLOCK, nullptr) !=
- G_IO_STATUS_NORMAL) << "Unable to set non-blocking I/O mode.";
- record->gioout_tag = g_io_add_watch(
- record->gioout,
- static_cast<GIOCondition>(G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP),
- GStdoutWatchCallback,
- &record->stdout);
+ // Capture the subprocess output. Make our end of the pipe non-blocking.
+ int fd_flags = fcntl(stdout_fd, F_GETFL, 0) | O_NONBLOCK;
+ if (HANDLE_EINTR(fcntl(record->stdout_fd, F_SETFL, fd_flags)) < 0) {
+ LOG(ERROR) << "Unable to set non-blocking I/O mode on fd "
+ << record->stdout_fd << ".";
+ }
+
+ record->task_id = MessageLoop::current()->WatchFileDescriptor(
+ FROM_HERE,
+ record->stdout_fd,
+ MessageLoop::WatchMode::kWatchRead,
+ true,
+ base::Bind(&Subprocess::OnStdoutReady, record.get()));
+
return record->tag;
}
-void Subprocess::CancelExec(uint32_t tag) {
- subprocess_records_[tag]->callback = nullptr;
+void Subprocess::KillExec(uint32_t tag) {
+ const auto& record = subprocess_records_.find(tag);
+ if (record == subprocess_records_.end())
+ return;
+ record->second->callback = nullptr;
+ kill(record->second->pid, SIGTERM);
}
bool Subprocess::SynchronousExecFlags(const vector<string>& cmd,
@@ -250,10 +263,8 @@
}
bool Subprocess::SubprocessInFlight() {
- for (std::map<int, shared_ptr<SubprocessRecord>>::iterator it =
- subprocess_records_.begin();
- it != subprocess_records_.end(); ++it) {
- if (it->second->callback)
+ for (const auto& tag_record_pair : subprocess_records_) {
+ if (tag_record_pair.second->callback)
return true;
}
return false;
diff --git a/subprocess.h b/subprocess.h
index 02746fe..13f7eb7 100644
--- a/subprocess.h
+++ b/subprocess.h
@@ -14,12 +14,14 @@
#include <base/logging.h>
#include <base/macros.h>
+#include <chromeos/message_loops/message_loop.h>
#include <gtest/gtest_prod.h> // for FRIEND_TEST
// The Subprocess class is a singleton. It's used to spawn off a subprocess
// and get notified when the subprocess exits. The result of Exec() can
-// be saved and used to cancel the callback request. If you know you won't
-// call CancelExec(), you may safely lose the return value from Exec().
+// be saved and used to cancel the callback request and kill your process. If
+// you know you won't call KillExec(), you may safely lose the return value
+// from Exec().
namespace chromeos_update_engine {
@@ -38,9 +40,14 @@
uint32_t Exec(const std::vector<std::string>& cmd,
ExecCallback callback,
void* p);
+ uint32_t ExecFlags(const std::vector<std::string>& cmd,
+ GSpawnFlags flags,
+ bool redirect_stderr_to_stdout,
+ ExecCallback callback,
+ void* p);
- // Used to cancel the callback. The process will still run to completion.
- void CancelExec(uint32_t tag);
+ // Kills the running process with SIGTERM and ignores the callback.
+ void KillExec(uint32_t tag);
// Executes a command synchronously. Returns true on success. If |stdout| is
// non-null, the process output is stored in it, otherwise the output is
@@ -53,7 +60,7 @@
int* return_code,
std::string* stdout);
- // Gets the one instance
+ // Gets the one instance.
static Subprocess& Get() {
return *subprocess_singleton_;
}
@@ -65,17 +72,17 @@
FRIEND_TEST(SubprocessTest, CancelTest);
struct SubprocessRecord {
- SubprocessRecord()
- : tag(0),
- callback(nullptr),
- callback_data(nullptr),
- gioout(nullptr),
- gioout_tag(0) {}
- uint32_t tag;
- ExecCallback callback;
- void* callback_data;
- GIOChannel* gioout;
- guint gioout_tag;
+ SubprocessRecord() = default;
+
+ uint32_t tag{0};
+ chromeos::MessageLoop::TaskId task_id{chromeos::MessageLoop::kTaskIdNull};
+
+ ExecCallback callback{nullptr};
+ void* callback_data{nullptr};
+
+ GPid pid;
+
+ int stdout_fd{-1};
std::string stdout;
};
@@ -91,16 +98,14 @@
// Callback which runs whenever there is input available on the subprocess
// stdout pipe.
- static gboolean GStdoutWatchCallback(GIOChannel* source,
- GIOCondition condition,
- gpointer data);
+ static void OnStdoutReady(SubprocessRecord* record);
// The global instance.
static Subprocess* subprocess_singleton_;
// A map from the asynchronous subprocess tag (see Exec) to the subprocess
// record structure for all active asynchronous subprocesses.
- std::map<int, std::shared_ptr<SubprocessRecord>> subprocess_records_;
+ std::map<uint32_t, std::shared_ptr<SubprocessRecord>> subprocess_records_;
DISALLOW_COPY_AND_ASSIGN(Subprocess);
};
diff --git a/subprocess_unittest.cc b/subprocess_unittest.cc
index 37dea05..dba9e6c 100644
--- a/subprocess_unittest.cc
+++ b/subprocess_unittest.cc
@@ -68,29 +68,40 @@
MessageLoop::current()->BreakLoop();
}
+void CallbackStdoutOnlyEcho(int return_code,
+ const string& output,
+ void* /* unused */) {
+ EXPECT_EQ(0, return_code);
+ EXPECT_NE(string::npos, output.find("on stdout"));
+ EXPECT_EQ(string::npos, output.find("on stderr"));
+ MessageLoop::current()->BreakLoop();
+}
+
} // namespace
TEST_F(SubprocessTest, SimpleTest) {
- loop_.PostTask(
- FROM_HERE,
- base::Bind([] {
- Subprocess::Get().Exec(vector<string>{"/bin/false"}, Callback, nullptr);
- }));
+ Subprocess::Get().Exec(vector<string>{"/bin/false"}, Callback, nullptr);
loop_.Run();
}
TEST_F(SubprocessTest, EchoTest) {
- loop_.PostTask(
- FROM_HERE,
- base::Bind([] {
- Subprocess::Get().Exec(
- vector<string>{
- "/bin/sh",
- "-c",
- "echo this is stdout; echo this is stderr > /dev/stderr"},
- CallbackEcho,
- nullptr);
- }));
+ Subprocess::Get().Exec(
+ vector<string>{
+ "/bin/sh",
+ "-c",
+ "echo this is stdout; echo this is stderr > /dev/stderr"},
+ CallbackEcho,
+ nullptr);
+ loop_.Run();
+}
+
+TEST_F(SubprocessTest, StderrNotIncludedInOutputTest) {
+ Subprocess::Get().ExecFlags(
+ vector<string>{"/bin/sh", "-c", "echo on stdout; echo on stderr >&2"},
+ static_cast<GSpawnFlags>(0),
+ false, // don't redirect stderr
+ CallbackStdoutOnlyEcho,
+ nullptr);
loop_.Run();
}
@@ -107,10 +118,7 @@
}
TEST_F(SubprocessTest, SynchronousEchoNoOutputTest) {
- vector<string> cmd = {
- "/bin/sh",
- "-c",
- "echo test"};
+ vector<string> cmd = {"/bin/sh", "-c", "echo test"};
int rc = -1;
ASSERT_TRUE(Subprocess::SynchronousExec(cmd, &rc, nullptr));
EXPECT_EQ(0, rc);
@@ -176,7 +184,7 @@
remove(temp_file_name);
CHECK_GT(local_server_port, 0);
LOG(INFO) << "server listening on port " << local_server_port;
- Subprocess::Get().CancelExec(tag);
+ Subprocess::Get().KillExec(tag);
}
void ExitWhenDone(bool* spawned) {