Task scheduler: Use real pthread's TLS to access active thread's data
This allows us to avoid TLS stored in pool which gives us advantage of using pre-allocated tasks pool for the pools created from non-main thread. Even on systems with slow pthread TLS it should not be a problem because we access it once at a pool construction time. If we want to use this more often (for example, to get rid of push_from_thread) we'll have to do much more accurate benchmark.
This commit is contained in:
@@ -124,19 +124,10 @@ struct TaskPool {
|
|||||||
*/
|
*/
|
||||||
bool run_in_background;
|
bool run_in_background;
|
||||||
|
|
||||||
/* This TLS is used for caching task pointers for thread id 0.
|
/* This is a task scheduler's ID of a thread at which pool was constructed.
|
||||||
* This could either point to a global scheduler's TLS for thread 0 if the
|
* It will be used to access task TLS.
|
||||||
* pool is created form the main thread or point to task_mempool_local
|
|
||||||
* otherwise.
|
|
||||||
*
|
|
||||||
* This way we solve possible threading conflicts accessing same global
|
|
||||||
* memory pool from multiple threads from which wait_work() is called.
|
|
||||||
*
|
|
||||||
* TODO(sergey): Use real pthread's TLS to access current thread's TLS
|
|
||||||
* and use it instead.
|
|
||||||
*/
|
*/
|
||||||
TaskThreadLocalStorage *task_tls;
|
int thread_id;
|
||||||
TaskThreadLocalStorage task_tls_local;
|
|
||||||
|
|
||||||
#ifdef DEBUG_STATS
|
#ifdef DEBUG_STATS
|
||||||
TaskMemPoolStats *mempool_stats;
|
TaskMemPoolStats *mempool_stats;
|
||||||
@@ -154,6 +145,9 @@ struct TaskScheduler {
|
|||||||
ThreadCondition queue_cond;
|
ThreadCondition queue_cond;
|
||||||
|
|
||||||
volatile bool do_exit;
|
volatile bool do_exit;
|
||||||
|
|
||||||
|
/* NOTE: In pthread's TLS we store the whole TaskThread structure. */
|
||||||
|
pthread_key_t tls_id_key;
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct TaskThread {
|
typedef struct TaskThread {
|
||||||
@@ -181,12 +175,7 @@ BLI_INLINE TaskThreadLocalStorage *get_task_tls(TaskPool *pool,
|
|||||||
TaskScheduler *scheduler = pool->scheduler;
|
TaskScheduler *scheduler = pool->scheduler;
|
||||||
BLI_assert(thread_id >= 0);
|
BLI_assert(thread_id >= 0);
|
||||||
BLI_assert(thread_id <= scheduler->num_threads);
|
BLI_assert(thread_id <= scheduler->num_threads);
|
||||||
if (thread_id == 0) {
|
return &scheduler->task_threads[thread_id].tls;
|
||||||
return pool->task_tls;
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
return &scheduler->task_threads[thread_id].tls;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
BLI_INLINE void free_task_tls(TaskThreadLocalStorage *tls)
|
BLI_INLINE void free_task_tls(TaskThreadLocalStorage *tls)
|
||||||
@@ -202,6 +191,7 @@ static Task *task_alloc(TaskPool *pool, const int thread_id)
|
|||||||
BLI_assert(thread_id <= pool->scheduler->num_threads);
|
BLI_assert(thread_id <= pool->scheduler->num_threads);
|
||||||
if (thread_id != -1) {
|
if (thread_id != -1) {
|
||||||
BLI_assert(thread_id >= 0);
|
BLI_assert(thread_id >= 0);
|
||||||
|
BLI_assert(thread_id <= pool->scheduler->num_threads);
|
||||||
TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
|
TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
|
||||||
TaskMemPool *task_mempool = &tls->task_mempool;
|
TaskMemPool *task_mempool = &tls->task_mempool;
|
||||||
/* Try to re-use task memory from a thread local storage. */
|
/* Try to re-use task memory from a thread local storage. */
|
||||||
@@ -331,6 +321,8 @@ static void *task_scheduler_thread_run(void *thread_p)
|
|||||||
int thread_id = thread->id;
|
int thread_id = thread->id;
|
||||||
Task *task;
|
Task *task;
|
||||||
|
|
||||||
|
pthread_setspecific(scheduler->tls_id_key, thread);
|
||||||
|
|
||||||
/* keep popping off tasks */
|
/* keep popping off tasks */
|
||||||
while (task_scheduler_thread_wait_pop(scheduler, &task)) {
|
while (task_scheduler_thread_wait_pop(scheduler, &task)) {
|
||||||
TaskPool *pool = task->pool;
|
TaskPool *pool = task->pool;
|
||||||
@@ -377,6 +369,8 @@ TaskScheduler *BLI_task_scheduler_create(int num_threads)
|
|||||||
scheduler->task_threads = MEM_callocN(sizeof(TaskThread) * (num_threads + 1),
|
scheduler->task_threads = MEM_callocN(sizeof(TaskThread) * (num_threads + 1),
|
||||||
"TaskScheduler task threads");
|
"TaskScheduler task threads");
|
||||||
|
|
||||||
|
pthread_key_create(&scheduler->tls_id_key, NULL);
|
||||||
|
|
||||||
/* launch threads that will be waiting for work */
|
/* launch threads that will be waiting for work */
|
||||||
if (num_threads > 0) {
|
if (num_threads > 0) {
|
||||||
int i;
|
int i;
|
||||||
@@ -408,6 +402,8 @@ void BLI_task_scheduler_free(TaskScheduler *scheduler)
|
|||||||
BLI_condition_notify_all(&scheduler->queue_cond);
|
BLI_condition_notify_all(&scheduler->queue_cond);
|
||||||
BLI_mutex_unlock(&scheduler->queue_mutex);
|
BLI_mutex_unlock(&scheduler->queue_mutex);
|
||||||
|
|
||||||
|
pthread_key_delete(scheduler->tls_id_key);
|
||||||
|
|
||||||
/* delete threads */
|
/* delete threads */
|
||||||
if (scheduler->threads) {
|
if (scheduler->threads) {
|
||||||
int i;
|
int i;
|
||||||
@@ -476,7 +472,7 @@ static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool)
|
|||||||
nexttask = task->next;
|
nexttask = task->next;
|
||||||
|
|
||||||
if (task->pool == pool) {
|
if (task->pool == pool) {
|
||||||
task_data_free(task, 0);
|
task_data_free(task, pool->thread_id);
|
||||||
BLI_freelinkN(&scheduler->queue, task);
|
BLI_freelinkN(&scheduler->queue, task);
|
||||||
|
|
||||||
done++;
|
done++;
|
||||||
@@ -519,11 +515,11 @@ static TaskPool *task_pool_create_ex(TaskScheduler *scheduler, void *userdata, c
|
|||||||
BLI_mutex_init(&pool->user_mutex);
|
BLI_mutex_init(&pool->user_mutex);
|
||||||
|
|
||||||
if (BLI_thread_is_main()) {
|
if (BLI_thread_is_main()) {
|
||||||
pool->task_tls = &scheduler->task_threads[0].tls;
|
pool->thread_id = 0;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
pool->task_tls = &pool->task_tls_local;
|
TaskThread *thread = pthread_getspecific(scheduler->tls_id_key);
|
||||||
memset(pool->task_tls, 0, sizeof(TaskThreadLocalStorage));
|
pool->thread_id = thread->id;
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef DEBUG_STATS
|
#ifdef DEBUG_STATS
|
||||||
@@ -577,11 +573,6 @@ void BLI_task_pool_free(TaskPool *pool)
|
|||||||
|
|
||||||
BLI_mutex_end(&pool->user_mutex);
|
BLI_mutex_end(&pool->user_mutex);
|
||||||
|
|
||||||
/* Free local TLS. */
|
|
||||||
if (pool->task_tls == &pool->task_tls_local) {
|
|
||||||
free_task_tls(pool->task_tls);
|
|
||||||
}
|
|
||||||
|
|
||||||
#ifdef DEBUG_STATS
|
#ifdef DEBUG_STATS
|
||||||
printf("Thread ID Allocated Reused Discarded\n");
|
printf("Thread ID Allocated Reused Discarded\n");
|
||||||
for (int i = 0; i < pool->scheduler->num_threads + 1; ++i) {
|
for (int i = 0; i < pool->scheduler->num_threads + 1; ++i) {
|
||||||
@@ -638,6 +629,13 @@ void BLI_task_pool_work_and_wait(TaskPool *pool)
|
|||||||
{
|
{
|
||||||
TaskScheduler *scheduler = pool->scheduler;
|
TaskScheduler *scheduler = pool->scheduler;
|
||||||
|
|
||||||
|
#ifndef NDEBUG
|
||||||
|
if (!BLI_thread_is_main()) {
|
||||||
|
TaskThread *thread = pthread_getspecific(scheduler->tls_id_key);
|
||||||
|
BLI_assert(pool->thread_id == thread->id);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
BLI_mutex_lock(&pool->num_mutex);
|
BLI_mutex_lock(&pool->num_mutex);
|
||||||
|
|
||||||
while (pool->num != 0) {
|
while (pool->num != 0) {
|
||||||
@@ -665,10 +663,10 @@ void BLI_task_pool_work_and_wait(TaskPool *pool)
|
|||||||
/* if found task, do it, otherwise wait until other tasks are done */
|
/* if found task, do it, otherwise wait until other tasks are done */
|
||||||
if (found_task) {
|
if (found_task) {
|
||||||
/* run task */
|
/* run task */
|
||||||
work_task->run(pool, work_task->taskdata, 0);
|
work_task->run(pool, work_task->taskdata, pool->thread_id);
|
||||||
|
|
||||||
/* delete task */
|
/* delete task */
|
||||||
task_free(pool, task, 0);
|
task_free(pool, task, pool->thread_id);
|
||||||
|
|
||||||
/* notify pool task was done */
|
/* notify pool task was done */
|
||||||
task_pool_num_decrease(pool, 1);
|
task_pool_num_decrease(pool, 1);
|
||||||
@@ -885,7 +883,8 @@ static void task_parallel_range_ex(
|
|||||||
BLI_task_pool_push_from_thread(task_pool,
|
BLI_task_pool_push_from_thread(task_pool,
|
||||||
parallel_range_func,
|
parallel_range_func,
|
||||||
userdata_chunk_local, false,
|
userdata_chunk_local, false,
|
||||||
TASK_PRIORITY_HIGH, 0);
|
TASK_PRIORITY_HIGH,
|
||||||
|
task_pool->thread_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
BLI_task_pool_work_and_wait(task_pool);
|
BLI_task_pool_work_and_wait(task_pool);
|
||||||
@@ -1091,7 +1090,8 @@ void BLI_task_parallel_listbase(
|
|||||||
BLI_task_pool_push_from_thread(task_pool,
|
BLI_task_pool_push_from_thread(task_pool,
|
||||||
parallel_listbase_func,
|
parallel_listbase_func,
|
||||||
NULL, false,
|
NULL, false,
|
||||||
TASK_PRIORITY_HIGH, 0);
|
TASK_PRIORITY_HIGH,
|
||||||
|
task_pool->thread_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
BLI_task_pool_work_and_wait(task_pool);
|
BLI_task_pool_work_and_wait(task_pool);
|
||||||
|
Reference in New Issue
Block a user