Some streaming progress
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 7d78bfa..65d11f1 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c
@@ -1015,6 +1015,8 @@ int had_outgoing; char buffer[GPR_LTOA_MIN_BUFSIZE]; + gpr_log(GPR_DEBUG, "cancel %d", id); + if (s) { /* clear out any unreported input & output: nobody cares anymore */ had_outgoing = s->outgoing_sopb.nops != 0; @@ -1185,6 +1187,8 @@ transport *t = tp; stream *s = t->incoming_stream; + gpr_log(GPR_DEBUG, "on_header: %d %s %s", s->id, grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value)); + GPR_ASSERT(s); stream_list_join(t, s, PENDING_CALLBACKS); if (md->key == t->str_grpc_timeout) {
diff --git a/src/cpp/client/client_unary_call.cc b/src/cpp/client/client_unary_call.cc index 30bf2d0..bc0e837 100644 --- a/src/cpp/client/client_unary_call.cc +++ b/src/cpp/client/client_unary_call.cc
@@ -51,11 +51,12 @@ Status status; buf.AddSendInitialMetadata(context); buf.AddSendMessage(request); - buf.AddRecvMessage(result); + bool got_message; + buf.AddRecvMessage(result, &got_message); buf.AddClientSendClose(); buf.AddClientRecvStatus(nullptr, &status); // TODO metadata call.PerformOps(&buf); - GPR_ASSERT(cq.Pluck(&buf)); + GPR_ASSERT(cq.Pluck(&buf) && (got_message || !status.IsOk())); return status; }
diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc index d90ef03..a20d4a0 100644 --- a/src/cpp/common/call.cc +++ b/src/cpp/common/call.cc
@@ -58,6 +58,7 @@ } recv_message_ = nullptr; + got_message_ = nullptr; if (recv_message_buf_) { grpc_byte_buffer_destroy(recv_message_buf_); recv_message_buf_ = nullptr; @@ -128,8 +129,9 @@ send_message_ = &message; } -void CallOpBuffer::AddRecvMessage(google::protobuf::Message *message) { +void CallOpBuffer::AddRecvMessage(google::protobuf::Message *message, bool* got_message) { recv_message_ = message; + got_message_ = got_message; } void CallOpBuffer::AddClientSendClose() { @@ -239,10 +241,15 @@ FillMetadataMap(&recv_initial_metadata_arr_, recv_initial_metadata_); } // Parse received message if any. - if (recv_message_ && recv_message_buf_) { - *status = DeserializeProto(recv_message_buf_, recv_message_); - grpc_byte_buffer_destroy(recv_message_buf_); - recv_message_buf_ = nullptr; + if (recv_message_) { + if (recv_message_buf_) { + *got_message_ = true; + *status = DeserializeProto(recv_message_buf_, recv_message_); + grpc_byte_buffer_destroy(recv_message_buf_); + recv_message_buf_ = nullptr; + } else { + *got_message_ = false; + } } // Parse received status. if (recv_status_) {
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index be3d225..17b0543 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc
@@ -177,9 +177,7 @@ auto status = method_->handler()->RunHandler( MethodHandler::HandlerParameter(&call_, &ctx_, req.get(), res.get())); CallOpBuffer buf; - if (!ctx_.sent_initial_metadata_) { - buf.AddSendInitialMetadata(&ctx_.initial_metadata_); - } + ctx_.SendInitialMetadataIfNeeded(&buf); if (has_response_payload_) { buf.AddSendMessage(*res); }
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 2bf4104..06eeb39 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc
@@ -32,6 +32,7 @@ */ #include <grpc++/server_context.h> +#include <grpc++/impl/call.h> #include <grpc/grpc.h> #include "src/cpp/util/time.h" @@ -48,4 +49,11 @@ } } +void ServerContext::SendInitialMetadataIfNeeded(CallOpBuffer* buf) { + if (!sent_initial_metadata_) { + buf->AddSendInitialMetadata(&initial_metadata_); + sent_initial_metadata_ = true; + } +} + } // namespace grpc