ReentrantReadWriteLock

ReentrantReadWriteLock读写锁

定义:假如有一个资源很少去修改它,但是经常去访问它。此时就可以考虑使用读写锁,因为读写锁能保证多个线程同时读,允许同时读-读。但不允许同时写-读、同时写-写操作。

看命名和内部类就知道它同时也能保证公平|非公平锁,读|写锁,同时又是可重入锁。

用法:模拟一个场景,1000个循环下,获取一个随机数,是偶数则写数据,是奇数则读数据。

public class RwTest {

    public static volatile int salary;

    ReadWriteLock rwLock = new ReentrantReadWriteLock();

    public static void main(String[] args) {
        RwTest rwTest = new RwTest();
        for (int i = 0; i < 1000; i++) {
            int random = new Random().nextInt();
            if ((random & 1) == 0) {
                new Thread(() -> {
                    int salary = 0;
                    try {
                        salary = rwTest.getSalary();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + " : " + salary + " : " + LocalDateTime.now().format(DateTimeFormatter.ISO_TIME));
                }, "Read-" + i + "-" + random).start();
            } else {
                new Thread(() -> {
                    int salary = 0;
                    try {
                        salary = rwTest.setSalary();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + " : " + salary + " : " + LocalDateTime.now().format(DateTimeFormatter.ISO_TIME));
                }, "Write-" + i + "-" + random).start();
            }
        }
    }


    public int getSalary() throws InterruptedException {
        Lock readLock = rwLock.readLock();
        readLock.lock();
        try {
            Thread.sleep(1000);
            return salary;
        } finally {
            readLock.unlock();
        }
    }

    public int setSalary() throws InterruptedException {
        Lock writeLock = rwLock.writeLock();
        writeLock.lock();
        try {
            Thread.sleep(1000);
            salary += 100;
            return salary;
        } finally {
            writeLock.unlock();
        }
    }
}

运行结果如下,通过时间可以看出,读锁和写锁之间是互斥的,写锁和写锁之间也是互斥的,但是读锁和读锁之间是共享的。

关于源码的阅读方面,读写锁的实现里有几个需要先了解的概念。

  1. 原先的AQS中state被拆分为了高16位和低16位,共享锁针对高16位取值,而排他锁针对低16位取值。
  2. 由于读锁是所有读线程都可以获取的,同时又是可重入的,所以搞了个ThreadLocal存放每个线程获取读锁的个数。

了解完这些概念,再去读源码。读源码肯定是跟着功能接口往下走。

首先针对读锁,加锁的源码如下。

        public void lock() {
            sync.acquireShared(1);
        }

        // AQS模版
        public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
        }

        protected final int tryAcquireShared(int unused) {

            Thread current = Thread.currentThread();
            // 熟悉的state
            int c = getState();
            // 如果有写锁并且AOS中排他线程不是当前线程,直接失败返回等待
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            // 如果读锁不用阻塞且读锁小于最大数量且CAS成功 readerShouldBlock在FairSync和NonfairSync中有不同实现   
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                // 走到这一步表明CAS成功了,这里就是在存Threadlocal中锁的数量了。
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            // 如果读锁要等待或者CAS失败了
            return fullTryAcquireShared(current);
        }
final int fullTryAcquireShared(Thread current) {
            /*
             * This code is in part redundant with that in
             * tryAcquireShared but is simpler overall by not
             * complicating tryAcquireShared with interactions between
             * retries and lazily reading hold counts.
             */
            HoldCounter rh = null;
            for (;;) {
                // 同之前判断写锁并且排他线程
                int c = getState();
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                    // else we hold the exclusive lock; blocking here
                    // would cause deadlock.
                    // 如果当前线程之前就获取过锁,代表是在重入,此时不为写锁让步,否则看下面
                } else if (readerShouldBlock()) {
                    // Make sure we're not acquiring read lock reentrantly
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        // 如果之前没有获取过锁,则不是重入,此时为写锁让步,避免写锁饥饿。
                        if (rh.count == 0)
                            return -1;
                    }
                }
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // CAS操作    
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

其实翻译过来。读锁的加锁逻辑十分清晰,只不过state被分为高16位和低16位以及存储线程中锁的数量的实现比较复杂。

再看写锁的加锁逻辑。其实也十分简单。搞清楚怎么判断读锁或写锁的位运算后,加锁的这部分代码就非常清晰。同样写锁是否需要等待也分公平锁和非公平锁的实现,非公平锁直接就是所有都false.公平锁则需判断头节点下一个节点是不是自己的逻辑。

    // 同样走到熟悉的AQS模版方法。看读写锁里Sync的具体实现
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            // 如果有锁
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                // 如果有写锁,并且排他线程不是当前线程,直接入队列等待
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;    
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                // CAS操作
                setState(c + acquires);
                return true;
            }
            // 如果写锁需要等待或者CAS失败,入队列等待。
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            // 设置排他线程    
            setExclusiveOwnerThread(current);
            return true;
        }

关于读写锁源码的阅读,这篇文章写的非常好,同时读写锁的实现也不复杂。文章地址 同时也能更好的理解为嘛有了AQS中的state状态,还要搞个AOS的exclusiveThread属性。每次CAS成功后设置此值。这个其实就是个排他锁的概念,同时也是可重入锁的重要依赖。每次判断state != 0,证明其他线程正在持有锁,此时判断持有锁的线程是不是当前线程,如果是的话就能再次获取锁,实现重入。