鄙文主要讨论如何在asio上扩展自定义的异步io操作。包括asio中经典的Context/Service插件机制,以及我们将经常打交道的几个Service,还有使用asio调度器的一些注意事项等。
在使用asio开发时,我们都会创建两类对象:一类是exection context,另一类是IO object. 比如我们以asio的tcp socket为例:
asio::io_context ioc{};asio::ip::tcp::socket socket{ ioc };
asio::io_context是execution context的实现类,而asio::ip::tcp::socket则是IO object. 以上短短的两行代码,用户就已经通过IO object对象,将tcp/ip的socket功能装载入io_context中。也就是将socket service,注册到了io_context中。注册的扳机是asio::detail::io_object_impl模板类。
asio::ip::tcp::socket类型,抛开中间的各种实例化和继承的关系,它本质上是asio::detail::io_object_impl实例化类型的外观。如下面代码所示(代码经过简化处理,方便展示概念,与asio有出入)
// in namemspace asio
class socket
{
private:
detail::io_object_impl<
detail::reactive_socket_service<ip::tcp>,
io_context::executor
> impl_;
public:
socket(const io_context::executor& ex)
: impl_(0, ex)
{}
};
可以看到socket外观类,使用了reactive_socket_service(这里以linux平台为例),和io_context导出的executor,对io_object_impl进行实例化;而socket构造函数,将io_context关联的executor传递给impl_成员。我们再来看看io_object_impl对应的构造函数:
// in namespace asio::detail
template <typename IoObjectService,
typename Executor = io_context::executor_type>
class io_object_impl
{
public:
// The type of the service that will be used to provide I/O operations.
typedef IoObjectService service_type;
// The underlying implementation type of I/O object.
typedef typename service_type::implementation_type implementation_type;
// The type of the executor associated with the object.
typedef Executor executor_type;
// Construct an I/O object using an executor.
explicit io_object_impl(int, const executor_type& ex)
: service_(&asio::use_service<IoObjectService>(
io_object_impl::get_context(ex))),
executor_(ex)
{
service_->construct(implementation_);
}
private:
// The service associated with the I/O object.
service_type* service_;
// The underlying implementation of the I/O object.
implementation_type implementation_;
// The associated executor.
executor_type executor_;
};
构造函数非常明确的展示了io_object_impl通过Executor获取Execution Context对象,并通过use_service函数向Execution Context中注册Service. 对implementation_type相关的内容,我们暂时跳过。这里着重关注use_service函数,它是asio向其异步IO事件框架中,装载具体IO功能(这里是reactor_socket_service)的关键入口。
Context/Service大家也许很陌生,但换一个说法也许大家就很熟悉了。例如COM中使用的Component/Interface组件对象模型,还有游戏引擎中常用的Object/Component模型。如下图所示:
use_service函数实际上就是向Execution Context中管理Service集合的类Service Registry中申请一个指定的Service对象。而service_registry则会保证每一种Service只有一个,没有的时候创建一个新的为原则,最终将有效的Service引用返回。当然,以上操作都是线程安全的。值得一提的是,Service Registry使用了一个id来区分不同的Service,而id的生成则是通过CRTP配合std::type_info来生成的,如下代码所示:
// Special derived service id type to keep classes header-file only.
template <typename Type>
class service_id
: public execution_context::id
{
};
// Special service base class to keep classes header-file only.
template <typename Type>
class execution_context_service_base
: public execution_context::service
{
public:
static service_id<Type> id;
// Constructor.
execution_context_service_base(execution_context& e)
: execution_context::service(e)
{
}
};
// implement your private service
template <typename PortSpace>
class rdma_core_service :
public execution_context_service_base<rdma_core_service<PortSpace>>
{
// ....
};
需要注意的是CRTP中的传递的实现类千万不要写错了,这里是rdma_core_service<PortSpace>
. 否则底层计算的ID会出错。如果ID是某一个已经存在的Service的ID,会直接将那个Service返回给用户,这将会是非常危险的事情。虽然CRTP可以在正确的情况下用户保证在运行时保证类型安全,但是错误的使用并不会有任何编译期报错,并将灾难抛给运行时。
到现在为止,我们应该知道扩展asio的关键是实现自己的Service,并实现自己的io_object_impl外观类。Service负责实现具体的同步或者异步IO的操作,而外观类则提供封装好的接口,让用户方便地,正确地使用Service提供的能力。下面,我么展开讨论一下:
回顾一下第一节贴出的io_object_impl代码段,其中有一个implementation_的成员。它的类型是io_object_impl通过Service中导出的implementation_type而来。它是IO Object真正的数据实现,并由各个Service来定义。这样做的好处是,可以让每个平台来处理自己的实现细节。比如Linux平台的reactor_socket_service使用的socket类型是fd,而windows平台的iocp_socket_service则使用的是SOCKET,implementation_type的使用,可以让io_object_impl不用关心平台相关的数据类型差异和实现细节。
例如本系列文档讨论的在asio上封装rdma-core(https://github.com/linux-rdma/rdma-core)功能的Service
template <typename PortSpace>
class rdma_core_service :
public execution_context_service_base<rdma_core_service<PortSpace>>
{
public:
// The implementation type of the cm id.
struct implementation_type
{
// The native cm representation.
rdma_cm_id_data* cm_id_data_;
// state for memory region
rdma_mr_state mr_state_;
// if there is a queue pair attached to this io object
bool has_qp_;
};
};
io_object_impl会实现自己的The Rule of Five(https://en.cppreference.com/w/cpp/language/rule_of_three). 例如第一节io_object_impl中的构造函数,转发给了Service的construct函数。这样的函数照例,我们需要实现如下:
template <typename PortSpace>
class rdma_core_service :
public execution_context_service_base<rdma_core_service<PortSpace>>
{
public:
// Construct a new implementation.
inline void construct(implementation_type& impl);
// Destroy a implementation
inline void destroy(implementation_type& impl);
// move constructor
inline void move_construct(implementation_type& impl, implementation_type& other_impl);
// move assign
inline void move_assign(implementation_type& impl,
rdma_core_service& other_service, implementation_type& other_impl);
};
io_object_impl是Noncopyable,所以只用实现Move Constructor和Move Assignment Operator. 至于Converter Constructor,则根据用户自己的需求来决定。
Service中还有几个需要Overwrite的函数,如下列代码所示:
template <typename PortSpace>
class rdma_core_service :
public execution_context_service_base<rdma_core_service<PortSpace>>
{
public:
/// Destructor.
ASIO_DECL virtual ~rdma_core_service()
/// Destroy all user-defined handler objects owned by the service.
ASIO_DECL virtual void shutdown();
/// Handle notification of a fork-related event to perform any necessary
/// housekeeping.
/**
* This function is not a pure virtual so that services only have to
* implement it if necessary. The default implementation does nothing.
*/
ASIO_DECL virtual void notify_fork(execution_context::fork_event event);
};
Virtual Destructor就不多说了, 因为Service Registry持有的是基类指针,清理Service的时候析构函数得是Virtual. Shutdown则是给有状态的Service在整个asio发起Shutdown的时候清理和校验自己扩展点。notify_fork只适用于Linux平台,用来响应Fork系统调用的事件。
如第一节代码所示,用户并不直接调用Service的接口来适用asio的功能,而是通过IO Object. 因此,我们还要实现对应的IO Object外观类。以普遍理性而论,Service与IO Object外观是一对多的关系,例如在linux平台上Socket和Socket Acceptor都是对应于reactive_socket_service. 反过来,因为IO Object会使用一个确定的Service导出的implementation_type,IO object只能对应一个确定的Service. 例如一个rdma_connetion的实现:
// IO object for rdma, provides rdma functionality
template <typename PortSpace, typename Executor = any_io_executor>
class rdma_connection
{
public:
// ....
#if ASIO_HAS_IOCP
using service_type = detail::rdma_iocp_connector_service<PortSpace>;
#else
using service_type = detail::rdma_core_service<PortSpace>;
#endif
private:
detail::io_object_impl<service_type> impl_;
public: // implement interface
void open(port_space_type const& port_space);
bool is_open() const
{
return impl_.get_service().is_open(impl_.get_implementation());
}
// .....
};
通过外观类调用Service的接口,可以参考is_open成员方法的实现。这里为了方便说明,简单起见只用了一个同步的,实现简单的接口作为展示。
至此,我们就可以在asio上扩展我们的“插件”了。可以发现我们并没有侵入式的修改asio的任何代码,这都得益于Context/Service对象模型强大的可扩展性。
asio是异步IO框架,如果我们需要支持自定义的异步IO操作,还需要做一些额外的事工作。在工作开始之前,我们需要了解一些asio底层调度器的细节。
asio的实现中没有一个具体的统一的Scheduler实现,原因是Linux平台上使用了Reactor模式,而Windows平台则是基于IOCP的Proactor模式。两个模式有差异,但asio的选择是在Linux平台上实现一个scheduler类,并与Reactor一同工作,提供与Proactor模式相同的接口。最后,asio对Service层提供了一套统一的Scheduler机制,如下图:
由上图我们可以看到,Reactor中额外实现了一个scheduler类,它内部管理了一个Completion Queue,来模拟与IOCP中等同的功能,并允许用户直接向该队列Post一个回调。
统一的Scheduler机制,也就是各个平台实现的context_impl类,并不能抹平Reactor与Proactor所有的差异。对于Proactor而言,IO函数的调用就直接发起了异步IO操作(Windows平台中WSA系列的函数)。而Reactor依旧是同步IO,因此Reactor额外提供了start_op接口。该接口将根据IO的类型,将IO事件注册给Reactor,并同步地在Scheduler收到IO事件后,选择适当的时机执行IO操作。
调度的过程和细节,asio封装的Reactor与Proactor也有所不同。基于IOCP的Completion Queue中,只有IO完成事件回调和用户主动Post入队的回调;而基于Reactor的Completion Queue中,除了这两个回调以外,还有从Reactor中通知的同步IO事件。
对asio中Reactor与Proactor的机制有所了解之后,我们便可以继续在不同平台上适配异步操作的工作了。
首先是Reactor模式下的异步操作封装。我们知道,在Reactor模式下,asio有同步调用IO函数的机制。对asio源码稍作阅读后,不难发现这些秘密都藏在asio::detail::reactor_op类中。我么只要继承该类,并覆盖两个关键的函数,一个是同步执行IO操作的函数,另一个是IO完成的函数,就能够实现自定义的异步操作。当然,在Reactor模式下的异步操作,是模拟的异步操作。
// reactor_op implemetion
class reactor_op : public operation
{ /******/ };
class your_reactor_async_op : public reactor_op
{
public:
your_reactor_async_op(asio::error_code const& success_ec, ...)
: reactor_op(success_ec, do_perform, do_complete)
//, other initialization
{}
static status do_perform(reactor_op* base)
{
auto* this_op = static_cast<your_reactor_async_op*>(base);
// TODO ... call your sync io
}
static void do_complete(void* owner, operation* base,
const asio::error_code& /*ec*/,
std::size_t /*bytes_transferred*/)
{
auto* this_op = static_cast<your_reactor_async_op*>(base);
// TODO ... do your io completion logic
}
};
reactor_op类是继承自operation类,该类也是Scheduler中Completion Queue链表的节点类,因此我们可以把要模拟的某个异步IO操作的perform和complete业务都实现在一起。
扩展IOCP Proactor的操作就比Reactor模式下简单许多。operation类继承自OVERLAPPED结构体,可以直接把op操作传递给异步IO函数的LPOVERLAPPED指针参数。因此我们只需要考虑IO complete回调即可。
class your_iocp_async_op : public operation
{
public:
your_iocp_async_op(...)
: operation(do_complete)
//, other initialization
{}
static void do_complete(void* owner, operation* base,
const asio::error_code& /*ec*/,
std::size_t /*bytes_transferred*/)
{
auto* this_op = static_cast<your_iocp_async_op*>(base);
// TODO ... do your io completion logic
}
};
有些IO库或者标准中的IO完成事件,并不能统一到asio的标准统一Scheduler中。例如rdma中的Completion Queue(CQ). 这种设计也是合理的,毕竟使用统一的调度,并不能满足对低延迟极度敏感的场景。
适配类似rdma中的CQ,一个简单可行的做法,可以参考在asio上尝试RDMA中第二节的内容。除此之外,既然rdma提供了CQ的概念和Poll CQ的接口,说明用户一定存在自己主动使用它们的场景。于是笔者尝试提供了CQ对象,让用户可以在任意线程主动Poll CQ.
asio::io_context ioc{};
asio::rdma::completion_queue cq{ ioc };
std::thread thread {
[&] { cq.run(); }
}
该场景下,io_context中的统一调度CQ的Service将会停止工作,并把Poll CQ的权限交给用户。
有了适配各个平台的异步操作定义之后,剩下的工作就是为io_object_impl与Service实现异步接口了。io_object_impl外观类的实现尽量统一,但是每个平台的Operation类型实现大概率不一样,asio在外观类层通过Initializer的机制,把实现上的差异转发给了各个平台的Service,我们以rdma_connect的一个实现为例:
// IO object for rdma, provides rdma functionality
template <typename PortSpace, typename Executor = any_io_executor>
class rdma_connection
{
public:
// .......
class initiate_async_connect;
// async connect
template <
ASIO_COMPLETION_TOKEN_FOR(void (asio::error_code))
ConnectToken ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type)>
ASIO_INITFN_AUTO_RESULT_TYPE_PREFIX(ConnectToken,
void (asio::error_code))
async_connect(endpoint_type const& endpoint,
ASIO_MOVE_ARG(ConnectToken) token
ASIO_DEFAULT_COMPLETION_TOKEN(executor_type))
ASIO_INITFN_AUTO_RESULT_TYPE_SUFFIX((
async_initiate<ConnectToken, void (asio::error_code)>(
declval<initiate_async_resolve_route>(), token,
declval<asio::error_code&>())))
{
asio::error_code open_ec{};
if(!is_open())
{
open_ec = asio::error::try_again;
}
#if ASIO_HAS_IOCP
if(!open_ec)
{
impl_.get_service().allocate_qp(
impl_.get_implementation(), open_ec);
}
#endif
return async_initiate<ConnectToken, void(asio::error_code)>(
initiate_async_connect(this), token, endpoint, open_ec);
}
// initializer for async_connect_op
class initiate_async_connect
{
public:
using executor_type = Executor;
explicit initiate_async_connect(rdma_connection* self) ASIO_NOEXCEPT
: self_(self) {}
template <typename Handler>
void operator() (ASIO_MOVE_ARG(Handler) handler,
endpoint_type const& endpoint,
asio::error_code const& open_ec) const
{
ASIO_CONNECT_HANDLER_CHECK(Handler, handler) type_check;
if(open_ec)
{
asio::post(self_->impl_.get_executor(),
asio::detail::bind_handler(
ASIO_MOVE_CAST(Handler)(handler), open_ec));
}
else
{
detail::non_const_lvalue<Handler> handler2(handler);
self_->impl_.get_service().async_connect(
self_->impl_.get_implementation(), endpoint,
handler2.value, self_->impl_.get_executor());
}
}
private:
rdma_connection* self_;
};
};
Initializer还担任了很多编译期检查的工作,比如Handler的类型是否满足,Buffer是否满足ConstBufferSequence的约束等。在跟祁宇的一次讨论中,他还提到了Initializer的设计还可以让用户去特化async_result来实现自定义异步操作,是一个很强大的用户扩展点。
io_object_impl通过Initializer转发给Service之后,剩下的事情就很简单了。Service的异步操作接口主要负责创建对应的异步操作对象,初始化操作对象,并通知Scheduler发起IO操作。
以rdma在Linux和Windows平台上的实现为例:
// rdma-core for linux
template <typename PortSpace>
template <typename Handler, typename IoExecutor>
void rdma_core_service<PortSpace>::async_connect(implementation_type& impl,
endpoint_type const& endpoint,
Handler& handler, const IoExecutor& io_ex)
{
bool const is_continuation =
asio_handler_cont_helpers::is_continuation(handler);
// TODO ... cancellation
using op = rdma_cm_connect_op<PortSpace, Handler, IoExecutor>;
typename op::ptr p = { asio::detail::addressof(handler),
op::ptr::allocate(handler), 0 };
p.p = new (p.v) op(success_ec_, implNaN_id_data_->cm_id_, get_context(), handler, io_ex);
cm_event_service_.start_connect_op(implNaN_id_data_, p.p, endpoint.data(), is_continuation);
p.v = p.p = 0;
}
// iocp for windows
template <typename PortSpace>
template <typename Handler, typename IoExecutor>
void rdma_iocp_connector_service<PortSpace>::async_connect(
implementation_type& impl, endpoint_type const& endpoint,
Handler& handler, const IoExecutor& io_ex)
{
// TODO ... cancellation
using op = rdma_nd_connect_op<Handler, IoExecutor>;
typename op::ptr p = { asio::detail::addressof(handler),
op::ptr::allocate(handler), 0 };
p.p = new (p.v) op(impl.connector_.Get(), handler, io_ex);
start_connect_op(impl, endpoint, p.p);
p.v = p.p = 0;
}
需要注意的是,Reactor与Proactor模式的差异在这里就显现出来了。Reactor在这里发起的是一个reactor_op操作, 注册给Connection Mananger也就是这里的cm_event_service_对象,真正的connect函数调用是在Connection Mananger的事件Channel对应的fd有POLLIN消息的时候,才会调用。而对于IOCP Proactor而言,start_connect_op会直接非阻塞地调用Connector的Connect函数,并在Connection创建成功后,通过IOCP的Completion Queue通知用户。
本文简要地讨论了如何在asio中扩展自己的异步IO操作,简单分析了Context/Service对象模型,简述如何实现自己的IO Object外观类以及对应的Service类。最后讨论了如何在Reactor和Proactor上扩展自定义的异步操作。