如何建设高可用系统

“高可用性”(High Availability)通常来描述一个系统经过专门的设计,从而减少停工时间,而保持其服务的高度可用性。以下是高可用系统的设计建议:

 

减少单点

去单点首先要识别整个系统所有主链路的单点,如机房(同城异地双机房),应用服务器,DNS服务器,SFTP服务器,LBS,缓存服务器,数据库,消息服务器,代理服务器和专线等,如系统通过专线调用对方服务,需要考虑同时拉联通和电信的专线,联通或电信的专线还是有一定概率会出现问题的,但是同时出问题的概率会小非常多。优先使用软负载,使用硬负载兜底。

 

减少依赖

减少DNS依赖,减少远程服务依赖,DNS依赖可以尝试设置本地host,用工具给所有服务器推送最新的域名映射关系,通过本地缓存或近端服务减少RPC调用。

 

限制循环

避免无限死循环,导致CPU利用率百分百,可以设置for循环的最大循环次数,如最大循环1000次。

 

控制流量

避免异常流量对应用服务器产生影响,可以对指定服务设置流量限制,如QPS,TPS,QPH(每小时总请求量)和QPD(每天总请求量)。

 

精准监控

对CPU利用率,load,内存,带宽,系统调用量,应用错误量,PV,UV和业务量进行监控,避免内存泄露和异常代码对系统产生影响,配置监控一定要精准,如平时内存利用率是50%,监控可以配置成60%进行报警,这样可以提前感知内存泄露问题,避免应用无响应。

 

无状态

服务器不能保存用户状态数据,如在集群环境下不能用static变量保存用户数据,不能长时间把用户文件存放在服务器本地。服务器有状态会难以扩容,且出现单点问题。

 

容量规划

定期对容量进行评估。如大促前进行压测和容量预估,根据需要进行扩容。

 

功能开关

打开和关闭某些功能,比如消息量过大,系统处理不了,把开关打开后直接丢弃消息不处理。上线新功能增加开关,如果有问题关闭新功能。

 

设置超时

设置连接超时和读超时设置,不应该太大,如果是内部调用连接超时可以设置成1秒,读超时3秒,外部系统调用连接超时可以设置成3秒,读超时设置成20秒。

 

重试策略

当调用外部服务异常时可以设置重试策略,每次重试时间递增,但是需要设置最大重试次数和重试开关,避免对下游系统产生影响。

 

隔离

应用隔离,模块隔离,机房隔离和线程池隔离。可以按照优先级,不变和变几个维度来隔离应用和模块,如抽象和不变的代码放在一个模块,这个模块的代码几乎不会修改,可用性高,经常变的业务逻辑放在一个模块里,这样就算有问题,也只会影响到某一个业务。不同的业务使用不同的线程池,避免低优先级任务阻塞高优先级,或高优先级任务过多时影响低优先级任务永远不会执行。

 

异步调用

同步调用改成异步调用,解决远程调用故障或调用超时对系统的影响。

 

热点缓存

对热点数据进行缓存,降低RPC调用。如B系统提供名单服务,B系统可以提供一个client SDK提供近端缓存服务,定期去服务器端取数据,减少RPC调用。
缓存容灾 – 当数据库不可用时可以使用缓存的数据。并设置分级缓存,如优先读本地缓存,其次读分布式缓存。

 

分级缓存

优先读本地缓存,其次读分布式缓存。通过推模式更新本地缓存。

 

系统分级

对系统进行分级,如ABC三个等级,高级别系统不依赖于低级别系统,并且高级别系统比底级别系统高可用率要高。

 

服务降级

如果系统出现响应缓慢等状况,可以关闭部分功能,从而释放系统资源,保证核心服务的正常运行。需要识别哪些服务可以降级,比如突然有大量消息流入,导致服务不可用,我们会把消息直接丢弃掉。或通过设置流控,拒绝为低级别系统提供服务。

 

流量蓄洪

当流量陡增时,可以将请求进行蓄洪,如把请求保存在数据库中,再按照指定的QPS进行泄洪,有效的保护下游系统,也保证了服务的可用性。当调用对方系统,对方系统响应缓慢或无响应时,可采取自动蓄洪。

 

服务权重

在集群环境中,可自动识别高性能服务,拒绝调用性能低的服务。如在集群环境中,对调用超时的服务器进行权重降低,优先调用权重高的服务器。

 

依赖简化

减少系统之间的依赖,比如使用消息驱动,A和B系统通过消息服务器传递数据,A和B系统使用数据库进行读写分离,A系统负责往数据库中写数据,B系统负责读数据,因为数据存放在数据库中,当A不可用时,短时间内不影响B系统提供服务。

 

弹性扩容

根据资源的使用率自动或手动进行扩容。如带宽不够用时,快速增加带宽。

 

灰度和回滚

发布新功能只让部分服务器生效,且观察几天逐渐切流,如果出现问题只影响部分客户。出现问题快速回滚,或者直接下线灰度的机器。

 

减少远程调用

优先调用本地JVM内服务,其次是同机房服务,然后是同城服务,最后是跨城服务。如A调用B,B调用互联网的C系统获取数据,B系统可以把数据缓存起来,并设置数据的保鲜度,减少B对C的依赖。配置中心把注册服务的地址推送到调用服务的系统本地。参数中心把参数配置信息推送到系统的本地内存,而不是让系统去远程服务器获取参数信息。

 

熔断机制

增加熔断机制,当监控出线上数据出现大幅跌涨时,及时中断,避免对业务产生更大影响。如我们做指标计算时,指标可以计算慢,但是不能算错,如果发现某个用户的指标环比或同比增长一倍或跌零,会考虑保存所有消息,并中止该用户的指标计算。

 

运行时加载模块

我们会把经常变的业务代码变成一个个业务模块,使用Java的ClassLoader在运行时动态加载和卸载模块,当某个模块有问题时候,可以快速修复。

 

代码扫描

使用IDEA代码分析等工具进行代码扫描,识别出程序中的BUG,如空指针异常,循环依赖等。

 

自动备份

程序,系统配置和数据定期进行备份。可使用linux命令和shell脚本定时执行备份策略,自动进行本地或异地。出现问题时能快速重新部署。

 

线上压测

系统的对外服务需要进行压测,知道该服务能承受的QPS和TPS,从而做出相对准确的限流。

转自:https://www.cnblogs.com/modou/p/9661022.html

详解Condition的await和signal等待/通知机制

1.Condition简介

任何一个java对象都天然继承于Object类,在线程间实现通信的往往会应用到Object的几个方法,比如wait(),wait(long timeout),wait(long timeout, int nanos)与notify(),notifyAll()几个方法实现等待/通知机制,同样的, 在java Lock体系下依然会有同样的方法实现等待/通知机制。从整体上来看Object的wait和notify/notify是与对象监视器配合完成线程间的等待/通知机制,而Condition与Lock配合完成等待通知机制,前者是java底层级别的,后者是语言级别的,具有更高的可控制性和扩展性。两者除了在使用方式上不同外,在功能特性上还是有很多的不同:

  1. Condition能够支持不响应中断,而通过使用Object方式不支持;
  2. Condition能够支持多个等待队列(new 多个Condition对象),而Object方式只能支持一个;
  3. Condition能够支持超时时间的设置,而Object不支持

参照Object的wait和notify/notifyAll方法,Condition也提供了同样的方法:

针对Object的wait方法

  1. void await() throws InterruptedException:当前线程进入等待状态,如果其他线程调用condition的signal或者signalAll方法并且当前线程获取Lock从await方法返回,如果在等待状态中被中断会抛出被中断异常;
  2. long awaitNanos(long nanosTimeout):当前线程进入等待状态直到被通知,中断或者超时
  3. boolean await(long time, TimeUnit unit)throws InterruptedException:同第二种,支持自定义时间单位
  4. boolean awaitUntil(Date deadline) throws InterruptedException:当前线程进入等待状态直到被通知,中断或者到了某个时间

针对Object的notify/notifyAll方法

  1. void signal():唤醒一个等待在condition上的线程,将该线程从等待队列中转移到同步队列中,如果在同步队列中能够竞争到Lock则可以从等待方法中返回。
  2. void signalAll():与1的区别在于能够唤醒所有等待在condition上的线程

2.Condition实现原理分析

2.1 等待队列

要想能够深入的掌握condition还是应该知道它的实现原理,现在我们一起来看看condiiton的源码。创建一个condition对象是通过lock.newCondition(),而这个方法实际上是会new出一个ConditionObject对象,该类是AQS(AQS的实现原理的文章)的一个内部类,有兴趣可以去看看。前面我们说过,condition是要和lock配合使用的也就是condition和Lock是绑定在一起的,而lock的实现原理又依赖于AQS,自然而然ConditionObject作为AQS的一个内部类无可厚非。我们知道在锁机制的实现上,AQS内部维护了一个同步队列,如果是独占式锁的话,所有获取锁失败的线程的尾插入到同步队列,同样的,condition内部也是使用同样的方式,内部维护了一个 等待队列,所有调用condition.await方法的线程会加入到等待队列中,并且线程状态转换为等待状态。另外注意到ConditionObject中有两个成员变量:

/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

这样我们就可以看出来ConditionObject通过持有等待队列的头尾指针来管理等待队列。主要注意的是Node类复用了在AQS中的Node类,其节点状态和相关属性可以去看AQS的实现原理的文章,如果您仔细看完这篇文章对condition的理解易如反掌,对lock体系的实现也会有一个质的提升。Node类有这样一个属性:

//后继节点
Node nextWaiter;

进一步说明,等待队列是一个单向队列,而在之前说AQS时知道同步队列是一个双向队列。接下来我们用一个demo,通过debug进去看是不是符合我们的猜想:

public static void main(String[] args) {
    for (int i = 0; i < 10; i++) {
        Thread thread = new Thread(() -> {
            lock.lock();
            try {
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
        });
        thread.start();
    }
}

这段代码没有任何实际意义,甚至很臭,只是想说明下我们刚才所想的。新建了10个线程,没有线程先获取锁,然后调用condition.await方法释放锁将当前线程加入到等待队列中,通过debug控制当走到第10个线程的时候查看firstWaiter即等待队列中的头结点,debug模式下情景图如下:

 

 

debug模式下情景图

从这个图我们可以很清楚的看到这样几点:1. 调用condition.await方法后线程依次尾插入到等待队列中,如图队列中的线程引用依次为Thread-0,Thread-1,Thread-2….Thread-8;2. 等待队列是一个单向队列。通过我们的猜想然后进行实验验证,我们可以得出等待队列的示意图如下图所示:

 

 

等待队列的示意图

同时还有一点需要注意的是:我们可以多次调用lock.newCondition()方法创建多个condition对象,也就是一个lock可以持有多个等待队列。而在之前利用Object的方式实际上是指在对象Object对象监视器上只能拥有一个同步队列和一个等待队列,而并发包中的Lock拥有一个同步队列和多个等待队列。示意图如下:

 

 

AQS持有多个Condition.png

如图所示,ConditionObject是AQS的内部类,因此每个ConditionObject能够访问到AQS提供的方法,相当于每个Condition都拥有所属同步器的引用。

2.2 await实现原理

当调用condition.await()方法后会使得当前获取lock的线程进入到等待队列,如果该线程能够从await()方法返回的话一定是该线程获取了与condition相关联的lock。接下来,我们还是从源码的角度去看,只有熟悉了源码的逻辑我们的理解才是最深的。await()方法源码为:

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 1. 将当前线程包装成Node,尾插入到等待队列中
    Node node = addConditionWaiter();
    // 2. 释放当前线程所占用的lock,在释放的过程中会唤醒同步队列中的下一个节点
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        // 3. 当前线程进入到等待状态
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 4. 自旋等待获取到同步状态(即获取到lock)
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // 5. 处理被中断的情况
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

代码的主要逻辑请看注释,我们都知道当当前线程调用condition.await()方法后,会使得当前线程释放lock然后加入到等待队列中,直至被signal/signalAll后会使得当前线程从等待队列中移至到同步队列中去,直到获得了lock后才会从await方法返回,或者在等待时被中断会做中断处理。那么关于这个实现过程我们会有这样几个问题:1. 是怎样将当前线程添加到等待队列中去的?2.释放锁的过程?3.怎样才能从await方法退出?而这段代码的逻辑就是告诉我们这三个问题的答案。具体请看注释,在第1步中调用addConditionWaiter将当前线程添加到等待队列中,该方法源码为:

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    //将当前线程包装成Node
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        //尾插入
        t.nextWaiter = node;
    //更新lastWaiter
    lastWaiter = node;
    return node;
}

这段代码就很容易理解了,将当前节点包装成Node,如果等待队列的firstWaiter为null的话(等待队列为空队列),则将firstWaiter指向当前的Node,否则,更新lastWaiter(尾节点)即可。就是通过尾插入的方式将当前线程封装的Node插入到等待队列中即可,同时可以看出等待队列是一个不带头结点的链式队列,之前我们学习AQS时知道同步队列是一个带头结点的链式队列,这是两者的一个区别。将当前节点插入到等待对列之后,会使当前线程释放lock,由fullyRelease方法实现,fullyRelease源码为:

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            //成功释放同步状态
            failed = false;
            return savedState;
        } else {
            //不成功释放同步状态抛出异常
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

这段代码就很容易理解了,调用AQS的模板方法release方法释放AQS的同步状态并且唤醒在同步队列中头结点的后继节点引用的线程,如果释放成功则正常返回,若失败的话就抛出异常。到目前为止,这两段代码已经解决了前面的两个问题的答案了,还剩下第三个问题,怎样从await方法退出?现在回过头再来看await方法有这样一段逻辑:

while (!isOnSyncQueue(node)) {
    // 3. 当前线程进入到等待状态
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
}

很显然,当线程第一次调用condition.await()方法时,会进入到这个while()循环中,然后通过LockSupport.park(this)方法使得当前线程进入等待状态,那么要想退出这个await方法第一个前提条件自然而然的是要先退出这个while循环,出口就只剩下两个地方:1. 逻辑走到break退出while循环;2. while循环中的逻辑判断为false。再看代码出现第1种情况的条件是当前等待的线程被中断后代码会走到break退出,第二种情况是当前节点被移动到了同步队列中(即另外线程调用的condition的signal或者signalAll方法),while中逻辑判断为false后结束while循环。总结下,就是当前线程被中断或者调用condition.signal/condition.signalAll方法当前节点移动到了同步队列后 ,这是当前线程退出await方法的前提条件。当退出while循环后就会调用acquireQueued(node, savedState),这个方法在介绍AQS的底层实现时说过了,若感兴趣的话可以去看这篇文章,该方法的作用是在自旋过程中线程不断尝试获取同步状态,直至成功(线程获取到lock)。这样也说明了退出await方法必须是已经获得了condition引用(关联)的lock。到目前为止,开头的三个问题我们通过阅读源码的方式已经完全找到了答案,也对await方法的理解加深。await方法示意图如下图:

 

 

await方法示意图

如图,调用condition.await方法的线程必须是已经获得了lock,也就是当前线程是同步队列中的头结点。调用该方法后会使得当前线程所封装的Node尾插入到等待队列中。

超时机制的支持

condition还额外支持了超时机制,使用者可调用方法awaitNanos,awaitUtil。这两个方法的实现原理,基本上与AQS中的tryAcquire方法如出一辙,关于tryAcquire可以仔细阅读这篇文章的第3.4部分

不响应中断的支持

要想不响应中断可以调用condition.awaitUninterruptibly()方法,该方法的源码为:

public final void awaitUninterruptibly() {
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean interrupted = false;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if (Thread.interrupted())
            interrupted = true;
    }
    if (acquireQueued(node, savedState) || interrupted)
        selfInterrupt();
}

这段方法与上面的await方法基本一致,只不过减少了对中断的处理,并省略了reportInterruptAfterWait方法抛被中断的异常。

2.3 signal/signalAll实现原理

调用condition的signal或者signalAll方法可以将等待队列中等待时间最长的节点移动到同步队列中,使得该节点能够有机会获得lock。按照等待队列是先进先出(FIFO)的,所以等待队列的头节点必然会是等待时间最长的节点,也就是每次调用condition的signal方法是将头节点移动到同步队列中。我们来通过看源码的方式来看这样的猜想是不是对的,signal方法源码为:

