blob: 07e771f7e119e54f46d1cddee500fa92b70f3521 [file] [log] [blame]
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "src/cpp/stream/stream_context.h"
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include "src/cpp/rpc_method.h"
#include "src/cpp/proto/proto_utils.h"
#include "src/cpp/util/time.h"
#include <grpc++/client_context.h>
#include <grpc++/config.h>
#include <google/protobuf/message.h>
namespace grpc {
// Client only ctor
StreamContext::StreamContext(const RpcMethod& method, ClientContext* context,
const google::protobuf::Message* request,
google::protobuf::Message* result)
: is_client_(true),
method_(&method),
context_(context),
request_(request),
result_(result),
invoke_ev_(nullptr),
read_ev_(nullptr),
write_ev_(nullptr),
reading_(false),
writing_(false),
got_read_(false),
got_write_(false),
peer_halfclosed_(false),
self_halfclosed_(false),
stream_finished_(false),
waiting_(false) {
GPR_ASSERT(method_->method_type() != RpcMethod::RpcType::NORMAL_RPC);
}
StreamContext::~StreamContext() { cq_poller_.join(); }
void StreamContext::PollingLoop() {
grpc_event* ev = nullptr;
gpr_timespec absolute_deadline;
AbsoluteDeadlineTimepoint2Timespec(context_->absolute_deadline(),
&absolute_deadline);
std::condition_variable* cv_to_notify = nullptr;
std::unique_lock<std::mutex> lock(mu_, std::defer_lock);
while (1) {
cv_to_notify = nullptr;
lock.lock();
if (stream_finished_ && !reading_ && !writing_) {
return;
}
lock.unlock();
ev = grpc_completion_queue_next(context_->cq(), absolute_deadline);
lock.lock();
if (!ev) {
stream_finished_ = true;
final_status_ = Status(StatusCode::DEADLINE_EXCEEDED);
std::condition_variable* cvs[3] = {reading_ ? &read_cv_ : nullptr,
writing_ ? &write_cv_ : nullptr,
waiting_ ? &finish_cv_ : nullptr};
got_read_ = reading_;
got_write_ = writing_;
read_ev_ = nullptr;
write_ev_ = nullptr;
lock.unlock();
for (int i = 0; i < 3; i++) {
if (cvs[i]) cvs[i]->notify_one();
}
break;
}
switch (ev->type) {
case GRPC_READ:
GPR_ASSERT(reading_);
got_read_ = true;
read_ev_ = ev;
cv_to_notify = &read_cv_;
reading_ = false;
break;
case GRPC_FINISH_ACCEPTED:
case GRPC_WRITE_ACCEPTED:
got_write_ = true;
write_ev_ = ev;
cv_to_notify = &write_cv_;
writing_ = false;
break;
case GRPC_FINISHED: {
grpc::string error_details(
ev->data.finished.details ? ev->data.finished.details : "");
final_status_ = Status(static_cast<StatusCode>(ev->data.finished.code),
error_details);
grpc_event_finish(ev);
stream_finished_ = true;
if (waiting_) {
cv_to_notify = &finish_cv_;
}
break;
}
case GRPC_INVOKE_ACCEPTED:
invoke_ev_ = ev;
cv_to_notify = &invoke_cv_;
break;
case GRPC_CLIENT_METADATA_READ:
grpc_event_finish(ev);
break;
default:
grpc_event_finish(ev);
// not handling other types now
gpr_log(GPR_ERROR, "unknown event type");
abort();
}
lock.unlock();
if (cv_to_notify) {
cv_to_notify->notify_one();
}
}
}
void StreamContext::Start(bool buffered) {
// TODO(yangg) handle metadata send path
int flag = buffered ? GRPC_WRITE_BUFFER_HINT : 0;
grpc_call_error error = grpc_call_start_invoke(
context_->call(), context_->cq(), this, this, this, flag);
GPR_ASSERT(GRPC_CALL_OK == error);
// kicks off the poller thread
cq_poller_ = std::thread(&StreamContext::PollingLoop, this);
std::unique_lock<std::mutex> lock(mu_);
while (!invoke_ev_) {
invoke_cv_.wait(lock);
}
lock.unlock();
GPR_ASSERT(invoke_ev_->data.invoke_accepted == GRPC_OP_OK);
grpc_event_finish(invoke_ev_);
}
namespace {
// Wait for got_event with event_cv protected by mu, return event.
grpc_event* WaitForEvent(bool* got_event, std::condition_variable* event_cv,
std::mutex* mu, grpc_event** event) {
std::unique_lock<std::mutex> lock(*mu);
while (*got_event == false) {
event_cv->wait(lock);
}
*got_event = false;
return *event;
}
} // namespace
bool StreamContext::Read(google::protobuf::Message* msg) {
std::unique_lock<std::mutex> lock(mu_);
if (stream_finished_) {
peer_halfclosed_ = true;
return false;
}
reading_ = true;
lock.unlock();
grpc_call_error err = grpc_call_start_read(context_->call(), this);
GPR_ASSERT(err == GRPC_CALL_OK);
grpc_event* ev = WaitForEvent(&got_read_, &read_cv_, &mu_, &read_ev_);
if (!ev) {
return false;
}
GPR_ASSERT(ev->type == GRPC_READ);
bool ret = true;
if (ev->data.read) {
if (!DeserializeProto(ev->data.read, msg)) {
ret = false; // parse error
// TODO(yangg) cancel the stream.
}
} else {
ret = false;
peer_halfclosed_ = true;
}
grpc_event_finish(ev);
return ret;
}
bool StreamContext::Write(const google::protobuf::Message* msg, bool is_last) {
bool ret = true;
grpc_event* ev = nullptr;
std::unique_lock<std::mutex> lock(mu_);
if (stream_finished_) {
self_halfclosed_ = true;
return false;
}
writing_ = true;
lock.unlock();
if (msg) {
grpc_byte_buffer* out_buf = nullptr;
if (!SerializeProto(*msg, &out_buf)) {
FinishStream(Status(StatusCode::INVALID_ARGUMENT,
"Failed to serialize request proto"),
true);
return false;
}
int flag = is_last ? GRPC_WRITE_BUFFER_HINT : 0;
grpc_call_error err =
grpc_call_start_write(context_->call(), out_buf, this, flag);
grpc_byte_buffer_destroy(out_buf);
GPR_ASSERT(err == GRPC_CALL_OK);
ev = WaitForEvent(&got_write_, &write_cv_, &mu_, &write_ev_);
if (!ev) {
return false;
}
GPR_ASSERT(ev->type == GRPC_WRITE_ACCEPTED);
ret = ev->data.write_accepted == GRPC_OP_OK;
grpc_event_finish(ev);
}
if (is_last) {
grpc_call_error err = grpc_call_writes_done(context_->call(), this);
GPR_ASSERT(err == GRPC_CALL_OK);
ev = WaitForEvent(&got_write_, &write_cv_, &mu_, &write_ev_);
if (!ev) {
return false;
}
GPR_ASSERT(ev->type == GRPC_FINISH_ACCEPTED);
grpc_event_finish(ev);
self_halfclosed_ = true;
}
return ret;
}
const Status& StreamContext::Wait() {
std::unique_lock<std::mutex> lock(mu_);
// TODO(yangg) if not halfclosed cancel the stream
GPR_ASSERT(self_halfclosed_);
GPR_ASSERT(peer_halfclosed_);
GPR_ASSERT(!waiting_);
waiting_ = true;
while (!stream_finished_) {
finish_cv_.wait(lock);
}
return final_status_;
}
void StreamContext::FinishStream(const Status& status, bool send) { return; }
} // namespace grpc