Справочник функций

Ваш аккаунт

Войти через: 
Забыли пароль?
Регистрация
Информацию о новых материалах можно получать и без регистрации:

Почтовая рассылка

Подписчиков: -1
Последний выпуск: 19.06.2015

Помогите с многопоточностью! (тут интересно)

63K
14 марта 2011 года
SimSonic
16 / / 14.03.2011
Значит, приложение, в нём есть класс, управляющий неким приспособлением. Управление, будем считать, производится функциями ReadFile и WriteFile. Примерная схема:
Код:
class CCtrl
{
public:
   void Thread()
   {
      инициализация;
      while(bWork)
      {
         ReadFile(hDevice, ...);
         switch(полученные данные) // парсим
         {
            case статистика: тихо_мирно_использую; break;
            case извещение:
               // ?!?!?
               break;
         };
      };
      финализация;
   }
    CCtrl() { создать поток из метода Thread(); };
   ~CCtrl() { завершить поток в Thread(); };
   // здесь самый вопрос:
   bool SendCommand(int nCommand, int nParam)
   {
      // ?!?!?
      WriteFile(hDevice, ...);
      // ?!??!
      // PROFIT !
      return bResult;
   };
};
CCtrl Controller;

Теперь, на словах:
Устройство регулярно, но не быстро, посылает данные, которые успешно считывает поток CCtrl::Thread() вызовом ReadFile. Среди этих данных идут пакеты статистики, а также особый пакет-извещение "Я сделало команду".
Команды устройству отдаются несколькими (!) другими потоками вызовом метода SendCommand, как минимум двумя. Устройство совершает команду некоторое значимое время.
Любой поток может отдать команду Controller.SendCommand(...) в любое время, независимо от других, при этом вызов функций должен заблокироваться до того момента, пока команда не будет выполнена (пока с устройства не придёт пакет "Я сделало команду" и поток CCtrl::Thread это не прочитает), при этом вернув true. Либо, если любой другой поток отдаёт любую другую (или эту же) команду устройству, то наш рассматриваемый поток должен разблокироваться и вернуть false (и он поймёт, что устройство начало выполнение другой команды), и блокируется уже поток, отдавший последнюю по времени команду.

Вопрос: помогите правильно и адекватно всё это синхронизовать.
Предполагаю использование semaphore / event / mutex / critical section / WaitForSingle(Multiple)Object(s), и т.п., но вот в голове схема синхронизации почему-то сама в этот раз не возникает =)
278
14 марта 2011 года
Alexander92
1.1K / / 04.08.2008
Вам нужно подробно познакомиться с техникой синхронизацией потоков в принципе, это вам поможет. Мне сложно говорить что-либо конкретное, не зная подробно вашей задачи, однако распишу в двух словах; если возникнут какие-то вопросы по сути - отвечу.

В Windows существует несколько объектов синхронизации.
1. События (events). Объекты, поддерживающие два состояния (условно говоря - включен/выключен, signaled/non-signaled). Переключение между состояниями задается непосредственно программистом.
2. Мьютексы (mutexes). Также поддерживает два состояния; считается включенным, если его объект принадлежит какому-либо потоку, и выключенным в противном случае.
3. Семафоры (semaphores). Кроме состояния, имеет счетчик (между нулем и заданным максимальным значением). Считается включенным при значении счетчика, отличном от нуля, и выключенным при нулевом значении счетчика.
4. Критические секции (critical sections). Применяются для контролируемого многопоточного доступа к ресурсу; ресурс, используемый внутри критической секции, автоматически блокируется для других потоков.

В дополнение к этому перечню, привожу список функций для работы с синхронизацией. Отдельное внимание обращаю на функции WaitForSingleObject() и WaitForMultipleObjects(), они могут оказаться вам наиболее полезными.
63K
14 марта 2011 года
SimSonic
16 / / 14.03.2011
Допустим, попробую сделать синхронизацию при помощи 1 критической секции и 1 события. Добавляю поля в класс:
 