public final void signal() {
    //1. 先检测当前线程是否已经获取lock
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //2. 获取等待队列中第一个节点,之后的操作都是针对这个节点
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

signal方法首先会检测当前线程是否已经获取lock,如果没有获取lock会直接抛出异常,如果获取的话再得到等待队列的头指针引用的节点,之后的操作的doSignal方法也是基于该节点。下面我们来看看doSignal方法做了些什么事情,doSignal方法源码为:

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        //1. 将头结点从等待队列中移除
        first.nextWaiter = null;
        //2. while中transferForSignal方法对头结点做真正的处理
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

具体逻辑请看注释,真正对头节点做处理的逻辑在transferForSignal放,该方法源码为:

final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    //1. 更新状态为0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    //2.将该节点移入到同步队列中去
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

关键逻辑请看注释,这段代码主要做了两件事情1.将头结点的状态更改为CONDITION;2.调用enq方法,将该节点尾插入到同步队列中,关于enq方法请看AQS的底层实现这篇文章。现在我们可以得出结论:调用condition的signal的前提条件是当前线程已经获取了lock,该方法会使得等待队列中的头节点即等待时间最长的那个节点移入到同步队列,而移入到同步队列后才有机会使得等待线程被唤醒,即从await方法中的LockSupport.park(this)方法中返回,从而才有机会使得调用await方法的线程成功退出。signal执行示意图如下图:

 

 

signal执行示意图

signalAll

sigllAll与sigal方法的区别体现在doSignalAll方法上,前面我们已经知道doSignal方法只会对等待队列的头节点进行操作,,而doSignalAll的源码为:

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

该方法只不过时间等待队列中的每一个节点都移入到同步队列中,即“通知”当前调用condition.await()方法的每一个线程。

3. await与signal/signalAll的结合思考

文章开篇提到等待/通知机制,通过使用condition提供的await和signal/signalAll方法就可以实现这种机制,而这种机制能够解决最经典的问题就是“生产者与消费者问题”,关于“生产者消费者问题”之后会用单独的一篇文章进行讲解,这也是面试的高频考点。await和signal和signalAll方法就像一个开关控制着线程A(等待方)和线程B(通知方)。它们之间的关系可以用下面一个图来表现得更加贴切:

 

 

condition下的等待通知机制.png

如图,线程awaitThread先通过lock.lock()方法获取锁成功后调用了condition.await方法进入等待队列,而另一个线程signalThread通过lock.lock()方法获取锁成功后调用了condition.signal或者signalAll方法,使得线程awaitThread能够有机会移入到同步队列中,当其他线程释放lock后使得线程awaitThread能够有机会获取lock,从而使得线程awaitThread能够从await方法中退出执行后续操作。如果awaitThread获取lock失败会直接进入到同步队列

3. 一个例子

我们用一个很简单的例子说说condition的用法:

public class AwaitSignal {
    private static ReentrantLock lock = new ReentrantLock();
    private static Condition condition = lock.newCondition();
    private static volatile boolean flag = false;

    public static void main(String[] args) {
        Thread waiter = new Thread(new waiter());
        waiter.start();
        Thread signaler = new Thread(new signaler());
        signaler.start();
    }

    static class waiter implements Runnable {

        @Override
        public void run() {
            lock.lock();
            try {
                while (!flag) {
                    System.out.println(Thread.currentThread().getName() + "当前条件不满足等待");
                    try {
                        condition.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println(Thread.currentThread().getName() + "接收到通知条件满足");
            } finally {
                lock.unlock();
            }
        }
    }

    static class signaler implements Runnable {

        @Override
        public void run() {
            lock.lock();
            try {
                flag = true;
                condition.signalAll();
            } finally {
                lock.unlock();
            }
        }
    }
}

输出结果为:

Thread-0当前条件不满足等待
Thread-0接收到通知,条件满足

开启了两个线程waiter和signaler,waiter线程开始执行的时候由于条件不满足,执行condition.await方法使该线程进入等待状态同时释放锁,signaler线程获取到锁之后更改条件,并通知所有的等待线程后释放锁。这时,waiter线程获取到锁,并由于signaler线程更改了条件此时相对于waiter来说条件满足,继续执行。

参考文献

《java并发编程的艺术》

转自:https://www.jianshu.com/p/28387056eeb4

从源码角度彻底理解ReentrantLock(重入锁)

转自:https://www.cnblogs.com/takumicx/p/9402021.html

目录

  • 1.前言
  • 2.AbstractQueuedSynchronizer介绍
    • 2.1 AQS是构建同步组件的基础
    • 2.2 AQS的内部结构(ReentrantLock的语境下)
  • 3 非公平模式加锁流程
    • 3.1加锁流程真正意义上的入口
    • 3.2 尝试获取锁的通用方法:tryAcquire()
    • 3.3 获取锁失败的线程如何安全的加入同步队列:addWaiter()
    • 3.4 线程加入同步队列后会做什么:acquireQueued()
    • 3.5 加锁流程源码总结
  • 4.非公平模式解锁流程
    • 4.1 解锁流程源码解读
    • 4.2 解锁流程源码总结
  • 5. 公平锁相比非公平锁的不同
  • 6. 一些疑问的解答
    • 6.1 为什么基于FIFO的同步队列可以实现非公平锁?
    • 6.2 为什么非公平锁性能好
  • 7. 阅读源码的收获

1.前言

ReentrantLock(重入锁)功能详解和应用演示这篇文章里我们讲解并演示了ReentrantLock(重入锁)的各种功能,其中就谈到ReentrantLock可以有公平锁和非公平锁的不同实现,只要在构造它的时候传入不同的布尔值,继续跟进下源码我们就能发现,关键在于实例化内部变量sync的方式不同,如下所示

/**
 * Creates an instance of {@code ReentrantLock} with the
 * given fairness policy.
 *
 * @param fair {@code true} if this lock should use a fair ordering policy
 */
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

公平锁内部是FairSync,非公平锁内部是NonfairSync。而不管是FairSync还是NonfariSync,都间接继承自AbstractQueuedSynchronizer这个抽象类,如下图所示

  • NonfairSync的类继承关系

  • FairSync的类继承关系

该抽象类为我们的加锁和解锁过程提供了统一的模板方法,只是一些细节的处理由该抽象类的实现类自己决定。所以在解读ReentrantLock(重入锁)的源码之前,有必要了解下AbstractQueuedSynchronizer。

2.AbstractQueuedSynchronizer介绍

2.1 AQS是构建同步组件的基础

AbstractQueuedSynchronizer,简称AQS,为构建不同的同步组件(重入锁,读写锁,CountDownLatch等)提供了可扩展的基础框架,如下图所示。

AQS以模板方法模式在内部定义了获取和释放同步状态的模板方法,并留下钩子函数供子类继承时进行扩展,由子类决定在获取和释放同步状态时的细节,从而实现满足自身功能特性的需求。除此之外,AQS通过内部的同步队列管理获取同步状态失败的线程,向实现者屏蔽了线程阻塞和唤醒的细节。

2.2 AQS的内部结构(ReentrantLock的语境下)

AQS的内部结构主要由同步等待队列构成

2.2.1 同步等待队列

AQS中同步等待队列的实现是一个带头尾指针(这里用指针表示引用是为了后面讲解源码时可以更直观形象,况且引用本身是一种受限的指针)且不带哨兵结点(后文中的头结点表示队列首元素结点,不是指哨兵结点)的双向链表。

/**
 * Head of the wait queue, lazily initialized.  Except for
 * initialization, it is modified only via method setHead.  Note:
 * If head exists, its waitStatus is guaranteed not to be
 * CANCELLED.
 */
private transient volatile Node head;//指向队列首元素的头指针

/**
 * Tail of the wait queue, lazily initialized.  Modified only via
 * method enq to add new wait node.
 */
private transient volatile Node tail;//指向队列尾元素的尾指针

head是头指针,指向队列的首元素;tail是尾指针,指向队列的尾元素。而队列的元素结点Node定义在AQS内部,主要有如下几个成员变量

volatile Node prev; //指向前一个结点的指针

volatile Node next; //指向后一个结点的指针
volatile Thread thread; //当前结点代表的线程
volatile int waitStatus; //等待状态

  • prev:指向前一个结点的指针
  • next:指向后一个结点的指针
  • thread:当前结点表示的线程,因为同步队列中的结点内部封装了之前竞争锁失败的线程,故而结点内部必然有一个对应线程实例的引用
  • waitStatus:对于重入锁而言,主要有3个值。0:初始化状态;-1(SIGNAL):当前结点表示的线程在释放锁后需要唤醒后续节点的线程;1(CANCELLED):在同步队列中等待的线程等待超时或者被中断,取消继续等待。

同步队列的结构如下图所示

为了接下来能够更好的理解加锁和解锁过程的源码,对该同步队列的特性进行简单的讲解:

  • 1.同步队列是个先进先出(FIFO)队列,获取锁失败的线程将构造结点并加入队列的尾部,并阻塞自己。如何才能线程安全的实现入队是后面讲解的重点,毕竟我们在讲锁的实现,这部分代码肯定是不能用锁的。
  • 2.队列首结点可以用来表示当前正获取锁的线程。
  • 3.当前线程释放锁后将尝试唤醒后续处结点中处于阻塞状态的线程。

为了加深理解,还可以在阅读源码的过程中思考下这个问题:

这个同步队列是FIFO队列,也就是说先在队列中等待的线程将比后面的线程更早的得到锁,那ReentrantLock是如何基于这个FIFO队列实现非公平锁的?

2.2.2 AQS中的其他数据结构(ReentrantLock的语境下)

  • 同步状态变量
/**
 * The synchronization state.
 */
private volatile int state;

这是一个带volatile前缀的int值,是一个类似计数器的东西。在不同的同步组件中有不同的含义。以ReentrantLock为例,state可以用来表示该锁被线程重入的次数。当state为0表示该锁不被任何线程持有;当state为1表示线程恰好持有该锁1次(未重入);当state大于1则表示锁被线程重入state次。因为这是一个会被并发访问的量,为了防止出现可见性问题要用volatile进行修饰。

  • 持有同步状态的线程标志
/**
 * The current owner of exclusive mode synchronization.
 */
private transient Thread exclusiveOwnerThread;

如注释所言,这是在独占同步模式下标记持有同步状态线程的。ReentrantLock就是典型的独占同步模式,该变量用来标识锁被哪个线程持有。


了解AQS的主要结构后,就可以开始进行ReentrantLock的源码解读了。由于非公平锁在实际开发中用的比较多,故以讲解非公平锁的源码为主。以下面这段对非公平锁使用的代码为例:

/**
 * @author: takumiCX
 * @create: 2018-08-01
 **/
public class NoFairLockTest {


    public static void main(String[] args) {

        //创建非公平锁
        ReentrantLock lock = new ReentrantLock(false);

        try {

            //加锁
            lock.lock();

            //模拟业务处理用时
            TimeUnit.SECONDS.sleep(1);

        } catch (InterruptedException e) {
            e.printStackTrace();

        } finally {
            //释放锁
            lock.unlock();
        }

    }
}

3 非公平模式加锁流程

加锁流程从lock.lock()开始

public void lock() {
    sync.lock();
}

进入该源码,正确找到sycn的实现类后可以看到真正有内容的入口方法

3.1加锁流程真正意义上的入口

/**
 * Performs lock.  Try immediate barge, backing up to normal
 * acquire on failure.
 */
//加锁流程真正意义上的入口
final void lock() {
    //以cas方式尝试将AQS中的state从0更新为1
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());//获取锁成功则将当前线程标记为持有锁的线程,然后直接返回
    else
        acquire(1);//获取锁失败则执行该方法
}

首先尝试快速获取锁,以cas的方式将state的值更新为1,只有当state的原值为0时更新才能成功,因为state在ReentrantLock的语境下等同于锁被线程重入的次数,这意味着只有当前锁未被任何线程持有时该动作才会返回成功。若获取锁成功,则将当前线程标记为持有锁的线程,然后整个加锁流程就结束了。若获取锁失败,则执行acquire方法

/**
 * Acquires in exclusive mode, ignoring interrupts.  Implemented
 * by invoking at least once {@link #tryAcquire},
 * returning on success.  Otherwise the thread is queued, possibly
 * repeatedly blocking and unblocking, invoking {@link
 * #tryAcquire} until success.  This method can be used
 * to implement method {@link Lock#lock}.
 *
 * @param arg the acquire argument.  This value is conveyed to
 *        {@link #tryAcquire} but is otherwise uninterpreted and
 *        can represent anything you like.
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

该方法主要的逻辑都在if判断条件中,这里面有3个重要的方法tryAcquire(),addWaiter()和acquireQueued(),这三个方法中分别封装了加锁流程中的主要处理逻辑,理解了这三个方法到底做了哪些事情,整个加锁流程就清晰了。

3.2 尝试获取锁的通用方法:tryAcquire()

tryAcquire是AQS中定义的钩子方法,如下所示

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

该方法默认会抛出异常,强制同步组件通过扩展AQS来实现同步功能的时候必须重写该方法,ReentrantLock在公平和非公平模式下对此有不同实现,非公平模式的实现如下:

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

底层调用了nonfairTryAcquire()
从方法名上我们就可以知道这是非公平模式下尝试获取锁的方法,具体方法实现如下

/**
 * Performs non-fair tryLock.  tryAcquire is implemented in
 * subclasses, but both need nonfair try for trylock method.
 */
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();//获取当前线程实例
    int c = getState();//获取state变量的值,即当前锁被重入的次数
    if (c == 0) {   //state为0,说明当前锁未被任何线程持有
        if (compareAndSetState(0, acquires)) { //以cas方式获取锁
            setExclusiveOwnerThread(current);  //将当前线程标记为持有锁的线程
            return true;//获取锁成功,非重入
        }
    }
    else if (current == getExclusiveOwnerThread()) { //当前线程就是持有锁的线程,说明该锁被重入了
        int nextc = c + acquires;//计算state变量要更新的值
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);//非同步方式更新state值
        return true;  //获取锁成功,重入
    }
    return false;     //走到这里说明尝试获取锁失败
}

这是非公平模式下获取锁的通用方法。它囊括了当前线程在尝试获取锁时的所有可能情况:

  • 1.当前锁未被任何线程持有(state=0),则以cas方式获取锁,若获取成功则设置exclusiveOwnerThread为当前线程,然后返回成功的结果;若cas失败,说明在得到state=0和cas获取锁之间有其他线程已经获取了锁,返回失败结果。
  • 2.若锁已经被当前线程获取(state>0,exclusiveOwnerThread为当前线程),则将锁的重入次数加1(state+1),然后返回成功结果。因为该线程之前已经获得了锁,所以这个累加操作不用同步。
  • 3.若当前锁已经被其他线程持有(state>0,exclusiveOwnerThread不为当前线程),则直接返回失败结果

因为我们用state来统计锁被线程重入的次数,所以当前线程尝试获取锁的操作是否成功可以简化为:state值是否成功累加1,是则尝试获取锁成功,否则尝试获取锁失败。

其实这里还可以思考一个问题:nonfairTryAcquire已经实现了一个囊括所有可能情况的尝试获取锁的方式,为何在刚进入lock方法时还要通过compareAndSetState(0, 1)去获取锁,毕竟后者只有在锁未被任何线程持有时才能执行成功,我们完全可以把compareAndSetState(0, 1)去掉,对最后的结果不会有任何影响。这种在进行通用逻辑处理之前针对某些特殊情况提前进行处理的方式在后面还会看到,一个直观的想法就是它能提升性能,而代价是牺牲一定的代码简洁性。

退回到上层的acquire方法,

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&  //当前线程尝试获取锁,若获取成功返回true,否则false
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))  //只有当前线程获取锁失败才会执行者这部分代码
        selfInterrupt();
}

tryAcquire(arg)返回成功,则说明当前线程成功获取了锁(第一次获取或者重入),由取反和&&可知,整个流程到这结束,只有当前线程获取锁失败才会执行后面的判断。先来看addWaiter(Node.EXCLUSIVE)
部分,这部分代码描述了当线程获取锁失败时如何安全的加入同步等待队列。这部分代码可以说是整个加锁流程源码的精华,充分体现了并发编程的艺术性。

3.3 获取锁失败的线程如何安全的加入同步队列:addWaiter()

这部分逻辑在addWaiter()方法中

/**
 * Creates and enqueues node for current thread and given mode.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);//首先创建一个新节点,并将当前线程实例封装在内部,mode这里为null
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);//入队的逻辑这里都有
    return node;
}

首先创建了一个新节点,并将当前线程实例封装在其内部,之后我们直接看enq(node)方法就可以了,中间这部分逻辑在enq(node)中都有,之所以加上这部分“重复代码”和尝试获取锁时的“重复代码”一样,对某些特殊情况
进行提前处理,牺牲一定的代码可读性换取性能提升。

/**
 * Inserts node into queue, initializing if necessary. See picture above.
 * @param node the node to insert
 * @return node's predecessor
 */
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;//t指向当前队列的最后一个节点,队列为空则为null
        if (t == null) { // Must initialize  //队列为空
            if (compareAndSetHead(new Node())) //构造新结点,CAS方式设置为队列首元素,当head==null时更新成功
                tail = head;//尾指针指向首结点
        } else {  //队列不为空
            node.prev = t;
            if (compareAndSetTail(t, node)) { //CAS将尾指针指向当前结点,当t(原来的尾指针)==tail(当前真实的尾指针)时执行成功
                t.next = node;    //原尾结点的next指针指向当前结点
                return t;
            }
        }
    }
}

这里有两个CAS操作:

  • compareAndSetHead(new Node()),CAS方式更新head指针,仅当原值为null时更新成功
/**
 * CAS head field. Used only by enq.
 */
