/*
 * Generic workqueue offload mechanism
 *
 * Copyright (C) 2015 Jens Axboe <[email protected]>
 *
 */
#include <unistd.h>

#include "fio.h"
#include "flist.h"
#include "workqueue.h"
#include "smalloc.h"

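/*
 * Typical caller-side lifecycle, as a sketch. The callback and the
 * surrounding variables (my_work_fn, td, sk_out, work) are illustrative
 * names, not part of this file:
 *
 *         struct workqueue_ops ops = { .fn = my_work_fn, };
 *         struct workqueue wq;
 *
 *         if (workqueue_init(td, &wq, &ops, 4, sk_out))
 *                 ... bail out ...
 *
 *         workqueue_enqueue(&wq, &work);   // serialized by caller
 *         workqueue_flush(&wq);            // wait for workers to go idle
 *         workqueue_exit(&wq);
 */

/*
 * Per-worker state flags, kept in submit_worker->flags.
 */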
enum {
        SW_F_IDLE = 1 << 0,
        SW_F_RUNNING = 1 << 1,
        SW_F_EXIT = 1 << 2,
        SW_F_ACCOUNTED = 1 << 3,
        SW_F_ERROR = 1 << 4,
};

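/*
 * Scan workers[start..end] for an idle worker. If none is found, track
 * the worker with the lowest submit sequence number in 'best' as a
 * least-recently-loaded fallback.
 */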
static struct submit_worker *__get_submit_worker(struct workqueue *wq,
                                                 unsigned int start,
                                                 unsigned int end,
                                                 struct submit_worker **best)
{
        struct submit_worker *sw = NULL;

        while (start <= end) {
                sw = &wq->workers[start];
                if (sw->flags & SW_F_IDLE)
                        return sw;
                if (!(*best) || sw->seq < (*best)->seq)
                        *best = sw;
                start++;
        }

        return NULL;
}

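/*
 * Pick the worker to queue new work on: search from next_free_worker to
 * the end, then wrap around from the start. If no worker is idle, fall
 * back to the one with the oldest queued work.
 */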
static struct submit_worker *get_submit_worker(struct workqueue *wq)
{
        unsigned int next = wq->next_free_worker;
        struct submit_worker *sw, *best = NULL;

        assert(next < wq->max_workers);

        sw = __get_submit_worker(wq, next, wq->max_workers - 1, &best);
        if (!sw && next)
                sw = __get_submit_worker(wq, 0, next - 1, &best);

        /*
         * No truly idle worker found, use the best match
         */
        if (!sw)
                sw = best;

        if (sw->index == wq->next_free_worker) {
                if (sw->index + 1 < wq->max_workers)
                        wq->next_free_worker = sw->index + 1;
                else
                        wq->next_free_worker = 0;
        }

        return sw;
}

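/*
 * Return true only if every worker has the SW_F_IDLE flag set.
 */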
static bool all_sw_idle(struct workqueue *wq)
{
        int i;

        for (i = 0; i < wq->max_workers; i++) {
                struct submit_worker *sw = &wq->workers[i];

                if (!(sw->flags & SW_F_IDLE))
                        return false;
        }

        return true;
}

/*
 * Must be serialized wrt workqueue_enqueue() by caller. Setting
 * ->wake_idle makes workers signal ->flush_cond as they go idle, so we
 * can sleep until all of them have drained their work lists.
 */
void workqueue_flush(struct workqueue *wq)
{
        wq->wake_idle = 1;

        while (!all_sw_idle(wq)) {
                pthread_mutex_lock(&wq->flush_lock);
                pthread_cond_wait(&wq->flush_cond, &wq->flush_lock);
                pthread_mutex_unlock(&wq->flush_lock);
        }

        wq->wake_idle = 0;
}

/*
 * Must be serialized by caller. Queues the work item on the chosen
 * worker and wakes that worker up.
 */
void workqueue_enqueue(struct workqueue *wq, struct workqueue_work *work)
{
        struct submit_worker *sw;

        sw = get_submit_worker(wq);
        assert(sw);

        pthread_mutex_lock(&sw->lock);
        flist_add_tail(&work->list, &sw->work_list);
        sw->seq = ++wq->work_seq;
        sw->flags &= ~SW_F_IDLE;
        pthread_mutex_unlock(&sw->lock);

        pthread_cond_signal(&sw->cond);
}

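/*
 * Run the queue callback on every work item on 'list', removing each
 * item first. Called without the worker lock held.
 */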
static void handle_list(struct submit_worker *sw, struct flist_head *list)
{
        struct workqueue *wq = sw->wq;
        struct workqueue_work *work;

        while (!flist_empty(list)) {
                work = flist_first_entry(list, struct workqueue_work, list);
                flist_del_init(&work->list);
                wq->ops.fn(sw, work);
        }
}

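/*
 * Main loop for each worker: apply the configured nice value, run the
 * per-worker init, then splice off and process batches of work until
 * SW_F_EXIT is set and the work list is empty.
 */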
static void *worker_thread(void *data)
{
        struct submit_worker *sw = data;
        struct workqueue *wq = sw->wq;
        unsigned int ret = 0;
        FLIST_HEAD(local_list);

        sk_out_assign(sw->sk_out);

        if (wq->ops.nice) {
                if (nice(wq->ops.nice) < 0) {
                        log_err("workqueue: nice %s\n", strerror(errno));
                        ret = 1;
                }
        }

        if (!ret)
                ret = workqueue_init_worker(sw);

        pthread_mutex_lock(&sw->lock);
        sw->flags |= SW_F_RUNNING;
        if (ret)
                sw->flags |= SW_F_ERROR;
        pthread_mutex_unlock(&sw->lock);

        pthread_mutex_lock(&wq->flush_lock);
        pthread_cond_signal(&wq->flush_cond);
        pthread_mutex_unlock(&wq->flush_lock);

        if (sw->flags & SW_F_ERROR)
                goto done;

        while (1) {
                pthread_mutex_lock(&sw->lock);

                if (flist_empty(&sw->work_list)) {
                        if (sw->flags & SW_F_EXIT) {
                                pthread_mutex_unlock(&sw->lock);
                                break;
                        }

                        if (workqueue_pre_sleep_check(sw)) {
                                pthread_mutex_unlock(&sw->lock);
                                workqueue_pre_sleep(sw);
                                pthread_mutex_lock(&sw->lock);
                        }

                        /*
                         * We dropped and reacquired the lock, check the
                         * state again.
                         */
                        if (!flist_empty(&sw->work_list))
                                goto handle_work;

                        if (sw->flags & SW_F_EXIT) {
                                pthread_mutex_unlock(&sw->lock);
                                break;
                        } else if (!(sw->flags & SW_F_IDLE)) {
                                sw->flags |= SW_F_IDLE;
                                wq->next_free_worker = sw->index;
                                if (wq->wake_idle)
                                        pthread_cond_signal(&wq->flush_cond);
                        }
                        if (wq->ops.update_acct_fn)
                                wq->ops.update_acct_fn(sw);

                        pthread_cond_wait(&sw->cond, &sw->lock);
                } else {
handle_work:
                        flist_splice_init(&sw->work_list, &local_list);
                }
                pthread_mutex_unlock(&sw->lock);
                handle_list(sw, &local_list);
        }

        if (wq->ops.update_acct_fn)
                wq->ops.update_acct_fn(sw);

done:
        sk_out_drop();
        return NULL;
}

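/*
 * Tear down one worker: run the exit hook, destroy its lock and
 * condition variable, and free any private data the alloc hook set up.
 */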
static void free_worker(struct submit_worker *sw, unsigned int *sum_cnt)
{
        struct workqueue *wq = sw->wq;

        workqueue_exit_worker(sw, sum_cnt);

        pthread_cond_destroy(&sw->cond);
        pthread_mutex_destroy(&sw->lock);

        if (wq->ops.free_worker_fn)
                wq->ops.free_worker_fn(sw);
}

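/*
 * Wait for a worker thread to exit, then free it.
 */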
static void shutdown_worker(struct submit_worker *sw, unsigned int *sum_cnt)
{
        pthread_join(sw->thread, NULL);
        free_worker(sw, sum_cnt);
}

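/*
 * Stop and reap all workers: set SW_F_EXIT and wake each one, then join
 * and free every worker not yet accounted for, before releasing the
 * workqueue resources themselves.
 */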
void workqueue_exit(struct workqueue *wq)
{
        unsigned int shutdown, sum_cnt = 0;
        struct submit_worker *sw;
        int i;

        if (!wq->workers)
                return;

        for (i = 0; i < wq->max_workers; i++) {
                sw = &wq->workers[i];

                pthread_mutex_lock(&sw->lock);
                sw->flags |= SW_F_EXIT;
                pthread_cond_signal(&sw->cond);
                pthread_mutex_unlock(&sw->lock);
        }

        do {
                shutdown = 0;
                for (i = 0; i < wq->max_workers; i++) {
                        sw = &wq->workers[i];
                        if (sw->flags & SW_F_ACCOUNTED)
                                continue;
                        pthread_mutex_lock(&sw->lock);
                        sw->flags |= SW_F_ACCOUNTED;
                        pthread_mutex_unlock(&sw->lock);
                        shutdown_worker(sw, &sum_cnt);
                        shutdown++;
                }
        } while (shutdown && shutdown != wq->max_workers);

        sfree(wq->workers);
        wq->workers = NULL;
        pthread_mutex_destroy(&wq->flush_lock);
        pthread_cond_destroy(&wq->flush_cond);
        pthread_mutex_destroy(&wq->stat_lock);
}

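/*
 * Initialize one worker slot and spawn its thread. On success the
 * worker starts out idle; on failure the partially set up worker is
 * freed again.
 */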
static int start_worker(struct workqueue *wq, unsigned int index,
                        struct sk_out *sk_out)
{
        struct submit_worker *sw = &wq->workers[index];
        int ret;

        INIT_FLIST_HEAD(&sw->work_list);

        ret = mutex_cond_init_pshared(&sw->lock, &sw->cond);
        if (ret)
                return ret;

        sw->wq = wq;
        sw->index = index;
        sw->sk_out = sk_out;

        if (wq->ops.alloc_worker_fn) {
                ret = wq->ops.alloc_worker_fn(sw);
                if (ret)
                        return ret;
        }

        ret = pthread_create(&sw->thread, NULL, worker_thread, sw);
        if (!ret) {
                pthread_mutex_lock(&sw->lock);
                sw->flags = SW_F_IDLE;
                pthread_mutex_unlock(&sw->lock);
                return 0;
        }

        free_worker(sw, NULL);
        return 1;
}

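/*
 * Set up the workqueue and start up to max_workers workers. Returns 0
 * once every started worker reports SW_F_RUNNING, or 1 if none could
 * be started or a worker hit an error during init.
 */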
int workqueue_init(struct thread_data *td, struct workqueue *wq,
                   struct workqueue_ops *ops, unsigned int max_workers,
                   struct sk_out *sk_out)
{
        unsigned int running;
        int i, error;
        int ret;

        wq->max_workers = max_workers;
        wq->td = td;
        wq->ops = *ops;
        wq->work_seq = 0;
        wq->next_free_worker = 0;

        ret = mutex_cond_init_pshared(&wq->flush_lock, &wq->flush_cond);
        if (ret)
                goto err;
        ret = mutex_init_pshared(&wq->stat_lock);
        if (ret)
                goto err;

        wq->workers = smalloc(wq->max_workers * sizeof(struct submit_worker));
        if (!wq->workers)
                goto err;

        for (i = 0; i < wq->max_workers; i++)
                if (start_worker(wq, i, sk_out))
                        break;

        wq->max_workers = i;
        if (!wq->max_workers)
                goto err;

        /*
         * Wait for them all to be started and initialized
         */
        error = 0;
        do {
                struct submit_worker *sw;

                running = 0;
                pthread_mutex_lock(&wq->flush_lock);
                for (i = 0; i < wq->max_workers; i++) {
                        sw = &wq->workers[i];
                        pthread_mutex_lock(&sw->lock);
                        if (sw->flags & SW_F_RUNNING)
                                running++;
                        if (sw->flags & SW_F_ERROR)
                                error++;
                        pthread_mutex_unlock(&sw->lock);
                }

                if (error || running == wq->max_workers) {
                        pthread_mutex_unlock(&wq->flush_lock);
                        break;
                }

                pthread_cond_wait(&wq->flush_cond, &wq->flush_lock);
                pthread_mutex_unlock(&wq->flush_lock);
        } while (1);

        if (!error)
                return 0;

err:
        log_err("Can't create rate workqueue\n");
        td_verror(td, ESRCH, "workqueue_init");
        workqueue_exit(wq);
        return 1;
}