【风】的ACE笔记(6) Proactor机制下的异步SOCKET开发

王朝other·作者佚名  2006-01-09
宽屏版  字体: |||超大  

ACE笔记(6) Proactor机制下的异步SOCKET开发

Proactor机制和reactor机制的不同

1、在reactor机制下,所有I/O请求是同步的,即接到信号请求后,立即执行信号处理,

执行完后才开始继续监听信号请求,其接收信号请求的机制是被动的

而在Proactor机制下,I/O请求是异步的,即接到信号请求后,不立即执行信号处理(而是在莫个时刻执行该处理),

然后再继续监听信号请求,其接收信号请求的机制是主动的

2、要想符合Proactor机制的信号处理,需要从 ACE_Service_Handler 派生,而reactor机制信号处理类要从ACE_Event_Handler派生

ACE_Service_Handler 中以定义的常见回调方法:

/// 异步读完成时会被调用

virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);

/// 在UDP SOCKET中,当异步写完成时会被调用

virtual void handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result);

/// 在UDP SOCKET中,当异步读完成时会被调用

virtual void handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result);

/// 当异步写完成时会被调用

virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);

/// 当异步读文件完成时会被调用

virtual void handle_read_file (const ACE_Asynch_Read_File::Result &result);

/// 当异步写文件完成时会被调用

virtual void handle_write_file (const ACE_Asynch_Write_File::Result &result);

///当异步接收完成时会被调用

virtual void handle_accept (const ACE_Asynch_Accept::Result &result);

///当异步连接完成时会被调用

virtual void handle_connect (const ACE_Asynch_Connect::Result &result);

///当异步传输文件完成时会被调用

virtual void handle_transmit_file (const ACE_Asynch_Transmit_File::Result &result);

///超时时会被调用

virtual void handle_time_out (const ACE_Time_Value &tv,

const void *act = 0);

ACE_Service_Handler 类OPEN方法使用注意:

方法定义:open (ACE_HANDLE handle,ACE_Message_Block &message_block)

当客户端连接时会触发此方法

message_block 参数附带了伴随客户端连接发送过来的消息块

所以在实现OPEN方法中,要注意判断message_block 参数是否附带了消息,如果附带了,如果不想改变现有的事件数据统一处理模式,则需要自己模拟一个读完成动作,如下:

if (message_block.length () != 0)

{

// 复制消息块(引用)

ACE_Message_Block &duplicate =*message_block.duplicate ();

// 伪装一个事件读完成对象

ACE_Asynch_Read_Stream_Result_Impl *fake_result =

ACE_Proactor::instance ()->create_asynch_read_stream_result (*this,

this->handle_,

duplicate,

initial_read_size,

0,

ACE_INVALID_HANDLE,

0,

0);

//移动写指针到未写入的位置,因为读完成动作中会自动移动写指针

size_t bytes_transferred = message_block.length ();

duplicate.wr_ptr (duplicate.wr_ptr () - bytes_transferred);

//发出事件完成回调命令

fake_result->complete (message_block.length (),

1,

0);

// 删除伪装的对象

delete fake_result;

}

ACE_Asynch_Read_Stream 类常见方法

open 方法:初始化读操作

read 方法:读操作,把数据存放在一个 ACE_Message_Block 数据结构上,该结构会自动移动写指针(wr_ptr)

ACE_Asynch_Write_Stream 类常见方法

open 方法:初始化写操作

write方法:写操作,会把存在 ACE_Message_Block 数据结构上写入指定的handle中,该结构会自动移动读指针(rd_ptr)

ACE_Message_Block 类常见方法

构造函数:ACE_Message_Block (长度)

rd_ptr():返回读指针

wr_ptr(): 返回写指针

release():释放内存

init(data,len):分配内存

wr_ptr(len):把写指针向前移动LEN个位置

wr_ptr(×):把写指针指向当前指针

duplicate():复制当前消息块

ACE_Asynch_Read_Stream::Result 类常见方法、属性

用于在回调完成时获得相关完成信息的类

bytes_to_read ():想读取的字节数

bytes_transferred ():有多少个字节被接收

handle ():作用在那个handle上

success():操作是否成功

message_block ():返回消息块

下面附带一个异步I/O处理的例子(例子来源于ACE自带例子,稍有改动),该例子用来异步接收客户请求,并把客户请求的信息显示在控制台上

#include "ace/OS_main.h"

#include "ace/Service_Config.h"

#include "ace/Proactor.h"

#include "ace/Asynch_IO.h"

#include "ace/Asynch_IO_Impl.h"

#include "ace/Asynch_Acceptor.h"

#include "ace/INET_Addr.h"

