Справочник функций

Ваш аккаунт

Войти через: 
Забыли пароль?
Регистрация
Информацию о новых материалах можно получать и без регистрации:

Почтовая рассылка

Подписчиков: -1
Последний выпуск: 19.06.2015

Синхронизация потоков

7.2K
02 августа 2008 года
polaroid
94 / / 05.07.2008
Стоит такая задача: считывать данные с сокета (HTTP протокол), парсить их и записывать в файл. Сделать это нужно в несколько потоков(допустим 5). Никак не могу допереть, как синхронизировать запись в файл, т.е в моей реализации строки записываются не в том порядке, в котором должны.

поток:
Код:
DWORD WINAPI thread(void *argv)
{
    SOCKET s = 0;
    int rcv_size, snd_size, sz = sizeof(int);
    long ptr = 0, bytesRead = 0, p_size = MAX_BUF;

    int cnt = 0;
    char number[10];
    char *query_str;
    char *recv_str, *send_str, *szPage, *tmp;

    int exp_count = 0;
    int ovector[VECT_SIZE];
    char substring[MAX_STR];
    int matches = 0;

    int i = 0,j = 0;

    s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);

    getsockopt(s, SOL_SOCKET, SO_SNDBUF, (char*)&snd_size, &sz);
    if ((send_str = (char*)malloc(snd_size)) == NULL)
        error(10);

    getsockopt(s, SOL_SOCKET, SO_RCVBUF, (char*)&rcv_size, &sz);
    if ((recv_str = (char*)malloc(rcv_size)) == NULL)
        error(10);

    closesocket(s);

    if ((szPage = (char*)malloc(MAX_BUF)) == NULL)
        error(10);

    while (1) {
        [COLOR="Red"]cnt = counter;
        if (cnt > stopCount)
            break;

        while (InterlockedExchangeAdd(&counter, 1) == cnt)
            Sleep(10);[/COLOR]

        s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
       
        connect(s, (SOCKADDR*)&sin_remote, sizeof(sin_remote));

        memset(send_str, 0, snd_size);
        strcpy(send_str, "GET http://");
        strcat(send_str, host);
        [COLOR="SeaGreen"]sprintf(number, "%d", cnt);
        query_str = substr(query, "{ID}",number);
        strcat(send_str, query_str);[/COLOR]
        free(query_str);
        strcat(send_str, " HTTP/1.0\nHost: ");
        strcat(send_str, host);
        strcat(send_str, "\nUser-Agent: ");
        strcat(send_str, agent);
        strcat(send_str, "\nAccept: */*\nAccept-Language: en-US\n\n\n");
   
        send(s, send_str, (int)strlen(send_str)-1, 0);

        memset(szPage, 0, p_size);
        ptr = 0;

        while (1) {
            memset(recv_str, 0, rcv_size);
            if ((bytesRead = recv(s, recv_str, rcv_size, 0)) <= 0)
                break;

            ptr += bytesRead;

            if (ptr >= p_size) {
                p_size += MAX_BUF;
                printf("p_size = %d\n", p_size);
                if ((tmp = realloc(szPage, p_size)) == NULL)
                    error(10);
            }

            strcat(szPage, recv_str);
        }
        closesocket(s);

        if ((matches = pcre_exec(re, NULL, szPage, ptr, 0, 0, ovector, VECT_SIZE)) <= 0)
            fputs("Not found\n", out);

        for (i = 1; i <= matches; i++) {
            memset(substring, 0, MAX_STR);
            pcre_copy_substring(szPage, ovector, matches, i-1, substring, MAX_STR);
            fprintf(out, "%s\n", substring);
        }
    }

    free(send_str);
    free(recv_str);
    free(szPage);
    return 0;
}


В потоке я просто подставляю в ссылку (HTTP запрос) определенное число (код выделен зеленым), потом вызываю InterlockedExchangeAdd чтобы увеличить это число на 1 (т.к. counter - общая переменная для всех потоков).

вызов из программы:
 
Код:
#define THREADS_NUM 5
           counter = startCount;
    for (i = 0; i <= stopCount; i++) {
        if (i >= THREADS_NUM) break;
        hThread = (HANDLE)_beginthreadex(NULL, 0, thread, NULL, 0, &dwThreadID);
    }

    WaitForMultipleObjects(i-1, hThread, TRUE, INFINITE);


startCount - начальная граница диапазона
stopCount - соответсвтенно конечная граница диапазона

