Синхронизация потоков
поток:
{
SOCKET s = 0;
int rcv_size, snd_size, sz = sizeof(int);
long ptr = 0, bytesRead = 0, p_size = MAX_BUF;
int cnt = 0;
char number[10];
char *query_str;
char *recv_str, *send_str, *szPage, *tmp;
int exp_count = 0;
int ovector[VECT_SIZE];
char substring[MAX_STR];
int matches = 0;
int i = 0,j = 0;
s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
getsockopt(s, SOL_SOCKET, SO_SNDBUF, (char*)&snd_size, &sz);
if ((send_str = (char*)malloc(snd_size)) == NULL)
error(10);
getsockopt(s, SOL_SOCKET, SO_RCVBUF, (char*)&rcv_size, &sz);
if ((recv_str = (char*)malloc(rcv_size)) == NULL)
error(10);
closesocket(s);
if ((szPage = (char*)malloc(MAX_BUF)) == NULL)
error(10);
while (1) {
[COLOR="Red"]cnt = counter;
if (cnt > stopCount)
break;
while (InterlockedExchangeAdd(&counter, 1) == cnt)
Sleep(10);[/COLOR]
s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
connect(s, (SOCKADDR*)&sin_remote, sizeof(sin_remote));
memset(send_str, 0, snd_size);
strcpy(send_str, "GET http://");
strcat(send_str, host);
[COLOR="SeaGreen"]sprintf(number, "%d", cnt);
query_str = substr(query, "{ID}",number);
strcat(send_str, query_str);[/COLOR]
free(query_str);
strcat(send_str, " HTTP/1.0\nHost: ");
strcat(send_str, host);
strcat(send_str, "\nUser-Agent: ");
strcat(send_str, agent);
strcat(send_str, "\nAccept: */*\nAccept-Language: en-US\n\n\n");
send(s, send_str, (int)strlen(send_str)-1, 0);
memset(szPage, 0, p_size);
ptr = 0;
while (1) {
memset(recv_str, 0, rcv_size);
if ((bytesRead = recv(s, recv_str, rcv_size, 0)) <= 0)
break;
ptr += bytesRead;
if (ptr >= p_size) {
p_size += MAX_BUF;
printf("p_size = %d\n", p_size);
if ((tmp = realloc(szPage, p_size)) == NULL)
error(10);
}
strcat(szPage, recv_str);
}
closesocket(s);
if ((matches = pcre_exec(re, NULL, szPage, ptr, 0, 0, ovector, VECT_SIZE)) <= 0)
fputs("Not found\n", out);
for (i = 1; i <= matches; i++) {
memset(substring, 0, MAX_STR);
pcre_copy_substring(szPage, ovector, matches, i-1, substring, MAX_STR);
fprintf(out, "%s\n", substring);
}
}
free(send_str);
free(recv_str);
free(szPage);
return 0;
}
В потоке я просто подставляю в ссылку (HTTP запрос) определенное число (код выделен зеленым), потом вызываю InterlockedExchangeAdd чтобы увеличить это число на 1 (т.к. counter - общая переменная для всех потоков).
вызов из программы:
counter = startCount;
for (i = 0; i <= stopCount; i++) {
if (i >= THREADS_NUM) break;
hThread = (HANDLE)_beginthreadex(NULL, 0, thread, NULL, 0, &dwThreadID);
}
WaitForMultipleObjects(i-1, hThread, TRUE, INFINITE);
startCount - начальная граница диапазона
stopCount - соответсвтенно конечная граница диапазона
Так вот, когда я запускаю этот код, получаю асинхронный вывод в файл, т.е. фактически строки просто пишутся туда "от балды".
Вопрос: какой алгоритм нужно использовать, чтобы сделать вывод синхронным?
for (i = 1; i <= matches; i++) {
memset(substring, 0, MAX_STR);
pcre_copy_substring(szPage, ovector, matches, i-1, substring, MAX_STR);
fprintf(out, "%s\n", substring);
}
LeaveCriticalSection(&cs);
Естественно перед этим делаю InitializeCriticalSection. Когда запускаю он вылетает с ошибкой "Память не может быть "written""
Вообщем здесь надо выдумывать какой-нибудь хитро-мудрый алгоритм синхронизации, который бы не позволял другим потокам писать в файл, если у одного из них есть полученные данные, еще в файл не записанные. При этом желательно сохранить производительность и не попать в Deadlock. Читайте Рихтера. Может у него что найдете.
Или, как советует Oxotnik333, писать все в разные файлы, но скажем дополнительно перед каждой последовательностью данных писать время её записи или скажем её порядковый номер, который ведется глобально. А потом, принимая во внимание эту дополнительную иформацию, слить файлы в один.
Работает же, и главное. Глюков не замечал
Не совсем в тему топика но все же: большой разницы нет, просто _beginthreadex инициализирует струкутры для работы библиотеки C/С++ в многопоточной среде. CreateThread этого не делает. Но C/С++ библиотеки, поставляемые сегодня со средами разработки переписаны так, что даже при использовании CreateThread при первом вызове старых функций, которые могут работать неправильно в многопоточной среде, структуры эти все равно инициализируются. Просто при завершении потока их некому будет удалить, поэтому возможна утечка памяти, да и функцией signal в таком потоке пользоваться не стоит.
Семафоры и еще раз семафоры. Сколько раз убеждался - критические секции ЗЛО. Они очень сильно садят производительность так как тормозят ВСЕ потоки у процесса. Как альтернативой критических секций пользуйтесь монитором, если с семафорами неудобно.
Если вы слушаете несколько сокетов, как вы можете выявлять порядок следования строк? А если даете строкам номера то какой смысл в том, чтобы они были записаны именно в этом порядке?
Ваш критический ресурс - это не файл, а та сущность, которая дает строкам номера и разбирает их (или както их обрабатывает), именно его нужно защищать семафором (или мутексом - что вам больше нравится)
Откуда такая информация?
Семафор - системный объект, критическая секция - нет. Поэтому операции с семафорами в разы медленее, чем с критической секцией.
И что подразумевается под фразой "тормозят ВСЕ потоки у процесса" ?
Как альтернативой критических секций пользуйтесь монитором, если с семафорами неудобно.
Что такое здесь "монитор" ?
Семафор - системный объект, критическая секция - нет.
Ну с производительностью я загнул - давно дело было, когда на дельфях сидел, не исключаю возможности того что сам дурак - неверно расставил их. :o
А вот тут пожалуй не соглашусь. Критическая секция на многопроцессорных (многоядерных) системах реализуется через семафор, об этом ясно сказано в МСДН (описание АПИ функции InitializeCriticalSectionAndSpinCount).
Монитор - инструмент синхронизации. Реализует 2 функции вида: Enter(int) и Release(int), концептуально реализуется через таблицу семафоров, в C# эквивалентен конструкции lock(object obj) { .... }, в Win32 API его не встречал...
А вот тут пожалуй не соглашусь. Критическая секция на многопроцессорных (многоядерных) системах реализуется через семафор, об этом ясно сказано в МСДН (описание АПИ функции InitializeCriticalSectionAndSpinCount).
Заметь: только на многопроцессорных системах и только, если SpinCount > 0.
Авто же использует обычную критическую секцию (SpinCount = 0).
Кстати, создается там не семафор, хотя он так называется, а event:
&CriticalSection->LockSemaphore,
DESIRED_EVENT_ACCESS,
NULL,
SynchronizationEvent,
FALSE
);
for (i = 1; i <= matches; i++) {
memset(substring, 0, MAX_STR);
pcre_copy_substring(szPage, ovector, matches, i-1, substring, MAX_STR);
fprintf(out, "%s\n", substring);
}
LeaveCriticalSection(&cs);
Естественно перед этим делаю InitializeCriticalSection. Когда запускаю он вылетает с ошибкой "Память не может быть "written""
Event попробуй поюзать. Доступ к секции возможен(ивент читабельный?), тогда работаем)))