IT学习站-137zw.com

作者: 银鲜目江探
查看: 121|回复: 0

more +资源更新Forums

more +随机图赏Gallery

价值348元 RabbitMQ消息中间件技术精讲2018视频教程 百度云价值348元 RabbitMQ消息中间件技术精讲2018视频教程 百度云
微专业 - Java高级开发工程师(完整版)微专业 - Java高级开发工程师(完整版)
画画教程 SAI零基础合集(11套)202G  完整版课程分享画画教程 SAI零基础合集(11套)202G 完整版课程分享
喜马拉雅付费专辑 华语辩论冠军的思辩表达课 分享下载喜马拉雅付费专辑 华语辩论冠军的思辩表达课 分享下载
价值1169元 建设项目目标成本编制与投资收益测算 课程价值1169元 建设项目目标成本编制与投资收益测算 课程
医学生必备图谱及教材 蓝色生死恋全集奈特图谱十二本+黄...医学生必备图谱及教材 蓝色生死恋全集奈特图谱十二本+黄...

Reactor模式

Reactor模式

[复制链接]
银鲜目江探 | 显示全部楼层 发表于: 2019-11-14 10:25:04
银鲜目江探 发表于: 2019-11-14 10:25:04 | 显示全部楼层 |阅读模式
查看: 121|回复: 0

你还没有注册,无法下载本站所有资源,请立即注册!

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
什么是Reactor模式

Reactor模式是一种设计模式,它是基于事件驱动的,可以并发的处理多个服务请求,当请求抵达后,依据多路复用策略,同步的派发这些请求至相关的请求处理程序。
Reactor模式角色构成

在早先的论文An Object Behavioral Pattern for
Demultiplexing and Dispatching Handles for Synchronous Events
中Reactor模式主要有五大角色组成,分别如下:
Handle:操作系统提供的一种资源,用于表示一个个的事件,在网络编程中可以是一个连接事件,一个读取事件,一个写入事件,Handle是事件产生的发源地
Synchronous Event Demultiplexer:本质上是一个系统调用,用于等待事件的发生,调用方在调用它的时候会被阻塞,一直阻塞到同步事件分离器上有事件产生为止
Initiation Dispatcher:定义了一些用于控制事件的调度方式的规范,提供对事件管理。它本身是整个事件处理器的核心所在,Initiation Dispatcher会通过Synchronous Event Demultiplexer来等待事件的发生。一旦事件发生,Initiation Dispatcher首先会分离出每一个事件,然后调用事件处理器,最后调用相关的回调方法来处理这些事件
Event Handler:定义事件处理方法以供InitiationDispatcher回调使用
Concrete Event Handler:是事件处理器的实现。它本身实现了事件处理器所提供的各种回调方法,从而实现了特定于业务的逻辑。它本质上就是我们所编写的一个个的处理器实现。
Reactor模式  技术博客

Reactor模式实现流程


  • 初始化 Initiation Dispatcher,然后将若干个Concrete Event Handler注册到 Initiation Dispatcher中,应用会标识出该事件处理器希望Initiation Dispatcher在某些事件发生时向其发出通知
  • Initiation Dispatcher 会要求每个事件处理器向其传递内部的Handle,该Handle向操作系统标识了事件处理器
  • 当所有的Concrete Event Handler都注册完毕后,就会启动 Initiation Dispatcher的事件循环,使用Synchronous Event Demultiplexer同步阻塞的等待事件的发生
  • 当与某个事件源对应的Handle变为ready状态时,Synchronous Event Demultiplexer就会通知 Initiation Dispatcher
  • Initiation Dispatcher会触发事件处理器的回调方法响应这个事件
Reactor模式  技术博客

Java NIO对Reactor的实现

在Java的NIO中,对Reactor模式有无缝的支持,即使用Selector类封装了操作系统提供的Synchronous Event Demultiplexer功能。Doug Lea(Java concurrent包的作者)在Scalable IO in Java中对此有非常详细的描述。概况来说其主要流程如下:

  • 服务器端的Reactor线程对象会启动事件循环,并使用Selector来实现IO的多路复用
  • 注册Acceptor事件处理器到Reactor中,Acceptor事件处理器所关注的事件是ACCEPT事件,这样Reactor会监听客户端向服务器端发起的连接请求事件
  • 客户端向服务器端发起连接请求,Reactor监听到了该ACCEPT事件的发生并将该ACCEPT事件派发给相应的Acceptor处理器来进行处理。Acceptor处理器通过accept()方法得到与这个客户端对应的连接(SocketChannel),然后将该连接所关注的READ/WRITE事件以及对应的READ/WRITE事件处理器注册到Reactor中,这样一来Reactor就会监听该连接的READ/WRITE事件了。
  • 当Reactor监听到有读或者写事件发生时,将相关的事件派发给对应的处理器进行处理
  • 每当处理完所有就绪的感兴趣的I/O事件后,Reactor线程会再次执行select()阻塞等待新的事件就绪并将其分派给对应处理器进行处理
