Support streaming for lz4 recompress

This reduces peak memory usage of update_engine from
~700MB to ~400MB. Since we no longer need to cache all of the patched
blocks in memory, data is written to disk as soon as it is produced.

Test: th
Change-Id: I7b353dbaee4ee63e984ec2014476e3d27387e0fc
diff --git a/common/utils.cc b/common/utils.cc
index 8ea398f..0b76eea 100644
--- a/common/utils.cc
+++ b/common/utils.cc
@@ -1062,16 +1062,16 @@
   return base::NumberToString(base::StringPieceHash()(str_to_convert));
 }
 
-static bool ParseTimestamp(const std::string& str, int64_t* out) {
-  if (!base::StringToInt64(str, out)) {
+static bool ParseTimestamp(std::string_view str, int64_t* out) {
+  if (!base::StringToInt64(base::StringPiece(str.data(), str.size()), out)) {
     LOG(WARNING) << "Invalid timestamp: " << str;
     return false;
   }
   return true;
 }
 
-ErrorCode IsTimestampNewer(const std::string& old_version,
-                           const std::string& new_version) {
+ErrorCode IsTimestampNewer(const std::string_view old_version,
+                           const std::string_view new_version) {
   if (old_version.empty() || new_version.empty()) {
     LOG(WARNING)
         << "One of old/new timestamp is empty, permit update anyway. Old: "
diff --git a/common/utils.h b/common/utils.h
index 874d0af..201e47e 100644
--- a/common/utils.h
+++ b/common/utils.h
@@ -411,8 +411,8 @@
 // integer.
 // Return kPayloadTimestampError if both are integers but |new_version| <
 // |old_version|.
-ErrorCode IsTimestampNewer(const std::string& old_version,
-                           const std::string& new_version);
+ErrorCode IsTimestampNewer(const std::string_view old_version,
+                           const std::string_view new_version);
 
 std::unique_ptr<android::base::MappedFile> GetReadonlyZeroBlock(size_t size);
 
diff --git a/lz4diff/lz4patch.cc b/lz4diff/lz4patch.cc
index 7766e24..9de6d58 100644
--- a/lz4diff/lz4patch.cc
+++ b/lz4diff/lz4patch.cc
@@ -30,6 +30,7 @@
 
 #include "android-base/strings.h"
 #include "lz4diff/lz4diff.h"
+#include "lz4diff/lz4diff.pb.h"
 #include "lz4diff_compress.h"
 #include "lz4diff_format.h"
 #include "puffin/puffpatch.h"
@@ -168,45 +169,6 @@
   return err == 0;
 }
 
-bool ApplyPostfixPatch(
-    std::string_view recompressed_blob,
-    const google::protobuf::RepeatedPtrField<CompressedBlockInfo>&
-        dst_block_info,
-    Blob* output) {
-  // Output size should be always identical to size of recompressed_blob
-  output->clear();
-  output->reserve(recompressed_blob.size());
-  size_t offset = 0;
-  for (const auto& block_info : dst_block_info) {
-    auto block =
-        recompressed_blob.substr(offset, block_info.compressed_length());
-    if (!block_info.sha256_hash().empty()) {
-      Blob actual_hash;
-      CHECK(HashCalculator::RawHashOfBytes(
-          block.data(), block.size(), &actual_hash));
-      if (ToStringView(actual_hash) != block_info.sha256_hash()) {
-        LOG(ERROR) << "Block " << block_info
-                   << " is corrupted. This usually means the patch generator "
-                      "used a different version of LZ4, or an incompatible LZ4 "
-                      "patch generator was used, or LZ4 produces different "
-                      "output on different platforms. Expected hash: "
-                   << HexEncode(block_info.sha256_hash())
-                   << ", actual hash: " << HexEncode(actual_hash);
-      }
-    }
-    if (!block_info.postfix_bspatch().empty()) {
-      Blob fixed_block;
-      TEST_AND_RETURN_FALSE(
-          bspatch(block, block_info.postfix_bspatch(), &fixed_block));
-      output->insert(output->end(), fixed_block.begin(), fixed_block.end());
-    } else {
-      output->insert(output->end(), block.begin(), block.end());
-    }
-    offset += block_info.compressed_length();
-  }
-  return true;
-}
-
 bool puffpatch(std::string_view input_data,
                std::string_view patch_data,
                Blob* output) {
@@ -219,6 +181,7 @@
 std::vector<CompressedBlock> ToCompressedBlockVec(
     const google::protobuf::RepeatedPtrField<CompressedBlockInfo>& rpf) {
   std::vector<CompressedBlock> ret;
+  ret.reserve(rpf.size());
   for (const auto& block : rpf) {
     auto& info = ret.emplace_back();
     info.compressed_length = block.compressed_length();
@@ -237,6 +200,129 @@
   return false;
 }
 
+size_t GetCompressedSize(
+    const google::protobuf::RepeatedPtrField<CompressedBlockInfo>& info) {
+  size_t total_bytes = 0;  // Running sum of every block's compressed length.
+  for (const auto& entry : info) {
+    total_bytes += entry.compressed_length();
+  }
+  return total_bytes;
+}
+
+size_t GetDecompressedSize(
+    const google::protobuf::RepeatedPtrField<CompressedBlockInfo>& info) {
+  size_t total_bytes = 0;  // Running sum of every block's uncompressed length.
+  for (const auto& entry : info) {
+    total_bytes += entry.uncompressed_length();
+  }
+  return total_bytes;
+}
+
+bool ApplyInnerPatch(Blob decompressed_src,  // Input buffer, taken by value.
+                     const Lz4diffPatch& patch,
+                     Blob* decompressed_dst) {  // Output of the patcher.
+  switch (patch.pb_header.inner_type()) {  // Select patcher by inner type.
+    case InnerPatchType::BSDIFF:
+      TEST_AND_RETURN_FALSE(bspatch(
+          ToStringView(decompressed_src), patch.inner_patch, decompressed_dst));
+      break;
+    case InnerPatchType::PUFFDIFF:
+      TEST_AND_RETURN_FALSE(puffpatch(
+          ToStringView(decompressed_src), patch.inner_patch, decompressed_dst));
+      break;
+    default:  // Neither BSDIFF nor PUFFDIFF: log and fail.
+      LOG(ERROR) << "Unsupported patch type: " << patch.pb_header.inner_type();
+      return false;
+  }
+  return true;  // No early return was taken above.
+}
+
+// Applies |patch| to |src_data|, streaming the recompressed result to |sink|.
+// Returns false if decompression, inner patching or recompression fails.
+// TODO(zhangkelvin) Rewrite this in C++ 20 coroutine once that's available.
+// Hand coding CPS is not fun.
+bool Lz4Patch(std::string_view src_data,
+              const Lz4diffPatch& patch,
+              const SinkFunc& sink) {
+  auto decompressed_src = TryDecompressBlob(
+      src_data,
+      ToCompressedBlockVec(patch.pb_header.src_info().block_info()),
+      patch.pb_header.src_info().zero_padding_enabled());
+  TEST_AND_RETURN_FALSE(!decompressed_src.empty());
+  Blob decompressed_dst;
+  decompressed_dst.reserve(
+      GetDecompressedSize(patch.pb_header.dst_info().block_info()));
+  // Stop on inner-patch failure instead of recompressing a bad/partial buffer.
+  TEST_AND_RETURN_FALSE(ApplyInnerPatch(
+      std::move(decompressed_src), patch, &decompressed_dst));
+  if (!HasPosfixPatches(patch)) {
+    return TryCompressBlob(
+        ToStringView(decompressed_dst),
+        ToCompressedBlockVec(patch.pb_header.dst_info().block_info()),
+        patch.pb_header.dst_info().zero_padding_enabled(),
+        patch.pb_header.dst_info().algo(),
+        sink);
+  }
+  auto postfix_patcher =
+      [&sink,
+       block_idx = 0,
+       &dst_block_info = patch.pb_header.dst_info().block_info()](
+          const uint8_t* data, size_t size) mutable -> size_t {
+    if (block_idx >= dst_block_info.size()) {  // No block metadata left.
+      return sink(data, size);
+    }
+    const auto& block_info = dst_block_info[block_idx];
+    TEST_EQ(size, block_info.compressed_length());
+    DEFER { block_idx++; };
+    if (block_info.postfix_bspatch().empty()) {  // Block needs no fix-up.
+      return sink(data, size);
+    }
+    if (!block_info.sha256_hash().empty()) {
+      Blob actual_hash;
+      TEST_AND_RETURN_FALSE(
+          HashCalculator::RawHashOfBytes(data, size, &actual_hash));
+      if (ToStringView(actual_hash) != block_info.sha256_hash()) {
+        LOG(ERROR) << "Block " << block_info
+                   << " is corrupted. This usually means the patch generator "
+                      "used a different version of LZ4, or an incompatible LZ4 "
+                      "patch generator was used, or LZ4 produces different "
+                      "output on different platforms. Expected hash: "
+                   << HexEncode(block_info.sha256_hash())
+                   << ", actual hash: " << HexEncode(actual_hash);
+        return 0;  // Hash mismatch: nothing is forwarded to |sink|.
+      }
+    }
+    Blob fixed_block;
+    TEST_AND_RETURN_FALSE(
+        bspatch(std::string_view(reinterpret_cast<const char*>(data), size),
+                block_info.postfix_bspatch(),
+                &fixed_block));
+    return sink(fixed_block.data(), fixed_block.size());
+  };
+  return TryCompressBlob(
+      ToStringView(decompressed_dst),
+      ToCompressedBlockVec(patch.pb_header.dst_info().block_info()),
+      patch.pb_header.dst_info().zero_padding_enabled(),
+      patch.pb_header.dst_info().algo(),
+      postfix_patcher);
+}
+
+bool Lz4Patch(std::string_view src_data,
+              const Lz4diffPatch& patch,
+              Blob* output) {
+  Blob blob;  // Staging buffer; |output| is assigned only on success.
+  const auto output_size =
+      GetCompressedSize(patch.pb_header.dst_info().block_info());
+  blob.reserve(output_size);  // Sum of dst compressed block lengths.
+  TEST_AND_RETURN_FALSE(Lz4Patch(
+      src_data, patch, [&blob](const uint8_t* data, size_t size) -> size_t {
+        blob.insert(blob.end(), data, data + size);
+        return size;
+      }));
+  *output = std::move(blob);
+  return true;
+}
+
 }  // namespace
 
 bool Lz4Patch(std::string_view src_data,
@@ -244,57 +330,15 @@
               Blob* output) {
   Lz4diffPatch patch;
   TEST_AND_RETURN_FALSE(ParseLz4DifffPatch(patch_data, &patch));
+  return Lz4Patch(src_data, patch, output);
+}
 
-  Blob decompressed_dst;
-  // This scope is here just so that |decompressed_src| can be freed earlier
-  // than function scope.
-  // This whole patching algorithm has non-trivial memory usage, as it needs to
-  // load source data in to memory and decompress that. Now both src and
-  // decompressed src data are in memory.
-  // TODO(b/206729162) Make lz4diff more memory efficient and more streaming
-  // friendly.
-  {
-    const auto decompressed_src = TryDecompressBlob(
-        src_data,
-        ToCompressedBlockVec(patch.pb_header.src_info().block_info()),
-        patch.pb_header.src_info().zero_padding_enabled());
-    switch (patch.pb_header.inner_type()) {
-      case InnerPatchType::BSDIFF:
-        TEST_AND_RETURN_FALSE(bspatch(ToStringView(decompressed_src),
-                                      patch.inner_patch,
-                                      &decompressed_dst));
-        break;
-      case InnerPatchType::PUFFDIFF:
-        TEST_AND_RETURN_FALSE(puffpatch(ToStringView(decompressed_src),
-                                        patch.inner_patch,
-                                        &decompressed_dst));
-        break;
-      default:
-        LOG(ERROR) << "Unsupported patch type: "
-                   << patch.pb_header.inner_type();
-        return false;
-    }
-  }
-
-  auto recompressed_dst = TryCompressBlob(
-      ToStringView(decompressed_dst),
-      ToCompressedBlockVec(patch.pb_header.dst_info().block_info()),
-      patch.pb_header.dst_info().zero_padding_enabled(),
-      patch.pb_header.dst_info().algo());
-  TEST_AND_RETURN_FALSE(recompressed_dst.size() > 0);
-  // free memory used by |decompressed_dst|.
-  decompressed_dst = {};
-
-  if (HasPosfixPatches(patch)) {
-    TEST_AND_RETURN_FALSE(
-        ApplyPostfixPatch(ToStringView(recompressed_dst),
-                          patch.pb_header.dst_info().block_info(),
-                          output));
-  } else {
-    *output = std::move(recompressed_dst);
-  }
-
-  return true;
+bool Lz4Patch(std::string_view src_data,
+              std::string_view patch_data,  // Serialized patch; parsed below.
+              const SinkFunc& sink) {
+  Lz4diffPatch patch;
+  TEST_AND_RETURN_FALSE(ParseLz4DifffPatch(patch_data, &patch));
+  return Lz4Patch(src_data, patch, sink);  // Use the parsed-patch overload.
 }
 
 bool Lz4Patch(const Blob& src_data, const Blob& patch_data, Blob* output) {
diff --git a/lz4diff/lz4patch.h b/lz4diff/lz4patch.h
index ce49430..8b99c23 100644
--- a/lz4diff/lz4patch.h
+++ b/lz4diff/lz4patch.h
@@ -21,6 +21,11 @@
 #include "lz4diff_format.h"
 
 namespace chromeos_update_engine {
+
+bool Lz4Patch(std::string_view src_data,
+              std::string_view patch_data,
+              const SinkFunc& sink);
+
 bool Lz4Patch(std::string_view src_data,
               std::string_view patch_data,
               Blob* output);
diff --git a/payload_consumer/block_extent_writer.cc b/payload_consumer/block_extent_writer.cc
index 6b1fba7..055b485 100644
--- a/payload_consumer/block_extent_writer.cc
+++ b/payload_consumer/block_extent_writer.cc
@@ -42,8 +42,9 @@
 
   if (buffer_.empty() && count >= cur_extent_size) {
     if (!WriteExtent(data, cur_extent, block_size_)) {
-      LOG(ERROR) << "WriteExtent(" << cur_extent.start_block() << ", " << data
-                 << ", " << cur_extent_size << ") failed.";
+      LOG(ERROR) << "WriteExtent(" << cur_extent.start_block() << ", "
+                 << static_cast<const void*>(data) << ", " << cur_extent_size
+                 << ") failed.";
       // return value is expected to be greater than 0. Return 0 to signal error
       // condition
       return 0;
diff --git a/payload_consumer/install_operation_executor.cc b/payload_consumer/install_operation_executor.cc
index 5318cc3..69ef9c1 100644
--- a/payload_consumer/install_operation_executor.cc
+++ b/payload_consumer/install_operation_executor.cc
@@ -268,12 +268,18 @@
     size_t count) {
   brillo::Blob src_data;
 
-  brillo::Blob dst_data;
   TEST_AND_RETURN_FALSE(utils::ReadExtents(
       source_fd, operation.src_extents(), &src_data, block_size_));
-  TEST_AND_RETURN_FALSE(
-      Lz4Patch(ToStringView(src_data), ToStringView(data, count), &dst_data));
-  return writer->Write(dst_data.data(), dst_data.size());
+  TEST_AND_RETURN_FALSE(Lz4Patch(
+      ToStringView(src_data),
+      ToStringView(data, count),
+      [writer(writer.get())](const uint8_t* data, size_t size) -> size_t {
+        if (!writer->Write(data, size)) {
+          return 0;
+        }
+        return size;
+      }));
+  return true;
 }
 
 bool InstallOperationExecutor::ExecuteSourceBsdiffOperation(