13分钟聊聊并发包中常用同步组件并手写一个自定义同步组件

前言

上篇文章10分钟从源码级别搞懂AQS(AbstractQueuedSynchronizer)说到JUC并发包中的同步组件大多使用AQS来实现

本篇文章通过AQS自己来实现一个同步组件,并从源码级别聊聊JUC并发包中的常用同步组件

本篇文章需要的前置知识就是AQS,如果不了解AQS的同学可以看上一篇文章哈~

阅读本篇文章大概需要13分钟

自定义同步组件

为了更容易理解其他同步组件,我们先来使用AQS自己来实现一个常用的可重入锁

AQS模板方法流程是固定的,我们主要只需要来实现它的尝试获取同步状态和尝试释放同步状态方法即可

首先我们先规定要实现的可重入锁是独占式的

规定同步状态一开始为0,当有线程获取锁成功同步状态就为1,当这个线程重入时就累加同步状态

规定释放同步状态时每次扣减1个同步状态,只有当同步状态扣减到0时,才是真正的释放独占锁

我们使用一个内部类Sync 来继承AQS 并重写tryAcquire尝试获取同步状态、tryRelease 尝试释放同步状态、isHeldExclusively判断当前线程是否持有同步状态(等待、通知时会用到该方法)

 static class Sync extends AbstractQueuedSynchronizer {
        /**
         * 判断当前线程是否持有同步状态
         *
         * @return
         */
        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

在获取同步状态中

  • 先判断是否有同步状态(即同步状态是否为0),如果有同步状态就用CAS去获取(0->1),成功就设置当前线程为获取同步状态的线程
  • 如果没有同步状态(即同步状态不为0) ,就查看获取同步状态的线程是否为当前线程,如果是当前线程则说明此次是重入,累加重入次数
  • 其他情况说明未获取到同步状态,返回false 后续走AQS流程(构建节点加入AQS)
  •         /**
             * 尝试获取同步状态
             *
             * @param arg 获取同步状态的数量
             * @return
             */
            @Override
            protected boolean tryAcquire(int arg) {
                //1.获取同步状态
                int state = getState();
                //2.如果有同步状态则CAS替换 0->1
                if (state == 0) {
                    if (compareAndSetState(state, 1)) {
                        //替换成功 说明获取到同步状态 设置当前获取同步状态线程
                        setExclusiveOwnerThread(Thread.currentThread());
                        return true;
                    }
                } else if (getExclusiveOwnerThread() == Thread.currentThread()) {
                    //3.没有同步状态  查看获取同步资源的线程是否为当前线程  可重入  累加重入次数
                    setState(state + arg);
                    return true;
                }
    
                //其他情况就是没获取到同步状态
                return false;
            }
    

    在释放同步状态中

    只有当同步状态要改成0时才是真正释放,否则情况情况下就是重入扣减次数

            /**
             * 尝试释放同步状态
             *
             * @param arg 释放同步状态的数量
             * @return
             */
            @Override
            protected boolean tryRelease(int arg) {
                //目标状态
                int targetState = getState() - arg;
    
                //真正释放锁
                if (targetState == 0) {
                    setExclusiveOwnerThread(null);
                    setState(targetState);
                    return true;
                }
    
                //其他情况 扣减状态
                setState(targetState);
                return false;
            }
    

    使用内部类实现AQS的方法后,我们在自定义同步组件类中去实现Lock接口,并用内部类实现AQS的方法去实现Lock接口的方法

    将要获取、释放的同步状态都设置成1,对应响应中断、超时的方法就用AQS中对应的方法即可

    public class MySynchronizedComponent implements Lock {
    
        public MySynchronizedComponent() {
            sync = new Sync();
        }
    
        private Sync sync;
    
        @Override
        public void lock() {
            sync.acquire(1);
        }
    
        @Override
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }
    
        @Override
        public boolean tryLock() {
            return sync.tryAcquire(1);
        }
    
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return sync.tryAcquireNanos(1, unit.toNanos(time));
        }
    
        @Override
        public void unlock() {
            sync.release(1);
        }
    
        @Override
        public Condition newCondition() {
            return sync.new ConditionObject();
        }
    
    }
    

    实际上我们只需要去实现尝试获取、释放同步状态方法就能够完成自己的同步组件,这就是使用AQS带来的好处

    代码案例可以去git仓库获取,放在本文最后

    ReentrantLock

    ReentrantLock是并发包中提供的可重入锁,它除了能够实现synchronized的功能外还可以响应中断、超时、实现公平锁等,其底层也是通过AQS来实现的

    ReentrantLock的功能与synchronized类似,可重入的独占锁,用于保证并发场景下同步操作

    使用时需要显示加锁、解锁,常用格式如下:

    reentrantLock.lock();
    try{
        //....
    }finally {
        reentrantLock.unlock();
    }
    

    finally中最先去解锁,并且加锁要放在try块的最外层,并保证加锁和try块之间不会抛出异常

    加锁不放在try中是因为加锁实现未知可能抛出不受检查unchecked的异常,当加锁抛出异常时,后续finally块解锁也会抛出非法监视器的异常从而导致覆盖

    加锁和try块之间如果抛出异常,那么就无法执行解锁了

    ReentrantLock除了提供基本的同步功能,还提供响应中断、超时的API,同学们可以私下去查看

    熟悉ReentrantLock实现的同学,可能看上面自定义同步组件的代码很熟悉,其实就是参考ReentrantLock非公平锁写的

    ReentrantLock中使用内部类Sync来继承AQS,同时内部类NonfairSync和FairSync来继承Sync去实现非公平、公平的获取同步状态

    image.png

    非公平锁尝试获取同步状态 流程类似就不过多描述

        final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    

    那公平锁如何来实现获取同步状态呢?

    其实看过上篇AQS文章的同学就知道了,在上篇文章中已经说过

    只需要在尝试获取同步状态前加上一个条件:队列中是否有前置任务(即在队列中FIFO排队获取)

    公平锁也是这么去实现的,前置条件hasQueuedPredecessors

            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    

    ReentrantReadWriteLock

    功能与实现

    ReentrantReadWriteLock在ReentrantLock功能的基础上,提供读写锁的功能,让锁的粒度更细

    在一些读多写少的场景下是允许同时读的,允许多个线程获取,其实想到了AQS的共享式,读锁也就是共享式

    在读读的场景下,都是读锁/共享锁,不会进行阻塞

    在读写、写读、写写的场景下,都会进行阻塞

    比如要获取写锁时,需要等待读锁、写锁都解锁;要获取读锁时,需要等待写锁解锁

    ReentrantReadWriteLock 在 ReentrantLock 的基础上增加ReadLockWriteLock分别作为读锁和写锁

    image.png

    实际上读锁就是共享锁、写锁就是独占锁,在实现加锁、解锁的方法时分别调用共享式、独占式的获取、释放同步状态即可

    在构造时,读写锁中实际使用的都是同一个AQS

            public ReentrantReadWriteLock(boolean fair) {
                sync = fair ? new FairSync() : new NonfairSync();
                readerLock = new ReadLock(this);
                writerLock = new WriteLock(this);
            }
    
            //读锁构造
            protected ReadLock(ReentrantReadWriteLock lock) {
                sync = lock.sync;
            }
    
            //写锁构造
            protected WriteLock(ReentrantReadWriteLock lock) {
                sync = lock.sync;
            }
    

    即同步状态会被读写锁共享,那么它们如何查看/修改自己的那部分同步状态呢?

    在读写锁中,同步状态被一分为二,高16位的同步状态是读锁的,低16位的同步状态是写锁的

    image.png

    当线程获取写锁时,写状态+1,由于写状态在低位,相当于同步状态+1

    当线程获取读锁时,读状态+1,由于读状态在高位,相当于同步状态+(1{ try { semaphore.acquire(); System.out.println(Thread.currentThread().getName()+"获得资源"); //执行任务 TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println(Thread.currentThread().getName()+"释放资源======"); semaphore.release(); } }); } executor.shutdown();