Doug Lea 在Scalable IO in Java中分别描述了单线程的Reactor,多线程模式的Reactor以及多Reactor线程模式。
单线程的Reactor,主要依赖Java NIO中的Channel,Buffer,Selector,SelectionKey。在单线程Reactor模式中,不仅I/O操作在该Reactor线程上,连非I/O的业务操作也在该线程上进行处理了,这可能会大大延迟I/O请求的响应
Reactor模式  技术博客

在多线程Reactor中添加了一个工作线程池,将非I/O操作从Reactor线程中移出转交给工作者线程池来执行。这样能够提高Reactor线程的I/O响应,不至于因为一些耗时的业务逻辑而延迟对后面I/O请求的处理,但是所有的I/O操作依旧由一个Reactor来完成,包括I/O的accept()、read()、write()以及connect()操作
Reactor模式  技术博客

多Reactor线程模式将“接受客户端的连接请求”和“与该客户端的通信”分在了两个Reactor线程来完成。mainReactor完成接收客户端连接请求的操作,它不负责与客户端的通信,而是将建立好的连接转交给subReactor线程来完成与客户端的通信,这样一来就不会因为read()数据量太大而导致后面的客户端连接请求得不到即时处理的情况。并且多Reactor线程模式在海量的客户端并发请求的情况下,还可以通过实现subReactor线程池来将海量的连接分发给多个subReactor线程,在多核的操作系统中这能大大提升应用的负载和吞吐量
Reactor模式  技术博客

代码示例:
Reactor模式  技术博客 1735841-20190908095726443-1326056267