Код:
class CCtrl { ...
   критическая_секция CrSec;
   HANDLE hEvent; // = CreateEvent(NULL, TRUE, TRUE, NULL); в конструкторе
   DWORD nThreadWhichWait; // Здесь будет ID потока, ждущего завершения своей команды
   ... };

Теперь метод, отдающий команды устройству:
Код:
bool CCtrl::SendCommand(int nCmd)
{
   // В критической секции
   CrSec.Enter();
   nThreadWhichWait = CurrentThreadID; // Теперь "Я!" буду ждать завершение команды!
   WriteFile(hDevice, "nCmd"...); // Самое главное - отдаю самую свежую команду устройству.
   PulseEvent(hEvent); // Все остальные потоки пропускаются, и я сам тут же его занимаю.
   CrSec.Leave();
   // Жду, пока меня отпустят (другая команда, или приём-извещения-в-потоке).
   WaitForSingleObject(hEvent);
   // Определяю, что сказать вызывающему...
   return (nThreadWhichWait == CurrentThreadID);
};

В потоке, принимающем данные от устройства, всё проще:
 
Код:
while(bWork)
{
   ReadFile(hDevice, pData...);
   если (pData_является_извещением_о_завершенности_команды)
      SetEvent(hEvent); // Отпускаю теоретически единственный ждущий поток
};

Где могут быть проблемы?
Вижу, что вместо PulseEvent MSDN советует conditional variables, т.к. функция unreliable, это раз.
278
14 марта 2011 года
Alexander92
1.1K / / 04.08.2008
Сходу вижу только одно: критическая секция должна быть глобальной, а не принадлежать какому-то конкретному потоку. А так - пробуйте запускать, будем смотреть походу.
63K
14 марта 2011 года
SimSonic
16 / / 14.03.2011
Alexander92, спасибо за внимание. Один поток существует параллельно с одним экземпляром класса, следовательно критическая секция на всю эту барахолку одна, глобальная в пределах объекта целиком. Управляемое устройство не всегда под рукой, сейчас (вечером) могу только строить догадки о работоспособности, ну и продумывать лучшие варианты... Кроме того, пока приложение на данном этапе неспособно выдать требуемую нагрузку по параллельным вызовам, и тем задержкам, которые будут позже.
Функцию PulseEvent крайне не хочется использовать, думаю обойтись вообще без объектов ядра. Сейчас доделаю кое-какой вариант, и выложу своё решение, но работоспособность будет проверена на малой нагрузке только завтра во второй половине дня...
278
14 марта 2011 года
Alexander92
1.1K / / 04.08.2008
Цитата: SimSonic
Один поток существует параллельно с одним экземпляром класса, следовательно критическая секция на всю эту барахолку одна, глобальная в пределах объекта целиком.


Вот я об этом и говорю, здесь вы неправы. Критическая секция должна быть глобальная в пределах всех объектов.

63K
14 марта 2011 года
SimSonic
16 / / 14.03.2011
Цитата: Alexander92
Критическая секция должна быть глобальная в пределах всех объектов.


Прошу прощения, мне кажется я не сказал важную деталь: один экземпляр класса обладает эксклюзивно одним подключенным устройством, т.е. не возникнет ситуаций, когда 1. экземпляров больше, чем устройств, 2. доступ к одному устройству осуществлён более, чем одним объектом. В таких условиях, имхо, критическая секция вполне глобальна на своём месте. С другой стороны, если сделать её совсем уж глобальной, то получится несколько медленнее управлять параллельно несколькими устройствами. Хотя, это всё теория - на практике устройство будет всего одно, ровно как и экземпляр класса в приложении.

Обошелся-таки без объектов ядра, правда длинной кода и числом переменных. Вот что стало с телом класса:

Код:
class CCtrl
{
    HANDLE hDevice;
    void   Thread(...);
    // Объекты синхронизации
    bool   bWaitForComplete; // Потоки, ждущие завершения команды, должны продолжать ждать
    UI32   nWaitCounter; // Счетчик ожидающих завершения команды потоков
    DWORD  dwLastWriterID; // ID потока, отдавшего команду последним
    bool   bThreadWork; // поток-приёмник данных должен работать
    CSonicCriticalSection WriteCS; // Критическая секция вокруг отправки команд
    void   ReleaseAllThreads(); // Освободить все ждущие подтверждения потоки
public:
    // Заслать команду и ждать, пока она будет готова (true) или отменена (false)
    virtual bool SendCommand(ECmd eCmd);
     CCtrl();
    ~CCtrl();
};

