| /* |
| * Copyright (c) 2022 Samsung Electronics Co., Ltd. |
| * All Rights Reserved. |
| * |
| * Redistribution and use in source and binary forms, with or without |
| * modification, are permitted provided that the following conditions are met: |
| * |
| * - Redistributions of source code must retain the above copyright notice, |
| * this list of conditions and the following disclaimer. |
| * |
| * - Redistributions in binary form must reproduce the above copyright notice, |
| * this list of conditions and the following disclaimer in the documentation |
| * and/or other materials provided with the distribution. |
| * |
| * - Neither the name of the copyright owner, nor the names of its contributors |
| * may be used to endorse or promote products derived from this software |
| * without specific prior written permission. |
| * |
| * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| * ARE DISCLAIMED.IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
| * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| * CONSEQUENTIAL DAMAGES(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| * POSSIBILITY OF SUCH DAMAGE. |
| */ |
| |
| #include <stdio.h> |
| #include <stdlib.h> |
| #include "oapv_tpool.h" |
| #if defined(WIN32) || defined(WIN64) |
| #include <windows.h> |
| #include <process.h> |
| #else |
| #include <pthread.h> |
| #endif |
| |
| #define WINDOWS_MUTEX_SYNC 0 |
| |
| #if !defined(WIN32) && !defined(WIN64) |
| |
| typedef struct thread_ctx { |
| // synchronization members |
| pthread_t t_handle; // worker thread handle |
| pthread_attr_t tAttribute; // worker thread attribute |
| pthread_cond_t w_event; // wait event for worker thread |
| pthread_cond_t r_event; // wait event for main thread |
| pthread_mutex_t c_section; // for synchronization |
| |
| // member field to run a task |
| oapv_fn_thread_entry_t task; |
| void *t_arg; |
| tpool_status_t t_status; |
| tpool_result_t t_result; |
| int thread_id; |
| int task_ret; // return value of task function |
| } thread_ctx_t; |
| |
| typedef struct thread_mutex { |
| pthread_mutex_t lmutex; |
| } thread_mutex_t; |
| |
| static void *tpool_worker_thread(void *arg) |
| { |
| /********************* main routine for thread pool worker thread ************************* |
| ********************** worker thread can remain in suspended or running state ************* |
| ********************* control the synchronization with help of thread context members *****/ |
| |
| // member Initialization section |
| thread_ctx_t *t_context = (thread_ctx_t *)arg; |
| int ret; |
| |
| if(!t_context) { |
| return 0; // error handling, more like a fail safe mechanism |
| } |
| |
| while(1) { |
| // worker thread loop |
| // remains suspended/sleep waiting for an event |
| |
| // get the mutex and check the state |
| pthread_mutex_lock(&t_context->c_section); |
| while(t_context->t_status == TPOOL_SUSPENDED) { |
| // wait for the event |
| pthread_cond_wait(&t_context->w_event, &t_context->c_section); |
| } |
| |
| if(t_context->t_status == TPOOL_TERMINATED) { |
| t_context->t_result = TPOOL_SUCCESS; |
| pthread_mutex_unlock(&t_context->c_section); |
| break; // exit the routine |
| } |
| |
| t_context->t_status = TPOOL_RUNNING; |
| pthread_mutex_unlock(&t_context->c_section); |
| |
| // run the routine |
| // worker thread state is running with entry function and arg set |
| ret = t_context->task(t_context->t_arg); |
| |
| // signal the thread waiting on the result |
| pthread_mutex_lock(&t_context->c_section); |
| t_context->t_status = TPOOL_SUSPENDED; |
| t_context->t_result = TPOOL_SUCCESS; |
| t_context->task_ret = ret; |
| pthread_cond_signal(&t_context->r_event); |
| pthread_mutex_unlock(&t_context->c_section); |
| } |
| |
| return 0; |
| } |
| |
| static oapv_thread_t tpool_create_thread(oapv_tpool_t *tp, int thread_id) |
| { |
| if(!tp) { |
| return NULL; // error management |
| } |
| |
| thread_ctx_t *tctx = NULL; |
| |
| tctx = (thread_ctx_t *)malloc(sizeof(thread_ctx_t)); |
| |
| if(!tctx) { |
| return NULL; // error management, bad alloc |
| } |
| |
| int result = 1; |
| |
| // intialize conditional variable and mutexes |
| result = pthread_mutex_init(&tctx->c_section, NULL); |
| if(result) { |
| goto TERROR; // error handling |
| } |
| result = pthread_cond_init(&tctx->w_event, NULL); |
| if(result) { |
| goto TERROR; |
| } |
| result = pthread_cond_init(&tctx->r_event, NULL); |
| if(result) { |
| goto TERROR; |
| } |
| |
| // initialize the worker thread attribute and set the type to joinable |
| result = pthread_attr_init(&tctx->tAttribute); |
| if(result) { |
| goto TERROR; |
| } |
| |
| result = pthread_attr_setdetachstate(&tctx->tAttribute, PTHREAD_CREATE_JOINABLE); |
| if(result) { |
| goto TERROR; |
| } |
| |
| tctx->task = NULL; |
| tctx->t_arg = NULL; |
| tctx->t_status = TPOOL_SUSPENDED; |
| tctx->t_result = TPOOL_INVALID_STATE; |
| tctx->thread_id = thread_id; |
| |
| // create the worker thread |
| result = pthread_create(&tctx->t_handle, &tctx->tAttribute, tpool_worker_thread, (void *)(tctx)); |
| if(result) { |
| goto TERROR; |
| } |
| |
| // deinit the attribue |
| pthread_attr_destroy(&tctx->tAttribute); |
| return (oapv_thread_t)tctx; |
| |
| TERROR: |
| pthread_mutex_destroy(&tctx->c_section); |
| pthread_cond_destroy(&tctx->w_event); |
| pthread_cond_destroy(&tctx->r_event); |
| pthread_attr_destroy(&tctx->tAttribute); |
| free(tctx); |
| |
| return NULL; // error handling, can't create a worker thread with proper initialization |
| } |
| |
| static tpool_result_t tpool_assign_task(oapv_thread_t thread_id, oapv_fn_thread_entry_t entry, void *arg) |
| { |
| // assign the task function and argument |
| // worker thread may be in running state or suspended state |
| // if worker thread is in suspended state, it can be waiting for first run or it has finished one task and is waiting again |
| // if worker thread is in running state, it will come to waiting state |
| // in any case, waiting on read event will always work |
| |
| thread_ctx_t *tctx = (thread_ctx_t *)(thread_id); |
| if(!tctx) { |
| return TPOOL_INVALID_ARG; |
| } |
| |
| // lock the mutex and wait on read event |
| pthread_mutex_lock(&tctx->c_section); |
| while(tctx->t_status == TPOOL_RUNNING) { |
| pthread_cond_wait(&tctx->r_event, &tctx->c_section); |
| } |
| |
| // thread is in suspended state |
| tctx->t_status = TPOOL_RUNNING; |
| tctx->task = entry; |
| tctx->t_arg = arg; |
| // signal the worker thread to wake up and run the task |
| pthread_cond_signal(&tctx->w_event); |
| pthread_mutex_unlock(&tctx->c_section); // release the lock |
| |
| return TPOOL_SUCCESS; |
| } |
| |
| static tpool_result_t tpool_retrieve_result(oapv_thread_t thread_id, int *ret) |
| { |
| // whatever task has been assigned to worker thread |
| // wait for it to finish get the result |
| |
| thread_ctx_t *t_context = (thread_ctx_t *)(thread_id); |
| if(!t_context) { |
| return TPOOL_INVALID_ARG; |
| } |
| |
| tpool_result_t result = TPOOL_SUCCESS; |
| |
| pthread_mutex_lock(&t_context->c_section); |
| while(TPOOL_RUNNING == t_context->t_status) { |
| pthread_cond_wait(&t_context->r_event, &t_context->c_section); |
| } |
| result = t_context->t_result; |
| if(ret != NULL) |
| *ret = t_context->task_ret; |
| pthread_mutex_unlock(&t_context->c_section); |
| return result; |
| } |
| |
| static tpool_result_t tpool_terminate_thread(oapv_thread_t *thread_id) |
| { |
| // handler to close the thread |
| // close the thread handle |
| // release all the resource |
| // delete the thread context object |
| |
| thread_ctx_t *t_context = (thread_ctx_t *)(*thread_id); |
| if(!t_context) { |
| return TPOOL_INVALID_ARG; |
| } |
| |
| // The worker thread might be in suspended state or may be processing a task |
| pthread_mutex_lock(&t_context->c_section); |
| while(TPOOL_RUNNING == t_context->t_status) { |
| pthread_cond_wait(&t_context->r_event, &t_context->c_section); |
| } |
| |
| t_context->t_status = TPOOL_TERMINATED; |
| pthread_cond_signal(&t_context->w_event); |
| |
| pthread_mutex_unlock(&t_context->c_section); |
| |
| // join the worker thread |
| pthread_join(t_context->t_handle, NULL); |
| |
| // clean all the synchronization memebers |
| pthread_mutex_destroy(&t_context->c_section); |
| pthread_cond_destroy(&t_context->w_event); |
| pthread_cond_destroy(&t_context->r_event); |
| |
| // delete the thread context memory |
| free(t_context); |
| (*thread_id) = NULL; |
| return TPOOL_SUCCESS; |
| } |
| |
| static int tpool_threadsafe_decrement(oapv_sync_obj_t sobj, volatile int *pcnt) |
| { |
| thread_mutex_t *imutex = (thread_mutex_t *)(sobj); |
| int temp = 0; |
| |
| // lock the mutex, decrement the count and release the mutex |
| pthread_mutex_lock(&imutex->lmutex); |
| temp = *pcnt; |
| *pcnt = --temp; |
| pthread_mutex_unlock(&imutex->lmutex); |
| |
| return temp; |
| } |
| |
| oapv_sync_obj_t oapv_tpool_sync_obj_create() |
| { |
| thread_mutex_t *imutex = (thread_mutex_t *)malloc(sizeof(thread_mutex_t)); |
| if(0 == imutex) { |
| return 0; // failure case |
| } |
| |
| // intialize the mutex |
| int result = pthread_mutex_init(&imutex->lmutex, NULL); |
| if(result) { |
| if(imutex) { |
| free(imutex); |
| } |
| imutex = 0; |
| } |
| |
| return imutex; |
| } |
| |
| tpool_result_t oapv_tpool_sync_obj_delete(oapv_sync_obj_t *sobj) |
| { |
| |
| thread_mutex_t *imutex = (thread_mutex_t *)(*sobj); |
| |
| // delete the mutex |
| pthread_mutex_destroy(&imutex->lmutex); |
| |
| // free the memory |
| free(imutex); |
| *sobj = NULL; |
| |
| return TPOOL_SUCCESS; |
| } |
| |
| void oapv_tpool_enter_cs(oapv_sync_obj_t sobj) |
| { |
| thread_mutex_t *imutex = (thread_mutex_t *)(sobj); |
| pthread_mutex_lock(&imutex->lmutex); |
| } |
| |
| void oapv_tpool_leave_cs(oapv_sync_obj_t sobj) |
| { |
| thread_mutex_t *imutex = (thread_mutex_t *)(sobj); |
| pthread_mutex_unlock(&imutex->lmutex); |
| } |
| |
| #else |
| typedef struct thread_ctx { |
| // synchronization members |
| HANDLE t_handle; // worker thread handle |
| HANDLE w_event; // worker thread waiting event handle |
| HANDLE r_event; // signalling thread read event handle |
| CRITICAL_SECTION c_section; // critical section for fast synchronization |
| |
| // member field to run a task |
| oapv_fn_thread_entry_t task; |
| void *t_arg; |
| tpool_status_t t_status; |
| tpool_result_t t_result; |
| int task_ret; |
| int thread_id; |
| |
| } thread_ctx_t; |
| |
| typedef struct thread_mutex { |
| #if WINDOWS_MUTEX_SYNC |
| HANDLE lmutex; |
| #else |
| CRITICAL_SECTION c_section; // critical section for fast synchronization |
| #endif |
| |
| } thread_mutex_t; |
| |
| static unsigned int __stdcall tpool_worker_thread(void *arg) |
| { |
| /********************* main routine for thread pool worker thread ************************* |
| ********************** worker thread can remain in suspended or running state ************* |
| ********************* control the synchronization with help of thread context members *****/ |
| |
| // member Initialization section |
| thread_ctx_t *t_context = (thread_ctx_t *)arg; |
| if(!t_context) { |
| return 0; // error handling, more like a fail safe mechanism |
| } |
| |
| while(1) { |
| // worker thread loop |
| // remains suspended/sleep waiting for an event |
| WaitForSingleObject(t_context->w_event, INFINITE); |
| |
| // worker thread has received the event to wake up and perform operation |
| EnterCriticalSection(&t_context->c_section); |
| if(t_context->t_status == TPOOL_TERMINATED) { |
| // received signal to terminate |
| t_context->t_result = TPOOL_SUCCESS; |
| LeaveCriticalSection(&t_context->c_section); |
| break; |
| } |
| LeaveCriticalSection(&t_context->c_section); |
| |
| // worker thread state is running with entry function and arg set |
| t_context->task_ret = t_context->task(t_context->t_arg); |
| |
| // change the state to suspended/waiting |
| EnterCriticalSection(&t_context->c_section); |
| t_context->t_status = TPOOL_SUSPENDED; |
| t_context->t_result = TPOOL_SUCCESS; |
| LeaveCriticalSection(&t_context->c_section); |
| |
| // send an event to thread, waiting for it to finish it's task |
| SetEvent(t_context->r_event); |
| } |
| |
| return 0; |
| } |
| |
| static oapv_thread_t tpool_create_thread(oapv_tpool_t *tp, int thread_id) |
| { |
| if(!tp) { |
| return NULL; // error management |
| } |
| |
| thread_ctx_t *thread_context = NULL; |
| thread_context = (thread_ctx_t *)malloc(sizeof(thread_ctx_t)); |
| |
| if(!thread_context) { |
| return NULL; // error management, bad alloc |
| } |
| |
| // create waiting event |
| // create waiting event as automatic reset, only one thread can come out of waiting state |
| // done intentionally ... signally happens from different thread and only worker thread should be able to respond |
| thread_context->w_event = CreateEvent(NULL, FALSE, FALSE, NULL); |
| if(!thread_context->w_event) { |
| goto TERROR; // error handling, can't create event handler |
| } |
| |
| thread_context->r_event = CreateEvent(NULL, TRUE, TRUE, NULL); // read event is enabled by default |
| if(!thread_context->r_event) { |
| goto TERROR; |
| } |
| |
| InitializeCriticalSection(&(thread_context->c_section)); // This section for fast data retrieval |
| |
| // intialize the state variables for the thread context object |
| thread_context->task = NULL; |
| thread_context->t_arg = NULL; |
| thread_context->t_status = TPOOL_SUSPENDED; |
| thread_context->t_result = TPOOL_INVALID_STATE; |
| thread_context->thread_id = thread_id; |
| |
| thread_context->t_handle = (HANDLE)_beginthreadex(NULL, 0, tpool_worker_thread, (void *)thread_context, 0, NULL); // create a thread store the handle and pass the handle to context |
| if(!thread_context->t_handle) { |
| goto TERROR; |
| } |
| |
| // Everything created and intialized properly |
| // return the created thread_context; |
| return (oapv_thread_t)thread_context; |
| |
| TERROR: |
| if(thread_context->w_event) { |
| CloseHandle(thread_context->w_event); |
| } |
| if(thread_context->r_event) { |
| CloseHandle(thread_context->r_event); |
| } |
| DeleteCriticalSection(&thread_context->c_section); |
| if(thread_context) { |
| free(thread_context); |
| } |
| |
| return NULL; // error handling, can't create a worker thread with proper initialization |
| } |
| |
| static tpool_result_t tpool_assign_task(oapv_thread_t thread_id, oapv_fn_thread_entry_t entry, void *arg) |
| { |
| // assign the task function and argument |
| // worker thread may be in running state or suspended state |
| // if worker thread is in suspended state, it can be waiting for first run or it has finished one task and is waiting again |
| // if worker thread is in running state, it will come to waiting state |
| // in any case, waiting on read event will always work |
| |
| thread_ctx_t *t_context = (thread_ctx_t *)(thread_id); |
| if(!t_context) { |
| return TPOOL_INVALID_ARG; |
| } |
| |
| WaitForSingleObject(t_context->r_event, INFINITE); |
| |
| // worker thread is in waiting state |
| EnterCriticalSection(&t_context->c_section); |
| t_context->t_status = TPOOL_RUNNING; |
| t_context->task = entry; |
| t_context->t_arg = arg; |
| // signal the worker thread to wake up and run the task |
| ResetEvent(t_context->r_event); |
| SetEvent(t_context->w_event); |
| LeaveCriticalSection(&t_context->c_section); |
| |
| return TPOOL_SUCCESS; |
| } |
| |
| static tpool_result_t tpool_retrieve_result(oapv_thread_t thread_id, int *ret) |
| { |
| // whatever task has been assigned to worker thread |
| // wait for it to finish get the result |
| thread_ctx_t *t_context = (thread_ctx_t *)(thread_id); |
| if(!t_context) { |
| return TPOOL_INVALID_ARG; |
| } |
| |
| tpool_result_t result = TPOOL_SUCCESS; |
| |
| WaitForSingleObject(t_context->r_event, INFINITE); |
| |
| // worker thread has finished it's job and now it is in waiting state |
| EnterCriticalSection(&t_context->c_section); |
| result = t_context->t_result; |
| if(ret != NULL) |
| *ret = t_context->task_ret; |
| LeaveCriticalSection(&t_context->c_section); |
| |
| return result; |
| } |
| |
| tpool_result_t tpool_terminate_thread(oapv_thread_t *thread_id) |
| { |
| // handler to close the thread |
| // close the thread handle |
| // release all the resource |
| // delete the thread context object |
| |
| // the thread may be running or it is in suspended state |
| // if it is in suspended state, read event will be active |
| // if it is in running state, read event will be active after sometime |
| |
| thread_ctx_t *t_context = (thread_ctx_t *)(*thread_id); |
| if(!t_context) { |
| return TPOOL_INVALID_ARG; |
| } |
| |
| WaitForSingleObject(t_context->r_event, INFINITE); |
| |
| // worker thread is in waiting state |
| EnterCriticalSection(&t_context->c_section); |
| t_context->t_status = TPOOL_TERMINATED; |
| LeaveCriticalSection(&t_context->c_section); |
| |
| // signal the worker thread to wake up and run the task |
| SetEvent(t_context->w_event); |
| |
| // wait for worker thread to finish it's routine |
| WaitForSingleObject(t_context->t_handle, INFINITE); |
| CloseHandle(t_context->t_handle); // freed all the resources for the thread |
| |
| CloseHandle(t_context->w_event); |
| CloseHandle(t_context->r_event); |
| DeleteCriticalSection(&t_context->c_section); |
| |
| // delete the thread context memory |
| free(t_context); |
| (*thread_id) = NULL; |
| |
| return TPOOL_SUCCESS; |
| } |
| |
| static int tpool_threadsafe_decrement(oapv_sync_obj_t sobj, volatile int *pcnt) |
| { |
| thread_mutex_t *imutex = (thread_mutex_t *)(sobj); |
| int temp = 0; |
| |
| #if WINDOWS_MUTEX_SYNC |
| // let's lock the mutex |
| DWORD dw_wait_result = WaitForSingleObject(imutex->lmutex, INFINITE); // wait for infinite time |
| |
| switch(dw_wait_result) { |
| // The thread got ownership of the mutex |
| case WAIT_OBJECT_0: |
| temp = *pcnt; |
| *pcnt = --temp; |
| // Release ownership of the mutex object |
| ReleaseMutex(imutex->lmutex); |
| break; |
| // The thread got ownership of an abandoned mutex |
| // The database is in an indeterminate state |
| case WAIT_ABANDONED: |
| temp = *pcnt; |
| temp--; |
| *pcnt = temp; |
| break; |
| } |
| #else |
| EnterCriticalSection(&imutex->c_section); |
| temp = *pcnt; |
| *pcnt = --temp; |
| LeaveCriticalSection(&imutex->c_section); |
| #endif |
| return temp; |
| } |
| |
| oapv_sync_obj_t oapv_tpool_sync_obj_create() |
| { |
| thread_mutex_t *imutex = (thread_mutex_t *)malloc(sizeof(thread_mutex_t)); |
| if(0 == imutex) { |
| return 0; // failure case |
| } |
| |
| #if WINDOWS_MUTEX_SYNC |
| // initialize the created mutex instance |
| imutex->lmutex = CreateMutex(NULL, FALSE, NULL); |
| if(0 == imutex->lmutex) { |
| if(imutex) { |
| free(imutex); |
| } |
| return 0; |
| } |
| #else |
| // initialize the critical section |
| InitializeCriticalSection(&(imutex->c_section)); |
| #endif |
| return imutex; |
| } |
| |
| tpool_result_t oapv_tpool_sync_obj_delete(oapv_sync_obj_t *sobj) |
| { |
| thread_mutex_t *imutex = (thread_mutex_t *)(*sobj); |
| #if WINDOWS_MUTEX_SYNC |
| // release the mutex |
| CloseHandle(imutex->lmutex); |
| #else |
| // delete critical section |
| DeleteCriticalSection(&imutex->c_section); |
| #endif |
| |
| // free the memory |
| free(imutex); |
| *sobj = NULL; |
| |
| return TPOOL_SUCCESS; |
| } |
| |
| void oapv_tpool_enter_cs(oapv_sync_obj_t sobj) |
| { |
| thread_mutex_t *imutex = (thread_mutex_t *)(sobj); |
| EnterCriticalSection(&imutex->c_section); |
| } |
| void oapv_tpool_leave_cs(oapv_sync_obj_t sobj) |
| { |
| thread_mutex_t *imutex = (thread_mutex_t *)(sobj); |
| LeaveCriticalSection(&imutex->c_section); |
| } |
| |
| #endif |
| |
| tpool_result_t oapv_tpool_init(oapv_tpool_t *tp, int maxtask) |
| { |
| // assign handles to threadcontroller object |
| // handles for create, run, join and terminate will be given to controller object |
| |
| tp->create = tpool_create_thread; |
| tp->run = tpool_assign_task; |
| tp->join = tpool_retrieve_result; |
| tp->release = tpool_terminate_thread; |
| tp->max_task_cnt = maxtask; |
| |
| return TPOOL_SUCCESS; |
| } |
| |
| tpool_result_t oapv_tpool_deinit(oapv_tpool_t *tp) |
| { |
| // reset all the handler to NULL |
| tp->create = NULL; |
| tp->run = NULL; |
| tp->join = NULL; |
| tp->release = NULL; |
| tp->max_task_cnt = 0; |
| |
| return TPOOL_SUCCESS; |
| } |
| |
| int oapv_tpool_spinlock_wait(volatile int *addr, int val) |
| { |
| int temp; |
| |
| while(1) { |
| temp = *addr; // thread safe volatile read |
| if(temp == val || temp == -1) { |
| break; |
| } |
| } |
| return temp; |
| } |
| |
| void threadsafe_assign(volatile int *addr, int val) |
| { |
| // thread safe volatile assign |
| *addr = val; |
| } |