Task scheduler: Refactor the way we store thread-spedific data

Basically move all thread-specific data (currently it's only task
memory pool) from a dedicated array of taskScheduler to TaskThread.
This way we can add more thread-specific data in the future with
less of a hassle.
This commit is contained in:
Sergey Sharybin
2017-03-06 11:12:07 +01:00
parent 9522f8acf0
commit a07ad02156

View File

@@ -102,6 +102,10 @@ typedef struct TaskMemPoolStats {
} TaskMemPoolStats; } TaskMemPoolStats;
#endif #endif
typedef struct TaskThreadLocalStorage {
TaskMemPool task_mempool;
} TaskThreadLocalStorage;
struct TaskPool { struct TaskPool {
TaskScheduler *scheduler; TaskScheduler *scheduler;
@@ -120,16 +124,19 @@ struct TaskPool {
*/ */
bool run_in_background; bool run_in_background;
/* This pool is used for caching task pointers for thread id 0. /* This TLS is used for caching task pointers for thread id 0.
* This could either point to a global scheduler's task_mempool[0] if the * This could either point to a global scheduler's TLS for thread 0 if the
* pool is handled form the main thread or point to task_mempool_local * pool is created form the main thread or point to task_mempool_local
* otherwise. * otherwise.
* *
* This way we solve possible threading conflicts accessing same global * This way we solve possible threading conflicts accessing same global
* memory pool from multiple threads from which wait_work() is called. * memory pool from multiple threads from which wait_work() is called.
*
* TODO(sergey): Use real pthread's TLS to access current thread's TLS
* and use it instead.
*/ */
TaskMemPool *task_mempool; TaskThreadLocalStorage *task_tls;
TaskMemPool task_mempool_local; TaskThreadLocalStorage task_tls_local;
#ifdef DEBUG_STATS #ifdef DEBUG_STATS
TaskMemPoolStats *mempool_stats; TaskMemPoolStats *mempool_stats;
@@ -139,7 +146,6 @@ struct TaskPool {
struct TaskScheduler { struct TaskScheduler {
pthread_t *threads; pthread_t *threads;
struct TaskThread *task_threads; struct TaskThread *task_threads;
TaskMemPool *task_mempool;
int num_threads; int num_threads;
bool background_thread_only; bool background_thread_only;
@@ -153,10 +159,11 @@ struct TaskScheduler {
typedef struct TaskThread { typedef struct TaskThread {
TaskScheduler *scheduler; TaskScheduler *scheduler;
int id; int id;
TaskThreadLocalStorage tls;
} TaskThread; } TaskThread;
/* Helper */ /* Helper */
static void task_data_free(Task *task, const int thread_id) BLI_INLINE void task_data_free(Task *task, const int thread_id)
{ {
if (task->free_taskdata) { if (task->free_taskdata) {
if (task->freedata) { if (task->freedata) {
@@ -168,12 +175,26 @@ static void task_data_free(Task *task, const int thread_id)
} }
} }
BLI_INLINE TaskMemPool *get_task_mempool(TaskPool *pool, const int thread_id) BLI_INLINE TaskThreadLocalStorage *get_task_tls(TaskPool *pool,
const int thread_id)
{ {
TaskScheduler *scheduler = pool->scheduler;
BLI_assert(thread_id >= 0);
BLI_assert(thread_id <= scheduler->num_threads);
if (thread_id == 0) { if (thread_id == 0) {
return pool->task_mempool; return pool->task_tls;
}
else {
return &scheduler->task_threads[thread_id].tls;
}
}
BLI_INLINE void free_task_tls(TaskThreadLocalStorage *tls)
{
TaskMemPool *task_mempool = &tls->task_mempool;
for (int i = 0; i < task_mempool->num_tasks; ++i) {
MEM_freeN(task_mempool->tasks[i]);
} }
return &pool->scheduler->task_mempool[thread_id];
} }
static Task *task_alloc(TaskPool *pool, const int thread_id) static Task *task_alloc(TaskPool *pool, const int thread_id)
@@ -181,15 +202,16 @@ static Task *task_alloc(TaskPool *pool, const int thread_id)
BLI_assert(thread_id <= pool->scheduler->num_threads); BLI_assert(thread_id <= pool->scheduler->num_threads);
if (thread_id != -1) { if (thread_id != -1) {
BLI_assert(thread_id >= 0); BLI_assert(thread_id >= 0);
TaskMemPool *mem_pool = get_task_mempool(pool, thread_id); TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
TaskMemPool *task_mempool = &tls->task_mempool;
/* Try to re-use task memory from a thread local storage. */ /* Try to re-use task memory from a thread local storage. */
if (mem_pool->num_tasks > 0) { if (task_mempool->num_tasks > 0) {
--mem_pool->num_tasks; --task_mempool->num_tasks;
/* Success! We've just avoided task allocation. */ /* Success! We've just avoided task allocation. */
#ifdef DEBUG_STATS #ifdef DEBUG_STATS
pool->mempool_stats[thread_id].num_reuse++; pool->mempool_stats[thread_id].num_reuse++;
#endif #endif
return mem_pool->tasks[mem_pool->num_tasks]; return task_mempool->tasks[task_mempool->num_tasks];
} }
/* We are doomed to allocate new task data. */ /* We are doomed to allocate new task data. */
#ifdef DEBUG_STATS #ifdef DEBUG_STATS
@@ -204,11 +226,12 @@ static void task_free(TaskPool *pool, Task *task, const int thread_id)
task_data_free(task, thread_id); task_data_free(task, thread_id);
BLI_assert(thread_id >= 0); BLI_assert(thread_id >= 0);
BLI_assert(thread_id <= pool->scheduler->num_threads); BLI_assert(thread_id <= pool->scheduler->num_threads);
TaskMemPool *mem_pool = get_task_mempool(pool, thread_id); TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
if (mem_pool->num_tasks < MEMPOOL_SIZE - 1) { TaskMemPool *task_mempool = &tls->task_mempool;
if (task_mempool->num_tasks < MEMPOOL_SIZE - 1) {
/* Successfully allowed the task to be re-used later. */ /* Successfully allowed the task to be re-used later. */
mem_pool->tasks[mem_pool->num_tasks] = task; task_mempool->tasks[task_mempool->num_tasks] = task;
++mem_pool->num_tasks; ++task_mempool->num_tasks;
} }
else { else {
/* Local storage saturated, no other way than just discard /* Local storage saturated, no other way than just discard
@@ -351,16 +374,18 @@ TaskScheduler *BLI_task_scheduler_create(int num_threads)
num_threads = 1; num_threads = 1;
} }
scheduler->task_threads = MEM_callocN(sizeof(TaskThread) * (num_threads + 1),
"TaskScheduler task threads");
/* launch threads that will be waiting for work */ /* launch threads that will be waiting for work */
if (num_threads > 0) { if (num_threads > 0) {
int i; int i;
scheduler->num_threads = num_threads; scheduler->num_threads = num_threads;
scheduler->threads = MEM_callocN(sizeof(pthread_t) * num_threads, "TaskScheduler threads"); scheduler->threads = MEM_callocN(sizeof(pthread_t) * num_threads, "TaskScheduler threads");
scheduler->task_threads = MEM_callocN(sizeof(TaskThread) * num_threads, "TaskScheduler task threads");
for (i = 0; i < num_threads; i++) { for (i = 0; i < num_threads; i++) {
TaskThread *thread = &scheduler->task_threads[i]; TaskThread *thread = &scheduler->task_threads[i + 1];
thread->scheduler = scheduler; thread->scheduler = scheduler;
thread->id = i + 1; thread->id = i + 1;
@@ -368,9 +393,6 @@ TaskScheduler *BLI_task_scheduler_create(int num_threads)
fprintf(stderr, "TaskScheduler failed to launch thread %d/%d\n", i, num_threads); fprintf(stderr, "TaskScheduler failed to launch thread %d/%d\n", i, num_threads);
} }
} }
scheduler->task_mempool = MEM_callocN(sizeof(*scheduler->task_mempool) * (num_threads + 1),
"TaskScheduler task_mempool");
} }
return scheduler; return scheduler;
@@ -400,17 +422,12 @@ void BLI_task_scheduler_free(TaskScheduler *scheduler)
/* Delete task thread data */ /* Delete task thread data */
if (scheduler->task_threads) { if (scheduler->task_threads) {
MEM_freeN(scheduler->task_threads); for (int i = 0; i < scheduler->num_threads + 1; ++i) {
} TaskThreadLocalStorage *tls = &scheduler->task_threads[i].tls;
free_task_tls(tls);
/* Delete task memory pool */
if (scheduler->task_mempool) {
for (int i = 0; i <= scheduler->num_threads; ++i) {
for (int j = 0; j < scheduler->task_mempool[i].num_tasks; ++j) {
MEM_freeN(scheduler->task_mempool[i].tasks[j]);
}
} }
MEM_freeN(scheduler->task_mempool);
MEM_freeN(scheduler->task_threads);
} }
/* delete leftover tasks */ /* delete leftover tasks */
@@ -502,11 +519,11 @@ static TaskPool *task_pool_create_ex(TaskScheduler *scheduler, void *userdata, c
BLI_mutex_init(&pool->user_mutex); BLI_mutex_init(&pool->user_mutex);
if (BLI_thread_is_main()) { if (BLI_thread_is_main()) {
pool->task_mempool = scheduler->task_mempool; pool->task_tls = &scheduler->task_threads[0].tls;
} }
else { else {
pool->task_mempool = &pool->task_mempool_local; pool->task_tls = &pool->task_tls_local;
pool->task_mempool_local.num_tasks = 0; memset(pool->task_tls, 0, sizeof(TaskThreadLocalStorage));
} }
#ifdef DEBUG_STATS #ifdef DEBUG_STATS
@@ -560,11 +577,9 @@ void BLI_task_pool_free(TaskPool *pool)
BLI_mutex_end(&pool->user_mutex); BLI_mutex_end(&pool->user_mutex);
/* Free local memory pool, those pointers are lost forever. */ /* Free local TLS. */
if (pool->task_mempool == &pool->task_mempool_local) { if (pool->task_tls == &pool->task_tls_local) {
for (int i = 0; i < pool->task_mempool_local.num_tasks; i++) { free_task_tls(pool->task_tls);
MEM_freeN(pool->task_mempool_local.tasks[i]);
}
} }
#ifdef DEBUG_STATS #ifdef DEBUG_STATS