java nio 基础用法

传统的io模型问题: 

在传统的IO模型中,每个连接创建成功之后都需要一个线程来维护,每个线程包含一个while死循环,那么1w个连接对应1w个线程,继而1w个while死循环,这就带来如下几个问题:

  1. 线程资源受限:线程是操作系统中非常宝贵的资源,同一时刻有大量的线程处于阻塞状态是非常严重的资源浪费,操作系统耗不起
  2. 线程切换效率低下:单机cpu核数固定,线程爆炸之后操作系统频繁进行线程切换,应用性能急剧下降。
  3. 除了以上两个问题,IO编程中,我们看到数据读写是以字节流为单位,效率不高。

NIO编程模型

NIO编程模型中,新来一个连接不再创建一个新的线程,而是可以把这条连接直接绑定到某个固定的线程,然后这条连接所有的读写都由这个线程来负责,那么他是怎么做到的?我们用一幅图来对比一下IO与NIO

如上图所示,IO模型中,一个连接来了,会创建一个线程,对应一个while死循环,死循环的目的就是不断监测这条连接上是否有数据可以读,大多数情况下,1w个连接里面同一时刻只有少量的连接有数据可读,因此,很多个while死循环都白白浪费掉了,因为读不出啥数据。

而在NIO模型中,他把这么多while死循环变成一个死循环,这个死循环由一个线程控制,那么他又是如何做到一个线程,一个while死循环就能监测1w个连接是否有数据可读的呢?
这就是NIO模型中selector的作用,一条连接来了之后,现在不创建一个while死循环去监听是否有数据可读了,而是直接把这条连接注册到selector上,然后,通过检查这个selector,就可以批量监测出有数据可读的连接,进而读取数据,下面我再举个非常简单的生活中的例子说明IO与NIO的区别。

在一家幼儿园里,小朋友有上厕所的需求,小朋友都太小以至于你要问他要不要上厕所,他才会告诉你。幼儿园一共有100个小朋友,有两种方案可以解决小朋友上厕所的问题:

  1. 每个小朋友配一个老师。每个老师隔段时间询问小朋友是否要上厕所,如果要上,就领他去厕所,100个小朋友就需要100个老师来询问,并且每个小朋友上厕所的时候都需要一个老师领着他去上,这就是IO模型,一个连接对应一个线程。
  2. 所有的小朋友都配同一个老师。这个老师隔段时间询问所有的小朋友是否有人要上厕所,然后每一时刻把所有要上厕所的小朋友批量领到厕所,这就是NIO模型,所有小朋友都注册到同一个老师,对应的就是所有的连接都注册到一个线程,然后批量轮询。

这就是NIO模型解决线程资源受限的方案,实际开发过程中,我们会开多个线程,每个线程都管理着一批连接,相对于IO模型中一个线程管理一条连接,消耗的线程资源大幅减少

由于NIO模型中线程数量大大降低,线程切换效率因此也大幅度提高

NIO解决这个问题的方式是数据读写不再以字节为单位,而是以字节块为单位。IO模型中,每次都是从操作系统底层一个字节一个字节地读取数据,而NIO维护一个缓冲区,每次可以从这个缓冲区里面读取一块的数据,
这就好比一盘美味的豆子放在你面前,你用筷子一个个夹(每次一个),肯定不如要勺子挖着吃(每次一批)效率来得高。

 

一、Buffer demo


  
  1. public class BufferTest {
  2. public static void main(String[] args) {
  3. //静态方法常见 buffer
  4. IntBuffer buf = IntBuffer.allocate(10);
  5. int[] array = new int[]{3, 5, 1};
  6. //put一个数组到buffer中,使用put方式将
  7. // buf.put(array);
  8. //使用wrap方式会直接更改原数组
  9. buf = buf.wrap(array);
  10. //IntBuffer.wrap(array, 0, 2);
  11. buf.put(0, 7);
  12. int length = buf.limit();
  13. for (int i = 0; i < length; i++) {
  14. System.out.print(buf.get(i));
  15. }
  16. for (int i = 0; i < array.length; i++) {
  17. System.out.print(array[i]);
  18. }
  19. System.out.println(buf);
  20. /**
  21. * limit = position;
  22. * position = 0;
  23. */
  24. buf.flip();
  25. /**
  26. * position = 0;
  27. * limit = capacity;
  28. */
  29. buf.clear();
  30. System.out.println(buf);
  31. //创建一个新的字节缓冲区,共享此缓冲区的内容
  32. IntBuffer newBuffer = buf.duplicate();
  33. System.out.println(newBuffer);
  34. }
  35. }

