Справочник функций

Ваш аккаунт

Войти через: 
Забыли пароль?
Регистрация
Информацию о новых материалах можно получать и без регистрации:

Почтовая рассылка

Подписчиков: -1
Последний выпуск: 19.06.2015

Разделить функцию i/o на потоки, где число сокетов больше 64

7.5K
10 мая 2016 года
diprom
37 / / 15.10.2010
Здравствуйте.
Есть хорошая статья про неблокирующие сокеты
сокеты
Не могу подправить функцию, которая принимала более 64 сокетов

Код:
DWORD TCP::cSocket::WSAWaitForMultipleEventsEx(DWORD u32_Count, DWORD* pu32_Index, WSAEVENT* ph_Events, DWORD u32_Timeout)
{
    // Here *pu32_Index is at the position where the last time an event has been signaled
    // Search for a signaled event from *pu32_Index +1 upwards
    DWORD u32_Res =0;

    for (DWORD C=0; C<u32_Count; C++)
    {
        (*pu32_Index)++;

        if (*pu32_Index >= u32_Count)
            *pu32_Index = 0;

        // Check if the event is set (Timeout = 0)
        u32_Res = WaitForSingleObject(ph_Events[*pu32_Index], 0);
        if (u32_Res == WAIT_OBJECT_0)
            return WSA_WAIT_EVENT_0 + *pu32_Index;
    }

    // There is no event signaled -> this means that the server is not under stress
    // There is no reason to check the events one by one anymore.
    u32_Res = WaitForMultipleObjects(u32_Count,ph_Events,FALSE,u32_Timeout);
    //WSAWaitForMultipleEvents(u32_Count, ph_Events, FALSE, u32_Timeout, FALSE);
    if (u32_Res != WSA_WAIT_FAILED && u32_Res != WSA_WAIT_TIMEOUT)
    {
        *pu32_Index = u32_Res - WSA_WAIT_EVENT_0;
    }
    return u32_Res;
}
Пока на остановился на данном этапе.

Код:
typedef struct _WAIT_THREAD_INFORMATION
{
DWORD dwWaitObjectCount;
PHANDLE pWaitHandles;
DWORD dwWaitMSeconds;
DWORD step;
} WAIT_THREAD_INFORMATION, *PWAIT_THREAD_INFORMATION;


//static
DWORD WINAPI WaitForMultipleObjectsThread(LPVOID lpParameter = NULL
)
//
// Thread routine which calls WaitForMultipleObjects
// This is used to wait for more than MAXIMUM_WAIT_OBJECTS objects
//
{
    if (!lpParameter) return 0;
    DWORD dwReturnValue = 0;
    PWAIT_THREAD_INFORMATION pThreadInformation = NULL;
    assert( lpParameter != NULL );
    pThreadInformation = ( PWAIT_THREAD_INFORMATION )lpParameter;
    assert(pThreadInformation->dwWaitObjectCount <= MAXIMUM_WAIT_OBJECTS );
    assert(pThreadInformation->pWaitHandles != NULL );
    //assert(pThreadInformation->dwWaitMSeconds != INFINITE );
    assert(pThreadInformation->dwWaitMSeconds >= 1000 );
    //dwReturnValue = WaitForMultipleObjects(pThreadInformation->dwWaitObjectCount,
    //  pThreadInformation->pWaitHandles,TRUE,pThreadInformation->dwWaitMSeconds );
    for (DWORD C=0; C<pThreadInformation->dwWaitObjectCount; C++)
    {
        // Check if the event is set (Timeout = 0)
        DWORD u32_Res = WaitForSingleObject(pThreadInformation->pWaitHandles[C], 0);
        if (u32_Res == WAIT_OBJECT_0)
            return WSA_WAIT_EVENT_0 + (pThreadInformation->step*MAXIMUM_WAIT_OBJECTS+C);
    }
    return dwReturnValue;

}



