image-20190804182044480

![image-20190804182104427](../../../../../Users/apple/Library/Application Support/typora-user-images/image-20190804182104427.png)

image-20190804182117076

image-20190804182125972


image-20190804182149683

image-20190804182155883


1 并发容器J.U.C – AQS组件CountDownLatch、Semaphore、CyclicBarrier

AQS简介

AQS全名:AbstractQueuedSynchronizer,是并发容器J.U.C(java.lang.concurrent)下locks包内的一个类。它实现了一个FIFO(FirstIn、FisrtOut先进先出)的队列。底层实现的数据结构是一个双向列表

image-20190804182423241

Sync queue:同步队列,是一个双向列表。包括head节点和tail节点。head节点主要用作后续的调度。
Condition queue:非必须,单向列表。当程序中存在cindition的时候才会存在此列表。

AQS设计思想

  • 使用Node实现FIFO队列,可以用于构建锁或者其他同步装置的基础框架。
  • 利用int类型标识状态。在AQS类中有一个叫做state的成员变量
1
2
3
4
/**
* The synchronization state.
*/
private volatile int state;
  • 基于AQS有一个同步组件,叫做ReentrantLock。在这个组件里,stste表示获取锁的线程数,假如state=0,表示还没有线程获取锁,1表示有线程获取了锁。大于1表示重入锁的数量。

  • 继承:子类通过继承并通过实现它的方法管理其状态(acquire和release方法操纵状态)。

  • 可以同时实现排它锁和共享锁模式(独占、共享),站在一个使用者的角度,AQS的功能主要分为两类:独占和共享。它的所有子类中,要么实现并使用了它的独占功能的api,要么使用了共享锁的功能,而不会同时使用两套api,即便是最有名的子类ReentrantReadWriteLock也是通过两个内部类读锁和写锁分别实现了两套api来实现的。

AQS的大致实现思路

AQS内部维护了一个CLH队列来管理锁。线程会首先尝试获取锁,如果失败就将当前线程及等待状态等信息包装成一个node节点加入到同步队列sync queue里。

接着会不断的循环尝试获取锁,条件是当前节点为head的直接后继才会尝试。如果失败就会阻塞自己直到自己被唤醒。而当持有锁的线程释放锁的时候,会唤醒队列中的后继线程。

AQS组件:CountDownLatch

image-20190804182620528

通过一个计数来保证线程是否需要被阻塞。实现一个或多个线程等待其他线程执行的场景。

我们定义一个CountDownLatch,通过给定的计数器为其初始化,该计数器是原子性操作,保证同时只有一个线程去操作该计数器。调用该类await方法的线程会一直处于阻塞状态。只有其他线程调用countDown方法(每次使计数器-1),使计数器归零才能继续执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
test(threadNum); //需要被等待的线程执行的方法
} catch (Exception e) {
log.error("exception", e);
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();

CountDownLatch的await方法还有重载形式,可以设置等待的时间,如果超过此时间,计数器还未清零,则不继续等待:

1
2
3
4
countDownLatch.await(10, TimeUnit.MILLISECONDS);

//参数1:等待的时间长度
//参数2:等待的时间单位

AQS组件:Semaphore

  • 用于保证同一时间并发访问线程的数目。

  • 信号量在操作系统中是很重要的概念,Java并发库里的Semaphore就可以很轻松的完成类似操作系统信号量的控制。Semaphore可以很容易控制系统中某个资源被同时访问的线程个数。

  • 在数据结构中我们学过链表,链表正常是可以保存无限个节点的,而Semaphore可以实现有限大小的列表。
    使用场景:仅能提供有限访问的资源。比如数据库连接。

  • Semaphore使用acquire方法和release方法来实现控制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* 1、普通调用
*/
try {
semaphore.acquire(); // 获取一个许可
test();//需要并发控制的内容
semaphore.release(); // 释放一个许可
} catch (Exception e) {
log.error("exception", e);
}

/**
* 2、同时获取多个许可,同时释放多个许可
*/
try {
semaphore.acquire(2);
test();
semaphore.release(2);
} catch (Exception e) {
log.error("exception", e);
}

/*
* 3、尝试获取许可,获取不到不执行
*/
try {
if (semaphore.tryAcquire()) {
test(threadNum);
semaphore.release();
}
} catch (Exception e) {
log.error("exception", e);
}

/*
* 4、尝试获取许可一段时间,获取不到不执行
* 参数1:等待时间长度 参数2:等待时间单位
*/
try {
if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)) {
test(threadNum);
semaphore.release();
}
} catch (Exception e) {
log.error("exception", e);
}

AQS组件:CyclicBarrier

image-20190804182758307

  • 也是一个同步辅助类,它允许一组线程相互等待,直到到达某个公共的屏障点(循环屏障)
  • 通过它可以完成多个线程之间相互等待,只有每个线程都准备就绪后才能继续往下执行后面的操作。
    每当有一个线程执行了await方法,计数器就会执行+1操作,待计数器达到预定的值,所有的线程再同时继续执行。由于计数器释放之后可以重用(reset方法),所以称之为循环屏障。
  • 与CountDownLatch区别:
    1、计数器可重复用

​ 2、描述一个或多个线程等待其他线程的关系/多个线程相互等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
//公共线程循环调用方法
private static CyclicBarrier barrier = new CyclicBarrier(5);

public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();

for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}

//使用方法1:每个线程都持续等待
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}

//使用方法2:每个线程只等待一段时间
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
try {
barrier.await(2000, TimeUnit.MILLISECONDS);
} catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
log.warn("BarrierException", e);
}
}

//使用方法3:在初始化的时候设置runnable,当线程达到屏障时优先执行runnable
private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
log.info("callback is running");
});



2 并发容器J.U.C – AQS组件 锁:ReentrantLock、ReentrantReadWriteLock、StempedLock

ReentrantLock

java中有两类锁,一类是Synchronized,而另一类就是J.U.C中提供的锁。ReentrantLock与Synchronized都是可重入锁,本质上都是lock与unlock的操作。接下来我们介绍三种J.U.C中的锁,其中 ReentrantLock使用synchronized与之比对介绍。

ReentrantLock与synchronized的区别