二、FileChannel demo


  
  1. public class FileChannelTest {
  2. public static void testFileChannel() throws IOException {
  3. RandomAccessFile aFile = new RandomAccessFile("D:/nio-data.txt", "rw");
  4. FileChannel channel = aFile.getChannel();
  5. //分配一个新的缓冲区
  6. ByteBuffer allocate = ByteBuffer.allocate(48);
  7. int bytesRead = channel.read(allocate);
  8. while (bytesRead != -1) {
  9. System.out.println("Read " + bytesRead);
  10. allocate.flip();
  11. while (allocate.hasRemaining()) {
  12. System.out.print((char) allocate.get());
  13. }
  14. allocate.clear();
  15. bytesRead = channel.read(allocate);
  16. }
  17. aFile.close();
  18. }
  19. public static void fileChannelDemo() throws IOException {
  20. //定义一个byteBuffer
  21. ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
  22. FileChannel inputChannel = new FileInputStream("D:/nio-data.txt").getChannel();
  23. FileChannel outputChannel = new FileOutputStream("D:/nio-data.txt", true).getChannel();
  24. //读取数据
  25. byteBuffer.clear();
  26. int len = inputChannel.read(byteBuffer);
  27. System.out.println(new String(byteBuffer.array(), "UTF-8"));
  28. System.out.println(new String(byteBuffer.array(), 0, len, "UTF-8"));
  29. ByteBuffer byteBuffer2 = ByteBuffer.wrap("奥会计师八度空间".getBytes());
  30. outputChannel.write(byteBuffer2);
  31. outputChannel.close();
  32. inputChannel.close();
  33. }
  34. public static void main(String[] args) {
  35. try {
  36. FileChannelTest.fileChannelDemo();
  37. } catch (Exception e) {
  38. e.printStackTrace();
  39. }
  40. }
  41. }

 三、不使用 选择器 selector 的 ServerSocketChannel 和 SocketChannel  的demo

服务端:


  
  1. public class NioChannelServer {
  2. private ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
  3. //获取一个intBuffer视图,操作视图的同时原缓冲区也会改变
  4. private IntBuffer intBuffer = byteBuffer.asIntBuffer();
  5. private SocketChannel socketChannel = null;
  6. private ServerSocketChannel serverSocketChannel = null;
  7. /**
  8. * 打开服务端的通道
  9. *
  10. * @throws Exception
  11. */
  12. public void openChannel() throws Exception {
  13. serverSocketChannel = ServerSocketChannel.open();
  14. serverSocketChannel.socket().bind(new InetSocketAddress(8888));
  15. System.out.println("服务端通道已经打开");
  16. }
  17. /**
  18. * 等待新的连接
  19. *
  20. * @throws Exception
  21. */
  22. public void waitReqConn() throws Exception {
  23. while (true) {
  24. socketChannel = serverSocketChannel.accept();
  25. if (null != socketChannel) {
  26. System.out.println("新的连接加入!");
  27. }
  28. //处理请求
  29. processReq();
  30. socketChannel.close();
  31. }
  32. }
  33. private void processReq() throws IOException {
  34. System.out.println("开始读取和处理客户端数据。。");
  35. byteBuffer.clear();
  36. socketChannel.read(byteBuffer);
  37. int result = intBuffer.get(0) + intBuffer.get(1);
  38. byteBuffer.flip();
  39. byteBuffer.clear();
  40. //修改视图,byteBuffer也会变化
  41. intBuffer.put(0, result);
  42. socketChannel.write(byteBuffer);
  43. System.out.println("读取处理完成");
  44. }
  45. public void start() {
  46. try {
  47. //打开通道
  48. openChannel();
  49. //等待客户端连接
  50. waitReqConn();
  51. socketChannel.close();
  52. System.out.println("服务端处理完毕");
  53. } catch (Exception e) {
  54. e.printStackTrace();
  55. }
  56. }
  57. public static void main(String[] args){
  58. new NioChannelServer().start();
  59. }
  60. }

