Помогите с многопоточностью! (тут интересно)
{
public:
void Thread()
{
инициализация;
while(bWork)
{
ReadFile(hDevice, ...);
switch(полученные данные) // парсим
{
case статистика: тихо_мирно_использую; break;
case извещение:
// ?!?!?
break;
};
};
финализация;
}
CCtrl() { создать поток из метода Thread(); };
~CCtrl() { завершить поток в Thread(); };
// здесь самый вопрос:
bool SendCommand(int nCommand, int nParam)
{
// ?!?!?
WriteFile(hDevice, ...);
// ?!??!
// PROFIT !
return bResult;
};
};
CCtrl Controller;
Теперь, на словах:
Устройство регулярно, но не быстро, посылает данные, которые успешно считывает поток CCtrl::Thread() вызовом ReadFile. Среди этих данных идут пакеты статистики, а также особый пакет-извещение "Я сделало команду".
Команды устройству отдаются несколькими (!) другими потоками вызовом метода SendCommand, как минимум двумя. Устройство совершает команду некоторое значимое время.
Любой поток может отдать команду Controller.SendCommand(...) в любое время, независимо от других, при этом вызов функций должен заблокироваться до того момента, пока команда не будет выполнена (пока с устройства не придёт пакет "Я сделало команду" и поток CCtrl::Thread это не прочитает), при этом вернув true. Либо, если любой другой поток отдаёт любую другую (или эту же) команду устройству, то наш рассматриваемый поток должен разблокироваться и вернуть false (и он поймёт, что устройство начало выполнение другой команды), и блокируется уже поток, отдавший последнюю по времени команду.
Вопрос: помогите правильно и адекватно всё это синхронизовать.
Предполагаю использование semaphore / event / mutex / critical section / WaitForSingle(Multiple)Object(s), и т.п., но вот в голове схема синхронизации почему-то сама в этот раз не возникает =)
В Windows существует несколько объектов синхронизации.
1. События (events). Объекты, поддерживающие два состояния (условно говоря - включен/выключен, signaled/non-signaled). Переключение между состояниями задается непосредственно программистом.
2. Мьютексы (mutexes). Также поддерживает два состояния; считается включенным, если его объект принадлежит какому-либо потоку, и выключенным в противном случае.
3. Семафоры (semaphores). Кроме состояния, имеет счетчик (между нулем и заданным максимальным значением). Считается включенным при значении счетчика, отличном от нуля, и выключенным при нулевом значении счетчика.
4. Критические секции (critical sections). Применяются для контролируемого многопоточного доступа к ресурсу; ресурс, используемый внутри критической секции, автоматически блокируется для других потоков.
В дополнение к этому перечню, привожу список функций для работы с синхронизацией. Отдельное внимание обращаю на функции WaitForSingleObject() и WaitForMultipleObjects(), они могут оказаться вам наиболее полезными.
критическая_секция CrSec;
HANDLE hEvent; // = CreateEvent(NULL, TRUE, TRUE, NULL); в конструкторе
DWORD nThreadWhichWait; // Здесь будет ID потока, ждущего завершения своей команды
... };
Теперь метод, отдающий команды устройству:
{
// В критической секции
CrSec.Enter();
nThreadWhichWait = CurrentThreadID; // Теперь "Я!" буду ждать завершение команды!
WriteFile(hDevice, "nCmd"...); // Самое главное - отдаю самую свежую команду устройству.
PulseEvent(hEvent); // Все остальные потоки пропускаются, и я сам тут же его занимаю.
CrSec.Leave();
// Жду, пока меня отпустят (другая команда, или приём-извещения-в-потоке).
WaitForSingleObject(hEvent);
// Определяю, что сказать вызывающему...
return (nThreadWhichWait == CurrentThreadID);
};
В потоке, принимающем данные от устройства, всё проще:
{
ReadFile(hDevice, pData...);
если (pData_является_извещением_о_завершенности_команды)
SetEvent(hEvent); // Отпускаю теоретически единственный ждущий поток
};
Где могут быть проблемы?
Вижу, что вместо PulseEvent MSDN советует conditional variables, т.к. функция unreliable, это раз.
Функцию PulseEvent крайне не хочется использовать, думаю обойтись вообще без объектов ядра. Сейчас доделаю кое-какой вариант, и выложу своё решение, но работоспособность будет проверена на малой нагрузке только завтра во второй половине дня...
Вот я об этом и говорю, здесь вы неправы. Критическая секция должна быть глобальная в пределах всех объектов.
Прошу прощения, мне кажется я не сказал важную деталь: один экземпляр класса обладает эксклюзивно одним подключенным устройством, т.е. не возникнет ситуаций, когда 1. экземпляров больше, чем устройств, 2. доступ к одному устройству осуществлён более, чем одним объектом. В таких условиях, имхо, критическая секция вполне глобальна на своём месте. С другой стороны, если сделать её совсем уж глобальной, то получится несколько медленнее управлять параллельно несколькими устройствами. Хотя, это всё теория - на практике устройство будет всего одно, ровно как и экземпляр класса в приложении.
Обошелся-таки без объектов ядра, правда длинной кода и числом переменных. Вот что стало с телом класса:
{
HANDLE hDevice;
void Thread(...);
// Объекты синхронизации
bool bWaitForComplete; // Потоки, ждущие завершения команды, должны продолжать ждать
UI32 nWaitCounter; // Счетчик ожидающих завершения команды потоков
DWORD dwLastWriterID; // ID потока, отдавшего команду последним
bool bThreadWork; // поток-приёмник данных должен работать
CSonicCriticalSection WriteCS; // Критическая секция вокруг отправки команд
void ReleaseAllThreads(); // Освободить все ждущие подтверждения потоки
public:
// Заслать команду и ждать, пока она будет готова (true) или отменена (false)
virtual bool SendCommand(ECmd eCmd);
CCtrl();
~CCtrl();
};
Часть тела метода-потока, принимающего данные с устройства:
{
// Читаю данные с устройства
ReadFile(hDevice, ...);
...
case извещение_о_завершении_последней_команды:
// Не знаю, зачем, но буду работать в критической секции
WriteCS.Enter();
// Подтверждаю устройству получение извещения (требуется)
WriteFile(hDevice, я_получил_извещение);
// Отпускаю ждущий поток, теоретически один
ReleaseAllThreads();
// Разрешаю остальным потокам отдавать команды
WriteCS.Leave();
break;
...
};
Функция подачи команды устройству:
if(hDevice == INVALID_HANDLE_VALUE)
return false;
DWORD dwCurrentThreadID = GetCurrentThreadId(); // Идентификатор текущего потока
WriteCS.Enter();
dwLastWriterID = dwCurrentThreadID; // Я отдал команду последним
ReleaseAllThreads(); // Отпускаю все остальные потоки
// Собственно, процесс отдачи команды устройству
WriteFile(hDevice, команда);
// Сам-то встану ждать окончания команды
bWaitForComplete = true;
nWaitCounter += 1;
WriteCS.Leave();
// Жду ответа от потока, читающего данные с устройства
while(bWaitForComplete)
SwitchToThread();
InterlockedDecrement((volatile LONG *)&nWaitCounter);
// Возвращаю ответ, я ли это завершил свою команду?
return (dwLastWriterID == dwCurrentThreadID);
И последнее, освобождение потоков:
{
WriteCS.Enter();
bWaitForComplete = false;
// Немного скушает текущий CPU, но, имхо, сработает очень быстро
while(nWaitCounter)
SwitchToThread();
WriteCS.Leave();
};
На этом голова работать перестала, утро вечера мудренее.
[QUOTE=SimSonic]Любой поток может отдать команду Controller.SendCommand(...) в любое время, независимо от других, при этом вызов функций должен заблокироваться до того момента, пока команда не будет выполнена (пока с устройства не придёт пакет "Я сделало команду" и поток CCtrl::Thread это не прочитает), при этом вернув true. Либо, если любой другой поток отдаёт любую другую (или эту же) команду устройству, то наш рассматриваемый поток должен разблокироваться и вернуть false (и он поймёт, что устройство начало выполнение другой команды), и блокируется уже поток, отдавший последнюю по времени команду.[/QUOTE]Если одна команда должна перебивать другую, то, получается, вся эта возня затеяна только для того, чтобы пропускать статистику при ожидании ответа на команду?
[QUOTE=SimSonic]один экземпляр класса обладает эксклюзивно одним подключенным устройством...
при этом вызов функций должен заблокироваться...
Либо, если любой другой поток отдаёт любую другую (или эту же) команду устройству, то наш рассматриваемый поток должен разблокироваться[/QUOTE]Как "любой другой поток" должен "отдать команду" устройству, если "функции должны быть заблокированы"? Если это должно быть возможно, то это ничто иное, как просто вообще отсутствие всякой блокировки
Нездешний, опередил. :)
dwLastWriterID = dwCurrentThreadID;
ReleaseAllThreads();
...
void CCtrl::ReleaseAllThreads()
{
WriteCS.Enter(); <------ все, приехали, висим (мы уже в крит.секции и пытаемся войти еще раз, не освободив ее)
Попытаюсь сформулировать более точно:
Функция CCtrl::SendCommand блокируется до тех пор, пока 1. команда не будет успешно завершена (return true); 2. другой поток после моего не вызвал эту же функцию, тем самым отменив мою команду (return false). Я никак не должен возвращаться из неё сразу после отдачи команды, не дожидаясь завершения.
Нет, грубо говоря сверху над этим объектом будет висеть планировщик, делающий набор действий данным устройством по некоему своему плану. Параллельно может вмешаться с другого потока пользователь, тыркающий мышку во все стороны в своём окне управления, тем самым заборов к черту действия планировщика. Последний же, увидев, что его очередная команда вернулась с результатом false, должен продолжить свои действия, начиная с попытки повторить отмененную. Вот так.
dwLastWriterID = dwCurrentThreadID;
ReleaseAllThreads();
...
void CCtrl::ReleaseAllThreads()
{
WriteCS.Enter(); <------ все, приехали, висим (мы уже в крит.секции и пытаемся войти еще раз, не освободив ее)
Если что, один поток может входить в одну критическую секцию неограниченно (разумно) много раз, столько же раз он должен её и освободить. Вызов ReleaseAllThreads() происходит в двух местах всего - в SendCommand, внутри этой же самой К.С., и вновь внутри неё в switch-е потока приёмника. Никакой ситуации с блокировкой, имхо, не возникнет... хотя тут я ставлю вообще вход в К.С. в этой функции под сомнение - а нужно ли :)
Обладает ли пользователь приоритетом над планировщиком? Т.е. пошла команда от пользователя, ждем ответ, в этот момент включается планировщик. Должна ли его команда "забороть" пользовательскую, или пользователь все же должен дождаться ответа на свою команду? Должен ли пользователь иметь возможность "забороть" свою предыдущую команду новой, или операция с точки зрения пользователя атомарна?
{
CRITICAL_SECTION csFile;
void Thread()
{
...
EnterCriticalSection(&csFile);
ReadFile(...);
LeaveCriticalSection(&csFile);
...
}
bool SendCommand()
{
...
EnterCriticalSection(&csFile);
WriteFile(...);
LeaveCriticalSection(&csFile);
...
}
}
Обладает ли пользователь приоритетом над планировщиком?
Нет, скорее всего можно сказать, что они равноправны.
Должен ли пользователь иметь возможность "забороть" свою предыдущую команду новой, или операция с точки зрения пользователя атомарна?
Однозначно, да. Не атомарна, и медленна, порядка секунд-минут.
Т.е. пошла команда от пользователя, ждем ответ, в этот момент включается планировщик. Должна ли его команда "забороть" пользовательскую, или пользователь все же должен дождаться ответа на свою команду?
Так, так, так... когда пользователь отдал команду и прекратил тем самым действие планировщика. Он, в свою очередь, ждёт таймаут, и пытается повторить свою команду. Скорее всего, будет сделано так, что таймаут будет отсчитываться от момента завершения команды пользователя, но это в более высоком уровне кода. Теоретически может возникнуть ситуация, когда пользователь снова дёрнет мышкой аккурат под конец таймаута. В таком случае правильно было бы отдать приоритет пользователю, но т.к. время выполнения команды гораздо больше времени, требуемого пользователю на "дёрнуть мышь", почему бы не оставить это как есть, равноприоритетно.
Сбегал к устройству, пока всё работает, но проверял на одном потоке =) второго еще нету. Продолжу кодить. Кста, сделал еще команду
{
// Если некуда отправлять данные, выходим
if(hDevice == INVALID_HANDLE_VALUE)
return false;
WriteCS.Enter();
dwLastWriterID = GetCurrentThreadId(); // Я отдал команду последним
ReleaseAllThreads(); // Отпускаю все остальные потоки
// Собственно, процесс отдачи команды устройству
WriteFile(hDevice, команда);
WriteCS.Leave();
};
которая не блокируется. Авось кому этот тред на форуме еще пригодится...
если bWaitForComplete это bWaitForCmdComplete тогда рассмотрим такой сценарий:
поток 1 входит в SendCommand , доходим до while(bWaitForComplete) SwitchToThread();
Произошла передача управления другому потоку... и где гарантия что это поток чтения?
поток 2 получив управление также входит в SendCommand, переходит в ReleaseAllThreads, сбрасывает флаг bWaitForComplete и... ой...
планировщик решил отдать управление потоку 1, а он в свое время спокойно выходит из функции:) но ответ-то еще не получен:)
p.s.
что-то я вообще не могу догнать логику твоих алгоритмов...
Тогда рассмотрим такой сценарий:
Поток 1 входит в SendCommand, доходим до while(bWaitForComplete) SwitchToThread();
Произошла передача управления другому потоку... и где гарантия, что это поток чтения?
В том-то и дело, что это не обязательно должен быть поток чтения! и нужно определить, какой именно поток отпустит...
Поток 2, получив управление, также входит в SendCommand, переходит в ReleaseAllThreads, сбрасывает флаг bWaitForComplete и...
Планировщик решил отдать управление потоку 1, а он в свое время спокойно выходит из функции : ) но ответ-то еще не получен : )
Да, да, да! Ты прав! Если параллельный поток отдал команду (по времени получается более позднюю), то ожидаемая предыдущая команда не может быть выполнена! Устройство начинает исполнять новую, свежую. Для этого перед выходом и стоит сравнение ID потоков, текущего с тем, который отдал более позднюю команду. И если "моя" команда не завершена - return false. Если "моя" - return true. А меняется ID потока в поле класса DWORD CCtrl::dwLastWriterID только внутри критической секции и до фактической отправки команды. Одновременно висит не более одного SendCommand, и если был предыдущий - он вернет незавершенность отданной команды, и более верхняя иерархия позже сможет приступить к её повторению. Вот логика :)
queue qReceivedData;//очередь входящих данных
DWORD dwLastSendTreadID;//ответ на чью команду ждем?
CRITICAL_SECTION csCmd;
CRITICAL_SECTION csReceivedData;
CRITICAL_SECTION csThreadID;
void Thread()
{
while (!bStopped)
{
//если очередь команд не пуста - пошлем одну
EnterCriticalSection(&csCmd);
if (qCmd.size() != 0)
{
send_command(qCmd.front());
qCmd.pop();
}
LeaveCriticalSection(&csCmd);
//если удастся принять сообщение - положим в очередь принятых
EnterCriticalSection(&csReceivedData);
if (Read)
{
qReceivedData.push(readed_data);
}
LeaveCriticalSection(&csReceivedData);
}
}
void SendCommand()
{
//пошлем команду
EnterCriticalSection(&csSend);
qCmd.push(command);
EnterCriticalSection(&csThreadID);
dwLastSendThreadID = GetCurrentThreadID();
LeaveCriticalSection(&csThreadID);
LeaveCriticalSection(&csSend);
//ждем ответ
bool bOut = false;
bool bReturn = false;
while (!bOut)
{
EnterCriticalSection(&csReceivedData);
received_data = qReceivedData.front();
qReceivedData.pop();
LeaveCriticalSection(&csReceivedData);
EnterCriticalSection(&csThreadID);
if (received_data == command_response)
{
bOut = true; //ответ получен
if (dwLastSendThreadID == GetCurrentThreadId()) bReturn = true; //ответ получен на команду из данного потока
else bReturn = false; //ответ получен на команду из другого потока
}
LeaveCriticalSection(&csThreadID);
}
return bReturn;
}
//получение статистики
Statistics GetStatistics()
{
EnterCriticalSection(&csReceivedData);
received_data = qReceivedData.front();
qReceivedData.pop();
LeaveCriticalSection(&csReceivedData);
if (received_data == statistics_type) return received_data;
}
что-то я не пойму, а что за помощь тебе нужна?:))
Согласен с Нездешним, мне тоже кажется очереди под контролем планировщика покрасивей будет.
alt@zir, ну пост опубликовал, потому что идей вообще не было, с чего начать. Просто сидел, и не мог придумать. А потом переросло в обсуждение, как именно сделать это надёжнее и быстрее. А почему бы и нет? должны же быть нетривиальные вопросы! ))
В целом, правильно. Планирует высокоуровневый планировщик, а также сидящий за монитором пользователь. Каждый - хочет от устройства своего. Кто последний по времени, на того устройство и работает. При этом каждый видит, обломался он или нет. Вотъ.
Мне и не хватало альтернативных вариантов, т.к. конкретно такое делаю сам в первый раз :)