可重入性:两者的锁都是可重入的,差别不大,有线程进入锁,计数器自增1,等下降为0时才可以释放锁
锁的实现:synchronized是基于JVM实现的(用户很难见到,无法了解其实现),ReentrantLock是JDK实现的。
性能区别:在最初的时候,二者的性能差别差很多,当synchronized引入了偏向锁、轻量级锁(自选锁)后,二者的性能差别不大,官方推荐synchronized(写法更容易、在优化时其实是借用了ReentrantLock的CAS技术,试图在用户态就把问题解决,避免进入内核态造成线程阻塞)
功能区别:
(1)便利性:synchronized更便利,它是由编译器保证加锁与释放。ReentrantLock是需要手动释放锁,所以为了避免忘记手工释放锁造成死锁,所以最好在finally中声明释放锁。
(2)锁的细粒度和灵活度,ReentrantLock优于synchronized
ReentrantLock独有的功能

可以指定是公平锁还是非公平锁,sync只能是非公平锁。(所谓公平锁就是先等待的线程先获得锁)
提供了一个Condition类,可以分组唤醒需要唤醒的线程。不像是synchronized要么随机唤醒一个线程,要么全部唤醒。
提供能够中断等待锁的线程的机制,通过lock.lockInterruptibly()实现,这种机制 ReentrantLock是一种自选锁,通过循环调用CAS操作来实现加锁。性能比较好的原因是避免了进入内核态的阻塞状态。
要放弃synchronized?

从上边的介绍,看上去ReentrantLock不仅拥有synchronized的所有功能,而且有一些功能synchronized无法实现的特性。性能方面,ReentrantLock也不比synchronized差,那么到底我们要不要放弃使用synchronized呢?答案是不要这样做。

J.U.C包中的锁定类是用于高级情况和高级用户的工具,除非说你对Lock的高级特性有特别清楚的了解以及有明确的需要,或这有明确的证据表明同步已经成为可伸缩性的瓶颈的时候,否则我们还是继续使用synchronized。相比较这些高级的锁定类,synchronized还是有一些优势的,比如synchronized不可能忘记释放锁。还有当JVM使用synchronized管理锁定请求和释放时,JVM在生成线程转储时能够包括锁定信息,这些信息对调试非常有价值,它们可以标识死锁以及其他异常行为的来源。

如何使用ReentrantLock?

1
2
3
4
5
6
7
8
9
10
11
//创建锁:使用Lock对象声明,使用ReentrantLock接口创建
private final static Lock lock = new ReentrantLock();
//使用锁:在需要被加锁的方法中使用
private static void add() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}

分析一下源码:

1
2
3
4
5
6
7
8
9
//初始化方面:
//在new ReentrantLock的时候默认给了一个不公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
//也可以加参数来初始化指定使用公平锁还是不公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

内置函数(部分)

基础特性:

tryLock():仅在调用时锁定未被另一个线程保持的情况下才获取锁定。
tryLock(long timeout, TimeUnit unit):如果锁定在给定的时间内没有被另一个线程保持且当前线程没有被中断,则获取这个锁定。
lockInterruptbily:如果当前线程没有被中断的话,那么就获取锁定。如果中断了就抛出异常。
isLocked:查询此锁定是否由任意线程保持
isHeldByCurrentThread:查询当前线程是否保持锁定状态。
isFair:判断是不是公平锁

Condition相关特性:

hasQueuedThread(Thread):查询指定线程是否在等待获取此锁定
hasQueuedThreads():查询是否有线程在等待获取此锁定
getHoldCount():查询当前线程保持锁定的个数,也就是调用Lock方法的个数

Condition的使用

Condition可以非常灵活的操作线程的唤醒,下面是一个线程等待与唤醒的例子,其中用1234序号标出了日志输出顺序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public static void main(String[] args) {
ReentrantLock reentrantLock = new ReentrantLock();
Condition condition = reentrantLock.newCondition();//创建condition
//线程1
new Thread(() -> {
try {
reentrantLock.lock();
log.info("wait signal"); // 1
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("get signal"); // 4
reentrantLock.unlock();
}).start();
//线程2
new Thread(() -> {
reentrantLock.lock();
log.info("get lock"); // 2
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
condition.signalAll();//发送信号
log.info("send signal"); // 3
reentrantLock.unlock();
}).start();
}

(这里对等待队列不熟悉的,请回顾我的上一篇文章中讲解的AQS等待队列:高并发探索(十一):并发容器J.U.C – AQS组件CountDownLatch、Semaphore、CyclicBarrier)
输出过程讲解:

1、线程1调用了reentrantLock.lock(),线程进入AQS等待队列,输出1号log
2、接着调用了awiat方法,线程从AQS队列中移除,锁释放,直接加入condition的等待队列中
3、线程2因为线程1释放了锁,拿到了锁,输出2号log
4、线程2执行condition.signalAll()发送信号,输出3号log
5、condition队列中线程1的节点接收到信号,从condition队列中拿出来放入到了AQS的等待队列,这时线程1并没有被唤醒。
6、线程2调用unlock释放锁,因为AQS队列中只有线程1,因此AQS释放锁按照从头到尾的顺序,唤醒线程1
7、线程1继续执行,输出4号log,并进行unlock操作。

读写锁:ReentrantReadWriteLock读写锁

在没有任何读写锁的时候才可以取得写入锁(悲观读取,容易写线程饥饿),也就是说如果一直存在读操作,那么写锁一直在等待没有读的情况出现,这样我的写锁就永远也获取不到,就会造成等待获取写锁的线程饥饿。
平时使用的场景并不多。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class LockExample3 {

private final Map<String, Data> map = new TreeMap<>();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();//读锁
private final Lock writeLock = lock.writeLock();//写锁

//加读锁
public Data get(String key) {
readLock.lock();
try {
return map.get(key);
} finally {
readLock.unlock();
}
}
//加写锁
public Data put(String key, Data value) {
writeLock.lock();
try {
return map.put(key, value);
} finally {
writeLock.unlock();
}
}

class Data {}
}

票据锁:StempedLock

它控制锁有三种模式(写、读、乐观读)。一个StempedLock的状态是由版本和模式两个部分组成。锁获取方法返回一个数字作为票据(stamp),他用相应的锁状态表示并控制相关的访问。数字0表示没有写锁被锁写访问,在读锁上分为悲观锁和乐观锁。

乐观读:
如果读的操作很多写的很少,我们可以乐观的认为读的操作与写的操作同时发生的情况很少,因此不悲观的使用完全的读取锁定。程序可以查看读取资料之后是否遭到写入资料的变更,再采取之后的措施

如何使用?

1
2
3
4
5
6
7
8
9
10
11
//定义
private final static StampedLock lock = new StampedLock();
//需要上锁的方法
private static void add() {
long stamp = lock.writeLock();
try {
count++;
} finally {
lock.unlock(stamp);
}
}

分析一下源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
class Point {
private double x, y;
private final StampedLock sl = new StampedLock();

void move(double deltaX, double deltaY) {
long stamp = sl.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp);
}
}