客户端:


  
  1. public class NioChannelClient {
  2. private SocketChannel socketChannel = null;
  3. private ByteBuffer buff = ByteBuffer.allocate(8);
  4. private IntBuffer intBuffer = buff.asIntBuffer();
  5. public SocketChannel connect() throws IOException {
  6. return SocketChannel.open(new InetSocketAddress("127.0.0.1", 8888));
  7. }
  8. public int getSum(int a, int b) {
  9. int result = 0;
  10. try {
  11. socketChannel = connect();
  12. sendRequest(a, b);
  13. result = receiveResult();
  14. } catch (Exception e) {
  15. e.printStackTrace();
  16. }
  17. return result;
  18. }
  19. private int receiveResult() throws IOException {
  20. buff.clear();
  21. socketChannel.read(buff);
  22. return intBuffer.get(0);
  23. }
  24. private void sendRequest(int a, int b) throws IOException {
  25. buff.clear();
  26. intBuffer.put(0,a);
  27. intBuffer.put(1,b);
  28. socketChannel.write(buff);
  29. System.out.println("客户端发送请求 ("+a+"+"+b+")");
  30. }
  31. public static void main(String[] args){
  32. Random random = new Random();
  33. for (int i = 0; i <10 ; i++) {
  34. int result = new NioChannelClient().getSum(random.nextInt(100),random.nextInt(100));
  35. System.out.println(result);
  36. }
  37. }
  38. }

 

四、使用 selector 方式 实现 ServerSocketChannel  和 SocketChannel

选择器(Selector) 是 SelectableChannle 对象的多路复用器,Selector 可以同时监控多个 SelectableChannel 的 IO 状况,也就是说,利用 Selector可使一个单独的线程管理多个 Channel,selector 是非阻塞 IO 的核心。

选择器(Selector)的应用:

当通道使用register(Selector sel, int ops)方法将通道注册选择器时,选择器对通道事件进行监听,通过第二个参数指定监听的事件类型。

其中可监听的事件类型包括以下:

  读 : SelectionKey.OP_READ (1)

  写 : SelectionKey.OP_WRITE (4)

  连接 : SelectionKey.OP_CONNECT (8)

  接收 : SelectionKey.OP_ACCEPT (16)

如果需要监听多个事件是:

  int key = SelectionKey.OP_READ | SelectionKey.OP_WRITE ; //表示同时监听读写操作

服务端:


  
  1. public class SelectorServer {
  2. private Selector selector = null;
  3. private ServerSocketChannel serverSocketChannel = null;
  4. private int keys = 0;
  5. public void initServer() throws IOException {
  6. selector = Selector.open();
  7. serverSocketChannel = ServerSocketChannel.open();
  8. serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", 8888));
  9. serverSocketChannel.configureBlocking(false);
  10. //服务端通道注册accept事件
  11. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  12. }
  13. private void listen() throws IOException {
  14. System.out.println("服务端已经启动");
  15. while (true) {
  16. //让通道选择器至少选择一个通道,阻塞的方法
  17. keys = this.selector.select();
  18. //selector.wakeup();//可以唤醒阻塞的select()方法
  19. //设置超时时间,非阻塞
  20. //this.selector.select(1000);
  21. System.out.println(keys);
  22. Iterator<SelectionKey> itor = this.selector.selectedKeys().iterator();
  23. if (keys > 0) {
  24. //进行轮询
  25. while (itor.hasNext()) {
  26. try{
  27. SelectionKey key = itor.next();
  28. if (key.isAcceptable()) {
  29. //serverSocketChannel = (ServerSocketChannel) key.channel();
  30. //获取和客户端连接的服务端渠道
  31. SocketChannel channel = serverSocketChannel.accept();
  32. channel.configureBlocking(false);
  33. channel.write(ByteBuffer.wrap("hello".getBytes()));
  34. //还需要读取客户端发过来的数据,所以需要注册一个读取数据的事件
  35. channel.register(selector, SelectionKey.OP_READ);
  36. } else if (key.isReadable()) {
  37. read(key);
  38. }
  39. }finally {
  40. //处理完一个key,就删除,防止重复处理
  41. itor.remove();
  42. }
  43. }
  44. } else {
  45. System.out.println("select finished without any keys");
  46. }
  47. }
  48. }
  49. private void read(SelectionKey key) throws IOException {
  50. SocketChannel socketChannel = (SocketChannel) key.channel();
  51. ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
  52. int len = socketChannel.read(byteBuffer);
  53. String msg = new String(byteBuffer.array(), 0, len);
  54. System.out.println("服务端接收到的消息是" + msg);
  55. }
  56. public void start() {
  57. try {
  58. initServer();
  59. listen();
  60. } catch (IOException e) {
  61. e.printStackTrace();
  62. }
  63. }
  64. public static void main(String[] args) {
  65. new SelectorServer().start();
  66. }
  67. }

