class Server
{
private:
pthread_mutex_t _queryQueueMutex;
// ... code ...
public:
Server();
bool f();
// ... code ...
};
Server::Server()
{
if (pthread_mutex_init(&_queryQueueMutex, NULL))
throw "err";
}
bool Server::f()
{
pthread_mutex_lock(&_queryQueueMutex);
// ... code ...
pthread_mutex_unlock(&_queryQueueMutex);
return false;
}
Работа с pthread mutex
Возник вопрос с работой pthread_mutex_t и pthread_mutex_lock.
Есть класс многопоточного сервера:
Код:
В котором для синхронизации некоторого действия потоков используется мьютекс (pthread_mutex_t _queryQueueMutex).
При создании объекта сервера как
Код:
Server* srv = new Server();
srv->f();
srv->f();
При создании объекта сервера как
Код:
Server srv();
srv.f();
srv.f();
Логика подсказывает что дело в инициализации мьютексов и выделении памяти, но никаких ошибок не возникает, pthread_mutex_trylock отрабатывает без ошибок. Проблема не критична, но очень хочется разобраться в сути. Вот тут нашел похожую проблему, однако без решения.
Буду крайне признателен за помощь в данном вопросе.
Хрустальный шар показывает: отсутствие конструктора копирования и оператора присваивания и соотв. неадекватное управление ресурсами.
Код:
private:
Server(Server const&);
void operator =(Server const&);
Server(Server const&);
void operator =(Server const&);
На с++ и тем более под линукс до этого ничего не писал. По этому возникли вопросы. Возможо решение тривиально, однако я на данный момент его не нашел.
А остроумные комментарии про хрустальные шары, телепатов и прочее разве все еще в моде?
Цитата: wAngel
А остроумные комментарии про хрустальные шары, телепатов и прочее разве все еще в моде?
Все благодаря вашим стараниям.
на pastebin):
Повторюсь, опыта разработки на с++ под линукс у меня нет.
Желающих самоутверждаться язвительными комментариями прошу воздержаться от неконструктивных высказываний.
Заранее спасибо.
Полный пример проблемы (
Код:
#include <iostream>
#include <queue>
#include <map>
#include <pthread.h>
using namespace std;
#define COUNT_WORKERS 10
#define MAX_QUEUE_SIZE 100
class Server
{
private:
std::queue<string> _queryQueue;
pthread_mutex_t _queryQueueMutex;
pthread_t _workerThreads[COUNT_WORKERS];
pthread_mutex_t _workerSuspendMutex;
pthread_cond_t _workerResumeCondition;
pthread_t _thread;
Server(Server const&);
void operator=(Server const&);
public:
Server();
bool start();
static void* threadFunc(void* ptr);
static void* threadWorkerFunc(void* ptr);
};
//-----------------------------------------------------------------------------
Server::Server()
{
if (pthread_mutex_init(&_queryQueueMutex, NULL))
throw "err";
if (pthread_mutex_init(&_workerSuspendMutex, NULL))
throw "err";
} // End
//-----------------------------------------------------------------------------
bool Server::start()
{
if (pthread_create(&_thread, NULL, threadFunc, this))
return false;
// Worker threads.
for (int i = 0; i < COUNT_WORKERS; i++)
if (pthread_create(&_workerThreads, NULL, threadWorkerFunc, this))
return false;
pthread_join(_thread, NULL);
return true;
} // End
//-----------------------------------------------------------------------------
void* Server::threadFunc(void* ptr)
{
cout << "Start thread" << endl;
Server* server = (Server*)ptr;
while (1)
{
cout << "Request" << endl;
sleep(1);
pthread_mutex_lock(&server->_queryQueueMutex);
if (server->_queryQueue.size() < MAX_QUEUE_SIZE)
{
server->_queryQueue.push("text");
pthread_mutex_unlock(&server->_queryQueueMutex);
// Wake up all worker threads.
pthread_cond_broadcast(&server->_workerResumeCondition);
continue;
}
pthread_mutex_unlock(&server->_queryQueueMutex);
}
return NULL;
} // End
//-----------------------------------------------------------------------------
void* Server::threadWorkerFunc(void* ptr)
{
Server* server = (Server*)ptr;
while (true)
{
// Lock queue mutex.
pthread_mutex_lock(&server->_queryQueueMutex);
// If queue is empty, waiting new requests.
if (server->_queryQueue.empty())
{
// Unlock queue mutex.
pthread_mutex_unlock(&server->_queryQueueMutex);
// Suspend current worker thread.
pthread_mutex_lock(&server->_workerSuspendMutex);
pthread_cond_wait(&server->_workerResumeCondition, &server->_workerSuspendMutex);
cout << "Wake up" << endl;
pthread_mutex_unlock(&server->_workerSuspendMutex);
continue;
}
// Load request from queue.
string request = server->_queryQueue.front();
server->_queryQueue.pop();
pthread_mutex_unlock(&server->_queryQueueMutex);
} // End while
return NULL;
} // End
//-----------------------------------------------------------------------------
int main()
{
try
{
// Server srv1;
// if (!srv1.start())
// return 1;
Server* srv2 = new Server();
if (!srv2->start())
return 1;
}
catch (...)
{
return 1;
}
return 0;
} // End
//-----------------------------------------------------------------------------
#include <queue>
#include <map>
#include <pthread.h>
using namespace std;
#define COUNT_WORKERS 10
#define MAX_QUEUE_SIZE 100
class Server
{
private:
std::queue<string> _queryQueue;
pthread_mutex_t _queryQueueMutex;
pthread_t _workerThreads[COUNT_WORKERS];
pthread_mutex_t _workerSuspendMutex;
pthread_cond_t _workerResumeCondition;
pthread_t _thread;
Server(Server const&);
void operator=(Server const&);
public:
Server();
bool start();
static void* threadFunc(void* ptr);
static void* threadWorkerFunc(void* ptr);
};
//-----------------------------------------------------------------------------
Server::Server()
{
if (pthread_mutex_init(&_queryQueueMutex, NULL))
throw "err";
if (pthread_mutex_init(&_workerSuspendMutex, NULL))
throw "err";
} // End
//-----------------------------------------------------------------------------
bool Server::start()
{
if (pthread_create(&_thread, NULL, threadFunc, this))
return false;
// Worker threads.
for (int i = 0; i < COUNT_WORKERS; i++)
if (pthread_create(&_workerThreads, NULL, threadWorkerFunc, this))
return false;
pthread_join(_thread, NULL);
return true;
} // End
//-----------------------------------------------------------------------------
void* Server::threadFunc(void* ptr)
{
cout << "Start thread" << endl;
Server* server = (Server*)ptr;
while (1)
{
cout << "Request" << endl;
sleep(1);
pthread_mutex_lock(&server->_queryQueueMutex);
if (server->_queryQueue.size() < MAX_QUEUE_SIZE)
{
server->_queryQueue.push("text");
pthread_mutex_unlock(&server->_queryQueueMutex);
// Wake up all worker threads.
pthread_cond_broadcast(&server->_workerResumeCondition);
continue;
}
pthread_mutex_unlock(&server->_queryQueueMutex);
}
return NULL;
} // End
//-----------------------------------------------------------------------------
void* Server::threadWorkerFunc(void* ptr)
{
Server* server = (Server*)ptr;
while (true)
{
// Lock queue mutex.
pthread_mutex_lock(&server->_queryQueueMutex);
// If queue is empty, waiting new requests.
if (server->_queryQueue.empty())
{
// Unlock queue mutex.
pthread_mutex_unlock(&server->_queryQueueMutex);
// Suspend current worker thread.
pthread_mutex_lock(&server->_workerSuspendMutex);
pthread_cond_wait(&server->_workerResumeCondition, &server->_workerSuspendMutex);
cout << "Wake up" << endl;
pthread_mutex_unlock(&server->_workerSuspendMutex);
continue;
}
// Load request from queue.
string request = server->_queryQueue.front();
server->_queryQueue.pop();
pthread_mutex_unlock(&server->_queryQueueMutex);
} // End while
return NULL;
} // End
//-----------------------------------------------------------------------------
int main()
{
try
{
// Server srv1;
// if (!srv1.start())
// return 1;
Server* srv2 = new Server();
if (!srv2->start())
return 1;
}
catch (...)
{
return 1;
}
return 0;
} // End
//-----------------------------------------------------------------------------
Повторюсь, опыта разработки на с++ под линукс у меня нет.
Желающих самоутверждаться язвительными комментариями прошу воздержаться от неконструктивных высказываний.
Заранее спасибо.
gribozavr - не инициализировалось condition.
Вопрос в Q&A на хабре.
А все телепаты могут продолжить свой нелегкий труд на благо академии.
Всем спасибо, решение проблемы найдено благодаря хабрапользователю
Код:
--- a.cc.orig 2011-11-03 09:01:20.000000000 +0200
+++ a.cc 2011-11-03 09:02:49.000000000 +0200
@@ -41,6 +41,9 @@
if (pthread_mutex_init(&_workerSuspendMutex, NULL))
throw "err";
+ if (pthread_cond_init(&_workerResumeCondition, NULL))
+ throw "err";
+
} // End
//-----------------------------------------------------------------------------
+++ a.cc 2011-11-03 09:02:49.000000000 +0200
@@ -41,6 +41,9 @@
if (pthread_mutex_init(&_workerSuspendMutex, NULL))
throw "err";
+ if (pthread_cond_init(&_workerResumeCondition, NULL))
+ throw "err";
+
} // End
//-----------------------------------------------------------------------------
Вопрос в Q&A на хабре.
А все телепаты могут продолжить свой нелегкий труд на благо академии.