深入分析 Java I/O 的工作机制

转自:https://www.ibm.com/developerworks/cn/java/j-lo-javaio/

 

Java 的 I/O 类库的基本架构

I/O 问题是任何编程语言都无法回避的问题,可以说 I/O 问题是整个人机交互的核心问题,因为 I/O 是机器获取和交换信息的主要渠道。在当今这个数据大爆炸时代,I/O 问题尤其突出,很容易成为一个性能瓶颈。正因如此,所以 Java 在 I/O 上也一直在做持续的优化,如从 1.4 开始引入了 NIO,提升了 I/O 的性能。关于 NIO 我们将在后面详细介绍。

Java 的 I/O 操作类在包 java.io 下,大概有将近 80 个类,但是这些类大概可以分成四组,分别是:

  1. 基于字节操作的 I/O 接口:InputStream 和 OutputStream
  2. 基于字符操作的 I/O 接口:Writer 和 Reader
  3. 基于磁盘操作的 I/O 接口:File
  4. 基于网络操作的 I/O 接口:Socket

前两组主要是根据传输数据的数据格式,后两组主要是根据传输数据的方式,虽然 Socket 类并不在 java.io 包下,但是我仍然把它们划分在一起,因为我个人认为 I/O 的核心问题要么是数据格式影响 I/O 操作,要么是传输方式影响 I/O 操作,也就是将什么样的数据写到什么地方的问题,I/O 只是人与机器或者机器与机器交互的手段,除了在它们能够完成这个交互功能外,我们关注的就是如何提高它的运行效率了,而数据格式和传输方式是影响效率最关键的因素了。我们后面的分析也是基于这两个因素来展开的。

基于字节的 I/O 操作接口

基于字节的 I/O 操作接口输入和输出分别是:InputStream 和 OutputStream,InputStream 输入流的类继承层次如下图所示:

图 1. InputStream 相关类层次结构(查看大图

图 1. InputStream 相关类层次结构

输入流根据数据类型和操作方式又被划分成若干个子类,每个子类分别处理不同操作类型,OutputStream 输出流的类层次结构也是类似,如下图所示:

图 2. OutputStream 相关类层次结构(查看大图

图 2. OutputStream 相关类层次结构

这里就不详细解释每个子类如何使用了,如果不清楚的话可以参考一下 JDK 的 API 说明文档,这里只想说明两点,一个是操作数据的方式是可以组合使用的,如这样组合使用

OutputStream out = new BufferedOutputStream(new ObjectOutputStream(new FileOutputStream("fileName"))

还有一点是流最终写到什么地方必须要指定,要么是写到磁盘要么是写到网络中,其实从上面的类图中我们发现,写网络实际上也是写文件,只不过写网络还有一步需要处理就是底层操作系统再将数据传送到其它地方而不是本地磁盘。关于网络 I/O 和磁盘 I/O 我们将在后面详细介绍。

基于字符的 I/O 操作接口

不管是磁盘还是网络传输,最小的存储单元都是字节,而不是字符,所以 I/O 操作的都是字节而不是字符,但是为啥有操作字符的 I/O 接口呢?这是因为我们的程序中通常操作的数据都是以字符形式,为了操作方便当然要提供一个直接写字符的 I/O 接口,如此而已。我们知道字符到字节必须要经过编码转换,而这个编码又非常耗时,而且还会经常出现乱码问题,所以 I/O 的编码问题经常是让人头疼的问题。关于 I/O 编码问题请参考另一篇文章 《深入分析Java中的中文编码问题》

下图是写字符的 I/O 操作接口涉及到的类,Writer 类提供了一个抽象方法 write(char cbuf[], int off, int len) 由子类去实现。

图 3. Writer 相关类层次结构(查看大图

图 3. Writer 相关类层次结构

读字符的操作接口也有类似的类结构,如下图所示:

图 4.Reader 类层次结构(查看大图

图 4.Reader 类层次结构

读字符的操作接口中也是 int read(char cbuf[], int off, int len),返回读到的 n 个字节数,不管是 Writer 还是 Reader 类它们都只定义了读取或写入的数据字符的方式,也就是怎么写或读,但是并没有规定数据要写到哪去,写到哪去就是我们后面要讨论的基于磁盘和网络的工作机制。

字节与字符的转化接口

另外数据持久化或网络传输都是以字节进行的,所以必须要有字符到字节或字节到字符的转化。字符到字节需要转化,其中读的转化过程如下图所示:

图 5. 字符解码相关类结构

图 5. 字符解码相关类结构

InputStreamReader 类是字节到字符的转化桥梁,InputStream 到 Reader 的过程要指定编码字符集,否则将采用操作系统默认字符集,很可能会出现乱码问题。StreamDecoder 正是完成字节到字符的解码的实现类。也就是当你用如下方式读取一个文件时:

清单 1.读取文件
1
2
3
4
5
6
7
8
9
try {
           StringBuffer str = new StringBuffer();
           char[] buf = new char[1024];
           FileReader f = new FileReader("file");
           while(f.read(buf)>0){
               str.append(buf);
           }
           str.toString();
} catch (IOException e) {}

FileReader 类就是按照上面的工作方式读取文件的,FileReader 是继承了 InputStreamReader 类,实际上是读取文件流,然后通过 StreamDecoder 解码成 char,只不过这里的解码字符集是默认字符集。

写入也是类似的过程如下图所示:

图 6. 字符编码相关类结构

图 6. 字符编码相关类结构

通过 OutputStreamWriter 类完成,字符到字节的编码过程,由 StreamEncoder 完成编码过程。

磁盘 I/O 工作机制

前面介绍了基本的 Java I/O 的操作接口,这些接口主要定义了如何操作数据,以及介绍了操作两种数据结构:字节和字符的方式。还有一个关键问题就是数据写到何处,其中一个主要方式就是将数据持久化到物理磁盘,下面将介绍如何将数据持久化到物理磁盘的过程。

我们知道数据在磁盘的唯一最小描述就是文件,也就是说上层应用程序只能通过文件来操作磁盘上的数据,文件也是操作系统和磁盘驱动器交互的一个最小单元。值得注意的是 Java 中通常的 File 并不代表一个真实存在的文件对象,当你通过指定一个路径描述符时,它就会返回一个代表这个路径相关联的一个虚拟对象,这个可能是一个真实存在的文件或者是一个包含多个文件的目录。为何要这样设计?因为大部分情况下,我们并不关心这个文件是否真的存在,而是关心这个文件到底如何操作。例如我们手机里通常存了几百个朋友的电话号码,但是我们通常关心的是我有没有这个朋友的电话号码,或者这个电话号码是什么,但是这个电话号码到底能不能打通,我们并不是时时刻刻都去检查,而只有在真正要给他打电话时才会看这个电话能不能用。也就是使用这个电话记录要比打这个电话的次数多很多。

何时真正会要检查一个文件存不存?就是在真正要读取这个文件时,例如 FileInputStream 类都是操作一个文件的接口,注意到在创建一个 FileInputStream 对象时,会创建一个 FileDescriptor 对象,其实这个对象就是真正代表一个存在的文件对象的描述,当我们在操作一个文件对象时可以通过 getFD() 方法获取真正操作的与底层操作系统关联的文件描述。例如可以调用 FileDescriptor.sync() 方法将操作系统缓存中的数据强制刷新到物理磁盘中。

下面以清单 1 的程序为例,介绍下如何从磁盘读取一段文本字符。如下图所示:

图 7. 从磁盘读取文件

图 7. 从磁盘读取文件

当传入一个文件路径,将会根据这个路径创建一个 File 对象来标识这个文件,然后将会根据这个 File 对象创建真正读取文件的操作对象,这时将会真正创建一个关联真实存在的磁盘文件的文件描述符 FileDescriptor,通过这个对象可以直接控制这个磁盘文件。由于我们需要读取的是字符格式,所以需要 StreamDecoder 类将 byte 解码为 char 格式,至于如何从磁盘驱动器上读取一段数据,由操作系统帮我们完成。至于操作系统是如何将数据持久化到磁盘以及如何建立数据结构需要根据当前操作系统使用何种文件系统来回答,至于文件系统的相关细节可以参考另外的文章。

Java Socket 的工作机制

Socket 这个概念没有对应到一个具体的实体,它是描述计算机之间完成相互通信一种抽象功能。打个比方,可以把 Socket 比作为两个城市之间的交通工具,有了它,就可以在城市之间来回穿梭了。交通工具有多种,每种交通工具也有相应的交通规则。Socket 也一样,也有多种。大部分情况下我们使用的都是基于 TCP/IP 的流套接字,它是一种稳定的通信协议。

下图是典型的基于 Socket 的通信的场景:

图 8.Socket 通信示例

图 8.Socket 通信示例

主机 A 的应用程序要能和主机 B 的应用程序通信,必须通过 Socket 建立连接,而建立 Socket 连接必须需要底层 TCP/IP 协议来建立 TCP 连接。建立 TCP 连接需要底层 IP 协议来寻址网络中的主机。我们知道网络层使用的 IP 协议可以帮助我们根据 IP 地址来找到目标主机,但是一台主机上可能运行着多个应用程序,如何才能与指定的应用程序通信就要通过 TCP 或 UPD 的地址也就是端口号来指定。这样就可以通过一个 Socket 实例唯一代表一个主机上的一个应用程序的通信链路了。

建立通信链路

当客户端要与服务端通信,客户端首先要创建一个 Socket 实例,操作系统将为这个 Socket 实例分配一个没有被使用的本地端口号,并创建一个包含本地和远程地址和端口号的套接字数据结构,这个数据结构将一直保存在系统中直到这个连接关闭。在创建 Socket 实例的构造函数正确返回之前,将要进行 TCP 的三次握手协议,TCP 握手协议完成后,Socket 实例对象将创建完成,否则将抛出 IOException 错误。

与之对应的服务端将创建一个 ServerSocket 实例,ServerSocket 创建比较简单只要指定的端口号没有被占用,一般实例创建都会成功,同时操作系统也会为 ServerSocket 实例创建一个底层数据结构,这个数据结构中包含指定监听的端口号和包含监听地址的通配符,通常情况下都是“*”即监听所有地址。之后当调用 accept() 方法时,将进入阻塞状态,等待客户端的请求。当一个新的请求到来时,将为这个连接创建一个新的套接字数据结构,该套接字数据的信息包含的地址和端口信息正是请求源地址和端口。这个新创建的数据结构将会关联到 ServerSocket 实例的一个未完成的连接数据结构列表中,注意这时服务端与之对应的 Socket 实例并没有完成创建,而要等到与客户端的三次握手完成后,这个服务端的 Socket 实例才会返回,并将这个 Socket 实例对应的数据结构从未完成列表中移到已完成列表中。所以 ServerSocket 所关联的列表中每个数据结构,都代表与一个客户端的建立的 TCP 连接。

数据传输

传输数据是我们建立连接的主要目的,如何通过 Socket 传输数据,下面将详细介绍。

当连接已经建立成功,服务端和客户端都会拥有一个 Socket 实例,每个 Socket 实例都有一个 InputStream 和 OutputStream,正是通过这两个对象来交换数据。同时我们也知道网络 I/O 都是以字节流传输的。当 Socket 对象创建时,操作系统将会为 InputStream 和 OutputStream 分别分配一定大小的缓冲区,数据的写入和读取都是通过这个缓存区完成的。写入端将数据写到 OutputStream 对应的 SendQ 队列中,当队列填满时,数据将被发送到另一端 InputStream 的 RecvQ 队列中,如果这时 RecvQ 已经满了,那么 OutputStream 的 write 方法将会阻塞直到 RecvQ 队列有足够的空间容纳 SendQ 发送的数据。值得特别注意的是,这个缓存区的大小以及写入端的速度和读取端的速度非常影响这个连接的数据传输效率,由于可能会发生阻塞,所以网络 I/O 与磁盘 I/O 在数据的写入和读取还要有一个协调的过程,如果两边同时传送数据时可能会产生死锁,在后面 NIO 部分将介绍避免这种情况。

NIO 的工作方式

BIO 带来的挑战

BIO 即阻塞 I/O,不管是磁盘 I/O 还是网络 I/O,数据在写入 OutputStream 或者从 InputStream 读取时都有可能会阻塞。一旦有线程阻塞将会失去 CPU 的使用权,这在当前的大规模访问量和有性能要求情况下是不能接受的。虽然当前的网络 I/O 有一些解决办法,如一个客户端一个处理线程,出现阻塞时只是一个线程阻塞而不会影响其它线程工作,还有为了减少系统线程的开销,采用线程池的办法来减少线程创建和回收的成本,但是有一些使用场景仍然是无法解决的。如当前一些需要大量 HTTP 长连接的情况,像淘宝现在使用的 Web 旺旺项目,服务端需要同时保持几百万的 HTTP 连接,但是并不是每时每刻这些连接都在传输数据,这种情况下不可能同时创建这么多线程来保持连接。即使线程的数量不是问题,仍然有一些问题还是无法避免的。如这种情况,我们想给某些客户端更高的服务优先级,很难通过设计线程的优先级来完成,另外一种情况是,我们需要让每个客户端的请求在服务端可能需要访问一些竞争资源,由于这些客户端是在不同线程中,因此需要同步,而往往要实现这些同步操作要远远比用单线程复杂很多。以上这些情况都说明,我们需要另外一种新的 I/O 操作方式。

NIO 的工作机制

我们先看一下 NIO 涉及到的关联类图,如下:

图 9.NIO 相关类图

图 9.NIO 相关类图

上图中有两个关键类:Channel 和 Selector,它们是 NIO 中两个核心概念。我们还用前面的城市交通工具来继续比喻 NIO 的工作方式,这里的 Channel 要比 Socket 更加具体,它可以比作为某种具体的交通工具,如汽车或是高铁等,而 Selector 可以比作为一个车站的车辆运行调度系统,它将负责监控每辆车的当前运行状态:是已经出战还是在路上等等,也就是它可以轮询每个 Channel 的状态。这里还有一个 Buffer 类,它也比 Stream 更加具体化,我们可以将它比作为车上的座位,Channel 是汽车的话就是汽车上的座位,高铁上就是高铁上的座位,它始终是一个具体的概念,与 Stream 不同。Stream 只能代表是一个座位,至于是什么座位由你自己去想象,也就是你在去上车之前并不知道,这个车上是否还有没有座位了,也不知道上的是什么车,因为你并不能选择,这些信息都已经被封装在了运输工具(Socket)里面了,对你是透明的。NIO 引入了 Channel、Buffer 和 Selector 就是想把这些信息具体化,让程序员有机会控制它们,如:当我们调用 write() 往 SendQ 写数据时,当一次写的数据超过 SendQ 长度是需要按照 SendQ 的长度进行分割,这个过程中需要有将用户空间数据和内核地址空间进行切换,而这个切换不是你可以控制的。而在 Buffer 中我们可以控制 Buffer 的 capacity,并且是否扩容以及如何扩容都可以控制。

理解了这些概念后我们看一下,实际上它们是如何工作的,下面是典型的一段 NIO 代码:

清单 2. NIO 工作代码示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public void selector() throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        Selector selector = Selector.open();
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);//设置为非阻塞方式
        ssc.socket().bind(new InetSocketAddress(8080));
        ssc.register(selector, SelectionKey.OP_ACCEPT);//注册监听的事件
        while (true) {
            Set selectedKeys = selector.selectedKeys();//取得所有key集合
            Iterator it = selectedKeys.iterator();
            while (it.hasNext()) {
                SelectionKey key = (SelectionKey) it.next();
                if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {
                    ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();
                 SocketChannel sc = ssChannel.accept();//接受到服务端的请求
                    sc.configureBlocking(false);
                    sc.register(selector, SelectionKey.OP_READ);
                    it.remove();
                } else if
                ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
                    SocketChannel sc = (SocketChannel) key.channel();
                    while (true) {
                        buffer.clear();
                        int n = sc.read(buffer);//读取数据
                        if (n <= 0) {
                            break;
                        }
                        buffer.flip();
                    }
                    it.remove();
                }
            }
        }
}

调用 Selector 的静态工厂创建一个选择器,创建一个服务端的 Channel 绑定到一个 Socket 对象,并把这个通信信道注册到选择器上,把这个通信信道设置为非阻塞模式。然后就可以调用 Selector 的 selectedKeys 方法来检查已经注册在这个选择器上的所有通信信道是否有需要的事件发生,如果有某个事件发生时,将会返回所有的 SelectionKey,通过这个对象 Channel 方法就可以取得这个通信信道对象从而可以读取通信的数据,而这里读取的数据是 Buffer,这个 Buffer 是我们可以控制的缓冲器。

在上面的这段程序中,是将 Server 端的监听连接请求的事件和处理请求的事件放在一个线程中,但是在实际应用中,我们通常会把它们放在两个线程中,一个线程专门负责监听客户端的连接请求,而且是阻塞方式执行的;另外一个线程专门来处理请求,这个专门处理请求的线程才会真正采用 NIO 的方式,像 Web 服务器 Tomcat 和 Jetty 都是这个处理方式,关于 Tomcat 和 Jetty 的 NIO 处理方式可以参考文章《 Jetty 的工作原理和与 Tomcat 的比较》。

下图是描述了基于 NIO 工作方式的 Socket 请求的处理过程:

图 10. 基于 NIO 的 Socket 请求的处理过程

图 10. 基于 NIO 的 Socket 请求的处理过程

上图中的 Selector 可以同时监听一组通信信道(Channel)上的 I/O 状态,前提是这个 Selector 要已经注册到这些通信信道中。选择器 Selector 可以调用 select() 方法检查已经注册的通信信道上的是否有 I/O 已经准备好,如果没有至少一个信道 I/O 状态有变化,那么 select 方法会阻塞等待或在超时时间后会返回 0。上图中如果有多个信道有数据,那么将会将这些数据分配到对应的数据 Buffer 中。所以关键的地方是有一个线程来处理所有连接的数据交互,每个连接的数据交互都不是阻塞方式,所以可以同时处理大量的连接请求。

Buffer 的工作方式

上面介绍了 Selector 将检测到有通信信道 I/O 有数据传输时,通过 selelct() 取得 SocketChannel,将数据读取或写入 Buffer 缓冲区。下面讨论一下 Buffer 如何接受和写出数据?

Buffer 可以简单的理解为一组基本数据类型的元素列表,它通过几个变量来保存这个数据的当前位置状态,也就是有四个索引。如下表所示:

表 1.Buffer 中的参数项

在实际操作数据时它们有如下关系图:

Figure xxx. Requires a heading

我们通过 ByteBuffer.allocate(11) 方法创建一个 11 个 byte 的数组缓冲区,初始状态如上图所示,position 的位置为 0,capacity 和 limit 默认都是数组长度。当我们写入 5 个字节时位置变化如下图所示:

Figure xxx. Requires a heading

这时我们需要将缓冲区的 5 个字节数据写入 Channel 通信信道,所以我们需要调用 byteBuffer.flip() 方法,数组的状态又发生如下变化:

Figure xxx. Requires a heading

这时底层操作系统就可以从缓冲区中正确读取这 5 个字节数据发送出去了。在下一次写数据之前我们在调一下 clear() 方法。缓冲区的索引状态又回到初始位置。

这里还要说明一下 mark,当我们调用 mark() 时,它将记录当前 position 的前一个位置,当我们调用 reset 时,position 将恢复 mark 记录下来的值。

还有一点需要说明,通过 Channel 获取的 I/O 数据首先要经过操作系统的 Socket 缓冲区再将数据复制到 Buffer 中,这个的操作系统缓冲区就是底层的 TCP 协议关联的 RecvQ 或者 SendQ 队列,从操作系统缓冲区到用户缓冲区复制数据比较耗性能,Buffer 提供了另外一种直接操作操作系统缓冲区的的方式即 ByteBuffer.allocateDirector(size),这个方法返回的 byteBuffer 就是与底层存储空间关联的缓冲区,它的操作方式与 linux2.4 内核的 sendfile 操作方式类似。

I/O 调优

下面就磁盘 I/O 和网络 I/O 的一些常用的优化技巧进行总结如下:

磁盘 I/O 优化

性能检测

我们的应用程序通常都需要访问磁盘读取数据,而磁盘 I/O 通常都很耗时,我们要判断 I/O 是否是一个瓶颈,我们有一些参数指标可以参考:

如我们可以压力测试应用程序看系统的 I/O wait 指标是否正常,例如测试机器有 4 个 CPU,那么理想的 I/O wait 参数不应该超过 25%,如果超过 25% 的话,I/O 很可能成为应用程序的性能瓶颈。Linux 操作系统下可以通过 iostat 命令查看。

通常我们在判断 I/O 性能时还会看另外一个参数就是 IOPS,我们应用程序需要最低的 IOPS 是多少,而我们的磁盘的 IOPS 能不能达到我们的要求。每个磁盘的 IOPS 通常是在一个范围内,这和存储在磁盘的数据块的大小和访问方式也有关。但是主要是由磁盘的转速决定的,磁盘的转速越高磁盘的 IOPS 也越高。

现在为了提高磁盘 I/O 的性能,通常采用一种叫 RAID 的技术,就是将不同的磁盘组合起来来提高 I/O 性能,目前有多种 RAID 技术,每种 RAID 技术对 I/O 性能提升会有不同,可以用一个 RAID 因子来代表,磁盘的读写吞吐量可以通过 iostat 命令来获取,于是我们可以计算出一个理论的 IOPS 值,计算公式如下所以:

( 磁盘数 * 每块磁盘的 IOPS)/( 磁盘读的吞吐量 +RAID 因子 * 磁盘写的吞吐量 )=IOPS

这个公式的详细信息请查阅参考资料 Understanding Disk I/O

提升 I/O 性能

提升磁盘 I/O 性能通常的方法有:

  1. 增加缓存,减少磁盘访问次数
  2. 优化磁盘的管理系统,设计最优的磁盘访问策略,以及磁盘的寻址策略,这里是在底层操作系统层面考虑的。
  3. 设计合理的磁盘存储数据块,以及访问这些数据块的策略,这里是在应用层面考虑的。如我们可以给存放的数据设计索引,通过寻址索引来加快和减少磁盘的访问,还有可以采用异步和非阻塞的方式加快磁盘的访问效率。
  4. 应用合理的 RAID 策略提升磁盘 IO,每种 RAID 的区别我们可以用下表所示:
表 2.RAID 策略

网络 I/O 优化

网络 I/O 优化通常有一些基本处理原则:

  1. 一个是减少网络交互的次数:要减少网络交互的次数通常我们在需要网络交互的两端会设置缓存,比如 Oracle 的 JDBC 驱动程序,就提供了对查询的 SQL 结果的缓存,在客户端和数据库端都有,可以有效的减少对数据库的访问。关于 Oracle JDBC 的内存管理可以参考《 Oracle JDBC 内存管理》。除了设置缓存还有一个办法是,合并访问请求:如在查询数据库时,我们要查 10 个 id,我可以每次查一个 id,也可以一次查 10 个 id。再比如在访问一个页面时通过会有多个 js 或 css 的文件,我们可以将多个 js 文件合并在一个 HTTP 链接中,每个文件用逗号隔开,然后发送到后端 Web 服务器根据这个 URL 链接,再拆分出各个文件,然后打包再一并发回给前端浏览器。这些都是常用的减少网络 I/O 的办法。
  2. 减少网络传输数据量的大小:减少网络数据量的办法通常是将数据压缩后再传输,如 HTTP 请求中,通常 Web 服务器将请求的 Web 页面 gzip 压缩后在传输给浏览器。还有就是通过设计简单的协议,尽量通过读取协议头来获取有用的价值信息。比如在代理程序设计时,有 4 层代理和 7 层代理都是来尽量避免要读取整个通信数据来取得需要的信息。
  3. 尽量减少编码:通常在网络 I/O 中数据传输都是以字节形式的,也就是通常要序列化。但是我们发送要传输的数据都是字符形式的,从字符到字节必须编码。但是这个编码过程是比较耗时的,所以在要经过网络 I/O 传输时,尽量直接以字节形式发送。也就是尽量提前将字符转化为字节,或者减少字符到字节的转化过程。
  4. 根据应用场景设计合适的交互方式:所谓的交互场景主要包括同步与异步阻塞与非阻塞方式,下面将详细介绍。

同步与异步

所谓同步就是一个任务的完成需要依赖另外一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列。要么成功都成功,失败都失败,两个任务的状态可以保持一致。而异步是不需要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工作,依赖的任务也立即执行,只要自己完成了整个任务就算完成了。至于被依赖的任务最终是否真正完成,依赖它的任务无法确定,所以它是不可靠的任务序列。我们可以用打电话和发短信来很好的比喻同步与异步操作。

在设计到 IO 处理时通常都会遇到一个是同步还是异步的处理方式的选择问题。因为同步与异步的 I/O 处理方式对调用者的影响很大,在数据库产品中都会遇到这个问题。因为 I/O 操作通常是一个非常耗时的操作,在一个任务序列中 I/O 通常都是性能瓶颈。但是同步与异步的处理方式对程序的可靠性影响非常大,同步能够保证程序的可靠性,而异步可以提升程序的性能,必须在可靠性和性能之间做个平衡,没有完美的解决办法。

阻塞与非阻塞

阻塞与非阻塞主要是从 CPU 的消耗上来说的,阻塞就是 CPU 停下来等待一个慢的操作完成 CPU 才接着完成其它的事。非阻塞就是在这个慢的操作在执行时 CPU 去干其它别的事,等这个慢的操作完成时,CPU 再接着完成后续的操作。虽然表面上看非阻塞的方式可以明显的提高 CPU 的利用率,但是也带了另外一种后果就是系统的线程切换增加。增加的 CPU 使用时间能不能补偿系统的切换成本需要好好评估。

两种的方式的组合

组合的方式可以由四种,分别是:同步阻塞、同步非阻塞、异步阻塞、异步非阻塞,这四种方式都对 I/O 性能有影响。下面给出分析,并有一些常用的设计用例参考。

表 3. 四种组合方式

虽然异步和非阻塞能够提升 I/O 的性能,但是也会带来一些额外的性能成本,例如会增加线程数量从而增加 CPU 的消耗,同时也会导致程序设计的复杂度上升。如果设计的不合理的话反而会导致性能下降。在实际设计时要根据应用场景综合评估一下。

下面举一些异步和阻塞的操作实例:

在 Cassandra 中要查询数据通常会往多个数据节点发送查询命令,但是要检查每个节点返回数据的完整性,所以需要一个异步查询同步结果的应用场景,部分代码如下:

清单 3.异步查询同步结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class AsyncResult implements IAsyncResult{
   private byte[] result_;
   private AtomicBoolean done_ = new AtomicBoolean(false);
   private Lock lock_ = new ReentrantLock();
   private Condition condition_;
   private long startTime_;
   public AsyncResult(){       
       condition_ = lock_.newCondition();// 创建一个锁
       startTime_ = System.currentTimeMillis();
   }   
/*** 检查需要的数据是否已经返回,如果没有返回阻塞 */
public byte[] get(){
       lock_.lock();
       try{
           if (!done_.get()){condition_.await();}
       }catch (InterruptedException ex){
           throw new AssertionError(ex);
       }finally{lock_.unlock();}
       return result_;
}
/*** 检查需要的数据是否已经返回 */
   public boolean isDone(){return done_.get();}
/*** 检查在指定的时间内需要的数据是否已经返回,如果没有返回抛出超时异常 */
   public byte[] get(long timeout, TimeUnit tu) throws TimeoutException{
       lock_.lock();
       try{            boolean bVal = true;
           try{
               if ( !done_.get() ){
          long overall_timeout = timeout - (System.currentTimeMillis() - startTime_);
                   if(overall_timeout > 0)// 设置等待超时的时间
                       bVal = condition_.await(overall_timeout, TimeUnit.MILLISECONDS);
                   else bVal = false;
               }
           }catch (InterruptedException ex){
               throw new AssertionError(ex);
           }
           if ( !bVal && !done_.get() ){// 抛出超时异常
               throw new TimeoutException("Operation timed out.");
           }
       }finally{lock_.unlock();      }
       return result_;
}
/*** 该函数拱另外一个线程设置要返回的数据,并唤醒在阻塞的线程 */
   public void result(Message response){       
       try{
           lock_.lock();
           if ( !done_.get() ){               
               result_ = response.getMessageBody();// 设置返回的数据
               done_.set(true);
               condition_.signal();// 唤醒阻塞的线程
           }
       }finally{lock_.unlock();}       
   }   
}

总结

本文阐述的内容较多,从 Java 基本 I/O 类库结构开始说起,主要介绍了磁盘 I/O 和网络 I/O 的基本工作方式,最后介绍了关于 I/O 调优的一些方法。

一文读懂Java线程状态转换

前言

本文描述Java线程线程状态及状态转换,不会涉及过多理论,主要以代码示例说明线程状态如何转换。

基础知识

1. 线程状态

Thread源码中的状态说明:

 

线程可以有6种状态:

  • New(新建)
  • Runnable(可运行)
  • Blocked(被阻塞)
  • Waiting(等待)
  • Timed waiting(计时等待)
  • Terminated(被终止)
  1. New:new Thread()后线程的状态就是新建。
  2. Runnable:线程一旦调用start()方法,无论是否运行,状态都为Runable,注意Runable状态指示表示线程可以运行,不表示线程当下一定在运行,线程是否运行由虚拟机所在操作系统调度决定。
  3. 被阻塞:线程试图获取一个内部对象的Monitor(进入synchronized方法或synchronized块)但是其他线程已经抢先获取,那此线程被阻塞,知道其他线程释放Monitor并且线程调度器允许当前线程获取到Monitor,此线程就恢复到可运行状态。
  4. 等待:当一个线程等待另一个线程通知调度器一个条件时,线程进入等待状态。
  5. 计时等待:和等待类似,某些造成等待的方法会允许传入超时参数,这类方法会造成计时等待,收到其他线程的通知或者超时都会恢复到可运行状态。
  6. 被终止:线程执行完毕正常结束或执行过程中因未捕获异常意外终止都会是线程进入被终止状态。

2. 线程状态转换

线程从“新建”到“被终止”会历经多次状态转换,所有可能的转换如下图:

 

【图一】

观察状态转化图,我们发现“可运行”状态为所有状态的必经状态。我们分析出四条基本的状态转换线路图。

  • 新建—>可运行—>被终止
  • 新建—>可运行—>被阻塞—>可运行—>被终止
  • 新建—>可运行—>等待—>可运行—>被终止
  • 新建—>可运行—>计时等待—>可运行—>被终止

“新建”和“被终止”状态分别为起始和结束状态,和“可运行”状态不可逆。其他状态均能和“可运行”状态相互转换。

用代码说话

让我们用代码演示线程状态是如何转换的,大家重点关注两个问题?

  • 什么操作会改变线程状态?
  • 改变的状态是如何恢复的?

一、 新建—>可运行—>被终止

这个状态转换时创建的线程生命周期。

/**
 * NEW->RUNNABLE->TERMINATED
 */
public class ThreadStateNRT {

    public static void main(String[] args) {
        Thread thread=new Thread(new Task());
        print(thread.getName(),thread.getState());
        thread.start();
        //等待线程执行完毕。
        try {
            thread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        print(thread.getName(),thread.getState());
    }

    private static class Task implements Runnable{
        @Override
        public void run() {
            print(Thread.currentThread().getName(),Thread.currentThread().getState());
        }
    }
    private static final String stringFormat="%s:%s";
    private static void print(String threadName,Thread.State state){
        System.out.println(String.format(stringFormat,threadName,state));
    }
}

其中,print()方法用来打印线程信息。后面的代码示例均不在展示。

运行程序结果为:

Thread-0:NEW 
Thread-0:RUNNABLE 
Thread-0:TERMINATED

二、 新建—>可运行—>被阻塞—>可运行—>被终止

只有一种方法能出现阻塞状态,那就是synchronized同步原语。我们需要两个线程其中一个线程被另一个阻塞来展示。

/**
 * NEW->RUNNABLE->BLOCKED->RUNNABLE->TERMINATED
 */
public class ThreadStateNRBRT {
    //锁
    private static final Object lock=new Object();

    public static void main(String[] args) {
        //辅助线程,制造synchronized状态。
        Thread assistantThread = new Thread(new SynTask());
        assistantThread.start();
        try {
            //保证assistantThread先执行。
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Thread showThread = new Thread(new Task());
        print(showThread.getName(), showThread.getState());
        showThread.start();
        print(showThread.getName(),showThread.getState());
        //因为无法判断显示线程何时执行,所以循环直到显示线程执行。
        while (true){
            if(showThread.getState()==Thread.State.BLOCKED){
                print(showThread.getName(), Thread.State.BLOCKED);
                break;
            }
        }
        //等待两个线程执行完毕。
        try {
            assistantThread.join();
            showThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //线程执行完毕打印状态。
        print(showThread.getName(), showThread.getState());
    }

    private static class SynTask implements Runnable {
        @Override
        public void run() {
            //锁定一定时间
            synchronized (lock){
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private static class Task implements Runnable {
        @Override
        public void run() {
            synchronized (lock){
                print(Thread.currentThread().getName(),Thread.currentThread().getState());
            }
        }
    }
}

执行一下你有可能看到正确结果:

Thread-1:NEW
Thread-1:RUNNABLE
Thread-1:BLOCKED
Thread-1:RUNNABLE
Thread-1:TERMINATED

为什么是有可能呢?我们调整一下代码,例如将加锁的时间调小一点:

private static class SynTask implements Runnable {
        @Override
        public void run() {
            //锁定一定时间
            synchronized (lock){
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
}

注意此处Thread.sleep(10),我们只锁住十毫秒。

再运行一下,控制台可能打印出这样的结果且程序不会结束:

Thread-1:NEW
Thread-1:RUNNABLE
Thread-1:RUNNABLE

造成以上结果的原因是我么无法保证两个线程的执行顺序,也无法证主线程一定能打印出显示线程阻塞的状态。

         while (true){
            if(showThread.getState()==Thread.State.BLOCKED){
                print(showThread.getName(), Thread.State.BLOCKED);
                break;
            }
        }

所以执行在这段代码死循环了。

调整一下代码,保证不会因为参数调整改变线程之间的执行顺序和打印结果。

/**
 * NEW->RUNNABLE->BLOCKED->RUNNABLE->TERMINATED
 */
public class ThreadStateNRBRT_New {
    //锁
    private static final Object lock=new Object();
    //锁定标志
    private volatile static boolean lockFlag=true;
    //执行顺序
    private volatile static int order=0;

    public static void main(String[] args) {
        //展示线程
        Thread showThread = new Thread(new Task());
        print(showThread.getName(), showThread.getState());
        showThread.start();
        print(showThread.getName(), showThread.getState());
        //辅助线程,制造synchronized状态。
        Thread assistantThread = new Thread(new SynTask());
        assistantThread.start();


        //循环读取展示线程状态,直到读到展示线程状态为BLOCKED,才让辅助线程退出同步。
        while (true){
            if(showThread.getState()==Thread.State.BLOCKED){
                print(showThread.getName(), Thread.State.BLOCKED);
                lockFlag=false;
                break;
            }
        }

        try {
            assistantThread.join();
            showThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //线程执行完毕打印状态。
        print(showThread.getName(), showThread.getState());
    }

    private static class SynTask implements Runnable {
        @Override
        public void run() {
            while (true) {
                //保证先进入同步范围。
                if (order == 0) {
                    synchronized (lock) {
                        //启动另一个同步
                        order=1;
                        //等待主线程读取到线程阻塞状态,退出同步。
                        while (lockFlag) {
                            try {
                                Thread.sleep(10);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                    break;
                }
            }
        }
    }

    private static class Task implements Runnable {
        @Override
        public void run() {
            while (true){
                //保证后进入同步范围。
                if (order==1){
                    synchronized (lock){
                        print(Thread.currentThread().getName(),Thread.currentThread().getState());
                    }
                    break;
                }
            }
        }
    }
}

我们用order保证线程进入同步区的顺序,用lockFlag保证只有在打印出显示线程的被阻塞状态后辅助线程才退出同步区。这样无论如何执行我们都会得到同样的结果。

Thread-0:NEW
Thread-0:RUNNABLE
Thread-0:BLOCKED
Thread-0:RUNNABLE
Thread-0:TERMINATED

三、 新建—>可运行—>等待—>可运行—>被终止

这里我们展示两种三种方法造成线程的等待状态

  • Object.wait()
  • java.util.concurrent.locks.Locke.lock()
  • java.util.concurrent.locks.Condition.await()

其他方法如Thread.join()等大家可以参考示例代码自己实现。

1. Object.wait()

/**
 * NEW->RUNNABLE->WAITING->RUNNABLE->TERMINATED
 */
public class ThreadStateNRWRT {
    //锁
    private static final Object lock=new Object();

    public static void main(String[] args) {
        //展示线程
        Thread showThread = new Thread(new WaitTask());
        print(showThread.getName(), showThread.getState());
        showThread.start();
        print(showThread.getName(),showThread.getState());
        //循环读取展示线程状态,直到读到展示线程状态为WAITING,才让辅助线程唤醒等待线程。
        while (true){
            if(showThread.getState()==Thread.State.WAITING){
                print(showThread.getName(), Thread.State.WAITING);
                break;
            }
        }
        synchronized (lock){
            lock.notify();
        }


        try {
            showThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //线程执行完毕打印状态。
        print(showThread.getName(), showThread.getState());
    }


    private static class WaitTask implements Runnable {
        @Override
        public void run() {
            //等待
            synchronized (lock){
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            print(Thread.currentThread().getName(),Thread.currentThread().getState());
        }
    }
}

2. java.util.concurrent.locks.Locke.lock()

/**
 * NEW->RUNNABLE->WAITING->RUNNABLE->TERMINATED
 */
public class ThreadStateNRWRTLock {
    //锁
    private  static Lock lock=new ReentrantLock();
    //锁定标志
    private volatile static boolean lockFlag=true;
    //执行顺序
    private volatile static int order=0;

    public static void main(String[] args) {
        //展示线程
        Thread showThread = new Thread(new Task());
        print(showThread.getName(), showThread.getState());
        showThread.start();
        print(showThread.getName(), showThread.getState());
        //辅助线程,制造synchronized状态。
        Thread assistantThread = new Thread(new SynTask());
        assistantThread.start();
        //循环读取展示线程状态,直到读到展示线程状态为BLOCKED,才让辅助线程退出同步。
        while (true){
            if(showThread.getState()==Thread.State.WAITING){
                print(showThread.getName(), Thread.State.WAITING);
                lockFlag=false;
                break;
            }
        }
        try {
            assistantThread.join();
            showThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //线程执行完毕打印状态。
        print(showThread.getName(), showThread.getState());
    }

    private static class SynTask implements Runnable {
        @Override
        public void run() {
            while (true) {
                //保证先进入同步范围。
                if (order == 0) {
                    //加锁
                    lock.lock();
                    try {
                        order=1;
                        while (lockFlag) {
                            try {
                                Thread.sleep(10);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }finally {
                        lock.unlock();
                    }
                    break;
                }
            }
        }
    }

    private static class Task implements Runnable {
        @Override
        public void run() {
            while (true){
                //保证后进入同步范围。
                if (order==1){
                    lock.lock();
                    try{
                        print(Thread.currentThread().getName(),Thread.currentThread().getState());
                    }finally {
                        lock.unlock();
                    }
                    break;
                }
            }
        }
    }
}

3. java.util.concurrent.locks.Condition.await()

/**
 * NEW->RUNNABLE->WAITING->RUNNABLE->TERMINATED
 */
public class ThreadStateNRWRTCondition {
    //锁
    private static Lock lock=new ReentrantLock();
    private static Condition condition=lock.newCondition();

    public static void main(String[] args) {
        //展示线程
        Thread showThread = new Thread(new WaitTask());
        print(showThread.getName(), showThread.getState());
        showThread.start();
        print(showThread.getName(),showThread.getState());
        //循环读取展示线程状态,直到读到展示线程状态为WAITING,才让辅助线程唤醒等待线程。
        while (true){
            if(showThread.getState()==Thread.State.WAITING){
                print(showThread.getName(), Thread.State.WAITING);
                break;
            }
        }

        lock.lock();
        try{
            condition.signal();
        }finally {
            lock.unlock();
        }


        try {
            showThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //线程执行完毕打印状态。
        print(showThread.getName(), showThread.getState());
    }


    private static class WaitTask implements Runnable {
        @Override
        public void run() {
            //等待
            lock.lock();
            try{
                try {
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }finally {
                lock.unlock();
            }
            print(Thread.currentThread().getName(),Thread.currentThread().getState());
        }
    }
}

4. 运行结果

三段代码的运行结果都是:

Thread-0:NEW
Thread-0:RUNNABLE
Thread-0:WAITING
Thread-0:RUNNABLE
Thread-0:TERMINATED

四、新建—>可运行—>计时等待—>可运行—>被终止

我们展示两个方法造成计时等待状态

  • Object.wait(long timeout)
  • java.util.concurrent.locks.Condition.await(long time, TimeUnit unit)

其他方法如Thread.sleep(long millis),Thread.join(long millis)等大家可以自己实现。
感觉凡是有超时方法的方法都能让线程状态进入计时等待,但是这个没有经过验证,所以只是一个猜想。

1. Object.wait(long timeout)

/**
 * NEW->RUNNABLE->TIMED_WAITING->RUNNABLE->TERMINATED
 */
public class ThreadStateNRTWRT {
    //锁
    private static final Object lock=new Object();

    public static void main(String[] args) {
        //展示线程
        Thread showThread = new Thread(new WaitTask());
        print(showThread.getName(), showThread.getState());
        showThread.start();
        print(showThread.getName(),showThread.getState());
        //循环读取展示线程状态,直到读到展示线程状态为TIMED_WAITING。
        while (true){
            if(showThread.getState()==Thread.State.TIMED_WAITING){
                print(showThread.getName(), Thread.State.TIMED_WAITING);
                break;
            }
        }

        try {
            showThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //线程执行完毕打印状态。
        print(showThread.getName(), showThread.getState());
    }


    private static class WaitTask implements Runnable {
        @Override
        public void run() {
            //等待
            synchronized (lock){
                try {
                    lock.wait(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            print(Thread.currentThread().getName(),Thread.currentThread().getState());
        }
    }
}

2. java.util.concurrent.locks.Condition.await(long time, TimeUnit unit)

/**
 * NEW->RUNNABLE->TIMED_WAITING->RUNNABLE->TERMINATED
 */
public class ThreadStateNRTWRTCondition {
    //锁
    private static Lock lock=new ReentrantLock();
    private static Condition condition=lock.newCondition();

    public static void main(String[] args) {
        //展示线程
        Thread showThread = new Thread(new WaitTask());
        print(showThread.getName(), showThread.getState());
        showThread.start();
        print(showThread.getName(),showThread.getState());
        //循环读取展示线程状态,直到读到展示线程状态为TIMED_WAITING。
        while (true){
            if(Thread.State.TIMED_WAITING==showThread.getState()){
                print(showThread.getName(), Thread.State.TIMED_WAITING);
                break;
            }
        }
        try {
            showThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //线程执行完毕打印状态。
        print(showThread.getName(), showThread.getState());
    }


    private static class WaitTask implements Runnable {
        @Override
        public void run() {
            //等待
            lock.lock();
           try{
                try {
                    condition.await(1,TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }finally {
               lock.unlock();
           }
            print(Thread.currentThread().getName(),Thread.currentThread().getState());
        }
    }
}

3. 运行结果

两段程序的运行结果相同:

Thread-0:NEW
Thread-0:RUNNABLE
Thread-0:TIMED_WAITING
Thread-0:RUNNABLE
Thread-0:TERMINATED

结语

至此,我们已经介绍了线程状态转换的所有情况,了解线程状态转换对分析多线程代码运行很帮助。希望本篇文章对大家今后工作有所助力。

参考

《Java核心技术+卷1》第九版

 

转自:https://segmentfault.com/a/1190000016197831?utm_source=tag-newest

接口限流算法

 在开发高并发系统时,有三把利器来保护系统:缓存、降级和限流。下面来看看限流量的一些算法:

1.计数器法:

 

 

       它是限流算法中最简单最容易的一种算法,比如我们要求某一个接口,1分钟内的请求不能超过10次,我们可以在开始时设置一个计数器,

每次请求,该计数器+1;如果该计数器的值大于10并且与第一次请求的时间间隔在1分钟内,那么说明请求过多;如果该请求与第一次请求

的时间间隔大于1分钟,并且该计数器的值还在限流范围内,那么重置该计数器。具体代码如下:

public class CounterDemo {
public long timeStamp = getNowTime();
public int reqCount = 0;
public final int limit = 100; // 时间窗口内最大请求数
public final long interval = 1000; // 时间窗口ms
public boolean grant() {
long now = getNowTime();
if (now < timeStamp + interval) {
// 在时间窗口内
reqCount++;
// 判断当前时间窗口内是否超过最大请求控制数
return reqCount <= limit;
}
else {
timeStamp = now;
// 超时后重置
reqCount = 1;
return true;
}
}
}
不过,以上代码有致命问题,当遇到恶意请求,在0:59时,瞬间请求100次,并且在1:00请求100次,那么这个用户在1秒内请求了200次,
用户可以在重置节点突发请求,而瞬间超过我们设置的速率限制,用户可能通过算法漏洞击垮我们的应用。如下图,如何解决呢,看下边的滑动窗口算法。

 

 

2.滑动窗口算法:

 

 

        在上图中,整个红色矩形框是一个时间窗口,在我们的例子中,一个时间窗口就是1分钟,然后我们将时间窗口进行划分,如上图我们把滑动窗口

划分为6格,所以每一格代表10秒,每超过10秒,我们的时间窗口就会向右滑动一格,每一格都有自己独立的计数器,例如:一个请求在0:35到达,

那么0:30到0:39的计数器会+1,那么滑动窗口是怎么解决临界点的问题呢?如上图,0:59到达的100个请求会在灰色区域格子中,而1:00到达的请求

会在红色格子中,窗口会向右滑动一格,那么此时间窗口内的总请求数共200个,超过了限定的100,所以此时能够检测出来触发了限流。

回头看看计数器算法,会发现,其实计数器算法就是窗口滑动算法,只不过计数器算法没有对时间窗口进行划分,所以是一格。

由此可见,当滑动窗口的格子划分越多,限流的统计就会越精确。

3.漏桶算法:

漏桶算法,又称 leaky bucket ,如下图:

 

 

这个算法很简单。首先,我们有一个固定容量的桶,有水进来,也有水出去。对于流进来的水,我们无法预计共有多少水流进来,也无法预计流水速度,但

对于流出去的水来说,这个桶可以固定水流的速率,而且当桶满的时候,多余的水会溢出来。

public class LeakyDemo {
public long timeStamp = getNowTime();
public int capacity; // 桶的容量
public int rate; // 水漏出的速度
public int water; // 当前水量(当前累积请求数)
public boolean grant() {
long now = getNowTime();
water = max(0, water – (now – timeStamp) * rate); // 先执行漏水,计算剩余水量
timeStamp = now;
if ((water + 1) < capacity) {
// 尝试加水,并且水还未满
water += 1;
return true;
}
else {
// 水满,拒绝加水
return false;
}
}
}

4.令牌桶算法:
又称token bucket,如下图:

 

 

 

从上图中可以看出,令牌算法有点复杂,桶里存放着令牌token。桶一开始是空的,token以固定的速率r往桶里面填充,直到达到桶的容量,多余的token会
被丢弃。每当一个请求过来时,就会尝试着移除一个token,如果没有token,请求无法通过。

public class TokenBucketDemo {
public long timeStamp = getNowTime();
public int capacity; // 桶的容量
public int rate; // 令牌放入速度
public int tokens; // 当前令牌数量
public boolean grant() {
long now = getNowTime();
// 先添加令牌
tokens = min(capacity, tokens + (now – timeStamp) * rate);
timeStamp = now;
if (tokens < 1) {
// 若不到1个令牌,则拒绝
return false;
}
else {
// 还有令牌,领取令牌
tokens -= 1;
return true;
}
}
}

总结

计数器 VS 滑动窗口

计数器算法是最简单的算法,可以看成是滑动窗口的低精度实现。滑动窗口由于需要存储多份的计数器(每一个格子存一份),所以滑动窗口在实现上需要更多的存储空间。也就是说,如果滑动窗口的精度越高,需要的存储空间就越大。

漏桶算法 VS 令牌桶算法

漏桶算法和令牌桶算法最明显的区别是令牌桶算法允许流量一定程度的突发。因为默认的令牌桶算法,取走token是不需要耗费时间的,也就是说,假设桶内有100个token时,那么可以瞬间允许100个请求通过。

令牌桶算法由于实现简单,且允许某些流量的突发,对用户友好,所以被业界采用地较多。当然我们需要具体情况具体分析,只有最合适的算法,没有最优的算法。

 

原文:

https://www.cnblogs.com/clds/p/5850070.html

https://blog.csdn.net/ljj821061514/article/details/52512943

浅析负载均衡的6种算法,Ngnix的5种算法

常见的几种负载均衡算法

1、轮询法

将请求按顺序轮流地分配到后端服务器上,它均衡地对待后端的每一台服务器,而不关心服务器实际的连接数和当前的系统负载。

2、随机法

通过系统的随机算法,根据后端服务器的列表大小值来随机选取其中的一台服务器进行访问。由概率统计理论可以得知,随着客户端调用服务端的次数增多,

其实际效果越来越接近于平均分配调用量到后端的每一台服务器,也就是轮询的结果。

3、源地址哈希法

源地址哈希的思想是根据获取客户端的IP地址,通过哈希函数计算得到的一个数值,用该数值对服务器列表的大小进行取模运算,得到的结果便是客服端要访问服务器的序号。采用源地址哈希法进行负载均衡,同一IP地址的客户端,当后端服务器列表不变时,它每次都会映射到同一台后端服务器进行访问。

4、加权轮询法

不同的后端服务器可能机器的配置和当前系统的负载并不相同,因此它们的抗压能力也不相同。给配置高、负载低的机器配置更高的权重,让其处理更多的请;而配置低、负载高的机器,给其分配较低的权重,降低其系统负载,加权轮询能很好地处理这一问题,并将请求顺序且按照权重分配到后端。

5、加权随机法

与加权轮询法一样,加权随机法也根据后端机器的配置,系统的负载分配不同的权重。不同的是,它是按照权重随机请求后端服务器,而非顺序。

6、最小连接数法

最小连接数算法比较灵活和智能,由于后端服务器的配置不尽相同,对于请求的处理有快有慢,它是根据后端服务器当前的连接情况,动态地选取其中当前

积压连接数最少的一台服务器来处理当前的请求,尽可能地提高后端服务的利用效率,将负责合理地分流到每一台服务器。

Nginx的5种负载均衡算法

1、轮询(默认)

每个请求按时间顺序逐一分配到不同的后端服务器,如果后端服务器down掉,能自动剔除。

2、weight

指定轮询几率,weight和访问比率成正比,用于后端服务器性能不均的情况。

例如:

upstream bakend {  
  server 192.168.0.14 weight=10;  
  server 192.168.0.15 weight=10;  
}

3、ip_hash

每个请求按访问ip的hash结果分配,这样每个访客固定访问一个后端服务器,可以解决session的问题。

例如:

upstream bakend {  
  ip_hash;  
  server 192.168.0.14:88;  
  server 192.168.0.15:80;  
}

4、fair(第三方)

按后端服务器的响应时间来分配请求,响应时间短的优先分配。

upstream backend {  
  server server1;  
  server server2;  
  fair;  
}

5、url_hash(第三方)

按访问url的hash结果来分配请求,使每个url定向到同一个后端服务器,后端服务器为缓存时比较有效。

例:在upstream中加入hash语句,server语句中不能写入weight等其他的参数,hash_method是使用的hash算法。

upstream backend {  
  server squid1:3128;  
  server squid2:3128;  
  hash $request_uri;  
  hash_method crc32;  
}

tips:

upstream bakend{#定义负载均衡设备的Ip及设备状态  
  ip_hash;  
  server 127.0.0.1:9090 down;  
  server 127.0.0.1:8080 weight=2;  
  server 127.0.0.1:6060;  
  server 127.0.0.1:7070 backup;  
}

在需要使用负载均衡的server中增加

proxy_pass http://bakend/;

每个设备的状态设置为:

1.down 表示单前的server暂时不参与负载
2.weight 默认为1.weight越大,负载的权重就越大。
3.max_fails :允许请求失败的次数默认为1.当超过最大次数时,返回proxy_next_upstream 模块定义的错误
4.fail_timeout:max_fails次失败后,暂停的时间。
5.backup: 其它所有的非backup机器down或者忙的时候,请求backup机器。所以这台机器压力会最轻。

nginx支持同时设置多组的负载均衡,用来给不用的server来使用。

client_body_in_file_only:设置为On,可以讲client post过来的数据记录到文件中用来做debug。

client_body_temp_path:设置记录文件的目录,可以设置最多3层目录。

location:对URL进行匹配,可以进行重定向或者进行新的代理,负载均衡。

转自:https://www.jianshu.com/p/b67af78f1088

浅谈Spring中JDK动态代理与CGLIB动态代理

前言
Spring是Java程序员基本不可能绕开的一个框架,它的核心思想是IOC(控制反转)和AOP(面向切面编程)。在Spring中这两个核心思想都是基于设计模式实现的,IOC思想的实现基于工厂模式,AOP思想的实现则是基于代理模式。

代理模式:代理类和被代理类实现共同的接口(或继承),代理类中存有指向被代理类的索引,实际执行时通过调用代理类的方法、实际执行的是被代理类的方法。
代理解决的问题当两个类需要通信时,引入第三方代理类,将两个类的关系解耦,让我们只了解代理类即可,而且代理的出现还可以让我们完成与另一个类之间的关系的统一管理,但是切记,代理类和委托类要实现相同的接口,因为代理真正调用的还是委托类的方法。
代理模式常见的实现有两种,静态代理和动态代理。

静态代理与动态代理
静态代理,是编译时增强,AOP 框架会在编译阶段生成 AOP 代理类,在程序运行前代理类的.class文件就已经存在了。常见的实现:JDK静态代理,AspectJ 。
动态代理,是运行时增强,它不修改代理类的字节码,而是在程序运行时,运用反射机制,在内存中临时为方法生成一个AOP对象,这个AOP对象包含了目标对象的全部方法,并且在特定的切点做了增强处理,并回调原对象的方法。常见的实现:JDK、CGLIB、Javassist(Hibernate中的使用动态代理)

不过Spring AOP的实现没有用到静态代理,而是采用了动态代理的方式,有两种,JDK动态代理和CGLIB动态代理。下面简述二者的差异。

Spring中如何判断使用哪种动态代理方式?
version:spring-aop-5.0.7.RELEASE.jar
class:org.springframework.aop.framework.ProxyFactoryBean
类源码上对类的说明:

implementation that builds an AOP proxy based on beans in Spring
翻译一下:
在Spring中基于bean构建AOP代理的实现

这个类就是Spring中对于bean构建AOP代理的实现,跟踪下源码流程,翻译下执行流程:

/**
* Return a proxy. Invoked when clients obtain beans from this factory bean.
* Create an instance of the AOP proxy to be returned by this factory.
* The instance will be cached for a singleton, and create on each call to
* {@code getObject()} for a proxy.
* @return a fresh AOP proxy reflecting the current state of this factory
*/
@Override
@Nullable
public Object getObject() throws BeansException {
//初始化拦截器链
initializeAdvisorChain();
//Spring中有singleton类型和prototype类型这两种不同的Bean
//是否是singleton类型,是,返回singleton类型的代理对象
if (isSingleton()) {
return getSingletonInstance();
}
//否,返回prototype类型的代理对象
else {
if (this.targetName == null) {
logger.warn(“Using non-singleton proxies with singleton targets is often undesirable. ” +
“Enable prototype proxies by setting the ‘targetName’ property.”);
}
return newPrototypeInstance();
}
}

singleton作用域:当把一个Bean定义设置为singleton作用域是,Spring IoC容器中只会存在一个共享的Bean实例,并且所有对Bean的请求,只要id与该Bean定义相匹配,则只会返回该Bean的同一实例。值得强调的是singleton作用域是Spring中的缺省作用域。
prototype作用域:prototype作用域的Bean会导致在每次对该Bean请求(将其注入到另一个Bean中,或者以程序的方式调用容器的getBean()方法)时都会创建一个新的Bean实例。根据经验,对有状态的Bean应使用prototype作用域,而对无状态的Bean则应该使用singleton作用域。
对于具有prototype作用域的Bean,有一点很重要,即Spring不能对该Bean的整个生命周期负责。具有prototype作用域的Bean创建后交由调用者负责销毁对象回收资源。
简单的说:
singleton只有一个实例,也即是单例模式。
prototype访问一次创建一个实例,相当于new。
由于singleton作用域是Spring中的缺省作用域,则继续追踪getSingletonInstance()方法。

/**
* Return the singleton instance of this class’s proxy object,
* lazily creating it if it hasn’t been created already.
* @return the shared singleton proxy
*/
private synchronized Object getSingletonInstance() {
if (this.singletonInstance == null) {
this.targetSource = freshTargetSource();
if (this.autodetectInterfaces && getProxiedInterfaces().length == 0 && !isProxyTargetClass()) {
// Rely on AOP infrastructure to tell us what interfaces to proxy.
Class targetClass = getTargetClass();
if (targetClass == null) {
throw new FactoryBeanNotInitializedException(“Cannot determine target class for proxy”);
}
setInterfaces(ClassUtils.getAllInterfacesForClass(targetClass, this.proxyClassLoader));
}
// Initialize the shared singleton instance.
super.setFrozen(this.freezeProxy);
//获取代理对象实例
this.singletonInstance = getProxy(createAopProxy());
}
return this.singletonInstance;
}

很明显,核心就在getProxy(createAopProxy())方法中的createAopProxy(),追踪createAopProxy()创建AOP代理实例方法

/**
* Subclasses should call this to get a new AOP proxy. They should not
* create an AOP proxy with {@code this} as an argument.
*/
protected final synchronized AopProxy createAopProxy() {
//active:在创建第一个AOP代理时设置为true
if (!this.active) {
//激活此代理配置。
activate();
}
//this:AdvisedSupport config,AOP形式配置
//获取AOP代理工厂,以指定AOP形式配置创建代理实例
return getAopProxyFactory().createAopProxy(this);
}

很明显,createAopProxy(AdvisedSupport config)就是创建AOP代理实例,不过这里戳进去是接口,Spring中对默认实现类是org.springframework.aop.framework.DefaultAopProxyFactory,看看里面的实现逻辑

@Override
public AopProxy createAopProxy(AdvisedSupport config) throws AopConfigException {
if (config.isOptimize() || config.isProxyTargetClass() || hasNoUserSuppliedProxyInterfaces(config)) {
Class targetClass = config.getTargetClass();
if (targetClass == null) {
throw new AopConfigException(“TargetSource cannot determine target class: ” +
“Either an interface or a target is required for proxy creation.”);
}
if (targetClass.isInterface() || Proxy.isProxyClass(targetClass)) {
return new JdkDynamicAopProxy(config);
}
return new ObjenesisCglibAopProxy(config);
}
else {
return new JdkDynamicAopProxy(config);
}
}

终于看到JDK和CGLIB的字样了,这个方法决定了是使用JDK动态代理还是CGLIB动态代理。下面对if中的判断逻辑逐个翻译解释

config.isOptimize():是否优化,看到否的逻辑是JDK,就可以知道Spring认为CGLIB动态代理的性能更高点。。。
config.isProxyTargetClass():是否直接代理目标类以及任何接口
hasNoUserSuppliedProxyInterfaces(config):是否没有指定代理接口
targetClass.isInterface():确定指定的对象是否表示接口类型
Proxy.isProxyClass(targetClass):是否是代理类
再看看这个类的说明:

In general, specify {@code proxyTargetClass} to enforce a CGLIB proxy,or specify one or more interfaces to use a JDK dynamic proxy.
谷歌翻译一下
通常,指定{@code proxyTargetClass}来强制执行CGLIB代理,或指定一个或多个接口以使用JDK动态代理

结合类说明和判断逻辑,可以得出结论:

在代理对象不是借口类型或不是代理类时,指定proxyTargetClass=true后,执行CGLIB代理
代理对象是接口类型或是代理类,使用JDK代理
两种动态代理的使用
下方代码是使用JDK动态代理和CGLIB动态代理的示例,这里不探究其底层实现,而是从API的使用比较二者的差异。

import org.springframework.cglib.proxy.Enhancer;
import org.springframework.cglib.proxy.MethodInterceptor;

import java.lang.reflect.Proxy;

public class ProxyService {

/**
* jdk动态代理
*
* @param object 被代理类对象
* @return 代理实例
*/
public static Object jdkProxyObject(Object object) {
//拦截器
SimpleInterceptor interceptor = new SimpleInterceptor();
return Proxy.newProxyInstance(
object.getClass().getClassLoader(),
object.getClass().getInterfaces(),
(proxy, method, args) -> {
//拦截器 – 前置处理
interceptor.before();
Object result = method.invoke(object, args);
//拦截器 – 后置处理
interceptor.after();
return result;
});
}

/**
* cglib动态代理
*
* @param object 被代理类对象
* @return 代理实例
*/
public static Object cglibProxyObject(Object object) {
//模拟拦截器
SimpleInterceptor interceptor = new SimpleInterceptor();
Enhancer enhancer = new Enhancer();
enhancer.setSuperclass(object.getClass());
enhancer.setCallback((MethodInterceptor) (o, method, objects, methodProxy) -> {
//拦截器 – 前置处理
interceptor.before();
Object result = method.invoke(object, objects);
//拦截器 – 后置处理
interceptor.after();
return result;
});
return enhancer.create();
}

}

public class SimpleInterceptor {

public void before() {
System.out.println(“—–” + this.getClass().getSimpleName() + “do before” + “—–“);
}

public void after() {
System.out.println(“—–” + this.getClass().getSimpleName() + “do after” + “—–“);
}

}

JDK动态代理

使用JDK动态代理需要使用:
Proxy.newProxyInstance(ClassLoader loader, Class[] interfaces, InvocationHandler h)
源码方法说明:

Returns an instance of a proxy class for the specified interfaces that dispatches method invocations to the specified invocation handler
谷歌翻译一下:
返回指定接口的代理类的实例,该接口将方法调用分派给指定的调用处理程序。

参数说明:

ClassLoader loader: the class loader to define the proxy class,用于定义代理类的类加载器
Class[] interfaces: the list of interfaces for the proxy class,代理类的接口列表
InvocationHandler h: to implement,由代理实例的调用处理程序实现的接口
根据说明传参,可以估摸出,jdk动态代理的实现原理是实现代理对象的接口生成兄弟类。所以使用jdk动态代理必须满足以下条件:
1. 代理对象必须实现一个或多个接口
2. 返回的代理实例是指定接口的代理类的实例,也就是必须以对象实现的接口接收实例,而不是代理类

CGLIB动态代理

使用Spring cglib动态代理需要使用:
org.springframework.cglib.proxy.Enhancer
由于spring的Sources下载下来并没有Javadoc,没法展示源码上的方法说明。。。
不过从enhancer.setSuperclass(Class superclass) 可以看出cglib代理的特点:

代理对象不能被final修饰,因为cglib代理的实现原理是操作字节码生成代理对象子类,而被final修饰的类不能被继承
因为是子类,所以不必像jdk代理一样必须以对象实现的接口接收实例,代理对象类同样可以接收代理实例
实现原理
JDK动态代理:基于反射,生成实现代理对象接口的匿名类,通过生成代理实例时传递的InvocationHandler处理程序实现方法增强。
CGLIB动态代理:基于操作字节码,通过加载代理对象的类字节码,为代理对象创建一个子类,并在子类中拦截父类方法并织入方法增强逻辑。底层是依靠ASM(开源的java字节码编辑类库)操作字节码实现的。
性能比较
性能比较分为两个部分:生成代理实例性能、代理实例运行性能
由于看到网上有博客提到jdk版本升级会提高动态代理的性能,秉持着实事求是的原则,必须要测试下版本升级后的比较结果,测试的jdk版本为jdk1.8.0_171和jdk-10.0.1。
count数分别定义为100、1000、10000,100000,为避免干扰,方法都是单独执行,每个count执行三次,结果聚合,方便比较。

定义一个简单的service及实现

public interface SimpleService {
void consumer();
}

import java.util.Date;

public class SimpleServiceImpl implements SimpleService {
@Override
public void consumer() {
new Date();
}
}

比较生成代理实例性能
JDK

public class JdkTest {

public static void main(String[] args) {
/*———-jdk———-*/
int count = 100;
long jdkStart = System.currentTimeMillis();
for (int j = 0; j < count; j++) {
SimpleService service = new SimpleServiceImpl();
SimpleService proxy = (SimpleService) ProxyService.jdkProxyObject(service);
}
long jdkEnd = System.currentTimeMillis();

System.out.println(“==================================================”);
System.out.println(“java.version:” + System.getProperty(“java.version”));
System.out.println(“new count:” + count);
System.out.println(“jdk new proxy spend time(ms):” + (jdkEnd – jdkStart));
}

}

CGLIB

public class CglibTest {
public static void main(String[] args) {
/*———-cglib———-*/
int count = 100000;
long cglibStart = System.currentTimeMillis();
for (int j = 0; j < count; j++) {
SimpleService service = new SimpleServiceImpl();
SimpleService proxy = (SimpleService) ProxyService.cglibProxyObject(service);
}
long cglibEnd = System.currentTimeMillis();

System.out.println(“==================================================”);
System.out.println(“java.version:” + System.getProperty(“java.version”));
System.out.println(“new count:” + count);
System.out.println(“cglib new proxy spend time(ms):” + (cglibEnd – cglibStart));
}
}

count分别为100,1000,10000,100000,打印结果聚合:
java.version:1.8.0_161

==================================================
java.version:1.8.0_171
new count:100
jdk new proxy spend time(ms):143
jdk new proxy spend time(ms):164
jdk new proxy spend time(ms):154
cglib new proxy spend time(ms):412
cglib new proxy spend time(ms):452
cglib new proxy spend time(ms):428

==================================================
java.version:1.8.0_171
new count:1000
jdk new proxy spend time(ms):151
jdk new proxy spend time(ms):158
jdk new proxy spend time(ms):172
cglib new proxy spend time(ms):489
cglib new proxy spend time(ms):415
cglib new proxy spend time(ms):431

==================================================
java.version:1.8.0_171
new count:10000
jdk new proxy spend time(ms):214
jdk new proxy spend time(ms):218
jdk new proxy spend time(ms):227
cglib new proxy spend time(ms):468
cglib new proxy spend time(ms):486
cglib new proxy spend time(ms):650

==================================================
java.version:1.8.0_171
new count:100000
jdk new proxy spend time(ms):278
jdk new proxy spend time(ms):299
jdk new proxy spend time(ms):304
cglib new proxy spend time(ms):612
cglib new proxy spend time(ms):632
cglib new proxy spend time(ms):684

java.version:10.0.1

==================================================
java.version:10.0.1
new count:100
jdk new proxy spend time(ms):92
jdk new proxy spend time(ms):93
jdk new proxy spend time(ms):85
cglib new proxy spend time(ms):288
cglib new proxy spend time(ms):330
cglib new proxy spend time(ms):347

==================================================
java.version:10.0.1
new count:1000
jdk new proxy spend time(ms):88
jdk new proxy spend time(ms):94
jdk new proxy spend time(ms):105
cglib new proxy spend time(ms):339
cglib new proxy spend time(ms):306
cglib new proxy spend time(ms):341

==================================================
java.version:10.0.1
new count:10000
jdk new proxy spend time(ms):128
jdk new proxy spend time(ms):132
jdk new proxy spend time(ms):125
cglib new proxy spend time(ms):376
cglib new proxy spend time(ms):409
cglib new proxy spend time(ms):446

==================================================
java.version:10.0.1
new count:100000
jdk new proxy spend time(ms):170
jdk new proxy spend time(ms):220
jdk new proxy spend time(ms):196
cglib new proxy spend time(ms):530
cglib new proxy spend time(ms):555
cglib new proxy spend time(ms):633

对比结果,生成代理实例性能:JDK > CGLIB;JDK版本升级后对动态代理生成实例性能有提升。

比较生成代理实例性能
JDK

public class JdkTest {

public static void main(String[] args) {
/*———-jdk———-*/
int count = 100;
long jdkStart = System.currentTimeMillis();
SimpleService service = new SimpleServiceImpl();
SimpleService proxy = (SimpleService) ProxyService.jdkProxyObject(service);
for (int j = 0; j < count; j++) {
proxy.consumer();
}
long jdkEnd = System.currentTimeMillis();

System.out.println(“==================================================”);
System.out.println(“java.version:” + System.getProperty(“java.version”));
System.out.println(“new count:” + count);
System.out.println(“jdk proxy consumer spend time(ms):” + (jdkEnd – jdkStart));
}

}

CGLIB

public class CglibTest {

public static void main(String[] args) {
/*———-cglib———-*/
int count = 100;
long cglibStart = System.currentTimeMillis();
SimpleService service = new SimpleServiceImpl();
SimpleService proxy = (SimpleService) ProxyService.cglibProxyObject(service);
for (int j = 0; j < count; j++) {
proxy.consumer();
}
long cglibEnd = System.currentTimeMillis();

System.out.println(“==================================================”);
System.out.println(“java.version:” + System.getProperty(“java.version”));
System.out.println(“new count:” + count);
System.out.println(“cglib proxy consumer spend time(ms):” + (cglibEnd – cglibStart));
}
}

count分别为100,1000,10000,100000,打印结果聚合:
java.version:1.8.0_161

==================================================
java.version:1.8.0_171
new count:100
jdk proxy consumer spend time(ms):148
jdk proxy consumer spend time(ms):137
jdk proxy consumer spend time(ms):168
cglib proxy consumer spend time(ms):464
cglib proxy consumer spend time(ms):447
cglib proxy consumer spend time(ms):438

==================================================
java.version:1.8.0_171
new count:1000
jdk proxy consumer spend time(ms):141
jdk proxy consumer spend time(ms):170
jdk proxy consumer spend time(ms):150
cglib proxy consumer spend time(ms):408
cglib proxy consumer spend time(ms):410
cglib proxy consumer spend time(ms):422

==================================================
java.version:1.8.0_171
new count:10000
jdk proxy consumer spend time(ms):179
jdk proxy consumer spend time(ms):173
jdk proxy consumer spend time(ms):165
cglib proxy consumer spend time(ms):416
cglib proxy consumer spend time(ms):463
cglib proxy consumer spend time(ms):415

==================================================
java.version:1.8.0_171
new count:100000
jdk proxy consumer spend time(ms):176
jdk proxy consumer spend time(ms):158
jdk proxy consumer spend time(ms):187
cglib proxy consumer spend time(ms):637
cglib proxy consumer spend time(ms):459
cglib proxy consumer spend time(ms):436

java.version:10.0.1

==================================================
java.version:10.0.1
new count:100
jdk proxy consumer spend time(ms):79
jdk proxy consumer spend time(ms):90
jdk proxy consumer spend time(ms):85
cglib proxy consumer spend time(ms):281
cglib proxy consumer spend time(ms):352
cglib proxy consumer spend time(ms):338

==================================================
java.version:10.0.1
new count:1000
jdk proxy consumer spend time(ms):89
jdk proxy consumer spend time(ms):97
jdk proxy consumer spend time(ms):106
cglib proxy consumer spend time(ms):304
cglib proxy consumer spend time(ms):303
cglib proxy consumer spend time(ms):359

==================================================
java.version:10.0.1
new count:10000
jdk proxy consumer spend time(ms):113
jdk proxy consumer spend time(ms):99
jdk proxy consumer spend time(ms):108
cglib proxy consumer spend time(ms):347
cglib proxy consumer spend time(ms):339
cglib proxy consumer spend time(ms):349

==================================================
java.version:10.0.1
new count:100000
jdk proxy consumer spend time(ms):106
jdk proxy consumer spend time(ms):102
jdk proxy consumer spend time(ms):107
cglib proxy consumer spend time(ms):342
cglib proxy consumer spend time(ms):380
cglib proxy consumer spend time(ms):385

对比结果,代理实例运行性能:JDK > CGLIB;JDK版本升级后对动态代理的代理实例运行性能有提升。
说实话看到对比结果我是震惊的,我之前一直认为CGLIB动态代理的性能应该优于JDK动态代理,通过对比结果,可以看出,动态代理总体性能:JDK > CGLIB,难怪Spring默认的动态代理方式为JDK。

小结
Spring中动态代理选择逻辑:

 

JDK动态代理特点:

代理对象必须实现一个或多个接口
以接口形式接收代理实例,而不是代理类
CGLIB动态代理特点:

代理对象不能被final修饰
以类或接口形式接收代理实例
JDK与CGLIB动态代理的性能比较:

生成代理实例性能:JDK > CGLIB
代理实例运行性能:JDK > CGLIB

转自:https://blog.csdn.net/wangzhihao1994/article/details/80913210

Docker基本命令汇总

 

Docker的三大核心概念:镜像、容器、仓库

镜像:类似虚拟机的镜像、用俗话说就是安装文件。

容器:类似一个轻量级的沙箱,容器是从镜像创建应用运行实例,可以将其启动、开始、停止、删除、而这些容器都是相互隔离、互不可见的。

仓库:类似代码仓库,是Docker集中存放镜像文件的场所。

 

1.设置docker自启动

使用yum安装好docker后,设置开机启动。

[root@CentOS ~]# systemctl enable docker

 

2.docker的启动、停止、重启

复制代码
[root@localhost ~]# service docker restart
Redirecting to /bin/systemctl restart docker.service
[root@localhost ~]# service docker stop
Redirecting to /bin/systemctl stop docker.service
[root@localhost ~]# service docker start
Redirecting to /bin/systemctl start docker.service
复制代码

 

3.docker镜像

docker search 用于搜索线上镜像仓库,这样就可以搜索出在Docker Hub上所有带redis的公共的可用镜像。

复制代码
[root@CentOS ~]# docker search redis
INDEX       NAME                                           DESCRIPTION                                     STARS     OFFICIAL   AUTOMATED
docker.io   docker.io/redis                                Redis is an open source key-value store th...   5472      [OK]
docker.io   docker.io/bitnami/redis                        Bitnami Redis Docker Image                      82                   [OK]
docker.io   docker.io/sameersbn/redis                                                                      68                   [OK]
docker.io   docker.io/tenstartups/redis-commander                                                          32                   [OK]
docker.io   docker.io/hypriot/rpi-redis                    Raspberry Pi compatible redis image             31
docker.io   docker.io/kubeguide/redis-master               redis-master with "Hello World!"                25
docker.io   docker.io/kubeguide/guestbook-redis-slave      Guestbook redis slave                           20
docker.io   docker.io/redislabs/redis                      Clustered in-memory database engine compat...   15
docker.io   docker.io/webhippie/redis                      Docker images for redis                         9                    [OK]
docker.io   docker.io/arm32v7/redis                        Redis is an open source key-value store th...   7
docker.io   docker.io/rediscommander/redis-commander       Alpine image for redis-commander - Redis m...   7                    [OK]
docker.io   docker.io/oliver006/redis_exporter              Prometheus Exporter for Redis Metrics. Su...   5
docker.io   docker.io/rtoma/logspout-redis-logstash        Logspout including Redis adapter for sendi...   5
docker.io   docker.io/centos/redis-32-centos7              Redis in-memory data structure store, used...   3
docker.io   docker.io/dynomitedb/redis                     Redis backend for DynomiteDB.                   2                    [OK]
...
复制代码

docker pull 拉取服务器镜像仓库里的镜像。

复制代码
[root@CentOS ~]# docker pull redis
Using default tag: latest
Trying to pull repository docker.io/library/redis ...
latest: Pulling from docker.io/library/redis
be8881be8156: Pull complete
d6f5ea773ca3: Pull complete
735cc65c0db4: Pull complete
ff89c30e4d8c: Pull complete
59bf782a86b3: Pull complete
ce8aaa9fe90a: Pull complete
Digest: sha256:096cff9e6024603decb2915ea3e501c63c5bb241e1b56830a52acfd488873843
Status: Downloaded newer image for docker.io/redis:latest
复制代码

不指定版本,默认会下载最新的一个版本。由于官方的镜像在国外,有时网速较慢,所以推荐下载国内的镜像,比如阿里云,网易云。

docker push 推送本地镜像到服务器。

 

查看镜像

    docker images:列出images

    docker images -a:列出所有的images(包含历史)

    docker images –tree :显示镜像的所有层(layer)

    docker rmi  :删除一个或多个image

 

4.docker创建一个容器

上面介绍了docker镜像的拉取,我们接下来看下如何创建一个容器。

docker run 命令格式

复制代码
Usage: docker run [OPTIONS] IMAGE [COMMAND] [ARG...]  
 
  -d, --detach=false         指定容器运行于前台还是后台,默认为false   
  -i, --interactive=false   打开STDIN,用于控制台交互  
  -t, --tty=false            分配tty设备,该可以支持终端登录,默认为false  
  -u, --user=""              指定容器的用户  
  -a, --attach=[]            登录容器(必须是以docker run -d启动的容器)
  -w, --workdir=""           指定容器的工作目录 
  -c, --cpu-shares=0        设置容器CPU权重,在CPU共享场景使用  
  -e, --env=[]               指定环境变量,容器中可以使用该环境变量  
  -m, --memory=""            指定容器的内存上限  
  -P, --publish-all=false    指定容器暴露的端口  
  -p, --publish=[]           指定容器暴露的端口 
  -h, --hostname=""          指定容器的主机名  
  -v, --volume=[]            给容器挂载存储卷,挂载到容器的某个目录  
  --volumes-from=[]          给容器挂载其他容器上的卷,挂载到容器的某个目录
  --cap-add=[]               添加权限,权限清单详见:http://linux.die.net/man/7/capabilities  
  --cap-drop=[]              删除权限,权限清单详见:http://linux.die.net/man/7/capabilities  
  --cidfile=""               运行容器后,在指定文件中写入容器PID值,一种典型的监控系统用法  
  --cpuset=""                设置容器可以使用哪些CPU,此参数可以用来容器独占CPU  
  --device=[]                添加主机设备给容器,相当于设备直通  
  --dns=[]                   指定容器的dns服务器  
  --dns-search=[]            指定容器的dns搜索域名,写入到容器的/etc/resolv.conf文件  
  --entrypoint=""            覆盖image的入口点  
  --env-file=[]              指定环境变量文件,文件格式为每行一个环境变量  
  --expose=[]                指定容器暴露的端口,即修改镜像的暴露端口  
  --link=[]                  指定容器间的关联,使用其他容器的IP、env等信息  
  --lxc-conf=[]              指定容器的配置文件,只有在指定--exec-driver=lxc时使用  
  --name=""                  指定容器名字,后续可以通过名字进行容器管理,links特性需要使用名字  
  --net="bridge"             容器网络设置:
                                bridge 使用docker daemon指定的网桥     
                                host     //容器使用主机的网络  
                                container:NAME_or_ID  >//使用其他容器的网路,共享IP和PORT等网络资源  
                                none 容器使用自己的网络(类似--net=bridge),但是不进行配置 
  --privileged=false         指定容器是否为特权容器,特权容器拥有所有的capabilities  
  --restart="no"             指定容器停止后的重启策略:
                                no:容器退出时不重启  
                                on-failure:容器故障退出(返回值非零)时重启 
                                always:容器退出时总是重启  
  --rm=false                 指定容器停止后自动删除容器(不支持以docker run -d启动的容器)  
  --sig-proxy=true           设置由代理接受并处理信号,但是SIGCHLD、SIGSTOP和SIGKILL不能被代理 
复制代码

 

实例:

[root@CentOS ~]# docker run -p 8090:8080 -p 50000:50000 --restart always --link Redis:redis --name jenkins -v /home/jenkins:/home/jenkins_home --privileged=true dokcer.io/jenkins/jenkins

     -p:把容器的8080端口映射到宿主机8090上

     -v:主机的目录/home/jenkins映射到容器的目录/home/jenkins_home

     –name:给容器起个名字jenkins,docker.io/jenkins/jenkins是你下载的镜像

     –restart:always 容器退出时总是重启

  –privileged=true:挂载主机目录Docker访问出现Permission denied的解决办法

     –link:为redis容器起个别名Redis,访问的时候使用别名Redis

 

5.查看docker容器

 docker ps      #查看正在运行的容器

[root@CentOS ~]# docker ps
CONTAINER ID        IMAGE                       COMMAND                  CREATED             STATUS              PORTS                                              NAMES
faa1b0be4e0b        docker.io/jenkins/jenkins   "/sbin/tini -- /us..."   15 hours ago        Up 4 seconds        0.0.0.0:8080->8080/tcp, 0.0.0.0:50000->50000/tcp   jenkins

docker ps -a   #查看所有容器

复制代码
[root@CentOS ~]# docker ps -a
CONTAINER ID        IMAGE                       COMMAND                  CREATED             STATUS                       PORTS                                              NAMES
45af2f4d1f82        docker.io/redis             "docker-entrypoint..."   15 hours ago        Exited (255) 3 minutes ago   0.0.0.0:6378->6379/tcp                             redis1
ffa152fb76cc        docker.io/redis             "docker-entrypoint..."   15 hours ago        Exited (255) 3 minutes ago   0.0.0.0:6379->6379/tcp                             redis
faa1b0be4e0b        docker.io/jenkins/jenkins   "/sbin/tini -- /us..."   15 hours ago        Up 2 minutes                 0.0.0.0:8080->8080/tcp, 0.0.0.0:50000->50000/tcp   jenkins
复制代码

 

6.启动、停止、重启、删除docker容器

根据id和name对容器进行操作

复制代码
#启动容器
docker start 
#停止容器
docker stop 
#重启容器
docker restart 
#删除容器
docker rm 
#删除所有容器
docker rm $(docker ps -a -q)
复制代码

 

7.进入docker容器

#进入容器
docker exec -it containerID /bin/bash

 

ctrl+d 退出容器且关闭

ctrl+p+q 退出容器但不关闭

 

8.查看容器日志

docker logs -f -t –tail 行数 容器名

复制代码
[root@CentOS ~]# docker logs -f -t --tail 10 redis #查看容器最后10行日志
2018-08-09T05:55:16.204504000Z 1:C 09 Aug 05:55:16.135 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
2018-08-09T05:55:16.205232000Z 1:C 09 Aug 05:55:16.151 # Redis version=4.0.10, bits=64, commit=00000000, modified=0, pid=1, just started
2018-08-09T05:55:16.205822000Z 1:C 09 Aug 05:55:16.151 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf
2018-08-09T05:55:16.206545000Z 1:M 09 Aug 05:55:16.199 * Running mode=standalone, port=6379.
2018-08-09T05:55:16.207152000Z 1:M 09 Aug 05:55:16.200 # WARNING: The TCP backlog setting of 511 cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of 128.
2018-08-09T05:55:16.207713000Z 1:M 09 Aug 05:55:16.200 # Server initialized
2018-08-09T05:55:16.208315000Z 1:M 09 Aug 05:55:16.200 # WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.
2018-08-09T05:55:16.209055000Z 1:M 09 Aug 05:55:16.201 # WARNING you have Transparent Huge Pages (THP) support enabled in your kernel. This will create latency and memory usage issues with Redis. To fix this issue run the command 'echo never > /sys/kernel/mm/transparent_hugepage/enabled' as root, and add it to your /etc/rc.local in order to retain the setting after a reboot. Redis must be restarted after THP is disabled.
2018-08-09T05:55:16.209806000Z 1:M 09 Aug 05:55:16.201 * DB loaded from disk: 0.000 seconds
2018-08-09T05:55:16.210405000Z 1:M 09 Aug 05:55:16.202 * Ready to accept connections
复制代码

 

9.Docker网络操作

复制代码
#在主机上创建一个网络
docker network create mynet

#查看自定义bridge网络
docker network inspect mynet

#移除网络要求网络中所有的容器关闭或断开与此网络的连接时,才能够使用移除命令
docker network disconnet mynet 容器ID

#移除网络
docker network rm mynet
复制代码

 

转自:https://www.cnblogs.com/shenh/p/9225351.html

dubbo 教程

今天 Apache Dubbo 晋升为 Apache 基金会顶级项目  祝贺!!!

dubbo地址:http://dubbo.apache.org/zh-cn/

dubbo中文文档:http://dubbo.apache.org/zh-cn/docs/user/quick-start.html

1. Dubbo是什么?

Dubbo是一个分布式服务框架,致力于提供高性能和透明化的RPC远程服务调用方案,以及SOA服务治理方案。简单的说,dubbo就是个服务框架,如果没有分布式的需求,其实是不需要用的,只有在分布式的时候,才有dubbo这样的分布式服务框架的需求,并且本质上是个服务调用的东东,说白了就是个远程服务调用的分布式框架
其核心部分包含:
1. 远程通讯: 提供对多种基于长连接的NIO框架抽象封装,包括多种线程模型,序列化,以及“请求-响应”模式的信息交换方式。
2. 集群容错: 提供基于接口方法的透明远程过程调用,包括多协议支持,以及软负载均衡,失败容错,地址路由,动态配置等集群支持。
3. 自动发现: 基于注册中心目录服务,使服务消费方能动态的查找服务提供方,使地址透明,使服务提供方可以平滑增加或减少机器。

2. Dubbo能做什么?

1.透明化的远程方法调用,就像调用本地方法一样调用远程方法,只需简单配置,没有任何API侵入。      
2.软负载均衡及容错机制,可在内网替代F5等硬件负载均衡器,降低成本,减少单点。
3. 服务自动注册与发现,不再需要写死服务提供方地址,注册中心基于接口名查询服务提供者的IP地址,并且能够平滑添加或删除服务提供者。

Dubbo采用全Spring配置方式,透明化接入应用,对应用没有任何API侵入,只需用Spring加载Dubbo的配置即可,Dubbo基于Spring的Schema扩展进行加载。

3. dubbo的架构

dubbo架构图如下所示:
 

 

 

节点角色说明:

       Provider: 暴露服务的服务提供方。

       Consumer: 调用远程服务的服务消费方。

       Registry: 服务注册与发现的注册中心。

       Monitor: 统计服务的调用次调和调用时间的监控中心。

       Container: 服务运行容器。

调用关系说明:

0 服务容器负责启动,加载,运行服务提供者。

1. 服务提供者在启动时,向注册中心注册自己提供的服务。

2. 服务消费者在启动时,向注册中心订阅自己所需的服务。

3. 注册中心返回服务提供者地址列表给消费者,如果有变更,注册中心将基于长连接推送变更数据给消费者。

4. 服务消费者,从提供者地址列表中,基于软负载均衡算法,选一台提供者进行调用,如果调用失败,再选另一台调用。

5. 服务消费者和提供者,在内存中累计调用次数和调用时间,定时每分钟发送一次统计数据到监控中心。

4. dubbo使用方法。

Dubbo采用全spring配置方式,透明化接入应用,对应用没有任何API侵入,只需用Spring加载Dubbo的配置即可,Dubbo基于Spring的Schema扩展进行加载。如果不想使用Spring配置,而希望通过API的方式进行调用(不推荐),可以参见:

http://code.alibabatech.com/wiki/display/dubbo/User+Guide-zh#UserGuide-zh-API%E9%85%8D%E7%BD%AE

下面我们就来看看spring配置方式的写法。

服务提供者:

1. 下载zookeeper注册中心,下载地址:http://www.apache.org/dyn/closer.cgi/zookeeper/  下载后解压即可,进入D:\apach-zookeeper-3.4.5\bin,

双击zkServer.cmd启动注册中心服务。

2. 定义服务接口: (该接口需单独打包,在服务提供方和消费方共享)

复制代码
package com.unj.dubbotest.provider;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;


public class DemoService{
    
     public String sayHello(String name);
     public List getUsers() ;
}
复制代码
      在服务提供方实现接口:(对服务消费方隐藏实现)
复制代码
package com.unj.dubbotest.provider;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;


public class DemoServiceImpl implements DemoService{
    
     public String sayHello(String name) {
            return "Hello " + name;
     }
     public List getUsers() {
         List list = new ArrayList();
         User u1 = new User();
         u1.setName("jack");
         u1.setAge(20);
         u1.setSex("男");
         
         User u2 = new User();
         u2.setName("tom");
         u2.setAge(21);
         u2.setSex("女");
         
         User u3 = new User();
         u3.setName("rose");
         u3.setAge(19);
         u3.setSex("女");
         
         list.add(u1);
         list.add(u2);
         list.add(u3);
         return list;
     }
}
复制代码

用Spring配置声明暴露服务:

复制代码


 
     
    
    
    
    
 
    
  
      
       
  
    
    
 
    
    
    
复制代码

加载Spring配置,启动服务:

复制代码
package com.unj.dubbotest.provider;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Provider {
 
    public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] {"applicationContext.xml"});
        context.start();
 
        System.in.read(); // 为保证服务一直开着,利用输入流的阻塞来模拟
    }
 
}
复制代码

服务消费者:

1.通过Spring配置引用远程服务:
复制代码



    
    

    
    
    

    
    

复制代码

2.加载Spring配置,并调用远程服务:

调用结果为:

 

dubbo管理页面:

 

应用页面:

 

转自:http://blog.csdn.net/wilsonke/article/details/39896595

   https://www.cnblogs.com/xujiming/p/5449065.html

MySQL中的存储过程、游标和存储函数

MySQL中的存储过程
首先来看两个问题:

1.什么是存储过程?

存储过程(Stored Procedure)是在数据库系统中,一组为了完成特定功能的SQL语句集,经编译后存储在数据库中,用户通过指定存储过程的名字并给出参数(如果该存储过程有参数的话)来执行它。

2.为什么要使用存储过程?

《MySQL必知必会》这本书中给出了如下几条主要理由:

1)通过把处理封装在容易使用的单元中,简化复杂的操作;

2)由于不要求反复建立一系列处理步骤,这保证了数据的完整性。 如果所有开发人员和应用程序都使用同一(试验和测试)存储过程,则所使用的代码都是相同的。这一点的延伸就是防止错误。需要执行的步骤越多,出错的可能性就越大。防止错误保证了数据的一致性;

3)简化对变动的管理。如果表名、列名或业务逻辑(或别的内容) 有变化,只需要更改存储过程的代码。使用它的人员甚至不需要知道这些变化。这一点的延伸就是安全性。通过存储过程限制对基础数据的访问减少了数据讹误(无意识的或别的原因所导致的数据讹误)的机会;

4)提高性能。存储过程只在创建时进行编译,以后每次执行存储过程都不需要再重新编译,而一般SQL语句每执行一次就编译一次,因此使用存储过程比使用单独的SQL语句要快;

5)存在一些只能用在单个请求中的MySQL元素和特性,存储过程可以使用它们来编写功能更强更灵活的代码。

总的来说,使用存储过程有3个主要的好处,即简单、安全、高性能。同时,我们也要知道存储过程的一些缺陷:存储过程的编写比基本SQL语句复杂,编写存储过程需要更高的技能,更丰富的经验;我们可能没有创建存储过程的安全访问权限,许多数据库管理员限制存储过程的创建权限,允许用户使用存储过程,但不允许他们创建存储过程。尽管有这些缺陷,但存储过程还是非常有用的。

下面进入正题:

存储过程中变量的声明与分配
在存储过程中,我们通常用已声明的变量来保存直接/间接结果。 这些变量是存储过程中的的本地变量,只能用于存储过程中,无法在其他代码块中访问。

声明变量

要在存储过程中声明一个变量,可以使用declare语句,如下所示:

declare variable_name datatype(size) default default_value;
下面来更详细地解释上面的语句:

首先,在declare关键字后面要指定变量名。变量名必须遵循MySQL表列名称的命名规则。

其次,指定变量的数据类型及其大小。变量可以有任何MySQL数据类型如int,varchar,datetime等。

最后,当声明一个变量时,它的初始值为null。但是可以使用default关键字为变量分配默认值。

例如,可以声明一个名为total_sale的变量,数据类型为int,默认值为0,如下所示:

declare total_sale int default 0;
MySQL允许使用单个declare语句声明共享相同数据类型的两个或多个变量,如下所示:

declare x, y int default 0;

我们声明了两个整数变量x和y,并将它们的默认值都设置为0。
分配变量值

当声明了一个变量后,就可以开始使用它了。要为变量分配一个值,可以使用set语句,例如:

declare total_count int default 0;

set total_count = 10;
上面语句中,把10分配给变量total_count。

除了set语句之外,还可以使用select into语句将查询的结果分配给一个变量。 如下所示:

declare total_products int default 0;
select count(*) into total_products from products;
在上面的示例中:

首先,声明一个名为total_products的变量,并将其值初始化为0;

然后,使用select into语句将products表中选择的产品数量分配给total_products变量。

存储过程的三种参数
(1)IN型参数:它是默认模式。在存储过程中定义IN参数时,调用程序必须将参数传递给存储过程。 另外,IN参数的值被保护。这意味着即使在存储过程中更改了IN参数的值,在存储过程结束后仍保留其原始值。换句话说,存储过程只使用IN参数的副本。

tips:

1)如果你使用的是mysql命令行客户端程序,默认的MySQL语句分隔符为;如果命令行实用程序要解释存储过程自身内的;字符,则它们最终不会成为存储过程的成分,这会使 存储过程中的SQL出现句法错误。解决办法是使用delimiter关键字临时更改命令行实用程序的语句分隔符。如delimiter //告诉命令行客户端程序使用//作为新的语句结束分隔符,这样,存储过程体内的;仍然保持不动,并且正确地传递给数据库引擎。最后,为恢复为原来的语句分隔符,可使用delimiter ;。除\符号外,任何字符都可以用作语句分隔符。

2)在创建一个存储过程前可以使用drop procedure if exists procedure_name语句,防止因为创建的新存储过程的名字已存在而出现错误。

示例:

建立存储过程IN_example:

delimiter //
drop procedure if exists IN_example//
create procedure IN_example (IN input_number int)
begin
set input_number = 2;
select input_number;
end//
把用户变量@input_number的初始值设为3:

set @input_number = 3//
调用存储过程IN_example,并将用户变量传入存储过程IN_example中:

call IN_example(@input_number)//

# 返回结果为:
+————–+
| input_number |
+————–+
| 2 |
+————–+

select @input_number//

# 返回结果为:
+—————+
| @input_number |
+—————+
| 3 |
+—————+
由上述结果可以看到,调用存储过程时,如果把用户变量@input_number作为IN型参数传给存储过程IN_example,因为IN参数在存储过程中被赋予了一个新值2,所以调用存储过程IN_example后的返回结果为2,但是用户变量@Input_number本身的值并没有改变,仍然是3。

(2)OUT型参数:可以在存储过程中更改OUT参数的值,并将其更改后的新值传递回调用程序。

示例:

建立一个求正整数算术平方根的存储过程my_sqrt:

drop procedure if exists my_sqrt//
create procedure my_sqrt(IN input_number int,OUT output_number float)
begin
set output_number = sqrt(input_number);
end//
把用户变量@number的初始值设为1:

set @number = 1//
调用存储过程my_sqrt:

# 把@number作为OUT型参数传递给存储过程my_sqrt
call my_sqrt(10,@number)//

#返回结果为空

select @number//

# 返回结果为:
+——————–+
| @number |
+——————–+
| 3.1622776985168457 |
+——————–+

由上述结果可以看到,调用存储过程后,用户变量@number的值由初始值1变成了10的算术平方根3.16227······

(3)INOUT型参数: INOUT参数是IN和OUT参数的组合。这意味着调用程序可以传递参数,且存储过程可以修改INOUT参数并将更改后的新值传递回调用程序。

示例:

建立一个存储过程INOUT_example:

drop procedure if exists INOUT_example//
create procedure INOUT_example(INOUT number int)
begin
set number = 20;
select number;
end//
把用户变量@inout_number的初始值设为1:

set @inout_number = 1//
调用存储过程INOUT_example,并把用户变量@inout_number作为INOUT型参数传给存储过程INOUT_example:

call INOUT_example(@inout_number)//

# 返回结果为:
+——–+
| number |
+——–+
| 20 |
+——–+

select @inout_number//

# 返回结果为:
+—————+
| @inout_number |
+—————+
| 20 |
+—————+
由上述结果可以看到,用户变量@inout_number作为INOUT型参数传给存储过程INOUT_example,当INOUT型参数在存储过程中被赋予了一个新值20时,用户变量@inout_number也由初始值1变成了20。此处的INOUT型参数的主要作用有两个:一是接受传入参数的值,二是保存传入的参数因在存储过程中经过一系列操作后而变成的新值。

补充说明:虽然存储过程有三种类型的参数,但是我们在创建存储过程的时候也可以没有参数,例如创建一个从商品表goods中查询所有商品的平均价格的存储过程,如下所示:

drop procedure if exists avg_price//
create avg_price()
begin
select avg(price) as avg_price from goods;
end//
返回多个值的存储过程
在创建返回多个值得存储过程之前,说明一下用到的数据表goods,它是一个商品信息记录表,详细字段如下所示:

+———-+——————————-+——–+———-+———–+————–+————+————-+
| goods_id | goods_name | cat_id | brand_id | goods_sn | goods_number | shop_price | click_count |
+———-+——————————-+——–+———-+———–+————–+————+————-+
| 4 | HTCN85原装充电器 | 8 | 1 | ECS000004 | 17 | 58.00 | 0 |
| 3 | HTC原装5800耳机 | 8 | 1 | ECS000002 | 25 | 68.00 | 3 |
| 5 | 索爱原装M2卡读卡器 | 11 | 7 | ECS000005 | 8 | 20.00 | 3 |
| 6 | 胜创KINGMAX内存卡 | 11 | 0 | ECS000006 | 15 | 42.00 | 0 |
| 7 | HTCN85原装立体声耳机HS-82 | 8 | 1 | ECS000007 | 20 | 100.00 | 0 |
| 8 | 飞利浦9@9v | 3 | 4 | ECS000008 | 17 | 399.00 | 9 |
| 9 | HTCE66 | 3 | 1 | ECS000009 | 13 | 2298.00 | 20 |
| 10 | 索爱C702c | 3 | 7 | ECS000010 | 7 | 1328.00 | 11 |
| 12 | 摩托罗拉A810 | 3 | 2 | ECS000012 | 8 | 983.00 | 14 |
| 13 | HTC5320 XpressMusic | 3 | 1 | ECS000013 | 8 | 1311.00 | 13 |
| 14 | HTC5800XM | 4 | 1 | ECS000014 | 4 | 2625.00 | 6 |
| 15 | 摩托罗拉A810 | 3 | 2 | ECS000015 | 3 | 788.00 | 8 |
| 16 | 恒基伟业G101 | 2 | 11 | ECS000016 | 0 | 823.33 | 3 |
| 17 | 夏新N7 | 3 | 5 | ECS000017 | 1 | 2300.00 | 2 |
| 18 | 夏新T5 | 4 | 5 | ECS000018 | 1 | 2878.00 | 0 |
| 19 | 三星SGH-F258 | 3 | 6 | ECS000019 | 0 | 858.00 | 7 |
| 20 | 三星BC01 | 3 | 6 | ECS000020 | 13 | 280.00 | 14 |
| 21 | 金立 A30 | 3 | 10 | ECS000021 | 40 | 2000.00 | 4 |
| 22 | 多普达Touch HD | 3 | 3 | ECS000022 | 0 | 5999.00 | 15 |
| 23 | HTCN96 | 5 | 1 | ECS000023 | 8 | 3700.00 | 17 |
| 25 | 小灵通/固话50元充值卡 | 13 | 0 | ECS000025 | 2 | 48.00 | 0 |
| 26 | 小灵通/固话20元充值卡 | 13 | 0 | ECS000026 | 2 | 19.00 | 0 |
| 27 | 联通100元充值卡 | 15 | 0 | ECS000027 | 2 | 95.00 | 0 |
| 28 | 联通50元充值卡 | 15 | 0 | ECS000028 | 0 | 45.00 | 0 |
| 29 | 移动100元充值卡 | 14 | 0 | ECS000029 | 0 | 90.00 | 0 |
| 30 | 移动20元充值卡 | 14 | 0 | ECS000030 | 9 | 18.00 | 1 |
| 31 | 摩托罗拉E8 | 3 | 2 | ECS000031 | 1 | 1337.00 | 5 |
| 32 | HTCN85 | 3 | 1 | ECS000032 | 1 | 3010.00 | 9 |
+———-+——————————-+——–+———-+———–+————–+————+————-+
字段说明:goods_id表示商品编号,goods_name表示商品名称,cat_id表示商品所属类别编号,brand_id表示商品所属品牌编号,goods_sn表示商品货号,goods_number表示商品库存量,shop_price表示商品的销售价格,click_count表示商品的点击量。

创建一个能够返回商品表goods中所有商品的最低价格、最高价格和平均价格的存储过程:

drop procedure if exists goods_price//
create procedure goods_price(
OUT pl decimal(6,2),
OUT ph decimal(6,2),
OUT pa decimal(6,2)
)
begin
select min(shop_price) into pl from goods;
select max(shop_price) into ph from goods;
select avg(shop_price) into pa from goods;
select pl;
select ph;
select pa;
end//

# 调用存储过程goods_price:

call goods_price(@low_price,@high_price,@average_price)//

# 返回结果为:
+——-+
| pl |
+——-+
| 18.00 |
+——-+
1 row in set (0.00 sec)

+———+
| ph |
+———+
| 5999.00 |
+———+
1 row in set (0.02 sec)

+———+
| pa |
+———+
| 1197.15 |
+———+
1 row in set (0.02 sec)
存储过程中的流程控制:
可以利用if或者case语句控制存储过程中的执行流程。

利用if语句创建一个根据金额大小判断是否打折的存储过程discounted_price:

drop procedure if exists discounted_price//
create procedure discounted_price(
IN normal_price decimal(8,2),
OUT discount_price decimal(8,2)
)
begin
if (normal_price>1000) then
set discount_price = normal_price*0.8;
elseif (normal_price>500) then
set discount_price = normal_price*0.9;
else
set discount_price = normal_price;
end if;
select discount_price as new_price;
end//
调用discounted_price并观察其返回结果:

call discounted_price(2000,@discount_price)//

# 返回结果为:
+———–+
| new_price |
+———–+
| 1600.00 |
+———–+
利用case语句创建一个根据金额大小判断是否打折的存储过程case_example:

drop procedure if exists case_example//
create procedure case_example(IN normal_price decimal(8,2),OUT discount_price decimal(8,2))
begin
case
when normal_price > 1000 then
set discount_price = normal_price * 0.8;
when normal_price > 500 then
set discount_price = normal_price * 0.9;
else
set discount_price = normal_price;
end case;
select discount_price as new_price;

end//
调用存储过程case_example并观察其返回结果:

call case_example(2000,@discount_price)//

# 返回结果为:
+———–+
| new_price |
+———–+
| 1600.00 |
+———–+
if语句和case语句的比较:

MySQL提供if和case语句用于流程控制,使我们能够根据某些条件(称为流程控制)执行一个SQL代码块。那么我们应该使用什么语句? 对于大多数开发人员,在if和case之间进行选择只是个人偏好的问题。但是,当决定使用if或case时,应该考虑以下几点:

1)当将单个表达式与唯一值的范围进行比较时,简单case语句比if语句更易读。另外,简单case语句比if语句更有效率。

关于存储过程中case语句的基本用法可以参考:https://blog.csdn.net/qq_41080850/article/details/84851263

2)当根据多个值检查复杂表达式时,if语句更容易理解。

3)如果选择使用case语句,则必须确保至少有一个case条件匹配。否则,需要定义一个错误处理程序来捕获错误。if语句则不需要处理错误。关于存储过程中的错误处理,可以参考:

https://www.yiibai.com/mysql/error-handling-in-stored-procedures.html、

https://www.cnblogs.com/shijiaqi1066/p/3435037.html

4)在某些情况下,if和case混合使用反而使存储过程更加可读和高效。

存储过程中的循环
loop循环

有两个语句可以用于控制loop循环:

1)leave语句用于立即退出循环,而无需等待检查条件。leave语句的工作原理类似于C/C++,java等语言中的break语句。

2)iterate语句允许跳过剩下的整个代码并开始新的迭代。iterate语句类似C/C++,Java等语言中的continue语句。

示例1:

创建存储过程loop_example1,它可以用于条件计数:

drop procedure if exists loop_example1//
create procedure loop_example1()
begin
declare counter int default 0;
my_loop:loop
if counter < 10 then
set counter = counter + 1;
else
leave my_loop;
end if;
end loop my_loop;
select counter;
end//
调用存储过程loop_example1并观察其返回结果:

call loop_example1()//

# 返回结果为
+———+
| counter |
+———+
| 10 |
+———+
示例2:

创建存储过程loop_example2,它的功能是找出20以内不为0的偶数:

drop procedure if exists loop_example2//
create procedure loop_example2()
begin
declare x int default 0;
declare strings varchar(100) default ”;
my_loop:loop
if x > 20 then
leave my_loop;
end if;
set x = x + 1;
if mod(x,2) then
iterate my_loop;
else
set strings = concat(strings,x,’,’);
iterate my_loop;
end if;
end loop;
select strings;
end//
调用存储过程loop_example2并观察其返回结果:

call loop_example2()//

# 返回结果为:
+—————————-+
| strings |
+—————————-+
| 2,4,6,8,10,12,14,16,18,20, |
+—————————-+
while循环

while语句的语法如下:

while expression do
statements;
end while
while循环在每次迭代开始时检查表达式。 如果expression为True,MySQL将执行while和end while之间的语句,直到expression为false。 while循环称为预先测试条件循环,因为它总是在执行前检查语句的表达式。

示例:

创建存储过程while_example,它也可以用于条件计数:

drop procedure if exists while_example//
create procedure while_example()
begin
declare counter int default 0;
while counter < 10 do
set counter = counter + 1;
end while;
select counter;
end//
调用存储过程while_example并观察其返回结果:

call while_example()//

# 返回结果为:
+———+
| counter |
+———+
| 10 |
+———+

repeat循环

repeat循环语句的语法如下:

repeat
statements;
until expression # 注意until语句后面是没有标点符号的
end repeat
首先,MySQL执行语句statements,然后评估求值表达式expression。如果表达式expression的计算结果为false,则MySQL将重复执行该语句,直到该表达式计算结果为True。因为repeat循环语句在执行语句后检查表达式expression,因此repeat循环语句也称为测试后循环。

示例:

创建存储过程repeat_example,它也可以用于条件计数:

drop procedure if exists repeat_example//
create procedure repeat_example()
begin
declare counter int default 0;
repeat
set counter = counter + 1;
until counter > 10
end repeat;
select counter;
end//
调用存储过程repeat_example并观察其返回结果:

call repeat_example()//

# 返回结果为:
+———+
| counter |
+———+
| 11 |
+———+
MySQL中的游标
说明:不像其他DBMS,MySQL中的游标只能用于存储过程和存储函数。

关于MySQL游标的理解:

MySQL中的游标可以理解成一个可迭代对象(类比Python中的列表、字典等可迭代对象),它可以用来存储select 语句查询到的结果集,这个结果集可以包含多行数据,从而使我们可以使用迭代的方法从游标中依次取出每行数据。

MySQL游标的特点:

1)只读:无法通过光标更新基础表中的数据。

2)不可滚动:只能按照select语句确定的顺序获取行。不能以相反的顺序获取行。 此外,不能跳过行或跳转到结果集中的特定行。

3)敏感:有两种游标:敏感游标和不敏感游标。敏感游标指向实际数据,不敏感游标使用数据的临时副本。敏感游标比一个不敏感的游标执行得更快,因为它不需要临时拷贝数据。MySQL游标是敏感的。

MySQL游标的使用方法:

首先,必须使用declare语句声明游标:

declare cursor_name cursor for select_statement;
游标声明必须在变量声明之后。如果在变量声明之前声明游标,MySQL将会发出一个错误。游标必须始终与select语句相关联。

接下来,使用open语句打开游标。open语句初始化游标的结果集,因此必须在从结果集中提取行之前调用open语句。

open cursor_name;
然后,使用fetch语句来检索游标指向的一行数据,并将游标移动到结果集中的下一行。

fetch cursor_name into variable_name;
之后,可以检查是否有任何行记录可用,然后再提取它。

最后,调用close语句来停用游标并释放与之关联的内存,如下所示:

close cursor_name
当游标不再使用时,应该关闭它。

当使用MySQL游标时,还必须声明一个not found处理程序来处理当游标找不到任何行时的情况。 因为每次调用fetch语句时,游标会尝试依次读取结果集中的每一行数据。 当游标到达结果集的末尾时,它将无法获得数据,并且会产生一个条件。 处理程序用于处理这种情况。

要声明一个not found处理程序,参考以下语法:

declare continue handler for not found set finished = 1;
finished是一个变量,指示游标到达结果集的结尾。注意,处理程序声明必须出现在存储过程中的变量和游标声明之后。

示例:

# 先创建一个根据商品名称获取商品价格的存储过程get_shop_price

drop procedure if exists get_shop_price//

create procedure get_shop_price(IN name varchar(20),OUT price decimal(6,2))
begin
select shop_price into price from goods where goods_name = name;
end//

# 再创建一个获取商品表goods中所有价格大于指定值的商品名称和价格,并把结果存入一张新建的goodsnames表中

drop procedure if exists build_goodsname_list//

create procedure build_goodsname_list(IN input_price decimal(6,2))
begin
declare finished int default 0;
declare name varchar(20) default ”;
declare price decimal(6,2) default 0;

— declare the cursor
declare goods_cursor cursor for select goods_name from goods where shop_price>input_price;

— declare continue handler
declare continue handler for not found set finished = 1;

— drop table goodsnames
drop table goodsnames;

— create a table to store the results
create table goodsnames(
goods_name varchar(20) not null default ”,
shop_price decimal(6,2) not null default 0
)engine myisam charset utf8;

— open the cursor
open goods_cursor;

— fetch all rows in the cursor
repeat
— get goods_name
fetch goods_cursor into name;

if not finished then

— get the shop_price for this goods_name
call get_shop_price(name,price);

— insert name and price into goodsnames
insert into goodsnames values(name,price);

end if;

— end of repeat loop
until finished
end repeat;

— close the cursor
close goods_cursor;

select * from goodsnames;
end//
调用存储过程build_goodsname_list,并观察其返回结果:

call build_goodsname_list(1000)//

# 返回结果为:
+————————–+————+
| goods_name | shop_price |
+————————–+————+
| HTCE66 | 2298.00 |
| 索爱C702c | 1328.00 |
| HTC5320 XpressMusic | 1311.00 |
| HTC5800XM | 2625.00 |
| 夏新N7 | 2300.00 |
| 夏新T5 | 2878.00 |
| 金立 A30 | 2000.00 |
| 多普达Touch HD | 5999.00 |
| HTCN96 | 3700.00 |
| 摩托罗拉E8 | 1337.00 |
| HTCN85 | 3010.00 |
+————————–+————+

call build_goodsname_list(2000)//

# 返回结果为:
+————————-+————+
| goods_name | shop_price |
+————————-+————+
| HTCE66 | 2298.00 |
| HTC5800XM | 2625.00 |
| 夏新N7 | 2300.00 |
| 夏新T5 | 2878.00 |
| 多普达Touch HD | 5999.00 |
| HTCN96 | 3700.00 |
| HTCN85 | 3010.00 |
+————————-+————+
MySQL中的存储函数
存储的函数是返回单个值的特殊类型的存储程序。我们使用存储的函数来封装在SQL语句或存储的程序中可重用的常用公式或业务规则。

与存储过程不同,我们可以在SQL语句中使用存储的函数,也可以在表达式中使用。 这有助于提高程序代码的可读性和可维护性。

存储函数语法

create function function_name(param1,param2,…)
returns datatype
[not] deterministic
statements;
上述代码的详细解释:

首先,在create function语句之后指定存储函数的名称。

其次,列出括号内存储函数的所有参数。 默认情况下,所有参数均为IN参数。不能为参数指定IN,OUT或INOUT修饰符。

第三,必须在returns语句中指定返回值的数据类型。它可以是任何有效的MySQL数据类型。

第四,对于相同的输入参数,如果存储的函数返回相同的结果,这样则被认为是确定性的,否则存储的函数不是确定性的。必须决定一个存储函数是否是确定性的。 如果声明不正确,则存储的函数可能会产生意想不到的结果,或者不能使用可用的优化,从而降低性能。

第五,将代码写入存储函数的主体中。 它可以是单个语句或复合语句。 在主体部分中,必须至少指定一个return语句。return语句用于返回一个值给调用者。 每当到达return语句时,存储函数的执行将立即终止。

 示例:

创建一个存储函数goods_level,它的功能是根据给定的商品价格确定商品所属的价格分类:

drop function if exists goods_level//

create function goods_level(p_shop_price decimal(6,2))
returns varchar(20)
deterministic
begin
declare g_level varchar(20);
if p_shop_price > 2000 then
set g_level = ‘high_level’;
elseif (p_shop_price>=1000 and p_shop_price<=2000) then
set g_level = ‘middle_level’;
else
set g_level = ‘low_level’;
end if;
return (g_level);
end//
在select语句中调用goods_level函数:

select goods_id,goods_name,shop_price,goods_level(shop_price) as price_level from goods//

# 返回结果为:
+———-+——————————-+————+————–+
| goods_id | goods_name | shop_price | price_level |
+———-+——————————-+————+————–+
| 4 | HTCN85原装充电器 | 58.00 | low_level |
| 3 | HTC原装5800耳机 | 68.00 | low_level |
| 5 | 索爱原装M2卡读卡器 | 20.00 | low_level |
| 6 | 胜创KINGMAX内存卡 | 42.00 | low_level |
| 7 | HTCN85原装立体声耳机HS-82 | 100.00 | low_level |
| 8 | 飞利浦9@9v | 399.00 | low_level |
| 9 | HTCE66 | 2298.00 | high_level |
| 10 | 索爱C702c | 1328.00 | middle_level |
| 12 | 摩托罗拉A810 | 983.00 | low_level |
| 13 | HTC5320 XpressMusic | 1311.00 | middle_level |
| 14 | HTC5800XM | 2625.00 | high_level |
| 15 | 摩托罗拉A810 | 788.00 | low_level |
| 16 | 恒基伟业G101 | 823.33 | low_level |
| 17 | 夏新N7 | 2300.00 | high_level |
| 18 | 夏新T5 | 2878.00 | high_level |
| 19 | 三星SGH-F258 | 858.00 | low_level |
| 20 | 三星BC01 | 280.00 | low_level |
| 21 | 金立 A30 | 2000.00 | middle_level |
| 22 | 多普达Touch HD | 5999.00 | high_level |
| 23 | HTCN96 | 3700.00 | high_level |
| 25 | 小灵通/固话50元充值卡 | 48.00 | low_level |
| 26 | 小灵通/固话20元充值卡 | 19.00 | low_level |
| 27 | 联通100元充值卡 | 95.00 | low_level |
| 28 | 联通50元充值卡 | 45.00 | low_level |
| 29 | 移动100元充值卡 | 90.00 | low_level |
| 30 | 移动20元充值卡 | 18.00 | low_level |
| 31 | 摩托罗拉E8 | 1337.00 | middle_level |
| 32 | HTCN85 | 3010.00 | high_level |
+———-+——————————-+————+————–+
建立存储过程get_price_level,在存储过程中调用存储函数goods_level,使得存储过程get_goods_level能根据给定的goods_id确定对应商品的价格所属的分类:

drop procedure if exists get_price_level//

create procedure get_price_level(IN g_goods_id int,OUT g_price_level varchar(20))
begin
declare g_shop_price decimal(6,2);
select shop_price into g_shop_price from goods where goods_id = g_goods_id;
select goods_level(g_shop_price) into g_price_level;
select g_price_level as price_level;
end//
测试存储过程get_price_level:

call get_price_level(3,@price_level)//

# 返回结果为:
+————-+
| price_level |
+————-+
| low_level |
+————-+

call get_price_level(9,@price_level)//

# 返回结果为:
+————-+
| price_level |
+————-+
| high_level |
+————-+

call get_price_level(21,@price_level)//

# 返回结果为:
+————–+
| price_level |
+————–+
| middle_level |
+————–+

# 将以上测试结果与前文在select语句中调用goods_level后的返回结果比较可知测试结果是正确的。

 

存储过程与存储函数的比较

 

1、总述
存储函数和存储过程统称为存储例程(stored routine)。两者的定义语法很相似,但却是不同的内容。
存储函数限制比较多,比如不能用临时表,只能用表变量。还有一些函数都不可用等等。而存储过程的限制相对就比较少。
一般来说,存储过程实现的功能要复杂一点,而函数的实现的功能针对性比较强。
2、返回值上的不同
存储函数将向调用者返回一个且仅返回一个结果值。
存储过程将返回一个或多个结果集(函数做不到这一点),或者只是来实现某种效果或动作而无需返回值。
3、调用方式上的不同
存储函数嵌入在sql中使用的,可以在select中调用,就像内建函数一样,比如cos()、hex()
存储过程只能通过call语句进行调用
4、参数的不同
存储函数的参数类型类似于IN参数
存储过程的参数类型有三种、IN参数、OUT参数、INOUT参数

参考:https://blog.csdn.net/qq_32444825/article/details/79170109

 

其他参考:

https://www.cnblogs.com/gavin110-lgy/p/5772577.html

https://blog.csdn.net/JQ_AK47/article/details/52087484

https://blog.csdn.net/myweishanli/article/details/41245923

https://www.yiibai.com/mysql/stored-procedure.html#article-start

https://blog.csdn.net/Maple1997/article/details/79390797

《MySQL必知必会》——Ben·Forta

 

转自:https://blog.csdn.net/qq_41080850/article/details/85064850

Mysql索引底层数据结构与算法

索引是什么

  • 索引是帮助MySQL高效获取数据的排好序的数据结构。
  • 索引存储在文件里
  • 补充知识:
磁盘存取原理:
* 寻道时间(速度慢,费时)
* 旋转时间(速度较快)

磁盘IO读取效率:
* 单次IO读取是N个页的大小,读取数据量大于N个页就需要分页读取。
  • 索引的数据结构
    • 二叉树
      • 有可能出现worst-case,如果输入序列已经排序,则时间复杂度为O(N)
    • 红黑树
      • 解决了二叉树的缺点,但是在数据量大的情况下,会出现分层很深的情况,影响查询效率。
    • hash
      • 查询效率很高,但是无法实现范围查询
    • B树
      • 【特点】度(Degree)-节点的数据存储个数
      • 【特点】叶节点具有相同的深度
      • 【特点】叶节点的指针为空
      • 【特点】节点中的数据key从左到右递增排列
      • 【缺点】每个节点存储key+具体数据,而度涉及到IO读取,故如果度设计的很大就会影响IO读取效率。由于该限制,只能限制度的大小,使得深度无法控制。
    • B+树
      • 非叶子节点不存储data,只存储key,可以增大度
      • 叶子节点不存储指针
      • 顺序访问指针,提高区间访问的性能
      • 优点
        • 一般使用磁盘I/O次数评价索引结构的优劣
        • 预读:磁盘一般会顺序向后读取一定长度的数据(页的整数倍)放入内存
        • 局部性原理:当一个数据被用到时,其附近的数据也通常会马上被使用
        • B+Tree节点的大小设为等于一个页,每次新建节点直接申请一个页的空间,这样就保证一个节点物理上也存储在一个页里,就实现了一个节点的载入只需一次I/O
        • B+Tree的度d一般会超过100,因此h非常小(一般为3到5之间)
        • 【最佳实践】使用自增的主键
          • a.自增型主键以利于插入性能的提高;
          • b.自增型主键设计(int,bigint)可以降低二级索引的空间,提升二级索引的内存命中率;
          • c.自增型的主键可以减小page的碎片,提升空间和内存的使用。

存储引擎

  • 存储引擎是表的属性。
  • MyISAM是非聚集的索引实现
    • 索引文件和数据文件是分开的两个文件
    • 索引中叶子节点存储的是数据的地址
      • 主键索引和其他索引都是一样的,叶子节点指向的是数据文件中对应的地址
  • InnoDB是聚集的索引实现
    • 索引和数据在同一个文件中
    • 数据的存储是建立在主键索引结构中,叶子节点是具体的数据。故必须要有主键
    • 其他索引的叶子节点是主键的值
    • 【好处】一致性和节省存储空间

 

转自:https://www.cnblogs.com/xxxuwentao/p/9769942.html