死锁

内容纲要

什么是死锁

  • 发送在并发
  • 互不相让:当两个(或更多) 线程(或进程)相互持有对方所需要的资源,又不主动释放,导致所有人都无法继续前进导致程序陷入无尽的阻塞,这就是死锁。
  • 如果个线程之间的依赖关系是环形,存在环路的锁的依赖关系,那么也可能会发生死锁

死锁的影响

死锁的影响在不同系统中是不一样的,这取决于系统对死锁的处理能力

  • 数据库中:检测并放弃事务
  • JVM中:无法自动处理

几率不高但危害大

  • 不一定发生,但是遵守“墨菲定律"
  • 一旦发生,多是高并发场景,影响用户多
  • 整个系统崩溃、子系统崩溃、性能降低
  • 压力测试无法找出所有潜在的死锁

发生死锁的列子

最简单的情况

-代码

/**
 * 描述:     必定发生死锁的情况
 */
public class MustDeadLock implements Runnable {
    int flag = 1;
    static Object o1 = new Object();
    static Object o2 = new Object();
    public static void main(String[] args) {
        MustDeadLock r1 = new MustDeadLock();
        MustDeadLock r2 = new MustDeadLock();
        r1.flag = 1;
        r2.flag = 0;
        Thread t1 = new Thread(r1);
        Thread t2 = new Thread(r2);
        t1.start();
        t2.start();
    }
    @Override
    public void run() {
        System.out.println("flag = " + flag);
        if (flag == 1) {
            synchronized (o1) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (o2) {
                    System.out.println("线程1成功拿到两把锁");
                }
            }
        }
        if (flag == 0) {
            synchronized (o2) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (o1) {
                    System.out.println("线程2成功拿到两把锁");
                }
            }
        }
    }
}
  • 注意看退出信号: Process finished with exit code 130
    (interrupted by signal 2: SIGINT) ,是不正常退出的信号
    对比正常结束的程序的结束信号是0

实际生产中的例子:转账

  • 需要把锁
  • 获取两把锁成功,且余额大于0 ,则扣除转出人,增加收款人的余额,是原子操作
  • 顺序相反导致死锁

模拟多人随机转账

  • 5万人很多,但是依然会发生死锁,墨菲定律
  • 复习:发生死锁几率不高但危害大

死锁的4个必要条件

1. 互斥条件
2. 请求与保持条件
3. 不剥夺条件
4. 循环等待条件

如何定位死锁

  • jstack pid

  • ThreadMXBean代码演示
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
long[] deadlockedThreads = threadMXBean.findDeadlockedThreads();
if (deadlockedThreads != null && deadlockedThreads.length > 0) {
    for (int i = 0; i < deadlockedThreads.length; i++) {
        ThreadInfo threadInfo = threadMXBean.getThreadInfo(deadlockedThreads[i]);
        System.out.println("发现死锁" + threadInfo.getThreadName());
    }
}

修复死锁的策略

线上发生死锁应该怎么办?

  • 线上问题都需要防患于未然,不造成损失地扑灭几乎已经是不可能
  • 保存案发现场然后立刻重启服务器
  • 暂时保证线上服务的安全,然后在利用刚才保存的信息,排查死锁,修改代码,重新发版

常见修复策略

  • 避免策略:哲学家就餐的换手方案、转账换序方案
    • 实际上不在乎获取锁的顺序
    • 通过hashcode来决定获取锁的顺序、冲突时需要“加时赛(新增一把锁,让他们竞争)
    • 主键(主键是唯一的,不可能出现相同的情况)就更方便
// 获取到对应的哈希值,判断对象之间的大小,将获取锁的顺序改为永远先获取到哈希值小的索
int fromHash = System.identityHashCode(from);
int toHash = System.identityHashCode(to);
if (fromHash < toHash) {
    synchronized (from) {
        synchronized (to) {
            new Helper().transfer();
        }
    }
}
else if (fromHash > toHash) {
    synchronized (to) {
        synchronized (from) {
            new Helper().transfer();
        }
    }
}else  {
//当哈希值相同的情况下,加一个锁,然后再竞争这把锁,避免哈希冲突
    synchronized (lock) {
        synchronized (to) {
            synchronized (from) {
                new Helper().transfer();
            }
        }
    }
}
  • 检测与恢复策略:一段时间检测是否有死锁,如果有就剥夺某一个资源,来打开死锁
  • 鸵鸟策略:鸵鸟这种动物在遇到危险的时候,通常就会把头埋在地上,这样一来它就看不到危险了。而鸵鸟策略的意思就是说,如果我们发生死锁的概率极其低,那么我们就直接忽略它,直到死锁发生的时候,再人工修复