private final boolean compareAndSetHead(Node update) {
    return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
  • compareAndSetTail(t, node),CAS方式更新tial指针,仅当原值为t时更新成功
/**
 * CAS tail field. Used only by enq.
 */
private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

外层的for循环保证了所有获取锁失败的线程经过失败重试后最后都能加入同步队列。因为AQS的同步队列是不带哨兵结点的,故当队列为空时要进行特殊处理,这部分在if分句中。注意当前线程所在的结点不能直接插入
空队列,因为阻塞的线程是由前驱结点进行唤醒的。故先要插入一个结点作为队列首元素,当锁释放时由它来唤醒后面被阻塞的线程,从逻辑上这个队列首元素也可以表示当前正获取锁的线程,虽然并不一定真实持有其线程实例。

首先通过new Node()创建一个空结点,然后以CAS方式让头指针指向该结点(该结点并非当前线程所在的结点),若该操作成功,则将尾指针也指向该结点。这部分的操作流程可以用下图表示

当队列不为空,则执行通用的入队逻辑,这部分在else分句中

else {
            node.prev = t;//step1:待插入结点pre指针指向原尾结点
            if (compareAndSetTail(t, node)) { step2:CAS方式更改尾指针
                t.next = node; //原尾结点next指针指向新的结点
                return t;
            }
        }

首先当前线程所在的结点的前向指针pre指向当前线程认为的尾结点,源码中用t表示。然后以CAS的方式将尾指针指向当前结点,该操作仅当tail=t,即尾指针在进行CAS前未改变时成功。若CAS执行成功,则将原尾结点的后向指针next指向新的尾结点。整个过程如下图所示

整个入队的过程并不复杂,是典型的CAS加失败重试的乐观锁策略。其中只有更新头指针和更新尾指针这两步进行了CAS同步,可以预见高并发场景下性能是非常好的。但是本着质疑精神我们不禁会思考下这么做真的线程安全吗?

  • 1.队列为空的情况:
    因为队列为空,故head=tail=null,假设线程执行2成功,则在其执行3之前,因为tail=null,其他进入该方法的线程因为head不为null将在2处不停的失败,所以3即使没有同步也不会有线程安全问题。
  • 2.队列不为空的情况:
    假设线程执行5成功,则此时4的操作必然也是正确的(当前结点的prev指针确实指向了队列尾结点,换句话说tail指针没有改变,如若不然5必然执行失败),又因为4执行成功,当前节点在队列中的次序已经确定了,所以6何时执行对线程安全不会有任何影响,比如下面这种情况

为了确保真的理解了它,可以思考这个问题:把enq方法图中的4放到5之后,整个入队的过程还线程安全吗?

到这为止,获取锁失败的线程加入同步队列的逻辑就结束了。但是线程加入同步队列后会做什么我们并不清楚,这部分在acquireQueued方法中

3.4 线程加入同步队列后会做什么:acquireQueued()

先看acquireQueued方法的源码

/**
 * Acquires in exclusive uninterruptible mode for thread already in
 * queue. Used by condition wait methods as well as acquire.
 *
 * @param node the node
 * @param arg the acquire argument
 * @return {@code true} if interrupted while waiting
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        //死循环,正常情况下线程只有获得锁才能跳出循环
        for (;;) {
            final Node p = node.predecessor();//获得当前线程所在结点的前驱结点
            //第一个if分句
            if (p == head && tryAcquire(arg)) { 
                setHead(node); //将当前结点设置为队列头结点
                p.next = null; // help GC
                failed = false;
                return interrupted;//正常情况下死循环唯一的出口
            }
            //第二个if分句
            if (shouldParkAfterFailedAcquire(p, node) &&  //判断是否要阻塞当前线程
                parkAndCheckInterrupt())      //阻塞当前线程
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

这段代码主要的内容都在for循环中,这是一个死循环,主要有两个if分句构成。第一个if分句中,当前线程首先会判断前驱结点是否是头结点,如果是则尝试获取锁,获取锁成功则会设置当前结点为头结点(更新头指针)。为什么必须前驱结点为头结点才尝试去获取锁?因为头结点表示当前正占有锁的线程,正常情况下该线程释放锁后会通知后面结点中阻塞的线程,阻塞线程被唤醒后去获取锁,这是我们希望看到的。然而还有一种情况,就是前驱结点取消了等待,此时当前线程也会被唤醒,这时候就不应该去获取锁,而是往前回溯一直找到一个没有取消等待的结点,然后将自身连接在它后面。一旦我们成功获取了锁并成功将自身设置为头结点,就会跳出for循环。否则就会执行第二个if分句:确保前驱结点的状态为SIGNAL,然后阻塞当前线程。

先来看shouldParkAfterFailedAcquire(p, node),从方法名上我们可以大概猜出这是判断是否要阻塞当前线程的,方法内容如下

/**
 * Checks and updates status for a node that failed to acquire.
 * Returns true if thread should block. This is the main signal
 * control in all acquire loops.  Requires that pred == node.prev.
 *
 * @param pred node's predecessor holding status
 * @param node the node
 * @return {@code true} if thread should block
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL) //状态为SIGNAL

        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) { //状态为CANCELLED,
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else { //状态为初始化状态(ReentrentLock语境下)
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

可以看到针对前驱结点pred的状态会进行不同的处理

  • 1.pred状态为SIGNAL,则返回true,表示要阻塞当前线程。
  • 2.pred状态为CANCELLED,则一直往队列头部回溯直到找到一个状态不为CANCELLED的结点,将当前节点node挂在这个结点的后面。
  • 3.pred的状态为初始化状态,此时通过compareAndSetWaitStatus(pred, ws, Node.SIGNAL)方法将pred的状态改为SIGNAL。

其实这个方法的含义很简单,就是确保当前结点的前驱结点的状态为SIGNAL,SIGNAL意味着线程释放锁后会唤醒后面阻塞的线程。毕竟,只有确保能够被唤醒,当前线程才能放心的阻塞。

但是要注意只有在前驱结点已经是SIGNAL状态后才会执行后面的方法立即阻塞,对应上面的第一种情况。其他两种情况则因为返回false而重新执行一遍
for循环。这种延迟阻塞其实也是一种高并发场景下的优化,试想我如果在重新执行循环的时候成功获取了锁,是不是线程阻塞唤醒的开销就省了呢?

最后我们来看看阻塞线程的方法parkAndCheckInterrupt

shouldParkAfterFailedAcquire返回true表示应该阻塞当前线程,则会执行parkAndCheckInterrupt方法,这个方法比较简单,底层调用了LockSupport来阻塞当前线程,源码如下:

/**
 * Convenience method to park and then check if interrupted
 *
 * @return {@code true} if interrupted
 */
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

该方法内部通过调用LockSupport的park方法来阻塞当前线程,不清楚LockSupport的可以看看这里。LockSupport功能简介及原理浅析

下面通过一张流程图来说明线程从加入同步队列到成功获取锁的过程

概括的说,线程在同步队列中会尝试获取锁,失败则被阻塞,被唤醒后会不停的重复这个过程,直到线程真正持有了锁,并将自身结点置于队列头部。

3.5 加锁流程源码总结

ReentrantLock非公平模式下的加锁流程如下

4.非公平模式解锁流程

4.1 解锁流程源码解读

解锁的源码相对简单,源码如下:

public void unlock() {
    sync.release(1);  
}
public final boolean release(int arg) {
    if (tryRelease(arg)) { //释放锁(state-1),若释放后锁可被其他线程获取(state=0),返回true
        Node h = head;
        //当前队列不为空且头结点状态不为初始化状态(0)   
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);  //唤醒同步队列中被阻塞的线程
        return true;
    }
    return false;
}

正确找到sync的实现类,找到真正的入口方法,主要内容都在一个if语句中,先看下判断条件tryRelease方法

protected final boolean tryRelease(int releases) {
    int c = getState() - releases; //计算待更新的state值
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) { //待更新的state值为0,说明持有锁的线程未重入,一旦释放锁其他线程将能获取
        free = true; 
        setExclusiveOwnerThread(null);//清除锁的持有线程标记
    }
    setState(c);//更新state值
    return free;
}

tryRelease其实只是将线程持有锁的次数减1,即将state值减1,若减少后线程将完全释放锁(state值为0),则该方法将返回true,否则返回false。由于执行该方法的线程必然持有锁,故该方法不需要任何同步操作。
若当前线程已经完全释放锁,即锁可被其他线程使用,则还应该唤醒后续等待线程。不过在此之前需要进行两个条件的判断:

  • h!=null是为了防止队列为空,即没有任何线程处于等待队列中,那么也就不需要进行唤醒的操作
  • h.waitStatus != 0是为了防止队列中虽有线程,但该线程还未阻塞,由前面的分析知,线程在阻塞自己前必须设置前驱结点的状态为SIGNAL,否则它不会阻塞自己。

接下来就是唤醒线程的操作,unparkSuccessor(h)源码如下

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

一般情况下只要唤醒后继结点的线程就行了,但是后继结点可能已经取消等待,所以从队列尾部往前回溯,找到离头结点最近的正常结点,并唤醒其线程。

4.2 解锁流程源码总结

5. 公平锁相比非公平锁的不同

公平锁模式下,对锁的获取有严格的条件限制。在同步队列有线程等待的情况下,所有线程在获取锁前必须先加入同步队列。队列中的线程按加入队列的先后次序获得锁。
从公平锁加锁的入口开始,

对比非公平锁,少了非重入式获取锁的方法,这是第一个不同点

接着看获取锁的通用方法tryAcquire(),该方法在线程未进入队列,加入队列阻塞前和阻塞后被唤醒时都会执行。

在真正CAS获取锁之前加了判断,内容如下

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

从方法名我们就可知道这是判断队列中是否有优先级更高的等待线程,队列中哪个线程优先级最高?由于头结点是当前获取锁的线程,队列中的第二个结点代表的线程优先级最高。
那么我们只要判断队列中第二个结点是否存在以及这个结点是否代表当前线程就行了。这里分了两种情况进行探讨:

  1. 第二个结点已经完全插入,但是这个结点是否就是当前线程所在结点还未知,所以通过s.thread != Thread.currentThread()进行判断,如果为true,说明第二个结点代表其他线程。
  2. 第二个结点并未完全插入,我们知道结点入队一共分三步:
  • 1.待插入结点的pre指针指向原尾结点
  • 2.CAS更新尾指针
  • 3.原尾结点的next指针指向新插入结点

所以(s = h.next) == null 就是用来判断2刚执行成功但还未执行3这种情况的。这种情况第二个结点必然属于其他线程。
以上两种情况都会使该方法返回true,即当前有优先级更高的线程在队列中等待,那么当前线程将不会执行CAS操作去获取锁,保证了线程获取锁的顺序与加入同步队列的顺序一致,很好的保证了公平性,但也增加了获取锁的成本。

6. 一些疑问的解答

6.1 为什么基于FIFO的同步队列可以实现非公平锁?

由FIFO队列的特性知,先加入同步队列等待的线程会比后加入的线程更靠近队列的头部,那么它将比后者更早的被唤醒,它也就能更早的得到锁。从这个意义上,对于在同步队列中等待的线程而言,它们获得锁的顺序和加入同步队列的顺序一致,这显然是一种公平模式。然而,线程并非只有在加入队列后才有机会获得锁,哪怕同步队列中已有线程在等待,非公平锁的不公平之处就在于此。回看下非公平锁的加锁流程,线程在进入同步队列等待之前有两次抢占锁的机会:

  • 第一次是非重入式的获取锁,只有在当前锁未被任何线程占有(包括自身)时才能成功;
  • 第二次是在进入同步队列前,包含所有情况的获取锁的方式。

只有这两次获取锁都失败后,线程才会构造结点并加入同步队列等待。而线程释放锁时是先释放锁(修改state值),然后才唤醒后继结点的线程的。试想下这种情况,线程A已经释放锁,但还没来得及唤醒后继线程C,而这时另一个线程B刚好尝试获取锁,此时锁恰好不被任何线程持有,它将成功获取锁而不用加入队列等待。线程C被唤醒尝试获取锁,而此时锁已经被线程B抢占,故而其获取失败并继续在队列中等待。整个过程如下图所示

如果以线程第一次尝试获取锁到最后成功获取锁的次序来看,非公平锁确实很不公平。因为在队列中等待很久的线程相比还未进入队列等待的线程并没有优先权,甚至竞争也处于劣势:在队列中的线程要等待其他线程唤醒,在获取锁之前还要检查前驱结点是否为头结点。在锁竞争激烈的情况下,在队列中等待的线程可能迟迟竞争不到锁。这也就非公平在高并发情况下会出现的饥饿问题。那我们再开发中为什么大多使用会导致饥饿的非公平锁?很简单,因为它性能好啊。

6.2 为什么非公平锁性能好

非公平锁对锁的竞争是抢占式的(队列中线程除外),线程在进入等待队列前可以进行两次尝试,这大大增加了获取锁的机会。这种好处体现在两个方面:

  • 1.线程不必加入等待队列就可以获得锁,不仅免去了构造结点并加入队列的繁琐操作,同时也节省了线程阻塞唤醒的开销,线程阻塞和唤醒涉及到线程上下文的切换和操作系统的系统调用,是非常耗时的。在高并发情况下,如果线程持有锁的时间非常短,短到线程入队阻塞的过程超过线程持有并释放锁的时间开销,那么这种抢占式特性对并发性能的提升会更加明显。

  • 2.减少CAS竞争。如果线程必须要加入阻塞队列才能获取锁,那入队时CAS竞争将变得异常激烈,CAS操作虽然不会导致失败线程挂起,但不断失败重试导致的对CPU的浪费也不能忽视。除此之外,加锁流程中至少有两处通过将某些特殊情况提前来减少CAS操作的竞争,增加并发情况下的性能。一处就是获取锁时将非重入的情况提前,如下图所示

另一处就是入队的操作,将同步队列非空的情况提前处理

这两部分的代码在之后的通用逻辑处理中都有,很显然属于重复代码,但因为避免了执行无意义的流程代码,比如for循环,获取同步状态等,高并发场景下也能减少CAS竞争失败的可能。

7. 阅读源码的收获

    • 1.熟悉了ReentrantLock的内部构造以及加锁和解锁的流程,理解了非公平锁和公平锁实现的本质区别以及为何前者相比后者有更好的性能。以此为基础,我们可以更好的使用ReentrantLock。
    • 2.通过对部分实现细节的学习,了解了如何以CAS算法构建无锁的同步队列,我们可以借鉴并以此来构建自己的无锁的并发容器。

MySQL 分库分表及其平滑扩容方案

转自:https://kefeng.wang/2018/07/22/mysql-sharding/ 

 

众所周知,数据库很容易成为应用系统的瓶颈。单机数据库的资源和处理能力有限,在高并发的分布式系统中,可采用分库分表突破单机局限。本文总结了分库分表的相关概念、全局ID的生成策略、分片策略、平滑扩容方案、以及流行的方案。

1 分库分表概述
在业务量不大时,单库单表即可支撑。
当数据量过大存储不下、或者并发量过大负荷不起时,就要考虑分库分表。

1.1 分库分表相关术语
读写分离: 不同的数据库,同步相同的数据,分别只负责数据的读和写;
分区: 指定分区列表达式,把记录拆分到不同的区域中(必须是同一服务器,可以是不同硬盘),应用看来还是同一张表,没有变化;
分库:一个系统的多张数据表,存储到多个数据库实例中;
分表: 对于一张多行(记录)多列(字段)的二维数据表,又分两种情形:
(1) 垂直分表: 竖向切分,不同分表存储不同的字段,可以把不常用或者大容量、或者不同业务的字段拆分出去;
(2) 水平分表(最复杂): 横向切分,按照特定分片算法,不同分表存储不同的记录。
1.2 真的要采用分库分表?
需要注意的是,分库分表会为数据库维护和业务逻辑带来一系列复杂性和性能损耗,除非预估的业务量大到万不得已,切莫过度设计、过早优化。
规划期内的数据量和性能问题,尝试能否用下列方式解决:

当前数据量:如果没有达到几百万,通常无需分库分表;
数据量问题:增加磁盘、增加分库(不同的业务功能表,整表拆分至不同的数据库);
性能问题:升级CPU/内存、读写分离、优化数据库系统配置、优化数据表/索引、优化 SQL、分区、数据表的垂直切分;
如果仍未能奏效,才考虑最复杂的方案:数据表的水平切分。
2 全局ID生成策略
2.1 自动增长列
优点:数据库自带功能,有序,性能佳。
缺点:单库单表无妨,分库分表时如果没有规划,ID可能重复。解决方案:

2.1.1 设置自增偏移和步长
## 假设总共有 10 个分表
## 级别可选: SESSION(会话级), GLOBAL(全局)
SET @@SESSION.auto_increment_offset = 1; ## 起始值, 分别取值为 1~10
SET @@SESSION.auto_increment_increment = 10; ## 步长增量
1
2
3
4
如果采用该方案,在扩容时需要迁移已有数据至新的所属分片。

2.1.2 全局ID映射表
在全局 Redis 中为每张数据表创建一个 ID 的键,记录该表当前最大 ID;
每次申请 ID 时,都自增 1 并返回给应用;
Redis 要定期持久至全局数据库。

2.2 UUID(128位)
在一台机器上生成的数字,它保证对在同一时空中的所有机器都是唯一的。通常平台会提供生成UUID的API。
UUID 由4个连字号(-)将32个字节长的字符串分隔后生成的字符串,总共36个字节长。形如:550e8400-e29b-41d4-a716-446655440000。
UUID 的计算因子包括:以太网卡地址、纳秒级时间、芯片ID码和许多可能的数字。
UUID 是个标准,其实现有几种,最常用的是微软的 GUID(Globals Unique Identifiers)。

优点:简单,全球唯一;
缺点:存储和传输空间大,无序,性能欠佳。

2.3 COMB(组合)
参考资料:The Cost of GUIDs as Primary Keys
组合 GUID(10字节) 和时间(6字节),达到有序的效果,提高索引性能。

2.4 Snowflake(雪花) 算法
参考资料:twitter/snowflake,Snowflake 算法详解
Snowflake 是 Twitter 开源的分布式 ID 生成算法,其结果为 long(64bit) 的数值。
其特性是各节点无需协调、按时间大致有序、且整个集群各节点单不重复。
该数值的默认组成如下(符号位之外的三部分允许个性化调整):

1bit: 符号位,总是 0(为了保证数值是正数)。
41bit: 毫秒数(可用 69 年);
10bit: 节点ID(5bit数据中心 + 5bit节点ID,支持 32 * 32 = 1024 个节点)
12bit: 流水号(每个节点每毫秒内支持 4096 个 ID,相当于 409万的 QPS,相同时间内如 ID 遇翻转,则等待至下一毫秒)
3 分片策略
3.1 连续分片
根据特定字段(比如用户ID、订单时间)的范围,值在该区间的,划分到特定节点。
优点:集群扩容后,指定新的范围落在新节点即可,无需进行数据迁移。
缺点:如果按时间划分,数据热点分布不均(历史数冷当前数据热),导致节点负荷不均。

3.3 ID取模分片
缺点:扩容后需要迁移数据。

3.2 一致性Hash算法
优点:扩容后无需迁移数据。

3.4 Snowflake 分片
优点:扩容后无需迁移数据。

4 分库分表引入的问题
4.1 分布式事务
参见 分布式事务的解决方案
由于两阶段/三阶段提交对性能损耗大,可改用事务补偿机制。

4.2 跨节点 JOIN
对于单库 JOIN,MySQL 原生就支持;
对于多库,出于性能考虑,不建议使用 MySQL 自带的 JOIN,可以用以下方案避免跨节点 JOIN:

全局表: 一些稳定的共用数据表,在各个数据库中都保存一份;
字段冗余: 一些常用的共用字段,在各个数据表中都保存一份;
应用组装:应用获取数据后再组装。
另外,某个 ID 的用户信息在哪个节点,他的关联数据(比如订单)也在哪个节点,可以避免分布式查询。

4.3 跨节点聚合
只能在应用程序端完成。
但对于分页查询,每次大量聚合后再分页,性能欠佳。

4.4 节点扩容
节点扩容后,新的分片规则导致数据所属分片有变,因而需要迁移数据。

5 节点扩容方案
相关资料: 数据库秒级平滑扩容架构方案

5.1 常规方案
如果增加的节点数和扩容操作没有规划,那么绝大部分数据所属的分片都有变化,需要在分片间迁移:

预估迁移耗时,发布停服公告;
停服(用户无法使用服务),使用事先准备的迁移脚本,进行数据迁移;
修改为新的分片规则;
启动服务器。
5.2 免迁移扩容
采用双倍扩容策略,避免数据迁移。扩容前每个节点的数据,有一半要迁移至一个新增节点中,对应关系比较简单。
具体操作如下(假设已有 2 个节点 A/B,要双倍扩容至 A/A2/B/B2 这 4 个节点):

无需停止应用服务器;
新增两个数据库 A2/B2 作为从库,设置主从同步关系为:A=>A2、B=>B2,直至主从数据同步完毕(早期数据可手工同步);
调整分片规则并使之生效:
原 ID%2=0 => A 改为 ID%4=0 => A, ID%4=2 => A2;
原 ID%2=1 => B 改为 ID%4=1 => B, ID%4=3 => B2。
解除数据库实例的主从同步关系,并使之生效;
此时,四个节点的数据都已完整,只是有冗余(多存了和自己配对的节点的那部分数据),择机清除即可(过后随时进行,不影响业务)。
6 分库分表方案
6.1 代理层方式
部署一台代理服务器伪装成 MySQL 服务器,代理服务器负责与真实 MySQL 节点的对接,应用程序只和代理服务器对接。对应用程序是透明的。
比如 MyCAT,官网,源码,参考文档:MyCAT+MySQL 读写分离部署
MyCAT 后端可以支持 MySQL, SQL Server, Oracle, DB2, PostgreSQL等主流数据库,也支持MongoDB这种新型NoSQL方式的存储,未来还会支持更多类型的存储。
MyCAT 不仅仅可以用作读写分离,以及分表分库、容灾管理,而且可以用于多租户应用开发、云平台基础设施,让你的架构具备很强的适应性和灵活性。