Часть тела метода-потока, принимающего данные с устройства:
Код:
while(bThreadWork)
{
    // Читаю данные с устройства
    ReadFile(hDevice, ...);
    ...
    case извещение_о_завершении_последней_команды:
        // Не знаю, зачем, но буду работать в критической секции
        WriteCS.Enter();
        // Подтверждаю устройству получение извещения (требуется)
        WriteFile(hDevice, я_получил_извещение);
        // Отпускаю ждущий поток, теоретически один
        ReleaseAllThreads();
        // Разрешаю остальным потокам отдавать команды
        WriteCS.Leave();
        break;
    ...
};

Функция подачи команды устройству:
Код:
// Если некуда отправлять данные, выходим
    if(hDevice == INVALID_HANDLE_VALUE)
        return false;
    DWORD dwCurrentThreadID = GetCurrentThreadId(); // Идентификатор текущего потока
    WriteCS.Enter();
    dwLastWriterID = dwCurrentThreadID; // Я отдал команду последним
    ReleaseAllThreads(); // Отпускаю все остальные потоки
    // Собственно, процесс отдачи команды устройству
    WriteFile(hDevice, команда);
    // Сам-то встану ждать окончания команды
    bWaitForComplete = true;
    nWaitCounter += 1;
    WriteCS.Leave();
    // Жду ответа от потока, читающего данные с устройства
    while(bWaitForComplete)
        SwitchToThread();
    InterlockedDecrement((volatile LONG *)&nWaitCounter);
    // Возвращаю ответ, я ли это завершил свою команду?
    return (dwLastWriterID == dwCurrentThreadID);

И последнее, освобождение потоков:
 
Код:
void CCtrl::ReleaseAllThreads()
{
    WriteCS.Enter();
    bWaitForComplete = false;
    // Немного скушает текущий CPU, но, имхо, сработает очень быстро
    while(nWaitCounter)
        SwitchToThread();
    WriteCS.Leave();
};

На этом голова работать перестала, утро вечера мудренее.
535
14 марта 2011 года
Нездешний
537 / / 17.01.2008
[QUOTE=Alexander92]Критическая секция должна быть глобальная в пределах всех объектов.[/QUOTE]Если все объекты работают с одним и тем же ресурсом - тогда да. В противном случае вовсе не обязательно

[QUOTE=SimSonic]Любой поток может отдать команду Controller.SendCommand(...) в любое время, независимо от других, при этом вызов функций должен заблокироваться до того момента, пока команда не будет выполнена (пока с устройства не придёт пакет "Я сделало команду" и поток CCtrl::Thread это не прочитает), при этом вернув true. Либо, если любой другой поток отдаёт любую другую (или эту же) команду устройству, то наш рассматриваемый поток должен разблокироваться и вернуть false (и он поймёт, что устройство начало выполнение другой команды), и блокируется уже поток, отдавший последнюю по времени команду.[/QUOTE]Если одна команда должна перебивать другую, то, получается, вся эта возня затеяна только для того, чтобы пропускать статистику при ожидании ответа на команду?

[QUOTE=SimSonic]один экземпляр класса обладает эксклюзивно одним подключенным устройством...
при этом вызов функций должен заблокироваться...
Либо, если любой другой поток отдаёт любую другую (или эту же) команду устройству, то наш рассматриваемый поток должен разблокироваться[/QUOTE]Как "любой другой поток" должен "отдать команду" устройству, если "функции должны быть заблокированы"? Если это должно быть возможно, то это ничто иное, как просто вообще отсутствие всякой блокировки
278
14 марта 2011 года
Alexander92
1.1K / / 04.08.2008
Цитата: Нездешний
Если все объекты работают с одним и тем же ресурсом - тогда да. В противном случае вовсе не обязательно.


Нездешний, опередил. :)

