//
// Copyright (C) 2022 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include <errno.h>
#include <liburing_cpp/IoUring.h>
#include <string.h>

#include <algorithm>
#include <iostream>
#include <memory>

#include "liburing.h"
#include "liburing_cpp/IoUringCQE.h"

namespace io_uring_cpp {

template <typename T>
bool IsZeroInitialized(const T& val) {
  auto begin = reinterpret_cast<const char*>(&val);
  auto end = begin + sizeof(val);
  return std::all_of(begin, end, [](const auto& a) { return a == 0; });
}

class IoUring final : public IoUringInterface {
 public:
  ~IoUring() override {
    if (!IsZeroInitialized(ring)) {
      if (buffer_registered_) {
        UnregisterBuffers();
      }
      if (files_registered_) {
        UnregisterFiles();
      }
      io_uring_queue_exit(&ring);
    }
  }
  IoUring(const IoUring&) = delete;
  IoUring(IoUring&& rhs) {
    ring = rhs.ring;
    memset(&rhs.ring, 0, sizeof(rhs.ring));
  }
  IoUring& operator=(IoUring&& rhs) {
    std::swap(ring, rhs.ring);
    return *this;
  }
  Errno RegisterBuffers(const struct iovec* iovecs,
                        size_t iovec_size) override {
    const auto ret =
        Errno(io_uring_register_buffers(&ring, iovecs, iovec_size));
    buffer_registered_ = ret.IsOk();
    return ret;
  }

  Errno UnregisterBuffers() override {
    const auto ret = Errno(io_uring_unregister_buffers(&ring));
    buffer_registered_ = !ret.IsOk();
    return ret;
  }

  Errno RegisterFiles(const int* files, size_t files_size) override {
    const auto ret = Errno(io_uring_register_files(&ring, files, files_size));
    files_registered_ = ret.IsOk();
    return ret;
  }

  Errno UnregisterFiles() {
    const auto ret = Errno(io_uring_unregister_files(&ring));
    files_registered_ = !ret.IsOk();
    return ret;
  }

  IoUringSQE PrepReadFixed(int fd,
                           void* buf,
                           unsigned nbytes,
                           uint64_t offset,
                           int buf_index) override {
    auto sqe = io_uring_get_sqe(&ring);
    if (sqe == nullptr) {
      return IoUringSQE{nullptr};
    }
    io_uring_prep_read_fixed(sqe, fd, buf, nbytes, offset, buf_index);
    return IoUringSQE{static_cast<void*>(sqe)};
  }

  IoUringSQE PrepRead(int fd, void* buf, unsigned nbytes,
                      uint64_t offset) override {
    auto sqe = io_uring_get_sqe(&ring);
    if (sqe == nullptr) {
      return IoUringSQE{nullptr};
    }
    io_uring_prep_read(sqe, fd, buf, nbytes, offset);
    return IoUringSQE{static_cast<void*>(sqe)};
  }
  IoUringSQE PrepWrite(int fd, const void* buf, unsigned nbytes,
                       uint64_t offset) override {
    auto sqe = io_uring_get_sqe(&ring);
    if (sqe == nullptr) {
      return IoUringSQE{nullptr};
    }
    io_uring_prep_write(sqe, fd, buf, nbytes, offset);
    return IoUringSQE{static_cast<void*>(sqe)};
  }

  size_t SQELeft() const override { return io_uring_sq_space_left(&ring); }
  size_t SQEReady() const override { return io_uring_sq_ready(&ring); }

  IoUringSubmitResult Submit() override {
    return IoUringSubmitResult{io_uring_submit(&ring)};
  }

  IoUringSubmitResult SubmitAndWait(size_t completions) override {
    return IoUringSubmitResult{io_uring_submit_and_wait(&ring, completions)};
  }

  Result<Errno, std::vector<IoUringCQE>> PopCQE(
      const unsigned int count) override {
    std::vector<io_uring_cqe*> cqe_ptrs;
    cqe_ptrs.resize(count);
    const auto ret = io_uring_wait_cqe_nr(&ring, cqe_ptrs.data(), count);
    if (ret != 0) {
      return {Errno(ret)};
    }
    const auto filled = io_uring_peek_batch_cqe(&ring, cqe_ptrs.data(), count);
    if (filled != count) {
      return {Errno(EAGAIN)};
    }
    std::vector<IoUringCQE> cqes;
    cqes.reserve(count);
    for (const auto& cqe : cqe_ptrs) {
      if (cqe == nullptr) {
        return {Errno(EAGAIN)};
      }
      cqes.push_back(IoUringCQE(cqe->res, cqe->flags, cqe->user_data));
      io_uring_cqe_seen(&ring, cqe);
    }
    return {cqes};
  }

  Result<Errno, IoUringCQE> PopCQE() override {
    struct io_uring_cqe* ptr{};
    const auto ret = io_uring_wait_cqe(&ring, &ptr);
    if (ret != 0) {
      return {Errno(ret)};
    }
    const auto cqe = IoUringCQE(ptr->res, ptr->flags, ptr->user_data);
    io_uring_cqe_seen(&ring, ptr);
    return {cqe};
  }

  Result<Errno, IoUringCQE> PeekCQE() override {
    struct io_uring_cqe* ptr{};
    const auto ret = io_uring_peek_cqe(&ring, &ptr);
    if (ret != 0) {
      return {Errno(ret)};
    }
    return {IoUringCQE(ptr->res, ptr->flags, ptr->user_data)};
  }

  IoUring(struct io_uring r) : ring(r) {}

 private:
  struct io_uring ring {};
  bool buffer_registered_ = false;
  bool files_registered_ = false;
  std::atomic<size_t> request_id_{};
};

const char* Errno::ErrMsg() {
  if (error_code == 0) {
    return nullptr;
  }
  return strerror(error_code);
}

std::ostream& operator<<(std::ostream& out, Errno err) {
  out << err.ErrCode() << ", " << err.ErrMsg();
  return out;
}

std::unique_ptr<IoUringInterface> IoUringInterface::CreateLinuxIoUring(
    int queue_depth, int flags) {
  struct io_uring ring {};
  const auto err = io_uring_queue_init(queue_depth, &ring, flags);
  if (err) {
    errno = -err;
    return {};
  }
  return std::unique_ptr<IoUringInterface>(new IoUring(ring));
}

}  // namespace io_uring_cpp
