Java入门系列-25-NIO(实现非阻塞网络通信)

作者 : 开心源码 本文共9994个字,预计阅读时间需要25分钟 发布时间: 2022-05-12 共217人阅读

还记得之前详情NIO时比照传统IO的一大特点吗?就是NIO是非阻塞式的,这篇文章带大家来看一下非阻塞的网络操作。

补充:以数组的形式使用缓冲区

package testnio;import java.io.IOException;import java.io.RandomAccessFile;import java.nio.ByteBuffer;import java.nio.channels.FileChannel;public class TestBufferArray {    public static void main(String[] args) throws IOException {        RandomAccessFile raf1=new RandomAccessFile("D:/1.txt","rw");                //1.获取通道        FileChannel channel1=raf1.getChannel();                //2.创立缓冲区数组        ByteBuffer buf1=ByteBuffer.allocate(512);        ByteBuffer buf2=ByteBuffer.allocate(512);        ByteBuffer[] bufs= {buf1,buf2};        //3.将数据读入缓冲区数组        channel1.read(bufs);                for (ByteBuffer byteBuffer : bufs) {            byteBuffer.flip();        }        System.out.println(new String(bufs[0].array(),0,bufs[0].limit()));        System.out.println("-----------");        System.out.println(new String(bufs[1].array(),0,bufs[1].limit()));                //写入缓冲区数组到通道中        RandomAccessFile raf2=new RandomAccessFile("D:/2.txt","rw");        FileChannel channel2=raf2.getChannel();        channel2.write(bufs);            }}

使用NIO实现阻塞式网络通信

TCP协议的网络通信传统实现方式是通过套接字编程(Socket和ServerSocket),NIO实现TCP网络通信需要用到 Channel 接口的两个实现类:SocketChannel和ServerSocketChannel

使用NIO实现阻塞式网络通信

用户端

package com.jikedaquan.blockingnio;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.FileChannel;import java.nio.channels.SocketChannel;import java.nio.file.Paths;import java.nio.file.StandardOpenOption;public class Client {    public static void main(String[] args) {        SocketChannel sChannel=null;        FileChannel inChannel=null;        try {            //1、获取通道            sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 1666));            //用于读取文件                        inChannel = FileChannel.open(Paths.get("F:/a.jpg"), StandardOpenOption.READ);            //2、分配指定大小的缓冲区            ByteBuffer buf=ByteBuffer.allocate(1024);            //3、读取本地文件,发送到服务器端            while(inChannel.read(buf)!=-1) {                buf.flip();                sChannel.write(buf);                buf.clear();            }        } catch (IOException e) {            e.printStackTrace();        }finally {            //关闭通道            if (inChannel!=null) {                try {                    inChannel.close();                } catch (IOException e) {                    e.printStackTrace();                }            }            if(sChannel!=null) {                try {                    sChannel.close();                } catch (IOException e) {                    e.printStackTrace();                }            }        }    }}

new InetSocketAddress(“127.0.0.1”, 1666) 用于向用户端套接字通道(SocketChannel)绑定要连接地址和端口

服务端

package com.jikedaquan.blockingnio;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.FileChannel;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.nio.file.Paths;import java.nio.file.StandardOpenOption;public class Server {    public static void main(String[] args) {        ServerSocketChannel ssChannel=null;        FileChannel outChannel=null;        SocketChannel sChannel=null;        try {            //1、获取通道            ssChannel = ServerSocketChannel.open();            //用于保存文件的通道            outChannel = FileChannel.open(Paths.get("F:/b.jpg"), StandardOpenOption.WRITE,StandardOpenOption.CREATE);            //2、绑定要监听的端口号            ssChannel.bind(new InetSocketAddress(1666));            //3、获取用户端连接的通道            sChannel = ssChannel.accept();            //4、分配指定大小的缓冲区            ByteBuffer buf=ByteBuffer.allocate(1024);            //5、接收用户端的数据,并保存到本地            while(sChannel.read(buf)!=-1) {                buf.flip();                outChannel.write(buf);                buf.clear();            }        } catch (IOException e) {            e.printStackTrace();        }finally {            //6、关闭通道            if(sChannel!=null) {                try {                    sChannel.close();                } catch (IOException e) {                    e.printStackTrace();                }            }            if(outChannel!=null) {                try {                    outChannel.close();                } catch (IOException e) {                    e.printStackTrace();                }            }            if(ssChannel!=null) {                try {                    ssChannel.close();                } catch (IOException e) {                    e.printStackTrace();                }               }               }    }   }

服务端套接字仅绑定要监听的端口就可 ssChannel.bind(new InetSocketAddress(1666));

上面的代码使用NIO实现的网络通信,可能有同学会问,没有看到阻塞效果啊,的确是阻塞式的看不到效果,由于用户端发送一次数据就结束了,服务端也是接收一次数据就结束了。那假如服务端接收完成数据后,再向用户端反馈呢?

能够看到阻塞效果的网络通信

用户端

package com.jikedaquan.blockingnio2;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.FileChannel;import java.nio.channels.SocketChannel;import java.nio.file.Paths;import java.nio.file.StandardOpenOption;public class Client {    public static void main(String[] args) {        SocketChannel sChannel=null;        FileChannel inChannel=null;        try {            sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 1666));            inChannel = FileChannel.open(Paths.get("F:/a.jpg"), StandardOpenOption.READ);            ByteBuffer buf=ByteBuffer.allocate(1024);            while(inChannel.read(buf)!=-1) {                buf.flip();                sChannel.write(buf);                buf.clear();            }                        //sChannel.shutdownOutput();//去掉注释掉将不会阻塞            //接收服务器端的反馈            int len=0;            while((len=sChannel.read(buf))!=-1) {                buf.flip();                System.out.println(new String(buf.array(),0,len));                buf.clear();            }        } catch (IOException e) {            e.printStackTrace();        }finally {            if(inChannel!=null) {                try {                    inChannel.close();                } catch (IOException e) {                    e.printStackTrace();                }            }            if(sChannel!=null) {                try {                    sChannel.close();                } catch (IOException e) {                    e.printStackTrace();                }            }        }    }}

服务端

package com.jikedaquan.blockingnio2;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.FileChannel;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.nio.file.Paths;import java.nio.file.StandardOpenOption;public class Server {    public static void main(String[] args) {        ServerSocketChannel ssChannel=null;        FileChannel outChannel=null;        SocketChannel sChannel=null;        try {            ssChannel = ServerSocketChannel.open();            outChannel = FileChannel.open(Paths.get("F:/a.jpg"),StandardOpenOption.WRITE,StandardOpenOption.CREATE);            ssChannel.bind(new InetSocketAddress(1666));            sChannel = ssChannel.accept();            ByteBuffer buf=ByteBuffer.allocate(1024);            while(sChannel.read(buf)!=-1) {                buf.flip();                outChannel.write(buf);                buf.clear();            }            //发送反馈给用户端            buf.put("服务端接收数据成功".getBytes());            buf.flip();            sChannel.write(buf);        } catch (IOException e) {            e.printStackTrace();        }finally {            if(sChannel!=null) {                try {                    sChannel.close();                } catch (IOException e) {                    e.printStackTrace();                }            }            if(outChannel!=null) {                try {                    outChannel.close();                } catch (IOException e) {                    e.printStackTrace();                }            }            if(ssChannel!=null) {                try {                    ssChannel.close();                } catch (IOException e) {                    e.printStackTrace();                }            }        }    }}

服务端将向用户端发送两次数据

选择器(Selector)

想要实现非阻塞的IO,必需要先弄懂选择器。Selector 笼统类,可通过调用此类的 open 方法创立选择器,该方法将使用系统的默认选择器提供者创立新的选择器。

将通道设置为非阻塞之后,需要将通道注册到选择器中,注册的同时需要指定一个选择键的类型 (SelectionKey)。

选择键(SelectionKey)可以认为是一种标记,标记通道的类型和状态。

SelectionKey的静态字段:
OP_ACCEPT:用于套接字接受操作的操作集位
OP_CONNECT:用于套接字连接操作的操作集位
OP_READ:用于读取操作的操作集位
OP_WRITE:用于写入操作的操作集位

用于检测通道状态的方法:

方法名称说明
isAcceptable()测试此键的通道能否已准备好接受新的套接字连接
isConnectable()测试此键的通道能否已完成其套接字连接操作
isReadable()测试此键的通道能否已准备好进行读取
isWritable()测试此键的通道能否已准备好进行写入

将通道注册到选择器:

ssChannel.register(selector, SelectionKey.OP_ACCEPT);

IO操作准备就绪的通道大于0,轮询选择器

while(selector.select()>0) {    //获取选择键,根据不同的状态做不同的操作}

实现非阻塞式TCP协议网络通信

非阻塞模式:channel.configureBlocking(false);

用户端

package com.jikedaquan.nonblockingnio;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SocketChannel;import java.util.Date;import java.util.Scanner;public class Client {    public static void main(String[] args) {        SocketChannel sChannel=null;        try {            //1、获取通道            sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",1666));                        //2、切换非阻塞模式            sChannel.configureBlocking(false);                        //3、分配指定大小的缓冲区            ByteBuffer buf=ByteBuffer.allocate(1024);            //4、发送数据给服务端            Scanner scanner=new Scanner(System.in);            //循环从控制台录入数据发送给服务端            while(scanner.hasNext()) {                                String str=scanner.next();                buf.put((new Date().toString()+"\n"+str).getBytes());                buf.flip();                sChannel.write(buf);                buf.clear();            }        } catch (IOException e) {            e.printStackTrace();        }finally {            //5、关闭通道            if(sChannel!=null) {                try {                    sChannel.close();                } catch (IOException e) {                    e.printStackTrace();                }            }        }    }}

服务端

package com.jikedaquan.nonblockingnio;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;public class Server {    public static void main(String[] args) throws IOException {                //1、获取通道        ServerSocketChannel ssChannel=ServerSocketChannel.open();        //2、切换非阻塞模式        ssChannel.configureBlocking(false);        //3、绑定监听的端口号        ssChannel.bind(new InetSocketAddress(1666));        //4、获取选择器        Selector selector=Selector.open();        //5、将通道注册到选择器上,并指定“监听接收事件”        ssChannel.register(selector, SelectionKey.OP_ACCEPT);                //6、轮询式的获取选择器上已经 “准备就绪”的事件        while(selector.select()>0) {            //7、获取当前选择器中所有注册的“选择键(已就绪的监听事件)”            Iterator<SelectionKey> it=selector.selectedKeys().iterator();            while(it.hasNext()) {                //8、获取准备就绪的事件                SelectionKey sk=it.next();                //9、判断具体是什么事件准备就绪                if(sk.isAcceptable()) {                    //10、若“接收就绪”,获取用户端连接                    SocketChannel sChannel=ssChannel.accept();                    //11、切换非阻塞模式                    sChannel.configureBlocking(false);                    //12、将该通道注册到选择器上                    sChannel.register(selector, SelectionKey.OP_READ);                }else if(sk.isReadable()) {                    //13、获取当前选择器上“读就绪”状态的通道                    SocketChannel sChannel=(SocketChannel)sk.channel();                    //14、读取数据                    ByteBuffer buf=ByteBuffer.allocate(1024);                    int len=0;                    while((len=sChannel.read(buf))>0) {                        buf.flip();                        System.out.println(new String(buf.array(),0,len));                        buf.clear();                    }                }                //15、取消选择键 SelectionKey                it.remove();            }                    }    }}

服务端接收用户端的操作需要在判断 isAcceptable() 方法内将就绪的套接字通道以读操作注册到 选择器中

在判断 isReadable() 内从通道中获取数据

实现非阻塞式UDP协议网络通信

发送端

package com.jikedaquan.nonblockingnio;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.DatagramChannel;import java.util.Scanner;public class TestDatagramSend {    public static void main(String[] args) throws IOException {        //获取通道        DatagramChannel dChannel=DatagramChannel.open();        //非阻塞        dChannel.configureBlocking(false);        ByteBuffer buf=ByteBuffer.allocate(1024);        Scanner scanner=new Scanner(System.in);        while(scanner.hasNext()) {            String str=scanner.next();            buf.put(str.getBytes());            buf.flip();            //发送数据到目标地址和端口            dChannel.send(buf,new InetSocketAddress("127.0.0.1", 1666));            buf.clear();        }        dChannel.close();    }}

接收端

package com.jikedaquan.nonblockingnio;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.DatagramChannel;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.util.Iterator;public class TestDatagramReceive {    public static void main(String[] args) throws IOException {        //获取通道        DatagramChannel dChannel=DatagramChannel.open();        dChannel.configureBlocking(false);        //绑定监听端口        dChannel.bind(new InetSocketAddress(1666));        //获取选择器        Selector selector=Selector.open();        //读操作注册通道        dChannel.register(selector, SelectionKey.OP_READ);        while(selector.select()>0) {            Iterator<SelectionKey> it=selector.selectedKeys().iterator();            //迭代选择键            while(it.hasNext()) {                SelectionKey sk=it.next();                //通道可读                if(sk.isReadable()) {                    ByteBuffer buf=ByteBuffer.allocate(1024);                    //接收数据存入缓冲区                    dChannel.receive(buf);                    buf.flip();                    System.out.println(new String(buf.array(),0,buf.limit()));                    buf.clear();                }            }                        it.remove();        }    }}

说明
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是摆设,本站源码仅提供给会员学习使用!
7. 如遇到加密压缩包,请使用360解压,如遇到无法解压的请联系管理员
开心源码网 » Java入门系列-25-NIO(实现非阻塞网络通信)

发表回复