Merge pull request #15682 from mehrdada/bump-pylint
Bump pylint to 1.9.2
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 53100e9..71bebde 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -663,9 +663,6 @@
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx address_sorting_test)
endif()
-if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
-add_dependencies(buildtests_cxx cancel_ares_query_test)
-endif()
add_custom_target(buildtests
DEPENDS buildtests_c buildtests_cxx)
@@ -15846,48 +15843,6 @@
endif()
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
-if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
-
-add_executable(cancel_ares_query_test
- test/cpp/naming/cancel_ares_query_test.cc
- third_party/googletest/googletest/src/gtest-all.cc
- third_party/googletest/googlemock/src/gmock-all.cc
-)
-
-
-target_include_directories(cancel_ares_query_test
- PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
- PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
- PRIVATE ${_gRPC_SSL_INCLUDE_DIR}
- PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR}
- PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR}
- PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR}
- PRIVATE ${_gRPC_CARES_INCLUDE_DIR}
- PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR}
- PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
- PRIVATE third_party/googletest/googletest/include
- PRIVATE third_party/googletest/googletest
- PRIVATE third_party/googletest/googlemock/include
- PRIVATE third_party/googletest/googlemock
- PRIVATE ${_gRPC_PROTO_GENS_DIR}
-)
-
-target_link_libraries(cancel_ares_query_test
- ${_gRPC_PROTOBUF_LIBRARIES}
- ${_gRPC_ALLTARGETS_LIBRARIES}
- grpc++_test_util
- grpc_test_util
- gpr_test_util
- grpc++
- grpc
- gpr
- grpc++_test_config
- ${_gRPC_GFLAGS_LIBRARIES}
-)
-
-endif()
-endif (gRPC_BUILD_TESTS)
-if (gRPC_BUILD_TESTS)
add_executable(alts_credentials_fuzzer_one_entry
test/core/security/alts_credentials_fuzzer.cc
diff --git a/Makefile b/Makefile
index 672cdfa..1247566f 100644
--- a/Makefile
+++ b/Makefile
@@ -1316,7 +1316,6 @@
resolver_component_tests_runner_invoker: $(BINDIR)/$(CONFIG)/resolver_component_tests_runner_invoker
address_sorting_test_unsecure: $(BINDIR)/$(CONFIG)/address_sorting_test_unsecure
address_sorting_test: $(BINDIR)/$(CONFIG)/address_sorting_test
-cancel_ares_query_test: $(BINDIR)/$(CONFIG)/cancel_ares_query_test
alts_credentials_fuzzer_one_entry: $(BINDIR)/$(CONFIG)/alts_credentials_fuzzer_one_entry
api_fuzzer_one_entry: $(BINDIR)/$(CONFIG)/api_fuzzer_one_entry
client_fuzzer_one_entry: $(BINDIR)/$(CONFIG)/client_fuzzer_one_entry
@@ -1754,7 +1753,6 @@
$(BINDIR)/$(CONFIG)/resolver_component_tests_runner_invoker \
$(BINDIR)/$(CONFIG)/address_sorting_test_unsecure \
$(BINDIR)/$(CONFIG)/address_sorting_test \
- $(BINDIR)/$(CONFIG)/cancel_ares_query_test \
else
buildtests_cxx: privatelibs_cxx \
@@ -1879,7 +1877,6 @@
$(BINDIR)/$(CONFIG)/resolver_component_tests_runner_invoker \
$(BINDIR)/$(CONFIG)/address_sorting_test_unsecure \
$(BINDIR)/$(CONFIG)/address_sorting_test \
- $(BINDIR)/$(CONFIG)/cancel_ares_query_test \
endif
@@ -2365,8 +2362,6 @@
$(Q) $(BINDIR)/$(CONFIG)/address_sorting_test_unsecure || ( echo test address_sorting_test_unsecure failed ; exit 1 )
$(E) "[RUN] Testing address_sorting_test"
$(Q) $(BINDIR)/$(CONFIG)/address_sorting_test || ( echo test address_sorting_test failed ; exit 1 )
- $(E) "[RUN] Testing cancel_ares_query_test"
- $(Q) $(BINDIR)/$(CONFIG)/cancel_ares_query_test || ( echo test cancel_ares_query_test failed ; exit 1 )
flaky_test_cxx: buildtests_cxx
@@ -23692,49 +23687,6 @@
endif
-CANCEL_ARES_QUERY_TEST_SRC = \
- test/cpp/naming/cancel_ares_query_test.cc \
-
-CANCEL_ARES_QUERY_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(CANCEL_ARES_QUERY_TEST_SRC))))
-ifeq ($(NO_SECURE),true)
-
-# You can't build secure targets if you don't have OpenSSL.
-
-$(BINDIR)/$(CONFIG)/cancel_ares_query_test: openssl_dep_error
-
-else
-
-
-
-
-ifeq ($(NO_PROTOBUF),true)
-
-# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.5.0+.
-
-$(BINDIR)/$(CONFIG)/cancel_ares_query_test: protobuf_dep_error
-
-else
-
-$(BINDIR)/$(CONFIG)/cancel_ares_query_test: $(PROTOBUF_DEP) $(CANCEL_ARES_QUERY_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a
- $(E) "[LD] Linking $@"
- $(Q) mkdir -p `dirname $@`
- $(Q) $(LDXX) $(LDFLAGS) $(CANCEL_ARES_QUERY_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/cancel_ares_query_test
-
-endif
-
-endif
-
-$(OBJDIR)/$(CONFIG)/test/cpp/naming/cancel_ares_query_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a
-
-deps_cancel_ares_query_test: $(CANCEL_ARES_QUERY_TEST_OBJS:.o=.dep)
-
-ifneq ($(NO_SECURE),true)
-ifneq ($(NO_DEPS),true)
--include $(CANCEL_ARES_QUERY_TEST_OBJS:.o=.dep)
-endif
-endif
-
-
ALTS_CREDENTIALS_FUZZER_ONE_ENTRY_SRC = \
test/core/security/alts_credentials_fuzzer.cc \
test/core/util/one_corpus_entry_fuzzer.cc \
diff --git a/include/grpcpp/impl/codegen/call.h b/include/grpcpp/impl/codegen/call.h
index 28cc4a9..e324f6b 100644
--- a/include/grpcpp/impl/codegen/call.h
+++ b/include/grpcpp/impl/codegen/call.h
@@ -573,10 +573,13 @@
binary_error_details =
grpc::string(iter->second.begin(), iter->second.length());
}
- *recv_status_ = Status(static_cast<StatusCode>(status_code_),
- grpc::string(GRPC_SLICE_START_PTR(error_message_),
- GRPC_SLICE_END_PTR(error_message_)),
- binary_error_details);
+ *recv_status_ =
+ Status(static_cast<StatusCode>(status_code_),
+ GRPC_SLICE_IS_EMPTY(error_message_)
+ ? grpc::string()
+ : grpc::string(GRPC_SLICE_START_PTR(error_message_),
+ GRPC_SLICE_END_PTR(error_message_)),
+ binary_error_details);
client_context_->set_debug_error_string(
debug_error_string_ != nullptr ? debug_error_string_ : "");
g_core_codegen_interface->grpc_slice_unref(error_message_);
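
The call.h hunk above adds a guard so the Status error message is only constructed from the slice's start/end pointers when the slice is non-empty. A minimal standalone sketch of the same defensive pattern, using a hypothetical byte-range type in place of grpc_slice and its GRPC_SLICE_* macros:

```cpp
#include <cstdint>
#include <string>

// Hypothetical stand-in for grpc_slice in this sketch.
struct ByteRange {
  const uint8_t* begin = nullptr;
  const uint8_t* end = nullptr;
  bool empty() const { return begin == end; }
};

// Build the error-message string only when the range is non-empty, mirroring
// the GRPC_SLICE_IS_EMPTY guard above: an empty slice need not carry
// meaningful start/end pointers, so return an empty string without reading them.
std::string ErrorMessageFromRange(const ByteRange& range) {
  return range.empty()
             ? std::string()
             : std::string(reinterpret_cast<const char*>(range.begin),
                           reinterpret_cast<const char*>(range.end));
}

int main() {
  ByteRange empty_range;
  const uint8_t msg[] = {'o', 'o', 'p', 's'};
  ByteRange full_range;
  full_range.begin = msg;
  full_range.end = msg + sizeof(msg);
  return ErrorMessageFromRange(empty_range).empty() &&
                 ErrorMessageFromRange(full_range) == "oops"
             ? 0
             : 1;
}
```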
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index ea6775a..98391a1 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -817,6 +817,7 @@
// For intercepting recv_trailing_metadata.
grpc_metadata_batch recv_trailing_metadata;
grpc_transport_stream_stats collect_stats;
+ grpc_closure recv_trailing_metadata_ready;
// For intercepting on_complete.
grpc_closure on_complete;
} subchannel_batch_data;
@@ -1192,35 +1193,24 @@
"chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
elem->channel_data, calld, num_batches, grpc_error_string(error));
}
- grpc_transport_stream_op_batch*
- batches[GPR_ARRAY_SIZE(calld->pending_batches)];
- size_t num_batches = 0;
+ grpc_core::CallCombinerClosureList closures;
for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
pending_batch* pending = &calld->pending_batches[i];
grpc_transport_stream_op_batch* batch = pending->batch;
if (batch != nullptr) {
- batches[num_batches++] = batch;
+ batch->handler_private.extra_arg = calld;
+ GRPC_CLOSURE_INIT(&batch->handler_private.closure,
+ fail_pending_batch_in_call_combiner, batch,
+ grpc_schedule_on_exec_ctx);
+ closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
+ "pending_batches_fail");
pending_batch_clear(calld, pending);
}
}
- for (size_t i = yield_call_combiner ? 1 : 0; i < num_batches; ++i) {
- grpc_transport_stream_op_batch* batch = batches[i];
- batch->handler_private.extra_arg = calld;
- GRPC_CLOSURE_INIT(&batch->handler_private.closure,
- fail_pending_batch_in_call_combiner, batch,
- grpc_schedule_on_exec_ctx);
- GRPC_CALL_COMBINER_START(calld->call_combiner,
- &batch->handler_private.closure,
- GRPC_ERROR_REF(error), "pending_batches_fail");
- }
if (yield_call_combiner) {
- if (num_batches > 0) {
- // Note: This will release the call combiner.
- grpc_transport_stream_op_batch_finish_with_failure(
- batches[0], GRPC_ERROR_REF(error), calld->call_combiner);
- } else {
- GRPC_CALL_COMBINER_STOP(calld->call_combiner, "pending_batches_fail");
- }
+ closures.RunClosures(calld->call_combiner);
+ } else {
+ closures.RunClosuresWithoutYielding(calld->call_combiner);
}
GRPC_ERROR_UNREF(error);
}
@@ -1255,30 +1245,22 @@
" pending batches on subchannel_call=%p",
chand, calld, num_batches, calld->subchannel_call);
}
- grpc_transport_stream_op_batch*
- batches[GPR_ARRAY_SIZE(calld->pending_batches)];
- size_t num_batches = 0;
+ grpc_core::CallCombinerClosureList closures;
for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
pending_batch* pending = &calld->pending_batches[i];
grpc_transport_stream_op_batch* batch = pending->batch;
if (batch != nullptr) {
- batches[num_batches++] = batch;
+ batch->handler_private.extra_arg = calld->subchannel_call;
+ GRPC_CLOSURE_INIT(&batch->handler_private.closure,
+ resume_pending_batch_in_call_combiner, batch,
+ grpc_schedule_on_exec_ctx);
+ closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
+ "pending_batches_resume");
pending_batch_clear(calld, pending);
}
}
- for (size_t i = 1; i < num_batches; ++i) {
- grpc_transport_stream_op_batch* batch = batches[i];
- batch->handler_private.extra_arg = calld->subchannel_call;
- GRPC_CLOSURE_INIT(&batch->handler_private.closure,
- resume_pending_batch_in_call_combiner, batch,
- grpc_schedule_on_exec_ctx);
- GRPC_CALL_COMBINER_START(calld->call_combiner,
- &batch->handler_private.closure, GRPC_ERROR_NONE,
- "pending_batches_resume");
- }
- GPR_ASSERT(num_batches > 0);
// Note: This will release the call combiner.
- grpc_subchannel_call_process_op(calld->subchannel_call, batches[0]);
+ closures.RunClosures(calld->call_combiner);
}
static void maybe_clear_pending_batch(grpc_call_element* elem,
@@ -1293,7 +1275,10 @@
batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
nullptr) &&
(!batch->recv_message ||
- batch->payload->recv_message.recv_message_ready == nullptr)) {
+ batch->payload->recv_message.recv_message_ready == nullptr) &&
+ (!batch->recv_trailing_metadata ||
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
+ nullptr)) {
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand,
calld);
@@ -1302,75 +1287,27 @@
}
}
-// Returns true if all ops in the pending batch have been completed.
-static bool pending_batch_is_completed(
- pending_batch* pending, call_data* calld,
- subchannel_call_retry_state* retry_state) {
- if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
- return false;
+// Returns a pointer to the first pending batch for which predicate(batch)
+// returns true, or null if not found.
+template <typename Predicate>
+static pending_batch* pending_batch_find(grpc_call_element* elem,
+ const char* log_message,
+ Predicate predicate) {
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
+ pending_batch* pending = &calld->pending_batches[i];
+ grpc_transport_stream_op_batch* batch = pending->batch;
+ if (batch != nullptr && predicate(batch)) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand,
+ calld, log_message, i);
+ }
+ return pending;
+ }
}
- if (pending->batch->send_initial_metadata &&
- !retry_state->completed_send_initial_metadata) {
- return false;
- }
- if (pending->batch->send_message &&
- retry_state->completed_send_message_count <
- calld->send_messages->size()) {
- return false;
- }
- if (pending->batch->send_trailing_metadata &&
- !retry_state->completed_send_trailing_metadata) {
- return false;
- }
- if (pending->batch->recv_initial_metadata &&
- !retry_state->completed_recv_initial_metadata) {
- return false;
- }
- if (pending->batch->recv_message &&
- retry_state->completed_recv_message_count <
- retry_state->started_recv_message_count) {
- return false;
- }
- if (pending->batch->recv_trailing_metadata &&
- !retry_state->completed_recv_trailing_metadata) {
- return false;
- }
- return true;
-}
-
-// Returns true if any op in the batch was not yet started.
-static bool pending_batch_is_unstarted(
- pending_batch* pending, call_data* calld,
- subchannel_call_retry_state* retry_state) {
- if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
- return false;
- }
- if (pending->batch->send_initial_metadata &&
- !retry_state->started_send_initial_metadata) {
- return true;
- }
- if (pending->batch->send_message &&
- retry_state->started_send_message_count < calld->send_messages->size()) {
- return true;
- }
- if (pending->batch->send_trailing_metadata &&
- !retry_state->started_send_trailing_metadata) {
- return true;
- }
- if (pending->batch->recv_initial_metadata &&
- !retry_state->started_recv_initial_metadata) {
- return true;
- }
- if (pending->batch->recv_message &&
- retry_state->completed_recv_message_count ==
- retry_state->started_recv_message_count) {
- return true;
- }
- if (pending->batch->recv_trailing_metadata &&
- !retry_state->started_recv_trailing_metadata) {
- return true;
- }
- return false;
+ return nullptr;
}
//
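
The hunk above replaces the hand-rolled per-callback search loops with a templated pending_batch_find() helper that takes a predicate. A minimal sketch of the same predicate-based lookup over a fixed-size array, with simplified stand-in types and without the channel/call tracing:

```cpp
#include <cstddef>
#include <cstdio>

// Simplified stand-ins for the pending-batch bookkeeping in this sketch.
struct Batch {
  bool recv_initial_metadata = false;
  bool recv_message = false;
};
struct Pending {
  Batch* batch = nullptr;
};

// Returns the first pending slot whose batch satisfies `predicate`, or
// nullptr if none does -- the same shape as pending_batch_find() above.
template <typename Predicate>
Pending* FindPending(Pending* pendings, size_t count, Predicate predicate) {
  for (size_t i = 0; i < count; ++i) {
    if (pendings[i].batch != nullptr && predicate(pendings[i].batch)) {
      return &pendings[i];
    }
  }
  return nullptr;
}

int main() {
  Batch b;
  b.recv_message = true;
  Pending pendings[3];
  pendings[1].batch = &b;
  Pending* found = FindPending(
      pendings, 3, [](Batch* batch) { return batch->recv_message; });
  std::printf("found pending recv_message batch: %s\n",
              found != nullptr ? "yes" : "no");
  return 0;
}
```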
@@ -1557,8 +1494,13 @@
// subchannel_batch_data
//
+// Creates a subchannel_batch_data object on the call's arena with the
+// specified refcount. If set_on_complete is true, the batch's
+// on_complete callback will be set to point to on_complete();
+// otherwise, the batch's on_complete callback will be null.
static subchannel_batch_data* batch_data_create(grpc_call_element* elem,
- int refcount) {
+ int refcount,
+ bool set_on_complete) {
call_data* calld = static_cast<call_data*>(elem->call_data);
subchannel_call_retry_state* retry_state =
static_cast<subchannel_call_retry_state*>(
@@ -1571,9 +1513,11 @@
GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call, "batch_data_create");
batch_data->batch.payload = &retry_state->batch_payload;
gpr_ref_init(&batch_data->refs, refcount);
- GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data,
- grpc_schedule_on_exec_ctx);
- batch_data->batch.on_complete = &batch_data->on_complete;
+ if (set_on_complete) {
+ GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data,
+ grpc_schedule_on_exec_ctx);
+ batch_data->batch.on_complete = &batch_data->on_complete;
+ }
GRPC_CALL_STACK_REF(calld->owning_call, "batch_data");
return batch_data;
}
@@ -1606,26 +1550,14 @@
static void invoke_recv_initial_metadata_callback(void* arg,
grpc_error* error) {
subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
- channel_data* chand =
- static_cast<channel_data*>(batch_data->elem->channel_data);
- call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
// Find pending batch.
- pending_batch* pending = nullptr;
- for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
- grpc_transport_stream_op_batch* batch = calld->pending_batches[i].batch;
- if (batch != nullptr && batch->recv_initial_metadata &&
- batch->payload->recv_initial_metadata.recv_initial_metadata_ready !=
- nullptr) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: invoking recv_initial_metadata_ready for "
- "pending batch at index %" PRIuPTR,
- chand, calld, i);
- }
- pending = &calld->pending_batches[i];
- break;
- }
- }
+ pending_batch* pending = pending_batch_find(
+ batch_data->elem, "invoking recv_initial_metadata_ready for",
+ [](grpc_transport_stream_op_batch* batch) {
+ return batch->recv_initial_metadata &&
+ batch->payload->recv_initial_metadata
+ .recv_initial_metadata_ready != nullptr;
+ });
GPR_ASSERT(pending != nullptr);
// Return metadata.
grpc_metadata_batch_move(
@@ -1661,10 +1593,19 @@
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
batch_data->subchannel_call));
+ retry_state->completed_recv_initial_metadata = true;
+ // If a retry was already dispatched, then we're not going to use the
+ // result of this recv_initial_metadata op, so do nothing.
+ if (retry_state->retry_dispatched) {
+ GRPC_CALL_COMBINER_STOP(
+ calld->call_combiner,
+ "recv_initial_metadata_ready after retry dispatched");
+ return;
+ }
// If we got an error or a Trailers-Only response and have not yet gotten
- // the recv_trailing_metadata on_complete callback, then defer
- // propagating this callback back to the surface. We can evaluate whether
- // to retry when recv_trailing_metadata comes back.
+ // the recv_trailing_metadata_ready callback, then defer propagating this
+ // callback back to the surface. We can evaluate whether to retry when
+ // recv_trailing_metadata comes back.
if (GPR_UNLIKELY((batch_data->trailing_metadata_available ||
error != GRPC_ERROR_NONE) &&
!retry_state->completed_recv_trailing_metadata)) {
@@ -1689,9 +1630,9 @@
}
// Received valid initial metadata, so commit the call.
retry_commit(elem, retry_state);
+ // Invoke the callback to return the result to the surface.
// Manually invoking a callback function; it does not take ownership of error.
invoke_recv_initial_metadata_callback(batch_data, error);
- GRPC_ERROR_UNREF(error);
}
//
@@ -1701,25 +1642,13 @@
// Invokes recv_message_ready for a subchannel batch.
static void invoke_recv_message_callback(void* arg, grpc_error* error) {
subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
- channel_data* chand =
- static_cast<channel_data*>(batch_data->elem->channel_data);
- call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
// Find pending op.
- pending_batch* pending = nullptr;
- for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
- grpc_transport_stream_op_batch* batch = calld->pending_batches[i].batch;
- if (batch != nullptr && batch->recv_message &&
- batch->payload->recv_message.recv_message_ready != nullptr) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: invoking recv_message_ready for "
- "pending batch at index %" PRIuPTR,
- chand, calld, i);
- }
- pending = &calld->pending_batches[i];
- break;
- }
- }
+ pending_batch* pending = pending_batch_find(
+ batch_data->elem, "invoking recv_message_ready for",
+ [](grpc_transport_stream_op_batch* batch) {
+ return batch->recv_message &&
+ batch->payload->recv_message.recv_message_ready != nullptr;
+ });
GPR_ASSERT(pending != nullptr);
// Return payload.
*pending->batch->payload->recv_message.recv_message =
@@ -1751,10 +1680,18 @@
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
batch_data->subchannel_call));
+ ++retry_state->completed_recv_message_count;
+ // If a retry was already dispatched, then we're not going to use the
+ // result of this recv_message op, so do nothing.
+ if (retry_state->retry_dispatched) {
+ GRPC_CALL_COMBINER_STOP(calld->call_combiner,
+ "recv_message_ready after retry dispatched");
+ return;
+ }
// If we got an error or the payload was nullptr and we have not yet gotten
- // the recv_trailing_metadata on_complete callback, then defer
- // propagating this callback back to the surface. We can evaluate whether
- // to retry when recv_trailing_metadata comes back.
+ // the recv_trailing_metadata_ready callback, then defer propagating this
+ // callback back to the surface. We can evaluate whether to retry when
+ // recv_trailing_metadata comes back.
if (GPR_UNLIKELY(
(batch_data->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
!retry_state->completed_recv_trailing_metadata)) {
@@ -1777,133 +1714,241 @@
}
// Received a valid message, so commit the call.
retry_commit(elem, retry_state);
+ // Invoke the callback to return the result to the surface.
// Manually invoking a callback function; it does not take ownership of error.
invoke_recv_message_callback(batch_data, error);
- GRPC_ERROR_UNREF(error);
}
//
-// list of closures to execute in call combiner
+// recv_trailing_metadata handling
//
-// Represents a closure that needs to run in the call combiner as part of
-// starting or completing a batch.
-typedef struct {
- grpc_closure* closure;
- grpc_error* error;
- const char* reason;
- bool free_reason = false;
-} closure_to_execute;
+// Adds recv_trailing_metadata_ready closure to closures.
+static void add_closure_for_recv_trailing_metadata_ready(
+ grpc_call_element* elem, subchannel_batch_data* batch_data,
+ grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
+ // Find pending batch.
+ pending_batch* pending = pending_batch_find(
+ elem, "invoking recv_trailing_metadata for",
+ [](grpc_transport_stream_op_batch* batch) {
+ return batch->recv_trailing_metadata &&
+ batch->payload->recv_trailing_metadata
+ .recv_trailing_metadata_ready != nullptr;
+ });
+ // If we generated the recv_trailing_metadata op internally via
+ // start_internal_recv_trailing_metadata(), then there will be no
+ // pending batch.
+ if (pending == nullptr) {
+ GRPC_ERROR_UNREF(error);
+ return;
+ }
+ // Return metadata.
+ grpc_metadata_batch_move(
+ &batch_data->recv_trailing_metadata,
+ pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
+ // Add closure.
+ closures->Add(pending->batch->payload->recv_trailing_metadata
+ .recv_trailing_metadata_ready,
+ error, "recv_trailing_metadata_ready for pending batch");
+ // Update bookkeeping.
+ pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+ nullptr;
+ maybe_clear_pending_batch(elem, pending);
+}
-static void execute_closures_in_call_combiner(grpc_call_element* elem,
- const char* caller,
- closure_to_execute* closures,
- size_t num_closures) {
+// Adds any necessary closures for deferred recv_initial_metadata and
+// recv_message callbacks to closures.
+static void add_closures_for_deferred_recv_callbacks(
+ subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state,
+ grpc_core::CallCombinerClosureList* closures) {
+ if (batch_data->batch.recv_trailing_metadata) {
+ // Add closure for deferred recv_initial_metadata_ready.
+ if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
+ nullptr)) {
+ GRPC_CLOSURE_INIT(&batch_data->recv_initial_metadata_ready,
+ invoke_recv_initial_metadata_callback,
+ retry_state->recv_initial_metadata_ready_deferred_batch,
+ grpc_schedule_on_exec_ctx);
+ closures->Add(&batch_data->recv_initial_metadata_ready,
+ retry_state->recv_initial_metadata_error,
+ "resuming recv_initial_metadata_ready");
+ retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
+ }
+ // Add closure for deferred recv_message_ready.
+ if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
+ nullptr)) {
+ GRPC_CLOSURE_INIT(&batch_data->recv_message_ready,
+ invoke_recv_message_callback,
+ retry_state->recv_message_ready_deferred_batch,
+ grpc_schedule_on_exec_ctx);
+ closures->Add(&batch_data->recv_message_ready,
+ retry_state->recv_message_error,
+ "resuming recv_message_ready");
+ retry_state->recv_message_ready_deferred_batch = nullptr;
+ }
+ }
+}
+
+// Returns true if any op in the batch was not yet started.
+// Only looks at send ops, since recv ops are always started immediately.
+static bool pending_batch_is_unstarted(
+ pending_batch* pending, call_data* calld,
+ subchannel_call_retry_state* retry_state) {
+ if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
+ return false;
+ }
+ if (pending->batch->send_initial_metadata &&
+ !retry_state->started_send_initial_metadata) {
+ return true;
+ }
+ if (pending->batch->send_message &&
+ retry_state->started_send_message_count < calld->send_messages->size()) {
+ return true;
+ }
+ if (pending->batch->send_trailing_metadata &&
+ !retry_state->started_send_trailing_metadata) {
+ return true;
+ }
+ return false;
+}
+
+// For any pending batch containing an op that has not yet been started,
+// adds the pending batch's completion closures to closures.
+static void add_closures_to_fail_unstarted_pending_batches(
+ grpc_call_element* elem, subchannel_call_retry_state* retry_state,
+ grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
- // Note that the call combiner will be yielded for each closure that
- // we schedule. We're already running in the call combiner, so one of
- // the closures can be scheduled directly, but the others will
- // have to re-enter the call combiner.
- if (num_closures > 0) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: %s starting closure: %s", chand,
- calld, caller, closures[0].reason);
- }
- GRPC_CLOSURE_SCHED(closures[0].closure, closures[0].error);
- if (closures[0].free_reason) {
- gpr_free(const_cast<char*>(closures[0].reason));
- }
- for (size_t i = 1; i < num_closures; ++i) {
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
+ pending_batch* pending = &calld->pending_batches[i];
+ if (pending_batch_is_unstarted(pending, calld, retry_state)) {
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO,
- "chand=%p calld=%p: %s starting closure in call combiner: %s",
- chand, calld, caller, closures[i].reason);
+ "chand=%p calld=%p: failing unstarted pending batch at index "
+ "%" PRIuPTR,
+ chand, calld, i);
}
- GRPC_CALL_COMBINER_START(calld->call_combiner, closures[i].closure,
- closures[i].error, closures[i].reason);
- if (closures[i].free_reason) {
- gpr_free(const_cast<char*>(closures[i].reason));
- }
+ closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
+ "failing on_complete for pending batch");
+ pending->batch->on_complete = nullptr;
+ maybe_clear_pending_batch(elem, pending);
}
- } else {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: no closures to run for %s", chand,
- calld, caller);
- }
- GRPC_CALL_COMBINER_STOP(calld->call_combiner, "no closures to run");
}
+ GRPC_ERROR_UNREF(error);
+}
+
+// Intercepts recv_trailing_metadata_ready callback for retries.
+// Commits the call and returns the trailing metadata up the stack.
+static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
+ subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
+ grpc_call_element* elem = batch_data->elem;
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s",
+ chand, calld, grpc_error_string(error));
+ }
+ subchannel_call_retry_state* retry_state =
+ static_cast<subchannel_call_retry_state*>(
+ grpc_connected_subchannel_call_get_parent_data(
+ batch_data->subchannel_call));
+ retry_state->completed_recv_trailing_metadata = true;
+ // Get the call's status and check for server pushback metadata.
+ grpc_status_code status = GRPC_STATUS_OK;
+ grpc_mdelem* server_pushback_md = nullptr;
+ if (error != GRPC_ERROR_NONE) {
+ grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr,
+ nullptr);
+ } else {
+ grpc_metadata_batch* md_batch =
+ batch_data->batch.payload->recv_trailing_metadata
+ .recv_trailing_metadata;
+ GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
+ status =
+ grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
+ if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
+ server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
+ }
+ }
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
+ calld, grpc_status_code_to_string(status));
+ }
+ // Check if we should retry.
+ if (maybe_retry(elem, batch_data, status, server_pushback_md)) {
+ // Unref batch_data for deferred recv_initial_metadata_ready or
+ // recv_message_ready callbacks, if any.
+ if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
+ batch_data_unref(batch_data);
+ GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
+ }
+ if (retry_state->recv_message_ready_deferred_batch != nullptr) {
+ batch_data_unref(batch_data);
+ GRPC_ERROR_UNREF(retry_state->recv_message_error);
+ }
+ batch_data_unref(batch_data);
+ return;
+ }
+ // Not retrying, so commit the call.
+ retry_commit(elem, retry_state);
+ // Construct list of closures to execute.
+ grpc_core::CallCombinerClosureList closures;
+ // First, add closure for recv_trailing_metadata_ready.
+ add_closure_for_recv_trailing_metadata_ready(
+ elem, batch_data, GRPC_ERROR_REF(error), &closures);
+ // If there are deferred recv_initial_metadata_ready or recv_message_ready
+ // callbacks, add them to closures.
+ add_closures_for_deferred_recv_callbacks(batch_data, retry_state, &closures);
+ // Add closures to fail any pending batches that have not yet been started.
+ add_closures_to_fail_unstarted_pending_batches(
+ elem, retry_state, GRPC_ERROR_REF(error), &closures);
+ // Don't need batch_data anymore.
+ batch_data_unref(batch_data);
+ // Schedule all of the closures identified above.
+ // Note: This will release the call combiner.
+ closures.RunClosures(calld->call_combiner);
}
//
// on_complete callback handling
//
-// Updates retry_state to reflect the ops completed in batch_data.
-static void update_retry_state_for_completed_batch(
- subchannel_batch_data* batch_data,
- subchannel_call_retry_state* retry_state) {
- if (batch_data->batch.send_initial_metadata) {
- retry_state->completed_send_initial_metadata = true;
+// For any pending batch completed in batch_data, adds the necessary
+// completion closures to closures.
+static void add_closure_for_completed_pending_batch(
+ grpc_call_element* elem, subchannel_batch_data* batch_data,
+ subchannel_call_retry_state* retry_state, grpc_error* error,
+ grpc_core::CallCombinerClosureList* closures) {
+ pending_batch* pending = pending_batch_find(
+ elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
+ return batch->on_complete != nullptr &&
+ batch_data->batch.send_initial_metadata ==
+ batch->send_initial_metadata &&
+ batch_data->batch.send_message == batch->send_message &&
+ batch_data->batch.send_trailing_metadata ==
+ batch->send_trailing_metadata;
+ });
+ // If batch_data is a replay batch, then there will be no pending
+ // batch to complete.
+ if (pending == nullptr) {
+ GRPC_ERROR_UNREF(error);
+ return;
}
- if (batch_data->batch.send_message) {
- ++retry_state->completed_send_message_count;
- }
- if (batch_data->batch.send_trailing_metadata) {
- retry_state->completed_send_trailing_metadata = true;
- }
- if (batch_data->batch.recv_initial_metadata) {
- retry_state->completed_recv_initial_metadata = true;
- }
- if (batch_data->batch.recv_message) {
- ++retry_state->completed_recv_message_count;
- }
- if (batch_data->batch.recv_trailing_metadata) {
- retry_state->completed_recv_trailing_metadata = true;
- }
-}
-
-// Adds any necessary closures for deferred recv_initial_metadata and
-// recv_message callbacks to closures, updating *num_closures as needed.
-static void add_closures_for_deferred_recv_callbacks(
- subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state,
- closure_to_execute* closures, size_t* num_closures) {
- if (batch_data->batch.recv_trailing_metadata) {
- // Add closure for deferred recv_initial_metadata_ready.
- if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
- nullptr)) {
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure = GRPC_CLOSURE_INIT(
- &batch_data->recv_initial_metadata_ready,
- invoke_recv_initial_metadata_callback,
- retry_state->recv_initial_metadata_ready_deferred_batch,
- grpc_schedule_on_exec_ctx);
- closure->error = retry_state->recv_initial_metadata_error;
- closure->reason = "resuming recv_initial_metadata_ready";
- retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
- }
- // Add closure for deferred recv_message_ready.
- if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
- nullptr)) {
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure = GRPC_CLOSURE_INIT(
- &batch_data->recv_message_ready, invoke_recv_message_callback,
- retry_state->recv_message_ready_deferred_batch,
- grpc_schedule_on_exec_ctx);
- closure->error = retry_state->recv_message_error;
- closure->reason = "resuming recv_message_ready";
- retry_state->recv_message_ready_deferred_batch = nullptr;
- }
- }
+ // Add closure.
+ closures->Add(pending->batch->on_complete, error,
+ "on_complete for pending batch");
+ pending->batch->on_complete = nullptr;
+ maybe_clear_pending_batch(elem, pending);
}
// If there are any cached ops to replay or pending ops to start on the
// subchannel call, adds a closure to closures to invoke
-// start_retriable_subchannel_batches(), updating *num_closures as needed.
+// start_retriable_subchannel_batches().
static void add_closures_for_replay_or_pending_send_ops(
grpc_call_element* elem, subchannel_batch_data* batch_data,
- subchannel_call_retry_state* retry_state, closure_to_execute* closures,
- size_t* num_closures) {
+ subchannel_call_retry_state* retry_state,
+ grpc_core::CallCombinerClosureList* closures) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
bool have_pending_send_message_ops =
@@ -1929,95 +1974,14 @@
"chand=%p calld=%p: starting next batch for pending send op(s)",
chand, calld);
}
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure = GRPC_CLOSURE_INIT(
- &batch_data->batch.handler_private.closure,
- start_retriable_subchannel_batches, elem, grpc_schedule_on_exec_ctx);
- closure->error = GRPC_ERROR_NONE;
- closure->reason = "starting next batch for send_* op(s)";
+ GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure,
+ start_retriable_subchannel_batches, elem,
+ grpc_schedule_on_exec_ctx);
+ closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE,
+ "starting next batch for send_* op(s)");
}
}
-// For any pending batch completed in batch_data, adds the necessary
-// completion closures to closures, updating *num_closures as needed.
-static void add_closures_for_completed_pending_batches(
- grpc_call_element* elem, subchannel_batch_data* batch_data,
- subchannel_call_retry_state* retry_state, grpc_error* error,
- closure_to_execute* closures, size_t* num_closures) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
- pending_batch* pending = &calld->pending_batches[i];
- if (pending_batch_is_completed(pending, calld, retry_state)) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: pending batch completed at index %" PRIuPTR,
- chand, calld, i);
- }
- // Copy the trailing metadata to return it to the surface.
- if (batch_data->batch.recv_trailing_metadata) {
- grpc_metadata_batch_move(&batch_data->recv_trailing_metadata,
- pending->batch->payload->recv_trailing_metadata
- .recv_trailing_metadata);
- }
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure = pending->batch->on_complete;
- closure->error = GRPC_ERROR_REF(error);
- closure->reason = "on_complete for pending batch";
- pending->batch->on_complete = nullptr;
- maybe_clear_pending_batch(elem, pending);
- }
- }
- GRPC_ERROR_UNREF(error);
-}
-
-// For any pending batch containing an op that has not yet been started,
-// adds the pending batch's completion closures to closures, updating
-// *num_closures as needed.
-static void add_closures_to_fail_unstarted_pending_batches(
- grpc_call_element* elem, subchannel_call_retry_state* retry_state,
- grpc_error* error, closure_to_execute* closures, size_t* num_closures) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
- pending_batch* pending = &calld->pending_batches[i];
- if (pending_batch_is_unstarted(pending, calld, retry_state)) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: failing unstarted pending batch at index "
- "%" PRIuPTR,
- chand, calld, i);
- }
- if (pending->batch->recv_initial_metadata) {
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure = pending->batch->payload->recv_initial_metadata
- .recv_initial_metadata_ready;
- closure->error = GRPC_ERROR_REF(error);
- closure->reason =
- "failing recv_initial_metadata_ready for pending batch";
- pending->batch->payload->recv_initial_metadata
- .recv_initial_metadata_ready = nullptr;
- }
- if (pending->batch->recv_message) {
- *pending->batch->payload->recv_message.recv_message = nullptr;
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure =
- pending->batch->payload->recv_message.recv_message_ready;
- closure->error = GRPC_ERROR_REF(error);
- closure->reason = "failing recv_message_ready for pending batch";
- pending->batch->payload->recv_message.recv_message_ready = nullptr;
- }
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure = pending->batch->on_complete;
- closure->error = GRPC_ERROR_REF(error);
- closure->reason = "failing on_complete for pending batch";
- pending->batch->on_complete = nullptr;
- maybe_clear_pending_batch(elem, pending);
- }
- }
- GRPC_ERROR_UNREF(error);
-}
-
// Callback used to intercept on_complete from subchannel calls.
// Called only when retries are enabled.
static void on_complete(void* arg, grpc_error* error) {
@@ -2035,135 +1999,48 @@
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
batch_data->subchannel_call));
- // If we have previously completed recv_trailing_metadata, then the
- // call is finished.
- bool call_finished = retry_state->completed_recv_trailing_metadata;
- // Record whether we were already committed before receiving this callback.
- const bool previously_committed = calld->retry_committed;
// Update bookkeeping in retry_state.
- update_retry_state_for_completed_batch(batch_data, retry_state);
- if (call_finished) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: call already finished", chand,
- calld);
- }
- } else {
- // Check if this batch finished the call, and if so, get its status.
- // The call is finished if either (a) this callback was invoked with
- // an error or (b) we receive status.
- grpc_status_code status = GRPC_STATUS_OK;
- grpc_mdelem* server_pushback_md = nullptr;
- if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { // Case (a).
- call_finished = true;
- grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr,
- nullptr);
- } else if (batch_data->batch.recv_trailing_metadata) { // Case (b).
- call_finished = true;
- grpc_metadata_batch* md_batch =
- batch_data->batch.payload->recv_trailing_metadata
- .recv_trailing_metadata;
- GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
- status = grpc_get_status_code_from_metadata(
- md_batch->idx.named.grpc_status->md);
- if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
- server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
- }
- }
- // If the call just finished, check if we should retry.
- if (call_finished) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
- calld, grpc_status_code_to_string(status));
- }
- if (maybe_retry(elem, batch_data, status, server_pushback_md)) {
- // Unref batch_data for deferred recv_initial_metadata_ready or
- // recv_message_ready callbacks, if any.
- if (batch_data->batch.recv_trailing_metadata &&
- retry_state->recv_initial_metadata_ready_deferred_batch !=
- nullptr) {
- batch_data_unref(batch_data);
- GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
- }
- if (batch_data->batch.recv_trailing_metadata &&
- retry_state->recv_message_ready_deferred_batch != nullptr) {
- batch_data_unref(batch_data);
- GRPC_ERROR_UNREF(retry_state->recv_message_error);
- }
- // Track number of pending subchannel send batches and determine if
- // this was the last one.
- bool last_callback_complete = false;
- if (batch_data->batch.send_initial_metadata ||
- batch_data->batch.send_message ||
- batch_data->batch.send_trailing_metadata) {
- --calld->num_pending_retriable_subchannel_send_batches;
- last_callback_complete =
- calld->num_pending_retriable_subchannel_send_batches == 0;
- }
- batch_data_unref(batch_data);
- // If we just completed the last subchannel send batch, unref the
- // call stack.
- if (last_callback_complete) {
- GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
- }
- return;
- }
- // Not retrying, so commit the call.
- retry_commit(elem, retry_state);
- }
+ if (batch_data->batch.send_initial_metadata) {
+ retry_state->completed_send_initial_metadata = true;
}
- // If we were already committed before receiving this callback, free
- // cached data for send ops that we've just completed. (If the call has
- // just now finished, the call to retry_commit() above will have freed all
- // cached send ops, so we don't need to do it here.)
- if (previously_committed) {
+ if (batch_data->batch.send_message) {
+ ++retry_state->completed_send_message_count;
+ }
+ if (batch_data->batch.send_trailing_metadata) {
+ retry_state->completed_send_trailing_metadata = true;
+ }
+ // If the call is committed, free cached data for send ops that we've just
+ // completed.
+ if (calld->retry_committed) {
free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state);
}
- // Call not being retried.
// Construct list of closures to execute.
- // Max number of closures is number of pending batches plus one for
- // each of:
- // - recv_initial_metadata_ready (either deferred or unstarted)
- // - recv_message_ready (either deferred or unstarted)
- // - starting a new batch for pending send ops
- closure_to_execute closures[GPR_ARRAY_SIZE(calld->pending_batches) + 3];
- size_t num_closures = 0;
- // If there are deferred recv_initial_metadata_ready or recv_message_ready
- // callbacks, add them to closures.
- add_closures_for_deferred_recv_callbacks(batch_data, retry_state, closures,
- &num_closures);
- // Find pending batches whose ops are now complete and add their
- // on_complete callbacks to closures.
- add_closures_for_completed_pending_batches(elem, batch_data, retry_state,
- GRPC_ERROR_REF(error), closures,
- &num_closures);
- // Add closures to handle any pending batches that have not yet been started.
- // If the call is finished, we fail these batches; otherwise, we add a
- // callback to start_retriable_subchannel_batches() to start them on
- // the subchannel call.
- if (call_finished) {
- add_closures_to_fail_unstarted_pending_batches(
- elem, retry_state, GRPC_ERROR_REF(error), closures, &num_closures);
- } else {
- add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state,
- closures, &num_closures);
+ grpc_core::CallCombinerClosureList closures;
+ // If a retry was already dispatched, that means we saw
+ // recv_trailing_metadata before this, so we do nothing here.
+ // Otherwise, invoke the callback to return the result to the surface.
+ if (!retry_state->retry_dispatched) {
+ // Add closure for the completed pending batch, if any.
+ add_closure_for_completed_pending_batch(elem, batch_data, retry_state,
+ GRPC_ERROR_REF(error), &closures);
+ // If needed, add a callback to start_retriable_subchannel_batches() to
+ // start any replay or pending send ops on the subchannel call.
+ if (!retry_state->completed_recv_trailing_metadata) {
+ add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state,
+ &closures);
+ }
}
// Track number of pending subchannel send batches and determine if this
// was the last one.
- bool last_callback_complete = false;
- if (batch_data->batch.send_initial_metadata ||
- batch_data->batch.send_message ||
- batch_data->batch.send_trailing_metadata) {
- --calld->num_pending_retriable_subchannel_send_batches;
- last_callback_complete =
- calld->num_pending_retriable_subchannel_send_batches == 0;
- }
+ --calld->num_pending_retriable_subchannel_send_batches;
+ const bool last_callback_complete =
+ calld->num_pending_retriable_subchannel_send_batches == 0;
// Don't need batch_data anymore.
batch_data_unref(batch_data);
// Schedule all of the closures identified above.
  // Note: This yields the call combiner.
- execute_closures_in_call_combiner(elem, "on_complete", closures,
- num_closures);
- // If we just completed the last subchannel send batch, unref the call stack.
+ closures.RunClosures(calld->call_combiner);
+ // If this was the last subchannel send batch, unref the call stack.
if (last_callback_complete) {
GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
}
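
The rewritten on_complete() above keeps a count of pending retriable subchannel send batches and unrefs the owning call stack only when the last one completes. A toy model of that "last completion drops the group ref" bookkeeping; the counter and Ref/Unref names below are illustrative stand-ins, not the gRPC call-stack API:

```cpp
#include <cstdio>

struct CallData {
  int refs = 0;
  int pending_send_batches = 0;
};

void CallStackRef(CallData* calld) { ++calld->refs; }
void CallStackUnref(CallData* calld) {
  if (--calld->refs == 0) std::printf("call stack destroyed\n");
}

void StartSendBatch(CallData* calld) {
  // The first pending send batch takes one ref on behalf of the whole group.
  if (calld->pending_send_batches == 0) CallStackRef(calld);
  ++calld->pending_send_batches;
}

void OnSendBatchComplete(CallData* calld) {
  --calld->pending_send_batches;
  const bool last = calld->pending_send_batches == 0;
  // ... run completion closures, release per-batch resources ...
  if (last) CallStackUnref(calld);  // drop the group ref after the last one
}

int main() {
  CallData calld;
  StartSendBatch(&calld);
  StartSendBatch(&calld);
  OnSendBatchComplete(&calld);
  OnSendBatchComplete(&calld);  // prints "call stack destroyed"
  return 0;
}
```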
@@ -2185,27 +2062,22 @@
// Adds a closure to closures that will execute batch in the call combiner.
static void add_closure_for_subchannel_batch(
- call_data* calld, grpc_transport_stream_op_batch* batch,
- closure_to_execute* closures, size_t* num_closures) {
+ grpc_call_element* elem, grpc_transport_stream_op_batch* batch,
+ grpc_core::CallCombinerClosureList* closures) {
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
batch->handler_private.extra_arg = calld->subchannel_call;
GRPC_CLOSURE_INIT(&batch->handler_private.closure,
start_batch_in_call_combiner, batch,
grpc_schedule_on_exec_ctx);
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure = &batch->handler_private.closure;
- closure->error = GRPC_ERROR_NONE;
- // If the tracer is enabled, we log a more detailed message, which
- // requires dynamic allocation. This will be freed in
- // start_retriable_subchannel_batches().
if (grpc_client_channel_trace.enabled()) {
char* batch_str = grpc_transport_stream_op_batch_string(batch);
- gpr_asprintf(const_cast<char**>(&closure->reason),
- "starting batch in call combiner: %s", batch_str);
+ gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand,
+ calld, batch_str);
gpr_free(batch_str);
- closure->free_reason = true;
- } else {
- closure->reason = "start_subchannel_batch";
}
+ closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
+ "start_subchannel_batch");
}
// Adds retriable send_initial_metadata op to batch_data.
@@ -2341,9 +2213,13 @@
grpc_metadata_batch_init(&batch_data->recv_trailing_metadata);
batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
&batch_data->recv_trailing_metadata;
- batch_data->batch.collect_stats = true;
- batch_data->batch.payload->collect_stats.collect_stats =
+ batch_data->batch.payload->recv_trailing_metadata.collect_stats =
&batch_data->collect_stats;
+ GRPC_CLOSURE_INIT(&batch_data->recv_trailing_metadata_ready,
+ recv_trailing_metadata_ready, batch_data,
+ grpc_schedule_on_exec_ctx);
+ batch_data->batch.payload->recv_trailing_metadata
+ .recv_trailing_metadata_ready = &batch_data->recv_trailing_metadata_ready;
}
// Helper function used to start a recv_trailing_metadata batch. This
@@ -2364,9 +2240,11 @@
grpc_connected_subchannel_call_get_parent_data(
calld->subchannel_call));
// Create batch_data with 2 refs, since this batch will be unreffed twice:
- // once when the subchannel batch returns, and again when we actually get
- // a recv_trailing_metadata op from the surface.
- subchannel_batch_data* batch_data = batch_data_create(elem, 2);
+ // once for the recv_trailing_metadata_ready callback when the subchannel
+ // batch returns, and again when we actually get a recv_trailing_metadata
+ // op from the surface.
+ subchannel_batch_data* batch_data =
+ batch_data_create(elem, 2, false /* set_on_complete */);
add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
retry_state->recv_trailing_metadata_internal_batch = batch_data;
// Note: This will release the call combiner.
@@ -2391,7 +2269,7 @@
"send_initial_metadata op",
chand, calld);
}
- replay_batch_data = batch_data_create(elem, 1);
+ replay_batch_data = batch_data_create(elem, 1, true /* set_on_complete */);
add_retriable_send_initial_metadata_op(calld, retry_state,
replay_batch_data);
}
@@ -2408,7 +2286,8 @@
chand, calld);
}
if (replay_batch_data == nullptr) {
- replay_batch_data = batch_data_create(elem, 1);
+ replay_batch_data =
+ batch_data_create(elem, 1, true /* set_on_complete */);
}
add_retriable_send_message_op(elem, retry_state, replay_batch_data);
}
@@ -2427,7 +2306,8 @@
chand, calld);
}
if (replay_batch_data == nullptr) {
- replay_batch_data = batch_data_create(elem, 1);
+ replay_batch_data =
+ batch_data_create(elem, 1, true /* set_on_complete */);
}
add_retriable_send_trailing_metadata_op(calld, retry_state,
replay_batch_data);
@@ -2439,7 +2319,7 @@
// *num_batches as needed.
static void add_subchannel_batches_for_pending_batches(
grpc_call_element* elem, subchannel_call_retry_state* retry_state,
- closure_to_execute* closures, size_t* num_closures) {
+ grpc_core::CallCombinerClosureList* closures) {
call_data* calld = static_cast<call_data*>(elem->call_data);
for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
pending_batch* pending = &calld->pending_batches[i];
@@ -2495,13 +2375,11 @@
if (retry_state->completed_recv_trailing_metadata) {
subchannel_batch_data* batch_data =
retry_state->recv_trailing_metadata_internal_batch;
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure = &batch_data->on_complete;
// Batches containing recv_trailing_metadata always succeed.
- closure->error = GRPC_ERROR_NONE;
- closure->reason =
- "re-executing on_complete for recv_trailing_metadata "
- "to propagate internally triggered result";
+ closures->Add(
+ &batch_data->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
+ "re-executing recv_trailing_metadata_ready to propagate "
+ "internally triggered result");
} else {
batch_data_unref(retry_state->recv_trailing_metadata_internal_batch);
}
@@ -2513,14 +2391,19 @@
if (calld->method_params == nullptr ||
calld->method_params->retry_policy() == nullptr ||
calld->retry_committed) {
- add_closure_for_subchannel_batch(calld, batch, closures, num_closures);
+ add_closure_for_subchannel_batch(elem, batch, closures);
pending_batch_clear(calld, pending);
continue;
}
// Create batch with the right number of callbacks.
- const int num_callbacks =
- 1 + batch->recv_initial_metadata + batch->recv_message;
- subchannel_batch_data* batch_data = batch_data_create(elem, num_callbacks);
+ const bool has_send_ops = batch->send_initial_metadata ||
+ batch->send_message ||
+ batch->send_trailing_metadata;
+ const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
+ batch->recv_message +
+ batch->recv_trailing_metadata;
+ subchannel_batch_data* batch_data = batch_data_create(
+ elem, num_callbacks, has_send_ops /* set_on_complete */);
// Cache send ops if needed.
maybe_cache_send_ops_for_batch(calld, pending);
// send_initial_metadata.
@@ -2547,11 +2430,9 @@
}
// recv_trailing_metadata.
if (batch->recv_trailing_metadata) {
- GPR_ASSERT(batch->collect_stats);
add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
}
- add_closure_for_subchannel_batch(calld, &batch_data->batch, closures,
- num_closures);
+ add_closure_for_subchannel_batch(elem, &batch_data->batch, closures);
// Track number of pending subchannel send batches.
// If this is the first one, take a ref to the call stack.
if (batch->send_initial_metadata || batch->send_message ||
@@ -2579,15 +2460,13 @@
grpc_connected_subchannel_call_get_parent_data(
calld->subchannel_call));
// Construct list of closures to execute, one for each pending batch.
- // We can start up to 6 batches.
- closure_to_execute closures[GPR_ARRAY_SIZE(calld->pending_batches)];
- size_t num_closures = 0;
+ grpc_core::CallCombinerClosureList closures;
// Replay previously-returned send_* ops if needed.
subchannel_batch_data* replay_batch_data =
maybe_create_subchannel_batch_for_replay(elem, retry_state);
if (replay_batch_data != nullptr) {
- add_closure_for_subchannel_batch(calld, &replay_batch_data->batch, closures,
- &num_closures);
+ add_closure_for_subchannel_batch(elem, &replay_batch_data->batch,
+ &closures);
// Track number of pending subchannel send batches.
// If this is the first one, take a ref to the call stack.
if (calld->num_pending_retriable_subchannel_send_batches == 0) {
@@ -2596,17 +2475,16 @@
++calld->num_pending_retriable_subchannel_send_batches;
}
// Now add pending batches.
- add_subchannel_batches_for_pending_batches(elem, retry_state, closures,
- &num_closures);
+ add_subchannel_batches_for_pending_batches(elem, retry_state, &closures);
// Start batches on subchannel call.
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: starting %" PRIuPTR
" retriable batches on subchannel_call=%p",
- chand, calld, num_closures, calld->subchannel_call);
+ chand, calld, closures.size(), calld->subchannel_call);
}
- execute_closures_in_call_combiner(elem, "start_retriable_subchannel_batches",
- closures, num_closures);
+ // Note: This will yield the call combiner.
+ closures.RunClosures(calld->call_combiner);
}
//
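
Across this file, the pending_batches_fail/resume and on_complete paths switch from an ad-hoc closures[] array plus execute_closures_in_call_combiner() to grpc_core::CallCombinerClosureList: callers accumulate (closure, error, reason) entries while holding the call combiner and release them in one place. As the removed comment notes, one closure can run directly since we already hold the combiner, while the rest must re-enter it. A toy analog of that pattern (not gRPC's actual class; the Run/scheduler split here is illustrative):

```cpp
#include <cstdio>
#include <functional>
#include <string>
#include <utility>
#include <vector>

class ClosureList {
 public:
  void Add(std::function<void()> fn, std::string reason) {
    items_.emplace_back(std::move(fn), std::move(reason));
  }
  size_t size() const { return items_.size(); }
  // Runs the first item inline (the caller already holds the combiner) and
  // hands the rest to a scheduler that re-acquires the combiner per item.
  void Run(const std::function<void(std::function<void()>)>& schedule) {
    for (size_t i = 0; i < items_.size(); ++i) {
      std::printf("running closure: %s\n", items_[i].second.c_str());
      if (i == 0) {
        items_[i].first();
      } else {
        schedule(items_[i].first);
      }
    }
    items_.clear();
  }

 private:
  std::vector<std::pair<std::function<void()>, std::string>> items_;
};

int main() {
  ClosureList closures;
  closures.Add([] { std::printf("  fail batch 0\n"); }, "pending_batches_fail");
  closures.Add([] { std::printf("  fail batch 1\n"); }, "pending_batches_fail");
  // In this sketch the "scheduler" simply runs each deferred item immediately.
  closures.Run([](std::function<void()> fn) { fn(); });
  return 0;
}
```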
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
index 07f2e2e..ebe2c4c 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
@@ -21,7 +21,6 @@
#if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER)
#include <ares.h>
-#include <string.h>
#include <sys/ioctl.h>
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h"
@@ -56,8 +55,8 @@
bool readable_registered;
/** if the writable closure has been registered */
bool writable_registered;
- /** if the fd has been shutdown yet from grpc iomgr perspective */
- bool already_shutdown;
+ /** if the fd is being shut down */
+ bool shutting_down;
} fd_node;
struct grpc_ares_ev_driver {
@@ -102,26 +101,25 @@
gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->fd));
GPR_ASSERT(!fdn->readable_registered);
GPR_ASSERT(!fdn->writable_registered);
- GPR_ASSERT(fdn->already_shutdown);
gpr_mu_destroy(&fdn->mu);
- /* TODO: we need to pass a non-null "release_fd" parameter to
- * grpc_fd_orphan because "epollsig" iomgr will close the fd
- * even if "already_closed" is true, and it only leaves it open
- * if "release_fd" is non-null. This is unlike the rest of the
- * pollers, should this be changed within epollsig? */
- int dummy_release_fd;
/* c-ares library has closed the fd inside grpc_fd. This fd may be picked up
immediately by another thread, and should not be closed by the following
grpc_fd_orphan. */
- grpc_fd_orphan(fdn->fd, nullptr, &dummy_release_fd, true /* already_closed */,
+ grpc_fd_orphan(fdn->fd, nullptr, nullptr, true /* already_closed */,
"c-ares query finished");
gpr_free(fdn);
}
-static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) {
- if (!fdn->already_shutdown) {
- fdn->already_shutdown = true;
- grpc_fd_shutdown(fdn->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING(reason));
+static void fd_node_shutdown(fd_node* fdn) {
+ gpr_mu_lock(&fdn->mu);
+ fdn->shutting_down = true;
+ if (!fdn->readable_registered && !fdn->writable_registered) {
+ gpr_mu_unlock(&fdn->mu);
+ fd_node_destroy(fdn);
+ } else {
+ grpc_fd_shutdown(
+ fdn->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING("c-ares fd shutdown"));
+ gpr_mu_unlock(&fdn->mu);
}
}
@@ -129,10 +127,7 @@
grpc_pollset_set* pollset_set) {
*ev_driver = static_cast<grpc_ares_ev_driver*>(
gpr_malloc(sizeof(grpc_ares_ev_driver)));
- ares_options opts;
- memset(&opts, 0, sizeof(opts));
- opts.flags |= ARES_FLAG_STAYOPEN;
- int status = ares_init_options(&(*ev_driver)->channel, &opts, ARES_OPT_FLAGS);
+ int status = ares_init(&(*ev_driver)->channel);
gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create");
if (status != ARES_SUCCESS) {
char* err_msg;
@@ -169,9 +164,8 @@
ev_driver->shutting_down = true;
fd_node* fn = ev_driver->fds;
while (fn != nullptr) {
- gpr_mu_lock(&fn->mu);
- fd_node_shutdown_locked(fn, "grpc_ares_ev_driver_shutdown");
- gpr_mu_unlock(&fn->mu);
+ grpc_fd_shutdown(fn->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "grpc_ares_ev_driver_shutdown"));
fn = fn->next;
}
gpr_mu_unlock(&ev_driver->mu);
@@ -208,7 +202,14 @@
gpr_mu_lock(&fdn->mu);
const int fd = grpc_fd_wrapped_fd(fdn->fd);
fdn->readable_registered = false;
+ if (fdn->shutting_down && !fdn->writable_registered) {
+ gpr_mu_unlock(&fdn->mu);
+ fd_node_destroy(fdn);
+ grpc_ares_ev_driver_unref(ev_driver);
+ return;
+ }
gpr_mu_unlock(&fdn->mu);
+
gpr_log(GPR_DEBUG, "readable on %d", fd);
if (error == GRPC_ERROR_NONE) {
do {
@@ -235,7 +236,14 @@
gpr_mu_lock(&fdn->mu);
const int fd = grpc_fd_wrapped_fd(fdn->fd);
fdn->writable_registered = false;
+ if (fdn->shutting_down && !fdn->readable_registered) {
+ gpr_mu_unlock(&fdn->mu);
+ fd_node_destroy(fdn);
+ grpc_ares_ev_driver_unref(ev_driver);
+ return;
+ }
gpr_mu_unlock(&fdn->mu);
+
gpr_log(GPR_DEBUG, "writable on %d", fd);
if (error == GRPC_ERROR_NONE) {
ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, fd);
@@ -280,7 +288,7 @@
fdn->ev_driver = ev_driver;
fdn->readable_registered = false;
fdn->writable_registered = false;
- fdn->already_shutdown = false;
+ fdn->shutting_down = false;
gpr_mu_init(&fdn->mu);
GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_cb, fdn,
grpc_schedule_on_exec_ctx);
@@ -321,16 +329,7 @@
while (ev_driver->fds != nullptr) {
fd_node* cur = ev_driver->fds;
ev_driver->fds = ev_driver->fds->next;
- gpr_mu_lock(&cur->mu);
- fd_node_shutdown_locked(cur, "c-ares fd shutdown");
- if (!cur->readable_registered && !cur->writable_registered) {
- gpr_mu_unlock(&cur->mu);
- fd_node_destroy(cur);
- } else {
- cur->next = new_list;
- new_list = cur;
- gpr_mu_unlock(&cur->mu);
- }
+ fd_node_shutdown(cur);
}
ev_driver->fds = new_list;
// If the ev driver has no working fd, all the tasks are done.
diff --git a/src/core/ext/filters/deadline/deadline_filter.cc b/src/core/ext/filters/deadline/deadline_filter.cc
index 27d3eac..d575d2d 100644
--- a/src/core/ext/filters/deadline/deadline_filter.cc
+++ b/src/core/ext/filters/deadline/deadline_filter.cc
@@ -128,21 +128,25 @@
}
}
-// Callback run when the call is complete.
-static void on_complete(void* arg, grpc_error* error) {
+// Callback run when we receive trailing metadata.
+static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
grpc_deadline_state* deadline_state = static_cast<grpc_deadline_state*>(arg);
cancel_timer_if_needed(deadline_state);
- // Invoke the next callback.
- GRPC_CLOSURE_RUN(deadline_state->next_on_complete, GRPC_ERROR_REF(error));
+ // Invoke the original callback.
+ GRPC_CLOSURE_RUN(deadline_state->original_recv_trailing_metadata_ready,
+ GRPC_ERROR_REF(error));
}
-// Inject our own on_complete callback into op.
-static void inject_on_complete_cb(grpc_deadline_state* deadline_state,
- grpc_transport_stream_op_batch* op) {
- deadline_state->next_on_complete = op->on_complete;
- GRPC_CLOSURE_INIT(&deadline_state->on_complete, on_complete, deadline_state,
+// Inject our own recv_trailing_metadata_ready callback into op.
+static void inject_recv_trailing_metadata_ready(
+ grpc_deadline_state* deadline_state, grpc_transport_stream_op_batch* op) {
+ deadline_state->original_recv_trailing_metadata_ready =
+ op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
+ GRPC_CLOSURE_INIT(&deadline_state->recv_trailing_metadata_ready,
+ recv_trailing_metadata_ready, deadline_state,
grpc_schedule_on_exec_ctx);
- op->on_complete = &deadline_state->on_complete;
+ op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+ &deadline_state->recv_trailing_metadata_ready;
}
// Callback and associated state for starting the timer after call stack
@@ -226,7 +230,7 @@
// Make sure we know when the call is complete, so that we can cancel
// the timer.
if (op->recv_trailing_metadata) {
- inject_on_complete_cb(deadline_state, op);
+ inject_recv_trailing_metadata_ready(deadline_state, op);
}
}
}
@@ -323,7 +327,7 @@
// the client never sends trailing metadata, because this is the
// hook that tells us when the call is complete on the server side.
if (op->recv_trailing_metadata) {
- inject_on_complete_cb(&calld->base.deadline_state, op);
+ inject_recv_trailing_metadata_ready(&calld->base.deadline_state, op);
}
}
// Chain to next filter.
diff --git a/src/core/ext/filters/deadline/deadline_filter.h b/src/core/ext/filters/deadline/deadline_filter.h
index 13207cb..1d797f4 100644
--- a/src/core/ext/filters/deadline/deadline_filter.h
+++ b/src/core/ext/filters/deadline/deadline_filter.h
@@ -37,12 +37,12 @@
grpc_deadline_timer_state timer_state;
grpc_timer timer;
grpc_closure timer_callback;
- // Closure to invoke when the call is complete.
+ // Closure to invoke when we receive trailing metadata.
// We use this to cancel the timer.
- grpc_closure on_complete;
- // The original on_complete closure, which we chain to after our own
- // closure is invoked.
- grpc_closure* next_on_complete;
+ grpc_closure recv_trailing_metadata_ready;
+ // The original recv_trailing_metadata_ready closure, which we chain to
+ // after our own closure is invoked.
+ grpc_closure* original_recv_trailing_metadata_ready;
} grpc_deadline_state;
//
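The deadline filter above, and the http_client and connected_channel filters below, all apply the same closure-chaining idiom for the new recv_trailing_metadata_ready hook. For readers skimming the diff, here is a condensed sketch of that idiom; the my_filter_* names are hypothetical placeholders standing in for each filter's own types, not code from this change.

// Sketch of the recv_trailing_metadata_ready interception idiom used by the
// filters in this change. Names prefixed with my_filter_ are placeholders.
typedef struct {
  grpc_closure recv_trailing_metadata_ready;            // our hook
  grpc_closure* original_recv_trailing_metadata_ready;  // next in the chain
} my_filter_call_data;

static void my_filter_recv_trailing_metadata_ready(void* arg,
                                                   grpc_error* error) {
  my_filter_call_data* calld = static_cast<my_filter_call_data*>(arg);
  // ... filter-specific work here (e.g. cancel a timer, record status) ...
  // Chain to the closure we displaced when the batch was started.
  GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready,
                   GRPC_ERROR_REF(error));
}

static void my_filter_start_batch(my_filter_call_data* calld,
                                  grpc_transport_stream_op_batch* batch) {
  if (batch->recv_trailing_metadata) {
    // Save the original closure and substitute our own.
    calld->original_recv_trailing_metadata_ready =
        batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
    GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
                      my_filter_recv_trailing_metadata_ready, calld,
                      grpc_schedule_on_exec_ctx);
    batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
        &calld->recv_trailing_metadata_ready;
  }
  // ... forward the batch to the next element in the stack ...
}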
diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc
index ae94ce4..1678051 100644
--- a/src/core/ext/filters/http/client/http_client_filter.cc
+++ b/src/core/ext/filters/http/client/http_client_filter.cc
@@ -55,8 +55,8 @@
grpc_closure recv_initial_metadata_ready;
// State for handling recv_trailing_metadata ops.
grpc_metadata_batch* recv_trailing_metadata;
- grpc_closure* original_recv_trailing_metadata_on_complete;
- grpc_closure recv_trailing_metadata_on_complete;
+ grpc_closure* original_recv_trailing_metadata_ready;
+ grpc_closure recv_trailing_metadata_ready;
// State for handling send_message ops.
grpc_transport_stream_op_batch* send_message_batch;
size_t send_message_bytes_read;
@@ -153,8 +153,7 @@
GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready, error);
}
-static void recv_trailing_metadata_on_complete(void* user_data,
- grpc_error* error) {
+static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
if (error == GRPC_ERROR_NONE) {
@@ -163,7 +162,7 @@
} else {
GRPC_ERROR_REF(error);
}
- GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_on_complete, error);
+ GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error);
}
static void send_message_on_complete(void* arg, grpc_error* error) {
@@ -312,8 +311,10 @@
/* substitute our callback for the higher callback */
calld->recv_trailing_metadata =
batch->payload->recv_trailing_metadata.recv_trailing_metadata;
- calld->original_recv_trailing_metadata_on_complete = batch->on_complete;
- batch->on_complete = &calld->recv_trailing_metadata_on_complete;
+ calld->original_recv_trailing_metadata_ready =
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+ &calld->recv_trailing_metadata_ready;
}
grpc_error* error = GRPC_ERROR_NONE;
@@ -420,8 +421,8 @@
GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
recv_initial_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_on_complete,
- recv_trailing_metadata_on_complete, elem,
+ GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
+ recv_trailing_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete,
elem, grpc_schedule_on_exec_ctx);
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index a8090d1..0d6b72c 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -1149,12 +1149,10 @@
}
}
-/* Flag that this closure barrier wants stats to be updated before finishing */
-#define CLOSURE_BARRIER_STATS_BIT (1 << 0)
/* Flag that this closure barrier may be covering a write in a pollset, and so
we should not complete this closure until we can prove that the write got
scheduled */
-#define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 1)
+#define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 0)
/* First bit of the reference count, stored in the high order bits (with the low
bits being used for flags defined above) */
#define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16)
@@ -1206,10 +1204,6 @@
grpc_error_add_child(closure->error_data.error, error);
}
if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) {
- if (closure->next_data.scratch & CLOSURE_BARRIER_STATS_BIT) {
- grpc_transport_move_stats(&s->stats, s->collecting_stats);
- s->collecting_stats = nullptr;
- }
if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
!(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
GRPC_CLOSURE_RUN(closure, closure->error_data.error);
@@ -1351,9 +1345,14 @@
}
grpc_closure* on_complete = op->on_complete;
+ // TODO(roth): This is a hack needed because we use data inside of the
+ // closure itself to do the barrier calculation (i.e., to ensure that
+ // we don't schedule the closure until all ops in the batch have been
+ // completed). This can go away once we move to a new C++ closure API
+ // that provides the ability to create a barrier closure.
if (on_complete == nullptr) {
- on_complete =
- GRPC_CLOSURE_CREATE(do_nothing, nullptr, grpc_schedule_on_exec_ctx);
+ on_complete = GRPC_CLOSURE_INIT(&op->handler_private.closure, do_nothing,
+ nullptr, grpc_schedule_on_exec_ctx);
}
/* use final_data as a barrier until enqueue time; the initial counter is
@@ -1361,12 +1360,6 @@
on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
on_complete->error_data.error = GRPC_ERROR_NONE;
- if (op->collect_stats) {
- GPR_ASSERT(s->collecting_stats == nullptr);
- s->collecting_stats = op_payload->collect_stats.collect_stats;
- on_complete->next_data.scratch |= CLOSURE_BARRIER_STATS_BIT;
- }
-
if (op->cancel_stream) {
GRPC_STATS_INC_HTTP2_OP_CANCEL();
grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error);
@@ -1600,8 +1593,11 @@
if (op->recv_trailing_metadata) {
GRPC_STATS_INC_HTTP2_OP_RECV_TRAILING_METADATA();
+ GPR_ASSERT(s->collecting_stats == nullptr);
+ s->collecting_stats = op_payload->recv_trailing_metadata.collect_stats;
GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr);
- s->recv_trailing_metadata_finished = add_closure_barrier(on_complete);
+ s->recv_trailing_metadata_finished =
+ op_payload->recv_trailing_metadata.recv_trailing_metadata_ready;
s->recv_trailing_metadata =
op_payload->recv_trailing_metadata.recv_trailing_metadata;
s->final_metadata_requested = true;
@@ -1960,11 +1956,12 @@
}
if (s->read_closed && s->frame_storage.length == 0 && !pending_data &&
s->recv_trailing_metadata_finished != nullptr) {
+ grpc_transport_move_stats(&s->stats, s->collecting_stats);
+ s->collecting_stats = nullptr;
grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[1],
s->recv_trailing_metadata);
- grpc_chttp2_complete_closure_step(
- t, s, &s->recv_trailing_metadata_finished, GRPC_ERROR_NONE,
- "recv_trailing_metadata_finished");
+ null_then_run_closure(&s->recv_trailing_metadata_finished,
+ GRPC_ERROR_NONE);
}
}
}
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc
index 420c2d1..4a252d9 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.cc
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc
@@ -925,6 +925,10 @@
result = false;
}
/* Check if every op that was asked for is done. */
+ /* TODO(muxi): We should not consider the recv ops here, since they
+ * have their own callbacks. We should invoke a batch's on_complete
+ * as soon as all of the batch's send ops are complete, even if
+ * there are still recv ops pending. */
else if (curr_op->send_initial_metadata &&
!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) {
CRONET_LOG(GPR_DEBUG, "Because");
@@ -1280,12 +1284,20 @@
op_can_be_run(stream_op, s, &oas->state,
OP_RECV_TRAILING_METADATA)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_TRAILING_METADATA", oas);
- if (oas->s->state.rs.trailing_metadata_valid) {
+ grpc_error* error = GRPC_ERROR_NONE;
+ if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
+ error = GRPC_ERROR_REF(stream_state->cancel_error);
+ } else if (stream_state->state_op_done[OP_FAILED]) {
+ error = make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.");
+ } else if (oas->s->state.rs.trailing_metadata_valid) {
grpc_chttp2_incoming_metadata_buffer_publish(
&oas->s->state.rs.trailing_metadata,
stream_op->payload->recv_trailing_metadata.recv_trailing_metadata);
stream_state->rs.trailing_metadata_valid = false;
}
+ GRPC_CLOSURE_SCHED(
+ stream_op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
+ error);
stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_op->cancel_stream &&
@@ -1398,6 +1410,11 @@
GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
GRPC_ERROR_CANCELLED);
}
+ if (op->recv_trailing_metadata) {
+ GRPC_CLOSURE_SCHED(
+ op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
+ GRPC_ERROR_CANCELLED);
+ }
GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_CANCELLED);
return;
}
diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc
index 2c3bff5..b0ca7f8 100644
--- a/src/core/ext/transport/inproc/inproc_transport.cc
+++ b/src/core/ext/transport/inproc/inproc_transport.cc
@@ -120,7 +120,6 @@
struct inproc_stream* stream_list_next;
} inproc_stream;
-static grpc_closure do_nothing_closure;
static bool cancel_stream_locked(inproc_stream* s, grpc_error* error);
static void op_state_machine(void* arg, grpc_error* error);
@@ -373,6 +372,10 @@
const char* msg) {
int is_sm = static_cast<int>(op == s->send_message_op);
int is_stm = static_cast<int>(op == s->send_trailing_md_op);
+ // TODO(vjpai): We should not consider the recv ops here, since they
+ // have their own callbacks. We should invoke a batch's on_complete
+ // as soon as all of the batch's send ops are complete, even if there
+ // are still recv ops pending.
int is_rim = static_cast<int>(op == s->recv_initial_md_op);
int is_rm = static_cast<int>(op == s->recv_message_op);
int is_rtm = static_cast<int>(op == s->recv_trailing_md_op);
@@ -496,6 +499,11 @@
s->send_trailing_md_op = nullptr;
}
if (s->recv_trailing_md_op) {
+ INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-metadata-ready %p",
+ s, error);
+ GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
+ .recv_trailing_metadata_ready,
+ GRPC_ERROR_REF(error));
INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-md-on-complete %p",
s, error);
complete_if_batch_end_locked(
@@ -639,6 +647,12 @@
s->trailing_md_sent = true;
if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
INPROC_LOG(GPR_INFO,
+ "op_state_machine %p scheduling trailing-metadata-ready", s);
+ GRPC_CLOSURE_SCHED(
+ s->recv_trailing_md_op->payload->recv_trailing_metadata
+ .recv_trailing_metadata_ready,
+ GRPC_ERROR_NONE);
+ INPROC_LOG(GPR_INFO,
"op_state_machine %p scheduling trailing-md-on-complete", s);
GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
GRPC_ERROR_NONE);
@@ -711,6 +725,12 @@
}
if (s->recv_trailing_md_op && s->t->is_client && other &&
other->send_message_op) {
+ INPROC_LOG(GPR_INFO,
+ "op_state_machine %p scheduling trailing-metadata-ready %p", s,
+ GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
+ .recv_trailing_metadata_ready,
+ GRPC_ERROR_NONE);
maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE);
}
if (s->to_read_trailing_md_filled) {
@@ -766,6 +786,10 @@
INPROC_LOG(GPR_INFO,
"op_state_machine %p scheduling trailing-md-on-complete %p",
s, new_err);
+ GRPC_CLOSURE_SCHED(
+ s->recv_trailing_md_op->payload->recv_trailing_metadata
+ .recv_trailing_metadata_ready,
+ GRPC_ERROR_REF(new_err));
GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
GRPC_ERROR_REF(new_err));
s->recv_trailing_md_op = nullptr;
@@ -859,6 +883,9 @@
// couldn't complete that because we hadn't yet sent out trailing
// md, now's the chance
if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
+ GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
+ .recv_trailing_metadata_ready,
+ GRPC_ERROR_REF(s->cancel_self_error));
complete_if_batch_end_locked(
s, s->cancel_self_error, s->recv_trailing_md_op,
"cancel_stream scheduling trailing-md-on-complete");
@@ -873,6 +900,8 @@
return ret;
}
+static void do_nothing(void* arg, grpc_error* error) {}
+
static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
grpc_transport_stream_op_batch* op) {
INPROC_LOG(GPR_INFO, "perform_stream_op %p %p %p", gt, gs, op);
@@ -892,8 +921,14 @@
}
grpc_error* error = GRPC_ERROR_NONE;
grpc_closure* on_complete = op->on_complete;
+ // TODO(roth): This is a hack needed because we use data inside of the
+ // closure itself to do the barrier calculation (i.e., to ensure that
+ // we don't schedule the closure until all ops in the batch have been
+ // completed). This can go away once we move to a new C++ closure API
+ // that provides the ability to create a barrier closure.
if (on_complete == nullptr) {
- on_complete = &do_nothing_closure;
+ on_complete = GRPC_CLOSURE_INIT(&op->handler_private.closure, do_nothing,
+ nullptr, grpc_schedule_on_exec_ctx);
}
if (op->cancel_stream) {
@@ -1026,6 +1061,15 @@
GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
GRPC_ERROR_REF(error));
}
+ if (op->recv_trailing_metadata) {
+ INPROC_LOG(
+ GPR_INFO,
+ "perform_stream_op error %p scheduling trailing-metadata-ready %p",
+ s, error);
+ GRPC_CLOSURE_SCHED(
+ op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
+ GRPC_ERROR_REF(error));
+ }
}
INPROC_LOG(GPR_INFO, "perform_stream_op %p scheduling on_complete %p", s,
error);
@@ -1129,12 +1173,8 @@
/*******************************************************************************
* GLOBAL INIT AND DESTROY
*/
-static void do_nothing(void* arg, grpc_error* error) {}
-
void grpc_inproc_transport_init(void) {
grpc_core::ExecCtx exec_ctx;
- GRPC_CLOSURE_INIT(&do_nothing_closure, do_nothing, nullptr,
- grpc_schedule_on_exec_ctx);
g_empty_slice = grpc_slice_from_static_buffer(nullptr, 0);
grpc_slice key_tmp = grpc_slice_from_static_string(":path");
diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc
index ddd3029..e2ea334 100644
--- a/src/core/lib/channel/connected_channel.cc
+++ b/src/core/lib/channel/connected_channel.cc
@@ -51,6 +51,7 @@
callback_state on_complete[6]; // Max number of pending batches.
callback_state recv_initial_metadata_ready;
callback_state recv_message_ready;
+ callback_state recv_trailing_metadata_ready;
} call_data;
static void run_in_call_combiner(void* arg, grpc_error* error) {
@@ -111,6 +112,12 @@
intercept_callback(calld, state, false, "recv_message_ready",
&batch->payload->recv_message.recv_message_ready);
}
+ if (batch->recv_trailing_metadata) {
+ callback_state* state = &calld->recv_trailing_metadata_ready;
+ intercept_callback(
+ calld, state, false, "recv_trailing_metadata_ready",
+ &batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready);
+ }
if (batch->cancel_stream) {
// There can be more than one cancellation batch in flight at any
// given time, so we can't just pick out a fixed index into
@@ -121,7 +128,7 @@
static_cast<callback_state*>(gpr_malloc(sizeof(*state)));
intercept_callback(calld, state, true, "on_complete (cancel_stream)",
&batch->on_complete);
- } else {
+ } else if (batch->on_complete != nullptr) {
callback_state* state = get_state_for_batch(calld, batch);
intercept_callback(calld, state, false, "on_complete", &batch->on_complete);
}
diff --git a/src/core/lib/iomgr/call_combiner.h b/src/core/lib/iomgr/call_combiner.h
index 0ccd08e..f9ce29f 100644
--- a/src/core/lib/iomgr/call_combiner.h
+++ b/src/core/lib/iomgr/call_combiner.h
@@ -26,6 +26,7 @@
#include <grpc/support/atm.h>
#include "src/core/lib/gpr/mpscq.h"
+#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/iomgr/closure.h"
// A simple, lock-free mechanism for serializing activity related to a
@@ -109,4 +110,83 @@
void grpc_call_combiner_cancel(grpc_call_combiner* call_combiner,
grpc_error* error);
+namespace grpc_core {
+
+// Helper for running a list of closures in a call combiner.
+//
+// Each callback running in the call combiner will eventually be
+// returned to the surface, at which point the surface will yield the
+// call combiner. So when we are running in the call combiner and have
+// more than one callback to return to the surface, we need to re-enter
+// the call combiner for all but one of those callbacks.
+class CallCombinerClosureList {
+ public:
+ CallCombinerClosureList() {}
+
+ // Adds a closure to the list. The closure must eventually result in
+ // the call combiner being yielded.
+ void Add(grpc_closure* closure, grpc_error* error, const char* reason) {
+ closures_.emplace_back(closure, error, reason);
+ }
+
+ // Runs all closures in the call combiner and yields the call combiner.
+ //
+ // All but one of the closures in the list will be scheduled via
+ // GRPC_CALL_COMBINER_START(), and the remaining closure will be
+ // scheduled via GRPC_CLOSURE_SCHED(), which will eventually result in
+ // yielding the call combiner. If the list is empty, then the call
+ // combiner will be yielded immediately.
+ void RunClosures(grpc_call_combiner* call_combiner) {
+ for (size_t i = 1; i < closures_.size(); ++i) {
+ auto& closure = closures_[i];
+ GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
+ closure.reason);
+ }
+ if (closures_.size() > 0) {
+ if (grpc_call_combiner_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "CallCombinerClosureList executing closure while already "
+ "holding call_combiner %p: closure=%p error=%s reason=%s",
+ call_combiner, closures_[0].closure,
+ grpc_error_string(closures_[0].error), closures_[0].reason);
+ }
+ // This will release the call combiner.
+ GRPC_CLOSURE_SCHED(closures_[0].closure, closures_[0].error);
+ } else {
+ GRPC_CALL_COMBINER_STOP(call_combiner, "no closures to schedule");
+ }
+ closures_.clear();
+ }
+
+ // Runs all closures in the call combiner, but does NOT yield the call
+ // combiner. All closures will be scheduled via GRPC_CALL_COMBINER_START().
+ void RunClosuresWithoutYielding(grpc_call_combiner* call_combiner) {
+ for (size_t i = 0; i < closures_.size(); ++i) {
+ auto& closure = closures_[i];
+ GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
+ closure.reason);
+ }
+ closures_.clear();
+ }
+
+ size_t size() const { return closures_.size(); }
+
+ private:
+ struct CallCombinerClosure {
+ grpc_closure* closure;
+ grpc_error* error;
+ const char* reason;
+
+ CallCombinerClosure(grpc_closure* closure, grpc_error* error,
+ const char* reason)
+ : closure(closure), error(error), reason(reason) {}
+ };
+
+ // There are generally a maximum of 6 closures to run in the call
+ // combiner, one for each pending op.
+ InlinedVector<CallCombinerClosure, 6> closures_;
+};
+
+} // namespace grpc_core
+
#endif /* GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H */
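As a usage reference, here is a minimal sketch of how a filter or transport is expected to drive the new CallCombinerClosureList, modeled on the call sites added later in this change (transport.cc and bm_call_create.cc). The function name fail_pending_batch is a placeholder, not part of this change.

// Sketch only: a filter that has intercepted a batch's recv callbacks and now
// needs to return them to the surface while yielding the call combiner
// exactly once.
static void fail_pending_batch(grpc_transport_stream_op_batch* batch,
                               grpc_call_combiner* call_combiner,
                               grpc_error* error) {
  grpc_core::CallCombinerClosureList closures;
  if (batch->recv_initial_metadata) {
    closures.Add(
        batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
        GRPC_ERROR_REF(error), "failing recv_initial_metadata_ready");
  }
  if (batch->recv_message) {
    closures.Add(batch->payload->recv_message.recv_message_ready,
                 GRPC_ERROR_REF(error), "failing recv_message_ready");
  }
  if (batch->recv_trailing_metadata) {
    closures.Add(
        batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
        GRPC_ERROR_REF(error), "failing recv_trailing_metadata_ready");
  }
  if (batch->on_complete != nullptr) {
    closures.Add(batch->on_complete, GRPC_ERROR_REF(error),
                 "failing on_complete");
  }
  // All but one closure re-enter the call combiner via
  // GRPC_CALL_COMBINER_START(); the remaining one is scheduled directly and
  // eventually yields the call combiner.
  closures.RunClosures(call_combiner);
  GRPC_ERROR_UNREF(error);
}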
diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h
index 34a4944..f14c723 100644
--- a/src/core/lib/iomgr/closure.h
+++ b/src/core/lib/iomgr/closure.h
@@ -283,9 +283,10 @@
if (c->scheduled) {
gpr_log(GPR_ERROR,
"Closure already scheduled. (closure: %p, created: [%s:%d], "
- "previously scheduled at: [%s: %d] run?: %s",
+ "previously scheduled at: [%s: %d], newly scheduled at [%s: %d], "
+ "run?: %s",
c, c->file_created, c->line_created, c->file_initiated,
- c->line_initiated, c->run ? "true" : "false");
+ c->line_initiated, file, line, c->run ? "true" : "false");
abort();
}
c->scheduled = true;
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc
index 993ea94..7903297 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -438,12 +438,7 @@
/* Might be called multiple times */
static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) {
- if (shutdown(fd->fd, SHUT_RDWR)) {
- if (errno != ENOTCONN) {
- gpr_log(GPR_ERROR, "Error shutting down fd %d. errno: %d",
- grpc_fd_wrapped_fd(fd), errno);
- }
- }
+ shutdown(fd->fd, SHUT_RDWR);
fd->write_closure->SetShutdown(GRPC_ERROR_REF(why));
}
GRPC_ERROR_UNREF(why);
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index 1cf8ea9..d44846c 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -233,6 +233,7 @@
grpc_closure receiving_slice_ready;
grpc_closure receiving_stream_ready;
grpc_closure receiving_initial_metadata_ready;
+ grpc_closure receiving_trailing_metadata_ready;
uint32_t test_only_last_message_flags;
grpc_closure release_call;
@@ -1209,7 +1210,6 @@
if (bctl->op.send_initial_metadata) {
grpc_metadata_batch_destroy(
-
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
}
if (bctl->op.send_message) {
@@ -1217,14 +1217,9 @@
}
if (bctl->op.send_trailing_metadata) {
grpc_metadata_batch_destroy(
-
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
}
if (bctl->op.recv_trailing_metadata) {
- grpc_metadata_batch* md =
- &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- recv_trailing_filter(call, md);
-
/* propagate cancellation to any interested children */
gpr_atm_rel_store(&call->received_final_op_atm, 1);
parent_call* pc = get_parent_call(call);
@@ -1246,7 +1241,6 @@
}
gpr_mu_unlock(&pc->child_list_mu);
}
-
if (call->is_client) {
get_final_status(call, set_status_value_directly,
call->final_op.client.status,
@@ -1256,7 +1250,6 @@
get_final_status(call, set_cancelled_value,
call->final_op.server.cancelled, nullptr, nullptr);
}
-
GRPC_ERROR_UNREF(error);
error = GRPC_ERROR_NONE;
}
@@ -1538,6 +1531,19 @@
finish_batch_step(bctl);
}
+static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) {
+ batch_control* bctl = static_cast<batch_control*>(bctlp);
+ grpc_call* call = bctl->call;
+ GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready");
+ add_batch_error(bctl, GRPC_ERROR_REF(error), false);
+ if (error == GRPC_ERROR_NONE) {
+ grpc_metadata_batch* md =
+ &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
+ recv_trailing_filter(call, md);
+ }
+ finish_batch_step(bctl);
+}
+
static void finish_batch(void* bctlp, grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
@@ -1558,7 +1564,8 @@
size_t i;
const grpc_op* op;
batch_control* bctl;
- int num_completion_callbacks_needed = 1;
+ bool has_send_ops = false;
+ int num_recv_ops = 0;
grpc_call_error error = GRPC_CALL_OK;
grpc_transport_stream_op_batch* stream_op;
grpc_transport_stream_op_batch_payload* stream_op_payload;
@@ -1664,6 +1671,7 @@
stream_op_payload->send_initial_metadata.peer_string =
&call->peer_string;
}
+ has_send_ops = true;
break;
}
case GRPC_OP_SEND_MESSAGE: {
@@ -1693,6 +1701,7 @@
&op->data.send_message.send_message->data.raw.slice_buffer, flags);
stream_op_payload->send_message.send_message.reset(
call->sending_stream.get());
+ has_send_ops = true;
break;
}
case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
@@ -1713,6 +1722,7 @@
call->sent_final_op = true;
stream_op_payload->send_trailing_metadata.send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
+ has_send_ops = true;
break;
}
case GRPC_OP_SEND_STATUS_FROM_SERVER: {
@@ -1777,6 +1787,7 @@
}
stream_op_payload->send_trailing_metadata.send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
+ has_send_ops = true;
break;
}
case GRPC_OP_RECV_INITIAL_METADATA: {
@@ -1804,7 +1815,7 @@
stream_op_payload->recv_initial_metadata.peer_string =
&call->peer_string;
}
- num_completion_callbacks_needed++;
+ ++num_recv_ops;
break;
}
case GRPC_OP_RECV_MESSAGE: {
@@ -1826,7 +1837,7 @@
grpc_schedule_on_exec_ctx);
stream_op_payload->recv_message.recv_message_ready =
&call->receiving_stream_ready;
- num_completion_callbacks_needed++;
+ ++num_recv_ops;
break;
}
case GRPC_OP_RECV_STATUS_ON_CLIENT: {
@@ -1852,11 +1863,16 @@
call->final_op.client.error_string =
op->data.recv_status_on_client.error_string;
stream_op->recv_trailing_metadata = true;
- stream_op->collect_stats = true;
stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- stream_op_payload->collect_stats.collect_stats =
+ stream_op_payload->recv_trailing_metadata.collect_stats =
&call->final_info.stats.transport_stream_stats;
+ GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
+ receiving_trailing_metadata_ready, bctl,
+ grpc_schedule_on_exec_ctx);
+ stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+ &call->receiving_trailing_metadata_ready;
+ ++num_recv_ops;
break;
}
case GRPC_OP_RECV_CLOSE_ON_SERVER: {
@@ -1877,11 +1893,16 @@
call->final_op.server.cancelled =
op->data.recv_close_on_server.cancelled;
stream_op->recv_trailing_metadata = true;
- stream_op->collect_stats = true;
stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- stream_op_payload->collect_stats.collect_stats =
+ stream_op_payload->recv_trailing_metadata.collect_stats =
&call->final_info.stats.transport_stream_stats;
+ GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
+ receiving_trailing_metadata_ready, bctl,
+ grpc_schedule_on_exec_ctx);
+ stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+ &call->receiving_trailing_metadata_ready;
+ ++num_recv_ops;
break;
}
}
@@ -1891,13 +1912,15 @@
if (!is_notify_tag_closure) {
GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
}
- gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
+ gpr_ref_init(&bctl->steps_to_complete, (has_send_ops ? 1 : 0) + num_recv_ops);
- GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
- grpc_schedule_on_exec_ctx);
- stream_op->on_complete = &bctl->finish_batch;
+ if (has_send_ops) {
+ GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
+ grpc_schedule_on_exec_ctx);
+ stream_op->on_complete = &bctl->finish_batch;
+ }
+
gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
-
execute_batch(call, stream_op, &bctl->start_batch);
done:
diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc
index 039d603..cbdb77c 100644
--- a/src/core/lib/transport/transport.cc
+++ b/src/core/lib/transport/transport.cc
@@ -212,21 +212,32 @@
if (batch->send_message) {
batch->payload->send_message.send_message.reset();
}
- if (batch->recv_message) {
- GRPC_CALL_COMBINER_START(
- call_combiner, batch->payload->recv_message.recv_message_ready,
- GRPC_ERROR_REF(error), "failing recv_message_ready");
- }
- if (batch->recv_initial_metadata) {
- GRPC_CALL_COMBINER_START(
- call_combiner,
- batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
- GRPC_ERROR_REF(error), "failing recv_initial_metadata_ready");
- }
- GRPC_CLOSURE_SCHED(batch->on_complete, error);
if (batch->cancel_stream) {
GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error);
}
+ // Construct a list of closures to execute.
+ grpc_core::CallCombinerClosureList closures;
+ if (batch->recv_initial_metadata) {
+ closures.Add(
+ batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
+ GRPC_ERROR_REF(error), "failing recv_initial_metadata_ready");
+ }
+ if (batch->recv_message) {
+ closures.Add(batch->payload->recv_message.recv_message_ready,
+ GRPC_ERROR_REF(error), "failing recv_message_ready");
+ }
+ if (batch->recv_trailing_metadata) {
+ closures.Add(
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
+ GRPC_ERROR_REF(error), "failing recv_trailing_metadata_ready");
+ }
+ if (batch->on_complete != nullptr) {
+ closures.Add(batch->on_complete, GRPC_ERROR_REF(error),
+ "failing on_complete");
+ }
+ // Execute closures.
+ closures.RunClosures(call_combiner);
+ GRPC_ERROR_UNREF(error);
}
typedef struct {
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index b2e252d..585b9df 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -122,9 +122,15 @@
/* Transport stream op: a set of operations to perform on a transport
against a single stream */
typedef struct grpc_transport_stream_op_batch {
- /** Should be enqueued when all requested operations (excluding recv_message
- and recv_initial_metadata which have their own closures) in a given batch
- have been completed. */
+ /** Should be scheduled when all of the non-recv operations in the batch
+ are complete.
+
+ The recv ops (recv_initial_metadata, recv_message, and
+ recv_trailing_metadata) each have their own callbacks. If a batch
+ contains both recv ops and non-recv ops, on_complete should be
+ scheduled as soon as the non-recv ops are complete, regardless of
+ whether or not the recv ops are complete. If a batch contains
+ only recv ops, on_complete can be null. */
grpc_closure* on_complete;
/** Values for the stream op (fields set are determined by flags above) */
@@ -149,9 +155,6 @@
*/
bool recv_trailing_metadata : 1;
- /** Collect any stats into provided buffer, zero internal stat counters */
- bool collect_stats : 1;
-
/** Cancel this stream with the provided error */
bool cancel_stream : 1;
@@ -219,11 +222,10 @@
struct {
grpc_metadata_batch* recv_trailing_metadata;
- } recv_trailing_metadata;
-
- struct {
grpc_transport_stream_stats* collect_stats;
- } collect_stats;
+ /** Should be scheduled when trailing metadata is ready to be processed. */
+ grpc_closure* recv_trailing_metadata_ready;
+ } recv_trailing_metadata;
/** Forcefully close this stream.
The HTTP2 semantics should be:
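Under the revised contract documented above, a batch that contains only recv ops may leave on_complete unset, since each recv op now completes through its own *_ready closure. A hedged sketch of a recv_trailing_metadata-only batch at a call site follows; start_recv_trailing_metadata_batch and its arguments are illustrative, not part of this change, and the batch and payload structs are assumed to be zero-initialized by the caller.

// Sketch only: issue a batch that requests trailing metadata and nothing else.
static void start_recv_trailing_metadata_batch(
    grpc_transport* transport, grpc_stream* stream,
    grpc_transport_stream_op_batch* batch,
    grpc_transport_stream_op_batch_payload* payload,
    grpc_metadata_batch* trailing_md, grpc_transport_stream_stats* stats,
    grpc_closure* trailing_md_ready) {
  batch->payload = payload;
  batch->recv_trailing_metadata = true;
  payload->recv_trailing_metadata.recv_trailing_metadata = trailing_md;
  payload->recv_trailing_metadata.collect_stats = stats;
  payload->recv_trailing_metadata.recv_trailing_metadata_ready =
      trailing_md_ready;
  // No send ops in this batch, so on_complete may remain null; completion is
  // signaled solely through recv_trailing_metadata_ready.
  batch->on_complete = nullptr;
  grpc_transport_perform_stream_op(transport, stream, batch);
}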
diff --git a/src/core/lib/transport/transport_op_string.cc b/src/core/lib/transport/transport_op_string.cc
index 25ab492..8c7db64 100644
--- a/src/core/lib/transport/transport_op_string.cc
+++ b/src/core/lib/transport/transport_op_string.cc
@@ -120,13 +120,6 @@
gpr_strvec_add(&b, tmp);
}
- if (op->collect_stats) {
- gpr_strvec_add(&b, gpr_strdup(" "));
- gpr_asprintf(&tmp, "COLLECT_STATS:%p",
- op->payload->collect_stats.collect_stats);
- gpr_strvec_add(&b, tmp);
- }
-
out = gpr_strvec_flatten(&b, nullptr);
gpr_strvec_destroy(&b);
diff --git a/src/csharp/Grpc.Core.NativeDebug.nuspec b/src/csharp/Grpc.Core.NativeDebug.nuspec
new file mode 100644
index 0000000..d4bb8ad
--- /dev/null
+++ b/src/csharp/Grpc.Core.NativeDebug.nuspec
@@ -0,0 +1,25 @@
+<?xml version="1.0" encoding="utf-8"?>
+<package>
+ <metadata>
+ <id>Grpc.Core.NativeDebug</id>
+ <title>Grpc.Core: Native Debug Symbols</title>
+ <summary>Debug symbols for the native library contained in Grpc.Core</summary>
+ <description>Currently contains grpc_csharp_ext.pdb</description>
+ <version>$version$</version>
+ <authors>Google Inc.</authors>
+ <owners>grpc-packages</owners>
+ <licenseUrl>https://github.com/grpc/grpc/blob/master/LICENSE</licenseUrl>
+ <projectUrl>https://github.com/grpc/grpc</projectUrl>
+ <requireLicenseAcceptance>false</requireLicenseAcceptance>
+ <releaseNotes>Release $version$</releaseNotes>
+ <copyright>Copyright 2015, Google Inc.</copyright>
+ <tags>gRPC RPC Protocol HTTP/2</tags>
+ </metadata>
+ <files>
+ <!-- forward slashes in src path enable building on Linux -->
+ <file src="nativelibs/csharp_ext_windows_x86/grpc_csharp_ext.dll" target="runtimes/win/native/grpc_csharp_ext.x86.dll" />
+ <file src="nativelibs/csharp_ext_windows_x86/grpc_csharp_ext.pdb" target="runtimes/win/native/grpc_csharp_ext.x86.pdb" />
+ <file src="nativelibs/csharp_ext_windows_x64/grpc_csharp_ext.dll" target="runtimes/win/native/grpc_csharp_ext.x64.dll" />
+ <file src="nativelibs/csharp_ext_windows_x64/grpc_csharp_ext.pdb" target="runtimes/win/native/grpc_csharp_ext.x64.pdb" />
+ </files>
+</package>
diff --git a/src/csharp/build_packages_dotnetcli.bat b/src/csharp/build_packages_dotnetcli.bat
index 924d7b1..f111178 100755
--- a/src/csharp/build_packages_dotnetcli.bat
+++ b/src/csharp/build_packages_dotnetcli.bat
@@ -47,6 +47,7 @@
%DOTNET% pack --configuration Release Grpc.Reflection --output ..\..\..\artifacts || goto :error
%NUGET% pack Grpc.nuspec -Version %VERSION% -OutputDirectory ..\..\artifacts || goto :error
+%NUGET% pack Grpc.Core.NativeDebug.nuspec -Version %VERSION% -OutputDirectory ..\..\artifacts
%NUGET% pack Grpc.Tools.nuspec -Version %VERSION% -OutputDirectory ..\..\artifacts
@rem copy resulting nuget packages to artifacts directory
diff --git a/src/csharp/build_packages_dotnetcli.sh b/src/csharp/build_packages_dotnetcli.sh
index 5c73a8f..fb4138a 100755
--- a/src/csharp/build_packages_dotnetcli.sh
+++ b/src/csharp/build_packages_dotnetcli.sh
@@ -46,6 +46,7 @@
dotnet pack --configuration Release Grpc.Reflection --output ../../../artifacts
nuget pack Grpc.nuspec -Version "1.13.0-dev" -OutputDirectory ../../artifacts
+nuget pack Grpc.Core.NativeDebug.nuspec -Version "1.13.0-dev" -OutputDirectory ../../artifacts
nuget pack Grpc.Tools.nuspec -Version "1.13.0-dev" -OutputDirectory ../../artifacts
(cd ../../artifacts && zip csharp_nugets_dotnetcli.zip *.nupkg)
diff --git a/src/objective-c/examples/Sample/Sample.xcodeproj/project.pbxproj b/src/objective-c/examples/Sample/Sample.xcodeproj/project.pbxproj
index ab7159c..cdd1c6c 100644
--- a/src/objective-c/examples/Sample/Sample.xcodeproj/project.pbxproj
+++ b/src/objective-c/examples/Sample/Sample.xcodeproj/project.pbxproj
@@ -325,6 +325,7 @@
buildSettings = {
ASSETCATALOG_COMPILER_APPICON_NAME = AppIcon;
INFOPLIST_FILE = Sample/Info.plist;
+ LD_GENERATE_MAP_FILE = YES;
LD_RUNPATH_SEARCH_PATHS = "$(inherited) @executable_path/Frameworks";
PRODUCT_BUNDLE_IDENTIFIER = "org.grpc.$(PRODUCT_NAME:rfc1034identifier)";
PRODUCT_NAME = "$(TARGET_NAME)";
@@ -337,6 +338,7 @@
buildSettings = {
ASSETCATALOG_COMPILER_APPICON_NAME = AppIcon;
INFOPLIST_FILE = Sample/Info.plist;
+ LD_GENERATE_MAP_FILE = YES;
LD_RUNPATH_SEARCH_PATHS = "$(inherited) @executable_path/Frameworks";
PRODUCT_BUNDLE_IDENTIFIER = "org.grpc.$(PRODUCT_NAME:rfc1034identifier)";
PRODUCT_NAME = "$(TARGET_NAME)";
diff --git a/src/objective-c/examples/Sample/Sample.xcodeproj/xcshareddata/xcschemes/Sample.xcscheme b/src/objective-c/examples/Sample/Sample.xcodeproj/xcshareddata/xcschemes/Sample.xcscheme
index d399e22..e356ea2 100644
--- a/src/objective-c/examples/Sample/Sample.xcodeproj/xcshareddata/xcschemes/Sample.xcscheme
+++ b/src/objective-c/examples/Sample/Sample.xcodeproj/xcshareddata/xcschemes/Sample.xcscheme
@@ -42,7 +42,7 @@
</AdditionalOptions>
</TestAction>
<LaunchAction
- buildConfiguration = "Debug"
+ buildConfiguration = "Release"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
launchStyle = "0"
diff --git a/src/objective-c/tests/analyze_link_map.py b/src/objective-c/tests/analyze_link_map.py
deleted file mode 100755
index 48e3441..0000000
--- a/src/objective-c/tests/analyze_link_map.py
+++ /dev/null
@@ -1,78 +0,0 @@
-#!/usr/bin/python
-# Copyright 2018 gRPC authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# This script analyzes link map file generated by Xcode. It calculates and
-# prints out the sizes of each dependent library and the total sizes of the
-# symbols.
-# The script takes one parameter, which is the path to the link map file.
-
-import sys
-import re
-
-table_tag = {}
-state = "start"
-
-table_stats_symbol = {}
-table_stats_dead = {}
-section_total_size = 0
-symbol_total_size = 0
-
-
-file_import = sys.argv[1]
-lines = list(open(file_import))
-for line in lines:
- line_stripped = line[:-1]
- if "# Object files:" == line_stripped:
- state = "object"
- continue
- elif "# Sections:" == line_stripped:
- state = "section"
- continue
- elif "# Symbols:" == line_stripped:
- state = "symbol"
- continue
- elif "# Dead Stripped Symbols:" == line_stripped:
- state = "dead"
- continue
-
- if state == "object":
- segs = re.search('(\[ *[0-9]*\]) (.*)', line_stripped)
- table_tag[segs.group(1)] = segs.group(2)
-
- if state == "section":
- if len(line_stripped) == 0 or line_stripped[0] == '#':
- continue
- segs = re.search('^(.+?)\s+(.+?)\s+.*', line_stripped)
- section_total_size += int(segs.group(2), 16)
-
- if state == "symbol":
- if len(line_stripped) == 0 or line_stripped[0] == '#':
- continue
- segs = re.search('^.+?\s+(.+?)\s+(\[.+?\]).*', line_stripped)
- target = table_tag[segs.group(2)]
- target_stripped = re.search('^(.*?)(\(.+?\))?$', target).group(1)
- size = int(segs.group(1), 16)
- if not target_stripped in table_stats_symbol:
- table_stats_symbol[target_stripped] = 0
- table_stats_symbol[target_stripped] += size
-
-print("Sections total size: %d" % section_total_size)
-
-for target in table_stats_symbol:
- print(target)
- print(table_stats_symbol[target])
- symbol_total_size += table_stats_symbol[target]
-
-print("Symbols total size: %d" % symbol_total_size)
diff --git a/src/objective-c/tests/build_one_example.sh b/src/objective-c/tests/build_one_example.sh
index 985d55f..1eace54 100755
--- a/src/objective-c/tests/build_one_example.sh
+++ b/src/objective-c/tests/build_one_example.sh
@@ -42,6 +42,9 @@
build \
-workspace *.xcworkspace \
-scheme $SCHEME \
- -destination name="iPhone 6" \
+ -destination generic/platform=iOS \
+ -derivedDataPath Build \
+ CODE_SIGN_IDENTITY="" \
+ CODE_SIGNING_REQUIRED=NO \
| egrep -v "$XCODEBUILD_FILTER" \
| egrep -v "^$" -
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi
index 7e9ea33..8d73215 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi
@@ -57,6 +57,11 @@
cdef grpc_channel_credentials *c_credentials
+cdef class SSLSessionCacheLRU:
+
+ cdef grpc_ssl_session_cache *_cache
+
+
cdef class SSLChannelCredentials(ChannelCredentials):
cdef readonly object _pem_root_certificates
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
index dff9097..f4ccfbc 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
@@ -17,6 +17,9 @@
import grpc
import threading
+from libc.stdint cimport uintptr_t
+
+
def _spawn_callback_in_thread(cb_func, args):
threading.Thread(target=cb_func, args=args).start()
@@ -29,6 +32,7 @@
def _spawn_callback_async(callback, args):
async_callback_func(callback, args)
+
cdef class CallCredentials:
cdef grpc_call_credentials *c(self):
@@ -107,6 +111,21 @@
raise NotImplementedError()
+cdef class SSLSessionCacheLRU:
+
+ def __cinit__(self, capacity):
+ grpc_init()
+ self._cache = grpc_ssl_session_cache_create_lru(capacity)
+
+ def __int__(self):
+ return <uintptr_t>self._cache
+
+ def __dealloc__(self):
+ if self._cache != NULL:
+ grpc_ssl_session_cache_destroy(self._cache)
+ grpc_shutdown()
+
+
cdef class SSLChannelCredentials(ChannelCredentials):
def __cinit__(self, pem_root_certificates, private_key, certificate_chain):
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
index 2d6c900..cfefeaf 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
@@ -131,6 +131,7 @@
const char *GRPC_ARG_PRIMARY_USER_AGENT_STRING
const char *GRPC_ARG_SECONDARY_USER_AGENT_STRING
const char *GRPC_SSL_TARGET_NAME_OVERRIDE_ARG
+ const char *GRPC_SSL_SESSION_CACHE_ARG
const char *GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM
const char *GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL
const char *GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET
@@ -452,8 +453,16 @@
# We don't care about the internals (and in fact don't know them)
pass
+
+ ctypedef struct grpc_ssl_session_cache:
+ # We don't care about the internals (and in fact don't know them)
+ pass
+
ctypedef void (*grpc_ssl_roots_override_callback)(char **pem_root_certs)
+ grpc_ssl_session_cache *grpc_ssl_session_cache_create_lru(size_t capacity)
+ void grpc_ssl_session_cache_destroy(grpc_ssl_session_cache* cache)
+
void grpc_set_ssl_roots_override_callback(
grpc_ssl_roots_override_callback cb) nogil
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
index ecd9916..37b98eb 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
@@ -51,6 +51,7 @@
default_authority = GRPC_ARG_DEFAULT_AUTHORITY
primary_user_agent_string = GRPC_ARG_PRIMARY_USER_AGENT_STRING
secondary_user_agent_string = GRPC_ARG_SECONDARY_USER_AGENT_STRING
+ ssl_session_cache = GRPC_SSL_SESSION_CACHE_ARG
ssl_target_name_override = GRPC_SSL_TARGET_NAME_OVERRIDE_ARG
diff --git a/src/python/grpcio/grpc/experimental/session_cache.py b/src/python/grpcio/grpc/experimental/session_cache.py
new file mode 100644
index 0000000..5c55f7c
--- /dev/null
+++ b/src/python/grpcio/grpc/experimental/session_cache.py
@@ -0,0 +1,45 @@
+# Copyright 2018 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""gRPC's APIs for TLS Session Resumption support"""
+
+from grpc._cython import cygrpc as _cygrpc
+
+
+def ssl_session_cache_lru(capacity):
+ """Creates an SSLSessionCache with LRU replacement policy
+
+ Args:
+ capacity: Size of the cache
+
+ Returns:
+ An SSLSessionCache with LRU replacement policy that can be passed as a value for
+ the grpc.ssl_session_cache option to a grpc.Channel. SSL session caches are used
+ to store session tickets, which clients can present to resume previous TLS sessions
+ with a server.
+ """
+ return SSLSessionCache(_cygrpc.SSLSessionCacheLRU(capacity))
+
+
+class SSLSessionCache(object):
+ """An encapsulation of a session cache used for TLS session resumption.
+
+ Instances of this class can be passed to a Channel as values for the
+ grpc.ssl_session_cache option
+ """
+
+ def __init__(self, cache):
+ self._cache = cache
+
+ def __int__(self):
+ return int(self._cache)
diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json
index 0d94426..65460a9 100644
--- a/src/python/grpcio_tests/tests/tests.json
+++ b/src/python/grpcio_tests/tests/tests.json
@@ -53,6 +53,7 @@
"unit._server_ssl_cert_config_test.ServerSSLCertReloadTestCertConfigReuse",
"unit._server_ssl_cert_config_test.ServerSSLCertReloadTestWithClientAuth",
"unit._server_ssl_cert_config_test.ServerSSLCertReloadTestWithoutClientAuth",
+ "unit._session_cache_test.SSLSessionCacheTest",
"unit.beta._beta_features_test.BetaFeaturesTest",
"unit.beta._beta_features_test.ContextManagementAndLifecycleTest",
"unit.beta._connectivity_channel_test.ConnectivityStatesTest",
diff --git a/src/python/grpcio_tests/tests/unit/_auth_context_test.py b/src/python/grpcio_tests/tests/unit/_auth_context_test.py
index 8c1a30e..d174051 100644
--- a/src/python/grpcio_tests/tests/unit/_auth_context_test.py
+++ b/src/python/grpcio_tests/tests/unit/_auth_context_test.py
@@ -18,6 +18,7 @@
import grpc
from grpc import _channel
+from grpc.experimental import session_cache
import six
from tests.unit import test_common
@@ -140,6 +141,50 @@
self.assertSequenceEqual([b'*.test.google.com'],
auth_ctx['x509_common_name'])
+ def _do_one_shot_client_rpc(self, channel_creds, channel_options, port,
+ expect_ssl_session_reused):
+ channel = grpc.secure_channel(
+ 'localhost:{}'.format(port), channel_creds, options=channel_options)
+ response = channel.unary_unary(_UNARY_UNARY)(_REQUEST)
+ auth_data = pickle.loads(response)
+ self.assertEqual(expect_ssl_session_reused,
+ auth_data[_AUTH_CTX]['ssl_session_reused'])
+ channel.close()
+
+ def testSessionResumption(self):
+ # Set up a secure server
+ handler = grpc.method_handlers_generic_handler('test', {
+ 'UnaryUnary':
+ grpc.unary_unary_rpc_method_handler(handle_unary_unary)
+ })
+ server = test_common.test_server()
+ server.add_generic_rpc_handlers((handler,))
+ server_cred = grpc.ssl_server_credentials(_SERVER_CERTS)
+ port = server.add_secure_port('[::]:0', server_cred)
+ server.start()
+
+ # Create a cache for TLS session tickets
+ cache = session_cache.ssl_session_cache_lru(1)
+ channel_creds = grpc.ssl_channel_credentials(
+ root_certificates=_TEST_ROOT_CERTIFICATES)
+ channel_options = _PROPERTY_OPTIONS + (
+ ('grpc.ssl_session_cache', cache),)
+
+ # Initial connection has no session to resume
+ self._do_one_shot_client_rpc(
+ channel_creds,
+ channel_options,
+ port,
+ expect_ssl_session_reused=[b'false'])
+
+ # Subsequent connections resume sessions
+ self._do_one_shot_client_rpc(
+ channel_creds,
+ channel_options,
+ port,
+ expect_ssl_session_reused=[b'true'])
+ server.stop(None)
+
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/src/python/grpcio_tests/tests/unit/_session_cache_test.py b/src/python/grpcio_tests/tests/unit/_session_cache_test.py
new file mode 100644
index 0000000..b4e4670
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/_session_cache_test.py
@@ -0,0 +1,145 @@
+# Copyright 2018 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Tests experimental TLS Session Resumption API"""
+
+import pickle
+import unittest
+
+import grpc
+from grpc import _channel
+from grpc.experimental import session_cache
+
+from tests.unit import test_common
+from tests.unit import resources
+
+_REQUEST = b'\x00\x00\x00'
+_RESPONSE = b'\x00\x00\x00'
+
+_UNARY_UNARY = '/test/UnaryUnary'
+
+_SERVER_HOST_OVERRIDE = 'foo.test.google.fr'
+_ID = 'id'
+_ID_KEY = 'id_key'
+_AUTH_CTX = 'auth_ctx'
+
+_PRIVATE_KEY = resources.private_key()
+_CERTIFICATE_CHAIN = resources.certificate_chain()
+_TEST_ROOT_CERTIFICATES = resources.test_root_certificates()
+_SERVER_CERTS = ((_PRIVATE_KEY, _CERTIFICATE_CHAIN),)
+_PROPERTY_OPTIONS = ((
+ 'grpc.ssl_target_name_override',
+ _SERVER_HOST_OVERRIDE,
+),)
+
+
+def handle_unary_unary(request, servicer_context):
+ return pickle.dumps({
+ _ID: servicer_context.peer_identities(),
+ _ID_KEY: servicer_context.peer_identity_key(),
+ _AUTH_CTX: servicer_context.auth_context()
+ })
+
+
+def start_secure_server():
+ handler = grpc.method_handlers_generic_handler('test', {
+ 'UnaryUnary':
+ grpc.unary_unary_rpc_method_handler(handle_unary_unary)
+ })
+ server = test_common.test_server()
+ server.add_generic_rpc_handlers((handler,))
+ server_cred = grpc.ssl_server_credentials(_SERVER_CERTS)
+ port = server.add_secure_port('[::]:0', server_cred)
+ server.start()
+
+ return server, port
+
+
+class SSLSessionCacheTest(unittest.TestCase):
+
+ def _do_one_shot_client_rpc(self, channel_creds, channel_options, port,
+ expect_ssl_session_reused):
+ channel = grpc.secure_channel(
+ 'localhost:{}'.format(port), channel_creds, options=channel_options)
+ response = channel.unary_unary(_UNARY_UNARY)(_REQUEST)
+ auth_data = pickle.loads(response)
+ self.assertEqual(expect_ssl_session_reused,
+ auth_data[_AUTH_CTX]['ssl_session_reused'])
+ channel.close()
+
+ def testSSLSessionCacheLRU(self):
+ server_1, port_1 = start_secure_server()
+
+ cache = session_cache.ssl_session_cache_lru(1)
+ channel_creds = grpc.ssl_channel_credentials(
+ root_certificates=_TEST_ROOT_CERTIFICATES)
+ channel_options = _PROPERTY_OPTIONS + (
+ ('grpc.ssl_session_cache', cache),)
+
+ # Initial connection has no session to resume
+ self._do_one_shot_client_rpc(
+ channel_creds,
+ channel_options,
+ port_1,
+ expect_ssl_session_reused=[b'false'])
+
+ # Connection to server_1 resumes from initial session
+ self._do_one_shot_client_rpc(
+ channel_creds,
+ channel_options,
+ port_1,
+ expect_ssl_session_reused=[b'true'])
+
+ # Connection to a different server with the same name overwrites the cache entry
+ server_2, port_2 = start_secure_server()
+ self._do_one_shot_client_rpc(
+ channel_creds,
+ channel_options,
+ port_2,
+ expect_ssl_session_reused=[b'false'])
+ self._do_one_shot_client_rpc(
+ channel_creds,
+ channel_options,
+ port_2,
+ expect_ssl_session_reused=[b'true'])
+ server_2.stop(None)
+
+ # Connection to server_1 now falls back to full TLS handshake
+ self._do_one_shot_client_rpc(
+ channel_creds,
+ channel_options,
+ port_1,
+ expect_ssl_session_reused=[b'false'])
+
+ # Re-creating server_1 causes old sessions to become invalid
+ server_1.stop(None)
+ server_1, port_1 = start_secure_server()
+
+ # Old sessions should no longer be valid
+ self._do_one_shot_client_rpc(
+ channel_creds,
+ channel_options,
+ port_1,
+ expect_ssl_session_reused=[b'false'])
+
+ # Resumption should work for subsequent connections
+ self._do_one_shot_client_rpc(
+ channel_creds,
+ channel_options,
+ port_1,
+ expect_ssl_session_reused=[b'true'])
+ server_1.stop(None)
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/templates/src/csharp/build_packages_dotnetcli.bat.template b/templates/src/csharp/build_packages_dotnetcli.bat.template
index 1bf78c4..45010fe 100755
--- a/templates/src/csharp/build_packages_dotnetcli.bat.template
+++ b/templates/src/csharp/build_packages_dotnetcli.bat.template
@@ -49,6 +49,7 @@
%%DOTNET% pack --configuration Release Grpc.Reflection --output ..\..\..\artifacts || goto :error
%%NUGET% pack Grpc.nuspec -Version %VERSION% -OutputDirectory ..\..\artifacts || goto :error
+ %%NUGET% pack Grpc.Core.NativeDebug.nuspec -Version %VERSION% -OutputDirectory ..\..\artifacts
%%NUGET% pack Grpc.Tools.nuspec -Version %VERSION% -OutputDirectory ..\..\artifacts
@rem copy resulting nuget packages to artifacts directory
diff --git a/templates/src/csharp/build_packages_dotnetcli.sh.template b/templates/src/csharp/build_packages_dotnetcli.sh.template
index ddfea74..1172582 100755
--- a/templates/src/csharp/build_packages_dotnetcli.sh.template
+++ b/templates/src/csharp/build_packages_dotnetcli.sh.template
@@ -48,6 +48,7 @@
dotnet pack --configuration Release Grpc.Reflection --output ../../../artifacts
nuget pack Grpc.nuspec -Version "${settings.csharp_version}" -OutputDirectory ../../artifacts
+ nuget pack Grpc.Core.NativeDebug.nuspec -Version "${settings.csharp_version}" -OutputDirectory ../../artifacts
nuget pack Grpc.Tools.nuspec -Version "${settings.csharp_version}" -OutputDirectory ../../artifacts
(cd ../../artifacts && zip csharp_nugets_dotnetcli.zip *.nupkg)
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 60238e9..3c1d48c 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -1209,8 +1209,13 @@
std::vector<ErrorStatus> expected_status;
expected_status.emplace_back();
expected_status.back().set_code(13); // INTERNAL
+ // No error message or details
+
+ expected_status.emplace_back();
+ expected_status.back().set_code(13); // INTERNAL
expected_status.back().set_error_message("text error message");
expected_status.back().set_binary_error_details("text error details");
+
expected_status.emplace_back();
expected_status.back().set_code(13); // INTERNAL
expected_status.back().set_error_message("text error message");
diff --git a/test/cpp/microbenchmarks/bm_call_create.cc b/test/cpp/microbenchmarks/bm_call_create.cc
index 831b29c..dd1610d 100644
--- a/test/cpp/microbenchmarks/bm_call_create.cc
+++ b/test/cpp/microbenchmarks/bm_call_create.cc
@@ -621,18 +621,26 @@
static void StartTransportStreamOp(grpc_call_element* elem,
grpc_transport_stream_op_batch* op) {
call_data* calld = static_cast<call_data*>(elem->call_data);
+ // Construct list of closures to return.
+ grpc_core::CallCombinerClosureList closures;
if (op->recv_initial_metadata) {
- GRPC_CALL_COMBINER_START(
- calld->call_combiner,
- op->payload->recv_initial_metadata.recv_initial_metadata_ready,
- GRPC_ERROR_NONE, "recv_initial_metadata");
+ closures.Add(op->payload->recv_initial_metadata.recv_initial_metadata_ready,
+ GRPC_ERROR_NONE, "recv_initial_metadata");
}
if (op->recv_message) {
- GRPC_CALL_COMBINER_START(calld->call_combiner,
- op->payload->recv_message.recv_message_ready,
- GRPC_ERROR_NONE, "recv_message");
+ closures.Add(op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE,
+ "recv_message");
}
- GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_NONE);
+ if (op->recv_trailing_metadata) {
+ closures.Add(
+ op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
+ GRPC_ERROR_NONE, "recv_trailing_metadata");
+ }
+ if (op->on_complete != nullptr) {
+ closures.Add(op->on_complete, GRPC_ERROR_NONE, "on_complete");
+ }
+ // Execute closures.
+ closures.RunClosures(calld->call_combiner);
}
static void StartTransportOp(grpc_channel_element* elem,
diff --git a/test/cpp/naming/BUILD b/test/cpp/naming/BUILD
index 2925e8f..fa0b216 100644
--- a/test/cpp/naming/BUILD
+++ b/test/cpp/naming/BUILD
@@ -22,7 +22,7 @@
licenses(["notice"]) # Apache v2
-load("//bazel:grpc_build_system.bzl", "grpc_py_binary", "grpc_cc_test")
+load("//bazel:grpc_build_system.bzl", "grpc_py_binary")
load(":generate_resolver_component_tests.bzl", "generate_resolver_component_tests")
@@ -35,20 +35,4 @@
testonly = True,
)
-grpc_cc_test(
- name = "cancel_ares_query_test",
- srcs = ["cancel_ares_query_test.cc"],
- external_deps = ["gmock"],
- deps = [
- "//test/cpp/util:test_util",
- "//test/core/util:grpc_test_util",
- "//test/core/util:gpr_test_util",
- "//:grpc++",
- "//:grpc",
- "//:gpr",
- "//test/cpp/util:test_config",
- "//test/core/end2end:cq_verifier",
- ],
-)
-
generate_resolver_component_tests()
diff --git a/test/cpp/naming/cancel_ares_query_test.cc b/test/cpp/naming/cancel_ares_query_test.cc
deleted file mode 100644
index 11cdc0b..0000000
--- a/test/cpp/naming/cancel_ares_query_test.cc
+++ /dev/null
@@ -1,289 +0,0 @@
-/*
- *
- * Copyright 2015 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <stdio.h>
-#include <string.h>
-
-#include <gflags/gflags.h>
-#include <gmock/gmock.h>
-
-#include <grpc/byte_buffer.h>
-#include <grpc/grpc.h>
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/time.h>
-#include "include/grpc/support/string_util.h"
-#include "src/core/ext/filters/client_channel/resolver.h"
-#include "src/core/ext/filters/client_channel/resolver_registry.h"
-#include "src/core/lib/channel/channel_args.h"
-#include "src/core/lib/debug/stats.h"
-#include "src/core/lib/gpr/env.h"
-#include "src/core/lib/gpr/host_port.h"
-#include "src/core/lib/gpr/string.h"
-#include "src/core/lib/gprpp/orphanable.h"
-#include "src/core/lib/gprpp/thd.h"
-#include "src/core/lib/iomgr/combiner.h"
-#include "src/core/lib/iomgr/pollset.h"
-#include "src/core/lib/iomgr/pollset_set.h"
-#include "test/core/end2end/cq_verifier.h"
-#include "test/core/util/cmdline.h"
-#include "test/core/util/port.h"
-#include "test/core/util/test_config.h"
-
-// TODO: pull in different headers when enabling this
-// test on windows. Also set BAD_SOCKET_RETURN_VAL
-// to INVALID_SOCKET on windows.
-#include "src/core/lib/iomgr/sockaddr_posix.h"
-#define BAD_SOCKET_RETURN_VAL -1
-
-namespace {
-
-void* Tag(intptr_t t) { return (void*)t; }
-
-gpr_timespec FiveSecondsFromNow(void) {
- return grpc_timeout_seconds_to_deadline(5);
-}
-
-void DrainCq(grpc_completion_queue* cq) {
- grpc_event ev;
- do {
- ev = grpc_completion_queue_next(cq, FiveSecondsFromNow(), nullptr);
- } while (ev.type != GRPC_QUEUE_SHUTDOWN);
-}
-
-void EndTest(grpc_channel* client, grpc_completion_queue* cq) {
- grpc_channel_destroy(client);
- grpc_completion_queue_shutdown(cq);
- DrainCq(cq);
- grpc_completion_queue_destroy(cq);
-}
-
-class FakeNonResponsiveDNSServer {
- public:
- FakeNonResponsiveDNSServer(int port) {
- socket_ = socket(AF_INET6, SOCK_DGRAM, 0);
- if (socket_ == BAD_SOCKET_RETURN_VAL) {
- gpr_log(GPR_DEBUG, "Failed to create UDP ipv6 socket");
- abort();
- }
- sockaddr_in6 addr;
- memset(&addr, 0, sizeof(addr));
- addr.sin6_family = AF_INET6;
- addr.sin6_port = htons(port);
- ((char*)&addr.sin6_addr)[15] = 1;
- if (bind(socket_, (const sockaddr*)&addr, sizeof(addr)) != 0) {
- gpr_log(GPR_DEBUG, "Failed to bind UDP ipv6 socket to [::1]:%d", port);
- abort();
- }
- }
- ~FakeNonResponsiveDNSServer() { close(socket_); }
-
- private:
- int socket_;
-};
-
-struct ArgsStruct {
- gpr_atm done_atm;
- gpr_mu* mu;
- grpc_pollset* pollset;
- grpc_pollset_set* pollset_set;
- grpc_combiner* lock;
- grpc_channel_args* channel_args;
-};
-
-void ArgsInit(ArgsStruct* args) {
- args->pollset = (grpc_pollset*)gpr_zalloc(grpc_pollset_size());
- grpc_pollset_init(args->pollset, &args->mu);
- args->pollset_set = grpc_pollset_set_create();
- grpc_pollset_set_add_pollset(args->pollset_set, args->pollset);
- args->lock = grpc_combiner_create();
- gpr_atm_rel_store(&args->done_atm, 0);
- args->channel_args = nullptr;
-}
-
-void DoNothing(void* arg, grpc_error* error) {}
-
-void ArgsFinish(ArgsStruct* args) {
- grpc_pollset_set_del_pollset(args->pollset_set, args->pollset);
- grpc_pollset_set_destroy(args->pollset_set);
- grpc_closure DoNothing_cb;
- GRPC_CLOSURE_INIT(&DoNothing_cb, DoNothing, nullptr,
- grpc_schedule_on_exec_ctx);
- grpc_pollset_shutdown(args->pollset, &DoNothing_cb);
- // exec_ctx needs to be flushed before calling grpc_pollset_destroy()
- grpc_channel_args_destroy(args->channel_args);
- grpc_core::ExecCtx::Get()->Flush();
- grpc_pollset_destroy(args->pollset);
- gpr_free(args->pollset);
- GRPC_COMBINER_UNREF(args->lock, nullptr);
-}
-
-void PollPollsetUntilRequestDone(ArgsStruct* args) {
- while (true) {
- bool done = gpr_atm_acq_load(&args->done_atm) != 0;
- if (done) {
- break;
- }
- grpc_pollset_worker* worker = nullptr;
- grpc_core::ExecCtx exec_ctx;
- gpr_mu_lock(args->mu);
- GRPC_LOG_IF_ERROR(
- "pollset_work",
- grpc_pollset_work(args->pollset, &worker,
- grpc_timespec_to_millis_round_up(
- gpr_inf_future(GPR_CLOCK_REALTIME))));
- gpr_mu_unlock(args->mu);
- }
-}
-
-void CheckResolverResultAssertFailureLocked(void* arg, grpc_error* error) {
- EXPECT_NE(error, GRPC_ERROR_NONE);
- ArgsStruct* args = static_cast<ArgsStruct*>(arg);
- gpr_atm_rel_store(&args->done_atm, 1);
- gpr_mu_lock(args->mu);
- GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, nullptr));
- gpr_mu_unlock(args->mu);
-}
-
-TEST(CancelDuringAresQuery, TestCancelActiveDNSQuery) {
- grpc_core::ExecCtx exec_ctx;
- ArgsStruct args;
- ArgsInit(&args);
- int fake_dns_port = grpc_pick_unused_port_or_die();
- FakeNonResponsiveDNSServer fake_dns_server(fake_dns_port);
- char* client_target;
- GPR_ASSERT(gpr_asprintf(
- &client_target,
- "dns://[::1]:%d/dont-care-since-wont-be-resolved.test.com:1234",
- fake_dns_port));
- // create resolver and resolve
- grpc_core::OrphanablePtr<grpc_core::Resolver> resolver =
- grpc_core::ResolverRegistry::CreateResolver(client_target, nullptr,
- args.pollset_set, args.lock);
- gpr_free(client_target);
- grpc_closure on_resolver_result_changed;
- GRPC_CLOSURE_INIT(&on_resolver_result_changed,
- CheckResolverResultAssertFailureLocked, (void*)&args,
- grpc_combiner_scheduler(args.lock));
- resolver->NextLocked(&args.channel_args, &on_resolver_result_changed);
- // Without resetting and causing resolver shutdown, the
- // PollPollsetUntilRequestDone call should never finish.
- resolver.reset();
- grpc_core::ExecCtx::Get()->Flush();
- PollPollsetUntilRequestDone(&args);
- ArgsFinish(&args);
-}
-
-TEST(CancelDuringAresQuery,
- TestHitDeadlineAndDestroyChannelDuringAresResolutionIsGraceful) {
- // Start up fake non responsive DNS server
- int fake_dns_port = grpc_pick_unused_port_or_die();
- FakeNonResponsiveDNSServer fake_dns_server(fake_dns_port);
- // Create a call that will try to use the fake DNS server
- char* client_target = nullptr;
- GPR_ASSERT(gpr_asprintf(
- &client_target,
- "dns://[::1]:%d/dont-care-since-wont-be-resolved.test.com:1234",
- fake_dns_port));
- grpc_channel* client =
- grpc_insecure_channel_create(client_target,
- /* client_args */ nullptr, nullptr);
- gpr_free(client_target);
- grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
- cq_verifier* cqv = cq_verifier_create(cq);
- gpr_timespec deadline = grpc_timeout_milliseconds_to_deadline(10);
- grpc_call* call = grpc_channel_create_call(
- client, nullptr, GRPC_PROPAGATE_DEFAULTS, cq,
- grpc_slice_from_static_string("/foo"), nullptr, deadline, nullptr);
- GPR_ASSERT(call);
- grpc_metadata_array initial_metadata_recv;
- grpc_metadata_array trailing_metadata_recv;
- grpc_metadata_array request_metadata_recv;
- grpc_metadata_array_init(&initial_metadata_recv);
- grpc_metadata_array_init(&trailing_metadata_recv);
- grpc_metadata_array_init(&request_metadata_recv);
- grpc_call_details call_details;
- grpc_call_details_init(&call_details);
- grpc_status_code status;
- const char* error_string;
- grpc_slice details;
- // Set ops for client the request
- grpc_op ops_base[6];
- memset(ops_base, 0, sizeof(ops_base));
- grpc_op* op = ops_base;
- op->op = GRPC_OP_SEND_INITIAL_METADATA;
- op->data.send_initial_metadata.count = 0;
- op->flags = 0;
- op->reserved = nullptr;
- op++;
- op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
- op->flags = 0;
- op->reserved = nullptr;
- op++;
- op->op = GRPC_OP_RECV_INITIAL_METADATA;
- op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
- op->flags = 0;
- op->reserved = nullptr;
- op++;
- op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
- op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
- op->data.recv_status_on_client.status = &status;
- op->data.recv_status_on_client.status_details = &details;
- op->data.recv_status_on_client.error_string = &error_string;
- op->flags = 0;
- op->reserved = nullptr;
- op++;
- // Run the call and sanity check it failed as expected
- grpc_call_error error = grpc_call_start_batch(
- call, ops_base, static_cast<size_t>(op - ops_base), Tag(1), nullptr);
- EXPECT_EQ(GRPC_CALL_OK, error);
- CQ_EXPECT_COMPLETION(cqv, Tag(1), 1);
- cq_verify(cqv);
- EXPECT_EQ(status, GRPC_STATUS_DEADLINE_EXCEEDED);
- // Teardown
- grpc_slice_unref(details);
- gpr_free((void*)error_string);
- grpc_metadata_array_destroy(&initial_metadata_recv);
- grpc_metadata_array_destroy(&trailing_metadata_recv);
- grpc_metadata_array_destroy(&request_metadata_recv);
- grpc_call_details_destroy(&call_details);
- grpc_call_unref(call);
- cq_verifier_destroy(cqv);
- EndTest(client, cq);
-}
-
-} // namespace
-
-int main(int argc, char** argv) {
- grpc_test_init(argc, argv);
- ::testing::InitGoogleTest(&argc, argv);
- gpr_setenv("GRPC_DNS_RESOLVER", "ares");
- // Sanity check the time that it takes to run the test
- // including the teardown time (the teardown
- // part of the test involves cancelling the DNS query,
- // which is the main point of interest for this test).
- gpr_timespec overall_deadline = grpc_timeout_seconds_to_deadline(4);
- grpc_init();
- auto result = RUN_ALL_TESTS();
- grpc_shutdown();
- if (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), overall_deadline) > 0) {
- gpr_log(GPR_ERROR, "Test took too long");
- abort();
- }
- return result;
-}
diff --git a/test/cpp/naming/gen_build_yaml.py b/test/cpp/naming/gen_build_yaml.py
index eb2c01e..6e63cbe 100755
--- a/test/cpp/naming/gen_build_yaml.py
+++ b/test/cpp/naming/gen_build_yaml.py
@@ -120,25 +120,6 @@
'grpc++_test_config',
],
} for unsecure_build_config_suffix in ['_unsecure', '']
- ] + [
- {
- 'name': 'cancel_ares_query_test',
- 'build': 'test',
- 'language': 'c++',
- 'gtest': True,
- 'run': True,
- 'src': ['test/cpp/naming/cancel_ares_query_test.cc'],
- 'platforms': ['linux', 'posix', 'mac'],
- 'deps': [
- 'grpc++_test_util',
- 'grpc_test_util',
- 'gpr_test_util',
- 'grpc++',
- 'grpc',
- 'gpr',
- 'grpc++_test_config',
- ],
- },
]
}
diff --git a/test/cpp/naming/resolver_component_test.cc b/test/cpp/naming/resolver_component_test.cc
index 649441d..f4be064 100644
--- a/test/cpp/naming/resolver_component_test.cc
+++ b/test/cpp/naming/resolver_component_test.cc
@@ -22,14 +22,10 @@
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
-
#include <string.h>
-#include <errno.h>
-#include <fcntl.h>
#include <gflags/gflags.h>
#include <gmock/gmock.h>
-#include <thread>
#include <vector>
#include "test/cpp/util/subprocess.h"
@@ -52,12 +48,6 @@
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
-// TODO: pull in different headers when enabling this
-// test on windows. Also set BAD_SOCKET_RETURN_VAL
-// to INVALID_SOCKET on windows.
-#include "src/core/lib/iomgr/sockaddr_posix.h"
-#define BAD_SOCKET_RETURN_VAL -1
-
using grpc::SubProcess;
using std::vector;
using testing::UnorderedElementsAreArray;
@@ -241,73 +231,7 @@
}
}
-void OpenAndCloseSocketsStressLoop(int dummy_port, gpr_event* done_ev) {
- // The goal of this loop is to catch socket
- // "use after close" bugs within the c-ares resolver by acting
- // like some separate thread doing I/O.
- // It's goal is to try to hit race conditions whereby:
- // 1) The c-ares resolver closes a socket.
- // 2) This loop opens a socket with (coincidentally) the same handle.
- // 3) the c-ares resolver mistakenly uses that same socket without
- // realizing that its closed.
- // 4) This loop performs an operation on that socket that should
- // succeed but instead fails because of what the c-ares
- // resolver did in the meantime.
- sockaddr_in6 addr;
- memset(&addr, 0, sizeof(addr));
- addr.sin6_family = AF_INET6;
- addr.sin6_port = htons(dummy_port);
- ((char*)&addr.sin6_addr)[15] = 1;
- for (;;) {
- if (gpr_event_get(done_ev)) {
- return;
- }
- std::vector<int> sockets;
- // First open a bunch of sockets, bind and listen
- // '50' is an arbitrary number that, experimentally,
- // has a good chance of catching bugs.
- for (size_t i = 0; i < 50; i++) {
- int s = socket(AF_INET6, SOCK_STREAM, 0);
- int val = 1;
- setsockopt(s, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val));
- fcntl(s, F_SETFL, O_NONBLOCK);
- ASSERT_TRUE(s != BAD_SOCKET_RETURN_VAL)
- << "Failed to create TCP ipv6 socket";
- gpr_log(GPR_DEBUG, "Opened fd: %d", s);
- ASSERT_TRUE(bind(s, (const sockaddr*)&addr, sizeof(addr)) == 0)
- << "Failed to bind socket " + std::to_string(s) +
- " to [::1]:" + std::to_string(dummy_port) +
- ". errno: " + std::to_string(errno);
- ASSERT_TRUE(listen(s, 1) == 0) << "Failed to listen on socket " +
- std::to_string(s) +
- ". errno: " + std::to_string(errno);
- sockets.push_back(s);
- }
- // Do a non-blocking accept followed by a close on all of those sockets.
- // Do this in a separate loop to try to induce a time window to hit races.
- for (size_t i = 0; i < sockets.size(); i++) {
- gpr_log(GPR_DEBUG, "non-blocking accept then close on %d", sockets[i]);
- if (accept(sockets[i], nullptr, nullptr)) {
- // If e.g. a "shutdown" was called on this fd from another thread,
- // then this accept call should fail with an unexpected error.
- ASSERT_TRUE(errno == EAGAIN || errno == EWOULDBLOCK)
- << "OpenAndCloseSocketsStressLoop accept on socket " +
- std::to_string(sockets[i]) +
- " failed in "
- "an unexpected way. "
- "errno: " +
- std::to_string(errno) +
- ". Socket use-after-close bugs are likely.";
- }
- ASSERT_TRUE(close(sockets[i]) == 0)
- << "Failed to close socket: " + std::to_string(sockets[i]) +
- ". errno: " + std::to_string(errno);
- }
- }
-}
-
void CheckResolverResultLocked(void* argsp, grpc_error* err) {
- EXPECT_EQ(err, GRPC_ERROR_NONE);
ArgsStruct* args = (ArgsStruct*)argsp;
grpc_channel_args* channel_args = args->channel_args;
const grpc_arg* channel_arg =
@@ -347,17 +271,7 @@
gpr_mu_unlock(args->mu);
}
-void CheckResolvedWithoutErrorLocked(void* argsp, grpc_error* err) {
- EXPECT_EQ(err, GRPC_ERROR_NONE);
- ArgsStruct* args = (ArgsStruct*)argsp;
- gpr_atm_rel_store(&args->done_atm, 1);
- gpr_mu_lock(args->mu);
- GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, nullptr));
- gpr_mu_unlock(args->mu);
-}
-
-void RunResolvesRelevantRecordsTest(void (*OnDoneLocked)(void* arg,
- grpc_error* error)) {
+TEST(ResolverComponentTest, TestResolvesRelevantRecords) {
grpc_core::ExecCtx exec_ctx;
ArgsStruct args;
ArgsInit(&args);
@@ -375,32 +289,14 @@
args.pollset_set, args.lock);
gpr_free(whole_uri);
grpc_closure on_resolver_result_changed;
- GRPC_CLOSURE_INIT(&on_resolver_result_changed, OnDoneLocked, (void*)&args,
- grpc_combiner_scheduler(args.lock));
+ GRPC_CLOSURE_INIT(&on_resolver_result_changed, CheckResolverResultLocked,
+ (void*)&args, grpc_combiner_scheduler(args.lock));
resolver->NextLocked(&args.channel_args, &on_resolver_result_changed);
grpc_core::ExecCtx::Get()->Flush();
PollPollsetUntilRequestDone(&args);
ArgsFinish(&args);
}
-TEST(ResolverComponentTest, TestResolvesRelevantRecords) {
- RunResolvesRelevantRecordsTest(CheckResolverResultLocked);
-}
-
-TEST(ResolverComponentTest, TestResolvesRelevantRecordsWithConcurrentFdStress) {
- // Start up background stress thread
- int dummy_port = grpc_pick_unused_port_or_die();
- gpr_event done_ev;
- gpr_event_init(&done_ev);
- std::thread socket_stress_thread(OpenAndCloseSocketsStressLoop, dummy_port,
- &done_ev);
- // Run the resolver test
- RunResolvesRelevantRecordsTest(CheckResolvedWithoutErrorLocked);
- // Shutdown and join stress thread
- gpr_event_set(&done_ev, (void*)1);
- socket_stress_thread.join();
-}
-
} // namespace
int main(int argc, char** argv) {
diff --git a/test/cpp/util/grpc_tool.cc b/test/cpp/util/grpc_tool.cc
index 195b6bd..840ca07 100644
--- a/test/cpp/util/grpc_tool.cc
+++ b/test/cpp/util/grpc_tool.cc
@@ -471,17 +471,26 @@
std::shared_ptr<grpc::Channel> channel =
grpc::CreateChannel(server_address, cred.GetCredentials());
- parser.reset(new grpc::testing::ProtoFileParser(
- FLAGS_remotedb ? channel : nullptr, FLAGS_proto_path, FLAGS_protofiles));
+ if (!FLAGS_binary_input || !FLAGS_binary_output) {
+ parser.reset(
+ new grpc::testing::ProtoFileParser(FLAGS_remotedb ? channel : nullptr,
+ FLAGS_proto_path, FLAGS_protofiles));
+ if (parser->HasError()) {
+ fprintf(
+ stderr,
+ "Failed to find remote reflection service and local proto files.\n");
+ return false;
+ }
+ }
if (FLAGS_binary_input) {
formatted_method_name = method_name;
} else {
formatted_method_name = parser->GetFormattedMethodName(method_name);
- }
-
- if (parser->HasError()) {
- return false;
+ if (parser->HasError()) {
+ fprintf(stderr, "Failed to find method %s in proto files.\n",
+ method_name.c_str());
+ }
}
if (argc == 3) {
@@ -711,6 +720,7 @@
serialized_request_proto = parser->GetSerializedProtoFromMethod(
method_name, request_text, true /* is_request */);
if (parser->HasError()) {
+ fprintf(stderr, "Failed to parse request.\n");
return false;
}
}
@@ -735,6 +745,7 @@
serialized_response_proto = parser->GetTextFormatFromMethod(
method_name, serialized_response_proto, false /* is_request */);
if (parser->HasError()) {
+ fprintf(stderr, "Failed to parse response.\n");
return false;
}
}
@@ -814,6 +825,9 @@
new grpc::testing::ProtoFileParser(FLAGS_remotedb ? channel : nullptr,
FLAGS_proto_path, FLAGS_protofiles));
if (parser->HasError()) {
+ fprintf(
+ stderr,
+ "Failed to find remote reflection service and local proto files.\n");
return false;
}
}
@@ -824,6 +838,7 @@
serialized_request_proto =
parser->GetSerializedProtoFromMessageType(type_name, message_text);
if (parser->HasError()) {
+ fprintf(stderr, "Failed to serialize the message.\n");
return false;
}
}
@@ -834,6 +849,7 @@
grpc::string output_text = parser->GetTextFormatFromMessageType(
type_name, serialized_request_proto);
if (parser->HasError()) {
+ fprintf(stderr, "Failed to deserialize the message.\n");
return false;
}
output_ss << output_text << std::endl;
diff --git a/tools/profiling/ios_bin/binary_diff.py b/tools/profiling/ios_bin/binary_diff.py
new file mode 100755
index 0000000..6d5ae65
--- /dev/null
+++ b/tools/profiling/ios_bin/binary_diff.py
@@ -0,0 +1,127 @@
+#!/usr/bin/env python2.7
+#
+# Copyright 2018 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import glob
+import multiprocessing
+import os
+import shutil
+import subprocess
+import sys
+from parse_link_map import parse_link_map
+
+sys.path.append(
+ os.path.join(
+ os.path.dirname(sys.argv[0]), '..', '..', 'run_tests', 'python_utils'))
+import comment_on_pr
+
+size_labels = ('Core', 'ObjC', 'BoringSSL', 'Protobuf', 'Total')
+
+argp = argparse.ArgumentParser(
+ description='Binary size diff of gRPC Objective-C sample')
+
+argp.add_argument(
+ '-d',
+ '--diff_base',
+ type=str,
+ help='Commit or branch to compare the current one to')
+
+args = argp.parse_args()
+
+
+def dir_size(dir):
+ total = 0
+ for dirpath, dirnames, filenames in os.walk(dir):
+ for f in filenames:
+ fp = os.path.join(dirpath, f)
+ total += os.stat(fp).st_size
+ return total
+
+
+def get_size(where, frameworks):
+ build_dir = 'src/objective-c/examples/Sample/Build-%s/' % where
+ if not frameworks:
+ link_map_filename = 'Build/Intermediates.noindex/Sample.build/Release-iphoneos/Sample.build/Sample-LinkMap-normal-arm64.txt'
+ return parse_link_map(build_dir + link_map_filename)
+ else:
+ framework_dir = 'Build/Products/Release-iphoneos/Sample.app/Frameworks/'
+ boringssl_size = dir_size(
+ build_dir + framework_dir + 'openssl.framework')
+ core_size = dir_size(build_dir + framework_dir + 'grpc.framework')
+ objc_size = dir_size(build_dir + framework_dir + 'GRPCClient.framework') + \
+ dir_size(build_dir + framework_dir + 'RxLibrary.framework') + \
+ dir_size(build_dir + framework_dir + 'ProtoRPC.framework')
+ protobuf_size = dir_size(
+ build_dir + framework_dir + 'Protobuf.framework')
+ app_size = dir_size(
+ build_dir + 'Build/Products/Release-iphoneos/Sample.app')
+ return core_size, objc_size, boringssl_size, protobuf_size, app_size
+
+
+def build(where, frameworks):
+ shutil.rmtree(
+ 'src/objective-c/examples/Sample/Build-%s' % where, ignore_errors=True)
+ subprocess.check_call(
+ 'CONFIG=opt EXAMPLE_PATH=src/objective-c/examples/Sample SCHEME=Sample FRAMEWORKS=%s ./build_one_example.sh'
+ % ('YES' if frameworks else 'NO'),
+ shell=True,
+ cwd='src/objective-c/tests')
+ os.rename('src/objective-c/examples/Sample/Build',
+ 'src/objective-c/examples/Sample/Build-%s' % where)
+
+
+text = ''
+for frameworks in [False, True]:
+ build('new', frameworks)
+ new_size = get_size('new', frameworks)
+ old_size = None
+
+ if args.diff_base:
+ old = 'old'
+ where_am_i = subprocess.check_output(
+ ['git', 'rev-parse', '--abbrev-ref', 'HEAD']).strip()
+ subprocess.check_call(['git', 'checkout', '--', '.'])
+ subprocess.check_call(['git', 'checkout', args.diff_base])
+ subprocess.check_call(['git', 'submodule', 'update'])
+ try:
+ build('old', frameworks)
+ old_size = get_size('old', frameworks)
+ finally:
+ subprocess.check_call(['git', 'checkout', '--', '.'])
+ subprocess.check_call(['git', 'checkout', where_am_i])
+ subprocess.check_call(['git', 'submodule', 'update'])
+
+ text += ('****************FRAMEWORKS*****************\n'
+ if frameworks else '******************STATIC*******************\n')
+ row_format = "{:>10}{:>15}{:>15}" + '\n'
+ text += row_format.format('New size', '', 'Old size')
+ for i in range(0, len(size_labels)):
+ if old_size == None:
+ diff_sign = ' '
+ elif new_size[i] == old_size[i]:
+ diff_sign = ' (=)'
+ elif new_size[i] > old_size[i]:
+ diff_sign = ' (>)'
+ else:
+ diff_sign = ' (<)'
+ text += ('\n' if i == len(size_labels) - 1 else '') + row_format.format(
+ '{:,}'.format(new_size[i]), size_labels[i] + diff_sign,
+ '{:,}'.format(old_size[i]) if old_size != None else '')
+ text += '\n'
+
+print text
+
+comment_on_pr.comment_on_pr('```\n%s\n```' % text)
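For context on how the new script is meant to be run and read: it is invoked from the repository root (optionally with `-d <commit-or-branch>` to also build and measure a diff base) and emits one three-column table per build flavor. A standalone sketch of just the table formatting, with made-up byte counts:

    # Standalone sketch of the report layout used by binary_diff.py above;
    # the sizes here are made up for illustration.
    row_format = "{:>10}{:>15}{:>15}" + '\n'
    text = row_format.format('New size', '', 'Old size')
    for label, new, old in [('Core', 4210000, 4180000), ('Total', 9050000, 9050000)]:
        if new == old:
            diff_sign = ' (=)'
        elif new > old:
            diff_sign = ' (>)'
        else:
            diff_sign = ' (<)'
        text += row_format.format('{:,}'.format(new), label + diff_sign,
                                  '{:,}'.format(old))
    print(text)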
diff --git a/tools/profiling/ios_bin/parse_link_map.py b/tools/profiling/ios_bin/parse_link_map.py
new file mode 100755
index 0000000..eaa1d6e
--- /dev/null
+++ b/tools/profiling/ios_bin/parse_link_map.py
@@ -0,0 +1,104 @@
+#!/usr/bin/python
+# Copyright 2018 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This script analyzes the link map file generated by Xcode. It calculates and
+# prints out the size of each dependent library and the total size of the
+# symbols.
+# The script takes one parameter: the path to the link map file.
+
+import sys
+import re
+
+
+def parse_link_map(filename):
+ table_tag = {}
+ state = "start"
+
+ table_stats_symbol = {}
+ table_stats_dead = {}
+ section_total_size = 0
+ symbol_total_size = 0
+
+ boringssl_size = 0
+ core_size = 0
+ objc_size = 0
+ protobuf_size = 0
+
+ lines = list(open(filename))
+ for line in lines:
+ line_stripped = line[:-1]
+ if "# Object files:" == line_stripped:
+ state = "object"
+ continue
+ elif "# Sections:" == line_stripped:
+ state = "section"
+ continue
+ elif "# Symbols:" == line_stripped:
+ state = "symbol"
+ continue
+ elif "# Dead Stripped Symbols:" == line_stripped:
+ state = "dead"
+ continue
+
+ if state == "object":
+ segs = re.search('(\[ *[0-9]*\]) (.*)', line_stripped)
+ table_tag[segs.group(1)] = segs.group(2)
+
+ if state == "section":
+ if len(line_stripped) == 0 or line_stripped[0] == '#':
+ continue
+ segs = re.search('^(.+?)\s+(.+?)\s+.*', line_stripped)
+ section_total_size += int(segs.group(2), 16)
+
+ if state == "symbol":
+ if len(line_stripped) == 0 or line_stripped[0] == '#':
+ continue
+ segs = re.search('^.+?\s+(.+?)\s+(\[.+?\]).*', line_stripped)
+ target = table_tag[segs.group(2)]
+ target_stripped = re.search('^(.*?)(\(.+?\))?$', target).group(1)
+ size = int(segs.group(1), 16)
+ if not target_stripped in table_stats_symbol:
+ table_stats_symbol[target_stripped] = 0
+ table_stats_symbol[target_stripped] += size
+ if 'BoringSSL' in target_stripped:
+ boringssl_size += size
+ elif 'libgRPC-Core' in target_stripped:
+ core_size += size
+ elif 'libgRPC-RxLibrary' in target_stripped or \
+ 'libgRPC' in target_stripped or \
+ 'libgRPC-ProtoLibrary' in target_stripped:
+ objc_size += size
+ elif 'libProtobuf' in target_stripped:
+ protobuf_size += size
+
+ for target in table_stats_symbol:
+ symbol_total_size += table_stats_symbol[target]
+
+ return core_size, objc_size, boringssl_size, protobuf_size, symbol_total_size
+
+
+def main():
+ filename = sys.argv[1]
+ core_size, objc_size, boringssl_size, protobuf_size, total_size = parse_link_map(
+ filename)
+ print('Core size:{:,}'.format(core_size))
+ print('ObjC size:{:,}'.format(objc_size))
+ print('BoringSSL size:{:,}'.format(boringssl_size))
+ print('Protobuf size:{:,}\n'.format(protobuf_size))
+ print('Total size:{:,}'.format(total_size))
+
+
+if __name__ == "__main__":
+ main()
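Besides the command-line entry point, `parse_link_map()` can be called directly; a minimal usage sketch, with a hypothetical path modeled on the link map location that `binary_diff.py` above reads:

    # Minimal usage sketch for parse_link_map(); the path is hypothetical.
    from parse_link_map import parse_link_map

    link_map = ('src/objective-c/examples/Sample/Build-new/Build/Intermediates.noindex/'
                'Sample.build/Release-iphoneos/Sample.build/Sample-LinkMap-normal-arm64.txt')
    core, objc, boringssl, protobuf, total = parse_link_map(link_map)
    print('Core {:,} | ObjC {:,} | BoringSSL {:,} | Protobuf {:,} | Total {:,}'.format(
        core, objc, boringssl, protobuf, total))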
diff --git a/tools/run_tests/artifacts/build_artifact_csharp.bat b/tools/run_tests/artifacts/build_artifact_csharp.bat
index a84bc54..ac2c92b 100644
--- a/tools/run_tests/artifacts/build_artifact_csharp.bat
+++ b/tools/run_tests/artifacts/build_artifact_csharp.bat
@@ -19,11 +19,12 @@
@call tools\run_tests\helper_scripts\pre_build_csharp.bat %ARCHITECTURE% || goto :error
cd cmake\build\%ARCHITECTURE%
-cmake --build . --target grpc_csharp_ext --config Release
+cmake --build . --target grpc_csharp_ext --config RelWithDebInfo
cd ..\..\..
mkdir -p %ARTIFACTS_OUT%
-copy /Y cmake\build\Win32\Release\grpc_csharp_ext.dll %ARTIFACTS_OUT% || copy /Y cmake\build\x64\Release\grpc_csharp_ext.dll %ARTIFACTS_OUT% || goto :error
+copy /Y cmake\build\Win32\RelWithDebInfo\grpc_csharp_ext.dll %ARTIFACTS_OUT% || copy /Y cmake\build\x64\RelWithDebInfo\grpc_csharp_ext.dll %ARTIFACTS_OUT% || goto :error
+copy /Y cmake\build\Win32\RelWithDebInfo\grpc_csharp_ext.pdb %ARTIFACTS_OUT% || copy /Y cmake\build\x64\RelWithDebInfo\grpc_csharp_ext.pdb %ARTIFACTS_OUT% || goto :error
goto :EOF
diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json
index ae7c65d..f6a3b66 100644
--- a/tools/run_tests/generated/sources_and_headers.json
+++ b/tools/run_tests/generated/sources_and_headers.json
@@ -6546,26 +6546,6 @@
"gpr",
"gpr_test_util",
"grpc",
- "grpc++",
- "grpc++_test_config",
- "grpc++_test_util",
- "grpc_test_util"
- ],
- "headers": [],
- "is_filegroup": false,
- "language": "c++",
- "name": "cancel_ares_query_test",
- "src": [
- "test/cpp/naming/cancel_ares_query_test.cc"
- ],
- "third_party": false,
- "type": "target"
- },
- {
- "deps": [
- "gpr",
- "gpr_test_util",
- "grpc",
"grpc_test_util"
],
"headers": [],
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index 6128f36..00604f1 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -5640,28 +5640,6 @@
},
{
"args": [],
- "benchmark": false,
- "ci_platforms": [
- "linux",
- "mac",
- "posix"
- ],
- "cpu_cost": 1.0,
- "exclude_configs": [],
- "exclude_iomgrs": [],
- "flaky": false,
- "gtest": true,
- "language": "c++",
- "name": "cancel_ares_query_test",
- "platforms": [
- "linux",
- "mac",
- "posix"
- ],
- "uses_polling": true
- },
- {
- "args": [],
"boringssl": true,
"ci_platforms": [
"linux",