理解 Reactor 和 Proactor 模式

概述

事件驱动模型是实现高性能服务器的一种方式,常见实现模式有 Reactor 和 Proactor。两者都适用于在分布式环境下,服务端需要同时处理来自多个客户端的请求。

Reactor 模式

Reactor 模式又称为 Dispatcher、Notifier。工作模式总结如下:对于应用程序提供的每一种类型的服务,都需要有一个对应的 Event Handler 处理该类型的事件。所有的 Event Handlers 都实现了同一接口,并在 Initiation Dispatcher 注册。Initiation Dispatcher 使用一个 Synchronous Event Demultiplexer 等待事件发生。当事件发生时,Synchronous Event Demultiplexer 通知 Initiation Dispatcher,Initiation Dispatcher 再回调该事件对应的 Event Handler。然后该 Event Handler 再将事件分发给实现请求服务的方法。

Reactor 组件

Reactor 模式一般包含以下组件: Reactor-Pattern

Handles

Handles 代表由 OS 管理的资源,通常包括网络连接、文件、定时器和同步对象等。

Synchronous Event Demultiplexer

Synchronous Event Demultiplexer 会在一组句柄上阻塞等待事件发生。当其中一个句柄就绪(对该句柄操作不会阻塞)时就返回。常见的 IO 事件分离器有 select、epoll 等。

Initiation Dispatcher

Initiation Dispatcher 定义一个用于注册、移除和调度 Event Handlers 的接口。一般地,Synchronous Event Demultiplexer 负责等待事件,当有新事件到来时,它会通知 Initiation Dispatcher 回调应用相关的 Event Handler。通常,事件包括连接建立事件、数据输入输出事件和超时事件。

Event Handler

Event Handler 也定义了一个接口,包含一个钩子方法,表示事件的分发操作。应用程序必须实现该方法,用于分发特定的事件。

Concrete Event Handler

Concrete Event Handler 实现了 Event Handler 的抽象方法,能完成事件调度,同时还有处理这些事件的其它方法。为了收到事件通知,应用程序需要在 Initiation Dispatcher 注册 Event Handler。这样,当事件到来时,Initiation Dispatcher 就会回调合适的 Event Handler 的钩子方法。

Reactor 工作原理

Reactor 工作原理如下图: Reactor-Diagram

  1. 应用程序在 Initiation Dispatcher 上注册 Concrete Event Handler ,声明希望 Initiation Dispatcher 通知该 Event Handler 的事件类型。Initiation Dispatcher 要求 Event Handler 回传其内部句柄。在操作系统内部,该句柄标识了该 Event Handler。
  2. 注册完 Event Handler,应用程序调用 handle_events 启动 Initiation Dispatcher 的事件循环。现在,Initiation Dispatcher 使用 Synchronous Event Demultiplexer 等待在这些句柄上出现的事件。比如,TCP 协议层可以使用 select 等待客户端建立连接。
  3. 当一个 Handle 对应的事件就绪,比如一个 TCP socket 可读,Synchronous Event Demultiplexer 就会通知 Initiation Dispatcher。
  4. 当 Initiation Dispatcher 收到 Handle 就绪的通知,就会触发对应 Event Handler 的 handle_event 方法,将事件作为参数传递给 handle_event 方法,这样 Event Handler 可以在内部继续做分离和调度。Initiation Handler 需要在内部维护一个 Handle 和 Event Handler 的映射,这样才能在 Handle 就绪时,快速找到对应的 Event Handler。

实现

Synchronous Event Demultiplexer

Synchronous Event Demultiplexer 一般使用操作系统提供了 IO 多路复用机制实现,如 select、poll 和 epoll 等。

Initiation Dispatcher

Initiation Dispatcher 需要维护一个表,用于注册和移除 Event Handler。常见的实现方式有哈希、线性搜索等。 Initiation Dispatcher 还要实现 handle_events,提供事件循环的入口,完成 handle demultiplexing 和 Event Handler dispatching。通常,该方法也是整个应用程序的入口。 如果在多线程环境中使用 Reactor,那么 Initiation Dispatcher 还需要实现必要的同步机制。 一个示例 Initiation Dispatcher 设计如下:

class Initiation_Dispatcher{
    /**
    * Demultiplex and dispatch Event_Handlers in response 
    * to client requests.
    */

public:
    /**
    * Register an Event_Handler of a particular 
    * Event_Type(e.g., READ_EVENT, ACCEPT_EVENT,etc.)
    */
    int register_handler(Event_Handler *eh, Event_Type et);
    
