王朝网络
分享
 
 
 

C++完成端口组件实现

王朝c/c++·作者佚名  2006-01-09
宽屏版  字体: |||超大  

在windows平台上,处理总多的TCP连接的机制中效率最高的是完成端口模型,关于完成端口模型的介绍可参照《Win32 多线程程序设计》(侯捷翻译)和《windows网络编程》。

异步机制是完成端口的基础,完成端口模型的本质思想是将"启动异步操作的线程"和"提供服务的线程"(即工作者线程)拆伙。

先看看所有TCP连接管理组件都要实现的接口 ITcpManager

#ifndef ITCPMANAGER_H

#define ITCPMANAGER_H

#include "../../Infrastructure/IEvent.h"

#include "IMessageDispatcher.h"

/*

ITcpManager 所有基于TCP网络管理者的基础接口 。

创 作 者:sky

时 间:2005.6

修订时间:2005.6

*/

enum TcpUserAction

{

UA_Connected , UA_Disconnected , UA_FunctionAccess , //标准的功能访问

};

enum DisconnectedCause

{

Logoff ,

LineOff ,

ServerStopped , //服务器停止时关闭连接

InvalidUser , //强行关闭非法用户的连接

TimeOut //在一段时间内没有收到check消息

};

interface ITcpClientsController

{

public:

virtual ~ITcpClientsController(){}

//主动给某个客户发信息 ,线程安全

virtual void SendData(int ConnectID, byte* data ,int offset ,int size) = 0 ;

//主动关闭连接

virtual void DisposeOneConnection(int connectID ,DisconnectedCause cause) = 0 ;

};

class UserActionEventArgs

{

public:

int ConnectID ;

TcpUserAction action ;

};

class ServiceCommittedEventArgs

{

public:

char* data ;

int length ;

int connectID ;

};

interface ITcpManager : public ITcpClientsController

{

public:

virtual ~ITcpManager(){}

//Events

IEvent<ITcpManager* ,int>* ConnectionCountChanged ;

IEvent<ITcpManager* ,ServiceCommittedEventArgs&>* ServiceCommitted ;

IEvent<ITcpManager* ,UserActionEventArgs&>* UserAction ;

//Method

virtual void Start() = 0 ;

virtual void Stop() = 0 ;

virtual int ConnectionCount() = 0 ;

};

#endif

接口的定义很清晰明了--这也是接口的本质,如果你对IEvent事件还不是很熟悉,请参见我前面的文章。

接下来看看实现了ITcpManager的完成端口组件的头文件:

#ifndef COMPLETINGPORT_H

#define COMPLETINGPORT_H

#include <WINSOCK2.H>

#include <afx.h>

#include "../Foundation/ITcpManager.h"

#include "../Foundation/TCPListener.h"

#include "../Foundation/IMessageDispatcher.h"

#include "../../Collection/SafeObjectList.h"

#include "WorkerThreadAssistant.h"

#include "ContextKeyManager.h"

typedef SafeObjectList<HANDLE> OverlapEventList ;

void WorkerThreadStart(void* para) ;

void ListenThreadStart(void* para) ;

class CompletingPortManager :public ITcpManager ,public IEventHandler<ContextKeyManager* ,int>

{

private:

WorkerThreadAssistant workerThread_assistant ;

OverlapEventList overlapEventList ;

ContextKeyManager* context_manager ;

TcpListener* listener ;

IMessageDispatcher* curDispatcher ;

HANDLE completing_port ;

int workThreadNum ;

volatile bool isStop ;

public: //interface接口

//Events

IEvent<ITcpManager* ,int>* ConnectionCountChanged ;

IEvent<ITcpManager* ,ServiceCommittedEventArgs&>* ServiceCommitted ;

IEvent<ITcpManager* ,UserActionEventArgs&>* UserAction ;

//Method

void Start() ;

void Stop() ;

int ConnectionCount() ;

//主动给某个客户发信息

void SendData(int ConnectID, byte* data ,int offset ,int size) ;

//主动关闭连接

void DisposeOneConnection(int connectID ,DisconnectedCause cause) ;

public:

friend void WorkerThreadStart(void* para) ;

friend void ListenThreadStart(void* para) ;

CompletingPortManager(IMessageDispatcher* i_dispatcher ,int port ,int workThread_Num )

{

this->isStop = true ;

this->workThreadNum = workThread_Num ;

this->context_manager = new ContextKeyManager() ;

this->listener = new TcpListener(port) ;

this->curDispatcher = i_dispatcher ;

//subscribe Event

this->context_manager->ConnectionCountChanged->Register(this) ;

//event

this->ConnectionCountChanged = new EventPublisher<ITcpManager* ,int> ;

this->ServiceCommitted = new EventPublisher<ITcpManager* ,ServiceCommittedEventArgs&> ;

this->UserAction = new EventPublisher<ITcpManager* ,UserActionEventArgs&> ;

this->completing_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE ,NULL ,0 ,0 ) ;

}