535
14 марта 2011 года
Нездешний
537 / / 17.01.2008
 
Код:
WriteCS.Enter();  <------ вошли в критическую секцию
dwLastWriterID = dwCurrentThreadID;
ReleaseAllThreads();
...

void CCtrl::ReleaseAllThreads()
{
    WriteCS.Enter();  <------ все, приехали, висим (мы уже в крит.секции и пытаемся войти еще раз, не освободив ее)
63K
15 марта 2011 года
SimSonic
16 / / 14.03.2011
Цитата: Нездешний
Как "любой другой поток" должен "отдать команду" устройству, если "функции должны быть заблокированы"? Если это должно быть возможно, то это ничто иное, как просто вообще отсутствие всякой блокировки


Попытаюсь сформулировать более точно:
Функция CCtrl::SendCommand блокируется до тех пор, пока 1. команда не будет успешно завершена (return true); 2. другой поток после моего не вызвал эту же функцию, тем самым отменив мою команду (return false). Я никак не должен возвращаться из неё сразу после отдачи команды, не дожидаясь завершения.

Цитата: Нездешний
Если одна команда должна перебивать другую, то, получается, вся эта возня затеяна только для того, чтобы пропускать статистику при ожидании ответа на команду?


Нет, грубо говоря сверху над этим объектом будет висеть планировщик, делающий набор действий данным устройством по некоему своему плану. Параллельно может вмешаться с другого потока пользователь, тыркающий мышку во все стороны в своём окне управления, тем самым заборов к черту действия планировщика. Последний же, увидев, что его очередная команда вернулась с результатом false, должен продолжить свои действия, начиная с попытки повторить отмененную. Вот так.

 
Код:
WriteCS.Enter();  <------ вошли в критическую секцию
dwLastWriterID = dwCurrentThreadID;
ReleaseAllThreads();
...
void CCtrl::ReleaseAllThreads()
{
    WriteCS.Enter();  <------ все, приехали, висим (мы уже в крит.секции и пытаемся войти еще раз, не освободив ее)

Если что, один поток может входить в одну критическую секцию неограниченно (разумно) много раз, столько же раз он должен её и освободить. Вызов ReleaseAllThreads() происходит в двух местах всего - в SendCommand, внутри этой же самой К.С., и вновь внутри неё в switch-е потока приёмника. Никакой ситуации с блокировкой, имхо, не возникнет... хотя тут я ставлю вообще вход в К.С. в этой функции под сомнение - а нужно ли :)
535
15 марта 2011 года
Нездешний
537 / / 17.01.2008
[QUOTE=SimSonic]один поток может входить в одну критическую секцию неограниченно (разумно) много раз, столько же раз он должен её и освободить.[/QUOTE]Вы правы, а я ошибся

Обладает ли пользователь приоритетом над планировщиком? Т.е. пошла команда от пользователя, ждем ответ, в этот момент включается планировщик. Должна ли его команда "забороть" пользовательскую, или пользователь все же должен дождаться ответа на свою команду? Должен ли пользователь иметь возможность "забороть" свою предыдущую команду новой, или операция с точки зрения пользователя атомарна?
535
15 марта 2011 года
Нездешний
537 / / 17.01.2008
Пока же могу сказать, что при используемой схеме (вызовы методов класса из разных потоков) нужно синхронизировать доступ ко всем членам класса, с которыми возможны проблемы. Например, к файлу, к которому идет обращение из разных потоков:

Код:
class CCtrl
{
  CRITICAL_SECTION csFile;
 
  void Thread()
  {
    ...
    EnterCriticalSection(&csFile);
    ReadFile(...);
    LeaveCriticalSection(&csFile);
    ...
  }
 
  bool SendCommand()
  {
    ...
    EnterCriticalSection(&csFile);
    WriteFile(...);
    LeaveCriticalSection(&csFile);
    ...
  }
}
63K
15 марта 2011 года
SimSonic
16 / / 14.03.2011
Цитата: Нездешний

Обладает ли пользователь приоритетом над планировщиком?


Нет, скорее всего можно сказать, что они равноправны.

Цитата: Нездешний

Должен ли пользователь иметь возможность "забороть" свою предыдущую команду новой, или операция с точки зрения пользователя атомарна?


Однозначно, да. Не атомарна, и медленна, порядка секунд-минут.

Цитата: Нездешний

Т.е. пошла команда от пользователя, ждем ответ, в этот момент включается планировщик. Должна ли его команда "забороть" пользовательскую, или пользователь все же должен дождаться ответа на свою команду?


Так, так, так... когда пользователь отдал команду и прекратил тем самым действие планировщика. Он, в свою очередь, ждёт таймаут, и пытается повторить свою команду. Скорее всего, будет сделано так, что таймаут будет отсчитываться от момента завершения команды пользователя, но это в более высоком уровне кода. Теоретически может возникнуть ситуация, когда пользователь снова дёрнет мышкой аккурат под конец таймаута. В таком случае правильно было бы отдать приоритет пользователю, но т.к. время выполнения команды гораздо больше времени, требуемого пользователю на "дёрнуть мышь", почему бы не оставить это как есть, равноприоритетно.

Сбегал к устройству, пока всё работает, но проверял на одном потоке =) второго еще нету. Продолжу кодить. Кста, сделал еще команду