6.2 应用层方式
处于业务层和 JDBC 层中间,是以 JAR 包方式提供给应用调用,对代码有侵入性。主要方案有:
(1)淘宝网的 TDDL: 已于 2012 年关闭了维护通道,建议不要使用。
(2)当当网的 Sharding-JDBC: 仍在活跃维护中:
是当当应用框架 ddframe 中,从关系型数据库模块 dd-rdb 中分离出来的数据库水平分片框架,实现透明化数据库分库分表访问,实现了 Snowflake 分片算法;
Sharding-JDBC定位为轻量Java框架,使用客户端直连数据库,无需额外部署,无其他依赖,DBA也无需改变原有的运维方式。
Sharding-JDBC分片策略灵活,可支持等号、between、in等多维度分片,也可支持多分片键。
SQL解析功能完善,支持聚合、分组、排序、limit、or等查询,并支持Binding Table以及笛卡尔积表查询。

Sharding-JDBC直接封装JDBC API,可以理解为增强版的JDBC驱动,旧代码迁移成本几乎为零:

可适用于任何基于Java的ORM框架,如JPA、Hibernate、Mybatis、Spring JDBC Template或直接使用JDBC。
可基于任何第三方的数据库连接池,如DBCP、C3P0、 BoneCP、Druid等。
理论上可支持任意实现JDBC规范的数据库。虽然目前仅支持MySQL,但已有支持Oracle、SQLServer等数据库的计划。

转自:https://kefeng.wang/2018/07/22/mysql-sharding/ 

机甲大师S1机器人编程学习

  机甲大师 S1(RoboMaster S1)是大疆新出的教育机器人,很期待。S1支持Scratch和Python编程。(Scratch是麻省理工学院的“终身幼儿园团队”(Lifelong Kindergarten Group)开发的图形化编程工具,主要面对青少年开放。类似积木,有不同颜色和形状标识不同含义,操作简单,很适合儿童学习)

  

  官方编程指南:https://www.dji.com/cn/robomaster-s1/programming-guide (同时包含Scratch和Python实例)

  官方视频教程:https://www.dji.com/cn/robomaster-s1/video-courses (全英语)。

RabbitMQ 延时消息队列

消息延时在日常随处可见:

1、订单创建10min之后不发起支付,自动取消。

2、30min定时推送一次邮件信息。

 

最常用到方式后台定时任务轮训,量小的时候可以使用,量大会出现数据读取会性能问题。RabbitMQ并没有直接实现延时队列,但是可以利用RabbitMQ两个属性实现延时队列特性:

1、x-message-ttl:消息过期时间(Time To Live,TTL),超过过期时间之后即变为死信(Dead-letter),不会再被消费者消费。

设置TTL有两种方式:

  (1)创建队列时指定x-message-ttl,此时整个队列具有统一过期时间;

  (2)发送消息为每个消息设置expiration,此时消息之间过期时间不同。 

注意:如果两者都设置,过期时间取两者最小。 

2、x-dead-letter-exchange(RabbitMQ文档) :过期消息路由转发(转发器类型),当消息达到过期时间由该exchange按照配置的x-dead-letter-routing-key转发到指定队列,最后被消费者消费。

 

spring整合RabbitMQ : 

RabbitMQ延时队列逻辑:

 

 

  

1、exchange_delay_begin:缓冲队列exchange交换器,用于将消息转发至缓存消息队列 queue_delay_begin 。

2、exchange_delay_done:死信(dead-letter)队列exchange交换器,用于将队列 queue_delay_begin 转发到死信队列。

3、queue_delay_begin:缓冲消息队列,等待消息过期。

4、queue_delay_done:死信消息队列,消费者能够真正消费信息。

 

 spring-rabbitmq.xml :

复制代码



    
    

    
    
        
            
        
    

    
        
            
            
            
            
        
    

    
        
            
            
            
        
    

    

    

    

    
    
        
    

复制代码

 DelayMessageProducer.java

复制代码
@Service
public class DelayMessageProducer {

    @Resource(name="delayMsgTwoTemplate")
    private AmqpTemplate delayMsgTwoTemplate;

    public void delayMsgTwo(String exchange, String routingKey, Object msg) {
        delayMsgTwoTemplate.convertAndSend(exchange, routingKey, msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration(String.valueOf(10000));
                return message;
            }
        });
    }
}
复制代码

 

 MessageConsumer.java

复制代码
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class MessageConsumer implements MessageListener {

    @Override
    public void onMessage(Message message) {
        System.out.println("consumer receive message 22------->:{}"+ message);

    }
}
复制代码

 

application.xml 

复制代码



    

    
    

    
    
    
    

复制代码

pom.xml

复制代码

    
        learning-demo
        com.nancy
        1.0-SNAPSHOT
    
    4.0.0

    demo-jms
    jar

    demo-jms
    http://maven.apache.org

    
        UTF-8
        5.0.0.RELEASE
    

    

        
            org.aspectj
            aspectjweaver
            1.8.6
        
        
        
            org.springframework
            spring-jms
            ${spring.version}
        
        
        
            org.springframework.amqp
            spring-amqp
            1.6.6.RELEASE
            
                
                    org.springframework
                    spring-core
                
            
        

        
            org.springframework.amqp
            spring-rabbit
            1.6.6.RELEASE
            
                
                    org.springframework
                    spring-web
                
                
                    org.springframework
                    spring-core
                
                
                    org.springframework
                    spring-context
                
                
                    org.springframework
                    spring-tx
                
                
                    org.springframework
                    spring-messaging
                
            
        
        
        
            org.springframework
            spring-core
            ${spring.version}
        
        
            org.springframework
            spring-aspects
            ${spring.version}
            
                
                        org.aspectj
                        aspectjweaver
                
            
        
        
            org.springframework
            spring-webmvc
            ${spring.version}
        
        
            org.springframework
            spring-context
            ${spring.version}
        
        
            org.springframework
            spring-context-support
            ${spring.version}
        
        
            org.springframework
            spring-test
            ${spring.version}
        
        
        
            junit
            junit
            3.8.1
            test
        
        
            junit
            junit
        
    

复制代码

 DelayQueueTest.java

复制代码
import com.alibaba.fastjson.JSONObject;
import com.nancy.rabbitmq.delay.DelayMessageProducer;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class DelayQueueTest {

    private ApplicationContext context = null;

    //    @Ignore
    @org.junit.Before
    public void setUp() throws Exception {
        context = new ClassPathXmlApplicationContext("rabbitmq/application.xml");
    }

    //    @Ignore
    @Test
    public void delayQueueTest() throws Exception {
        DelayMessageProducer messageProducer = context.getBean(DelayMessageProducer.class);
        int a = 10;
        while (a > 0) {
            System.out.println("send "+ a);
            messageProducer.delayMsgTwo("exchange_delay_begin","delay", "hello world delay2 :" + a--);
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        System.out.println("sended ");
        Thread.sleep(1000*60);
    }
    
}
复制代码

 

 运行结果: 发送消息 10s之后, 消费监听到消息 消费。

复制代码
send 10
send 9
send 8
send 7
send 6
send 5
send 4
send 3
send 2
send 1
sended 
consumer receive message 22------->:{}(Body:'hello world delay2 :10' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :9' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=2, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :8' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=3, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :7' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=4, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :6' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=5, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :5' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=6, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :4' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=7, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :3' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=8, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :2' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=9, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :1' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=10, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
复制代码

 

参考资料:

DLX: https://www.rabbitmq.com/dlx.html

RabbitMQ如何实现延迟队列?:https://blog.csdn.net/u013256816/article/details/55106401

消息队列之 RabbitMQ :https://www.jianshu.com/p/79ca08116d57

使用RabbitMQ实现延迟任务 : https://www.cnblogs.com/haoxinyue/p/6613706.html

 

转自:https://www.cnblogs.com/xiaoxing/p/9250823.html

分布式之延时任务方案解析

引言

在开发中,往往会遇到一些关于延时任务的需求。例如

  • 生成订单30分钟未支付,则自动取消
  • 生成订单60秒后,给用户发短信

对上述的任务,我们给一个专业的名字来形容,那就是延时任务。那么这里就会产生一个问题,这个延时任务定时任务的区别究竟在哪里呢?一共有如下几点区别

  1. 定时任务有明确的触发时间,延时任务没有
  2. 定时任务有执行周期,而延时任务在某事件触发后一段时间内执行,没有执行周期
  3. 定时任务一般执行的是批处理操作是多个任务,而延时任务一般是单个任务

下面,我们以判断订单是否超时为例,进行方案分析

方案分析

(1)数据库轮询

思路

该方案通常是在小型项目中使用,即通过一个线程定时的去扫描数据库,通过订单时间来判断是否有超时的订单,然后进行update或delete等操作

实现

博主当年早期是用quartz来实现的(实习那会的事),简单介绍一下
maven项目引入一个依赖如下所示

    <dependency>
        <groupId>org.quartz-schedulergroupId>
        <artifactId>quartzartifactId>
        <version>2.2.2version>
    dependency>

调用Demo类MyJob如下所示

package com.rjzheng.delay1;

import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SchedulerFactory;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

public class MyJob implements Job {
    public void execute(JobExecutionContext context)
            throws JobExecutionException {
        System.out.println("要去数据库扫描啦。。。");
    }

    public static void main(String[] args) throws Exception {
        // 创建任务
        JobDetail jobDetail = JobBuilder.newJob(MyJob.class)
                .withIdentity("job1", "group1").build();
        // 创建触发器 每3秒钟执行一次
        Trigger trigger = TriggerBuilder
                .newTrigger()
                .withIdentity("trigger1", "group3")
                .withSchedule(
                        SimpleScheduleBuilder.simpleSchedule()
                                .withIntervalInSeconds(3).repeatForever())
                .build();
        Scheduler scheduler = new StdSchedulerFactory().getScheduler();
        // 将任务及其触发器放入调度器
        scheduler.scheduleJob(jobDetail, trigger);
        // 调度器开始调度任务
        scheduler.start();
    }
}

运行代码,可发现每隔3秒,输出如下

要去数据库扫描啦。。。

优缺点

优点:简单易行,支持集群操作
缺点:(1)对服务器内存消耗大
   (2)存在延迟,比如你每隔3分钟扫描一次,那最坏的延迟时间就是3分钟
   (3)假设你的订单有几千万条,每隔几分钟这样扫描一次,数据库损耗极大

(2)JDK的延迟队列

思路

该方案是利用JDK自带的DelayQueue来实现,这是一个无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素,放入DelayQueue中的对象,是必须实现Delayed接口的。
DelayedQueue实现工作流程如下图所示

image


其中Poll():获取并移除队列的超时元素,没有则返回空
  take():获取并移除队列的超时元素,如果没有则wait当前线程,直到有元素满足超时条件,返回结果。

 

实现

定义一个类OrderDelay实现Delayed,代码如下

package com.rjzheng.delay2;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class OrderDelay implements Delayed {
    
    private String orderId;
    private long timeout;

    OrderDelay(String orderId, long timeout) {
        this.orderId = orderId;
        this.timeout = timeout + System.nanoTime();
    }

    public int compareTo(Delayed other) {
        if (other == this)
            return 0;
        OrderDelay t = (OrderDelay) other;
        long d = (getDelay(TimeUnit.NANOSECONDS) - t
                .getDelay(TimeUnit.NANOSECONDS));
        return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
    }

    // 返回距离你自定义的超时时间还有多少
    public long getDelay(TimeUnit unit) {
        return unit.convert(timeout - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    void print() {
        System.out.println(orderId+"编号的订单要删除啦。。。。");
    }
}

运行的测试Demo为,我们设定延迟时间为3秒

package com.rjzheng.delay2;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;

public class DelayQueueDemo {
     public static void main(String[] args) {  
            // TODO Auto-generated method stub  
            List list = new ArrayList();  
            list.add("00000001");  
            list.add("00000002");  
            list.add("00000003");  
            list.add("00000004");  
            list.add("00000005");  
            DelayQueue queue = new DelayQueue();  
            long start = System.currentTimeMillis();  
            for(int i = 0;i<5;i++){  
                //延迟三秒取出
                queue.put(new OrderDelay(list.get(i),  
                        TimeUnit.NANOSECONDS.convert(3, TimeUnit.SECONDS)));  
                    try {  
                         queue.take().print();  
                         System.out.println("After " +   
                                 (System.currentTimeMillis()-start) + " MilliSeconds");  
                } catch (InterruptedException e) {  
                    // TODO Auto-generated catch block  
                    e.printStackTrace();  
                }  
            }  
        }  
    
}

输出如下

00000001编号的订单要删除啦。。。。
After 3003 MilliSeconds
00000002编号的订单要删除啦。。。。
After 6006 MilliSeconds
00000003编号的订单要删除啦。。。。
After 9006 MilliSeconds
00000004编号的订单要删除啦。。。。
After 12008 MilliSeconds
00000005编号的订单要删除啦。。。。
After 15009 MilliSeconds

可以看到都是延迟3秒,订单被删除

优缺点

优点:效率高,任务触发时间延迟低。
缺点:(1)服务器重启后,数据全部消失,怕宕机
   (2)集群扩展相当麻烦
   (3)因为内存条件限制的原因,比如下单未付款的订单数太多,那么很容易就出现OOM异常
   (4)代码复杂度较高

(3)时间轮算法

思路

先上一张时间轮的图(这图到处都是啦)


时间轮算法可以类比于时钟,如上图箭头(指针)按某一个方向按固定频率轮动,每一次跳动称为一个 tick。这样可以看出定时轮由个3个重要的属性参数,ticksPerWheel(一轮的tick数),tickDuration(一个tick的持续时间)以及 timeUnit(时间单位),例如当ticksPerWheel=60,tickDuration=1,timeUnit=秒,这就和现实中的始终的秒针走动完全类似了。

 

如果当前指针指在1上面,我有一个任务需要4秒以后执行,那么这个执行的线程回调或者消息将会被放在5上。那如果需要在20秒之后执行怎么办,由于这个环形结构槽数只到8,如果要20秒,指针需要多转2圈。位置是在2圈之后的5上面(20 % 8 + 1)

实现

我们用Netty的HashedWheelTimer来实现
给Pom加上下面的依赖

        <dependency>
            <groupId>io.nettygroupId>
            <artifactId>netty-allartifactId>
            <version>4.1.24.Finalversion>
        dependency>

测试代码HashedWheelTimerTest如下所示

package com.rjzheng.delay3;

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;

import java.util.concurrent.TimeUnit;

public class HashedWheelTimerTest {
    static class MyTimerTask implements TimerTask{
        boolean flag;
        public MyTimerTask(boolean flag){
            this.flag = flag;
        }
        public void run(Timeout timeout) throws Exception {
            // TODO Auto-generated method stub
             System.out.println("要去数据库删除订单了。。。。");
             this.flag =false;
        }
    }
    public static void main(String[] argv) {
        MyTimerTask timerTask = new MyTimerTask(true);
        Timer timer = new HashedWheelTimer();
        timer.newTimeout(timerTask, 5, TimeUnit.SECONDS);
        int i = 1;
        while(timerTask.flag){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            System.out.println(i+"秒过去了");
            i++;
        }
    }
}

输出如下

1秒过去了
2秒过去了
3秒过去了
4秒过去了
5秒过去了
要去数据库删除订单了。。。。
6秒过去了

优缺点

优点:效率高,任务触发时间延迟时间比delayQueue低,代码复杂度比delayQueue低。
缺点:(1)服务器重启后,数据全部消失,怕宕机
   (2)集群扩展相当麻烦
   (3)因为内存条件限制的原因,比如下单未付款的订单数太多,那么很容易就出现OOM异常

(4)redis缓存

思路一

利用redis的zset,zset是一个有序集合,每一个元素(member)都关联了一个score,通过score排序来取集合中的值
zset常用命令
添加元素:ZADD key score member [[score member] [score member] …]
按顺序查询元素:ZRANGE key start stop [WITHSCORES]
查询元素score:ZSCORE key member
移除元素:ZREM key member [member …]
测试如下

# 添加单个元素

redis> ZADD page_rank 10 google.com
(integer) 1


# 添加多个元素

redis> ZADD page_rank 9 baidu.com 8 bing.com
(integer) 2

redis> ZRANGE page_rank 0 -1 WITHSCORES
1) "bing.com"
2) "8"
3) "baidu.com"
4) "9"
5) "google.com"
6) "10"

# 查询元素的score值
redis> ZSCORE page_rank bing.com
"8"

# 移除单个元素

redis> ZREM page_rank google.com
(integer) 1

redis> ZRANGE page_rank 0 -1 WITHSCORES
1) "bing.com"
2) "8"
3) "baidu.com"
4) "9"

那么如何实现呢?我们将订单超时时间戳与订单号分别设置为score和member,系统扫描第一个元素判断是否超时,具体如下图所示

image

 

实现一

package com.rjzheng.delay4;

import java.util.Calendar;
import java.util.Set;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Tuple;

public class AppTest {
    private static final String ADDR = "127.0.0.1";
    private static final int PORT = 6379;
    private static JedisPool jedisPool = new JedisPool(ADDR, PORT);
    
    public static Jedis getJedis() {
       return jedisPool.getResource();
    }
    
    //生产者,生成5个订单放进去
    public void productionDelayMessage(){
        for(int i=0;i<5;i++){
            //延迟3秒
            Calendar cal1 = Calendar.getInstance();
            cal1.add(Calendar.SECOND, 3);
            int second3later = (int) (cal1.getTimeInMillis() / 1000);
            AppTest.getJedis().zadd("OrderId", second3later,"OID0000001"+i);
            System.out.println(System.currentTimeMillis()+"ms:redis生成了一个订单任务:订单ID为"+"OID0000001"+i);
        }
    }
    
    //消费者,取订单
    public void consumerDelayMessage(){
        Jedis jedis = AppTest.getJedis();
        while(true){
            Set items = jedis.zrangeWithScores("OrderId", 0, 1);
            if(items == null || items.isEmpty()){
                System.out.println("当前没有等待的任务");
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                continue;
            }
            int  score = (int) ((Tuple)items.toArray()[0]).getScore();
            Calendar cal = Calendar.getInstance();
            int nowSecond = (int) (cal.getTimeInMillis() / 1000);
            if(nowSecond >= score){
                String orderId = ((Tuple)items.toArray()[0]).getElement();
                jedis.zrem("OrderId", orderId);
                System.out.println(System.currentTimeMillis() +"ms:redis消费了一个任务:消费的订单OrderId为"+orderId);
            }
        }
    }
    
    public static void main(String[] args) {
        AppTest appTest =new AppTest();
        appTest.productionDelayMessage();
        appTest.consumerDelayMessage();
    }
    
}

