IO和NIO

在以前,我们要进行不同机器上的网络通信,使用的是OIO,也就是ServerSocket和Socket那一套。 这套技术,会造成服务端每建立一个连接就需要开启一个线程处理,造成的后果就是线程资源浪费。对服务端来说,造成oom、cpu拉满等问题都是可能的。

package cn.europa;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

public class Server {

    public static void main(String[] args) {
        Server server = new Server();
        server.start();
    }

    private void start() {
        try {

            ServerSocket serverSocket = new ServerSocket(9890);
            while (true) {
                Socket socket = serverSocket.accept();
                Thread thread = new Thread(new Task(socket));
                thread.start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private class Task implements Runnable {
        private final Socket socket;

        public Task(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    InputStream inputStream = socket.getInputStream();
                    byte[] buffer = new byte[100];
                    int length = 100;
                    StringBuilder stb = new StringBuilder();
                    while (length == 100) {
                        length = inputStream.read(buffer);
                        stb.append(new String(buffer, 0, length));
                    }
                    System.out.println(Thread.currentThread().getName() + "线程读取--------------------------:\n" + stb.toString());
                    OutputStream outputStream = socket.getOutputStream();
                    outputStream.write("你好".getBytes());
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
package cn.europa;

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Scanner;

public class Client {

    public static void main(String[] args) {
        try {
            Socket socket = new Socket("127.0.0.1", 9890);
            while (!Thread.interrupted()) {
                OutputStream outputStream = socket.getOutputStream();
                Scanner scanner = new Scanner(System.in);
                System.out.println("请输入>>");
                while (scanner.hasNext()) {
                    System.out.println("请输入>>");
                    String next = scanner.next();
                    outputStream.write(next.getBytes());
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}

NIO的演进

在这里需要提前了解一个概念, Java里的nio并非单独是linux上的nio,其实他也支持multiplexing io模型,也就是多路复用。

1. 非阻塞式的IO模型

linux里的非阻塞io模型,其实指的是在read、write这两大操作系统指令的时候,会非阻塞。比如如果没有数据进行操作,会直接返回空等。但是在Java nio中,也能看到类似的形式,但其实概念上是不一样的。Java nio在accept的时候,服务端可以一直循环并在没有accept到连接的时候处理其他事情。

package cn.europa.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class NioServer {

    public static void main(String[] args) {
        try {
            ServerSocketChannel server = ServerSocketChannel.open();
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(9999));

            while (true) {
                SocketChannel socketChannel = server.accept();

                if (socketChannel == null) {
                    System.out.println("----无建立连接,主线程干别的事情");
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    continue;

                }
                new Thread(new Task(socketChannel)).start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static class Task implements Runnable {

        private SocketChannel socketChannel;

        public Task(SocketChannel socketChannel) throws IOException {
            this.socketChannel = socketChannel;
            // 将此channel设置为非阻塞的。
//            socketChannel.configureBlocking(false);
        }

        @Override
        public void run() {
            while (!Thread.interrupted()) {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                try {
                    int length = 0;
                    StringBuilder stb = new StringBuilder();
                    while (length == 0) {
                        System.out.println(Thread.currentThread().getName() + "正在读取读取。");
                        length = socketChannel.read(buffer);
                        stb.append(new String(buffer.array()).trim());
                    }
                    System.out.println("线程" + Thread.currentThread().getName() + "读取数据-----------------------------" + stb.toString());
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (socketChannel != null) {
                try {
                    socketChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
package cn.europa.nio;

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

public class NioClient {

    public static void main(String[] args) {
        try {
            SocketChannel channel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9999));
            channel.configureBlocking(false);
            while (!Thread.interrupted()) {
                Scanner scanner = new Scanner(System.in);
                System.out.println("请输入>>");
                while (scanner.hasNext()) {
                    System.out.println("请输入>>");
                    String next = scanner.next();
                    channel.write(ByteBuffer.wrap(next.getBytes()));
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}

2. NIO的三大组件

2.1 Selector

出了上述之外,多路复用还有其他的强大点,这里需要了解到多路复用这个知识。也就是Linux里的select(早期)、epoll。mac os里的kqueue。在不同的操作系统里叫法不一样而已。他其实是一个内部不断循环的函数,将注册到selector上的事件循环到,并让你能够获取到当前事件。

package cn.europa.reactor;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class RServer {

    private static Selector selector;
    private static ServerSocketChannel serverChannel;

    public static void main(String[] args) {
        try {
            selector = Selector.open();
            serverChannel = ServerSocketChannel.open();

            serverChannel.configureBlocking(false);
            serverChannel.bind(new InetSocketAddress(9099));

            serverChannel.register(selector, SelectionKey.OP_ACCEPT);

            while (!Thread.interrupted()) {
                selector.select();
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> keyIt = keys.iterator();
                while (keyIt.hasNext()) {
                    SelectionKey key = keyIt.next();
                    if (key.isAcceptable()) {
                        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                        SocketChannel channel = serverSocketChannel.accept();
                        // 1、注册新io事件时一定要判断是否是null并且在null时重试。不然不会注册新事件。
                        if (channel == null) {
                            continue;
                        }
                        channel.configureBlocking(false);
                        channel.register(selector, SelectionKey.OP_READ);
                    }

                    if (key.isReadable()) {
                        new Thread(new IOHandler(key)).start();
                    }
                    // 2、需要remove掉这个key,nio中这个是累加的,不然下次会再次处理上一个io事件。
                    keyIt.remove();
                }
            }

        } catch (
                IOException e) {
            e.printStackTrace();
        }
    }

    private static class IOHandler implements Runnable {
        private SelectionKey key;

        public IOHandler(SelectionKey key) {
            this.key = key;
        }

        @Override
        public void run() {
            SocketChannel channel = (SocketChannel) key.channel();
            ByteBuffer buffer = ByteBuffer.allocate(100);
            int length = 0;
            StringBuilder stb = new StringBuilder();
            while (length == 0) {
                try {
                    length = channel.read(buffer);
                    stb.append(new String(buffer.array()).trim());
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("线程" + Thread.currentThread().getName() + "正在处理-----------------------" + stb.toString());
        }
    }
}

2.2 Buffer

不同于OIO中基于stream进行数据操作,NIO中会用缓冲区进行数据交换,服务端和客户端会基于这个缓冲区进行数据读取和写入。当然它可能比想象的复杂,包括需要了解翻转等概念。

2.3 Channel

关于channel,在nio中,不同于socket、一个连接就是一个通道,在底层就是操作系统上的文件描述符的操作,对于连接,在服务器端会有监听连接的文件描述符,监听成千上万的客户端连接。对于传输数据,客户端和服务端都有一个传输文件描述符,建立起专项通道进行数据的传输。

3. NIO的问题

NIO饱受诟病的几点

  • 在你io事件在处理的时候,可能select又循环到了这个io事件,然后又处理了一次。
  • 编程较复杂,在上述服务端代码的注视处均需注意,不然可能就踩坑了。
  • 沾包、半包的问题。说起来可能比较难理解,看下客户端代码和运行结果。我们的想法是客户端没输入一次,就进行一次数据传输write。
package cn.europa.reactor;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

public class RClient {

    public static void main(String[] args) {
        try {
            SocketChannel channel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9099));
            channel.configureBlocking(false);
            while (!Thread.interrupted()) {
                Scanner scanner = new Scanner(System.in);
                System.out.println("请输入>>");
                while (scanner.hasNext()) {
                    System.out.println("请输入>>");
                    String next = scanner.next();
                    ByteBuffer buffer = ByteBuffer.allocate(100);
                    for (int i = 0; i < 1000; i++) {
                        buffer.put(next.getBytes());
                        // 涉及到了buffer的翻转
                        buffer.flip();
                        channel.write(buffer);
                        buffer.clear();
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}

image image image

为了更明显,我选择了重复的词语。 可以看到,明明只发了几个字,结果每次读取。要么读了好几次(沾包),要么读到了个破碎的包(半包)。 这是因为nio是基于缓冲区的,客户端只负责往里写。而每次服务端来捞取固定长度字节的时候,可能客户端已经写了好多字节了,也可能还没写到100个字节。

Netty

netty的组件比较多

  • channel
  • pipeline
  • bootstrap
  • bytebuf
  • handler

等等.. image

关于netty