//下面看看乐观读锁案例
double distanceFromOrigin() { // A read-only method
long stamp = sl.tryOptimisticRead(); //获得一个乐观读锁
double currentX = x, currentY = y; //将两个字段读入本地局部变量
if (!sl.validate(stamp)) { //检查发出乐观读锁后同时是否有其他写锁发生?
stamp = sl.readLock(); //如果没有,我们再次获得一个读悲观锁
try {
currentX = x; // 将两个字段读入本地局部变量
currentY = y; // 将两个字段读入本地局部变量
} finally {
sl.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}

//下面是悲观读锁案例
void moveIfAtOrigin(double newX, double newY) { // upgrade
// Could instead start with optimistic, not read mode
long stamp = sl.readLock();
try {
while (x == 0.0 && y == 0.0) { //循环,检查当前状态是否符合
long ws = sl.tryConvertToWriteLock(stamp); //将读锁转为写锁
if (ws != 0L) { //这是确认转为写锁是否成功
stamp = ws; //如果成功 替换票据
x = newX; //进行状态改变
y = newY; //进行状态改变
break;
} else { //如果不能成功转换为写锁
sl.unlockRead(stamp); //我们显式释放读锁
stamp = sl.writeLock(); //显式直接进行写锁 然后再通过循环再试
}
}
} finally {
sl.unlock(stamp); //释放读锁或写锁
}
}
}

如何选择锁?

1、当只有少量竞争者,使用synchronized
2、竞争者不少但是线程增长的趋势是能预估的,使用ReetrantLock
3、synchronized不会造成死锁,jvm会自动释放死锁。





J.U.C之AQS—CountDownLatch

概述

它是一个同步辅助类,通过它可以在一个线程(线程间会轮换)执行countdown() -> count值减至0的期间,保证其他线程会调用await()一直阻塞等待,最后等待的线程执行resume(),所有线程再一起执行另一个实务操作。其中有一个原子性的且不会被重置的计数器以保证上述的实现。

![image-20190804192550419](../../../../../Users/apple/Library/Application Support/typora-user-images/image-20190804192550419.png)

使用场景

当一个程序需在另一个条件完成后才可以继续执行后续操作。
如:并行计算中最后的汇总操作场

例子演示

情景一:指定计数次数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Slf4j
public class CountDownLatchExample1 {

private final static int threadCount = 200;

public static void main(String[] args) throws Exception {

ExecutorService exec = Executors.newCachedThreadPool();

final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
test(threadNum);
} catch (Exception e) {
log.error("exception", e);
} finally {
//保证了方法被调用即计数执行减一
countDownLatch.countDown();
}
});
}
countDownLatch.await();
log.info("finish");
exec.shutdown();
}

private static void test(int threadNum) throws Exception {
Thread.sleep(100);
log.info("{}", threadNum);
Thread.sleep(100);
}
}

![image-20190804192632503](../../../../../Users/apple/Library/Application Support/typora-user-images/image-20190804192632503.png)

根据结果,多线程并发期间,核心方法以乱序执行,但总数仍一定,且最后执行到测试语句“finish”。其中,countDownLatch.await()语句循环检查计数是否已经减为0,即保证了此时全部线程执行结束。

情景二:指定计数时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

@Slf4j
public class CountDownLatchExample2 {

private final static int threadCount = 2000;

public static void main(String[] args) throws Exception {

ExecutorService exec = Executors.newCachedThreadPool();

final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
test(threadNum);
} catch (Exception e) {
log.error("exception", e);
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await(3, TimeUnit.MILLISECONDS);
log.info("finish");
exec.shutdown();
}

private static void test(int threadNum) throws Exception {
Thread.sleep(100);
log.info("{}", threadNum);
}
}

测试结果:

image-20190804192710622

先来看一下await()的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

/**
* Causes the current thread to wait until the latch has counted down to
* zero, unless the thread is {@linkplain Thread#interrupt interrupted},
* or the specified waiting time elapses.
*
* <p>If the current count is zero then this method returns immediately
* with the value {@code true}.
*
* <p>If the current count is greater than zero then the current
* thread becomes disabled for thread scheduling purposes and lies
* dormant until one of three things happen:
* <ul>
* <li>The count reaches zero due to invocations of the
* {@link #countDown} method; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread; or
* <li>The specified waiting time elapses.
* </ul>
*/
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

通过countDownLatch.await(3, TimeUnit.MILLISECONDS)方法,指定timeout参数实现。其中,该参数指的是在test方法执行时开始计数,延时timeout的时间后计数已经减为0后,继续执行后续方法,此例为log.info("finish"),即输出finish
而无参的await()方法只有计数到0时才会继续执行后续方法。

结果分析:
例子中为了测试timeout,将timeout设置为1ms,而线程执行核心方法时设定休眠100ms,那么与结果是正好对应的,即finish总是第一个被输出的日志。

但是,finish输出后就直接执行了exec.shutdown()即关闭线程池的操作了啊,怎么还会有线程日志输出?

其实,exec.shutdown()操作:不会再接受新的线程任务,只会等待当前已经分配的线程执行完操作后再关闭,而不是在第一时间销毁所有的线程并强制关闭线程池。但是线程池还有一个立即关闭的线程池的方法 -> 在第一时间销毁所有的线程并强制关闭线程池,即shutdownNow()。

现在修改shutdown()为shutdownNow(),再进行测试,运行结果为:(部分截图)

![image-20190804192737368](../../../../../Users/apple/Library/Application Support/typora-user-images/image-20190804192737368.png)