~CompletingPortManager()

{

CloseHandle(this->completing_port) ;

delete this->context_manager ;

delete this->listener ;

//event

delete this->ConnectionCountChanged ;

delete this->ServiceCommitted ;

delete this->UserAction ;

}

//ContextKeyManager::ConnectionCountChanged事件

void HandleEvent(ContextKeyManager* sender ,int para)

{

((EventPublisher<ITcpManager* ,int>*)(this->ConnectionCountChanged))->Invoke(this ,para) ;

}

private:

void ListenThread() ;

void CreateWorkerThreads() ;

void ServeCmpltdOverlap() ;

void BindConnectionToCompPort(SOCKET connection ,ContextKey** out_pky);

//进行异步接收数据

void RecieveData(ContextKey* pkey);

void CloseAllWorkerThread() ;

void CloseAllOverlapEvent() ;

void ClearRespondStreamList(RespondStreamList* rs_list) ;

};

#endif

完成端口组件的.CPP文件如下:

#include "CompletingPortManager.h"

#include "../../Threading/Thread.h"

#ifdef _DEBUG

#define new DEBUG_NEW

#undef THIS_FILE

static char THIS_FILE[] = __FILE__;

#endif

//global

void WorkerThreadStart(void* para)

{

CompletingPortManager* cpm = (CompletingPortManager*)para ;

cpm->ServeCmpltdOverlap() ;

}

void ListenThreadStart(void* para)

{

CompletingPortManager* cpm = (CompletingPortManager*)para ;

cpm->ListenThread() ;

}

//public

void CompletingPortManager::Start()

{

if(this->isStop)

{

this->isStop = FALSE ;

this->CreateWorkerThreads() ;

Thread thread ;

thread.Start(ListenThreadStart ,this) ;

//unsigned int dwThreadId;

//HANDLE hThread = (HANDLE)_beginthreadex(NULL, 0 , (unsigned int (_stdcall*)(void*))&ListenThreadStart , this, 0, &dwThreadId);

}

}

//public

void CompletingPortManager::Stop()

{

if(this->isStop)

{

return ;

}

this->isStop = true ;

Sleep(500) ; //等待监听线程结束

this->listener->Close() ;

this->CloseAllWorkerThread() ;//等所有线程退出后才返回

this->context_manager->ClearAllKeys() ; //关闭所有连接

this->CloseAllOverlapEvent() ;

}

//public

int CompletingPortManager::ConnectionCount()

{

return this->context_manager->GetConnectionCount() ;

}

//public

void CompletingPortManager::SendData(int ConnectID, byte* data ,int offset ,int size)

{

ContextKey* key = this->context_manager->GetContextKey(ConnectID) ;

if(key != NULL)

{

key->netStream->Write((char*)(data +offset) ,size) ;

}

}

//public

void CompletingPortManager::DisposeOneConnection(int connectID ,DisconnectedCause cause)

{

this->context_manager->ClearPkey(connectID) ;

UserActionEventArgs args ;

args.action = UA_Disconnected ;

args.ConnectID = connectID ;

((EventPublisher<ITcpManager* ,UserActionEventArgs&>*)(this->UserAction))->Invoke(this ,args) ;

}

//private

void CompletingPortManager::CloseAllWorkerThread()

{

while(! this->workerThread_assistant.IsSafeToExit())//等待到达安全点

{

Sleep(200) ;

}

}

//private

void CompletingPortManager::CloseAllOverlapEvent()

{

int count = this->overlapEventList.Count() ;

for(int i=0 ;i<count ;i++)

{

HANDLE handle ;

this->overlapEventList.GetElement(i ,handle) ;

CloseHandle(handle) ;

}

this->overlapEventList.Clear() ;

}

//private

void CompletingPortManager::CreateWorkerThreads()