客户端:

 


  
  1. public class SelectorClient {
  2. private Selector selector;
  3. private ByteBuffer outBuffer = ByteBuffer.allocate(1024);
  4. private ByteBuffer inputBuffer = ByteBuffer.allocate(1024);
  5. private int keys = 0;
  6. private SocketChannel socketChannel = null;
  7. public void initClient() throws IOException {
  8. selector = Selector.open();
  9. socketChannel = SocketChannel.open();
  10. //客户端通道配置为非阻塞
  11. socketChannel.configureBlocking(false);
  12. //连接服务端
  13. socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888));
  14. //注册客户端连接服务器的事件
  15. socketChannel.register(selector, SelectionKey.OP_CONNECT);
  16. }
  17. private void listen() throws IOException {
  18. while (true) {
  19. keys = this.selector.select();
  20. System.out.println(keys);
  21. if (keys > 0) {
  22. Iterator<SelectionKey> iter = this.selector.selectedKeys().iterator();
  23. while (iter.hasNext()) {
  24. try{
  25. SelectionKey key = iter.next();
  26. if (key.isConnectable()) {
  27. SocketChannel channel = (SocketChannel) key.channel();
  28. if (channel.isConnectionPending()) {
  29. channel.finishConnect();
  30. System.out.println("完成连接");
  31. }
  32. //连接完成之后,肯定还要做其它的事情,比如写
  33. channel.register(selector, SelectionKey.OP_WRITE);
  34. } else if (key.isWritable()) {
  35. SocketChannel channel = (SocketChannel) key.channel();
  36. outBuffer.clear();
  37. System.out.println("客户端正在写数据。。");
  38. //从控制台写消息
  39. Scanner scanner = new Scanner(System.in);
  40. while (true) {
  41. String msg = scanner.next();
  42. channel.write(ByteBuffer.wrap(msg.getBytes()));
  43. if("end".equals(msg)) {
  44. break;
  45. }
  46. }
  47. channel.register(selector, SelectionKey.OP_READ);
  48. System.out.println("客户端写数据完成。。。");
  49. } else if (key.isReadable()) {
  50. SocketChannel socketChannel = (SocketChannel) key.channel();
  51. inputBuffer.clear();
  52. int len = socketChannel.read(inputBuffer);
  53. System.out.println("读取服务端发送的消息:" + new String(inputBuffer.array()));
  54. }
  55. }finally{
  56. iter.remove();
  57. }
  58. }
  59. } else {
  60. System.out.println("select finished without any keys");
  61. }
  62. }
  63. }
  64. public void start() {
  65. try {
  66. initClient();
  67. listen();
  68. } catch (Exception e) {
  69. e.printStackTrace();
  70. }
  71. }
  72. public static void main(String[] args){
  73. new SelectorClient().start();
  74. }
  75. }

nio的非阻塞是对于网络通道来说的,需要使用Channel.configureBlocking(false)来设置通道为非阻塞的,如果没设置,默认是阻塞的。 

 

 

文章来源: blog.csdn.net,作者:血煞风雨城2018,版权归原作者所有,如需转载,请联系作者。

原文链接:blog.csdn.net/qq_31905135/article/details/88862106

(完)