Так вот, когда я запускаю этот код, получаю асинхронный вывод в файл, т.е. фактически строки просто пишутся туда "от балды".
Вопрос: какой алгоритм нужно использовать, чтобы сделать вывод синхронным?
288
02 августа 2008 года
nikitozz
1.2K / / 09.03.2007
Насколько я понял в вашей программе файл - это разделяемый ресурс для ваших потоков. Поэтому по идее запись в файл неплохо было бы заключить в критическую секцию или что-то подобное.
11
02 августа 2008 года
oxotnik333
2.9K / / 03.08.2007
а в разные нелья? каждый поток в свой файл, а потом их слить по завершении
7.2K
02 августа 2008 года
polaroid
94 / / 05.07.2008
Не, в разные нельзя, т.к. важен порядок следования строк, да и сливать их потом будет напряжно (допустим если они каждый метров по 10). А насчет критических секций - что-то вылетает у меня после этого.

 
Код:
EnterCriticalSection(&cs);
for (i = 1; i <= matches; i++) {
    memset(substring, 0, MAX_STR);
    pcre_copy_substring(szPage, ovector, matches, i-1, substring, MAX_STR);
    fprintf(out, "%s\n", substring);
}
LeaveCriticalSection(&cs);


Естественно перед этим делаю InitializeCriticalSection. Когда запускаю он вылетает с ошибкой "Память не может быть "written""
288
03 августа 2008 года
nikitozz
1.2K / / 09.03.2007
Попробуйте поискать ошибку. Или использовать скажем Mutex. В любом случае какой-либо способ синхронизации непременно нужен, т.к. нельзя допустить, чтобы два или более потоков одновременно писали в один файл, иначе в файле будет полная билебирда. Хотя и при таком способе совсем не факт, что данные будут записаны в файл именно в том порядке, в котором они получены. Предположим следующий сценарий: первый поток получил данные, но квант его процессорного времени закончился и его вытеснили, а второй начал работать и успел и получить данные, и записать их в файл. Когда первый поток снова начнет работать, он запишет свои данные в файл. И получится, что в файле сначала хранятся данные из второго потока (полученные после первого) и лишь потом из первого.
Вообщем здесь надо выдумывать какой-нибудь хитро-мудрый алгоритм синхронизации, который бы не позволял другим потокам писать в файл, если у одного из них есть полученные данные, еще в файл не записанные. При этом желательно сохранить производительность и не попать в Deadlock. Читайте Рихтера. Может у него что найдете.
Или, как советует Oxotnik333, писать все в разные файлы, но скажем дополнительно перед каждой последовательностью данных писать время её записи или скажем её порядковый номер, который ведется глобально. А потом, принимая во внимание эту дополнительную иформацию, слить файлы в один.
11
03 августа 2008 года
oxotnik333
2.9K / / 03.08.2007
вот как я делал (правда на С++, но думаю вытащить из класса не составит труда)
7.2K
03 августа 2008 года
polaroid
94 / / 05.07.2008
Ошибка была из-за того, что я вставил по невнимательности DestroyCriticalSection перед WaitForMultipleObjects(). Но все равно потоки ведут себя как хотят, т.е. записывают строки не поочередно, а в том порядке, в котором они успевают получить доступ. Кстати охотник - зря ты в своем классе используешь CreateThread, т.к. Microsoft советую юзать _beginthreadex
11
03 августа 2008 года
oxotnik333
2.9K / / 03.08.2007
Цитата: polaroid
Кстати охотник - зря ты в своем классе используешь CreateThread, т.к. Microsoft советую юзать _beginthreadex


Работает же, и главное. Глюков не замечал

7.2K
03 августа 2008 года
polaroid
94 / / 05.07.2008
Спасибо тебе, правда код еще не до конца просмотрел, думаю, что решу проблему
288
03 августа 2008 года
nikitozz
1.2K / / 09.03.2007
Цитата: polaroid
Кстати охотник - зря ты в своем классе используешь CreateThread, т.к. Microsoft советую юзать _beginthreadex



Не совсем в тему топика но все же: большой разницы нет, просто _beginthreadex инициализирует струкутры для работы библиотеки C/С++ в многопоточной среде. CreateThread этого не делает. Но C/С++ библиотеки, поставляемые сегодня со средами разработки переписаны так, что даже при использовании CreateThread при первом вызове старых функций, которые могут работать неправильно в многопоточной среде, структуры эти все равно инициализируются. Просто при завершении потока их некому будет удалить, поэтому возможна утечка памяти, да и функцией signal в таком потоке пользоваться не стоит.

5
03 августа 2008 года
hardcase
4.5K / / 09.08.2005
Цитата: polaroid
Ошибка была из-за того, что я вставил по невнимательности DestroyCriticalSection перед WaitForMultipleObjects().

