Java NIO之Selector

作者 : 开心源码 本文共5382个字,预计阅读时间需要14分钟 发布时间: 2022-05-12 共76人阅读

最后详情一下Selector,选择器提供选择执行已经就绪的任务的能力,这使得多元I/O成为了可能,就绪执行和多元选择使得单线程能够有效地同时管理多个I/O通道。选择器的执行主要分为以下几个步骤:

1、创立一个或者者多个可选择的通道(SelectableChannel)

2、将这些创立的通道注册到选择器对象中

3、选择器会记住开发者关心的通道,它们也会追踪对应的通道能否已经就绪

4、开发者调用一个选择器对象的select()方法,当方法从阻塞状态返回时,选择键会被升级

5、获取选择键的集合,找到当时已经就绪的通道,通过遍历这些键,开发者可以选择对已就绪的通道要做的操作

选择器

选择器的作用是管理了被注册的通道集合和它们的就绪状态,假设我们有四个Socket通道的选择器,可以通过下面方式创立:

Selector selector = Selector.open(); channel1.register(selector, SelectionKey.OP_READ); channel2.register(selector, SelectionKey.OP_WRITE); channel3.register(selector, SelectionKey.OP_READ | OP_WRITE); channel4.register(selector, SelectionKey.OP_READ | OP_ACCEPT); ready = selector.select(10000); 

select()方法在将线程置于睡眠状态直到这些感兴趣的事件中的一个发生或者者10秒钟过去,这就是所谓的事件驱动。

通道是调用register方法注册到选择器上的,从代码里面可以看到register()方法接受一个Selector对象作为参数,以及一个名为ops的整数型参数,第二个参数表示关心的通道操作。有四种被定义的可选择操作:读(read)、写(write)、连接(connect)和接受(accept)。

注意并非所有的操作都在所有的可选择通道上被支持,例如SocketChannel就不支持accept。

选择键

一个键表示一个特定的通道对象和一个特定的选择器对象之间的注册关系。

public abstract class SelectionKey{    public static final int OP_READ;    public static final int OP_WRITE;    public static final int OP_CONNECT;    public static final int OP_ACCEPT;    public abstract SelectableChannel channel();    public abstract Selector selector();    public abstract void cancel();    public abstract boolean isValid();    public abstract int interestOps();    public abstract void iterestOps(int ops);    public abstract int readyOps();    public final boolean isReadable();    public final boolean isWritable();    public final boolean isConnectable();    public final boolean isAcceptable();    public final Object attach(Object ob);    public final Object attachment();}

选择器维护着注册过的通道的集合,并且这些注册关系中的任意一个都是封装在SelectionKey对象中的。每一个Selector对象维护三种键的集合:

public abstract class Selector{    ...    public abstract Set keys();    public abstract Set selectedKeys();    public abstract int select() throws IOException;    public abstract int select(long timeout) throws IOException;    public abstract int selectNow() throws IOException;    public abstract void wakeup();    ...   }

已注册的键的集合(Registered key set)
与选择器关联的已经注册的键的集合,并不是所有注册过的键都有效,这个集合通过keys()方法返回,并且可能是空的。这些键的集合是不可以直接修改的,试图这么做将引发java.lang.UnsupportedOperationException。

已选择的键的集合(Selected key set)
已注册的键的集合的子集,这个集合的每个成员都是相关的通道被选择器判断为已经准备好的并且包含于键的interest集合中的操作。这个集合通过selectedKeys()方法返回(有可能是空的)。键可以直接从这个集合中移除,但不能增加。试图向已选择的键的集合中增加元素将抛java.lang.UnsupportedOperationException。

已取消的键的集合(Cancelled key set)
已注册的键的集合的子集,这个集合包含了cancel()方法被调用过的键(这个键已经被无效化),但它们还没有被注销。这个集合是选择器对象的私有成员,因此无法直接访问。

下面结合之前的Channel和Buffer,看一下如何写和使用选择器实现服务端Socket数据接收的程序。