#include "ace/SOCK_Connector.h"

#include "ace/SOCK_Acceptor.h"

#include "ace/SOCK_Stream.h"

#include "ace/Message_Block.h"

#include "ace/Get_Opt.h"

#include "ace/OS_NS_sys_stat.h"

static u_short port = ACE_DEFAULT_SERVER_PORT;

static int done = 0;

static int initial_read_size = BUFSIZ;

class Receiver : public ACE_Service_Handler

{

public:

Receiver (void);

~Receiver (void);

virtual void open (ACE_HANDLE handle,

ACE_Message_Block &message_block);

protected:

virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);

virtual void handle_write_file (const ACE_Asynch_Write_File::Result &result);

private:

int initiate_read_stream (void);

ACE_Asynch_Read_Stream rs_;

ACE_HANDLE handle_;

// Handle for IO to remote peer.

};

Receiver::Receiver (void)

: dump_file_ (ACE_INVALID_HANDLE),

handle_ (ACE_INVALID_HANDLE)

{}

Receiver::~Receiver (void){}

void

Receiver::open (ACE_HANDLE handle,

ACE_Message_Block &message_block)

{

ACE_DEBUG ((LM_DEBUG,

"%N:%l:Receiver::open called\n"));

this->handle_ = handle;

// 打开SOCKET读取流

if (this->rs_.open (*this, this->handle_) == -1)

{

ACE_ERROR ((LM_ERROR,

"%p\n",

"ACE_Asynch_Read_Stream::open"));

return;

}

if (message_block.length () != 0)

{

// 复制消息块(引用)

ACE_Message_Block &duplicate =*message_block.duplicate ();

// 伪装一个事件读完成对象

ACE_Asynch_Read_Stream_Result_Impl *fake_result =

ACE_Proactor::instance ()->create_asynch_read_stream_result (*this,

this->handle_,

duplicate,

initial_read_size,

0,

ACE_INVALID_HANDLE,

0,

0);

//移动写指针到未写入的位置,因为读完成动作中会自动移动写指针

size_t bytes_transferred = message_block.length ();

duplicate.wr_ptr (duplicate.wr_ptr () - bytes_transferred);

//发出事件完成回调命令

fake_result->complete (message_block.length (),

1,

0);

// 删除伪装的对象

delete fake_result;

}

else

// 没有附带数据,则开始一个新的读操作

if (this->initiate_read_stream () == -1)

return;

}

int

Receiver::initiate_read_stream (void)

{

ACE_Message_Block *mb = 0;

ACE_NEW_RETURN (mb,

ACE_Message_Block (BUFSIZ + 1),

-1);

// 开始读操作

if (this->rs_.read (*mb,

mb->size () - 1) == -1)

ACE_ERROR_RETURN ((LM_ERROR,

"%p\n",

"ACE_Asynch_Read_Stream::read"),

-1);

return 0;

}

void

Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)

{

//开始读操作

ACE_DEBUG ((LM_DEBUG,

"handle_read_stream called\n"));

//显示读取的信息

result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0';

ACE_DEBUG ((LM_DEBUG, "********************\n"));

ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ()));

ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));

ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));

ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));

ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));

ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));

ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));

ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message:", result.message_block ().rd_ptr ()));

ACE_DEBUG ((LM_DEBUG, "********************\n"));

if (result.success () && result.bytes_transferred () != 0)

{

result.message_block ().release();

// 如果还存在未读取数据,则继续读取

if (this->initiate_read_stream () == -1)

return;

}

else

{

//不存在,则释放消息块并关闭SOCKET连接

ACE_DEBUG ((LM_DEBUG,

"Receiver completed\n"));

result.message_block ().release ();

done = 0;

ACE_OS::closesocket (this->handle_);

}

}

int

ACE_TMAIN (int argc, ACE_TCHAR *argv[])

{

ACE_Asynch_Acceptor<Receiver> acceptor;

//打开SOCKET端口

if (acceptor.open (ACE_INET_Addr (port),

initial_read_size,

1) == -1)

return -1;

int success = 1;

while (success > 0 && !done)

// 处理和分发事件

success = ACE_Proactor::instance ()->handle_events ();

return 0;

}

//下面的代码时帮助编译器解析上面的模板

#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)

template class ACE_Asynch_Acceptor<Receiver>;

#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)

#pragma instantiate ACE_Asynch_Acceptor<Receiver>

#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

 
 
 
免责声明:本文为网络用户发布,其观点仅代表作者个人观点,与本站无关,本站仅提供信息存储服务。文中陈述内容未经本站证实,其真实性、完整性、及时性本站不作任何保证或承诺,请读者仅作参考,并请自行核实相关内容。
© 2005- 王朝网络 版权所有