    /**
    * Remove an Event_Handler of a particular Event_Type
    */
    int remove_handler(Event_Handler *eh, Event_Type et);
    
    /**
    * Entry point into the reactive event loop
    */
    int handle_events(Time_Value *timeout = 0);
}

Event Handler

将 Handle 与对应的 Event Handler 关联在一起有两种方式:

  1. 为 Handle 创建一个 Event Handler 对象
  2. 为 Handle 创建一个 Event Handler 函数

Event Handler 对象

Event Handler 对象机制是将事件处理封装成一个类或者接口,好处是可以有状态和方法。应用程序可以继承基类,实现自己的 Event Handler。 Handler 可以只包含一个方法,如下:

class Event_Handler{
    /**
    * Abstract base class that serves as the target of the 
    * Initiation_Dispatcher
    */

public:
    /**
    * Hook method that is called back by the 
    * Initiation_Dispatcher to handle events.
    */
    virtual int handle_event(Event_Type et) = 0;
    
    /**
    * Hook method that returns the underlying I/O Handle
    */
    virtual Handle get_handle() const = 0;
}

这样做的好处是应用程序不需要修改接口就可以添加新类型的事件。但是,子类实现 handle_event 时,要使用 switch 等语句判断事件类型。

另一种方式是为每一种事件都定义一个函数,如下:

class Event_Handler{
public:
    /**
    * Hook methods that are called back by the 
    * Initiation_Dispatcher to handle particular types of 
    * events
    */
    virtual int handle_accept() = 0;
    virtual int handle_input() = 0;
    virtual int handle_output() = 0;
    virtual int handle_timeout() = 0;
    virtual int handle_close() = 0;
    
    /**
    * Hook method that returns the underlying I/O Handle.
    */
    virtual Handle get_handle() = 0;
}

使用这种方式,需要我们提前预估所有可能事件类型。

Event Handler 函数

Event Handler 函数机制则是将事件处理封装到一个函数,不需要继承就可以实现一个 Event Handler。

可以用适配器模式同时支持这两种机制。

确定 Initiation Dispatcher 的数量

多线程环境下,可以在每个线程都创建一个 Initiation Dispatcher,这些 Dispatcher 可以并行执行。

Proactor 模式

Proactor 模式是基于异步操作的。工作模式总结如下:异步 Web 服务器向 OS 发起一个异步操作,并在 Completion Dispatcher 注册一个回调。接着,OS 代表应用程序执行异步操作,并将执行结果放到一个已知的位置。Completion Dispatcher 负责拿取执行结果,并执行回调通知 Web 服务器。

Proactor 组件

Proactor 模式一般包含以下组件: Proactor-Pattern.png

Proactive Initiator

Proactive Initiator 负责发起异步操作,需要在 Asynchronous Operation Processor 注册一个 Completion Handler 和一个 Completion Dispatcher 。当异步操作完成时,Asynchronous Operation Processor 就会通知 Proactive Initiator。

Completion Handler

Completion Handler 是一个接口,定义了异步操作的完成通知。

Asynchronous Operation

Asynchronous Operation 将代表应用去执行请求(比如 IO 和定时器操作)。当应用程序调用异步操作时,这些操作不需要借助应用线程的控制;那么从应用程序的角度,这些操作就在异步执行。当异步操作完成后,Asynchronous Operation Processor 将完成通知委托给 Completion Dispatcher。

Asynchronous Operation Process

Asynchronous Operation Processor 实际执行异步操作,通常这一部分由操作系统实现。

Completion Dispatcher

当异步操作执行完成,Completion Dispatcher 代表 Asynchronous Operation Processor 回调应用程序的 Completion Handler。

Proactor 工作原理

Proactor 工作原理如下图: Proactor-Pattern

  1. Proactive Initiators 发起操作:首先,应用程序向 Asynchronous Operation Processor 发起一个异步操作。比如说,一个 Web 服务器可能会请求 OS 向网络中的一个特定的 socket 连接发送文件。为此,该 Web 服务器需要指定发送的文件和对应的 socket。不仅如此,Web 服务器还需要指定(1)当异步操作完成时需要通知的 Completion Handler 和(2)当文件传输完成后需要执行回调的 Completion Handler。
  2. Asynchronous Operation Processor 执行操作: 当应用程序发起了异步操作之后,由 Asynchronous Operation Processor 实际执行该操作。这依赖于操作系统的实现,大部分操作系统(如 Solaris 和 Windows NT)都在内核中提供了异步 IO 子系统。

  3. Asynchronous Operation Processor 通知 Completion Dispatcher:上面提到,应用程序发起异步操作时还必须指定操作完成后的 Completion Handler 和完成回调的 Completion Dispatcher。当操作完成后,Asynchronous Operation Processor 向 Completion Dispatcher 传递异步操作的执行结果,再由 Dispatcher 回调 Completion Handler。比如说,当异步向网络传输一个文件时,Asynchronous Operation Processor 会报告完成状态(成功还是失败)以及向网络中写入的字节数。
  4. Completion Dispatcher 通知应用程序:当 Dispatcher 收到通知后,调用 Completion Handler 的 completion 钩子并传递应用程序需要的数据。比如,当异步读完成后,Dispatcher 向 Handler 传递一个指针,指向新到达的数据。