服务端

 1 public class SelectorServer 2 { 3     private static int PORT = 1234; 4      5     public static void main(String[] args) throws Exception 6     { 7         // 先确定端口号 8         int port = PORT; 9         if (args != null && args.length > 0)10         {11             port = Integer.parseInt(args[0]);12         }13         // 打开一个ServerSocketChannel14         ServerSocketChannel ssc = ServerSocketChannel.open();15         // 获取ServerSocketChannel绑定的Socket16         ServerSocket ss = ssc.socket();17         // 设置ServerSocket监听的端口18         ss.bind(new InetSocketAddress(port));19         // 设置ServerSocketChannel为非阻塞模式20         ssc.configureBlocking(false);21         // 打开一个选择器22         Selector selector = Selector.open();23         // 将ServerSocketChannel注册到选择器上去并监听accept事件24         ssc.register(selector, SelectionKey.OP_ACCEPT);25         while (true)26         {27             // 这里会发生阻塞,等待就绪的通道28             int n = selector.select();29             // 没有就绪的通道则什么也不做30             if (n == 0)31             {32                 continue;33             }34             // 获取SelectionKeys上已经就绪的通道的集合35             Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();36             // 遍历每一个Key37             while (iterator.hasNext())38             {39                 SelectionKey sk = iterator.next();40                 // 通道上能否有可接受的连接41                 if (sk.isAcceptable())42                 {43                     ServerSocketChannel ssc1 = (ServerSocketChannel)sk.channel();44                     SocketChannel sc = ssc1.accept();45                     sc.configureBlocking(false);46                     sc.register(selector, SelectionKey.OP_READ);47                 }48                 // 通道上能否有数据可读49                 else if (sk.isReadable())50                 {51                     readDataFromSocket(sk);52                 }53                 iterator.remove();54             }55         }56     }57     58     private static ByteBuffer bb = ByteBuffer.allocate(1024);59     60     // 从通道中读取数据61     protected static void readDataFromSocket(SelectionKey sk) throws Exception62     {63         SocketChannel sc = (SocketChannel)sk.channel();64         bb.clear();65         while (sc.read(bb) > 0)66         {67             bb.flip();68             while (bb.hasRemaining())69             {70                 System.out.print((char)bb.get());71             }72             System.out.println();73             bb.clear();74         }75     }76 }

满足isAcceptable()则表示该通道上有数据到来了,此时我们做的事情不是获取该通道—>创立一个线程来读取该通道上的数据,这么做就和BIO没有区别了。我们做的事情只是简单地将对应的SocketChannel注册到选择器上,通过传入OP_READ标记,告诉选择器我们关心新的Socket通道什么时候可以准备好读数据。

满足isReadable()则表示新注册的Socket通道已经可以读取数据了,此时调用readDataFromSocket方法读取SocketChannel中的数据。

用户端

选择器用户端的代码,没什么要求,只需向服务器端发送数据即可以了。

1 public class SelectorClient 2 { 3     private static final String STR = "Hello World!"; 4     private static final String REMOTE_IP = "127.0.0.1"; 5     private static final int THREAD_COUNT = 5; 6      7     private static class NonBlockingSocketThread extends Thread 8     { 9         public void run()10         {11             try12             {13                 int port = 1234;14                 SocketChannel sc = SocketChannel.open();15                 sc.configureBlocking(false);16                 sc.connect(new InetSocketAddress(REMOTE_IP, port));17                 while (!sc.finishConnect())18                 {19                     System.out.println("同" + REMOTE_IP + "的连接正在建立,请稍等!");20                     Thread.sleep(10);21                 }22                 System.out.println("连接已建立,待写入内容至指定ip+端口!时间为" + System.currentTimeMillis());23                 String writeStr = STR + this.getName();24                 ByteBuffer bb = ByteBuffer.allocate(writeStr.length());25                 bb.put(writeStr.getBytes());26                 bb.flip(); // 写缓冲区的数据之前肯定要先反转(flip)27                 sc.write(bb);28                 bb.clear();29                 sc.close();30             } 31             catch (IOException e)32             {33                 e.printStackTrace();34             } 35             catch (InterruptedException e)36             {37                 e.printStackTrace();38             }39         }40     }41     42     public static void main(String[] args) throws Exception43     {44         NonBlockingSocketThread[] nbsts = new NonBlockingSocketThread[THREAD_COUNT];45         for (int i = 0; i < THREAD_COUNT; i++)46             nbsts[i] = new NonBlockingSocketThread();47         for (int i = 0; i < THREAD_COUNT; i++)48             nbsts[i].start();49         // 肯定要join保证线程代码先于sc.close()运行,否则会有AsynchronousCloseException50         for (int i = 0; i < THREAD_COUNT; i++)51             nbsts[i].join();52     }53 }

说明
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是摆设,本站源码仅提供给会员学习使用!
7. 如遇到加密压缩包,请使用360解压,如遇到无法解压的请联系管理员
开心源码网 » Java NIO之Selector

发表回复