DWORD WSAWaitForMultipleEventsEx2(DWORD   dwWaitHandleCount, DWORD* pu32_Index,
                                  WSAEVENT*  pWaitHandles,DWORD     dwMaxWaitMSeconds)
{
    HANDLE *                  pThreadWaitHandles    = NULL;
    WAIT_THREAD_INFORMATION * pWaitInformationArray = NULL;

    DWORD                     dwThreadCount         = 0;
    DWORD                     dwLastWaitCount       = 0;
    DWORD                     dwWaitHandleIndex     = 0;
    DWORD                     dwMaximumWaitObjects  = MAXIMUM_WAIT_OBJECTS;

    if( dwWaitHandleCount    <=  0 ) return 0;
    if( pWaitHandles         == NULL ) return 0;
    if ( dwMaxWaitMSeconds    < 1000 ) return 0;
    if( dwMaximumWaitObjects < MAXIMUM_WAIT_OBJECTS ) return 0;
    DWORD u32_Res = 0;    
    {
        //
        // Create separate threads to wait on maximum wait objects
        // and then make this thread wait on thread handles
        //
        dwThreadCount   = dwWaitHandleCount / dwMaximumWaitObjects;
        dwLastWaitCount = dwWaitHandleCount % dwMaximumWaitObjects;

        if( dwLastWaitCount > 0 )
        {
            dwThreadCount++;
        }

        //
        // This function can handle a maximum of
        // MAXIMUM_WAIT_OBJECTS * MAXIMUM_WAIT_OBJECT handles
        //
        if( dwThreadCount > dwMaximumWaitObjects )
        {
            dwThreadCount     = dwMaximumWaitObjects;
            dwLastWaitCount   = 0;
        }

        pThreadWaitHandles    = new HANDLE[ dwThreadCount ];
        pWaitInformationArray = new WAIT_THREAD_INFORMATION[ dwThreadCount ];

        if( pThreadWaitHandles == NULL || pWaitInformationArray == NULL )
        {
            //
            // Failure
            //
            goto Finished;
        }
        for( DWORD count = 0; count < dwThreadCount; count++)
        {
            //
            // Set information for the thread
            //
            pWaitInformationArray[ count ].dwWaitMSeconds =  dwMaxWaitMSeconds;
            pWaitInformationArray[ count ].pWaitHandles =   &pWaitHandles[ dwWaitHandleIndex ];
            pWaitInformationArray[ count ].step = count;
            if( count != dwThreadCount - 1 || dwLastWaitCount == 0 )
            {
                pWaitInformationArray[ count ].dwWaitObjectCount =   dwMaximumWaitObjects;
                dwWaitHandleIndex += dwMaximumWaitObjects;
            }
            else
            {
                pWaitInformationArray[count].dwWaitObjectCount =   dwLastWaitCount;
                dwWaitHandleIndex += dwLastWaitCount;
                pWaitInformationArray[ count ].step = count;
            }

            pThreadWaitHandles[ count ] =
                CreateThread( NULL,
                              0,
                              WaitForMultipleObjectsThread,
                              &pWaitInformationArray[ count ],
                              0,
                              NULL );
            if( pThreadWaitHandles[ count ] == NULL )
            {
                //
                // Not able to create threads break from the loop
                // and wait for threads we already created.
                // dwThreadCount doesnt include this iteration
                //
                dwThreadCount = count;
                break;
            }
        }

        //
        // Failure is ignored
        //
       u32_Res =  WaitForMultipleObjects( dwThreadCount,
                                pThreadWaitHandles,
                                FALSE,
                                dwMaxWaitMSeconds );   
    //  DWORD u32_Res = WSAWaitForMultipleEvents(dwThreadCount, pThreadWaitHandles, FALSE, dwMaxWaitMSeconds, FALSE);
        if (u32_Res != WSA_WAIT_FAILED && u32_Res != WSA_WAIT_TIMEOUT)
        {
        *pu32_Index = u32_Res - WSA_WAIT_EVENT_0;
        }
    }

Finished:

    if( pThreadWaitHandles != NULL )
    {
        for( DWORD count = 0; count < dwThreadCount; count++)
        {
            if( pThreadWaitHandles[ count ] != NULL )
            {
                CloseHandle( pThreadWaitHandles[ count ] );
                pThreadWaitHandles[ count ] = NULL;
            }
        }

        delete [] pThreadWaitHandles;
        pThreadWaitHandles = NULL;
    }

    if( pWaitInformationArray != NULL )
    {
        delete [] pWaitInformationArray;
        pWaitInformationArray = NULL;
    }

    return u32_Res;
}
7.5K
12 мая 2016 года
diprom
37 / / 15.10.2010
Кое-что получилось. Нужны тесты и ваша помощь.

