Netty的一些使用


1. 传统的Socket

在机器中数据要进行传输,我们知道,计算机中都是二进制的世界。而二进制加上编码组成了各种各样的文件系统。在同一个机器下,数据的传输用的是数据流。比如Java体系中的字节流、字符流等。而计算机中要进行数据通讯,则要使用到TCP协议了,Socket就是在TCP的基础上做的一个抽象。客户端/服务端建立起连接之后,每个Socket上会有一个输入流和一个输出流,客户端的输出流对应到服务端的输入流。数据以此进行传输。客户端/服务端以此进行数据通讯。

1.1 编码

服务端

package cn.europa;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

public class Server {

    public static void main(String[] args) {
        Server server = new Server();
        server.start();
    }

    private void start() {
        try {

            ServerSocket serverSocket = new ServerSocket(9890);
            while (true) {
                Socket socket = serverSocket.accept();
                Thread thread = new Thread(new Task(socket));
                thread.start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private class Task implements Runnable {
        private final Socket socket;

        public Task(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    InputStream inputStream = socket.getInputStream();
                    byte[] buffer = new byte[100];
                    int length = 100;
                    StringBuilder stb = new StringBuilder();
                    while (length == 100) {
                        length = inputStream.read(buffer);
                        stb.append(new String(buffer, 0, length));
                    }
                    System.out.println(Thread.currentThread().getName() + "线程读取--------------------------:\n" + stb.toString());
                    OutputStream outputStream = socket.getOutputStream();
                    outputStream.write("你好".getBytes());
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

客户端

package cn.europa.nio;

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

public class NioClient {

    public static void main(String[] args) {
        try {
            SocketChannel channel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9999));
            channel.configureBlocking(false);
            while (!Thread.interrupted()) {
                Scanner scanner = new Scanner(System.in);
                System.out.println("请输入>>");
                while (scanner.hasNext()) {
                    System.out.println("请输入>>");
                    String next = scanner.next();
                    channel.write(ByteBuffer.wrap(next.getBytes()));
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}

1.2 传统socket总结

如上的代码中,每来一个socket,服务端都需新建一个线程去进行连接,并处理对应的业务逻辑。并且建立连接的过程是阻塞的。效率比较低下。但是好在编程模型简单。


2. 非阻塞式IO

在此基础上,还有一个通常非阻塞模型,他能让服务端在配置为非阻塞的情况下,监听连接以及其他IO操作的时候,能一直轮询,若有数据则进行业务处理,否则线程可以处理自己的事情,避免一直阻塞。

2.1 编码

服务端

package cn.europa.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class NioServer {

    public static void main(String[] args) {
        try {
            ServerSocketChannel server = ServerSocketChannel.open();
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(9999));

            while (true) {
                SocketChannel socketChannel = server.accept();

                if (socketChannel == null) {
                    System.out.println("----无建立连接,主线程干别的事情");
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    continue;

                }
                new Thread(new Task(socketChannel)).start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static class Task implements Runnable {

        private SocketChannel socketChannel;

        public Task(SocketChannel socketChannel) throws IOException {
            this.socketChannel = socketChannel;
            // 将此channel设置为非阻塞的。
//            socketChannel.configureBlocking(false);
        }

        @Override
        public void run() {
            while (!Thread.interrupted()) {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                try {
                    int length = 0;
                    StringBuilder stb = new StringBuilder();
                    while (length == 0) {
                        System.out.println(Thread.currentThread().getName() + "正在读取读取。");
                        length = socketChannel.read(buffer);
                        stb.append(new String(buffer.array()).trim());
                    }
                    System.out.println("线程" + Thread.currentThread().getName() + "读取数据-----------------------------" + stb.toString());
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (socketChannel != null) {
                try {
                    socketChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}


3. NIO

NIO的意思在Java中表示的是New IO,而非non-blocking IO。这个io模型其实对应的是IO多路复用模型。 IO多路复用机制的实现是他有一个Selector组件,而Seletor组件对应的实现是各个操作系统上的select函数、epoll函数等。他允许文件句柄的连接、读写等事件都注册到内存中,被这个selector查询出来,从而实现了一个内部线程对所有的io事件进行了分发。减小了客户端、服务端的资源浪费,提升了速率。

3.1 编码

服务端

package cn.europa.reactor;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class RServer {

    private static Selector selector;
    private static ServerSocketChannel serverChannel;

    public static void main(String[] args) {
        try {
            selector = Selector.open();
            serverChannel = ServerSocketChannel.open();

            serverChannel.configureBlocking(false);
            serverChannel.bind(new InetSocketAddress(9099));

            serverChannel.register(selector, SelectionKey.OP_ACCEPT);

            while (!Thread.interrupted()) {
                selector.select();
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> keyIt = keys.iterator();
                while (keyIt.hasNext()) {
                    SelectionKey key = keyIt.next();
                    if (key.isAcceptable()) {
                        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                        SocketChannel channel = serverSocketChannel.accept();
                        // 1、注册新io事件时一定要判断是否是null并且在null时重试。不然不会注册新事件。
                        if (channel == null) {
                            continue;
                        }
                        channel.configureBlocking(false);
                        channel.register(selector, SelectionKey.OP_READ);
                    }

                    if (key.isReadable()) {
                        new Thread(new IOHandler(key)).start();
                    }
                    // 2、需要remove掉这个key,nio中这个是累加的,不然下次会再次处理上一个io事件。
                    keyIt.remove();
                }
            }

        } catch (
                IOException e) {
            e.printStackTrace();
        }
    }

    private static class IOHandler implements Runnable {
        private SelectionKey key;

        public IOHandler(SelectionKey key) {
            this.key = key;
        }

        @Override
        public void run() {
            SocketChannel channel = (SocketChannel) key.channel();
            ByteBuffer buffer = ByteBuffer.allocate(100);
            int length = 0;
            StringBuilder stb = new StringBuilder();
            while (length == 0) {
                try {
                    length = channel.read(buffer);
                    stb.append(new String(buffer.array()).trim());
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("线程" + Thread.currentThread().getName() + "正在处理-----------------------" + stb.toString());
        }
    }
}

客户端

package cn.europa.reactor;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

public class RClient {

    public static void main(String[] args) {
        try {
            SocketChannel channel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9099));
            channel.configureBlocking(false);
            while (!Thread.interrupted()) {
                Scanner scanner = new Scanner(System.in);
                System.out.println("请输入>>");
                while (scanner.hasNext()) {
                    System.out.println("请输入>>");
                    String next = scanner.next();
                    ByteBuffer buffer = ByteBuffer.allocate(100);
                    for (int i = 0; i < 1000; i++) {
                        buffer.put(next.getBytes());
                        // 涉及到了buffer的翻转
                        buffer.flip();
                        channel.write(buffer);
                        buffer.clear();
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}

3.2 问题

可以看到,nio的编程模型并不简单,而且带来的调试十分复杂。并且,nio中有一些bug较为难处理。

3.3 半包问题

运行上面代码,可以看到如下输出,我明明只输入了几个字符,而输出中却输出了一大串(沾包),有时候还输出了乱码(半包,破碎的数据包),这些统称为半包问题。

netty1 netty2

4. Netty

4.1 介绍

其实Netty并不复杂,搞懂了他的几个组件后,他的编程模型十分简单,并且功能强大。

4.2 组件

  • Channel

  • PipeLine

  • ChannelHandler

  • ChannelHandlerContext

4.3 说明

以上几个组件就是编程中最常用的组件了,混个眼熟,然后看下面这张处理流程图。大概就能搞清楚各个组件的作用了。

netty

当然,netty中功能肯定不止这些,比如长连接的实现、怎么解决半包问题、自定义协议、内存零拷贝、各种内置的handler等等。都需要具体去学习,上图只是简单的解析了他的工作原理,仅此而已。