此时对应输出如下

1525086085261ms:redis生成了一个订单任务:订单ID为OID00000010
1525086085263ms:redis生成了一个订单任务:订单ID为OID00000011
1525086085266ms:redis生成了一个订单任务:订单ID为OID00000012
1525086085268ms:redis生成了一个订单任务:订单ID为OID00000013
1525086085270ms:redis生成了一个订单任务:订单ID为OID00000014
1525086088000ms:redis消费了一个任务:消费的订单OrderId为OID00000010
1525086088001ms:redis消费了一个任务:消费的订单OrderId为OID00000011
1525086088002ms:redis消费了一个任务:消费的订单OrderId为OID00000012
1525086088003ms:redis消费了一个任务:消费的订单OrderId为OID00000013
1525086088004ms:redis消费了一个任务:消费的订单OrderId为OID00000014
当前没有等待的任务
当前没有等待的任务
当前没有等待的任务

可以看到,几乎都是3秒之后,消费订单。

然而,这一版存在一个致命的硬伤,在高并发条件下,多消费者会取到同一个订单号,我们上测试代码ThreadTest

package com.rjzheng.delay4;

import java.util.concurrent.CountDownLatch;

public class ThreadTest {
    private static final int threadNum = 10;
    private static CountDownLatch cdl = new CountDownLatch(threadNum);
    static class DelayMessage implements Runnable{
        public void run() {
            try {
                cdl.await();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            AppTest appTest =new AppTest();
            appTest.consumerDelayMessage();
        }
    }
    public static void main(String[] args) {
        AppTest appTest =new AppTest();
        appTest.productionDelayMessage();
        for(int i=0;inew Thread(new DelayMessage()).start();
            cdl.countDown();
        }
    }
}

输出如下所示

1525087157727ms:redis生成了一个订单任务:订单ID为OID00000010
1525087157734ms:redis生成了一个订单任务:订单ID为OID00000011
1525087157738ms:redis生成了一个订单任务:订单ID为OID00000012
1525087157747ms:redis生成了一个订单任务:订单ID为OID00000013
1525087157753ms:redis生成了一个订单任务:订单ID为OID00000014
1525087160009ms:redis消费了一个任务:消费的订单OrderId为OID00000010
1525087160011ms:redis消费了一个任务:消费的订单OrderId为OID00000010
1525087160012ms:redis消费了一个任务:消费的订单OrderId为OID00000010
1525087160022ms:redis消费了一个任务:消费的订单OrderId为OID00000011
1525087160023ms:redis消费了一个任务:消费的订单OrderId为OID00000011
1525087160029ms:redis消费了一个任务:消费的订单OrderId为OID00000011
1525087160038ms:redis消费了一个任务:消费的订单OrderId为OID00000012
1525087160045ms:redis消费了一个任务:消费的订单OrderId为OID00000012
1525087160048ms:redis消费了一个任务:消费的订单OrderId为OID00000012
1525087160053ms:redis消费了一个任务:消费的订单OrderId为OID00000013
1525087160064ms:redis消费了一个任务:消费的订单OrderId为OID00000013
1525087160065ms:redis消费了一个任务:消费的订单OrderId为OID00000014
1525087160069ms:redis消费了一个任务:消费的订单OrderId为OID00000014
当前没有等待的任务
当前没有等待的任务
当前没有等待的任务
当前没有等待的任务

显然,出现了多个线程消费同一个资源的情况。

解决方案

(1)用分布式锁,但是用分布式锁,性能下降了,该方案不细说。
(2)对ZREM的返回值进行判断,只有大于0的时候,才消费数据,于是将consumerDelayMessage()方法里的

if(nowSecond >= score){
    String orderId = ((Tuple)items.toArray()[0]).getElement();
    jedis.zrem("OrderId", orderId);
    System.out.println(System.currentTimeMillis()+"ms:redis消费了一个任务:消费的订单OrderId为"+orderId);
}

修改为

if(nowSecond >= score){
    String orderId = ((Tuple)items.toArray()[0]).getElement();
    Long num = jedis.zrem("OrderId", orderId);
    if( num != null && num>0){
        System.out.println(System.currentTimeMillis()+"ms:redis消费了一个任务:消费的订单OrderId为"+orderId);
    }
}

在这种修改后,重新运行ThreadTest类,发现输出正常了

思路二

该方案使用redis的Keyspace Notifications,中文翻译就是键空间机制,就是利用该机制可以在key失效之后,提供一个回调,实际上是redis会给客户端发送一个消息。是需要redis版本2.8以上。

实现二

在redis.conf中,加入一条配置

notify-keyspace-events Ex

运行代码如下

package com.rjzheng.delay5;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;

public class RedisTest {
    private static final String ADDR = "127.0.0.1";
    private static final int PORT = 6379;
    private static JedisPool jedis = new JedisPool(ADDR, PORT);
    private static RedisSub sub = new RedisSub();

    public static void init() {
        new Thread(new Runnable() {
            public void run() {
                jedis.getResource().subscribe(sub, "__keyevent@0__:expired");
            }
        }).start();
    }

    public static void main(String[] args) throws InterruptedException {
        init();
        for(int i =0;i<10;i++){
            String orderId = "OID000000"+i;
            jedis.getResource().setex(orderId, 3, orderId);
            System.out.println(System.currentTimeMillis()+"ms:"+orderId+"订单生成");
        }
    }
    
    static class RedisSub extends JedisPubSub {
        @Override
        public void onMessage(String channel, String message) {
            System.out.println(System.currentTimeMillis()+"ms:"+message+"订单取消");
        }
    }
}

输出如下

1525096202813ms:OID0000000订单生成
1525096202818ms:OID0000001订单生成
1525096202824ms:OID0000002订单生成
1525096202826ms:OID0000003订单生成
1525096202830ms:OID0000004订单生成
1525096202834ms:OID0000005订单生成
1525096202839ms:OID0000006订单生成
1525096205819ms:OID0000000订单取消
1525096205920ms:OID0000005订单取消
1525096205920ms:OID0000004订单取消
1525096205920ms:OID0000001订单取消
1525096205920ms:OID0000003订单取消
1525096205920ms:OID0000006订单取消
1525096205920ms:OID0000002订单取消

可以明显看到3秒过后,订单取消了
ps:redis的pub/sub机制存在一个硬伤,官网内容如下
:Because Redis Pub/Sub is fire and forget currently there is no way to use this feature if your application demands reliable notification of events, that is, if your Pub/Sub client disconnects, and reconnects later, all the events delivered during the time the client was disconnected are lost.
: Redis的发布/订阅目前是即发即弃(fire and forget)模式的,因此无法实现事件的可靠通知。也就是说,如果发布/订阅的客户端断链之后又重连,则在客户端断链期间的所有事件都丢失了。
因此,方案二不是太推荐。当然,如果你对可靠性要求不高,可以使用。

优缺点

优点:(1)由于使用Redis作为消息通道,消息都存储在Redis中。如果发送程序或者任务处理程序挂了,重启之后,还有重新处理数据的可能性。
   (2)做集群扩展相当方便
   (3)时间准确度高
缺点:(1)需要额外进行redis维护

(5)使用消息队列

我们可以采用rabbitMQ的延时队列。RabbitMQ具有以下两个特性,可以实现延迟队列

  • RabbitMQ可以针对Queue和Message设置 x-message-tt,来控制消息的生存时间,如果超时,则消息变为dead letter
  • lRabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,用来控制队列内出现了deadletter,则按照这两个参数重新路由。
    结合以上两个特性,就可以模拟出延迟消息的功能,具体的,我改天再写一篇文章,这里再讲下去,篇幅太长。

优缺点

优点: 高效,可以利用rabbitmq的分布式特性轻易的进行横向扩展,消息支持持久化增加了可靠性。
缺点:本身的易用度要依赖于rabbitMq的运维.因为要引用rabbitMq,所以复杂度和成本变高

总结

本文总结了目前互联网中,绝大部分的延时任务的实现方案。希望大家在工作中能够有所收获。
其实大家在工作中,百分九十的人还是以业务逻辑为主,很少有机会能够进行方案设计。所以博主不推荐在分布式这块,花太多时间,应该看看《手把手系列的文章》。不过,鉴于现在的面试造火箭,工作拧螺丝现象太过严重,所以博主开始写《分布式系列》,最后来个小漫画娱乐一下。

 

转自:https://www.cnblogs.com/rjzheng/p/8972725.html

 

一文搞定十大经典排序算法(Java实现)

本文总结十大经典排序算法及变形,并提供Java实现。
参考文章:
十大经典排序算法总结(Java语言实现)
快速排序算法—左右指针法,挖坑法,前后指针法,递归和非递归
快速排序及优化(三路划分等)

一、排序算法概述

1、定义

将杂乱无章的数据元素,通过一定的方法按关键字顺序排列的过程叫做排序。

2、分类

十种常见排序算法可以分为两大类:

  • 非线性时间比较类排序:通过比较来决定元素间的相对次序,由于其时间复杂度不能突破O(nlogn),因此称为非线性时间比较类排序。

  • 线性时间非比较类排序:不通过比较来决定元素间的相对次序,它可以突破基于比较排序的时间下界,以线性时间运行,因此称为线性时间非比较类排序。

 
 

3、比较

 
 

4、相关概念

  • 稳定:如果a原本在b前面且a=b,排序之后a仍然在b的前面。
  • 不稳定:如果a原本在b的前面且a=b,排序之后 a 可能会出现在 b 的后面。
  • 时间复杂度:对排序数据的总的操作次数。反映当n变化时,操作次数呈现什么规律。
  • 空间复杂度:是指算法在计算机内执行时所需存储空间的度量,它也是数据规模n的函数。
  • 内部排序:所有排序操作都在内存中完成。本文主要介绍的是内部排序。
  • 外部排序:待排序记录的数量很大,以致于内存不能一次容纳全部记录,所以在排序过程中需要对外存进行访问的排序过程。

二、各算法原理及实现

下面我们来逐一分析十大经典排序算法,主要围绕下列问题展开:
1、算法的基本思想是什么?
2、算法的代码实现?
3、算法的时间复杂度是多少?(平均、最好、最坏)什么情况下最好?什么情况下最坏?
4、算法的空间复杂度是多少?
5、算法的稳定性如何?

1、冒泡排序(Bubble Sort)

① 基本思想
冒泡排序是一种简单的排序算法。它重复地走访过要排序的数列,一次比较两个元素,如果它们的顺序错误就把它们交换过来。走访数列的工作是重复地进行直到没有再需要交换,也就是说该数列已经排序完成。这个算法的名字由来是因为每趟比较将当前数列未排序部分的最大的元素“沉”到数列末端,而小的元素会经由交换慢慢“浮”到数列的顶端。

② 算法描述
1)比较相邻的元素。如果前一个比后一个大,就交换它们两个;
2)对每一对相邻元素作同样的工作,从开始第一对到结尾的最后一对,这样在最后的元素应该会是最大的数;
3)针对所有的元素重复以上的步骤,除了最后一个;
4)重复步骤1~3,直到排序完成。为了优化算法,可以设立一个布尔标识,每趟排序开始前设为false,如果该趟排序发生了交换就置为true,如果一趟排序结束标识仍为false表示该趟排序没有发生交换,即数组已经有序,可以提前结束排序。

③ 动图演示

 
 

 

④ 代码实现

 public static int[] bubbleSort(int[] array) {
      if (array.length == 0)
          return array;
      for (int i = 0; i < array.length; i++){  //外层循环一次为一趟排序
          /*设置标识,判断这趟排序是否发生了交换。
         如果未发生交换,则说明数组已经有序,不必再排序了*/
          boolean isSwap = false;
          for (int j = 0; j < array.length - 1 - i; j++) //内层循环一次为一次相邻比较
              if (array[j + 1] < array[j]) {
                  int temp = array[j + 1];
                  array[j + 1] = array[j];
                  array[j] = temp;
                  isSwap = true;
              }
          if(!isSwap)
              break;
      }
      return array;
  }

⑤ 时间复杂度
冒泡排序平均时间复杂度为O(n2),最好时间复杂度为O(n),最坏时间复杂度为O(n2)。
最好情况:如果待排序元素本来是正序的,那么一趟冒泡排序就可以完成排序工作,比较和移动元素的次数分别是 (n – 1) 和 0,因此最好情况的时间复杂度为O(n)。
最坏情况:如果待排序元素本来是逆序的,需要进行 (n – 1) 趟排序,所需比较和移动次数分别为 n * (n – 1) / 2和 3 * n * (n-1) / 2。因此最坏情况下的时间复杂度为O(n2)。

⑥ 空间复杂度
冒泡排序使用了常数空间,空间复杂度为O(1)

⑦ 稳定性
当 array[j] == array[j+1] 的时候,我们不交换 array[i] 和 array[j],所以冒泡排序是稳定的。

⑧ 算法拓展
鸡尾酒排序,又称定向冒泡排序、搅拌排序等,是对冒泡排序的改进。在把最大的数往后面冒泡的同时,把最小的数也往前面冒泡,同时收缩无序区的左右边界,有序区在序列左右逐渐累积。

动图如下:

 
 

代码如下:

public static void cocktailSort(int[] array) {
        int left = 0,right = array.length-1;
        while(left < right) {
            for(int i = left; i < right; i++) 
              if(array[i] > array[i+1]) 
                    swap(array,i,i + 1);
            right--;
            for(int i = right; i > left; i--) 
              if(array[i] < array[i-1]) 
                    swap(array,i,i-1);
            left++;
        }
}

鸡尾酒排序是稳定的。它的平均时间复杂度为O(n2),最好情况是待排序列原先就是正序的,时间复杂度为O(n),最坏情况是待排序列原先是逆序的,时间复杂度为O(n2)。空间复杂度为O(1)。

2、简单选择排序(Selection Sort)

① 基本思想
简单选择排序(Selection-sort)是一种简单直观的排序算法。它的工作原理:首先在未排序序列中找到最小(大)元素,存放到排序序列的起始位置,然后,再从剩余未排序元素中继续寻找最小(大)元素,然后放到已排序序列的末尾。以此类推,直到所有元素均排序完毕。

② 算法描述
n个记录的简单选择排序可经过(n-1)趟简单选择排序得到有序结果。具体算法描述如下:
1)初始状态:无序区为R[1..n],有序区为空;
2)第i趟排序(i=1,2,3…n-1)开始时,当前有序区和无序区分别为R[1..i-1]和R[i..n]。该趟排序从当前无序区中选出关键字最小的记录 R[k],将它与无序区的第1个记录R交换,使R[1..i]和R[i+1..n]分别变为记录个数增加1个的新有序区和记录个数减少1个的新无序区;
3)(n-1)趟结束,数组有序化了。

③ 动图演示

 
 

 

④ 代码实现

public static int[] selectionSort(int[] array) {
        if (array.length == 0)
             return array;
        for (int i = 0; i < array.length; i++) {
            int minIndex = i;
            for (int j = i; j < array.length; j++) {
                if (array[j] < array[minIndex]) //找到最小的数
                    minIndex = j; //将最小数的索引保存
            }
            int temp = array[minIndex]; //将最小数和无序区的第一个数交换
            array[minIndex] = array[i];
            array[i] = temp;
        }
        return array;
    }

⑤ 时间复杂度
简单选择排序平均时间复杂度为O(n2),最好时间复杂度为O(n2),最坏时间复杂度为O(n2)。
最好情况:如果待排序元素本来是正序的,则移动元素次数为 0,但需要进行 n * (n – 1) / 2 次比较。
最坏情况:如果待排序元素中第一个元素最大,其余元素从小到大排列,则仍然需要进行 n * (n – 1) / 2 次比较,且每趟排序都需要移动 3 次元素,即移动元素的次数为3 * (n – 1)次。
需要注意的是,简单选择排序过程中需要进行的比较次数与初始状态下待排序元素的排列情况无关。

⑥ 空间复杂度
简单选择排序使用了常数空间,空间复杂度为O(1)

⑦ 稳定性
简单选择排序不稳定,比如序列 2、4、2、1,我们知道第一趟排序第 1 个元素 2 会和 1 交换,那么原序列中 2 个 2 的相对前后顺序就被破坏了,所以简单选择排序不是一个稳定的排序算法。

3、直接插入排序(Insertion Sort)

① 基本思想
直接插入排序(Insertion-Sort)的算法描述是一种简单直观的排序算法。它的工作原理是通过构建有序序列,对于未排序数据,在已排序序列中从后向前扫描,找到相应位置并插入。

② 算法描述
一般来说,直接插入排序都采用in-place(原地算法)在数组上实现。具体算法描述如下:
1)从第一个元素开始,该元素可以认为已经被排序;
2)取出下一个元素,在已经排序的元素序列中从后向前扫描;
3)如果该元素(已排序)大于新元素,将该元素移到下一位置;
4)重复步骤3,直到找到已排序的元素小于或者等于新元素的位置;
5)将新元素插入到该位置后;
6)重复步骤2~5。

③ 动图演示

 
 

 

④ 代码实现

public static int[] insertionSort(int[] array) {
        if (array.length == 0)
            return array;
        int current;
        for (int i = 1; i < array.length; i++) {
            current = array[i];
            int preIndex = i - 1;
            while (preIndex >= 0 && current < array[preIndex]) {
                array[preIndex + 1] = array[preIndex];
                preIndex--;
            }
            array[preIndex + 1] = current;
        }
        return array;
    }

⑤ 时间复杂度
直接插入排序平均时间复杂度为O(n2),最好时间复杂度为O(n),最坏时间复杂度为O(n2)。
最好情况:如果待排序元素本来是正序的,比较和移动元素的次数分别是 (n – 1) 和 0,因此最好情况的时间复杂度为O(n)。
最坏情况:如果待排序元素本来是逆序的,需要进行 (n – 1) 趟排序,所需比较和移动次数分别为 n * (n – 1) / 2和 n * (n – 1) / 2。因此最坏情况下的时间复杂度为O(n2)。

⑥ 空间复杂度
直接插入排序使用了常数空间,空间复杂度为O(1)

⑦ 稳定性
直接插入排序是稳定的。

⑧ 算法拓展
在直接插入排序中,待插入的元素总是在有序区线性查找合适的插入位置,没有利用有序的优势,考虑使用二分查找搜索插入位置进行优化,即二分插入排序

代码实现如下:

 public static int[] BinaryInsertionSort(int[] array) {
        if (array.length == 0)
            return array;
        for(int i = 1;i < array.length;i++) {
            int left = 0;
            int right = i - 1;  // left 和 right 分别为有序区的左右边界 
            int current = array[i];
            while (left <= right) {
           //搜索有序区中第一个大于 current 的位置,即为 current 要插入的位置
                int mid = left + ((right - left) >> 1);
                if(array[mid] > current){
                    right = mid - 1;
                }else{
                    left = mid + 1;
                }
            }
            for(int j = i - 1;j >= left;j--) {
                array[j + 1] = array[j];
            }
            array[left] = current; // left 为第一个大于 current 的位置,插入 current
        }
        return array;
 }