<div style="height: 0; width: 0; max-height: 0; max-width: 0; overflow: hidden; font-size: 0em; padding: 0; margin: 0;" title="MDH:<pre>### 什么是Reactor模式<br><br>Reactor模式是一种设计模式，它是基于事件驱动的，可以并发的处理多个服务请求，当请求抵达后，依据多路复用策略，同步的派发这些请求至相关的请求处理程序。<br><br>### Reactor模式角色构成<br><br>在早先的论文[An Object Behavioral Pattern for<br>Demultiplexing and Dispatching Handles for Synchronous Events](https://gitee.com/mo-se-de-feng/notes/raw/master/images/reactor-siemens.pdf)中Reactor模式主要有五大角色组成，分别如下：<br><br>**Handle**：操作系统提供的一种资源，用于表示一个个的事件，在网络编程中可以是一个连接事件，一个读取事件，一个写入事件，Handle是事件产生的发源地<br>**Synchronous Event Demultiplexer**：本质上是一个系统调用，用于等待事件的发生，调用方在调用它的时候会被阻塞，一直阻塞到同步事件分离器上有事件产生为止<br>**Initiation Dispatcher**：定义了一些用于控制事件的调度方式的规范，提供对事件管理。它本身是整个事件处理器的核心所在，Initiation Dispatcher会通过Synchronous Event Demultiplexer来等待事件的发生。一旦事件发生，Initiation Dispatcher首先会分离出每一个事件，然后调用事件处理器，最后调用相关的回调方法来处理这些事件<br>**Event Handler**：定义事件处理方法以供InitiationDispatcher回调使用<br>**Concrete Event Handler**：是事件处理器的实现。它本身实现了事件处理器所提供的各种回调方法，从而实现了特定于业务的逻辑。它本质上就是我们所编写的一个个的处理器实现。<br><br>![img](https://gitee.com/mo-se-de-feng/notes/raw/master/images/2.1.3%20Reactor%E6%A8%A1%E5%BC%8F.PNG)<br><br>### Reactor模式实现流程<br><br>1. 初始化 Initiation Dispatcher，然后将若干个Concrete Event Handler注册到 Initiation Dispatcher中，应用会标识出该事件处理器希望Initiation Dispatcher在某些事件发生时向其发出通知<br>2. Initiation Dispatcher 会要求每个事件处理器向其传递内部的Handle，该Handle向操作系统标识了事件处理器<br>3. 当所有的Concrete Event Handler都注册完毕后，就会启动 Initiation Dispatcher的事件循环，使用Synchronous Event Demultiplexer同步阻塞的等待事件的发生<br>4. 当与某个事件源对应的Handle变为ready状态时，Synchronous Event Demultiplexer就会通知 Initiation Dispatcher<br>5. Initiation Dispatcher会触发事件处理器的回调方法响应这个事件<br><br>![img](https://gitee.com/mo-se-de-feng/notes/raw/master/images/2.1.3%20Reactor%E4%BA%8B%E4%BB%B6.PNG)<br><br>### Java NIO对Reactor的实现<br><br>在Java的NIO中，对Reactor模式有无缝的支持，即使用Selector类封装了操作系统提供的Synchronous Event Demultiplexer功能。Doug Lea（Java concurrent包的作者）在[Scalable IO in Java](https://gitee.com/mo-se-de-feng/notes/raw/master/images/Scalable%20IO%20in%20Java.pdf)中对此有非常详细的描述。概况来说其主要流程如下：<br><br>1. 服务器端的Reactor线程对象会启动事件循环，并使用Selector来实现IO的多路复用<br>2. 注册Acceptor事件处理器到Reactor中，Acceptor事件处理器所关注的事件是ACCEPT事件，这样Reactor会监听客户端向服务器端发起的连接请求事件<br>3. 客户端向服务器端发起连接请求，Reactor监听到了该ACCEPT事件的发生并将该ACCEPT事件派发给相应的Acceptor处理器来进行处理。Acceptor处理器通过accept()方法得到与这个客户端对应的连接(SocketChannel)，然后将该连接所关注的READ/WRITE事件以及对应的READ/WRITE事件处理器注册到Reactor中，这样一来Reactor就会监听该连接的READ/WRITE事件了。<br>4. 当Reactor监听到有读或者写事件发生时，将相关的事件派发给对应的处理器进行处理<br>5. 每当处理完所有就绪的感兴趣的I/O事件后，Reactor线程会再次执行select()阻塞等待新的事件就绪并将其分派给对应处理器进行处理<br><br>Doug Lea 在Scalable IO in Java中分别描述了单线程的Reactor，多线程模式的Reactor以及多Reactor线程模式。<br><br>单线程的Reactor，主要依赖Java NIO中的Channel，Buffer，Selector，SelectionKey。在单线程Reactor模式中，不仅I/O操作在该Reactor线程上，连非I/O的业务操作也在该线程上进行处理了，这可能会大大延迟I/O请求的响应<br><br>![img](https://gitee.com/mo-se-de-feng/notes/raw/master/images/2.1.3%20%E5%8D%95%E7%BA%BF%E7%A8%8BReactor.PNG)<br><br>在多线程Reactor中添加了一个工作线程池，将非I/O操作从Reactor线程中移出转交给工作者线程池来执行。这样能够提高Reactor线程的I/O响应，不至于因为一些耗时的业务逻辑而延迟对后面I/O请求的处理，但是所有的I/O操作依旧由一个Reactor来完成，包括I/O的accept()、read()、write()以及connect()操作<br><br>![img](https://gitee.com/mo-se-de-feng/notes/raw/master/images/2.1.3%20%E5%A4%9A%E7%BA%BF%E7%A8%8BReactor.PNG)<br><br>多Reactor线程模式将“接受客户端的连接请求”和“与该客户端的通信”分在了两个Reactor线程来完成。mainReactor完成接收客户端连接请求的操作，它不负责与客户端的通信，而是将建立好的连接转交给subReactor线程来完成与客户端的通信，这样一来就不会因为read()数据量太大而导致后面的客户端连接请求得不到即时处理的情况。并且多Reactor线程模式在海量的客户端并发请求的情况下，还可以通过实现subReactor线程池来将海量的连接分发给多个subReactor线程，在多核的操作系统中这能大大提升应用的负载和吞吐量<br><br>![img](https://gitee.com/mo-se-de-feng/notes/raw/master/images/2.1.3%20%E5%A4%9AReactor%E6%A8%A1%E5%BC%8F.PNG)<br><br>代码示例：<br><br>```<br>// NIO selector 多路复用reactor线程模型<br>public class NIOReactor {<br><br>  // 处理业务操作的线程池<br>  private static ExecutorService workPool = Executors.newCachedThreadPool();<br><br>  // 封装了selector.select()等事件轮询的代码<br>  abstract class ReactorThread extends Thread {<br><br>    Selector selector;<br>    LinkedBlockingQueue&lt;Runnable&gt; taskQueue = new LinkedBlockingQueue&lt;&gt;();<br><br>    volatile boolean running = false;<br><br>    private ReactorThread() throws IOException {<br>      selector = Selector.open();<br>    }<br><br>    // Selector监听到有事件后,调用这个方法<br>    public abstract void handler(SelectableChannel channel) throws Exception;<br><br>    @Override<br>    public void run() {<br>      // 轮询Selector事件<br>      while (running) {<br>        try {<br>          // 执行队列中的任务<br>          Runnable task;<br>          while ((task = taskQueue.poll()) != null) {<br>            task.run();<br>          }<br>          selector.select(1000);<br>          // 获取查询结果<br>          Set&lt;SelectionKey&gt; selectionKeys = selector.selectedKeys();<br>          // 遍历查询结果<br>          Iterator&lt;SelectionKey&gt; keyIterator = selectionKeys.iterator();<br>          while (keyIterator.hasNext()) {<br>            // 被封装的查询结果<br>            SelectionKey selectionKey = keyIterator.next();<br>            keyIterator.remove();<br>            int readyOps = selectionKey.readyOps();<br>            // 关注 Read 和 Accept两个事件<br>            if ((readyOps &amp; (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0<br>                || readyOps == 0) {<br>              try {<br>                SelectableChannel channel = (SelectableChannel) selectionKey.attachment();<br>                channel.configureBlocking(false);<br>                handler(channel);<br>                // 如果关闭了,就取消这个KEY的订阅<br>                if (!channel.isOpen()) {<br>                  selectionKey.cancel();<br>                }<br><br>              } catch (Exception e) {<br>                // 如果有异常,就取消这个KEY的订阅<br>                selectionKey.cancel();<br>                e.printStackTrace();<br>              }<br>            }<br>          }<br><br>        } catch (Exception e) {<br>          e.printStackTrace();<br>        }<br>      }<br>    }<br><br>    private SelectionKey register(SelectableChannel channel) throws Exception {<br>      // 为什么register要以任务提交的形式，让reactor线程去处理？<br>      // 因为线程在执行channel注册到selector的过程中，会和调用selector.select()方法的线程争用同一把锁<br>      // 而select()方法实在eventLoop中通过while循环调用的，争抢的可能性很高，<br>      // 为了让register能更快的执行，就放到同一个线程来处理<br>      FutureTask&lt;SelectionKey&gt; futureTask =<br>          new FutureTask&lt;&gt;(() -&gt; channel.register(selector, 0, channel));<br>      taskQueue.add(futureTask);<br>      return futureTask.get();<br>    }<br><br>    private void doStart() {<br>      if (!running) {<br>        running = true;<br>        start();<br>      }<br>    }<br>  }<br><br>  private ServerSocketChannel serverSocketChannel;<br><br>  // 1、创建多个线程 - accept处理reactor线程 (accept线程)<br>  private ReactorThread[] mainReactorThreads = new ReactorThread[1];<br><br>  // 2、创建多个线程 - io处理reactor线程  (I/O线程)<br>  private ReactorThread[] subReactorThreads = new ReactorThread[8];<br><br>  // 初始化线程组<br>  private void newGroup() throws IOException {<br>    // 创建mainReactor线程, 只负责处理serverSocketChannel<br>    for (int i = 0; i &lt; mainReactorThreads.length; i++) {<br>      mainReactorThreads[i] =<br>          new ReactorThread() {<br>            AtomicInteger incr = new AtomicInteger(0);<br><br>            @Override<br>            public void handler(SelectableChannel channel) throws Exception {<br>              // 只做请求分发，不做具体的数据读取<br>              ServerSocketChannel ch = (ServerSocketChannel) channel;<br>              SocketChannel socketChannel = ch.accept();<br>              socketChannel.configureBlocking(false);<br>              // 收到连接建立的通知之后，分发给I/O线程继续去读取数据<br>              int index = incr.getAndIncrement() % subReactorThreads.length;<br>              ReactorThread workEventLoop = subReactorThreads[index];<br>              workEventLoop.doStart();<br>              SelectionKey selectionKey = workEventLoop.register(socketChannel);<br>              selectionKey.interestOps(SelectionKey.OP_READ);<br>              System.out.println(<br>                  Thread.currentThread().getName() + "收到新连接 : " + socketChannel.getRemoteAddress());<br>            }<br>          };<br>    }<br><br>    // 创建IO线程,负责处理客户端连接以后socketChannel的IO读写<br>    for (int i = 0; i &lt; subReactorThreads.length; i++) {<br>      subReactorThreads[i] =<br>          new ReactorThread() {<br><br>            @Override<br>            public void handler(SelectableChannel channel) throws Exception {<br>              // work线程只负责处理IO处理，不处理accept事件<br>              SocketChannel ch = (SocketChannel) channel;<br>              ByteBuffer requestBuffer = ByteBuffer.allocate(1024);<br>              while (ch.isOpen() &amp;&amp; ch.read(requestBuffer) != -1) {<br>                // 长连接情况下,需要手动判断数据有没有读取结束 (此处做一个简单的判断: 超过0字节就认为请求结束了)<br>                if (requestBuffer.position() &gt; 0) break;<br>              }<br>              if (requestBuffer.position() == 0) return; // 如果没数据了, 则不继续后面的处理<br>              requestBuffer.flip();<br>              byte[] content = new byte[requestBuffer.limit()];<br>              requestBuffer.get(content);<br>              System.out.println(new String(content));<br>              System.out.println(<br>                  Thread.currentThread().getName() + "收到数据,来自：" + ch.getRemoteAddress());<br><br>              // TODO 业务操作 数据库、接口...<br>              workPool.submit(() -&gt; {});<br><br>              // 响应结果 200<br>              String response =<br>                  "HTTP/1.1 200 OK\r\n" + "Content-Length: 11\r\n\r\n" + "Hello World";<br>              ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());<br>              while (buffer.hasRemaining()) {<br>                ch.write(buffer);<br>              }<br>            }<br>          };<br>    }<br>  }<br><br>  // 始化channel,并且绑定一个eventLoop线程<br>  private void initAndRegister() throws Exception {<br>    // 1、 创建ServerSocketChannel<br>    serverSocketChannel = ServerSocketChannel.open();<br>    serverSocketChannel.configureBlocking(false);<br>    // 2、 将serverSocketChannel注册到selector<br>    int index = new Random().nextInt(mainReactorThreads.length);<br>    mainReactorThreads[index].doStart();<br>    SelectionKey selectionKey = mainReactorThreads[index].register(serverSocketChannel);<br>    selectionKey.interestOps(SelectionKey.OP_ACCEPT);<br>  }<br><br>  // 绑定端口<br>  private void bind() throws IOException {<br>    //  1、 正式绑定端口，对外服务<br>    serverSocketChannel.bind(new InetSocketAddress(8080));<br>    System.out.println("启动完成，端口8080");<br>  }<br><br>  public static void main(String[] args) throws Exception {<br>    NIOReactor nioReactor = new NIOReactor();<br>    // 1、 创建main和sub两组线程<br>    nioReactor.newGroup();<br>    // 2、 创建serverSocketChannel，注册到mainReactor线程上的selector上<br>    nioReactor.initAndRegister();<br>    // 3、 为serverSocketChannel绑定端口<br>    nioReactor.bind();<br>  }<br>}<br><br>```<br><br><br></pre><p><img src="https://img2018.cnblogs.com/blog/1735841/201909/1735841-20190908095726443-1326056267.png" alt="" data-mce-src="https://img2018.cnblogs.com/blog/1735841/201909/1735841-20190908095726443-1326056267.png"></p><pre><br></pre>">​
来源:http://www.137zw.com
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
137zw.com IT学习站致力于免费提供精品的java技术教程和python技术教程,CCNA书籍/资料/CCNP书籍/资料教程/CCIE书籍/资料/H3C学习/认证/一级建造师考试/微软学习/认证/包括基础教程和高级实战教程,同时也提供分享网站源码下载和互联网相关一系列的技术教程,我们想做的就是让知识分享更有价值!(IT学习站官方唯一域名地址:www.137zw.com 请谨防假冒网站!)本站所有资源全部收集于互联网或网友自行分享,分享目的仅供大家学习与参考,如无意中侵犯您的合法权益,请联系本站管理员进行删除处理!
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

浙ICP备19022368号-1|Archiver|手机版|IT学习站-137zw.com

GMT+8, 2020-7-4 10:45 , Processed in 0.244339 second(s), 33 queries .

快速回复 返回顶部 返回列表