实现

Asynchronous Operation Processor

Asynchronous Operation Processor 负责代表应用程序异步执行操作,需要向外暴露异步操作 API 并实现异步操作引擎。

异步操作 API

异步操作 API 允许应用程序发起异步操作。设计 API 时需要考虑下面几个因素:

  1. 可移植性,不局限性某个特定平台
  2. 灵活性,可以对多个媒介(比如网络和文件)调用 API
  3. Callback:允许 Proactive Initiators 指定操作完成时应该调用哪个回调
  4. Dispatcher:允许 Proactive Initiators 指定操作完成时由哪个 Completion Dispatcher 完成回调

一个示例 API 设计如下:

class Asynch_Stream{
    // A Factory for initiating reads and writes asynchronously.
    
    /**
    * Initializes the factory with information which will be used 
    * with each asynchronous call. <handler> is notified when the operation completes. 
    * The asynchronous operations are performed on the <handle> and the results of the operations are sent to the <Completion_Dispatcher>
    */
    Asynch_Stream(Completion_Handler &handler, HANDLE handle, Completion_Dispatcher *);
    
    /**
    * This starts off an asynchronous read. Upto <bytes_to_read> will be read and stored in the <message_block>.
    */
    int read(Message_Block &message_block, u_long bytes_to_read, const void *act = 0);
    
    /**
    * This starts off an asynchronous write. Upto <bytes_to_write> will be written from the <message_block>.
    */
    int write(Message_Block &message_block, u_long bytes_to_write, const void *act = 0);
}

异步操作引擎

当应用程序发起异步操作后,Asynchronous Operation Processor 必须不借用应用线程的控制执行该操作。现代操作系统(比如 POSIX 异步 IO 和 WinNT overlapped IO)都提供了异步操作机制。在这样的情况下,可以使用操作系统异步 API 实现引擎。 如果操作系统不支持异步操作,那么还可以在应用程序层面实现引擎。一个最直观的办法是使用额外的线程执行异步操作。

Completion Dispatcher

当 Completion Dispatcher 收到来自 Asynchronous Operation Processor 的操作完成通知,会回调相应的 Completion Handler。那么,实现 Completion Dispatcher 时需要(1)实现回调机制和(2)定义用于执行回调的并发策略

实现回调

回调机制使得 Proactive Initiators 在发起操作时能指定对应回调。通常有如下方法实现:

  1. Callback class:Completion Handler 对 Completion Dispatcher 暴露一个接口。当操作完成时,Dispatcher 回调该接口的方法,并传递完成信息。
  2. Function pointer:Completion Dispatcher 通过回调函数指针调用 Completion Handler
执行回调并发策略

Completion Dispatcher 可以利用以下几种并发策略完成回调:

  1. Dynamic-thread dispatching:Completion Dispatcher 每次都创建一个线程回调 Completion Handler
  2. Post-reactive dispatching:Proactive Initiator 生成一个 事件对象或者条件变量,操作完成后 Completion Dispatcher 发出对应信号。
  3. Call-throught dispatching:Completion Dispatcher 借助 Asynchronous Operation Processor 的线程执行 Completion Handler。
  4. Thread Poll dispatching:Completion Dispatcher 使用线程池执行 Completion Handler

Completion Handler

Completion Handler 的实现需要考虑以下因素:

State Integrity

一个 Completion Handler 需要维护一个特定请求的状态信息。比如,OS 可能会通知 Web 服务器只有一部分文件被写入到网络。那么,Completion Handler 必须再次发起请求直到所有文件内容都被写入到网络。所以,Handler 必须知道文件来自哪里,剩余可写入字节数,文件指针位置等信息。

Resource Management

Completion Handler 自身需要维护资源,避免造成死锁

参考

  1. IO设计模式:Reactor和Proactor对比
  2. Reactor
  3. Proactor

Previous post: 理解 select、poll 和 epoll

Next post: LeetCode Reservoir Sampling 问题总结