Код:
void SendCommandAsync(...)
{
    // Если некуда отправлять данные, выходим
    if(hDevice == INVALID_HANDLE_VALUE)
        return false;
    WriteCS.Enter();
    dwLastWriterID = GetCurrentThreadId(); // Я отдал команду последним
    ReleaseAllThreads(); // Отпускаю все остальные потоки
    // Собственно, процесс отдачи команды устройству
    WriteFile(hDevice, команда);
    WriteCS.Leave();
};

которая не блокируется. Авось кому этот тред на форуме еще пригодится...
38K
15 марта 2011 года
alt@zir
29 / / 28.08.2008
если bWaitForComplete локальная переменная функции SendCommand тогда на while(bWaitForComplete) все закончится...
если bWaitForComplete это bWaitForCmdComplete тогда рассмотрим такой сценарий:
поток 1 входит в SendCommand , доходим до while(bWaitForComplete) SwitchToThread();
Произошла передача управления другому потоку... и где гарантия что это поток чтения?
поток 2 получив управление также входит в SendCommand, переходит в ReleaseAllThreads, сбрасывает флаг bWaitForComplete и... ой...
планировщик решил отдать управление потоку 1, а он в свое время спокойно выходит из функции:) но ответ-то еще не получен:)
p.s.
что-то я вообще не могу догнать логику твоих алгоритмов...
63K
15 марта 2011 года
SimSonic
16 / / 14.03.2011
alt@zir, спасибо за вопрос: bWaitForComplete это действительно bWaitForCmdComplete, опечатался. Исправил свой пост на предыдущей странице.

Цитата: alt@zir

Тогда рассмотрим такой сценарий:
Поток 1 входит в SendCommand, доходим до while(bWaitForComplete) SwitchToThread();
Произошла передача управления другому потоку... и где гарантия, что это поток чтения?


В том-то и дело, что это не обязательно должен быть поток чтения! и нужно определить, какой именно поток отпустит...

Цитата: alt@zir

Поток 2, получив управление, также входит в SendCommand, переходит в ReleaseAllThreads, сбрасывает флаг bWaitForComplete и...
Планировщик решил отдать управление потоку 1, а он в свое время спокойно выходит из функции : ) но ответ-то еще не получен : )


Да, да, да! Ты прав! Если параллельный поток отдал команду (по времени получается более позднюю), то ожидаемая предыдущая команда не может быть выполнена! Устройство начинает исполнять новую, свежую. Для этого перед выходом и стоит сравнение ID потоков, текущего с тем, который отдал более позднюю команду. И если "моя" команда не завершена - return false. Если "моя" - return true. А меняется ID потока в поле класса DWORD CCtrl::dwLastWriterID только внутри критической секции и до фактической отправки команды. Одновременно висит не более одного SendCommand, и если был предыдущий - он вернет незавершенность отданной команды, и более верхняя иерархия позже сможет приступить к её повторению. Вот логика :)

