1. AQS是什么
队列同步器(AbstractQueuedSynchronizer,AQS),是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。并发包的大师(Doug Lea)期望它能够成为实现大部分同步需求的基础。
2.AQS使用方式和其中的设计模式
AQS的主要使用方式是继承,子类通过继承AQS并实现它的抽象方法来管理同步状态,在AQS里由一个int型的state来代表这个状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的3个方法(getState()、setState(int newState)和compareAndSetState(int expect,int update))来进行操作,因为它们能够保证状态的改变是安全的。
同步器使用的设计模式:模板方法的设计模式。
在实现上,子类推荐被定义为自定义同步组件的静态内部类,AQS自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用,同步器既可以支持独占式地获取同步状态,也可以支持共享式地获取同步状态,这样就可以方便实现不同类型的同步组件(ReentrantLock、ReentrantReadWriteLock和CountDownLatch等)。
同步器是实现锁(也可以是任意同步组件)的关键,在锁的实现中聚合同步器。可以这样理解二者之间的关系:
锁是面向使用者的,它定义了使用者与锁交互的接口(比如可以允许两个线程并行访问),隐藏了实现细节;
同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。锁和同步器很好地隔离了使用者和实现者所需关注的领域。
实现者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法。
3.AQS中的方法
在AQS抽象类中定义了以下模板方法,分为三类:独占式获取与释放同步状态、共享式获取与释放、同步状态和查询同步队列中的等待线程情况。
可供子类重写的方法如下:
3.2 AQS和synchronznized底层实现区别
AQS是基于LockSupport等待超时来实现锁机制,底层实现是基于volatile和cas实现。
synchronznized映射成字节码指令就是增加两个指令:monitorenter、monitorexit;
当一条线程执行时遇到monitorenter指令时,它会尝试去获得锁,如果获得锁,那么所计数器+1(为什么要加1,因为它是可重入锁,可根据这个琐计数器判断锁状态),如果没有获得锁,那么阻塞;当它遇到一个monitoerexit时,琐计数器会-1,当计数器为0时,就释放锁。
3.3 如何自定义一个锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
| public class SelfLock implements Lock { private static class Sync extends AbstractQueuedSynchronizer {
@Override protected boolean isHeldExclusively() { return getState()==1; }
@Override protected boolean tryAcquire(int arg) { if(compareAndSetState(0,1)){ setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }
@Override protected boolean tryRelease(int arg) { if(getState()==0){ throw new IllegalMonitorStateException(); } setExclusiveOwnerThread(null); setState(0); return true; }
Condition newCondition() { return new ConditionObject(); } }
private final Sync sync = new Sync();
public void lock() { System.out.println(Thread.currentThread().getName()+" ready get lock"); sync.acquire(1); System.out.println(Thread.currentThread().getName()+" already got lock"); }
public boolean tryLock() { return sync.tryAcquire(1); }
public void unlock() { System.out.println(Thread.currentThread().getName()+" ready release lock"); sync.release(1); System.out.println(Thread.currentThread().getName()+" already released lock"); }
public Condition newCondition() { return sync.newCondition(); }
public boolean isLocked() { return sync.isHeldExclusively(); }
public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } }
|
4.AQS基本思想 CLH队列锁
CLH队列锁即Craig, Landin, and Hagersten (CLH) locks。
CLH队列锁也是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程仅仅在本地变量上自旋,它不断轮询前驱的状态,假设发现前驱释放了锁就结束自旋。
AQS是CLH队列锁的一种变体实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| final boolean acquireQueued(final Node node, int arg) { try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } catch (Throwable t) { cancelAcquire(node); throw t; } }
|
5.AQS原理
AQS内部有一个关键的成员变量state,用于记录当前的同步状态;第二需要实现自己的同步类的话,需要采用AQS的模板设计模式,实现模板方法;AQS内部本质上是一个CLH队列锁,每一个等待的线程都会被包装成一个节点,节点内容是:当前线程,前一个节点和是否需要获取锁状态。然后将该节点挂到一个链表上去。然后线程会检测前一个线程是否释放了锁,即locked标志位是否变成了false,如果变成了false自己就可以拿到这把锁。AQS在检查锁的时候,不会一直自旋的去查看locked的状态,会尝试去检查几次,如果前一个线程都没有释放,则自己进入block状态。
6.公平锁和非公平锁
公平锁:所有需要拿锁的线程都会被挂到队列锁的最后面。
非公平锁:在一个线程拿锁的时候,一个线程也同时来抢锁,这就形成了非公平锁。
ReentrantLock实现了公平锁和非公平锁,实现的具体差异在于hasQueuedPredecessors()的调用。上源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
|
7.ReentrantLock锁的可重入(递归拿自己锁)
可以递归拿锁。每次拿锁之后,多记录一下同步的状态。
例如:如果锁不能重入,则会发生死锁,自己把自己锁死了。
1 2 3 4 5 6 7 8 9 10 11 12 13
| public void reenter(int x){ lock.lock(); try { System.out.println(Thread.currentThread().getName()+":递归层级:"+x); int y = x - 1; if (y==0) return; else{ reenter(y); } } finally { lock.unlock(); } }
|
实现可重入锁方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| private static class Sync extends AbstractQueuedSynchronizer {
protected boolean isHeldExclusively() { return getState() > 0; }
public boolean tryAcquire(int acquires) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; }else if(getExclusiveOwnerThread()==Thread.currentThread()){ setState(getState()+1); return true; } return false; }
protected boolean tryRelease(int releases) { if(getExclusiveOwnerThread()!=Thread.currentThread()){ throw new IllegalMonitorStateException(); } if (getState() == 0) throw new IllegalMonitorStateException();
setState(getState()-1); if(getState()==0){ setExclusiveOwnerThread(null); } return true; }
Condition newCondition() { return new ConditionObject(); } }
|
测试用例代码见: git@github.com:oujie123/UnderstandingOfThread.git