哲学家就餐问题

代码演示:哲学家进入死锁

/**
 * 描述:     演示哲学家就餐问题导致的死锁
 */
public class DiningPhilosophers {

    public static class Philosopher implements Runnable {

        private Object leftChopstick;

        public Philosopher(Object leftChopstick, Object rightChopstick) {
            this.leftChopstick = leftChopstick;
            this.rightChopstick = rightChopstick;
        }

        private Object rightChopstick;

        @Override
        public void run() {
            try {
                while (true) {
                    doAction("Thinking");
                    synchronized (leftChopstick) {
                        doAction("Picked up left chopstick");
                        synchronized (rightChopstick) {
                            doAction("Picked up right chopstick - eating");
                            doAction("Put down right chopstick");
                        }
                        doAction("Put down left chopstick");
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        private void doAction(String action) throws InterruptedException {
            System.out.println(Thread.currentThread().getName() + " " + action);
            Thread.sleep((long) (Math.random() * 10));
        }
    }

    public static void main(String[] args) {
        Philosopher[] philosophers = new Philosopher[5];
        Object[] chopsticks = new Object[philosophers.length];
        for (int i = 0; i < chopsticks.length; i++) {
            chopsticks[i] = new Object();
        }
        for (int i = 0; i < philosophers.length; i++) {
            Object leftChopstick = chopsticks[i];
            Object rightChopstick = chopsticks[(i + 1) % chopsticks.length];
            philosophers[i] = new Philosopher(leftChopstick, rightChopstick);
            new Thread(philosophers[i], "哲学家" + (i + 1) + "号").start();
        }
    }
}

多种解决方案

  • 服务员检查(避免策略)

  • 改变一个哲学家拿叉子或筷子的顺序(避免策略)

/**
 * 描述:     演示哲学家就餐问题导致的死锁
 */
public class DiningPhilosophers {

    public static class Philosopher implements Runnable {

        private Object leftChopstick;

        public Philosopher(Object leftChopstick, Object rightChopstick) {
            this.leftChopstick = leftChopstick;
            this.rightChopstick = rightChopstick;
        }

        private Object rightChopstick;

        @Override
        public void run() {
            try {
                while (true) {
                    doAction("Thinking");
                    synchronized (leftChopstick) {
                        doAction("Picked up left chopstick");
                        synchronized (rightChopstick) {
                            doAction("Picked up right chopstick - eating");
                            doAction("Put down right chopstick");
                        }
                        doAction("Put down left chopstick");
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        private void doAction(String action) throws InterruptedException {
            System.out.println(Thread.currentThread().getName() + " " + action);
            Thread.sleep((long) (Math.random() * 10));
        }
    }

    public static void main(String[] args) {
        Philosopher[] philosophers = new Philosopher[5];
        Object[] chopsticks = new Object[philosophers.length];
        for (int i = 0; i < chopsticks.length; i++) {
            chopsticks[i] = new Object();
        }
        for (int i = 0; i < philosophers.length; i++) {
            Object leftChopstick = chopsticks[i];
            Object rightChopstick = chopsticks[(i + 1) % chopsticks.length];
            if (i == philosophers.length - 1) {
                philosophers[i] = new Philosopher(rightChopstick, leftChopstick);
            } else {
                philosophers[i] = new Philosopher(leftChopstick, rightChopstick);
            }
            new Thread(philosophers[i], "哲学家" + (i + 1) + "号").start();
        }
    }
}
  • 餐票(避免策略)
  • 领导调节(检测与恢复策略)
    • 检测算法: 锁的调用链路图
    • 恢复方法1 : 进程终止
      • 逐个终止线程,直到死锁消除
      • 终止顺序
        1. 优先级(是前台交互还是后台处理)
        2. 已占用资源、还需要的资源
        3. 已经运行时间
    • 恢复方法2 :资源抢占
      • 把已经分发出去的锁给收回来
      • 让线程回退几步,这样就不用结束整个线程,成本比较低
      • 缺点:可能同一个线程一直被抢占,那就造成饥饿

实际工程中如何避免死锁

  1. 设置超时时间
    • Lock的tryLock(long timeout, TimeUnit unit)
    • synchronized不具备尝试锁的能力
    • 造成超时的可能性多:发生了死锁、线程陷入死循环、线程执行很慢
    • 获取锁失败:打日志、发报警邮件、重启等
/**
 * 描述:     用tryLock来避免死锁
 */
public class TryLockDeadlock implements Runnable {

    int flag = 1;
    static Lock lock1 = new ReentrantLock();
    static Lock lock2 = new ReentrantLock();

    public static void main(String[] args) {
        TryLockDeadlock r1 = new TryLockDeadlock();
        TryLockDeadlock r2 = new TryLockDeadlock();
        r1.flag = 1;
        r2.flag = 0;
        new Thread(r1).start();
        new Thread(r2).start();
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            if (flag == 1) {
                try {
                    if (lock1.tryLock(800, TimeUnit.MILLISECONDS)) {
                        System.out.println("线程1获取到了锁1");
                        Thread.sleep(new Random().nextInt(1000));
                        if (lock2.tryLock(800, TimeUnit.MILLISECONDS)) {
                            System.out.println("线程1获取到了锁2");
                            System.out.println("线程1成功获取到了两把锁");
                            lock2.unlock();
                            lock1.unlock();
                            break;
                        } else {
                            System.out.println("线程1尝试获取锁2失败,已重试");
                            lock1.unlock();
                            Thread.sleep(new Random().nextInt(1000));
                        }
                    } else {
                        System.out.println("线程1获取锁1失败,已重试");
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            if (flag == 0) {
                try {
                    if (lock2.tryLock(3000, TimeUnit.MILLISECONDS)) {
                        System.out.println("线程2获取到了锁2");

                        Thread.sleep(new Random().nextInt(1000));
                        if (lock1.tryLock(3000, TimeUnit.MILLISECONDS)) {
                            System.out.println("线程2获取到了锁1");
                            System.out.println("线程2成功获取到了两把锁");
                            lock1.unlock();
                            lock2.unlock();
                            break;
                        } else {
                            System.out.println("线程2尝试获取锁1失败,已重试");
                            lock2.unlock();
                            Thread.sleep(new Random().nextInt(1000));
                        }
                    } else {
                        System.out.println("线程2获取锁2失败,已重试");
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
  1. 多使用并发类而不是自己设置锁
    • ConcurrentHashMap,ConcurrentLinkedQueue,AtomicBoolean等
    • 实际应用中java.util.concurrent.atomic十分有用,简单方便且效率比使用Lock更高
    • 多用并发集合少用同步集合,并发集合比同步集合的可扩展性更好
    • 并发场景需要用到map ,首先想到用ConcurrentHashMap
  2. 尽量降低锁的使用粒度:用不同的锁而不是一个锁
  3. 如果能使用同步代码块,就不使用同步方法:自己指定锁对象
  4. 给你的线程起个有意义的名字: debug和排查时事半功倍,框架和JDK都遵守这个最佳实践
  5. 避免锁的嵌套: MustDeadLock类
  6. 分配资源前先看能不能收回来:银行家算法
  7. 尽量不要几个功能用同一把锁:专锁专用

其他活性故障

死锁是最常见的活跃性问题,不过除了刚才的死锁之外,还有一些类似的问题,会导致程序无法顺利执行,统称为活跃性问题

活锁( LiveLock )

什么是活锁

  • 虽然线程并没有阻塞,也始终在运行(所以叫做“活”锁,线程是“活”的),但是程序却得不到进展,因为线程始终重复做同样的事
  • 如果这里死锁,那么就是这里两个人都始终一 动不动,直到对方先抬头,他们之间不再说话了,只是等待
  • 如果发生活锁,那么这里的情况就是,双方都不停地对对方说你先起来吧,你先起来吧”,双方都一直在说话,在运行
  • 死锁和活锁的结果是一样的,就是谁都不能先抬头

工程中的活锁实例 : 消息队列

  • 策略:消息如果处理失败,就放在队列开头重试
  • 由于依赖服务出了问题,处理该消息一-直失败
  • 没阻塞,但程序无法继续
  • 解决:放到队列尾部、重试限制

饥饿

  • 当线程需要某些资源(例如CPU) , 但是却始终得不到
  • 线程的优先级设置得过于低,或者有某线程持有锁同时又无限循环从而不释放锁,或者某程序始终占用某文件的写锁
  • 饥饿可能会导致响应性差:比如,我们的浏览器有一个线程负责处理前台响应(打开收藏夹等动作) , 另外的后台线程负责下载图片和文件、计算渲染等。在这种情况下,如果后台线程把CPU资源都占用了,那么前台线程将无法得到很好地执行,这会导致用户的体验很差
THE END
分享
二维码
< <上一篇
下一篇>>