update_engine: Add CachedFileDescriptor

Currently there are operations in the delta_performer which are not
efficient in writing into disk. For example BzipExtentWriter and
XZExtentWriter write in 16KB buffers which is not very efficient for
today's disks specially if we are writing with O_DSYNC flag. Another
problem is operations such as bspatch which write in very small number
of bytes in order of a few bytes to a few hundred bytes. Even though
bspatch open's its own file descriptor and currently have no O_DSYNC
added, writing in very small bytes into disk in general is not a good
idea for the purpose of the update engine.

This patch adds new wrapper for a FileDescriptor interface which caches
all the writes into a large buffer (like 1MB) and writes it into disk
when either a 'Flush' request comes or if the offset of the file has
been changed to a different position than what it is writing now. This
cached buffer can be wrapped around target_fd_ and every operation can
write into it. This centralizes all write oprerations through one file
descriptor and gives greater control on how writes should be performed.

BUG=chromium:762815
TEST=FEATURES="test" emerge-amd-generic update_engine; brillo_update_payload verify

Change-Id: I4911d917ec9362bf66c4dd2ed19d89440042733c
Reviewed-on: https://chromium-review.googlesource.com/674164
Commit-Ready: Amin Hassani <[email protected]>
Tested-by: Amin Hassani <[email protected]>
Reviewed-by: Ben Chan <[email protected]>
diff --git a/Android.mk b/Android.mk
index 12b6fc0..656aba8 100644
--- a/Android.mk
+++ b/Android.mk
@@ -125,6 +125,7 @@
     common/terminator.cc \
     common/utils.cc \
     payload_consumer/bzip_extent_writer.cc \
+    payload_consumer/cached_file_descriptor.cc \
     payload_consumer/delta_performer.cc \
     payload_consumer/download_action.cc \
     payload_consumer/extent_reader.cc \
@@ -923,6 +924,7 @@
     common/test_utils.cc \
     common/utils_unittest.cc \
     payload_consumer/bzip_extent_writer_unittest.cc \
+    payload_consumer/cached_file_descriptor_unittest.cc \
     payload_consumer/delta_performer_integration_test.cc \
     payload_consumer/delta_performer_unittest.cc \
     payload_consumer/extent_reader_unittest.cc \