结果分析:
finish日志字段输出和其后200个exception java.lang.InterruptedException: sleep interrupted异常说明了测试的正确性。也是对应了上面await()源代码中的说明:unless the thread is {@linkplain Thread#interrupt interrupted}

小总结

使用CountDownLatch中,最好是计数指定配合指定超时时间使用,避免计数因为意外的情况难以到达使得系统资源空耗或业务逻辑无法继续执行情况,以提高程序的高效性,实用性。


J.U.C之AQS—Semaphore

概述

Semaphore字面意思就是信号量。它通过提供同步机制,控制资源可同时被并发访问的线程的个数。当信号量限定为1时,它就和单线程很相似了。Semaphore和CountDownLatch的使用有些相似,其中也有两个核心实现方法:acquire()和release()。

通过semaphore可以实现有限结点个数的链表,虽然可重入锁reentrant也可以实现,但是semaphore的实现更为简单。

使用场景

常适用于仅能提供有限资源访问的场景。
如:数据库链接数远远小于上层应用业务并发的数量,如果不对数据库的访问进行控制,很容易出现因有些线程因无法获取到数据库链接而导致的异常。

例子演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Slf4j
public class SemaphoreExample1 {

private final static int threadCount = 5;

public static void main(String[] args) throws Exception {

ExecutorService exec = Executors.newCachedThreadPool();

final Semaphore semaphore = new Semaphore(2);//信号量声明

for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
semaphore.acquire();//信号量获取
test(threadNum);
semaphore.release();//信号量释放
} catch (Exception e) {
log.error("exception", e);
}
});
}
exec.shutdown();
}

private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}

image-20190804193003788

例子说明:
仔细的童鞋应该已经根据运行结果发现了:相同的信号量的输出是在同一时刻的!这也就对应了semaphore的含义。semaphore的使用也十分简单,方法执行前声明并制定允许的并发访问的数量,并用semaphore.acquire()和semaphore.release()分别前后包裹着test()方法即可。

看一下semaphore的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void acquire() throws InterruptedException {

sync.acquireSharedInterruptibly(1);
}

public void release() {
sync.releaseShared(1);
}

public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}

public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}

其实semaphore.acquire()和semaphore.release()都是可以制定获取/释放信号量的数量的。而且都是使用的sync的方法获取或释放。那再看一下sync是什么鬼?

读者:等等!!你不是说只是看一看源码吗?看看就得了,怎么还一直分析起来了?
我:大爷,来都来了,不进去逛逛?

1
2
3
4
5
6
7
8
9
10
11
/** All mechanics via AbstractQueuedSynchronizer subclass */
private final Sync sync;

/**
* Synchronization implementation for semaphore. Uses AQS state
* to represent permits. Subclassed into fair and nonfair
* versions.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
xx省略一万字xx
}

sync是其中的内部类,semaphore果然是使用了AQS框架!!使用了AQS的state字段来实现信号量的允许值(state字段之前提到过)。还分为公平和非公平两个版本!!

那sync的方法是怎么实现的呢? 再看看源码:

读者:公子!停停停!STOP!别看了,我有点恶心了~
我:emmm?我裤子都脱了,你让我停?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}

private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

/**
* CAS waitStatus field of a node.
*/
private static final boolean compareAndSetWaitStatus(Node node,
int expect,
int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update);
}

hasQueuedPredecessors()

1
2
3
4
5
6
7
8
9
10
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

读者:公子!咱们走吧,咱不看了吧~
我:呕~ 呕呕呕扶我起来,我还能…(话音未落,倒地身亡

acquire()中,调用syncacquireSharedInterruptibly()其中指定参数(默认为1):在共享模式中获取,首先检查线程状态,再至少调用一次tryAcquireShared()查询判断当前对象的state允许在共享模式中被获取,不允许则执行doAcquireSharedInterruptibly(args),其中调用addWaiter(Node.SHARED)(参数指明为在共享模式下)进行声明:添加等待获取的结点,并循环遍历当前等待结点的前一结点,如果是head结点且此时再次查询当前对象的state允许在共享模式中被获取,那么设置队列的head并检查是否成功的对象正处于队列中,若是则传递消息以尝试给下一个队列结点传信号。如果不是head结点则调用cancelAcquire(node),取消获取信号量。

release()中,当参数存在时,调用sync.releaseShared(permits),再调用tryReleaseShared(args)查询是否允许释放,若是则调用doReleaseShared():即使有其他进行中的请求/释放信号量的进程,也要确保释放的消息传递,循环遍历以防止在这个过程中有新的结点加入;一般释放信号量的过程是当head需要信号量时,尝试释放head的继承结点。由于结合了CAS,需循环是否CAS重置状态失败了,若是则重新检查。

如果并发数太多了,但是资源还有限,这时候怎么搞?

semaphore还有一个叫做tryAcquire的方法。
看下例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Slf4j
public class SemaphoreExample3 {

private final static int threadCount = 2000;

public static void main(String[] args) throws Exception {

ExecutorService exec = Executors.newCachedThreadPool();

final Semaphore semaphore = new Semaphore(3);

for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
if (semaphore.tryAcquire()) { // 尝试获取一个许可
test(threadNum);
semaphore.release(); // 释放一个许可
}
} catch (Exception e) {
log.error("exception", e);
}
});
}
exec.shutdown();
}

private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}

运行结果:

image-20190804193127071

为什么只有三个线程执行了测试方法?是不是测试方法中的线程休眠时间太长了?现在改成500ms。

再次运行:

![image-20190804193157841](../../../../../Users/apple/Library/Application Support/typora-user-images/image-20190804193157841.png)