Код:
typedef struct _WAIT_THREAD_INFORMATION
{
DWORD dwWaitObjectCount;
PHANDLE pWaitHandles;
DWORD dwWaitMSeconds;
DWORD step;
} WAIT_THREAD_INFORMATION, *PWAIT_THREAD_INFORMATION;


//static
DWORD WINAPI WaitForMultipleObjectsThread(LPVOID lpParameter = NULL)
//
// Thread routine which calls WaitForMultipleObjects
// This is used to wait for more than MAXIMUM_WAIT_OBJECTS objects
//
{
    if (!lpParameter) return 0;
    DWORD u32_Res = 0;
    PWAIT_THREAD_INFORMATION pThreadInformation = NULL;
    assert( lpParameter != NULL ); 
    pThreadInformation = ( PWAIT_THREAD_INFORMATION )lpParameter;
    assert(pThreadInformation->dwWaitObjectCount <= MAXIMUM_WAIT_OBJECTS );
    assert(pThreadInformation->pWaitHandles != NULL );
    u32_Res=WaitForMultipleObjects(pThreadInformation->dwWaitObjectCount,
                                pThreadInformation->pWaitHandles,
                                FALSE,
                               pThreadInformation->dwWaitMSeconds);
    return (u32_Res)+(MAXIMUM_WAIT_OBJECTS*pThreadInformation->step);
}


