252 lines
4.8 KiB
C++
252 lines
4.8 KiB
C++
/*
|
|
* Copyright 2011-2013 Blender Foundation
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
#include "util/util_task.h"
|
|
#include "util/util_foreach.h"
|
|
#include "util/util_logging.h"
|
|
#include "util/util_system.h"
|
|
#include "util/util_time.h"
|
|
|
|
CCL_NAMESPACE_BEGIN
|
|
|
|
/* Task Pool */
|
|
|
|
TaskPool::TaskPool() : start_time(time_dt()), num_tasks_pushed(0)
|
|
{
|
|
}
|
|
|
|
TaskPool::~TaskPool()
|
|
{
|
|
cancel();
|
|
}
|
|
|
|
void TaskPool::push(TaskRunFunction &&task)
|
|
{
|
|
tbb_group.run(std::move(task));
|
|
num_tasks_pushed++;
|
|
}
|
|
|
|
void TaskPool::wait_work(Summary *stats)
|
|
{
|
|
tbb_group.wait();
|
|
|
|
if (stats != NULL) {
|
|
stats->time_total = time_dt() - start_time;
|
|
stats->num_tasks_handled = num_tasks_pushed;
|
|
}
|
|
|
|
num_tasks_pushed = 0;
|
|
}
|
|
|
|
void TaskPool::cancel()
|
|
{
|
|
if (num_tasks_pushed > 0) {
|
|
tbb_group.cancel();
|
|
tbb_group.wait();
|
|
num_tasks_pushed = 0;
|
|
}
|
|
}
|
|
|
|
bool TaskPool::canceled()
|
|
{
|
|
return tbb::is_current_task_group_canceling();
|
|
}
|
|
|
|
/* Task Scheduler */
|
|
|
|
thread_mutex TaskScheduler::mutex;
|
|
int TaskScheduler::users = 0;
|
|
int TaskScheduler::active_num_threads = 0;
|
|
tbb::global_control *TaskScheduler::global_control = nullptr;
|
|
|
|
void TaskScheduler::init(int num_threads)
|
|
{
|
|
thread_scoped_lock lock(mutex);
|
|
/* Multiple cycles instances can use this task scheduler, sharing the same
|
|
* threads, so we keep track of the number of users. */
|
|
++users;
|
|
if (users != 1) {
|
|
return;
|
|
}
|
|
if (num_threads > 0) {
|
|
/* Automatic number of threads. */
|
|
VLOG(1) << "Overriding number of TBB threads to " << num_threads << ".";
|
|
global_control = new tbb::global_control(tbb::global_control::max_allowed_parallelism,
|
|
num_threads);
|
|
active_num_threads = num_threads;
|
|
}
|
|
else {
|
|
active_num_threads = system_cpu_thread_count();
|
|
}
|
|
}
|
|
|
|
void TaskScheduler::exit()
|
|
{
|
|
thread_scoped_lock lock(mutex);
|
|
users--;
|
|
if (users == 0) {
|
|
delete global_control;
|
|
global_control = nullptr;
|
|
active_num_threads = 0;
|
|
}
|
|
}
|
|
|
|
void TaskScheduler::free_memory()
|
|
{
|
|
assert(users == 0);
|
|
}
|
|
|
|
int TaskScheduler::num_threads()
|
|
{
|
|
return active_num_threads;
|
|
}
|
|
|
|
/* Dedicated Task Pool */
|
|
|
|
DedicatedTaskPool::DedicatedTaskPool()
|
|
{
|
|
do_cancel = false;
|
|
do_exit = false;
|
|
num = 0;
|
|
|
|
worker_thread = new thread(function_bind(&DedicatedTaskPool::thread_run, this));
|
|
}
|
|
|
|
DedicatedTaskPool::~DedicatedTaskPool()
|
|
{
|
|
wait();
|
|
|
|
do_exit = true;
|
|
queue_cond.notify_all();
|
|
|
|
worker_thread->join();
|
|
delete worker_thread;
|
|
}
|
|
|
|
void DedicatedTaskPool::push(TaskRunFunction &&task, bool front)
|
|
{
|
|
num_increase();
|
|
|
|
/* add task to queue */
|
|
queue_mutex.lock();
|
|
if (front)
|
|
queue.emplace_front(std::move(task));
|
|
else
|
|
queue.emplace_back(std::move(task));
|
|
|
|
queue_cond.notify_one();
|
|
queue_mutex.unlock();
|
|
}
|
|
|
|
void DedicatedTaskPool::wait()
|
|
{
|
|
thread_scoped_lock num_lock(num_mutex);
|
|
|
|
while (num)
|
|
num_cond.wait(num_lock);
|
|
}
|
|
|
|
void DedicatedTaskPool::cancel()
|
|
{
|
|
do_cancel = true;
|
|
|
|
clear();
|
|
wait();
|
|
|
|
do_cancel = false;
|
|
}
|
|
|
|
bool DedicatedTaskPool::canceled()
|
|
{
|
|
return do_cancel;
|
|
}
|
|
|
|
void DedicatedTaskPool::num_decrease(int done)
|
|
{
|
|
thread_scoped_lock num_lock(num_mutex);
|
|
num -= done;
|
|
|
|
assert(num >= 0);
|
|
if (num == 0)
|
|
num_cond.notify_all();
|
|
}
|
|
|
|
void DedicatedTaskPool::num_increase()
|
|
{
|
|
thread_scoped_lock num_lock(num_mutex);
|
|
num++;
|
|
num_cond.notify_all();
|
|
}
|
|
|
|
bool DedicatedTaskPool::thread_wait_pop(TaskRunFunction &task)
|
|
{
|
|
thread_scoped_lock queue_lock(queue_mutex);
|
|
|
|
while (queue.empty() && !do_exit)
|
|
queue_cond.wait(queue_lock);
|
|
|
|
if (queue.empty()) {
|
|
assert(do_exit);
|
|
return false;
|
|
}
|
|
|
|
task = queue.front();
|
|
queue.pop_front();
|
|
|
|
return true;
|
|
}
|
|
|
|
void DedicatedTaskPool::thread_run()
|
|
{
|
|
TaskRunFunction task;
|
|
|
|
/* keep popping off tasks */
|
|
while (thread_wait_pop(task)) {
|
|
/* run task */
|
|
task();
|
|
|
|
/* delete task */
|
|
task = nullptr;
|
|
|
|
/* notify task was done */
|
|
num_decrease(1);
|
|
}
|
|
}
|
|
|
|
void DedicatedTaskPool::clear()
|
|
{
|
|
thread_scoped_lock queue_lock(queue_mutex);
|
|
|
|
/* erase all tasks from the queue */
|
|
int done = queue.size();
|
|
queue.clear();
|
|
|
|
queue_lock.unlock();
|
|
|
|
/* notify done */
|
|
num_decrease(done);
|
|
}
|
|
|
|
string TaskPool::Summary::full_report() const
|
|
{
|
|
string report = "";
|
|
report += string_printf("Total time: %f\n", time_total);
|
|
report += string_printf("Tasks handled: %d\n", num_tasks_handled);
|
|
return report;
|
|
}
|
|
|
|
CCL_NAMESPACE_END
|