int pool::add_thread() {
// wait the thread for 1 second
boost::system_time wait_time = boost::get_system_time() + boost::posix_time::milliseconds(1000);
try {
// create thread
boost::shared_ptr<boost::thread> temp_thread(new boost::thread(boost::bind(&thread_starter, this)));
// wait for thread's report or timeout
boost::unique_lock<boost::mutex> lock(start_flag_mutex);
start_flag.timed_wait(lock, wait_time);
// thread is ready, put it to other ready threads
thread_pool.push_back(temp_thread);
} catch (boost::thread_resource_error) {
return -1;
}
}
boost::thread - синхронизация запуска потоков в пуле
Код:
сам поток:
Код:
void pool::thread_routine() {
// notify the main thread, that we've started
start_flag.notify_one();
// lock mutex to wait for conditional variable signal
boost::mutex::scoped_lock lock(thr_wait_mutex);
// wait for a signal to start
thr_wait.wait(lock);
// main task executing
...
}
// notify the main thread, that we've started
start_flag.notify_one();
// lock mutex to wait for conditional variable signal
boost::mutex::scoped_lock lock(thr_wait_mutex);
// wait for a signal to start
thr_wait.wait(lock);
// main task executing
...
}
запуск потоков на выполнение задач:
Код:
int pool::run_tasks() {
thr_wait.notify_all();
}
thr_wait.notify_all();
}
Основная проблема синхронизации:
- запускается несколько потоков
- часто бывает, что основной поток, запустил все потоки из пула, и, не передавая им управления, послал сигнал на выполнение задач. После этого, дочерние потоки зависают в ожидании сигнала, который уже был дан. Попытался решить блокировкой основного потока по сигналу, который дает создаваемый дочерний поток на старте (отражено в приведенном коде).
- приведенный код тоже имеет недостаток:
запущенный дочерний поток иногда успевает проскочить и послать сигнал раньше, чем основной поток начнет ожидать его сигнала. Результат: основной поток, прождав секунду, считает что дочерний поток не готов к выполнению задачи.
- Использовть не сам boost::thread, а созданный на его базе класс, который имел бы метод для рапорта о готовности. Но опять же, остается вероятность, что при выполнении, между двумя командами дочернего потока (установка внутренней переменной готовности и блокировка по переменной состояния), вклинится сигнал основного потока, и будем иметь зависание дочернего потока
- записывать id в какой-нить внутренний массив, затем ждать сигнала. основной поток проверяет массив на готовность дочерних потоков, затем дает сигнал на выполнение задач. остается та же вероятность вклинивания основного потока, что и в предыдущем пункте.
- соединить сигнал готовности, посылаемый из дочернего потока с одним из предыдущих вариантов. В таком случае:
- если сигнал из дочернего потока будет дан раньше, чем его начнет ожидать основной поток, то после истечения таймаута и проверки потока на готовность вручную, состояние потока будет определено, как "готов к выполенинию"
- если выполнение основного потока вклинится сразу после установки флага готовности, он все равно будет ждать сигнала от дочернего потока.
НО! есть опасность, что после установки флага готовности, дочерний поток так и не доберется до посылки сигнала (процессорное время будет занято выполнением других потоков). А, основной поток решит, что он уже готов и пошлет сигнал => дочерний поток повиснет на ожидании сигнала, который уже был дан.
А у тебя нету цикла даже выбора новой задачи. По идее там должно быть что то типа этого (за опущеним вызова функций синхронзации =)):
Код:
void pool::thread_routine() {
while (условие выхода) {
- попытаться взять новую задачу;
if (получилось взять) {
- выполнить задачу;
} else {
условная_переменная.wait(...);
}
}
- какие то действия перед выходом;
}
while (условие выхода) {
- попытаться взять новую задачу;
if (получилось взять) {
- выполнить задачу;
} else {
условная_переменная.wait(...);
}
}
- какие то действия перед выходом;
}
Раз уж в пул можно добавлять задачи - неплохо бы сделать в нем очередь, в которую изначально будут складываться задачи. Во первых это даст возможность при полностью загруженном пуле, подождать новым задачам освобождения потока. А во вторых просто при добавлении в очередь - шлешь сигнал условной переменной и один из спящих потоков проснется и возьмет задачу из очереди. Если все заняты - возьмут, как освободятся.
Код:
for (int i = 0; i < 4; i++) {
merge_sort *temp = new merge_sort;
if (thpool.add_thread() == -1) {
cerr << "Thread didn't started" << endl;
ret_status = EXIT_FAILURE;
exit(1);
}
thpool.push_task(temp);
MergeSortArr.push_back(boost::shared_ptr<merge_sort > (temp));
}
thpool.run_tasks();
merge_sort *temp = new merge_sort;
if (thpool.add_thread() == -1) {
cerr << "Thread didn't started" << endl;
ret_status = EXIT_FAILURE;
exit(1);
}
thpool.push_task(temp);
MergeSortArr.push_back(boost::shared_ptr<merge_sort > (temp));
}
thpool.run_tasks();
Т.е. сначала создали 4 потока и запихнули в очередь 4 задачи (merge_sort - производная от класса задачи), затем их сразу все запустили. А до момента вызова run_tasks() потоки должны спать. Вопрос то в том, что поток у меня сразу засыпает при старте, на пытаясь ничего выковырять из очереди задач. Но либо он засыпает навечно :), либо основной поток непонимает, что дочерний запустился, и спит, готовый проснуться для получения задачи.
З.Ы. merge_sort *temp = new merge_sort; потом переделаю
З.З.Ы. хотя... возможно подтупливаю. сейчас подправлю еще немного, посмотрю. возможно, идея запустить все сразу не очень хорошая.
хотя некоторые сомнения насчет возможной рассинхронизации, когда дочерний поток наглухо повиснет на условной переменной, остались. в общем, посмотрю.
Цитата: ~ArchimeD~
хотя некоторые сомнения насчет возможной рассинхронизации, когда дочерний поток наглухо повиснет на условной переменной, остались. в общем, посмотрю.
Это в каком таком случае он может зависнуть? :)