{

SYSTEM_INFO sysinfo;

DWORD dwThreads;

DWORD i;

GetSystemInfo(&sysinfo);

dwThreads = this->workThreadNum ; //sysinfo.dwNumberOfProcessors * 32 + 2;

for (i=0; i<dwThreads; i++)

{

Thread thread ;

thread.Start(WorkerThreadStart ,this) ;

// HANDLE hThread;

// hThread = (HANDLE)_beginthreadex(NULL, 0, (unsigned int (_stdcall*)(void*))&WorkerThreadStart , this, 0, &dwThreadId);

workerThread_assistant.IncreaseWorkingThreadNum() ;

}

}

//private

void CompletingPortManager::BindConnectionToCompPort(SOCKET connection ,ContextKey** out_pky)

{

ContextKey* pkey = new ContextKey(connection) ;

*out_pky = pkey ;

CreateIoCompletionPort((HANDLE)connection ,this->completing_port ,(DWORD)pkey ,0);

}

//public

void CompletingPortManager::ListenThread()

{

if(! this->isStop)

{

return ;

}

SOCKET connection ;

BOOL succeed = this->listener->Start() ;

if(!succeed)

{

MessageBox(NULL ,"无法启动监听线程 !" ,"Tip" ,0) ;

return ;

}

ContextKey* pkey ;

while((! this->isStop) && (this->listener->Pending()))

{

connection = this->listener->AcceptSocket() ; //当关闭监听socket时,该函数也会返回

if(this->isStop)

{

closesocket(connection) ;

break ;

}

this->BindConnectionToCompPort(connection ,&pkey) ;

this->context_manager->RegisterContextKey(pkey) ;

UserActionEventArgs args ;

args.action = UA_Connected ;

args.ConnectID = connection ;

((EventPublisher<ITcpManager* ,UserActionEventArgs&>*)(this->UserAction))->Invoke(this ,args) ;

this->RecieveData(pkey) ;

}

}

//private 进行异步接收数据

void CompletingPortManager::RecieveData(ContextKey* pkey)

{

if(this->context_manager->ContainsPkey(pkey))

{

DWORD Flags = 0;

pkey->wsa_buf.len = pkey->requestData.BufferSize() - pkey->requestData.LeftDataLength ;

pkey->wsa_buf.buf = (char*)(pkey->requestData.Buff + pkey->requestData.LeftDataLength) ;

ZeroMemory(&(pkey->ovlapped) ,sizeof(OVERLAPPED)) ;

unsigned long numRead = 0 ;

pkey->netStream->AsynRead(pkey->wsa_buf ,&numRead ,&(pkey->ovlapped)) ;

}

}

//private 工作者线程

void CompletingPortManager::ServeCmpltdOverlap()

{

BOOL bResult;

int dwNumRead;

ContextKey *pkey;

LPOVERLAPPED lpOverlapped;

DWORD temp ;

//回复客户

RespondStreamList* respond_list = new RespondStreamList() ;

RespondStream* respond_stream = NULL ;

BOOL socket_closed = FALSE ;

BOOL succeed = FALSE ;

// DWORD bytes_send ;

// WSABUF wsabuff_send ;

OVERLAPPED ovlapped_send ;

memset(&ovlapped_send ,0 ,sizeof(OVERLAPPED)) ;

ovlapped_send.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);

this->overlapEventList.Add(ovlapped_send.hEvent) ;

ovlapped_send.hEvent = (HANDLE)((DWORD)ovlapped_send.hEvent | 0x1);//表示不关心异步结果,避免Completion Packets

while(! this->isStop)

