One RPCMgr instance per CQ
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 760309d..786195e 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc
@@ -93,7 +93,7 @@ gpr_log(GPR_ERROR, "Adding multiple AsyncGenericService is unsupported for now. " "Dropping the service %p", - service); + (void *) service); } else { generic_service_ = service; } @@ -138,7 +138,6 @@ } std::unique_ptr<Server> ServerBuilder::BuildAndStart() { - // == Determine if the server has any syncrhonous methods == bool has_sync_methods = false; for (auto it = services_.begin(); it != services_.end(); ++it) { @@ -157,6 +156,35 @@ } } + // If this is a Sync server, i.e a server expositing sync API, then the server + // needs to create some completion queues to listen for incoming requests. + // 'sync_server_cqs' are those internal completion queues. + // + // This is different from the completion queues added to the server via + // ServerBuilder's AddCompletionQueue() method (those completion queues + // are in 'cqs_' member variable of ServerBuilder object) + std::shared_ptr<std::vector<ServerCompletionQueue>> sync_server_cqs( + new std::vector<ServerCompletionQueue>()); + + if (has_sync_methods) { + // If the server has synchronous methods, it will need completion queues to + // handle those methods. Create one cq per core (or create 4 if number of + // cores is less than 4 or unavailable) + // + // TODO (sreek) - The default number 4 is just a guess. Check if a lower or + // higher number makes sense + int num_cqs = gpr_cpu_num_cores(); + num_cqs = GPR_MAX(num_cqs, 4); + + for (int i = 0; i < num_cqs; i++) { + // emplace_back() would have been ideal here but doesn't work since the + // ServerCompletionQueue's constructor is private. With emplace_back, the + // constructor is called from somewhere within the library; so making + // ServerBuilder class a friend to ServerCompletion queue won't help. + sync_server_cqs->push_back(ServerCompletionQueue()); + } + } + // == Channel args == ChannelArguments args; for (auto option = options_.begin(); option != options_.end(); ++option) { @@ -178,28 +206,38 @@ maybe_default_compression_algorithm_.algorithm); } - std::unique_ptr<Server> server( - new Server(has_sync_methods, max_message_size_, &args)); + // TODO (sreek) Make the number of pollers configurable + std::unique_ptr<Server> server(new Server(sync_server_cqs, max_message_size_, + &args, kDefaultMinPollers, + kDefaultMaxPollers)); ServerInitializer* initializer = server->initializer(); - // If the server has atleast one sync methods, we know that this is a Sync - // server or a Hybrid server and the completion queue (server->cq_) would be - // frequently polled. - int num_frequently_polled_cqs = has_sync_methods ? 1 : 0; + // Register all the completion queues with the server. i.e + // 1. sync_server_cqs: internal completion queues created IF this is a sync + // server + // 2. cqs_: Completion queues added via AddCompletionQueue() call - for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) { - // A completion queue that is not polled frequently (by calling Next() or - // AsyncNext()) is not safe to use for listening to incoming channels. - // Register all such completion queues as non-listening completion queues - // with the GRPC core library. - if ((*cq)->IsFrequentlyPolled()) { - grpc_server_register_completion_queue(server->server_, (*cq)->cq(), + // All sync cqs (if any) are frequently polled by the GrpcRpcManager + int num_frequently_polled_cqs = sync_server_cqs->size(); + + for (auto it = sync_server_cqs->begin(); it != sync_server_cqs->end(); ++it) { + grpc_server_register_completion_queue(server->server_, it->cq(), nullptr); + } + + // cqs_ contains the completion queue added by calling the ServerBuilder's + // AddCompletionQueue() API. Some of them may not be frequently polled (i.e by + // calling Next() or AsyncNext()) and hence are not safe to be used for + // listening to incoming channels. Such completion queues must be registered + // as non-listening queues + for (auto it = cqs_.begin(); it != cqs_.end(); ++it) { + if ((*it)->IsFrequentlyPolled()) { + grpc_server_register_completion_queue(server->server_, (*it)->cq(), nullptr); num_frequently_polled_cqs++; } else { grpc_server_register_non_listening_completion_queue(server->server_, - (*cq)->cq(), nullptr); + (*it)->cq(), nullptr); } }