二分插入排序是稳定的。它的平均时间复杂度是O(n2),最好时间复杂度为O(nlogn),最坏时间复杂度为O(n2)。

4、希尔排序(Shell Sort)

① 基本思想
1959年Shell发明,第一个突破O(n2)的排序算法,是直接插入排序的改进版。它与直接插入排序的不同之处在于,它会优先比较距离较远的元素。希尔排序又叫缩小增量排序

② 算法描述
先将整个待排元素序列分割成 gap 个增量为 gap 的子序列(每个子序列由位置相差为 gap 的元素组成,整个序列正好分割成 gap 个子序列,每个序列中有 n / gap 个元素)分别进行直接插入排序,然后缩减增量为之前的一半再进行排序,待 gap == 1时,希尔排序就变成了直接插入排序。因为此时序列已经基本有序,直接插入排序在元素基本有序的情况下(接近最好情况),效率是很高的。gap初始值一般取 len / 2。

③ 动图演示

 
 

 

④ 代码实现

 public static int[] ShellSort(int[] array) {
        int len = array.length;
        if(len == 0)
            return array;
        int current, gap = len / 2;
        while (gap > 0) {
            for (int i = gap; i < len; i++) {
                current = array[i];
                int preIndex = i - gap;
                while (preIndex >= 0 && array[preIndex] > current) {
                    array[preIndex + gap] = array[preIndex];
                    preIndex -= gap;
                }
                array[preIndex + gap] = current;
            }
            gap /= 2;
        }
        return array;
    }

⑤ 时间复杂度
希尔排序平均时间复杂度为O(nlogn),最好时间复杂度为O(nlog2n),最坏时间复杂度为O(nlog2n)。希尔排序的时间复杂度与增量序列的选取有关。

⑥ 空间复杂度
希尔排序使用了常数空间,空间复杂度为O(1)

⑦ 稳定性
由于相同的元素可能在各自的序列中插入排序,最后其稳定性就会被打乱,比如序列 2、4、1、2,所以希尔排序是不稳定的。

5、归并排序(Merge Sort)

① 基本思想
归并排序是建立在归并操作上的一种有效的排序算法。该算法是采用分治法(Divide and Conquer)的一个非常典型的应用。将已有序的子序列合并,得到完全有序的序列;即先使每个子序列有序,再使子序列段间有序。若将两个有序表合并成一个有序表,称为2-路归并。

② 算法描述
1)把长度为 n 的输入序列分成两个长度为 n / 2 的子序列;
2)对这两个子序列分别采用归并排序;
3)将两个排序好的子序列合并成一个最终的排序序列。

③ 动图演示

 
 

 

④ 代码实现

    /**
     * 归并排序
     *
     * @param array
     * @return
     */
    public static int[] MergeSort(int[] array) {
        if (array.length < 2) return array;
        int mid = array.length / 2;
        int[] left = Arrays.copyOfRange(array, 0, mid);
        int[] right = Arrays.copyOfRange(array, mid, array.length);
        return merge(MergeSort(left), MergeSort(right));
    }
    /**
     * 归并排序——将两段有序数组结合成一个有序数组
     *
     * @param left
     * @param right
     * @return
     */
    public static int[] merge(int[] left, int[] right) {
       int[] result = new int[left.length + right.length];
       int i = 0,j = 0,k = 0;
       while (i < left.length && j < right.length) {
            if (left[i] <= right[j]) {
                result[k++] = left[i++];
            } else {
                result[k++] = right[j++];
            }
        }
        while (i < left.length) {
            result[k++] = left[i++];
        }
        while (j < right.length) {
            result[k++] = right[j++];
        }
        return result;
    }

⑤ 时间复杂度
归并排序平均时间复杂度为O(nlogn),最好时间复杂度为O(nlogn),最坏时间复杂度为O(nlogn)。
归并排序的形式就是一棵二叉树,它需要遍历的次数就是二叉树的深度,而根据完全二叉树的可以得出它在任何情况下时间复杂度均是O(nlogn)。

⑥ 空间复杂度
归并排序空间复杂度为O(n)

⑦ 稳定性
归并排序是稳定的。

⑧ 算法应用
归并排序可以用于求解逆序对数量问题,具体见:剑指offer – 数组中的逆序对

解法如下:

import java.util.*;
public class Solution {
     private static final int MOD = 1000000007;
     private int cnt = 0;

    //递归调用
     private int[] MergeSort(int[] array) {
        if (array.length < 2) 
            return array;
        int mid = array.length / 2;
        int[] left = Arrays.copyOfRange(array, 0, mid);
        int[] right = Arrays.copyOfRange(array, mid, array.length);
        return merge(MergeSort(left), MergeSort(right));
    }
    /**
     * 将两段有序数组结合成一个有序数组
     *
     * @param left
     * @param right
     * @return
     */
    private int[] merge(int[] left, int[] right) {
             int[] result = new int[left.length + right.length];
             int i = 0,j = 0,k = 0;
             while (i < left.length && j < right.length) {
                if (left[i] <= right[j]) {
                    result[k++] = left[i++];
                } else {
                    result[k++] = right[j++];
                     /*归并同时统计逆序对数量,因为归并的两个子序列都已有序,故当left[i] >  
                     right[j],有left[i...left.length - 1]均大于right[j]*/
                    this.cnt = (this.cnt % MOD + (left.length - i) % MOD) % MOD;
                }
             }
             while (i < left.length) {
                result[k++] = left[i++];
             }
             while (j < right.length) {
                result[k++] = right[j++];
             }
             return result;
     }

     public int InversePairs(int [] array) {
        MergeSort(array);
        return cnt % MOD;
    }
}

6、快速排序(Quick Sort)

① 基本思想
快速排序的基本思想:通过一趟排序将待排记录分隔成独立的两部分,其中一部分记录的关键字均比另一部分的关键字小,则可分别对这两部分记录继续进行排序,以达到整个序列有序。

② 算法描述
快速排序使用分治法来把一个数列分为两个子数列。具体算法描述如下:
1)从数列中挑出一个元素,称为 “基准”(pivot);
2)重新排序数列,所有比基准值小的元素放在基准前面,所有比基准值大的元素放在基准的后面(相同的数可以到任一边),该基准就处于数列的中间位置。这称为分区(partition)操作;
3)递归地(recursive)对小于基准值元素的子数列和大于基准值元素的子数列进行快速排序。

③ 动图演示

 
 

 

④ 代码实现
快速排序最核心的步骤就是partition操作,即从待排序的数列中选出一个数作为基准,将所有比基准值小的元素放在基准前面,所有比基准值大的元素放在基准的后面(相同的数可以到任一边),该基准就处于数列的中间位置。partition函数返回基准的位置,然后就可以对基准位置的左右子序列递归地进行同样的快排操作,从而使整个序列有序。

下面我们来介绍partition操作的两种实现方法:左右指针法挖坑法

方法一:左右指针法
基本思路:
1.将数组的最后一个数 right 作为基准数 key。
2.分区过程:从数组的首元素 begin 开始向后找比 key 大的数(begin 找大);end 开始向前找比 key 小的数(end 找小);找到后交换两者(swap),直到 begin >= end 终止遍历。最后将 begin(此时begin == end)和最后一个数交换( 这个时候 end 不是最后一个位置),即 key 作为中间数(左区间都是比key小的数,右区间都是比key大的数)
3.再对左右区间重复第二步,直到各区间只有一个数。

 
 
/**
 * partition操作
 * @param array
 * @param left 数列左边界
 * @param right 数列右边界
 * @return
 */
public static int partition(int[] array,int left,int right) {
        int begin = left;
        int end = right;
        int key = right;

        while( begin < end ) {
            //begin找大
            while(begin < end && array[begin] <= array[key])
                begin++;
            //end找小
            while(begin < end && array[end] >= array[key])
                end--;
            swap(array,begin,end);
        }
        swap(array,begin,right);
        return begin;   //返回基准位置
    }
 
/**
 * 交换数组内两个元素
 * @param array
 * @param i
 * @param j
 */
public static void swap(int[] array, int i, int j) {
    int temp = array[i];
    array[i] = array[j];
    array[j] = temp;
}

方法二:挖坑法
基本思路:
1.定义两个指针 left 指向起始位置,right 指向最后一个元素的位置,然后指定一个基准 key(right),作为坑。
2.left 寻找比基准(key)大的数字,找到后将 left 的数据赋给 right,left 成为一个坑,然后 right 寻找比基数(key)小的数字,找到将 right 的数据赋给 left,right 成为一个新坑,循环这个过程,直到 begin 指针与 end指针相遇,然后将 key 填入那个坑(最终:key的左边都是比key小的数,key的右边都是比key大的数),然后进行递归操作。

 
 
/**
 * partition操作
 * @param array
 * @param left 数列左边界
 * @param right 数列右边界
 * @return
 */
public static int partition(int[] array,int left,int right) {
        int key = array[right];//初始坑
        while(left < right) {
            //left找大
            while(left < right && array[left] <= key )
                left++;
            array[right] = array[left];//赋值,然后left作为新坑
            //right找小
            while(left = key)
                right--;
            array[left] = array[right];//right作为新坑
        }
        array[left] = key;
       /*将key赋值给left和right的相遇点,
        保持key的左边都是比key小的数,key的右边都是比key大的数*/
        return left;//最终返回基准
    }

实现了partition操作,我们就可以递归地进行快速排序了

 /**
 * 快速排序方法
 * @param array
 * @param left 数列左边界
 * @param right 数列右边界
 * @return
 */
public static void Quicksort(int array[], int left, int right) {
        if(left < right){
            int pos = partition(array, left, right);
            Quicksort(array, left, pos - 1);
            Quicksort(array, pos + 1, right);
        }
    }

⑤ 代码优化
我们之前选择基准的策略都是固定基准,即固定地选择序列的右边界值作为基准,但如果在待排序列几乎有序的情况下,选择的固定基准将是序列的最大(小)值,快排的性能不好(因为每趟排序后,左右两个子序列规模相差悬殊,大的那部分最后时间复杂度很可能会达到O(n2))。

下面提供几种常用的快排优化:
优化一:随机基准
每次随机选取基准值,而不是固定选取左或右边界值。将随机选取的基准值和右边界值进行交换,然后就回到了之前的解法。
只需要在 partition 函数前增加如下操作即可:

int random = (int) (left + Math.random() * (right - left + 1));
//随机选择 left ~ right 之间的一个位置作为基准
swap(array, random, right);
//把基准值交换到右边界

优化二:三数取中法
基本思想:
取第一个数,最后一个数,第(N/2)个数即中间数,三个数中数值中间的那个数作为基准值。
举个例子,对于int[] array = { 2,5,4,9,3,6,8,7,1,0},2、3、0分别是第一个数,第(N/2)个是数以及最后一个数,三个数中3最大,0最小,2在中间,所以取2为基准值。
实现getMid函数即可:

/**
     * 三数取中,返回array[left]、array[mid]、array[right]三者的中间者下标作为基准
     * @param array
     * @param left
     * @param right
     * @return
     */
public static int getMid(int[] array,int left,int right) {
        int mid = left + ((right - left) >> 1);
        int a = array[left];
        int b = array[mid];
        int c = array[right];
        if ((b <= a && a <= c) || (c <= a && a <= b)) { //a为中间值
            return left;
        }
        if ((a <= b && b <= c) || (c <= b && b <= a)) { //b为中间值
            return mid;
        }
        if ((a <= c && c <= b) || (b <= c && c <= a)) { //c为中间值
            return right;
        }
        return left;
    }

优化三:当待排序序列的长度分割到一定大小后,使用插入排序
在子序列比较小的时候,直接插入排序性能较好,因为对于有序的序列,插排可以达到O(n)的复杂度,如果序列比较小,使用插排效率要比快排高。
实现方式也很简单,快排是在子序列元素个数为 1 时才停止递归,我们可以设置一个阈值n,假设为5,则大于5个元素,子序列继续递归,否则选用插排。
此时QuickSort()函数如下:

public static void Quicksort(int array[], int left, int right) {
        if(right - left > 5){
            int pos = partition(array, left, right);
            Quicksort(array, left, pos - 1);
            Quicksort(array, pos + 1, right);
        }else{
            insertionSort(array);
        }
    }

这种优化非常实用。
实测发现当待排序列为 [100000,99999,99998,…,3,2,1] 时,不加插入优化的快排由于递归次数过多甚至抛出了 java.lang.StackOverflowError!

 
 

 

而加入了插入优化并选择阈值为 12500 时,排序用时如下:

 
 

实验发现阈值的选择也很关键,选择阈值为 5 ,排序用时如下:

 
 

优化四:三路划分
如果待排序列中重复元素过多,也会大大影响排序的性能,这是因为大量相同元素参与快排时,左右序列规模相差极大,快排将退化为冒泡排序,时间复杂度接近O(n2)。这时候,如果采用三路划分,则会很好的避免这个问题。
三路划分的思想是利用 partition 函数将待排序列划分为三部分:第一部分小于基准v,第二部分等于基准v,第三部分大于基准v。这样在递归排序区间的时候,我们就不必再对第二部分元素均相等的区间进行快排了,这在待排序列存在大量相同元素的情况下能大大提高快排效率。

来看下面的三路划分示意图:

 
 

说明:红色部分为小于基准v的序列,绿色部分为等于基准v的序列,白色部分由于还未被 cur 指针遍历到,属于大小未知的部分,蓝色部分为大于基准v的序列。
left 指针为整个待排区间的左边界,right 指针为整个待排区间的右边界。less 指针指向红色部分的最后一个数(即小于v的最右位置),more 指针指向蓝色部分的第一个数(即大于v的最左位置)。cur 指针指向白色部分(未知部分)的第一个数,即下一个要判断大小的位置。

算法思路:
1)由于最初红色和蓝色区域没有元素,初始化 less = left – 1,more = right + 1,cur = left。整个区间为未知部分(白色)。
2)如果当前 array[cur] < v,则 swap(array,++less,cur++),即把红色区域向右扩大一格(less指针后移),把 array[cur] 交换到该位置,cur 指针前移判断下一个数。
3)如果当前 array[cur] = v,则不必交换,直接 cur++
4)如果当前 array[cur] > v,则 swap(array,–more,cur),即把蓝色区域向左扩大一格(more指针前移),把 array[cur] 交换到该位置。特别注意!此时cur指针不能前移,这是因为交换到cur位置的元素来自未知区域,还需要进一步判断array[cur]。

利用三路划分,我们就可以递归地进行三路快排了!并且可以愉快地避开所有重复元素区间。

代码实现:

public static int[] partition(int[] array,int left,int right){
        int v = array[right]; //选择右边界为基准
        int less = left - 1; // < v 部分的最后一个数
        int more = right + 1; // > v 部分的第一个数
        int cur = left;
        while(cur < more){
            if(array[cur] < v){
                swap(array,++less,cur++);
            }else if(array[cur] > v){
                swap(array,--more,cur);
            }else{
                cur++;
            }
        }
        return new int[]{less + 1,more - 1};  //返回的是 = v 区域的左右下标
}

public static void Quicksort(int array[], int left, int right) {
        if (left < right) {
            int[] p = partition(array,left,right);
            Quicksort(array,left,p[0] - 1); //避开重复元素区间
            Quicksort(array,p[1] + 1,right);
        }
}

三路划分可以解决经典的荷兰国旗问题,具体见 leetcode 75

解法如下:

class Solution {
    // 方法一:使用计数排序解决,但需要两趟扫描,不符合要求
    /*public void sortColors(int[] nums) {
        int[] count = new int[3];
        for(int i = 0; i < nums.length; i++)
            count[nums[i]]++;
        int k = 0;
        for(int i = 0; i < 3; i++){
            for(int j = 0; j < count[i]; j++){
                nums[k++] = i;
            }
        }
    }*/
    // 方法二:使用快速排序的三路划分,时间复杂度为O(n),空间复杂度为O(1)
    public void sortColors(int[] nums) {
        int len = nums.length;
        if(len == 0)
            return;
        int less = -1;
        int more = len;
        int cur = 0;
        while(cur < more){
            if(nums[cur] == 0){
                swap(nums,++less,cur++);
            }else if(nums[cur] == 2){
                swap(nums,--more,cur);
            }else{
                cur++;
            }
        }
    }
    
    public static void swap(int[] array,int i,int j){
        int temp = array[i];
        array[i] = array[j];
        array[j] = temp;
    }

}

⑥ 时间复杂度
快速排序平均时间复杂度为O(nlogn),最好时间复杂度为O(nlogn),最坏时间复杂度为O(n2)。
最好情况:基准选择得当,partition函数每次恰好能均分序列,其递归树的深度就为logn,时间复杂度为O(nlogn)。
最坏情况:选择了最大或者最小数字作为基准,每次划分只能将序列分为一个元素与其他元素两部分,此时快速排序退化为冒泡排序,如果用树画出来,得到的将会是一棵单斜树,即所有的结点只有左(右)结点的树,树的深度为 n,时间复杂度为O(n2)。

⑦ 空间复杂度
快速排序的空间复杂度主要考虑递归时使用的栈空间。
在最好情况下,即partition函数每次恰好能均分序列,空间复杂度为O(logn);在最坏情况下,即退化为冒泡排序,空间复杂度为O(n)。平均空间复杂度为O(logn)。

⑧ 稳定性
快速排序是不稳定的。

⑨ 算法拓展
快速选择算法
快速选择算法用于求解 Kth Element 问题(无序数组第K大元素),使用快速排序的 partition() 进行实现。
快速排序的 partition() 方法会返回一个整数 j 使得 a[left..j-1] 小于等于 a[j],且 a[j+1..right] 大于等于 a[j]。
此时 a[j] 就是数组的第 j 小的元素,我们可以转换一下题意,第 k 大的元素就是第 nums.size() – k 小的元素。
找到 Kth Element 之后,再遍历一次数组,所有大于等于 Kth Element 的元素都是 TopK Elements。
时间复杂度 O(N),空间复杂度 O(1)。

还可以使用小根堆求解此问题,时间复杂度 O(NlogK),空间复杂度 O(K)。具体见:leetcode 215

解法如下:

class Solution {
private:
    int partition(vector<int>& array,int left,int right) {
        int key = array[right]; //初始坑
        while(left < right) {
            //left找大
            while(left < right && array[left] <= key )
                left++;
            array[right] = array[left]; //赋值,然后left作为新坑
            //right找小
            while(left = key)
                right--;
            array[left] = array[right]; //right作为新坑
        }
        array[left] = key;
       /*将key赋值给left和right的相遇点,
        保持key的左边都是比key小的数,key的右边都是比key大的数*/
        return left;//最终返回基准
    }

public:
    