例子就不演示了,通过指定时间,准确指定可以并发请求的数量,大大减轻了控制的操作。再配合在核心方法中修改线程休眠时间来控制线程并发访问数量,最少的数量是semaphore和并发请求中的最小值(但一般情况下还是semaphore小,即最小是semaphore声明值


J.U.C之AQS—CyclicBarrier

概述

CyclicBarrier也是JDK提供的辅助类。它允许一组线程相互等待,直到到达一个公共的屏障点(CommonBarrierPoint)。通过它可以使多个线程间相互等待,只有当每个线程都准备就绪后才能各自继续执行后面的操作。

image-20190804200345419

与CountDownLatch的比较:

  1. CountDownLatch是实现一个或多个线程需要等待其他线程完成某项操作之后才会才会继续执行。而CyclicBarrier是实现多个线程间相互等待,直到所有线程都满足条件后继续执行后续操作。
  2. 同时通过计数器实现,该类不过是加一操作;
  3. 当线程调用await()时,进入等待状态;
  4. 当达到指定值时都会执行一定的操作:CountDownLatch是计数到达0或超时时间过期会执行下一步操作;CyclicBarrier是加一操作计数达到设定初始值或超时时间过期时,等待的线程会继续执行后续的操作。
  5. 当CountDownLatch计数到0后,就不可再使用了;但CyclicBarrier计数到设定值后,可以复用,可以reset重新计数使用。(呼应名字:循环屏障)
  6. 使用场景相似:CyclicBarrier和CountDownLatch都可用于多线程计算数据,最后汇总计算。但CyclicBarrier可以处理对付更加复杂的场景,例如:若一次并发执行后出现错误,接着可reset计数器,重新执行一次。

演示例子-1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Slf4j
public class CyclicBarrierExample1 {

private static CyclicBarrier barrier = new CyclicBarrier(2);

public static void main(String[] args) throws Exception {

ExecutorService executor = Executors.newCachedThreadPool();

for (int i = 0; i < 6; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}

private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}
}
运行结果:

image-20190804200432697

结果分析:
  1. 当线程0和1都已经到达ready后,才会continue。
  2. 当上一层执行结束后,紧接着开始下一层的执行。

源码分析

看一下该类的内部成员:

image-20190804200453607

带有阻塞是操作指令参数的构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* Creates a new {@code CyclicBarrier} that will trip when the
* given number of parties (threads) are waiting upon it, and which
* will execute the given barrier action when the barrier is tripped,
* performed by the last thread entering the barrier.
*
* @param parties the number of threads that must invoke {@link #await}
* before the barrier is tripped
* @param barrierAction the command to execute when the barrier is
* tripped, or {@code null} if there is no action
* @throws IllegalArgumentException if {@code parties} is less than 1
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

带有超时时间参数的await():

1
2
3
4
5
6
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}

设置当前屏障无效,并唤醒每个线程。(持锁时有效)

1
2
3
4
5
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}

重置当前屏障计数:先加锁,再设置当前屏障代无效并创建新一代。

1
2
3
4
5
6
7
8
9
10
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}

创建新一代屏障:更新屏障闸的state字段并唤醒每个线程(持锁时有效)。

1
2
3
4
5
6
7
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}

await()中真正的实现方法==>dowait():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;//使用重入锁。
lock.lock();//锁内操作保安全
try {
final Generation g = generation;

if (g.broken)
throw new BrokenBarrierException();

if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//count为parties即聚集个数(互相等待的个数)
//当index减为0,即所有线程都执行了该dowait()方法,都已执行完毕。
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {//如果有执行命令则运行,并返回方法。
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)//执行命令为null,则破除屏障
breakBarrier();
}
}

// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException();

if (g != generation)
return index;

if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();//操作结束释放锁
}
}

Conditin:放闸前所处的等待状态

1
private final Condition trip = lock.newCondition();

演示例子-2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Slf4j
public class CyclicBarrierExample2 {

private static CyclicBarrier barrier = new CyclicBarrier(5);

public static void main(String[] args) throws Exception {

ExecutorService executor = Executors.newCachedThreadPool();

for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}

private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
try {
//只等待2000ms,之后的线程就会被抛弃了。
// 因线程的condition改变为broken(因超时而强行打破屏障继续执行)而产生了BrokenBarrierException,
// 或因线程被中断而产生线程中断异常而产生的需要try-catch
barrier.await(2000, TimeUnit.MILLISECONDS);
} catch (Exception e) {
log.warn("BarrierException", e);
}
log.info("{} continue", threadNum);
}
}
运行结果:(部分截取)

image-20190804200641684

演示例子-3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Slf4j
public class CyclicBarrierExample3 {

//有参的构造函数,在dowait()方法执行时若存在则优先执行该Runnable指令再返
// 回(该时执行不到打破屏障代码段),之后再打破屏障并继续执行后续操作。
private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
log.info("callback is running");
});

public static void main(String[] args) throws Exception {

ExecutorService executor = Executors.newCachedThreadPool();

for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}

private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}
}
运行结果:(截取部分)

image-20190804200626835




并发容器J.U.C – AQS组件 锁:ReentrantLock、ReentrantReadWriteLock、StempedLock

ReentrantLock

java中有两类锁,一类是Synchronized,而另一类就是J.U.C中提供的锁。ReentrantLock与Synchronized都是可重入锁,本质上都是lock与unlock的操作。接下来我们介绍三种J.U.C中的锁,其中 ReentrantLock使用synchronized与之比对介绍。

ReentrantLock与synchronized的区别

可重入性:两者的锁都是可重入的,差别不大,有线程进入锁,计数器自增1,等下降为0时才可以释放锁
锁的实现:synchronized是基于JVM实现的(用户很难见到,无法了解其实现),ReentrantLock是JDK实现的。
性能区别:在最初的时候,二者的性能差别差很多,当synchronized引入了偏向锁、轻量级锁(自选锁)后,二者的性能差别不大,官方推荐synchronized(写法更容易、在优化时其实是借用了ReentrantLock的CAS技术,试图在用户态就把问题解决,避免进入内核态造成线程阻塞)
功能区别:
(1)便利性:synchronized更便利,它是由编译器保证加锁与释放。ReentrantLock是需要手动释放锁,所以为了避免忘记手工释放锁造成死锁,所以最好在finally中声明释放锁。
(2)锁的细粒度和灵活度,ReentrantLock优于synchronized
ReentrantLock独有的功能

可以指定是公平锁还是非公平锁,sync只能是非公平锁。(所谓公平锁就是先等待的线程先获得锁)
提供了一个Condition类,可以分组唤醒需要唤醒的线程。不像是synchronized要么随机唤醒一个线程,要么全部唤醒。
提供能够中断等待锁的线程的机制,通过lock.lockInterruptibly()实现,这种机制 ReentrantLock是一种自选锁,通过循环调用CAS操作来实现加锁。性能比较好的原因是避免了进入内核态的阻塞状态。
要放弃synchronized?

从上边的介绍,看上去ReentrantLock不仅拥有synchronized的所有功能,而且有一些功能synchronized无法实现的特性。性能方面,ReentrantLock也不比synchronized差,那么到底我们要不要放弃使用synchronized呢?答案是不要这样做。