{

bResult = GetQueuedCompletionStatus( //在底层由系统提供高效的同步机制

this->completing_port,

(DWORD*)&dwNumRead,

&temp,

&lpOverlapped,

10

);

if(bResult == 0)

{

DWORD rc = GetLastError() ;

if((this->isStop) &&(rc == WAIT_TIMEOUT))

{

break ;

}

else

{

continue ;

}

}

pkey = (ContextKey *)temp ;

socket_closed = FALSE ;

if ((bResult == FALSE) || (dwNumRead == 0)) //当客户端正常关闭socket或掉线时,异步WSARecv调用会向队列投递一个特殊的Completion Packets-并且读到的字节长度为0

{

socket_closed = TRUE ;

}

else// Got a valid data block! 处理数据

{

try

{

pkey->requestData.ValidCount = dwNumRead + pkey->requestData.LeftDataLength ;

succeed = this->curDispatcher->DealRequestMessage(pkey->requestData ,respond_list) ;

if(succeed)

{

int count = respond_list->Count() ;

for(int i=0 ;i<count ;i++)

{

respond_stream = respond_list->GetElement(i) ;

//同步发送

BOOL sendSucceed = pkey->netStream->Write(respond_stream->data ,respond_stream->length) ;

if(! sendSucceed)

{

socket_closed = TRUE ;

break ;

}

else

{

ServiceCommittedEventArgs arg ;

arg.connectID = pkey->netStream->SocketID() ;

arg.data = respond_stream->data ;

arg.length = respond_stream->length ;

((EventPublisher<ITcpManager* ,ServiceCommittedEventArgs&>*)this->ServiceCommitted)->Invoke(this,arg) ;

UserActionEventArgs actArgs ;

actArgs.action = UA_Connected ;

actArgs.ConnectID = pkey->netStream->SocketID() ;

((EventPublisher<ITcpManager* ,UserActionEventArgs&>*)this->UserAction)->Invoke(this ,actArgs);

}

respond_stream = NULL ;

}

}

else

{

socket_closed = TRUE ;

}

}

catch(...)//(ContextKey* )

{

socket_closed = TRUE ;

}

if(socket_closed)

{

this->DisposeOneConnection(pkey->netStream->SocketID() ,LineOff) ;

}

this->ClearRespondStreamList(respond_list) ; //清空并删除所有的RespondStream

if((! this->isStop) && (!socket_closed))

{

if(pkey->requestData.IsFirstMsg)

{

pkey->requestData.IsFirstMsg = false ;

}

this->RecieveData(pkey) ;//继续从socket接收消息

}

}

}

delete respond_list ;

this->workerThread_assistant.DecreaseWorkingThreadNum() ; //工作线程正常退出

}

//private

void CompletingPortManager::ClearRespondStreamList(RespondStreamList* rs_list)

{

RespondStream* respond_stream = NULL ;

int count = rs_list->Count() ;

for(int i=0 ;i<count ;i++)

{

respond_stream = rs_list->GetElement(i) ;

respond_stream->ClearAll() ;

delete respond_stream ;

respond_stream = NULL ;

}

rs_list->Clear() ;

}

当然,还有很多的基本设施没有包含进来,所以如果你打算使用我的这个完成端口组件可email至sky.zhuwei@163.com索取所有相关的源码。

 
 
 
免责声明:本文为网络用户发布,其观点仅代表作者个人观点,与本站无关,本站仅提供信息存储服务。文中陈述内容未经本站证实,其真实性、完整性、及时性本站不作任何保证或承诺,请读者仅作参考,并请自行核实相关内容。
2023年上半年GDP全球前十五强
 百态   2023-10-24
美众议院议长启动对拜登的弹劾调查
 百态   2023-09-13
上海、济南、武汉等多地出现不明坠落物
 探索   2023-09-06
印度或要将国名改为“巴拉特”
 百态   2023-09-06
男子为女友送行,买票不登机被捕
 百态   2023-08-20
手机地震预警功能怎么开?
 干货   2023-08-06
女子4年卖2套房花700多万做美容:不但没变美脸,面部还出现变形
 百态   2023-08-04
住户一楼被水淹 还冲来8头猪
 百态   2023-07-31
女子体内爬出大量瓜子状活虫
 百态   2023-07-25
地球连续35年收到神秘规律性信号,网友:不要回答!
 探索   2023-07-21
全球镓价格本周大涨27%
 探索   2023-07-09
钱都流向了那些不缺钱的人,苦都留给了能吃苦的人
 探索   2023-07-02
倩女手游刀客魅者强控制(强混乱强眩晕强睡眠)和对应控制抗性的关系
 百态   2020-08-20
美国5月9日最新疫情:美国确诊人数突破131万
 百态   2020-05-09
荷兰政府宣布将集体辞职
 干货   2020-04-30
倩女幽魂手游师徒任务情义春秋猜成语答案逍遥观:鹏程万里
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案神机营:射石饮羽
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案昆仑山:拔刀相助
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案天工阁:鬼斧神工
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案丝路古道:单枪匹马
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案镇郊荒野:与虎谋皮
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案镇郊荒野:李代桃僵
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案镇郊荒野:指鹿为马
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案金陵:小鸟依人
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案金陵:千金买邻
 干货   2019-11-12
 
>>返回首页<<
推荐阅读
 
 
频道精选
 
静静地坐在废墟上,四周的荒凉一望无际,忽然觉得,凄凉也很美
© 2005- 王朝网络 版权所有