    /*方法1:堆。用于求解 TopK Elements 问题,通过维护一个大小为 K 的小根堆,
    堆中的元素就是TopK Elements。堆顶元素就是 Kth Element。如果是第K小的元素
    就建立大根堆。时间复杂度 O(NlogK),空间复杂度 O(K)。*/
   /* int findKthLargest(vector& nums, int k) {
        int n = nums.size();
        priority_queue,greater> q; 
        for(int i = 0;i < n;i++){
                q.push(nums[i]);
                if(q.size() > k) 
                    q.pop();
        }
        return q.top();
    }*/
    
    /*方法2:快速选择。用于求解 Kth Element 问题,使用快速排序的 partition() 进行实现。
    快速排序的 partition() 方法会返回一个整数 j 使得 a[left..j-1] 小于等于 a[j],
    且 a[j+1..right] 大于等于 a[j],此时 a[j] 就是数组的第 j 小的元素,
    我们可以转换一下题意,第 k 大的元素就是第 nums.size() - k 小的元素。
    找到 Kth Element 之后,再遍历一次数组,所有大于等于 Kth Element 的元素都是
    TopK Elements。时间复杂度 O(N),空间复杂度 O(1)*/
   int findKthLargest(vector<int>& nums, int k) {
    k = nums.size() - k;
    int left = 0, right = nums.size() - 1;
    while (left < right) {
        int j = partition(nums, left, right);
        if (j == k) { //选择的基准等于目标,跳出循环
            break;
        } else if (j < k) { //选择的基准小于目标,在右侧子序列中继续选择
            left = j + 1; 
        } else { //选择的基准大于目标,在左侧子序列中继续选择
            right = j - 1; 
        }
      }
        return nums[k];
    }
};

拓展:Arrays.sort() 和 Collections.sort() 原理,Collections.sort() 底层调用的是 Arrays.sort()。Arrays.sort() 原理见 剖析JDK8中Arrays.sort底层原理及其排序算法的选择

7、堆排序(Heap Sort)

① 基本思想
堆排序是一种树形选择排序方法,它利用了这种数据结构。在排序的过程中,将array[0,…,n-1]看成是一颗完全二叉树的顺序存储结构,利用完全二叉树中双亲结点和孩子结点之间的关系,在当前无序区中选择关键字最大(最小)的元素。

② 概念
堆:堆是一种完全二叉树,且满足所有父节点的值均大于等于(或小于等于)其子节点的值。
大根堆(最大堆):满足所有父节点的值均大于等于其子节点的值的堆称为大根堆,堆顶元素是堆中元素的最大值。
小根堆(最小堆):满足所有父节点的值均小于等于其子节点的值的堆称为小根堆,堆顶元素是堆中元素的最小值。
堆的顺序存储结构:使用顺序数据结构(数组)存储堆,表示方法为:
1.数组按层序遍历的顺序存放完全二叉树的结点,下标为 0 处为堆顶,下标为 len – 1 处为堆尾。
2.结点 i 如果存在左孩子(下标不超过 len – 1 就存在),左孩子的下标为(2 * i + 1);如果存在右孩子,右孩子的下标为(2 * i + 2)。结点 i 的父结点下标为 (i – 1) / 2 (下标为 0 的结点除外,它没有父结点)。最后一个非叶子结点即为堆尾元素的父结点,下标为 (len – 1 – 1) / 2 = (len – 2) / 2。

③ 算法描述
1)将初始待排序关键字序列(R1,R2….Rn)构建成大顶堆,此堆为初始的无序区;
2)将堆顶元素R[1]与最后一个元素R[n]交换,此时得到新的无序区(R1,R2,……Rn-1)和新的有序区(Rn),且满足R[1,2…n-1]<=R[n];
3)由于交换后新的堆顶R[1]可能违反堆的性质,因此需要对当前无序区(R1,R2,……Rn-1)调整为新堆,然后再次将R[1]与无序区最后一个元素交换,得到新的无序区(R1,R2….Rn-2)和新的有序区(Rn-1,Rn)。不断重复此过程直到有序区的元素个数为(n-1),则整个排序过程完成。

④ 动图演示

 
 

 

⑤ 代码实现

//声明全局变量,用于记录数组array的长度;
static int len;
/**
 * 堆排序算法
 * @param array
 * @return
 */
public static int[] HeapSort(int[] array) {
        len = array.length;
        if (len == 0) return array;
        //1.构建一个大根堆
        buildMaxHeap(array);
        //2.循环将堆顶(最大值)与堆尾交换,删除堆尾元素,然后重新调整大根堆
        while (len > 0) {
            swap(array, 0, len - 1);
            len--; //原先的堆尾进入有序区,删除堆尾元素
            adjustHeap(array, 0); //重新调整大根堆
        }
        return array;
 }

 /**
   * 自顶向下调整以 i 为根的堆为大根堆
   * @param array
   * @param i
   */
public static void adjustHeap(int[] array, int i) {
        int maxIndex = i;
        //如果有左子树,且左子树大于父节点,则将最大指针指向左子树
        if (2 * i + 1 < len && array[2 * i + 1] > array[maxIndex])
            maxIndex = 2 * i + 1;
        //如果有右子树,且右子树大于父节点,则将最大指针指向右子树
        if (2 * i + 2 < len && array[2 * i + 2] > array[maxIndex])
            maxIndex = 2 * i + 2;
        //如果父节点不是最大值,则将父节点与最大值交换,并且递归调整与父节点交换的位置。
        if (maxIndex != i) {
            swap(array, maxIndex, i);
            adjustHeap(array, maxIndex);
        }
 }

 /**
  * 自底向上构建初始大根堆
  * @param array
  */
 public static void buildMaxHeap(int[] array) {
        //从最后一个非叶子节点开始自底向上构造大根堆
        for (int i = (len - 2) / 2; i >= 0; i--) { 
            adjustHeap(array, i);
        }
 }

拓展:
1)插入元素:只需要把待插入的元素放置在堆尾,然后 len++ 把其纳入堆,然后调用 adjustHeap 函数重新调整堆即可。
2)删除堆顶元素:只需要把堆顶元素交换到堆尾,然后 len– 把其移出堆,然后调用 adjustHeap 函数重新调整堆即可。

⑥ 时间复杂度
堆排序平均时间复杂度为O(nlogn),最好时间复杂度为O(nlogn),最坏时间复杂度为O(nlogn)。
堆排序的形式就是一棵二叉树,它需要遍历的次数就是二叉树的深度,而根据完全二叉树的可以得出它在任何情况下时间复杂度均是O(nlogn)。

⑦ 空间复杂度
堆排序使用了常数空间,空间复杂度为O(1)。

⑧ 稳定性
堆排序是不稳定的。

8、计数排序(Counting Sort)

① 基本思想
计数排序不是基于比较的排序算法,其核心在于将输入的数据值转化为键存储在额外开辟的数组空间中。 作为一种线性时间复杂度的排序,计数排序要求输入的数据必须是有确定范围的整数。

② 算法描述
1)找出待排序的数组中最大和最小的元素;
2)统计数组中每个值为 i 的元素出现的次数,存入数组C的第i项;
3)对所有的计数累加(从C中的第一个元素开始,每一项和前一项相加);
4)反向填充目标数组:将每个元素 i 放在新数组的第C(i)项,每放一个元素就将C(i)减去1。

③ 动图演示

 
 

 

④ 代码实现

/**
     * 计数排序
     *
     * @param array
     * @return
     */
    public static int[] CountingSort(int[] array) {
        if (array.length == 0) return array;
        int bias, min = Integer.MAX_VALUE, max = Integer.MIN_VALUE;
        for (int i = 0; i < array.length; i++) {
            max = Math.max(max, array[i]);
            min = Math.min(min, array[i]);
        }
       //计算偏移量,将 min ~ max 映射到 bucket 数组的 0 ~ (max - min) 位置上
        bias = -min; 
        int[] bucket = new int[max - min + 1];
        Arrays.fill(bucket, 0);
        for (int i = 0; i < array.length; i++) {
            bucket[array[i] + bias]++;
        }
        int index = 0, i = 0;
        while (index < array.length) {
            if (bucket[i] != 0) {
                array[index] = i - bias;
                bucket[i]--;
                index++;
            } else
                i++;
        }
        return array;
    }

⑤ 时间复杂度
计数排序平均时间复杂度为O(n + k),最好时间复杂度为O(n + k),最坏时间复杂度为O(n + k)。n 为遍历一趟数组计数过程的复杂度,k 为遍历一趟桶取出元素过程的复杂度。

⑥ 空间复杂度
计数排序空间复杂度为O(k),k为桶数组的长度。

⑦ 稳定性
计数排序是稳定的。

9、桶排序(Bucket Sort)

① 基本思想
桶排序与计数排序很相似,不过现在的桶不单计数,是实实在在地放入元素。按照映射函数将数据分配到不同的桶里,每个桶内元素再分别排序(可能使用别的排序算法),最后拼接各个桶中排好序的数据。映射函数人为设计,但要保证桶 i 中的数均小于桶 j (i < j)中的数,即必须桶间必须有序,桶内可以无序,可以考虑按照数的区间范围划分桶。下面代码的桶映射函数为:(i - min) / arr.length。

② 算法描述
1)设置一个定量的数组当作空桶;
2)遍历输入数据,并且把数据一个一个放到对应的桶里去;
3)对每个不是空的桶的桶内元素进行排序(可以使用直接插入排序等);
4)从不是空的桶里把排好序的数据拼接起来。

③ 动图演示

 
 

 

④ 代码实现

public static int[] bucketSort(int[] array){
    int max = Integer.MIN_VALUE;
    int min = Integer.MAX_VALUE;
    for(int i = 0; i < array.length; i++){
        max = Math.max(max, array[i]);
        min = Math.min(min, array[i]);
    }
    
    /*桶映射函数:自己设计,要保证桶 i 的数均小于桶 j (i < j)的数,
      即必须桶间必须有序,桶内可以无序。这里桶映射函数为:(i - min) / arr.length*/
    int bucketNum = (max - min) / array.length + 1;
    ArrayList> bucketArr = new ArrayList<>(bucketNum);
    for(int i = 0; i < bucketNum; i++){
        bucketArr.add(new ArrayList());
    }
    
    //将每个元素放入桶
    for(int i = 0; i < array.length; i++){
        int num = (array[i] - min) / (array.length);
        bucketArr.get(num).add(array[i]);
    }
    
    //对每个桶进行排序
    for(int i = 0; i < bucketArr.size(); i++){
        Collections.sort(bucketArr.get(i));
    }
    
   int k = 0;
   for(int i = 0; i < bucketArr.size(); i++){
      for(int j = 0;j < bucketArr.get(i).size();j++) {
           array[k++] = bucketArr.get(i).get(j);
      }
  }
  return array;
}

⑤ 时间复杂度
桶排序平均时间复杂度为O(n + k),最好时间复杂度为O(n + k),最坏时间复杂度为O(n2)。

⑥ 空间复杂度
桶排序空间复杂度为O(n + k)。

⑦ 稳定性
桶排序是稳定的。

10、基数排序(Radix Sort)

① 基本思想
基数排序是按照低位先排序,然后收集;再按照高位排序,然后再收集;依次类推,直到最高位。有时候有些属性是有优先级顺序的,先按低优先级排序,再按高优先级排序。最后的次序就是高优先级高的在前,高优先级相同的低优先级高的在前。

② 算法描述
1)取得数组中的最大数,并取得位数;
2)array 为原始数组,从最低位开始取每个位组成 radix 数组;

  1. 对 radix 进行计数排序(利用计数排序适用于小范围数的特点);

③ 动图演示

 
 

 

④ 代码实现

    /**
     * 基数排序
     * @param array
     * @return
     */
public static int[] RadixSort(int[] array) {
     if (array == null || array.length < 2)
        return array;
        // 1.先算出最大数的位数;
    int max = Integer.MIN_VALUE;
    for (int i = 0; i < array.length; i++) {
            max = Math.max(max, array[i]);
    }
    int maxDigit = 0;
    while (max != 0) {
            max /= 10;
            maxDigit++;
    }
    int div = 1;
    ArrayList> bucketList = new ArrayList>();
    for (int i = 0; i < 10; i++)
        bucketList.add(new ArrayList());
        //2.进行maxDigit趟分配
    for (int i = 0; i < maxDigit; i++,div *= 10) {
            for (int j = 0; j < array.length; j++) {
                int num = (array[j] / div) % 10;
                bucketList.get(num).add(array[j]);
            }
        //3.收集
            int index = 0;
            for (int j = 0; j < bucketList.size(); j++) {
                for (int k = 0; k < bucketList.get(j).size(); k++)
                    array[index++] = bucketList.get(j).get(k);
                bucketList.get(j).clear();
            }
   }
   return array;
}

⑤ 时间复杂度
基数排序平均时间复杂度为O(n * k),最好时间复杂度为O(n * k),最坏时间复杂度为O(n * k)。

⑥ 空间复杂度
基数排序空间复杂度为O(n + k)。

⑦ 稳定性
基数排序是稳定的。

三、各排序算法应用场景及选择

1)若 n 较小(如n ≤ 50)时,可采用直接插入或简单选择排序。
2)若元素初始状态基本有序(正序),直接插入、冒泡或快速排序为宜。
3)若 n 较大,则应采用时间复杂度为O(nlogn)的排序方法:快速排序、堆排序或归并排序。
快速排序是目前基于比较的内部排序中被认为是最好的方法,当待排序的关键字是随机分布时,快速排序的平均时间最短。
堆排序所需的辅助空间少于快速排序,并且不会出现快速排序可能出现的最坏情况。这两种排序都是不稳定的。
若要求排序稳定,则可选用归并排序。但本文介绍的从单个记录起进行两两归并的归并排序算法并不值得提倡,通常可以将它和直接插入排序结合在一起使用。先利用直接插入排序求得较长的有序数列,然后再两两归并之。因为直接插入排序是稳定的,所以改进后的归并排序仍是稳定的。
4)当范围已知,且空间不是很重要的情况下可以考虑使用桶排序。

链接:https://www.jianshu.com/p/47170b1ced23

 

java实现二叉树常见操作

package com.xk.test.struct.newp;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Stack;

public class MyBinaryTree {

    /**
     * 插入节点
     * @param root
     * @param node
     * @return
     */
    TreeNode insertNode(TreeNode root,TreeNode node){
        if(root == node){
            return node;
        }
        TreeNode tmp = new TreeNode();
        tmp = root;
        TreeNode last = null;
        while(tmp!=null){
            last = tmp;
            if(tmp.val>node.val){
                tmp = tmp.left;
            }else{
                tmp = tmp.right;
            }
        }
        if(last!=null){
            if(last.val>node.val){
                last.left = node;
            }else{
                last.right = node;
            }
        }
        return root;
    }
    
    /**
     * 递归解法前序遍历
     * @param root
     * @return
     */
    ArrayList preOrderReverse(TreeNode root){
        ArrayList result = new ArrayList();
        preOrder2(root,result);
        return result;
        
    }
    void preOrder2(TreeNode root,ArrayList result){
        if(root == null){
            return;
        }
        result.add(root.val);
        preOrder2(root.left,result);
        preOrder2(root.right,result);
    }
    
    /**
     * 迭代解法前序遍历
     * @param root
     * @return
     */
    ArrayList preOrder(TreeNode root){
        Stack stack = new Stack();
        ArrayList list = new ArrayList();
        if(root == null){
            return list;
        }
        stack.push(root);
        while(!stack.empty()){
            TreeNode node = stack.pop();
            list.add(node.val);
            if(node.right!=null){
                stack.push(node.right);
            }
            if(node.left != null){
                stack.push(node.left);
            }
            
        }
        return list;
    }
        
    /**
     * 中序遍历
     * @param root
     * @return
     */
    ArrayList inOrder(TreeNode root){
        ArrayList list = new ArrayList();
        Stack stack = new Stack();
        TreeNode current = root;
        while(current != null|| !stack.empty()){
            while(current != null){
                stack.add(current);
                current = current.left;
            }
            current = stack.peek();
            stack.pop();
            list.add(current.val);
            current = current.right;
            
        }
        return list;
    }
    
    /**
     * 后序遍历
     * @param root
     * @return
     */
    ArrayList postOrder(TreeNode root){
        ArrayList list = new ArrayList();
        if(root == null){
            return list;
        }
        list.addAll(postOrder(root.left));
        list.addAll(postOrder(root.right));
        list.add(root.val);
        return list;
    }
    
    /**
     * 最大深度
     * @param node
     * @return
     */
    int maxDeath(TreeNode node){
        if(node==null){
            return 0;
        }
        int left = maxDeath(node.left);
        int right = maxDeath(node.right);
        return Math.max(left,right) + 1;
    }
    
    /**
     * 层次遍历
     * @param root
     * @return
     */
    ArrayList> levelOrder(TreeNode root){
        ArrayList> result = new ArrayList>();
        if(root == null){
            return result;
        }
        Queue queue = new LinkedList();
        queue.offer(root);
        while(!queue.isEmpty()){
            int size = queue.size();
            ArrayList level = new ArrayList();
            for(int i = 0;i < size ;i++){
                TreeNode node = queue.poll();
                level.add(node.val);
                if(node.left != null){
                    queue.offer(node.left);
                }
                if(node.right != null){
                    queue.offer(node.right);
                }
            } 
            result.add(level);
        }
        return result;
    }
    
    /**
     * 最小深度
     * @param root
     * @return
     */
    int getMinDepth(TreeNode root){
        if(root == null){
            return 0;
        }
        return getMin(root);
    }
    int getMin(TreeNode root){
        if(root == null){
            return Integer.MAX_VALUE;
        }
        if(root.left == null&&root.right == null){
            return 1;
        }
        return Math.min(getMin(root.left),getMin(root.right)) + 1;
    }
    
    /**
     * 节点的个数
     * @param root
     * @return
     */
    int numOfTreeNode(TreeNode root){
        if(root == null){
            return 0;
            
        }
        int left = numOfTreeNode(root.left);
        int right = numOfTreeNode(root.right);
        return left + right + 1;
    }
    
    /**
     * 叶子节点的个数
     * @param root
     * @return
     */
    int numsOfNodeTreeNode(TreeNode root){
        if(root == null){
            return 0;
        }
        if(root.left==null&&root.right==null){
            return 1;
        }
        return numsOfNodeTreeNode(root.left)+numsOfNodeTreeNode(root.right);
        
    }
    
