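// Source: forum post by the thread starter, posted 2006-7-4 12:11:00.
// Post title: "Re: (modelled on Ghost Cheng's wrapper) I/O completion port model -- the encapsulation is not great."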
#include ".\CIOCPServer.h"
#pragma comment(lib,"ws2_32.lib")
// Constructs the server object.
//  - Puts every handle / pointer member that other methods test or release
//    into a known "not created" state, so that Close(), StopServer() etc. do
//    not act on indeterminate values when BeginServer() was never run or
//    failed part-way.
//  - Starts Winsock 2.2.
//  - Sizes the worker-thread pool as 2 * processors + 2.
CIOCPServer::CIOCPServer(void)
{
    // Close() compares m_ListenSocket against INVALID_SOCKET, so it must
    // start out with exactly that value.
    m_ListenSocket = INVALID_SOCKET;
    m_hCompletionPort = NULL;
    m_arrayIOCPThreadHandle = NULL;
    m_hAcceptExOverEvent = NULL;
    m_hAcceptEx = NULL;
    m_bAcceptExCreateThread = false;
    m_lpAcceptExFun = NULL;
    m_lpGetAcceptExSockaddrsFun = NULL;

    // Initialise Winsock.
    // NOTE(review): a constructor cannot report failure without exceptions;
    // if WSAStartup fails, the later socket calls will fail and surface it.
    WSADATA wsd;
    WSAStartup(MAKEWORD(2, 2), &wsd);

    SYSTEM_INFO SystemInfo;
    GetSystemInfo(&SystemInfo);
    m_dwIOCPThreadCount = 2 * SystemInfo.dwNumberOfProcessors + 2;
}
// Destroys the server object and releases the Winsock reference taken by the
// constructor's WSAStartup call (every WSAStartup must be paired with one
// WSACleanup).
// NOTE(review): the destructor does not stop a running server; callers are
// presumably expected to call StopServer() first -- confirm.
CIOCPServer::~CIOCPServer(void)
{
    WSACleanup();
}
// Creates the listening socket (AF_INET, SOCK_STREAM, overlapped I/O enabled)
// and stores it in m_ListenSocket.
// Returns true when WSASocket produced a valid socket, false otherwise.
bool CIOCPServer::CreateSocket(void)
{
    m_ListenSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
    return m_ListenSocket != INVALID_SOCKET;
}
// Binds the listening socket to the given dotted-decimal IPv4 address and port.
//   lpszHostAddress - IPv4 address in dotted-decimal text form.
//   usPort          - port number in host byte order.
// Returns the result of bind(): 0 on success, SOCKET_ERROR on failure.
// NOTE(review): inet_addr takes a narrow string, so this only compiles when
// LPCTSTR is a narrow string (non-UNICODE build).
int CIOCPServer::Bind(LPCTSTR lpszHostAddress,const USHORT usPort)
{
    SOCKADDR_IN InternetAddr;
    // Clear the whole structure so the sin_zero padding is not stack garbage.
    ZeroMemory(&InternetAddr, sizeof(InternetAddr));
    // sin_family is an address-family field: use AF_INET, not the
    // protocol-family constant PF_INET.
    InternetAddr.sin_family = AF_INET;
    InternetAddr.sin_port = htons(usPort);
    InternetAddr.sin_addr.s_addr = inet_addr(lpszHostAddress);
    // Bind to the requested address and port.
    return bind(m_ListenSocket, (SOCKADDR*)&InternetAddr, sizeof(InternetAddr));
}
int CIOCPServer: isten(int nBacklog)
{
return listen(m_ListenSocket, nBacklog);
}
// Starts m_dwIOCPThreadCount worker threads running IOCPWorkThread and stores
// their handles in m_arrayIOCPThreadHandle.
// Returns true when every thread was created, false as soon as one fails.
// NOTE(review): on failure the threads already started keep running and the
// array stays allocated; the remaining slots are NULL.
bool CIOCPServer::CreateWorkThread(void)
{
    // Value-initialise so slots that never receive a thread are NULL rather
    // than indeterminate.
    m_arrayIOCPThreadHandle = new HANDLE[m_dwIOCPThreadCount]();
    for(DWORD i = 0; i < m_dwIOCPThreadCount; i++)
    {
        // Store each handle in its own slot. The subscript had been lost, so
        // the array pointer itself was being overwritten.
        m_arrayIOCPThreadHandle[i] = (HANDLE)_beginthreadex(NULL, 0, (PBEGINTHREADX_THREADFUN)IOCPWorkThread, (LPVOID)this, 0, NULL);
        if(!m_arrayIOCPThreadHandle[i])
            return false;
    }
    return true;
}
// Stops all worker threads and releases their handles.
// Posts one shutdown packet (NULL completion key) per worker, waits up to
// 10 seconds for all of them to exit, force-terminates them on timeout, then
// closes every handle and frees the handle array.
// NOTE(review): WaitForMultipleObjects accepts at most MAXIMUM_WAIT_OBJECTS
// handles; with 2 * processors + 2 threads this could be exceeded on machines
// with many processors -- confirm.
void CIOCPServer::CloseWorkThread(void)
{
    // Nothing to do when the array does not exist (it is reset to NULL at the
    // end of this function, so a repeated call ends here).
    if(!m_arrayIOCPThreadHandle)
        return;

    // Send the end-of-thread signal through the completion port.
    for(DWORD i = 0; i < m_dwIOCPThreadCount; i++)
        PostQueuedCompletionStatus(m_hCompletionPort, 0, NULL, NULL);

    // Wait for the worker threads to finish, for at most 10 seconds.
    if(WaitForMultipleObjects(m_dwIOCPThreadCount, m_arrayIOCPThreadHandle, TRUE, 10000) != WAIT_OBJECT_0)
    {
        // Not all threads ended within 10 seconds: terminate them forcibly.
        for(DWORD i = 0; i < m_dwIOCPThreadCount; i++)
            TerminateThread(m_arrayIOCPThreadHandle[i], 0);
    }

    // Close each worker thread handle. The subscript had been lost here and
    // above, so the array pointer was being passed as if it were a handle.
    for(DWORD i = 0; i < m_dwIOCPThreadCount; i++)
        CloseHandle(m_arrayIOCPThreadHandle[i]);

    // Free the handle array.
    delete [] m_arrayIOCPThreadHandle;
    m_arrayIOCPThreadHandle = NULL;
}
// Worker thread entry point: dequeues completion packets from the server's
// completion port and dispatches them by operation code.
//   lpParam - the owning CIOCPServer instance.
// Returns 0 when a packet with a NULL completion key (the shutdown signal
// posted by CloseWorkThread) is dequeued.
// NOTE(review): casting LPPER_IO_DATA* to LPOVERLAPPED* assumes the OVERLAPPED
// member sits at the start of the per-I/O structure -- confirm in the header.
DWORD WINAPI CIOCPServer::IOCPWorkThread(LPVOID lpParam)
{
    CIOCPServer* lpServer = (CIOCPServer*)lpParam;
    HANDLE hPort = lpServer->m_hCompletionPort;
    LPVOID lpKey = NULL;
    LPPER_IO_DATA lpIoData = NULL;
    DWORD dwTransferred = 0;

    for(;;)
    {
        // Block until a completion packet is available.
        BOOL bDequeued = GetQueuedCompletionStatus(hPort, &dwTransferred, (PULONG_PTR)&lpKey, (LPOVERLAPPED*)&lpIoData, INFINITE);

        // The call failed without dequeuing a packet: nothing to handle.
        if(!bDequeued && !lpIoData)
            continue;

        // A NULL completion key is the end-of-thread signal.
        if(!lpKey)
            return 0;

        switch(lpIoData->dwOpCode)
        {
        case OP_ACCEPT:
            if(bDequeued)
            {
                lpServer->OnIOCPAccept(dwTransferred, lpIoData);
            }
            else
            {
                // The accept failed: drop the socket and give the per-I/O
                // structure back to the pool.
                closesocket(lpIoData->Socket);
                lpServer->m_Net_Data.free_particle(lpIoData);
            }
            break;

        case OP_READ:
            lpServer->OnIOCPRecvFromClient(dwTransferred, (LPPER_HANDLE_DATA)lpKey, lpIoData);
            break;

        case OP_WRITE:
            // A send finished: its per-I/O structure goes back to the pool.
            lpServer->m_Net_Data.free_particle(lpIoData);
            break;
        }
    }
    return 0;
}
// Creates the auto-reset event that is signalled on FD_ACCEPT for the
// listening socket, then starts the thread (AcceptExThread) that waits on it.
// Returns true when the event, the event selection and the thread were all
// set up; false at the first step that fails.
bool CIOCPServer::CreateAcceptExThread(void)
{
    m_hAcceptExOverEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
    if(m_hAcceptExOverEvent == NULL)
        return false;

    const int nSelectResult = WSAEventSelect(m_ListenSocket, m_hAcceptExOverEvent, FD_ACCEPT);
    if(nSelectResult == SOCKET_ERROR)
        return false;

    // The flag must be set before the thread starts, because the thread's
    // loop condition reads it.
    m_bAcceptExCreateThread = true;
    m_hAcceptEx = (HANDLE)_beginthreadex(NULL, 0, (PBEGINTHREADX_THREADFUN)AcceptExThread, (LPVOID)this, 0, NULL);
    return m_hAcceptEx ? true : false;
}
// Stops the accept-posting thread and releases its resources.
// Clears the run flag, wakes the thread through its event, waits up to
// 10 seconds for it to exit (terminating it forcibly on timeout), then closes
// both the thread handle and the event handle.
void CIOCPServer::CloseAcceptExThread(void)
{
    m_bAcceptExCreateThread = false;
    // Wake the thread so it can observe the cleared flag.
    SetEvent(m_hAcceptExOverEvent);
    if(WaitForSingleObject(m_hAcceptEx,10000)!= WAIT_OBJECT_0)
        TerminateThread(m_hAcceptEx, 0);

    // A handle returned by _beginthreadex must be closed by the caller;
    // previously it was never closed and leaked on every stop.
    CloseHandle(m_hAcceptEx);
    m_hAcceptEx = NULL;

    // Close the m_hAcceptExOverEvent event handle.
    CloseHandle(m_hAcceptExOverEvent);
    m_hAcceptExOverEvent = NULL;
}
// Accept-posting thread entry point.
// Waits on m_hAcceptExOverEvent and posts a fresh batch of AcceptEx requests
// each time it is signalled, until m_bAcceptExCreateThread is cleared.
//   lpParam - the owning CIOCPServer instance.
// Always returns 0.
DWORD WINAPI CIOCPServer::AcceptExThread(LPVOID lpParam)
{
    CIOCPServer* lpThis = (CIOCPServer*)lpParam;
    while(lpThis->m_bAcceptExCreateThread)
    {
        // Wait indefinitely for the signal that new AcceptEx requests are needed.
        // A failed wait means the event handle is unusable; retrying would
        // only spin at full CPU, so leave the loop instead.
        if(WaitForSingleObject(lpThis->m_hAcceptExOverEvent, INFINITE) == WAIT_FAILED)
            break;

        // CloseAcceptExThread clears the flag and then signals the same event
        // to wake this thread; do not post new accepts in that case.
        if(!lpThis->m_bAcceptExCreateThread)
            break;

        // Post a new batch of AcceptEx requests.
        lpThis->CreateAcceptEx();
    }
    return 0;
}
void CIOCPServer::CreateAcceptEx(const DWORD dwPostCount)
{
LINGER lingerStruct = { 0x01, 0x00 };
DWORD dwAddrLen = sizeof(sockaddr_in) + IP_STRING_LEN;
LPPER_IO_DATA lpOvplus = NULL;
BOOL bNodelay = TRUE;
for(DWORD i = 0; i < dwPostCount; i++)
{
//先从OV结构池取一个可用的结构
lpOvplus = m_Net_Data.alloc_particle();
lpOvplus->dwOpCode = OP_ACCEPT;
lpOvplus->lpSession = NULL;
lpOvplus->dwBytes = 0;
ZeroMemory(&lpOvplus->Overlapped,sizeof(OVERLAPPED));
//创建一个SOCKET
lpOvplus->Socket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
if(lpOvplus->Socket == INVALID_SOCKET)
{
m_Net_Data.free_particle(lpOvplus);
continue;
}
setsockopt(lpOvplus->Socket, IPPROTO_TCP, TCP_NODELAY, (char*)&bNodelay, sizeof(BOOL));
setsockopt(lpOvplus->Socket, SOL_SOCKET, SO_LINGER, (char*)&lingerStruct, sizeof(LINGER));
if(!m_lpAcceptExFun(m_ListenSocket, lpOvplus->Socket, lpOvplus->arrayDataBuf, 0, dwAddrLen, dwAddrLen, &lpOvplus->dwBytes, &lpOvplus->Overlapped))
{
if(WSAGetLastError() != ERROR_IO_PENDING)
{
closesocket(lpOvplus->Socket);
m_Net_Data.free_particle(lpOvplus); //归还结构体到内存池
continue;
}
}
}
}
// Shuts down and closes the listening socket if it is open.
// The member is reset to INVALID_SOCKET afterwards so that a repeated call
// is a no-op instead of closing a stale (possibly reused) socket value.
void CIOCPServer::Close(void)
{
    if (m_ListenSocket != INVALID_SOCKET)
    {
        shutdown(m_ListenSocket, 2); // 2 = both directions (SD_BOTH)
        closesocket(m_ListenSocket);
        m_ListenSocket = INVALID_SOCKET;
    }
}
void CIOCPServer::GetAcceptEx(void)
{
DWORD dwResult;
GUID guidAcceptEx = WSAID_ACCEPTEX;
WSAIoctl(m_ListenSocket, SIO_GET_EXTENSION_FUNCTION_POINTER, &guidAcceptEx, sizeof(GUID), &m_lpAcceptExFun, sizeof(LPFN_ACCEPTEX), &dwResult, NULL, NULL);
}
void CIOCPServer::GetAcceptExSockaddrsFun(void)
{
DWORD dwResult;
GUID guidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
WSAIoctl(m_ListenSocket, SIO_GET_EXTENSION_FUNCTION_POINTER, &guidGetAcceptExSockaddrs, sizeof(GUID), &m_lpGetAcceptExSockaddrsFun, sizeof(LPFN_GETACCEPTEXSOCKADDRS), &dwResult, NULL, NULL);
}
// Extracts the remote peer's IPv4 address and port from the address data that
// a completed AcceptEx left in the per-I/O buffer.
//   lpOvplus - per-I/O structure whose arrayDataBuf was handed to AcceptEx.
//   lpdwIP   - receives the remote address exactly as stored in sin_addr.
//   lpusPort - receives the remote port converted to host byte order.
void CIOCPServer::GetAcceptExSockaddrsInfo(LPPER_IO_DATA lpOvplus, DWORD* lpdwIP, USHORT* lpusPort)
{
    // Must match the address-area length used when the AcceptEx was posted.
    int nAddrAreaLen = sizeof(sockaddr_in) + IP_STRING_LEN;
    // Receives the address lengths reported by the call; not used afterwards.
    int nReportedLen = sizeof(sockaddr);
    sockaddr* lpLocal;
    sockaddr* lpRemote;

    m_lpGetAcceptExSockaddrsFun(lpOvplus->arrayDataBuf, 0, nAddrAreaLen, nAddrAreaLen, &lpLocal, &nReportedLen, &lpRemote, &nReportedLen);

    // Copy the remote address into an IPv4 view to read its fields.
    sockaddr_in remoteIn;
    memcpy(&remoteIn, lpRemote, sizeof(SOCKADDR));

    *lpusPort = ntohs(remoteIn.sin_port);
    *lpdwIP = remoteIn.sin_addr.S_un.S_addr;
}
bool CIOCPServer::BeginServer(void)
{
if(!CreateSocket())
return false;
//绑定
Bind("127.0.0.1",10000);
//监听
Listen(5);
//创建完成端口句柄
m_hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, m_dwIOCPThreadCount);
if(!m_hCompletionPort)
return false;
//将ListenSocket绑定到IOCP
if(!CreateIoCompletionPort((HANDLE)m_ListenSocket, m_hCompletionPort,(ULONG_PTR)&m_ListenSocket, 0))
return false;
//创建工作线程
if(!CreateWorkThread())
return false;
GetAcceptEx();
GetAcceptExSockaddrsFun();
//开始投递AcceptEx
if(CreateAcceptExThread())
return false;
return true;
}
// Stops the server and releases its resources, in this order: listening
// socket, client sockets, accept-posting thread, worker threads, completion
// port, and finally the structure pools and the session list.
void CIOCPServer::StopServer(void)
{
// Close the listening socket first, then every client socket.
Close();
CloseAllClient();
// Stop the accept-posting thread, then the worker threads. The worker threads
// are stopped through the completion port, so it is still open at this point.
CloseAcceptExThread();
CloseWorkThread();
// Close the completion port handle.
CloseHandle(m_hCompletionPort);
m_hCompletionPort = NULL;
// Destroy the structure pools and clear the session list.
m_Net_Data.Destroy();
m_Client_Context.Destroy();
m_listSession.Clear();
}
void CIOCPServer::CloseAllClient(void)
{
m_listSession._guard.lock();
for(CList<LPPER_HANDLE_DATA>::iterator lpNode = m_listSession.begin(); lpNode != m_listSession.end(); lpNode++)
closesocket((*lpNode)->Socket);
m_listSession._guard.unlock();
}
void CIOCPServer::OnIOCPAccept(const DWORD dwByteCount, LPPER_IO_DATA lpOvplus)
{
DWORD dwIP = 0;
USHORT usPort = 0;
GetAcceptExSockaddrsInfo(lpOvplus, &dwIP, &usPort);
if(!OnIpFilter(dwIP)) //检查该IP是否需要过滤,该函数由派生类实现
{
closesocket(lpOvplus->Socket); //关闭socket
m_Net_Data.free_particle(lpOvplus); //归还OV结构体到内存池
return;
}
LPPER_HANDLE_DATA lpSession = m_Client_Context.alloc_particle();
lpSession->Socket = lpOvplus->Socket;
lpSession->m_dwLastTime = GetTickCount();
lpSession->m_dwIP = dwIP;
lpSession->m_usPort = usPort;
//添加到Session链表
m_listSession.Push_Back(lpSession);
//将完成键与socket绑定到IOCP
if(CreateIoCompletionPort((HANDLE)lpSession->Socket, m_hCompletionPort, (ULONG_PTR)lpSession, 0))
{
ZeroMemory(&lpOvplus->Overlapped, sizeof(OVERLAPPED));
lpOvplus->dwOpCode = OP_READ;
lpOvplus->wDataBuf.len = NET_DATA_LONGTH;
lpOvplus->wDataBuf.buf =lpOvplus->arrayDataBuf;
lpOvplus->dwBytes = 0;
lpOvplus->lpSession = lpSession;
lpOvplus->dwFlags = 0;
if(CreateRecv(lpOvplus))
{
OnAccept();
return;
}
}
closesocket(lpOvplus->Socket); //关闭socket
m_listSession.Remove(lpSession);//从Session链表中删除
m_Client_Context.free_particle(lpSession); //将Session归还到池
m_Net_Data.free_particle(lpOvplus); //归还OV结构体到内存池
}
void CIOCPServer::OnIOCPRecvFromClient(const DWORD dwByteCount, LPPER_HANDLE_DATA lpSession, LPPER_IO_DATA lpOvplus)
{
if(dwByteCount)
{
//接收到数据
lpSession->m_dwLastTime = GetTickCount();
lpOvplus->dwBytes = dwByteCount;
//数据处理
for (int i =0;i<dwByteCount;i++)
{
printf("%x",lpOvplus->arrayDataBuf);
}
printf("\n");
//
ZeroMemory(&lpOvplus->Overlapped, sizeof(OVERLAPPED));
lpOvplus->Socket = lpSession->Socket;
lpOvplus->dwOpCode = OP_READ;
lpOvplus->wDataBuf.buf =lpOvplus->arrayDataBuf;
lpOvplus->lpSession = lpSession;
lpOvplus->dwFlags = 0;
if(CreateRecv(lpOvplus))
return;
}
closesocket(lpSession->Socket); //关闭Socket
OnDisconnectFromClient(lpSession); //通知派生类,一个Session断开了
m_listSession.Remove(lpSession);//从Session链表中删除
m_Client_Context.free_particle(lpSession); //断开后将Session归还到结构池
m_Net_Data.free_particle(lpOvplus); //归还OV结构到结构池
}
// Posts an overlapped receive described by lpOvplus (socket, buffer, flags,
// OVERLAPPED).
// Returns true when the receive completed immediately or is pending; false
// when WSARecv failed with any other error.
bool CIOCPServer::CreateRecv(LPPER_IO_DATA lpOvplus)
{
    const int nResult = WSARecv(lpOvplus->Socket, &lpOvplus->wDataBuf, 1, &lpOvplus->dwBytes, &lpOvplus->dwFlags, &lpOvplus->Overlapped, NULL);
    if(nResult != SOCKET_ERROR)
        return true;

    // A pending overlapped operation is not a failure.
    return WSAGetLastError() == ERROR_IO_PENDING;
}
// Posts an overlapped send described by lpOvplus (socket, buffer, OVERLAPPED).
// Returns true when the send completed immediately or is pending; false when
// WSASend failed with any other error.
// (Stray "|" characters after the closing brace, which broke compilation,
// have been removed.)
// NOTE(review): MSG_PARTIAL is meant for message-oriented transports; it is
// kept unchanged here -- confirm it is intended on this stream socket.
bool CIOCPServer::CreateSend(LPPER_IO_DATA lpOvplus)
{
    if(WSASend(lpOvplus->Socket, &lpOvplus->wDataBuf, 1, &lpOvplus->dwBytes, MSG_PARTIAL, &lpOvplus->Overlapped, NULL) == SOCKET_ERROR)
    {
        // A pending overlapped operation is not a failure.
        if(WSAGetLastError() != ERROR_IO_PENDING)
            return false;
    }
    return true;
}