/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "src/core/eventmanager/em.h"
#include <unistd.h>
#include <fcntl.h>
#include <grpc/support/atm.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include <event2/event.h>
#include <event2/thread.h>
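/* NOTE: evthread_use_threads() is not declared by the libevent headers
   included above; it is presumably a platform-specific helper defined
   elsewhere in the tree (e.g. wrapping evthread_use_pthreads()), hence the
   forward declaration below. */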
int evthread_use_threads(void);
#define ALARM_TRIGGER_INIT ((gpr_atm)0)
#define ALARM_TRIGGER_INCREMENT ((gpr_atm)1)
#define DONE_SHUTDOWN ((void *)1)
#define POLLER_ID_INVALID ((gpr_atm)-1)
/* ================== grpc_em implementation ===================== */
/* If anything is in the work queue, process one item and return 1.
Return 0 if there were no work items to complete.
Requires em->mu locked, may unlock and relock during the call. */
static int maybe_do_queue_work(grpc_em *em) {
grpc_em_activation_data *work = em->q;
if (work == NULL) return 0;
if (work->next == work) {
em->q = NULL;
} else {
em->q = work->next;
em->q->prev = work->prev;
em->q->next->prev = em->q->prev->next = em->q;
}
work->next = work->prev = NULL;
gpr_mu_unlock(&em->mu);
work->cb(work->arg, work->status);
gpr_mu_lock(&em->mu);
return 1;
}
/* Break out of the event loop on timeout */
static void timer_callback(int fd, short events, void *context) {
event_base_loopbreak((struct event_base *)context);
}
/* Spend some time polling if no other thread is.
Returns 1 if polling was performed, 0 otherwise.
Requires em->mu locked, may unlock and relock during the call. */
static int maybe_do_polling_work(grpc_em *em, struct timeval delay) {
int status;
if (em->num_pollers) return 0;
em->num_pollers = 1;
gpr_mu_unlock(&em->mu);
event_add(em->timeout_ev, &delay);
status = event_base_loop(em->event_base, EVLOOP_ONCE);
if (status < 0) {
gpr_log(GPR_ERROR, "event polling loop stops with error status %d", status);
}
event_del(em->timeout_ev);
gpr_mu_lock(&em->mu);
em->num_pollers = 0;
gpr_cv_broadcast(&em->cv);
return 1;
}
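/* Do one round of event manager work: complete one queued work item if there
   is one, otherwise poll libevent until the deadline (capped at one second).
   Returns 1 if any work was performed, 0 otherwise. */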
int grpc_em_work(grpc_em *em, gpr_timespec deadline) {
gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now());
/* poll for no longer than one second */
gpr_timespec max_delay = {1, 0};
struct timeval delay;
GPR_ASSERT(em);
if (gpr_time_cmp(delay_timespec, gpr_time_0) <= 0) {
return 0;
}
if (gpr_time_cmp(delay_timespec, max_delay) > 0) {
delay_timespec = max_delay;
}
delay = gpr_timeval_from_timespec(delay_timespec);
  /* both helpers below require em->mu to be held */
  gpr_mu_lock(&em->mu);
  if (maybe_do_queue_work(em) || maybe_do_polling_work(em, delay)) {
    em->last_poll_completed = gpr_now();
    gpr_mu_unlock(&em->mu);
    return 1;
  }
  gpr_mu_unlock(&em->mu);
  return 0;
}
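/* Body of the backup poller thread: if no other thread has polled for roughly
   100 milliseconds, this thread takes over queue and polling work so timers
   and callbacks still fire.  It exits once em->shutdown_backup_poller is set
   and then signals em->backup_poller_done. */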
static void backup_poller_thread(void *p) {
grpc_em *em = p;
int backup_poller_engaged = 0;
/* allow no pollers for 100 milliseconds, then engage backup polling */
gpr_timespec allow_no_pollers = gpr_time_from_micros(100 * 1000);
gpr_mu_lock(&em->mu);
while (!em->shutdown_backup_poller) {
if (em->num_pollers == 0) {
gpr_timespec now = gpr_now();
gpr_timespec time_until_engage = gpr_time_sub(
allow_no_pollers, gpr_time_sub(now, em->last_poll_completed));
if (gpr_time_cmp(time_until_engage, gpr_time_0) <= 0) {
if (!backup_poller_engaged) {
gpr_log(GPR_DEBUG, "No pollers for a while - engaging backup poller");
backup_poller_engaged = 1;
}
if (!maybe_do_queue_work(em)) {
struct timeval tv = {1, 0};
maybe_do_polling_work(em, tv);
}
} else {
if (backup_poller_engaged) {
gpr_log(GPR_DEBUG, "Backup poller disengaged");
backup_poller_engaged = 0;
}
gpr_mu_unlock(&em->mu);
gpr_sleep_until(gpr_time_add(now, time_until_engage));
gpr_mu_lock(&em->mu);
}
} else {
if (backup_poller_engaged) {
gpr_log(GPR_DEBUG, "Backup poller disengaged");
backup_poller_engaged = 0;
}
gpr_cv_wait(&em->cv, &em->mu, gpr_inf_future);
}
}
gpr_mu_unlock(&em->mu);
gpr_event_set(&em->backup_poller_done, (void *)1);
}
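/* Initialize *em: enable libevent thread support, create the event base and
   its timeout event, and start the backup poller thread. */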
grpc_em_error grpc_em_init(grpc_em *em) {
gpr_thd_id backup_poller_id;
if (evthread_use_threads() != 0) {
gpr_log(GPR_ERROR, "Failed to initialize libevent thread support!");
return GRPC_EM_ERROR;
}
gpr_mu_init(&em->mu);
gpr_cv_init(&em->cv);
em->q = NULL;
em->num_pollers = 0;
em->num_fds = 0;
em->last_poll_completed = gpr_now();
em->shutdown_backup_poller = 0;
gpr_event_init(&em->backup_poller_done);
em->event_base = NULL;
em->timeout_ev = NULL;
em->event_base = event_base_new();
if (!em->event_base) {
gpr_log(GPR_ERROR, "Failed to create the event base");
return GRPC_EM_ERROR;
}
if (evthread_make_base_notifiable(em->event_base) != 0) {
gpr_log(GPR_ERROR, "Couldn't make event base notifiable cross threads!");
return GRPC_EM_ERROR;
}
em->timeout_ev = evtimer_new(em->event_base, timer_callback, em->event_base);
gpr_thd_new(&backup_poller_id, backup_poller_thread, em, NULL);
return GRPC_EM_OK;
}
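/* Tear down *em: wait (up to ten seconds) for all fds to be destroyed, stop
   the backup poller thread, drain any remaining queued work, and release the
   synchronization primitives and libevent resources. */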
grpc_em_error grpc_em_destroy(grpc_em *em) {
gpr_timespec fd_shutdown_deadline =
gpr_time_add(gpr_now(), gpr_time_from_micros(10 * 1000 * 1000));
  /* wait for outstanding fds to be destroyed, then broadcast shutdown to the
     backup poller */
gpr_mu_lock(&em->mu);
while (em->num_fds) {
gpr_log(GPR_INFO,
"waiting for %d fds to be destroyed before closing event manager",
em->num_fds);
if (gpr_cv_wait(&em->cv, &em->mu, fd_shutdown_deadline)) {
gpr_log(GPR_ERROR,
"not all fds destroyed before shutdown deadline: memory leaks "
"are likely");
break;
} else if (em->num_fds == 0) {
gpr_log(GPR_INFO, "all fds closed");
}
}
em->shutdown_backup_poller = 1;
gpr_cv_broadcast(&em->cv);
gpr_mu_unlock(&em->mu);
gpr_event_wait(&em->backup_poller_done, gpr_inf_future);
/* drain pending work */
gpr_mu_lock(&em->mu);
while (maybe_do_queue_work(em))
;
gpr_mu_unlock(&em->mu);
/* complete shutdown */
gpr_mu_destroy(&em->mu);
gpr_cv_destroy(&em->cv);
if (em->timeout_ev != NULL) {
event_free(em->timeout_ev);
}
if (em->event_base != NULL) {
event_base_free(em->event_base);
em->event_base = NULL;
}
return GRPC_EM_OK;
}
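/* Append *adata to the event manager's circular work queue and wake up any
   thread waiting on em->cv to pick it up. */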
static void add_task(grpc_em *em, grpc_em_activation_data *adata) {
gpr_mu_lock(&em->mu);
if (em->q) {
adata->next = em->q;
adata->prev = adata->next->prev;
adata->next->prev = adata->prev->next = adata;
} else {
em->q = adata;
adata->next = adata->prev = adata;
}
gpr_cv_broadcast(&em->cv);
gpr_mu_unlock(&em->mu);
}
/* ===============grpc_em_alarm implementation==================== */
/* The following function frees up the alarm's libevent structure and
   should always be invoked just before the alarm's callback is queued */
static void alarm_ev_destroy(grpc_em_alarm *alarm) {
grpc_em_activation_data *adata = &alarm->task.activation[GRPC_EM_TA_ONLY];
if (adata->ev != NULL) {
event_free(adata->ev);
adata->ev = NULL;
}
}
/* Proxy callback triggered by alarm->ev to call alarm->cb */
static void libevent_alarm_cb(int fd, short what, void *arg /*=alarm*/) {
grpc_em_alarm *alarm = arg;
grpc_em_activation_data *adata = &alarm->task.activation[GRPC_EM_TA_ONLY];
int trigger_old;
/* First check if this alarm has been canceled, atomically */
trigger_old =
gpr_atm_full_fetch_add(&alarm->triggered, ALARM_TRIGGER_INCREMENT);
if (trigger_old == ALARM_TRIGGER_INIT) {
/* Before invoking user callback, destroy the libevent structure */
alarm_ev_destroy(alarm);
adata->status = GRPC_CALLBACK_SUCCESS;
add_task(alarm->task.em, adata);
}
}
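/* Initialize *alarm to invoke alarm_cb(alarm_cb_arg, status) through the
   given event manager.  The alarm is not armed until grpc_em_alarm_add(). */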
grpc_em_error grpc_em_alarm_init(grpc_em_alarm *alarm, grpc_em *em,
grpc_em_cb_func alarm_cb, void *alarm_cb_arg) {
grpc_em_activation_data *adata = &alarm->task.activation[GRPC_EM_TA_ONLY];
alarm->task.type = GRPC_EM_TASK_ALARM;
alarm->task.em = em;
gpr_atm_rel_store(&alarm->triggered, ALARM_TRIGGER_INIT);
adata->cb = alarm_cb;
adata->arg = alarm_cb_arg;
adata->prev = NULL;
adata->next = NULL;
adata->ev = NULL;
return GRPC_EM_OK;
}
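/* Arm *alarm to go off at the given deadline: create a fresh libevent timer
   (freeing any stale one) and mark the alarm as untriggered. */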
grpc_em_error grpc_em_alarm_add(grpc_em_alarm *alarm, gpr_timespec deadline) {
grpc_em_activation_data *adata = &alarm->task.activation[GRPC_EM_TA_ONLY];
gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now());
struct timeval delay = gpr_timeval_from_timespec(delay_timespec);
if (adata->ev) {
event_free(adata->ev);
gpr_log(GPR_INFO, "Adding an alarm that already has an event.");
adata->ev = NULL;
}
adata->ev = evtimer_new(alarm->task.em->event_base, libevent_alarm_cb, alarm);
/* Set the trigger field to untriggered. Do this as the last store since
it is a release of previous stores. */
gpr_atm_rel_store(&alarm->triggered, ALARM_TRIGGER_INIT);
if (adata->ev != NULL && evtimer_add(adata->ev, &delay) == 0) {
return GRPC_EM_OK;
} else {
return GRPC_EM_ERROR;
}
}
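/* Cancel *alarm if it has not fired yet; on successful cancellation the
   alarm's callback is queued with status GRPC_CALLBACK_CANCELLED.  *arg is
   always set to the callback argument. */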
grpc_em_error grpc_em_alarm_cancel(grpc_em_alarm *alarm, void **arg) {
grpc_em_activation_data *adata = &alarm->task.activation[GRPC_EM_TA_ONLY];
int trigger_old;
*arg = adata->arg;
/* First check if this alarm has been triggered, atomically */
trigger_old =
gpr_atm_full_fetch_add(&alarm->triggered, ALARM_TRIGGER_INCREMENT);
if (trigger_old == ALARM_TRIGGER_INIT) {
/* We need to make sure that we only invoke the callback if it hasn't
already been invoked */
/* First remove this event from libevent. This returns success even if the
event has gone active or invoked its callback. */
if (evtimer_del(adata->ev) != 0) {
/* The delete was unsuccessful for some reason. */
gpr_log(GPR_ERROR, "Attempt to delete alarm event was unsuccessful");
return GRPC_EM_ERROR;
}
/* Free up the event structure before invoking callback */
alarm_ev_destroy(alarm);
adata->status = GRPC_CALLBACK_CANCELLED;
add_task(alarm->task.em, adata);
}
return GRPC_EM_OK;
}
/* ==================== grpc_em_fd implementation =================== */
/* Proxy callback to call a gRPC read/write callback */
static void em_fd_cb(int fd, short what, void *arg /*=em_fd*/) {
grpc_em_fd *em_fd = arg;
grpc_em_cb_status status = GRPC_CALLBACK_SUCCESS;
int run_read_cb = 0;
int run_write_cb = 0;
grpc_em_activation_data *rdata, *wdata;
gpr_mu_lock(&em_fd->mu);
/* TODO(klempner): We need to delete the event here too so we avoid spurious
shutdowns. */
if (em_fd->shutdown_started) {
status = GRPC_CALLBACK_CANCELLED;
} else if (status == GRPC_CALLBACK_SUCCESS && (what & EV_TIMEOUT)) {
status = GRPC_CALLBACK_TIMED_OUT;
/* TODO(klempner): This is broken if we are monitoring both read and write
events on the same fd -- generating a spurious event is okay, but
generating a spurious timeout is not. */
what |= (EV_READ | EV_WRITE);
}
if (what & EV_READ) {
switch (em_fd->read_state) {
case GRPC_EM_FD_WAITING:
run_read_cb = 1;
em_fd->read_state = GRPC_EM_FD_IDLE;
break;
case GRPC_EM_FD_IDLE:
case GRPC_EM_FD_CACHED:
em_fd->read_state = GRPC_EM_FD_CACHED;
}
}
if (what & EV_WRITE) {
switch (em_fd->write_state) {
case GRPC_EM_FD_WAITING:
run_write_cb = 1;
em_fd->write_state = GRPC_EM_FD_IDLE;
break;
case GRPC_EM_FD_IDLE:
case GRPC_EM_FD_CACHED:
em_fd->write_state = GRPC_EM_FD_CACHED;
}
}
if (run_read_cb) {
rdata = &(em_fd->task.activation[GRPC_EM_TA_READ]);
rdata->status = status;
add_task(em_fd->task.em, rdata);
} else if (run_write_cb) {
wdata = &(em_fd->task.activation[GRPC_EM_TA_WRITE]);
wdata->status = status;
add_task(em_fd->task.em, wdata);
}
gpr_mu_unlock(&em_fd->mu);
}
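/* Proxy callback run on the event loop to start shutting down an fd: marks
   shutdown as started and forces any waiting read/write events to fire so
   their callbacks complete with GRPC_CALLBACK_CANCELLED. */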
static void em_fd_shutdown_cb(int fd, short what, void *arg /*=em_fd*/) {
/* TODO(klempner): This could just run directly in the calling thread, except
that libevent's handling of event_active() on an event which is already in
flight on a different thread is racy and easily triggers TSAN.
*/
grpc_em_fd *em_fd = arg;
gpr_mu_lock(&em_fd->mu);
em_fd->shutdown_started = 1;
if (em_fd->read_state == GRPC_EM_FD_WAITING) {
event_active(em_fd->task.activation[GRPC_EM_TA_READ].ev, EV_READ, 1);
}
if (em_fd->write_state == GRPC_EM_FD_WAITING) {
event_active(em_fd->task.activation[GRPC_EM_TA_WRITE].ev, EV_WRITE, 1);
}
gpr_mu_unlock(&em_fd->mu);
}
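/* Register the non-blocking file descriptor fd with event manager em and
   initialize *em_fd: create persistent edge-triggered read and write events
   plus an internal shutdown event.  fd must already be non-blocking. */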
grpc_em_error grpc_em_fd_init(grpc_em_fd *em_fd, grpc_em *em, int fd) {
int flags;
grpc_em_activation_data *rdata, *wdata;
gpr_mu_lock(&em->mu);
em->num_fds++;
gpr_mu_unlock(&em->mu);
em_fd->shutdown_ev = NULL;
gpr_mu_init(&em_fd->mu);
flags = fcntl(fd, F_GETFL, 0);
if ((flags & O_NONBLOCK) == 0) {
gpr_log(GPR_ERROR, "File descriptor %d is blocking", fd);
return GRPC_EM_INVALID_ARGUMENTS;
}
em_fd->task.type = GRPC_EM_TASK_FD;
em_fd->task.em = em;
em_fd->fd = fd;
rdata = &(em_fd->task.activation[GRPC_EM_TA_READ]);
rdata->ev = NULL;
rdata->cb = NULL;
rdata->arg = NULL;
rdata->status = GRPC_CALLBACK_SUCCESS;
rdata->prev = NULL;
rdata->next = NULL;
wdata = &(em_fd->task.activation[GRPC_EM_TA_WRITE]);
wdata->ev = NULL;
wdata->cb = NULL;
wdata->arg = NULL;
wdata->status = GRPC_CALLBACK_SUCCESS;
wdata->prev = NULL;
wdata->next = NULL;
em_fd->read_state = GRPC_EM_FD_IDLE;
em_fd->write_state = GRPC_EM_FD_IDLE;
/* TODO(chenw): detect platforms where only level trigger is supported,
and set the event to non-persist. */
rdata->ev = event_new(em->event_base, em_fd->fd, EV_ET | EV_PERSIST | EV_READ,
em_fd_cb, em_fd);
if (!rdata->ev) {
gpr_log(GPR_ERROR, "Failed to create read event");
return GRPC_EM_ERROR;
}
wdata->ev = event_new(em->event_base, em_fd->fd,
EV_ET | EV_PERSIST | EV_WRITE, em_fd_cb, em_fd);
if (!wdata->ev) {
gpr_log(GPR_ERROR, "Failed to create write event");
return GRPC_EM_ERROR;
}
em_fd->shutdown_ev =
event_new(em->event_base, -1, EV_READ, em_fd_shutdown_cb, em_fd);
if (!em_fd->shutdown_ev) {
gpr_log(GPR_ERROR, "Failed to create shutdown event");
return GRPC_EM_ERROR;
}
em_fd->shutdown_started = 0;
return GRPC_EM_OK;
}
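/* Release all resources owned by *em_fd and decrement the event manager's fd
   count.  The caller must ensure that no callbacks are still queued and that
   no new users of em_fd appear. */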
void grpc_em_fd_destroy(grpc_em_fd *em_fd) {
grpc_em_task_activity_type type;
grpc_em_activation_data *adata;
grpc_em *em = em_fd->task.em;
  /* ensure anyone holding the lock has left - it is the caller's
     responsibility to ensure that no new users enter */
gpr_mu_lock(&em_fd->mu);
gpr_mu_unlock(&em_fd->mu);
for (type = GRPC_EM_TA_READ; type < GRPC_EM_TA_COUNT; type++) {
adata = &(em_fd->task.activation[type]);
GPR_ASSERT(adata->next == NULL);
if (adata->ev != NULL) {
event_free(adata->ev);
adata->ev = NULL;
}
}
if (em_fd->shutdown_ev != NULL) {
event_free(em_fd->shutdown_ev);
em_fd->shutdown_ev = NULL;
}
gpr_mu_destroy(&em_fd->mu);
gpr_mu_lock(&em->mu);
em->num_fds--;
gpr_cv_broadcast(&em->cv);
gpr_mu_unlock(&em->mu);
}
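/* Returns the file descriptor associated with *em_fd. */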
int grpc_em_fd_get(struct grpc_em_fd *em_fd) { return em_fd->fd; }
/* Returns the event manager associated with *em_fd. */
grpc_em *grpc_em_fd_get_em(grpc_em_fd *em_fd) { return em_fd->task.em; }
/* TODO(chenw): should we enforce the contract that notify_on_read cannot be
   called again before the previously registered callback has been invoked? */
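/* Register read_cb(read_cb_arg, status) to be queued when fd becomes readable
   or the deadline expires.  If the fd has been shut down, or a read event was
   cached while nobody was waiting, the event is activated immediately. */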
grpc_em_error grpc_em_fd_notify_on_read(grpc_em_fd *em_fd,
grpc_em_cb_func read_cb,
void *read_cb_arg,
gpr_timespec deadline) {
int force_event = 0;
grpc_em_activation_data *rdata;
grpc_em_error result = GRPC_EM_OK;
gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now());
struct timeval delay = gpr_timeval_from_timespec(delay_timespec);
struct timeval *delayp =
gpr_time_cmp(deadline, gpr_inf_future) ? &delay : NULL;
rdata = &em_fd->task.activation[GRPC_EM_TA_READ];
gpr_mu_lock(&em_fd->mu);
rdata->cb = read_cb;
rdata->arg = read_cb_arg;
force_event =
(em_fd->shutdown_started || em_fd->read_state == GRPC_EM_FD_CACHED);
em_fd->read_state = GRPC_EM_FD_WAITING;
if (force_event) {
event_active(rdata->ev, EV_READ, 1);
} else if (event_add(rdata->ev, delayp) == -1) {
result = GRPC_EM_ERROR;
}
gpr_mu_unlock(&em_fd->mu);
return result;
}
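/* Register write_cb(write_cb_arg, status) to be queued when fd becomes
   writable or the deadline expires.  If the fd has been shut down, or a write
   event was cached while nobody was waiting, the event is activated
   immediately. */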
grpc_em_error grpc_em_fd_notify_on_write(grpc_em_fd *em_fd,
grpc_em_cb_func write_cb,
void *write_cb_arg,
gpr_timespec deadline) {
int force_event = 0;
grpc_em_activation_data *wdata;
grpc_em_error result = GRPC_EM_OK;
gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now());
struct timeval delay = gpr_timeval_from_timespec(delay_timespec);
struct timeval *delayp =
gpr_time_cmp(deadline, gpr_inf_future) ? &delay : NULL;
wdata = &em_fd->task.activation[GRPC_EM_TA_WRITE];
gpr_mu_lock(&em_fd->mu);
wdata->cb = write_cb;
wdata->arg = write_cb_arg;
force_event =
(em_fd->shutdown_started || em_fd->write_state == GRPC_EM_FD_CACHED);
em_fd->write_state = GRPC_EM_FD_WAITING;
if (force_event) {
event_active(wdata->ev, EV_WRITE, 1);
} else if (event_add(wdata->ev, delayp) == -1) {
result = GRPC_EM_ERROR;
}
gpr_mu_unlock(&em_fd->mu);
return result;
}
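/* Begin an orderly shutdown of *em_fd by activating its internal shutdown
   event; callbacks waiting on the fd will complete with
   GRPC_CALLBACK_CANCELLED. */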
void grpc_em_fd_shutdown(grpc_em_fd *em_fd) {
event_active(em_fd->shutdown_ev, EV_READ, 1);
}
/*====================== Other callback functions ======================*/
/* Sometimes we want a followup callback: something scheduled from within the
   current callback for the EM to invoke once this callback is complete.
   This is implemented by inserting an entry into the EM work queue. */
/* The following structure holds the fields needed to add the followup
   callback: the function to invoke, its argument, and the activation data
   used for the work queue (the whole structure is freed in the proxy
   callback). */
struct followup_callback_arg {
grpc_em_cb_func func;
void *cb_arg;
grpc_em_activation_data adata;
};
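/* Proxy that invokes the user's followup function and then frees the
   bookkeeping structure allocated in grpc_em_add_callback(). */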
static void followup_proxy_callback(void *cb_arg, grpc_em_cb_status status) {
struct followup_callback_arg *fcb_arg = cb_arg;
/* Invoke the function */
fcb_arg->func(fcb_arg->cb_arg, status);
gpr_free(fcb_arg);
}
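/* Schedule cb(cb_arg, status) to be run by the event manager as a followup:
   allocate a followup_callback_arg, fill in its activation data, and queue it
   on em's work queue. */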
grpc_em_error grpc_em_add_callback(grpc_em *em, grpc_em_cb_func cb,
void *cb_arg) {
grpc_em_activation_data *adptr;
struct followup_callback_arg *fcb_arg;
fcb_arg = gpr_malloc(sizeof(*fcb_arg));
if (fcb_arg == NULL) {
return GRPC_EM_ERROR;
}
/* Set up the activation data and followup callback argument structures */
adptr = &fcb_arg->adata;
adptr->ev = NULL;
adptr->cb = followup_proxy_callback;
adptr->arg = fcb_arg;
adptr->status = GRPC_CALLBACK_SUCCESS;
adptr->prev = NULL;
adptr->next = NULL;
fcb_arg->func = cb;
fcb_arg->cb_arg = cb_arg;
/* Insert an activation data for the specified em */
add_task(em, adptr);
return GRPC_EM_OK;
}
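/* Usage sketch for followup callbacks (illustrative only: my_first_step and
   my_second_step are hypothetical names, not part of this file):

     static void my_second_step(void *arg, grpc_em_cb_status status) {
       ... runs later, dequeued from the EM work queue ...
     }

     static void my_first_step(void *arg, grpc_em_cb_status status) {
       grpc_em *em = arg;
       ... do some work, then schedule a followup instead of recursing ...
       grpc_em_add_callback(em, my_second_step, em);
     }
*/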