Семафоры и еще раз семафоры. Сколько раз убеждался - критические секции ЗЛО. Они очень сильно садят производительность так как тормозят ВСЕ потоки у процесса. Как альтернативой критических секций пользуйтесь монитором, если с семафорами неудобно.

7.2K
03 августа 2008 года
polaroid
94 / / 05.07.2008
Ок, попробую через семафор.
7.2K
03 августа 2008 года
polaroid
94 / / 05.07.2008
Блин, никак не могу разобраться с семафорами. Я так понял нужно использовать одну из Wait функций, чтобы заставить поток ждать. Но что-то результат от этого не меняется. Может кто знает где можно посмотреть реализацию подобной задачи (с семафорами или без). буду благодарен
5
03 августа 2008 года
hardcase
4.5K / / 09.08.2005
Цитата: polaroid
Не, в разные нельзя, т.к. важен порядок следования строк

Если вы слушаете несколько сокетов, как вы можете выявлять порядок следования строк? А если даете строкам номера то какой смысл в том, чтобы они были записаны именно в этом порядке?
Ваш критический ресурс - это не файл, а та сущность, которая дает строкам номера и разбирает их (или както их обрабатывает), именно его нужно защищать семафором (или мутексом - что вам больше нравится)

3
03 августа 2008 года
Green
4.8K / / 20.01.2000
Цитата: hardcase
Семафоры и еще раз семафоры. Сколько раз убеждался - критические секции ЗЛО. Они очень сильно садят производительность так как тормозят ВСЕ потоки у процесса.


Откуда такая информация?
Семафор - системный объект, критическая секция - нет. Поэтому операции с семафорами в разы медленее, чем с критической секцией.
И что подразумевается под фразой "тормозят ВСЕ потоки у процесса" ?

Цитата: hardcase

Как альтернативой критических секций пользуйтесь монитором, если с семафорами неудобно.


Что такое здесь "монитор" ?

5
04 августа 2008 года
hardcase
4.5K / / 09.08.2005
Цитата: Green
Откуда такая информация?
Семафор - системный объект, критическая секция - нет.

Ну с производительностью я загнул - давно дело было, когда на дельфях сидел, не исключаю возможности того что сам дурак - неверно расставил их. :o

Цитата: Green
Поэтому операции с семафорами в разы медленее, чем с критической секцией.

А вот тут пожалуй не соглашусь. Критическая секция на многопроцессорных (многоядерных) системах реализуется через семафор, об этом ясно сказано в МСДН (описание АПИ функции InitializeCriticalSectionAndSpinCount).

Цитата: Green
Что такое здесь "монитор" ?

Монитор - инструмент синхронизации. Реализует 2 функции вида: Enter(int) и Release(int), концептуально реализуется через таблицу семафоров, в C# эквивалентен конструкции lock(object obj) { .... }, в Win32 API его не встречал...

3
04 августа 2008 года
Green
4.8K / / 20.01.2000
Цитата: hardcase

А вот тут пожалуй не соглашусь. Критическая секция на многопроцессорных (многоядерных) системах реализуется через семафор, об этом ясно сказано в МСДН (описание АПИ функции InitializeCriticalSectionAndSpinCount).


Заметь: только на многопроцессорных системах и только, если SpinCount > 0.
Авто же использует обычную критическую секцию (SpinCount = 0).

Кстати, создается там не семафор, хотя он так называется, а event:

 
Код:
Status = NtCreateEvent(
                    &CriticalSection->LockSemaphore,
                    DESIRED_EVENT_ACCESS,
                    NULL,
                    SynchronizationEvent,
                    FALSE
                );
42K
16 августа 2008 года
passerBY
1 / / 16.08.2008
Цитата: polaroid
Не, в разные нельзя, т.к. важен порядок следования строк, да и сливать их потом будет напряжно (допустим если они каждый метров по 10). А насчет критических секций - что-то вылетает у меня после этого.

 
Код:
EnterCriticalSection(&cs);
for (i = 1; i <= matches; i++) {
    memset(substring, 0, MAX_STR);
    pcre_copy_substring(szPage, ovector, matches, i-1, substring, MAX_STR);
    fprintf(out, "%s\n", substring);
}
LeaveCriticalSection(&cs);


Естественно перед этим делаю InitializeCriticalSection. Когда запускаю он вылетает с ошибкой "Память не может быть "written""



Event попробуй поюзать. Доступ к секции возможен(ивент читабельный?), тогда работаем)))

Реклама на сайте | Обмен ссылками | Ссылки | Экспорт (RSS) | Контакты
Добавить статью | Добавить исходник | Добавить хостинг-провайдера | Добавить сайт в каталог