J.U.C包中的锁定类是用于高级情况和高级用户的工具,除非说你对Lock的高级特性有特别清楚的了解以及有明确的需要,或这有明确的证据表明同步已经成为可伸缩性的瓶颈的时候,否则我们还是继续使用synchronized。相比较这些高级的锁定类,synchronized还是有一些优势的,比如synchronized不可能忘记释放锁。还有当JVM使用synchronized管理锁定请求和释放时,JVM在生成线程转储时能够包括锁定信息,这些信息对调试非常有价值,它们可以标识死锁以及其他异常行为的来源。

如何使用ReentrantLock?

1
2
3
4
5
6
7
8
9
10
11
//创建锁:使用Lock对象声明,使用ReentrantLock接口创建
private final static Lock lock = new ReentrantLock();
//使用锁:在需要被加锁的方法中使用
private static void add() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}

分析一下源码:

1
2
3
4
5
6
7
8
9
//初始化方面:
//在new ReentrantLock的时候默认给了一个不公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
//也可以加参数来初始化指定使用公平锁还是不公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

内置函数(部分)

基础特性:

tryLock():仅在调用时锁定未被另一个线程保持的情况下才获取锁定。
tryLock(long timeout, TimeUnit unit):如果锁定在给定的时间内没有被另一个线程保持且当前线程没有被中断,则获取这个锁定。
lockInterruptbily:如果当前线程没有被中断的话,那么就获取锁定。如果中断了就抛出异常。
isLocked:查询此锁定是否由任意线程保持
isHeldByCurrentThread:查询当前线程是否保持锁定状态。
isFair:判断是不是公平锁

Condition相关特性:

hasQueuedThread(Thread):查询指定线程是否在等待获取此锁定
hasQueuedThreads():查询是否有线程在等待获取此锁定
getHoldCount():查询当前线程保持锁定的个数,也就是调用Lock方法的个数

Condition的使用