DWORD TCP::cSocket::WSAWaitForMultipleEventsEx2(DWORD   dwWaitHandleCount,  WSAEVENT*  pWaitHandles,DWORD     dwMaxWaitMSeconds)
{
    HANDLE *                  pThreadWaitHandles    = NULL;
    WAIT_THREAD_INFORMATION * pWaitInformationArray = NULL;

    DWORD                     dwThreadCount         = 0;
    DWORD                     dwLastWaitCount       = 0;
    DWORD                     dwWaitHandleIndex     = 0;
    DWORD                     dwMaximumWaitObjects  = MAXIMUM_WAIT_OBJECTS;

    if( dwWaitHandleCount    <=  0 ) return 0;
    if( pWaitHandles         == NULL ) return 0;
    if ( dwMaxWaitMSeconds    < 1000 ) return 0;
    if( dwMaximumWaitObjects < MAXIMUM_WAIT_OBJECTS ) return 0;

    DWORD u32_Res = 0;    
    {
        //
        // Create separate threads to wait on maximum wait objects
        // and then make this thread wait on thread handles
        //
        dwThreadCount   = dwWaitHandleCount / dwMaximumWaitObjects;
        dwLastWaitCount = dwWaitHandleCount % dwMaximumWaitObjects;

        if( dwLastWaitCount > 0 )
        {
            dwThreadCount++;
        }
       
        //
        // This function can handle a maximum of
        // MAXIMUM_WAIT_OBJECTS * MAXIMUM_WAIT_OBJECT handles
        //
        if( dwThreadCount > dwMaximumWaitObjects )
        {
            dwThreadCount     = dwMaximumWaitObjects;
            dwLastWaitCount   = 0;
        }

        pThreadWaitHandles    = new HANDLE[ dwThreadCount ];
        memset(pThreadWaitHandles,0,sizeof(HANDLE)*dwThreadCount);
        pWaitInformationArray = new WAIT_THREAD_INFORMATION[ dwThreadCount ];
        memset(pWaitInformationArray,0,sizeof(WAIT_THREAD_INFORMATION)*dwThreadCount);

        if( pThreadWaitHandles == NULL || pWaitInformationArray == NULL )
        {
            //
            // Failure
            //
            goto Finished;
        }

        for( DWORD count = 0; count < dwThreadCount; count++)
        {
            //
            // Set information for the thread
            //
            pWaitInformationArray[ count ].dwWaitMSeconds =  dwMaxWaitMSeconds;
            pWaitInformationArray[ count ].pWaitHandles =  &pWaitHandles[ dwWaitHandleIndex ];
            pWaitInformationArray[ count ].step = count;
            if( count != dwThreadCount - 1 || dwLastWaitCount == 0 )
            {
                pWaitInformationArray[ count ].dwWaitObjectCount =   dwMaximumWaitObjects;
                dwWaitHandleIndex += dwMaximumWaitObjects;
            }
            else
            {
                pWaitInformationArray[count].dwWaitObjectCount =   dwLastWaitCount;
                dwWaitHandleIndex += dwLastWaitCount;
            }

            pThreadWaitHandles[ count ] =
                CreateThread( NULL,
                              0,
                              WaitForMultipleObjectsThread,
                              &pWaitInformationArray[ count ],
                              0,
                              NULL );
            if( pThreadWaitHandles[ count ] == NULL )
            {
                dwThreadCount = count;
                break;
            }
        }
       for(DWORD i = 0,j = dwThreadCount; i < j; ++i)
        {
            DWORD result = WaitForSingleObject(pThreadWaitHandles[i], dwMaxWaitMSeconds);
            if (result == WAIT_OBJECT_0)
            {
                DWORD exitcode=0;
                BOOL rc = GetExitCodeThread(pThreadWaitHandles[i], &exitcode);
                if (!rc)
                {
                    // handle error from GetExitCodeThread()...
                    u32_Res = 0;
                    goto Finished;
                }else
                {
                    return exitcode;
                }
            }
            else
                continue;

        }
    }

Finished:

    if( pThreadWaitHandles != NULL )
    {
        for( DWORD count = 0; count < dwThreadCount; count++)
        {
            if( pThreadWaitHandles[ count ] != NULL )
            {
                CloseHandle( pThreadWaitHandles[ count ] );
                pThreadWaitHandles[ count ] = NULL;
            }
        }

        delete [] pThreadWaitHandles;
        pThreadWaitHandles = NULL;
    }

    if( pWaitInformationArray != NULL )
    {
        delete [] pWaitInformationArray;
        pWaitInformationArray = NULL;
    }

    return u32_Res;
}
И меняем u32_Index = WSAWaitForMultipleEventsEx(mi_List.mu32_Count+1, &mu32_WaitIndex, mi_List.mh_Events, mu32_EventTimeout)
на
u32_Index = WSAWaitForMultipleEventsEx2(mi_List.mu32_Count+1, mi_List.mh_Events, mu32_EventTimeout);
в Socket.cpp

DWORD TCP::cSocket::ProcessEvents(DWORD* pu32_Events, // OUT
DWORD* pu32_IP, // OUT
SOCKET* ph_Socket, // OUT
cMemory** ppi_RecvMem, // OUT
DWORD* pu32_Read, // OUT
DWORD* pu32_Sent,// OUT
DWORD* tmpsize,
unsigned char** tmpdata
)
7.5K
13 мая 2016 года
diprom
37 / / 15.10.2010
Подправил, но на большем количестве > 220 начинает падать, пока чего-то не понимаю.
Надо подправить
DWORD WINAPI WaitForMultipleObjectsThread(LPVOID lpParameter = NULL)
или
DWORD TCP::cSocket::WSAWaitForMultipleEventsEx2(DWORD dwWaitHandleCount, WSAEVENT* pWaitHandles,DWORD dwMaxWaitMSeconds)
Прикрепленные файлы:
1.5Мб
Загрузок: 935
Реклама на сайте | Обмен ссылками | Ссылки | Экспорт (RSS) | Контакты
Добавить статью | Добавить исходник | Добавить хостинг-провайдера | Добавить сайт в каталог