diff --git a/payload_consumer/cached_file_descriptor.cc b/payload_consumer/cached_file_descriptor.cc
new file mode 100644
index 0000000..7f2515e
--- /dev/null
+++ b/payload_consumer/cached_file_descriptor.cc
@@ -0,0 +1,98 @@
+//
+// Copyright (C) 2017 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include "update_engine/payload_consumer/cached_file_descriptor.h"
+
+#include <unistd.h>
+
+#include <algorithm>
+
+#include <base/logging.h>
+
+#include "update_engine/common/utils.h"
+
+namespace chromeos_update_engine {
+
+off64_t CachedFileDescriptor::Seek(off64_t offset, int whence) {
+  // Only support SEEK_SET and SEEK_CUR. I think these two would be enough. If
+  // we want to support SEEK_END then we have to figure out the size of the
+  // underlying file descriptor each time and it may not be a very good idea.
+  CHECK(whence == SEEK_SET || whence == SEEK_CUR);
+  off64_t next_offset = whence == SEEK_SET ? offset : offset_ + offset;
+
+  if (next_offset != offset_) {
+    // We sought somewhere other than what we are now. So we have to flush and
+    // move to the new offset.
+    if (!FlushCache()) {
+      return -1;
+    }
+    // Then we have to seek there.
+    if (fd_->Seek(next_offset, SEEK_SET) < 0) {
+      return -1;
+    }
+    offset_ = next_offset;
+  }
+  return offset_;
+}
+
+ssize_t CachedFileDescriptor::Write(const void* buf, size_t count) {
+  auto bytes = static_cast<const uint8_t*>(buf);
+  size_t total_bytes_wrote = 0;
+  while (total_bytes_wrote < count) {
+    auto bytes_to_cache =
+        std::min(count - total_bytes_wrote, cache_.size() - bytes_cached_);
+    if (bytes_to_cache > 0) {  // Which means |cache_| is still have some space.
+      memcpy(cache_.data() + bytes_cached_,
+             bytes + total_bytes_wrote,
+             bytes_to_cache);
+      total_bytes_wrote += bytes_to_cache;
+      bytes_cached_ += bytes_to_cache;
+    }
+    if (bytes_cached_ == cache_.size()) {
+      // Cache is full; write it to the |fd_| as long as you can.
+      if (!FlushCache()) {
+        return -1;
+      }
+    }
+  }
+  offset_ += total_bytes_wrote;
+  return total_bytes_wrote;
+}
+
+bool CachedFileDescriptor::Flush() {
+  return FlushCache() && fd_->Flush();
+}
+
+bool CachedFileDescriptor::Close() {
+  offset_ = 0;
+  return FlushCache() && fd_->Close();
+}
+
+bool CachedFileDescriptor::FlushCache() {
+  size_t begin = 0;
+  while (begin < bytes_cached_) {
+    auto bytes_wrote = fd_->Write(cache_.data() + begin, bytes_cached_ - begin);
+    if (bytes_wrote < 0) {
+      PLOG(ERROR) << "Failed to flush cached data!";
+      return false;
+    }
+    begin += bytes_wrote;
+  }
+  bytes_cached_ = 0;
+  return true;
+}
+
+}  // namespace chromeos_update_engine
diff --git a/payload_consumer/cached_file_descriptor.h b/payload_consumer/cached_file_descriptor.h
new file mode 100644
index 0000000..28c48f7
--- /dev/null
+++ b/payload_consumer/cached_file_descriptor.h
@@ -0,0 +1,76 @@
+//
+// Copyright (C) 2017 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#ifndef UPDATE_ENGINE_PAYLOAD_CONSUMER_CACHED_FILE_DESCRIPTOR_H_
+#define UPDATE_ENGINE_PAYLOAD_CONSUMER_CACHED_FILE_DESCRIPTOR_H_
+
+#include <errno.h>
+#include <sys/types.h>
+
+#include <memory>
+#include <vector>
+
+#include <brillo/secure_blob.h>
+
+#include "update_engine/payload_consumer/file_descriptor.h"
+
+namespace chromeos_update_engine {
+
+class CachedFileDescriptor : public FileDescriptor {
+ public:
+  CachedFileDescriptor(FileDescriptorPtr fd, size_t cache_size) : fd_(fd) {
+    cache_.resize(cache_size);
+  }
+  ~CachedFileDescriptor() override = default;
+
+  bool Open(const char* path, int flags, mode_t mode) override {
+    return fd_->Open(path, flags, mode);
+  }
+  bool Open(const char* path, int flags) override {
+    return fd_->Open(path, flags);
+  }
+  ssize_t Read(void* buf, size_t count) override {
+    return fd_->Read(buf, count);
+  }
+  ssize_t Write(const void* buf, size_t count) override;
+  off64_t Seek(off64_t offset, int whence) override;
+  uint64_t BlockDevSize() override { return fd_->BlockDevSize(); }
+  bool BlkIoctl(int request,
+                uint64_t start,
+                uint64_t length,
+                int* result) override {
+    return fd_->BlkIoctl(request, start, length, result);
+  }
+  bool Flush() override;
+  bool Close() override;
+  bool IsSettingErrno() override { return fd_->IsSettingErrno(); }
+  bool IsOpen() override { return fd_->IsOpen(); }
+
+ private:
+  // Internal flush without the need to call |fd_->Flush()|.
+  bool FlushCache();
+
+  FileDescriptorPtr fd_;
+  brillo::Blob cache_;
+  size_t bytes_cached_{0};
+  off64_t offset_{0};
+
+  DISALLOW_COPY_AND_ASSIGN(CachedFileDescriptor);
+};
+
+}  // namespace chromeos_update_engine
+
+#endif  // UPDATE_ENGINE_PAYLOAD_CONSUMER_CACHED_FILE_DESCRIPTOR_H_
diff --git a/payload_consumer/cached_file_descriptor_unittest.cc b/payload_consumer/cached_file_descriptor_unittest.cc
new file mode 100644
index 0000000..3c13486
--- /dev/null
+++ b/payload_consumer/cached_file_descriptor_unittest.cc
@@ -0,0 +1,201 @@
+//
+// Copyright (C) 2017 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include "update_engine/payload_consumer/cached_file_descriptor.h"
+
+#include <fcntl.h>
+
+#include <algorithm>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "update_engine/common/test_utils.h"
+#include "update_engine/common/utils.h"
+
+using chromeos_update_engine::test_utils::ExpectVectorsEq;
+using std::min;
+using std::string;
+using std::vector;
+
+namespace chromeos_update_engine {
+
+namespace {
+const size_t kCacheSize = 100;
+const size_t kFileSize = 1024;
+const size_t kRandomIterations = 1000;
+}  // namespace
+
+class CachedFileDescriptorTest : public ::testing::Test {
+ public:
+  void Open() {
+    cfd_.reset(new CachedFileDescriptor(fd_, kCacheSize));
+    EXPECT_TRUE(cfd_->Open(temp_file_.path().c_str(), O_RDWR, 0600));
+  }
+
+  void Write(uint8_t* buffer, size_t count) {
+    size_t total_bytes_wrote = 0;
+    while (total_bytes_wrote < count) {
+      auto bytes_wrote =
+          cfd_->Write(buffer + total_bytes_wrote, count - total_bytes_wrote);
+      ASSERT_NE(bytes_wrote, -1);
+      total_bytes_wrote += bytes_wrote;
+    }
+  }
+
+  void Close() { EXPECT_TRUE(cfd_->Close()); }
+
+  void SetUp() override {
+    brillo::Blob zero_blob(kFileSize, 0);
+    EXPECT_TRUE(utils::WriteFile(
+        temp_file_.path().c_str(), zero_blob.data(), zero_blob.size()));
+    Open();
+  }
+
+  void TearDown() override {
+    Close();
+    EXPECT_FALSE(cfd_->IsOpen());
+  }
+
+ protected:
+  FileDescriptorPtr fd_{new EintrSafeFileDescriptor};
+  test_utils::ScopedTempFile temp_file_{"CachedFileDescriptor-file.XXXXXX"};
+  int value_{1};
+  FileDescriptorPtr cfd_;
+};
+
+TEST_F(CachedFileDescriptorTest, IsOpenTest) {
+  EXPECT_TRUE(cfd_->IsOpen());
+}
+
+TEST_F(CachedFileDescriptorTest, SimpleWriteTest) {
+  EXPECT_EQ(cfd_->Seek(0, SEEK_SET), 0);
+  brillo::Blob blob_in(kFileSize, value_);
+  Write(blob_in.data(), blob_in.size());
+  EXPECT_TRUE(cfd_->Flush());
+
+  brillo::Blob blob_out;
+  EXPECT_TRUE(utils::ReadFile(temp_file_.path(), &blob_out));
+  EXPECT_EQ(blob_in, blob_out);
+}
+
+TEST_F(CachedFileDescriptorTest, OneBytePerWriteTest) {
+  EXPECT_EQ(cfd_->Seek(0, SEEK_SET), 0);
+  brillo::Blob blob_in(kFileSize, value_);
+  for (size_t idx = 0; idx < blob_in.size(); idx++) {
+    Write(&blob_in[idx], 1);
+  }
+  EXPECT_TRUE(cfd_->Flush());
+
+  brillo::Blob blob_out;
+  EXPECT_TRUE(utils::ReadFile(temp_file_.path(), &blob_out));
+  EXPECT_EQ(blob_in, blob_out);
+}
+
+TEST_F(CachedFileDescriptorTest, RandomWriteTest) {
+  EXPECT_EQ(cfd_->Seek(0, SEEK_SET), 0);
+
+  brillo::Blob blob_in(kFileSize, 0);
+  srand(time(nullptr));
+  uint32_t rand_seed;
+  for (size_t idx = 0; idx < kRandomIterations; idx++) {
+    // zero to full size available.
+    size_t start = rand_r(&rand_seed) % blob_in.size();
+    size_t size = rand_r(&rand_seed) % (blob_in.size() - start);
+    std::fill_n(&blob_in[start], size, idx % 256);
+    EXPECT_EQ(cfd_->Seek(start, SEEK_SET), start);
+    Write(&blob_in[start], size);
+  }
+  EXPECT_TRUE(cfd_->Flush());
+
+  brillo::Blob blob_out;
+  EXPECT_TRUE(utils::ReadFile(temp_file_.path(), &blob_out));
+  EXPECT_EQ(blob_in, blob_out);
+}
+
+TEST_F(CachedFileDescriptorTest, SeekTest) {
+  EXPECT_EQ(cfd_->Seek(0, SEEK_SET), 0);
+  EXPECT_EQ(cfd_->Seek(1, SEEK_SET), 1);
+  EXPECT_EQ(cfd_->Seek(kFileSize - 1, SEEK_SET), kFileSize - 1);
+  EXPECT_EQ(cfd_->Seek(kFileSize, SEEK_SET), kFileSize);
+  EXPECT_EQ(cfd_->Seek(kFileSize + 1, SEEK_SET), kFileSize + 1);
+
+  EXPECT_EQ(cfd_->Seek(0, SEEK_SET), 0);
+  EXPECT_EQ(cfd_->Seek(1, SEEK_CUR), 1);
+  EXPECT_EQ(cfd_->Seek(1, SEEK_CUR), 2);
+  EXPECT_EQ(cfd_->Seek(kFileSize - 1, SEEK_SET), kFileSize - 1);
+  EXPECT_EQ(cfd_->Seek(1, SEEK_CUR), kFileSize);
+  EXPECT_EQ(cfd_->Seek(1, SEEK_CUR), kFileSize + 1);
+}
+
+TEST_F(CachedFileDescriptorTest, NoFlushTest) {
+  EXPECT_EQ(cfd_->Seek(0, SEEK_SET), 0);
+  brillo::Blob blob_in(kFileSize, value_);
+  Write(blob_in.data(), blob_in.size());
+
+  brillo::Blob blob_out;
+  EXPECT_TRUE(utils::ReadFile(temp_file_.path(), &blob_out));
+  EXPECT_NE(blob_in, blob_out);
+}
+
+TEST_F(CachedFileDescriptorTest, CacheSizeWriteTest) {
+  size_t seek = 10;
+  brillo::Blob blob_in(kFileSize, 0);
+  std::fill_n(&blob_in[seek], kCacheSize, value_);
+  // We are writing exactly one cache size; Then it should be commited.
+  EXPECT_EQ(cfd_->Seek(seek, SEEK_SET), seek);
+  Write(&blob_in[seek], kCacheSize);
+
+  brillo::Blob blob_out;
+  EXPECT_TRUE(utils::ReadFile(temp_file_.path(), &blob_out));
+  EXPECT_EQ(blob_in, blob_out);
+}
+
+TEST_F(CachedFileDescriptorTest, UnderCacheSizeWriteTest) {
+  size_t seek = 100;
+  size_t less_than_cache_size = kCacheSize - 1;
+  EXPECT_EQ(cfd_->Seek(seek, SEEK_SET), seek);
+  brillo::Blob blob_in(kFileSize, 0);
+  std::fill_n(&blob_in[seek], less_than_cache_size, value_);
+  // We are writing less than one cache size; then it should not be commited.
+  Write(&blob_in[seek], less_than_cache_size);
+
+  // Revert the changes in |blob_in|.
+  std::fill_n(&blob_in[seek], less_than_cache_size, 0);
+  brillo::Blob blob_out;
+  EXPECT_TRUE(utils::ReadFile(temp_file_.path(), &blob_out));
+  EXPECT_EQ(blob_in, blob_out);
+}
+
+TEST_F(CachedFileDescriptorTest, SeekAfterWriteTest) {
+  size_t seek = 100;
+  size_t less_than_cache_size = kCacheSize - 3;
+  EXPECT_EQ(cfd_->Seek(seek, SEEK_SET), seek);
+  brillo::Blob blob_in(kFileSize, 0);
+  std::fill_n(&blob_in[seek], less_than_cache_size, value_);
+  // We are writing less than  one cache size; then it should not be commited.
+  Write(&blob_in[seek], less_than_cache_size);
+
+  // Then we seek, it should've written the cache after seek.
+  EXPECT_EQ(cfd_->Seek(200, SEEK_SET), 200);
+
+  brillo::Blob blob_out;
+  EXPECT_TRUE(utils::ReadFile(temp_file_.path(), &blob_out));
+  EXPECT_EQ(blob_in, blob_out);
+}
+
+}  // namespace chromeos_update_engine
diff --git a/update_engine.gyp b/update_engine.gyp
index 41ee1e8..0e0ba9f 100644
--- a/update_engine.gyp
+++ b/update_engine.gyp
@@ -185,6 +185,7 @@
         'common/terminator.cc',
         'common/utils.cc',
         'payload_consumer/bzip_extent_writer.cc',
+        'payload_consumer/cached_file_descriptor.cc',
         'payload_consumer/delta_performer.cc',
         'payload_consumer/download_action.cc',
         'payload_consumer/extent_reader.cc',
@@ -530,6 +531,7 @@
             'omaha_utils_unittest.cc',
             'p2p_manager_unittest.cc',
             'payload_consumer/bzip_extent_writer_unittest.cc',
+            'payload_consumer/cached_file_descriptor_unittest.cc',
             'payload_consumer/delta_performer_integration_test.cc',
             'payload_consumer/delta_performer_unittest.cc',
             'payload_consumer/download_action_unittest.cc',