Condition可以非常灵活的操作线程的唤醒,下面是一个线程等待与唤醒的例子,其中用1234序号标出了日志输出顺序


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public static void main(String[] args) {
ReentrantLock reentrantLock = new ReentrantLock();
Condition condition = reentrantLock.newCondition();//创建condition
//线程1
new Thread(() -> {
try {
reentrantLock.lock();
log.info("wait signal"); // 1
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("get signal"); // 4
reentrantLock.unlock();
}).start();
//线程2
new Thread(() -> {
reentrantLock.lock();
log.info("get lock"); // 2
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
condition.signalAll();//发送信号
log.info("send signal"); // 3
reentrantLock.unlock();
}).start();
}

(这里对等待队列不熟悉的,请回顾我的上一篇文章中讲解的AQS等待队列:高并发探索(十一):并发容器J.U.C – AQS组件CountDownLatch、Semaphore、CyclicBarrier)
输出过程讲解:

1、线程1调用了reentrantLock.lock(),线程进入AQS等待队列,输出1号log
2、接着调用了awiat方法,线程从AQS队列中移除,锁释放,直接加入condition的等待队列中
3、线程2因为线程1释放了锁,拿到了锁,输出2号log
4、线程2执行condition.signalAll()发送信号,输出3号log
5、condition队列中线程1的节点接收到信号,从condition队列中拿出来放入到了AQS的等待队列,这时线程1并没有被唤醒。
6、线程2调用unlock释放锁,因为AQS队列中只有线程1,因此AQS释放锁按照从头到尾的顺序,唤醒线程1
7、线程1继续执行,输出4号log,并进行unlock操作。

读写锁:ReentrantReadWriteLock读写锁

在没有任何读写锁的时候才可以取得写入锁(悲观读取,容易写线程饥饿),也就是说如果一直存在读操作,那么写锁一直在等待没有读的情况出现,这样我的写锁就永远也获取不到,就会造成等待获取写锁的线程饥饿。

平时使用的场景并不多。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class LockExample3 {

private final Map<String, Data> map = new TreeMap<>();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();//读锁
private final Lock writeLock = lock.writeLock();//写锁

//加读锁
public Data get(String key) {
readLock.lock();
try {
return map.get(key);
} finally {
readLock.unlock();
}
}
//加写锁
public Data put(String key, Data value) {
writeLock.lock();
try {
return map.put(key, value);
} finally {
writeLock.unlock();
}
}

class Data {}
}

票据锁:StempedLock

它控制锁有三种模式(写、读、乐观读)。一个StempedLock的状态是由版本和模式两个部分组成。锁获取方法返回一个数字作为票据(stamp),他用相应的锁状态表示并控制相关的访问。数字0表示没有写锁被锁写访问,在读锁上分为悲观锁和乐观锁。

乐观读:
如果读的操作很多写的很少,我们可以乐观的认为读的操作与写的操作同时发生的情况很少,因此不悲观的使用完全的读取锁定。程序可以查看读取资料之后是否遭到写入资料的变更,再采取之后的措施。

如何使用?

1
2
3
4
5
6
7
8
9
10
11
//定义
private final static StampedLock lock = new StampedLock();
//需要上锁的方法
private static void add() {
long stamp = lock.writeLock();
try {
count++;
} finally {
lock.unlock(stamp);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
class Point {
private double x, y;
private final StampedLock sl = new StampedLock();

void move(double deltaX, double deltaY) {
long stamp = sl.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp);
}
}

//下面看看乐观读锁案例
double distanceFromOrigin() { // A read-only method
long stamp = sl.tryOptimisticRead(); //获得一个乐观读锁
double currentX = x, currentY = y; //将两个字段读入本地局部变量
if (!sl.validate(stamp)) { //检查发出乐观读锁后同时是否有其他写锁发生?
stamp = sl.readLock(); //如果没有,我们再次获得一个读悲观锁
try {
currentX = x; // 将两个字段读入本地局部变量
currentY = y; // 将两个字段读入本地局部变量
} finally {
sl.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}

//下面是悲观读锁案例
void moveIfAtOrigin(double newX, double newY) { // upgrade
// Could instead start with optimistic, not read mode
long stamp = sl.readLock();
try {
while (x == 0.0 && y == 0.0) { //循环,检查当前状态是否符合
long ws = sl.tryConvertToWriteLock(stamp); //将读锁转为写锁
if (ws != 0L) { //这是确认转为写锁是否成功
stamp = ws; //如果成功 替换票据
x = newX; //进行状态改变
y = newY; //进行状态改变
break;
} else { //如果不能成功转换为写锁
sl.unlockRead(stamp); //我们显式释放读锁
stamp = sl.writeLock(); //显式直接进行写锁 然后再通过循环再试
}
}
} finally {
sl.unlock(stamp); //释放读锁或写锁
}
}
}

如何选择锁?

1、当只有少量竞争者,使用synchronized
2、竞争者不少但是线程增长的趋势是能预估的,使用ReetrantLock
3、synchronized不会造成死锁,jvm会自动释放死锁。


J.U.C之AQS—RentrantLock-Part-1

概述

RentrantLock即重入锁,是JDK中J.U.C提供的最重要的锁。
通过自旋锁实现RentrantLock:循环调用CAS操作实现枷锁,即阻止线程进入内核态被阻塞,故而效率较高。

  1. 可重入性:两者都具有可重入性。
  2. 锁的实现:RentrantLock是JDK中源码实现;Synchronized的锁机制是由JVM的JMM机制管理实现。
  3. 性能区别:Synchronized关键字优化前,其性能很差;但优化后(借鉴了RentrantLock中的CAS机制:在用户态加锁解锁),引入了偏向锁、自旋锁后,两者性能几无差异。(更推荐synchronized)。
  4. 功能区别:(1).synchronized关键字使用更加简洁简单,由编译器保证实现;RentrantLock需声明锁,并加锁,在finally中解锁。(2).RentrantLock锁粒度更细,灵活度高。

公平锁即按照线程的请求的先后顺序给与锁;非公平锁即按照对锁的争夺成功的线程加锁。

特有功能:

  1. RentrantLock可以指定该lock是公平锁或非公平锁(而synchronized只能是非公平锁);
  2. 通过方法提供Condition类,可以分组唤醒需要唤醒的线程。
  3. 提供能够中断等待锁的线程的机制 –> lock.lockInterruptibly()

适用场景

当需要用到上面RentrantLock的特有功能时,必须使用RentrantLock。

演示例子

RentrantLock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Slf4j
@ThreadSafe
public class LockExample2 {

public static int clientTotal = 5000;

public static int threadTotal = 200;

public static int count = 0;

private final static Lock lock = new ReentrantLock();

public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count:{}", count);
}

private static void add() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
}
例子分析:

简简单单,只是在核心方法执行前加锁,在之后的finally中解锁。

RentrantReadWriteLock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class LockExample3 {

private final Map<String, Data> map = new TreeMap<>();

private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

private final Lock readLock = lock.readLock();

private final Lock writeLock = lock.writeLock();

public Data get(String key) {
readLock.lock();
try {
return map.get(key);
} finally {
readLock.unlock();
}
}

public Set<String> getAllKeys() {
readLock.lock();
try {
return map.keySet();
} finally {
readLock.unlock();
}
}

public Data put(String key, Data value) {
writeLock.lock();
try {
return map.put(key, value);
} finally {
readLock.unlock();
}
}
}
例子说明
  1. 其中使用了RentrantReadWriteLock,该锁是读写分离锁,即读操作和写操作分别持有一把锁。

  2. 而且由于该RentrantReadWriteLock的读锁是一种悲观锁,即只有当其他操作都执行完后才会进行写操作。那么,但实际应用中读操作是远远多于写操作的(一般情况下),那么可能会导致写操作产生线程饥饿。

    悲观锁:对数据被外界(包括本系统当前的其他事务,以及来自外部系统的事务处理)修改持保守态度;因为悲观,认为自己的数据很容易在并发中产生错误,所以在整个数据处理过程中,将数据处于锁定状态.
    乐观锁:大多是基于数据版本记录机制实现;读取出数据时,将此版本号(version字段)一同读出,之后更新时,对此版本号加一;若提交的数据版本大于数据库表当前版本号,则予以更新,否则认为是过期数据。
    线程饥饿:通常是因为线程优先级过低,导致该线程等待被执行的时间过久,甚至执行操作已经无意义。

  3. 因此,该类实际中用到的不多。

StampedLock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class StampedLockExample {

class Point {
private double x, y;
private final StampedLock sl = new StampedLock();

void move(double deltaX, double deltaY) { // an exclusively locked method
long stamp = sl.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp);
}
}

//下面是乐观读锁案例
double distanceFromOrigin() { // A read-only method
long stamp = sl.tryOptimisticRead(); //获得一个乐观读锁
double currentX = x, currentY = y; //将两个字段读入本地局部变量
if (!sl.validate(stamp)) { //检查发出乐观读锁后同时是否有其他写锁发生?
stamp = sl.readLock(); //如果没有,我们再次获得一个读悲观锁
try {
currentX = x; // 将两个字段读入本地局部变量
currentY = y; // 将两个字段读入本地局部变量
} finally {
sl.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}

//下面是悲观读锁案例
void moveIfAtOrigin(double newX, double newY) { // upgrade
// Could instead start with optimistic, not read mode
long stamp = sl.readLock();
try {
while (x == 0.0 && y == 0.0) { //循环,检查当前状态是否符合
long ws = sl.tryConvertToWriteLock(stamp); //将读锁转为写锁
if (ws != 0L) { //这是确认转为写锁是否成功
stamp = ws; //如果成功 替换票据
x = newX; //进行状态改变
y = newY; //进行状态改变
break;
} else { //如果不能成功转换为写锁
sl.unlockRead(stamp); //我们显式释放读锁
stamp = sl.writeLock(); //显式直接进行写锁 然后再通过循环再试
}
}
} finally {
sl.unlock(stamp); //释放读锁或写锁
}
}
}
}
例子分析

这个例子是JDK源码中提供的参考例子。相对于RentrantReadWriteLock,StampedLock的有三种控制锁的模式:写锁,读锁,乐观读锁(!!)。由于提供了乐观锁的实现机制,那么即使是在读操作占比很高的情况中,系统仍然可以保持较好性能和高IO吞吐量。

StampedLock源码分析

先看顶部注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
/**
* A capability-based lock with three modes for controlling read/write
* access. The state of a StampedLock consists of a version and mode.
* Lock acquisition methods return a stamp that represents and
* controls access with respect to a lock state; "try" versions of
* these methods may instead return the special value zero to
* represent failure to acquire access. Lock release and conversion
* methods require stamps as arguments, and fail if they do not match
* the state of the lock. ……
*
* <li><b>Optimistic Reading.</b> Method {@link #tryOptimisticRead}
* returns a non-zero stamp only if the lock is not currently held
* in write mode. Method {@link #validate} returns true if the lock
* has not been acquired in write mode since obtaining a given
* stamp. This mode can be thought of as an extremely weak version
* of a read-lock, that can be broken by a writer at any time. The
* use of optimistic mode for short read-only code segments often
* reduces contention and improves throughput. However, its use is
* inherently fragile. Optimistic read sections should only read
* fields and hold them in local variables for later use after
* validation. Fields read while in optimistic mode may be wildly
* inconsistent, so usage applies only when you are familiar enough
* with data representations to check consistency and/or repeatedly
* invoke method {@code validate()}. For example, such steps are
* typically required when first reading an object or array
* reference, and then accessing one of its fields, elements or
* methods. </li>
*
* Algorithmic notes:
*
* The design employs elements of Sequence locks
* (as used in linux kernels; see Lameter's
* http://www.lameter.com/gelato2005.pdf
* and elsewhere; see
* Boehm's http://www.hpl.hp.com/techreports/2012/HPL-2012-68.html)
* and Ordered RW locks (see Shirako et al
* http://dl.acm.org/citation.cfm?id=2312015)
*
* Conceptually, the primary state of the lock includes a sequence
* number that is odd when write-locked and even otherwise.
* However, this is offset by a reader count that is non-zero when
* read-locked. The read count is ignored when validating
* "optimistic" seqlock-reader-style stamps. Because we must use
* a small finite number of bits (currently 7) for readers, a
* supplementary reader overflow word is used when the number of
* readers exceeds the count field. We do this by treating the max
* reader count value (RBITS) as a spinlock protecting overflow
* updates.
*
* Waiters use a modified form of CLH lock used in
* AbstractQueuedSynchronizer (see its internal documentation for
* a fuller account), where each node is tagged (field mode) as
* either a reader or writer. Sets of waiting readers are grouped
* (linked) under a common node (field cowait) so act as a single
* node with respect to most CLH mechanics. By virtue of the
* queue structure, wait nodes need not actually carry sequence
* numbers; we know each is greater than its predecessor. This
* simplifies the scheduling policy to a mainly-FIFO scheme that
* incorporates elements of Phase-Fair locks (see Brandenburg &
* Anderson, especially http://www.cs.unc.edu/~bbb/diss/). In
* particular, we use the phase-fair anti-barging rule: If an
* incoming reader arrives while read lock is held but there is a
* queued writer, this incoming reader is queued. (This rule is
* responsible for some of the complexity of method acquireRead,
* but without it, the lock becomes highly unfair.) Method release
* does not (and sometimes cannot) itself wake up cowaiters. This
* is done by the primary thread, but helped by any other threads
* with nothing better to do in methods acquireRead and
* acquireWrite.
*
* These rules apply to threads actually queued. All tryLock forms
* opportunistically try to acquire locks regardless of preference
* rules, and so may "barge" their way in. Randomized spinning is
* used in the acquire methods to reduce (increasingly expensive)
* context switching while also avoiding sustained memory
* thrashing among many threads. We limit spins to the head of
* queue. A thread spin-waits up to SPINS times (where each
* iteration decreases spin count with 50% probability) before
* blocking. If, upon wakening it fails to obtain lock, and is
* still (or becomes) the first waiting thread (which indicates
* that some other thread barged and obtained lock), it escalates
* spins (up to MAX_HEAD_SPINS) to reduce the likelihood of
* continually losing to barging threads.
*
* Nearly all of these mechanics are carried out in methods
* acquireWrite and acquireRead, that, as typical of such code,
* sprawl out because actions and retries rely on consistent sets
* of locally cached reads.
*
* As noted in Boehm's paper (above), sequence validation (mainly
* method validate()) requires stricter ordering rules than apply
* to normal volatile reads (of "state"). To force orderings of
* reads before a validation and the validation itself in those
* cases where this is not already forced, we use
* Unsafe.loadFence.
*
* The memory layout keeps lock state and queue pointers together
* (normally on the same cache line). This usually works well for
* read-mostly loads. In most other cases, the natural tendency of
* adaptive-spin CLH locks to reduce memory contention lessens
* motivation to further spread out contended locations, but might
* be subject to future improvements.
*/

看来这个类还是有、东西的。不过内容太多了,也不知从何看起。目前先了解一下大致情况,以后继续慢慢了解~

答应我,不要吐~ 还是翻回去从头好好看一下,其中有用到AQS的CLH队列哦~

J.U.C之AQS—RentrantLock-Part-2

源码学习

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
public ReentrantLock() {
sync = new NonfairSync();
}

/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

/**
* Sets the thread that currently owns exclusive access.
* A {@code null} argument indicates that no thread owns access.
* This method does not otherwise impose any synchronization or
* {@code volatile} field accesses.
* @param thread the owner thread
*/
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}

/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

/**
* Convenience method to interrupt current thread.
*/
static void selfInterrupt() {
Thread.currentThread().interrupt();
}

public void interrupt() {
if (this != Thread.currentThread())
checkAccess();

synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(this);
return;
}
}
interrupt0();
}

private native void interrupt0();

配合的Condition

在下篇手记-介绍Condition中有详细分析~

小总结:

  1. synchronized关键字:它是由JVM实现,并有很多内存监控工具提供帮助,并通过这些工具配合synchronized的内存标识,进行内存层面的“debug”或观察;当运行时出现异常,JVM会自动解锁并进行处理。
  2. ReentrantLock、ReentrantReadWriteLock、StampedLock是对象层面的锁,需要手动指定加锁与解锁操作(放在finally中)。
  3. StampedLock:因为加入乐观锁,故对吞吐量有较大的优化,尤其是读多写少时。但使用起来较复杂,API内容较多。
  4. 当线程较少推荐使用synchronized关键字,简单效率高;当线程较多,且线程并发的增长有一定趋势时(可预估),推荐使用ReentrantLock。