    /**
     * 第k层节点的个数
     * @param root
     * @param k
     * @return
     */
    int numsOfkLevelTreeNode(TreeNode root,int k){
        if(root == null||k<1){
            return 0;
        }
        if(k==1){
            return 1;
        }
        int numsLeft = numsOfkLevelTreeNode(root.left,k-1);
        int numsRight = numsOfkLevelTreeNode(root.right,k-1);
        return numsLeft + numsRight;
    }
    
    /**
     * 翻转二叉树or镜像二叉树
     * @param root
     * @return
     */
    TreeNode mirrorTreeNode(TreeNode root){
        if(root == null){
            return null;
        }
        TreeNode left = mirrorTreeNode(root.left);
        TreeNode right = mirrorTreeNode(root.right);
        root.left = right;
        root.right = left;
        return root;
    }
    
    /**
     * 两个二叉树是否互为镜像
     * @param t1
     * @param t2
     * @return
     */
    boolean isMirror(TreeNode t1,TreeNode t2){
        if(t1==null&&t2==null){
            return true;
        }
        if(t1==null||t2==null){
            return false;
        }
        if(t1.val != t2.val){
            return false;
        }
        return isMirror(t1.left,t2.right)&&isMirror(t1.right,t2.left);
    }
    
    /**
     * 是否是平衡二叉树
     * 它是一棵空树或它的左右两个子树的高度差的绝对值不超过1,并且左右两个子树都是一棵平衡二叉树。
     * @param node
     * @return
     */
    boolean isBalanced(TreeNode node){
        return maxDeath2(node)!=-1;
    }
    int maxDeath2(TreeNode node){
        if(node == null){
            return 0;
        }
        int left = maxDeath2(node.left);
        int right = maxDeath2(node.right);
        if(left==-1||right==-1||Math.abs(left-right)>1){
            return -1;
        }
        return Math.max(left, right) + 1;
    }
    
    /**
     * 是否是完全二叉树
     * 对于深度为K的,有n个结点的二叉树,当且仅当其每一个结点都与深度为K的满二叉树中编号从1至n的结点一一对应时称之为完全二叉树。
     * @param root
     * @return
     */
    boolean isCompleteTreeNode(TreeNode root){
        if(root == null){
            return false;
        }
        Queue queue = new LinkedList();
        queue.add(root);
        boolean result = true;
        boolean hasNoChild = false;
        while(!queue.isEmpty()){
            TreeNode current = queue.remove();
            if(hasNoChild){
                if(current.left!=null||current.right!=null){
                    result = false;
                    break;
                }
            }else{
                if(current.left!=null&¤t.right!=null){
                    queue.add(current.left);
                    queue.add(current.right);
                }else if(current.left!=null&¤t.right==null){
                    queue.add(current.left);
                    hasNoChild = true;
                    
                }else if(current.left==null&¤t.right!=null){
                    result = false;
                    break;
                }else{
                    hasNoChild = true;
                }
            }
            
        }
        return result;
    }
    
    
    /**
     * 是否是合法的二叉查找树(BST)
    一棵BST定义为:
    节点的左子树中的值要严格小于该节点的值。
    节点的右子树中的值要严格大于该节点的值。
    左右子树也必须是二叉查找树。
    一个节点的树也是二叉查找树。
     */
    public int lastVal = Integer.MAX_VALUE;
    public boolean firstNode = true;
    public boolean isValidBST(TreeNode root) {
        // write your code here
        if(root==null){
            return true;
        }
        if(!isValidBST(root.left)){
            return false;
        }
        if(!firstNode&&lastVal >= root.val){
            return false;
        }
        firstNode = false;
        lastVal = root.val;
        if (!isValidBST(root.right)) {
            return false;
        }
        return true;
    }
    
    /**
     * 把二叉树打印成多行
     * @param pRoot
     * @return
     */
    ArrayList > Print(TreeNode pRoot) {
        ArrayList > res = new ArrayList >();
        if(pRoot == null)
            return res;
        ArrayList temp = new ArrayList();
        Queue layer = new LinkedList();
        layer.offer(pRoot);
        int start = 0, end = 1;
        while(!layer.isEmpty()){
            TreeNode node = layer.poll();
            temp.add(node.val);
            start ++;
            if(node.left != null)
                layer.add(node.left);
            if(node.right != null)
                layer.add(node.right);
            if(start == end){
                start = 0;
                res.add(temp);
                temp = new ArrayList();
                end = layer.size();
            }
        }
        return res;
    }
    
    /**
     * 按之字形顺序打印二叉树
     * @param pRoot
     * @return
     */
    public ArrayList > PrintZ(TreeNode pRoot) {
        ArrayList > res = new ArrayList >();
        Stack s1 = new Stack();
        Stack s2 = new Stack();
        int flag = 1;
        if(pRoot == null)
            return res;
        s2.push(pRoot);
        ArrayList temp = new ArrayList();
        while(!s1.isEmpty() || !s2.isEmpty()){
            if(flag % 2 != 0){
                while(!s2.isEmpty()){
                    TreeNode node = s2.pop();
                    temp.add(node.val);
                    if(node.left != null){
                        s1.push(node.left);
                    }
                    if(node.right != null){
                        s1.push(node.right);
                    }
                }
            }
            if(flag % 2 == 0){
                while(!s1.isEmpty()){
                    TreeNode node = s1.pop();
                    temp.add(node.val);
                    if(node.right != null){
                        s2.push(node.right);
                    }
                    if(node.left != null){
                        s2.push(node.left);
                    }
                }
            }
            res.add(new ArrayList(temp));
            temp.clear();
            flag ++;
        }
        return res;
    }

}

class TreeNode{
    int val;
    //左孩子
    TreeNode left;
    //右孩子
    TreeNode right;
}

 

转自:https://www.jianshu.com/p/0190985635eb

           https://www.weiweiblog.cn/printz/

java实现单链表常见操作

一、概述:

  本文主要总结单链表常见操作的实现,包括链表结点添加、删除;链表正向遍历和反向遍历、链表排序、判断链表是否有环、是否相交、获取某一结点等。

二、概念:

链表:

  一种重要的数据结构,HashMap等集合的底层结构都是链表结构。链表以结点作为存储单元,这些存储单元可以是不连续的。每个结点由两部分组成:存储的数值+前序结点和后序结点的指针。即有前序结点的指针又有后序结点的指针的链表称为双向链表,只包含后续指针的链表为单链表,本文总结的均为单链表的操作。

单链表结构:

Java中单链表采用Node实体类类标识,其中data为存储的数据,next为下一个节点的指针:

复制代码
package com.algorithm.link;
/**
 * 链表结点的实体类
 * @author bjh
 *
 */
public class Node {
    Node next = null;//下一个结点
    int data;//结点数据
    public Node(int data){
        this.data = data;
    }
}
复制代码

三、链表常见操作:

复制代码
package com.algorithm.link;

import java.util.Hashtable;
/**
 * 单链表常见算法
 * @author bjh
 *
 */
public class MyLinkedList {
    
    /**链表的头结点*/
    Node head = null;
        
    /**
     * 链表添加结点:
     * 找到链表的末尾结点,把新添加的数据作为末尾结点的后续结点
     * @param data
     */
    public void addNode(int data){
        Node newNode = new Node(data);
        if(head == null){
            head = newNode;
            return;
        }
        Node temp = head;
        while(temp.next != null){
            temp = temp.next;
        }
        temp.next = newNode;
    }
    
    /**
     * 链表删除结点:
     * 把要删除结点的前结点指向要删除结点的后结点,即直接跳过待删除结点
     * @param index
     * @return
     */
    public boolean deleteNode(int index){
        if(index<1 || index>length()){//待删除结点不存在
            return false;
        }
        if(index == 1){//删除头结点
            head = head.next;
            return true;
        }
        Node preNode = head;
        Node curNode = preNode.next;
        int i = 1;
        while(curNode != null){
            if(i==index){//寻找到待删除结点
                preNode.next = curNode.next;//待删除结点的前结点指向待删除结点的后结点
                return true;
            }
            //当先结点和前结点同时向后移
            preNode = preNode.next;
            curNode = curNode.next;
            i++;
        }
        return true;
    }
    
    /**
     * 求链表的长度
     * @return
     */
    public int length(){
        int length = 0;
        Node curNode = head;
        while(curNode != null){
            length++;
            curNode = curNode.next;
        }
        return length;
    }
    
    /**
     * 链表结点排序,并返回排序后的头结点:
     * 选择排序算法,即每次都选出未排序结点中最小的结点,与第一个未排序结点交换
     * @return
     */
    public Node linkSort(){
        Node curNode = head;
        while(curNode != null){
            Node nextNode = curNode.next;
            while(nextNode != null){
                if(curNode.data > nextNode.data){
                    int temp = curNode.data;
                    curNode.data = nextNode.data;
                    nextNode.data = temp;
                }
                nextNode = nextNode.next;
            }
            curNode = curNode.next;
        }
        return head;
    }
    
    /**
     * 打印结点
     */
    public void printLink(){
        Node curNode = head;
        while(curNode !=null){
            System.out.print(curNode.data+" ");
            curNode = curNode.next;
        }
        System.out.println();
    }
    
    /**
     * 去掉重复元素:
     * 需要额外的存储空间hashtable,调用hashtable.containsKey()来判断重复结点
     */
    public void distinctLink(){
        Node temp = head;
        Node pre = null;
        Hashtable hb = new Hashtable();
        while(temp != null){
            if(hb.containsKey(temp.data)){//如果hashtable中已存在该结点,则跳过该结点
                pre.next = temp.next;
            }else{//如果hashtable中不存在该结点,将结点存到hashtable中
                hb.put(temp.data, 1);
                pre=temp;
            }
            temp = temp.next;
        }
    }
    
    /**
     * 返回倒数第k个结点,
     * 两个指针,第一个指针向前移动k-1次,之后两个指针共同前进,
     * 当前面的指针到达末尾时,后面的指针所在的位置就是倒数第k个位置
     * @param k
     * @return
     */
    public Node findReverNode(int k){
        if(k<1 || k>length()){//第k个结点不存在
            return null;
        }
        Node first = head;
        Node second = head;
        for(int i=0; i//前移k-1步
            first = first.next;
        }
        while(first.next != null){
            first = first.next;
            second = second.next;
        }
        return second;
    }
    
    /**
     * 查找正数第k个元素
     */
    public Node findNode(int k){
        if(k<1 || k>length()){//不合法的k
            return null;
        }
        Node temp = head;
        for(int i = 0; i){
            temp = temp.next;
        }
        return temp;
    }
    
    /**
     * 反转链表,在反转指针钱一定要保存下个结点的指针
     */
    public void reserveLink(){
        Node curNode = head;//头结点
        Node preNode = null;//前一个结点
        while(curNode != null){
            Node nextNode = curNode.next;//保留下一个结点
            curNode.next = preNode;//指针反转
            preNode = curNode;//前结点后移
            curNode = nextNode;//当前结点后移
        }
        head = preNode;
    }
    
    /**
     * 反向输出链表,三种方式:
     * 方法一、先反转链表,再输出链表,需要链表遍历两次
     * 方法二、把链表中的数字放入栈中再输出,需要维护额外的栈空间
     * 方法三、依据方法2中栈的思想,通过递归来实现,递归起始就是将先执行的数据压入栈中,再一次出栈
     */
    public void reservePrt(Node node){
        if(node != null){
            reservePrt(node.next);
            System.out.print(node.data+" ");
        }
    }
    
    /**
     * 寻找单链表的中间结点:
     * 方法一、先求出链表的长度,再遍历1/2链表长度,寻找出链表的中间结点
     * 方法二、:
     * 用两个指针遍历链表,一个快指针、一个慢指针,
     * 快指针每次向前移动2个结点,慢指针一次向前移动一个结点,
     * 当快指针移动到链表的末尾,慢指针所在的位置即为中间结点所在的位置 
     */
    public Node findMiddleNode(){
        Node slowPoint = head;
        Node quickPoint = head;
        //quickPoint.next == null是链表结点个数为奇数时,快指针已经走到最后了
        //quickPoint.next.next == null是链表结点数为偶数时,快指针已经走到倒数第二个结点了
        //链表结点个数为奇数时,返回的是中间结点;链表结点个数为偶数时,返回的是中间两个结点中的前一个
        while(quickPoint.next != null && quickPoint.next.next != null){
            slowPoint = slowPoint.next;
            quickPoint = quickPoint.next.next;
        }
        return slowPoint;
    }
    
    
    /**
     * 判断链表是否有环:
     * 设置快指针和慢指针,慢指针每次走一步,快指针每次走两步
     * 当快指针与慢指针相等时,就说明该链表有环
     */
    public boolean isRinged(){
        if(head == null){
            return false;
        }
        Node slow = head;
        Node fast = head;
        while(fast.next != null && fast.next.next != null){
            slow = slow.next;
            fast = fast.next.next;
            if(fast == slow){
                return true;
            }        
        }
        return false;
    }
    
    /**
     * 返回链表的最后一个结点
     */
    public Node getLastNode(){
        Node temp = head;
        while(temp.next != null){
            temp = temp.next;
        }
        return temp;
    }
    
    /**
     * 在不知道头结点的情况下删除指定结点:
     * 删除结点的重点在于找出其前结点,使其前结点的指针指向其后结点,即跳过待删除结点
     * 1、如果待删除的结点是尾结点,由于单链表不知道其前结点,没有办法删除
     * 2、如果删除的结点不是尾结点,则将其该结点的值与下一结点交换,然后该结点的指针指向下一结点的后续结点
     */
    public boolean deleteSpecialNode(Node n){
        if(n.next == null){
            return false;
        }else{
            //交换结点和其后续结点中的数据
            int temp = n.data;
            n.data = n.next.data;
            n.next.data = temp;
            //删除后续结点
            n.next = n.next.next;
            return true;
        }
    }
    
    /**
     * 判断两个链表是否相交:
     * 两个链表相交,则它们的尾结点一定相同,比较两个链表的尾结点是否相同即可
     */
    public boolean isCross(Node head1, Node head2){
        Node temp1 = head1;
        Node temp2 = head2;
        while(temp1.next != null){
            temp1 = temp1.next;
        }
        while(temp2.next != null){
            temp2 = temp2.next;
        }
        if(temp1 == temp2){
            return true;
        }
        return false;
    }
    
    /**
     * 如果链表相交,求链表相交的起始点:
     * 1、首先判断链表是否相交,如果两个链表不相交,则求相交起点没有意义
     * 2、求出两个链表长度之差:len=length1-length2
     * 3、让较长的链表先走len步
     * 4、然后两个链表同步向前移动,没移动一次就比较它们的结点是否相等,第一个相等的结点即为它们的第一个相交点
     */
    public Node findFirstCrossPoint(MyLinkedList linkedList1, MyLinkedList linkedList2){
        //链表不相交
        if(!isCross(linkedList1.head,linkedList2.head)){
            return null;
        }else{
            int length1 = linkedList1.length();//链表1的长度
            int length2 = linkedList2.length();//链表2的长度
            Node temp1 = linkedList1.head;//链表1的头结点
            Node temp2 = linkedList2.head;//链表2的头结点
            int len = length1 - length2;//链表1和链表2的长度差
            
            if(len > 0){//链表1比链表2长,链表1先前移len步        
                for(int i=0; i){
                    temp1 = temp1.next;
                }
            }else{//链表2比链表1长,链表2先前移len步
                for(int i=0; i){
                    temp2 = temp2.next;
                }
            }
            //链表1和链表2同时前移,直到找到链表1和链表2相交的结点
            while(temp1 != temp2){
                temp1 = temp1.next;
                temp2 = temp2.next;
            }
            return temp1;
        }
    }
    
}
复制代码

四、测试类:

复制代码
package com.algorithm.link;
/**
 * 单链表操作测试类
 * @author bjh
 *
 */
public class Test {

    public static void main(String[] args){
        MyLinkedList myLinkedList = new MyLinkedList();
        //添加链表结点
        myLinkedList.addNode(9);
        myLinkedList.addNode(8);
        myLinkedList.addNode(6);
        myLinkedList.addNode(3);
        myLinkedList.addNode(5);
        
        //打印链表
        myLinkedList.printLink();
        
        /*//测试链表结点个数
        System.out.println("链表结点个数为:" + myLinkedList.length());
        
        //链表排序
        Node head = myLinkedList.linkSort();
        System.out.println("排序后的头结点为:" + head.data);
        myLinkedList.printLink();
        
        //去除重复结点
        myLinkedList.distinctLink();
        myLinkedList.printLink();
        
        //链表反转
        myLinkedList.reserveLink();
        myLinkedList.printLink();
        
        //倒序输出/遍历链表
        myLinkedList.reservePrt(myLinkedList.head);
        
        //返回链表的中间结点
        Node middleNode = myLinkedList.findMiddleNode();
        System.out.println("中间结点的数值为:"+middleNode.data);
        
        //判断链表是否有环
        boolean isRinged = myLinkedList.isRinged();
        System.out.println("链表是否有环:" + isRinged);
        //将链表的最后一个结点指向头结点,制造有环的效果
        Node lastNode = myLinkedList.getLastNode();
        lastNode.next = myLinkedList.head;
        isRinged = myLinkedList.isRinged();
        System.out.println("链表是否有环:" + isRinged);
        
        //删除指定结点
        Node nk = myLinkedList.findReverNode(3);
        System.out.println(nk.data);
        myLinkedList.deleteSpecialNode(nk);
        myLinkedList.printLink();
        
        //链表是否相交
        //新链表
        MyLinkedList myLinkedList1 = new MyLinkedList();
        myLinkedList1.addNode(1);
        myLinkedList1.addNode(2);
        myLinkedList1.printLink();
        System.out.println("链表一和链表二是否相交"+myLinkedList.isCross(myLinkedList.head, myLinkedList1.head));
        //把第二个链表从第三个结点开始接在第二个链表的后面,制造相交的效果
        myLinkedList1.findNode(2).next = myLinkedList.findNode(3);
        myLinkedList1.printLink();
        System.out.println("链表一和链表二是否相交"+myLinkedList.isCross(myLinkedList.head, myLinkedList1.head));
        */
        
        //如果两个链表相交求链表相交的结点的值
        MyLinkedList myLinkedList1 = new MyLinkedList();
        myLinkedList1.addNode(1);
        myLinkedList1.addNode(2);
        myLinkedList1.findNode(2).next = myLinkedList.findNode(3);
        myLinkedList1.printLink();
        Node n = myLinkedList1.findFirstCrossPoint(myLinkedList, myLinkedList1);
        if(n == null){
            System.out.println("链表不相交");
        }else{
            System.out.println("两个链表相交,第一个交点的数值为:" + n.data);
        }
    }
}

 

转自:https://www.cnblogs.com/bjh1117/p/8335108.html