Fetch local files asynchronously.
This patch implements a new fetcher that only handles local files.
While libcurl supports file:// urls, the stream can't be suspended when
accessing local files.
This new FileFetcher is based on the brillo::FileStream class which
properly handles the asynchronous reads from regular files.
Bug: 28866512
TEST=Added unittest. Deployed an update from a file:// URL.
Change-Id: Ie9d07dda2d773312e55be3c0ab7c9e5b737be18b
diff --git a/common/file_fetcher.cc b/common/file_fetcher.cc
new file mode 100644
index 0000000..77dadd1
--- /dev/null
+++ b/common/file_fetcher.cc
@@ -0,0 +1,187 @@
+//
+// Copyright (C) 2016 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include "update_engine/common/file_fetcher.h"
+
+#include <algorithm>
+#include <string>
+
+#include <base/bind.h>
+#include <base/format_macros.h>
+#include <base/location.h>
+#include <base/logging.h>
+#include <base/strings/string_util.h>
+#include <base/strings/stringprintf.h>
+#include <brillo/streams/file_stream.h>
+
+#include "update_engine/common/certificate_checker.h"
+#include "update_engine/common/hardware_interface.h"
+#include "update_engine/common/platform_constants.h"
+
+using std::string;
+
+namespace {
+
+size_t kReadBufferSize = 16 * 1024;
+
+} // namespace
+
+namespace chromeos_update_engine {
+
+// static
+bool FileFetcher::SupportedUrl(const string& url) {
+ // Note that we require the file path to start with a "/".
+ return base::StartsWith(
+ url, "file:///", base::CompareCase::INSENSITIVE_ASCII);
+}
+
+FileFetcher::~FileFetcher() {
+ LOG_IF(ERROR, transfer_in_progress_)
+ << "Destroying the fetcher while a transfer is in progress.";
+ CleanUp();
+}
+
+// Begins the transfer, which must not have already been started.
+void FileFetcher::BeginTransfer(const string& url) {
+ CHECK(!transfer_in_progress_);
+
+ if (!SupportedUrl(url)) {
+ LOG(ERROR) << "Unsupported file URL: " << url;
+ // No HTTP error code when the URL is not supported.
+ http_response_code_ = 0;
+ CleanUp();
+ if (delegate_)
+ delegate_->TransferComplete(this, false);
+ return;
+ }
+
+ string file_path = url.substr(strlen("file://"));
+ stream_ =
+ brillo::FileStream::Open(base::FilePath(file_path),
+ brillo::Stream::AccessMode::READ,
+ brillo::FileStream::Disposition::OPEN_EXISTING,
+ nullptr);
+
+ if (!stream_) {
+ LOG(ERROR) << "Couldn't open " << file_path;
+ http_response_code_ = kHttpResponseNotFound;
+ CleanUp();
+ if (delegate_)
+ delegate_->TransferComplete(this, false);
+ return;
+ }
+ http_response_code_ = kHttpResponseOk;
+
+ if (offset_)
+ stream_->SetPosition(offset_, nullptr);
+ bytes_copied_ = 0;
+ transfer_in_progress_ = true;
+ ScheduleRead();
+}
+
+void FileFetcher::TerminateTransfer() {
+ CleanUp();
+ if (delegate_) {
+ // Note that after the callback returns this object may be destroyed.
+ delegate_->TransferTerminated(this);
+ }
+}
+
+void FileFetcher::ScheduleRead() {
+ if (transfer_paused_ || ongoing_read_ || !transfer_in_progress_)
+ return;
+
+ buffer_.resize(kReadBufferSize);
+ size_t bytes_to_read = buffer_.size();
+ if (data_length_ >= 0) {
+ bytes_to_read = std::min(static_cast<uint64_t>(bytes_to_read),
+ data_length_ - bytes_copied_);
+ }
+
+ if (!bytes_to_read) {
+ OnReadDoneCallback(0);
+ return;
+ }
+
+ ongoing_read_ = stream_->ReadAsync(
+ buffer_.data(),
+ bytes_to_read,
+ base::Bind(&FileFetcher::OnReadDoneCallback, base::Unretained(this)),
+ base::Bind(&FileFetcher::OnReadErrorCallback, base::Unretained(this)),
+ nullptr);
+
+ if (!ongoing_read_) {
+ LOG(ERROR) << "Unable to schedule an asynchronous read from the stream.";
+ CleanUp();
+ if (delegate_)
+ delegate_->TransferComplete(this, false);
+ }
+}
+
+void FileFetcher::OnReadDoneCallback(size_t bytes_read) {
+ ongoing_read_ = false;
+ if (bytes_read == 0) {
+ CleanUp();
+ if (delegate_)
+ delegate_->TransferComplete(this, true);
+ } else {
+ bytes_copied_ += bytes_read;
+ if (delegate_)
+ delegate_->ReceivedBytes(this, buffer_.data(), bytes_read);
+ ScheduleRead();
+ }
+}
+
+void FileFetcher::OnReadErrorCallback(const brillo::Error* error) {
+ LOG(ERROR) << "Asynchronous read failed: " << error->GetMessage();
+ CleanUp();
+ if (delegate_)
+ delegate_->TransferComplete(this, false);
+}
+
+void FileFetcher::Pause() {
+ if (transfer_paused_) {
+ LOG(ERROR) << "Fetcher already paused.";
+ return;
+ }
+ transfer_paused_ = true;
+}
+
+void FileFetcher::Unpause() {
+ if (!transfer_paused_) {
+ LOG(ERROR) << "Resume attempted when fetcher not paused.";
+ return;
+ }
+ transfer_paused_ = false;
+ ScheduleRead();
+}
+
+void FileFetcher::CleanUp() {
+ if (stream_) {
+ stream_->CancelPendingAsyncOperations();
+ stream_->CloseBlocking(nullptr);
+ stream_.reset();
+ }
+ // Destroying the |stream_| releases the callback, so we don't have any
+ // ongoing read at this point.
+ ongoing_read_ = false;
+ buffer_ = brillo::Blob();
+
+ transfer_in_progress_ = false;
+ transfer_paused_ = false;
+}
+
+} // namespace chromeos_update_engine