/*
 *
 * Copyright 2014, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include "src/core/surface/server.h"

#include <stdlib.h>
#include <string.h>

#include "src/core/channel/census_filter.h"
#include "src/core/channel/channel_args.h"
#include "src/core/channel/connected_channel.h"
#include "src/core/surface/call.h"
#include "src/core/surface/channel.h"
#include "src/core/surface/completion_queue.h"
#include "src/core/surface/surface_em.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string.h>
#include <grpc/support/useful.h>

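/* The lists a call can be on: PENDING_START holds calls that have read their
   initial metadata but have not yet been matched with a
   grpc_server_request_call tag; ALL_CALLS holds every call owned by this
   server. */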
typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list;

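/* A listener registered against the server: start() begins listening for new
   connections, destroy() tears the listener down during shutdown. */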
typedef struct listener {
  void *arg;
  void (*start)(grpc_server *server, void *arg);
  void (*destroy)(grpc_server *server, void *arg);
  struct listener *next;
} listener;

typedef struct call_data call_data;
typedef struct channel_data channel_data;

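/* Channel-level data for the server surface filter; live channels are kept on
   a circular doubly-linked list rooted at grpc_server.root_channel_data. */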
struct channel_data {
  grpc_server *server;
  grpc_channel *channel;
  /* linked list of all channels on a server */
  channel_data *next;
  channel_data *prev;
};

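/* Server state: 'tags' buffers grpc_server_request_call tags that have not
   yet been matched with an incoming call; 'lists' holds the heads of the call
   lists declared above. */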
struct grpc_server {
  size_t channel_filter_count;
  const grpc_channel_filter **channel_filters;
  grpc_channel_args *channel_args;
  grpc_completion_queue *cq;
  grpc_em *em;

  gpr_mu mu;

  void **tags;
  size_t ntags;
  size_t tag_cap;

  gpr_uint8 shutdown;

  call_data *lists[CALL_LIST_COUNT];
  channel_data root_channel_data;

  listener *listeners;
  gpr_refcount internal_refcount;
};

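/* Intrusive links used to thread a call_data onto one of the call lists. */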
typedef struct {
  call_data *next;
  call_data *prev;
} call_link;

typedef enum {
  /* waiting for initial metadata */
  NOT_STARTED,
  /* initial metadata read, but not flow controlled in yet */
  PENDING,
  /* flow controlled in, and published to the completion queue */
  ACTIVATED,
  /* cancelled before being queued */
  ZOMBIED
} call_state;

struct call_data {
  grpc_call *call;

  call_state state;
  gpr_timespec deadline;

  gpr_uint8 included[CALL_LIST_COUNT];
  call_link links[CALL_LIST_COUNT];
};

#define SERVER_FROM_CALL_ELEM(elem) \
  (((channel_data *)(elem)->channel_data)->server)

static void do_nothing(void *unused, grpc_op_error ignored) {}

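/* Add a call to a list. Returns 0 if the call was already a member, 1
   otherwise. Lists are circular and doubly linked through the per-list links
   embedded in call_data. */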
static int call_list_join(grpc_server *server, call_data *call,
                          call_list list) {
  if (call->included[list]) return 0;
  call->included[list] = 1;
  if (!server->lists[list]) {
    server->lists[list] = call;
    call->links[list].next = call->links[list].prev = call;
  } else {
    call->links[list].next = server->lists[list];
    call->links[list].prev = server->lists[list]->links[list].prev;
    call->links[list].next->links[list].prev =
        call->links[list].prev->links[list].next = call;
  }
  return 1;
}

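/* Pop and return the head of a list, or NULL if the list is empty. */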
static call_data *call_list_remove_head(grpc_server *server, call_list list) {
  call_data *out = server->lists[list];
  if (out) {
    out->included[list] = 0;
    if (out->links[list].next == out) {
      server->lists[list] = NULL;
    } else {
      server->lists[list] = out->links[list].next;
      out->links[list].next->links[list].prev = out->links[list].prev;
      out->links[list].prev->links[list].next = out->links[list].next;
    }
  }
  return out;
}

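/* Remove a call from a list. Returns 0 if the call was not a member, 1
   otherwise. */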
static int call_list_remove(grpc_server *server, call_data *call,
                            call_list list) {
  if (!call->included[list]) return 0;
  call->included[list] = 0;
  if (server->lists[list] == call) {
    server->lists[list] = call->links[list].next;
    if (server->lists[list] == call) {
      server->lists[list] = NULL;
      return 1;
    }
  }
  GPR_ASSERT(server->lists[list] != call);
  call->links[list].next->links[list].prev = call->links[list].prev;
  call->links[list].prev->links[list].next = call->links[list].next;
  return 1;
}

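/* Internal reference counting for the server: the final unref frees the
   server and the resources it owns. */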
static void server_ref(grpc_server *server) {
  gpr_ref(&server->internal_refcount);
}

static void server_unref(grpc_server *server) {
  if (gpr_unref(&server->internal_refcount)) {
    grpc_channel_args_destroy(server->channel_args);
    gpr_mu_destroy(&server->mu);
    gpr_free(server->channel_filters);
    gpr_free(server->tags);
    gpr_free(server);
  }
}

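/* A channel is orphaned once it has been unlinked from the server's channel
   list; an orphaned channel's links point back at itself. */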
static int is_channel_orphaned(channel_data *chand) {
  return chand->next == chand;
}

static void orphan_channel(channel_data *chand) {
  chand->next->prev = chand->prev;
  chand->prev->next = chand->next;
  chand->next = chand->prev = chand;
}

static void finish_destroy_channel(void *cd, grpc_em_cb_status status) {
  channel_data *chand = cd;
  grpc_server *server = chand->server;
  /*gpr_log(GPR_INFO, "destroy channel %p", chand->channel);*/
  grpc_channel_destroy(chand->channel);
  server_unref(server);
}

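/* Unlink a channel from the server and schedule its destruction on the event
   manager; a server reference is held until finish_destroy_channel runs. */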
static void destroy_channel(channel_data *chand) {
  if (is_channel_orphaned(chand)) return;
  GPR_ASSERT(chand->server != NULL);
  orphan_channel(chand);
  server_ref(chand->server);
  grpc_em_add_callback(chand->server->em, finish_destroy_channel, chand);
}

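/* Publish a newly received call to the completion queue against a previously
   requested tag, pulling :path and :authority out of the call's initial
   metadata. */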
static void queue_new_rpc(grpc_server *server, call_data *calld, void *tag) {
  grpc_call *call = calld->call;
  grpc_metadata_buffer *mdbuf = grpc_call_get_metadata_buffer(call);
  size_t count = grpc_metadata_buffer_count(mdbuf);
  grpc_metadata *elements = grpc_metadata_buffer_extract_elements(mdbuf);
  const char *host = NULL;
  const char *method = NULL;
  size_t i;
  grpc_metadata status_md;

  for (i = 0; i < count; i++) {
    if (0 == strcmp(elements[i].key, ":authority")) {
      host = elements[i].value;
    } else if (0 == strcmp(elements[i].key, ":path")) {
      method = elements[i].value;
    }
  }

  status_md.key = ":status";
  status_md.value = "200";
  status_md.value_length = 3;
  grpc_call_add_metadata(call, &status_md, GRPC_WRITE_BUFFER_HINT);

  grpc_call_internal_ref(call);
  grpc_cq_end_new_rpc(server->cq, tag, call,
                      grpc_metadata_buffer_cleanup_elements, elements, method,
                      host, calld->deadline, count, elements);
}

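/* Called once initial metadata is complete: hand the call to a waiting
   grpc_server_request_call tag if one is available, otherwise park it on the
   PENDING_START list. */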
static void start_new_rpc(grpc_call_element *elem) {
  channel_data *chand = elem->channel_data;
  call_data *calld = elem->call_data;
  grpc_server *server = chand->server;

  gpr_mu_lock(&server->mu);
  if (server->ntags) {
    calld->state = ACTIVATED;
    queue_new_rpc(server, calld, server->tags[--server->ntags]);
  } else {
    calld->state = PENDING;
    call_list_join(server, calld, PENDING_START);
  }
  gpr_mu_unlock(&server->mu);
}

static void kill_zombie(void *elem, grpc_em_cb_status status) {
  grpc_call_destroy(grpc_call_from_top_element(elem));
}

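/* Handle a half close or full close from the transport; calls that were never
   surfaced to the application are zombified and destroyed via the event
   manager. */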
static void finish_rpc(grpc_call_element *elem, int is_full_close) {
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  gpr_mu_lock(&chand->server->mu);
  switch (calld->state) {
    case ACTIVATED:
      grpc_call_recv_finish(elem, is_full_close);
      break;
    case PENDING:
      if (!is_full_close) {
        grpc_call_recv_finish(elem, is_full_close);
        break;
      }
      call_list_remove(chand->server, calld, PENDING_START);
    /* fallthrough intended */
    case NOT_STARTED:
      calld->state = ZOMBIED;
      grpc_em_add_callback(chand->server->em, kill_zombie, elem);
      break;
    case ZOMBIED:
      break;
  }
  gpr_mu_unlock(&chand->server->mu);
}

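/* Per-call operation handler for the server surface filter: consumes received
   ops, and passes everything else down the stack. */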
static void call_op(grpc_call_element *elem, grpc_call_op *op) {
  GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
  switch (op->type) {
    case GRPC_RECV_METADATA:
      grpc_call_recv_metadata(elem, op);
      break;
    case GRPC_RECV_END_OF_INITIAL_METADATA:
      start_new_rpc(elem);
      break;
    case GRPC_RECV_MESSAGE:
      grpc_call_recv_message(elem, op->data.message, op->done_cb,
                             op->user_data);
      break;
    case GRPC_RECV_HALF_CLOSE:
      finish_rpc(elem, 0);
      break;
    case GRPC_RECV_FINISH:
      finish_rpc(elem, 1);
      break;
    case GRPC_RECV_DEADLINE:
      grpc_call_set_deadline(elem, op->data.deadline);
      ((call_data *)elem->call_data)->deadline = op->data.deadline;
      break;
    default:
      GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
      grpc_call_next_op(elem, op);
      break;
  }
}

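/* Per-channel operation handler: accepts new transport streams as calls, and
   destroys the channel when the transport closes. */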
static void channel_op(grpc_channel_element *elem, grpc_channel_op *op) {
  channel_data *chand = elem->channel_data;

  switch (op->type) {
    case GRPC_ACCEPT_CALL:
      /* create a call */
      grpc_call_create(chand->channel,
                       op->data.accept_call.transport_server_data);
      break;
    case GRPC_TRANSPORT_CLOSED:
      /* if the transport is closed for a server channel, we destroy the
         channel */
      gpr_mu_lock(&chand->server->mu);
      server_ref(chand->server);
      destroy_channel(chand);
      gpr_mu_unlock(&chand->server->mu);
      server_unref(chand->server);
      break;
    default:
      GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
      grpc_channel_next_op(elem, op);
      break;
  }
}

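/* Send a GRPC_CHANNEL_SHUTDOWN op down the channel stack from the event
   manager, holding an internal channel ref until the op has been
   dispatched. */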
static void finish_shutdown_channel(void *cd, grpc_em_cb_status status) {
  channel_data *chand = cd;
  grpc_channel_op op;
  op.type = GRPC_CHANNEL_SHUTDOWN;
  op.dir = GRPC_CALL_DOWN;
  channel_op(grpc_channel_stack_element(
                 grpc_channel_get_channel_stack(chand->channel), 0),
             &op);
  grpc_channel_internal_unref(chand->channel);
}

static void shutdown_channel(channel_data *chand) {
  grpc_channel_internal_ref(chand->channel);
  grpc_em_add_callback(chand->server->em, finish_shutdown_channel, chand);
}

static void init_call_elem(grpc_call_element *elem,
                           const void *server_transport_data) {
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  memset(calld, 0, sizeof(call_data));
  calld->deadline = gpr_inf_future;
  calld->call = grpc_call_from_top_element(elem);

  gpr_mu_lock(&chand->server->mu);
  call_list_join(chand->server, calld, ALL_CALLS);
  gpr_mu_unlock(&chand->server->mu);

  server_ref(chand->server);
}

static void destroy_call_elem(grpc_call_element *elem) {
  channel_data *chand = elem->channel_data;
  int i;

  gpr_mu_lock(&chand->server->mu);
  for (i = 0; i < CALL_LIST_COUNT; i++) {
    call_list_remove(chand->server, elem->call_data, i);
  }
  gpr_mu_unlock(&chand->server->mu);

  server_unref(chand->server);
}

static void init_channel_elem(grpc_channel_element *elem,
                              const grpc_channel_args *args,
                              grpc_mdctx *metadata_context, int is_first,
                              int is_last) {
  channel_data *chand = elem->channel_data;
  GPR_ASSERT(is_first);
  GPR_ASSERT(!is_last);
  chand->server = NULL;
  chand->channel = NULL;
  chand->next = chand->prev = chand;
}

static void destroy_channel_elem(grpc_channel_element *elem) {
  channel_data *chand = elem->channel_data;
  if (chand->server) {
    gpr_mu_lock(&chand->server->mu);
    chand->next->prev = chand->prev;
    chand->prev->next = chand->next;
    chand->next = chand->prev = chand;
    gpr_mu_unlock(&chand->server->mu);
    server_unref(chand->server);
  }
}

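/* The server surface filter vtable; the asserts in init_channel_elem require
   this filter to be the top (but not only) element of a server channel
   stack. */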
static const grpc_channel_filter server_surface_filter = {
    call_op, channel_op,

    sizeof(call_data), init_call_elem, destroy_call_elem,

    sizeof(channel_data), init_channel_elem, destroy_channel_elem,

    "server",
};

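/* Complete outstanding request_call tags with a NULL call to signal that the
   server is shutting down. */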
static void early_terminate_requested_calls(grpc_completion_queue *cq,
                                            void **tags, size_t ntags) {
  size_t i;

  for (i = 0; i < ntags; i++) {
    grpc_cq_end_new_rpc(cq, tags[i], NULL, do_nothing, NULL, NULL, NULL,
                        gpr_inf_past, 0, NULL);
  }
}

grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
                                             grpc_channel_filter **filters,
                                             size_t filter_count,
                                             const grpc_channel_args *args) {
  size_t i;
  int census_enabled = grpc_channel_args_is_census_enabled(args);

  grpc_server *server = gpr_malloc(sizeof(grpc_server));
  memset(server, 0, sizeof(grpc_server));

  gpr_mu_init(&server->mu);

  server->cq = cq;
  server->em = grpc_surface_em();
  /* decremented by grpc_server_destroy */
  gpr_ref_init(&server->internal_refcount, 1);
  server->root_channel_data.next = server->root_channel_data.prev =
      &server->root_channel_data;

  /* Server filter stack is:

     server_surface_filter - for making surface API calls
     grpc_server_census_filter (optional) - for stats collection and tracing
     {passed in filter stack}
     grpc_connected_channel_filter - for interfacing with transports */
  server->channel_filter_count = filter_count + 1 + census_enabled;
  server->channel_filters =
      gpr_malloc(server->channel_filter_count * sizeof(grpc_channel_filter *));
  server->channel_filters[0] = &server_surface_filter;
  if (census_enabled) {
    server->channel_filters[1] = &grpc_server_census_filter;
  }
  for (i = 0; i < filter_count; i++) {
    server->channel_filters[i + 1 + census_enabled] = filters[i];
  }

  server->channel_args = grpc_channel_args_copy(args);

  return server;
}

void grpc_server_start(grpc_server *server) {
  listener *l;

  for (l = server->listeners; l; l = l->next) {
    l->start(server, l->arg);
  }
}

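/* Build a channel stack for a newly accepted transport: the server's filters,
   any extra filters supplied by the transport setup, and finally the
   connected channel filter; then link the new channel into the server's
   channel list and bind the transport to it. */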
grpc_transport_setup_result grpc_server_setup_transport(
    grpc_server *s, grpc_transport *transport,
    grpc_channel_filter const **extra_filters, size_t num_extra_filters,
    grpc_mdctx *mdctx) {
  size_t num_filters = s->channel_filter_count + num_extra_filters + 1;
  grpc_channel_filter const **filters =
      gpr_malloc(sizeof(grpc_channel_filter *) * num_filters);
  size_t i;
  grpc_channel *channel;
  channel_data *chand;

  for (i = 0; i < s->channel_filter_count; i++) {
    filters[i] = s->channel_filters[i];
  }
  for (; i < s->channel_filter_count + num_extra_filters; i++) {
    filters[i] = extra_filters[i - s->channel_filter_count];
  }
  filters[i] = &grpc_connected_channel_filter;

  channel = grpc_channel_create_from_filters(filters, num_filters,
                                             s->channel_args, mdctx, 0);
  chand = (channel_data *)grpc_channel_stack_element(
              grpc_channel_get_channel_stack(channel), 0)->channel_data;
  chand->server = s;
  server_ref(s);
  chand->channel = channel;

  gpr_mu_lock(&s->mu);
  chand->next = &s->root_channel_data;
  chand->prev = chand->next->prev;
  chand->next->prev = chand->prev->next = chand;
  gpr_mu_unlock(&s->mu);

  gpr_free(filters);

  return grpc_connected_channel_bind_transport(
      grpc_channel_get_channel_stack(channel), transport);
}

void grpc_server_shutdown(grpc_server *server) {
  /* TODO(ctiller): send goaway, etc */
  listener *l;
  void **tags;
  size_t ntags;

  /* lock, and gather up the tags that need to be terminated outside the
     lock */
  gpr_mu_lock(&server->mu);
  if (server->shutdown) {
    gpr_mu_unlock(&server->mu);
    return;
  }

  tags = server->tags;
  ntags = server->ntags;
  server->tags = NULL;
  server->ntags = 0;

  server->shutdown = 1;
  gpr_mu_unlock(&server->mu);

  /* terminate all the requested calls */
  early_terminate_requested_calls(server->cq, tags, ntags);
  gpr_free(tags);

  /* Shut down the listeners */
  for (l = server->listeners; l; l = l->next) {
    l->destroy(server, l->arg);
  }
  while (server->listeners) {
    l = server->listeners;
    server->listeners = l->next;
    gpr_free(l);
  }
}

void grpc_server_destroy(grpc_server *server) {
  channel_data *c;
  gpr_mu_lock(&server->mu);
  for (c = server->root_channel_data.next; c != &server->root_channel_data;
       c = c->next) {
    shutdown_channel(c);
  }
  gpr_mu_unlock(&server->mu);

  server_unref(server);
}

void grpc_server_add_listener(grpc_server *server, void *arg,
                              void (*start)(grpc_server *server, void *arg),
                              void (*destroy)(grpc_server *server,
                                              void *arg)) {
  listener *l = gpr_malloc(sizeof(listener));
  l->arg = arg;
  l->start = start;
  l->destroy = destroy;
  l->next = server->listeners;
  server->listeners = l;
}

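/* Ask for 'tag_new' to be returned on the completion queue when a new call
   arrives. If a call is already pending it is dispatched immediately;
   otherwise the tag is buffered until one arrives. */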
grpc_call_error grpc_server_request_call(grpc_server *server, void *tag_new) {
  call_data *calld;

  grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_RPC_NEW);

  gpr_mu_lock(&server->mu);

  if (server->shutdown) {
    gpr_mu_unlock(&server->mu);
    early_terminate_requested_calls(server->cq, &tag_new, 1);
    return GRPC_CALL_OK;
  }

  calld = call_list_remove_head(server, PENDING_START);
  if (calld) {
    GPR_ASSERT(calld->state == PENDING);
    calld->state = ACTIVATED;
    queue_new_rpc(server, calld, tag_new);
  } else {
    if (server->tag_cap == server->ntags) {
      server->tag_cap = GPR_MAX(3 * server->tag_cap / 2, server->tag_cap + 1);
      server->tags =
          gpr_realloc(server->tags, sizeof(void *) * server->tag_cap);
    }
    server->tags[server->ntags++] = tag_new;
  }
  gpr_mu_unlock(&server->mu);

  return GRPC_CALL_OK;
}

const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
  return server->channel_args;
}