535
15 марта 2011 года
Нездешний
537 / / 17.01.2008
Значит, приоритеты пользователя и планировщика равны. В этом случае, возможно, будет удобнее организовать очереди команд и входящих сообщений. Псевдокод:
Код:
queue qCmd;//очередь команд
queue qReceivedData;//очередь входящих данных
DWORD dwLastSendTreadID;//ответ на чью команду ждем?

CRITICAL_SECTION csCmd;
CRITICAL_SECTION csReceivedData;
CRITICAL_SECTION csThreadID;

void Thread()
{
  while (!bStopped)
  {
    //если очередь команд не пуста - пошлем одну
    EnterCriticalSection(&csCmd);
    if (qCmd.size() != 0)
    {
      send_command(qCmd.front());
      qCmd.pop();
    }
    LeaveCriticalSection(&csCmd);
   
    //если удастся принять сообщение - положим в очередь принятых
    EnterCriticalSection(&csReceivedData);    
    if (Read)
    {
      qReceivedData.push(readed_data);
    }
    LeaveCriticalSection(&csReceivedData);
  }
}

void SendCommand()
{
  //пошлем команду
  EnterCriticalSection(&csSend);
  qCmd.push(command);
  EnterCriticalSection(&csThreadID);
  dwLastSendThreadID = GetCurrentThreadID();
  LeaveCriticalSection(&csThreadID);
  LeaveCriticalSection(&csSend);
 
  //ждем ответ
  bool bOut = false;
  bool bReturn = false;
  while (!bOut)
  {
    EnterCriticalSection(&csReceivedData);
    received_data = qReceivedData.front();
    qReceivedData.pop();
    LeaveCriticalSection(&csReceivedData);
   
    EnterCriticalSection(&csThreadID);
    if (received_data == command_response)
    {
      bOut = true; //ответ получен
      if (dwLastSendThreadID == GetCurrentThreadId())  bReturn = true; //ответ получен на команду из данного потока
      else bReturn = false; //ответ получен на команду из другого потока
    }
    LeaveCriticalSection(&csThreadID);
  }
 
  return bReturn;
}

//получение статистики
Statistics GetStatistics()
{
  EnterCriticalSection(&csReceivedData);
  received_data = qReceivedData.front();
  qReceivedData.pop();
  LeaveCriticalSection(&csReceivedData);
 
  if (received_data == statistics_type) return received_data;
}
38K
15 марта 2011 года
alt@zir
29 / / 28.08.2008
устройство исполняет только одну команду, поданную последней? то есть ты таким образом тупо вытесняешь команды от разных потоков, а что-то другое выполняет их планирование. я правильно тебя понимаю?
что-то я не пойму, а что за помощь тебе нужна?:))

Согласен с Нездешним, мне тоже кажется очереди под контролем планировщика покрасивей будет.
63K
15 марта 2011 года
SimSonic
16 / / 14.03.2011
Нездешний, бегло глянул, сейчас некогда уже, вечером попробую прицепить такой вариант - выглядит несколько короче моего, и переменных не так много... благодарю!

alt@zir, ну пост опубликовал, потому что идей вообще не было, с чего начать. Просто сидел, и не мог придумать. А потом переросло в обсуждение, как именно сделать это надёжнее и быстрее. А почему бы и нет? должны же быть нетривиальные вопросы! ))

Цитата:
То есть ты таким образом тупо вытесняешь команды от разных потоков, а что-то другое выполняет их планирование, я правильно тебя понимаю?


В целом, правильно. Планирует высокоуровневый планировщик, а также сидящий за монитором пользователь. Каждый - хочет от устройства своего. Кто последний по времени, на того устройство и работает. При этом каждый видит, обломался он или нет. Вотъ.

Цитата:
Согласен с Нездешним, мне тоже кажется очереди под контролем планировщика покрасивей будет.


Мне и не хватало альтернативных вариантов, т.к. конкретно такое делаю сам в первый раз :)

Реклама на сайте | Обмен ссылками | Ссылки | Экспорт (RSS) | Контакты
Добавить статью | Добавить исходник | Добавить хостинг-провайдера | Добавить сайт в каталог