共享锁Semaphore
# 前言
Semaphore(信号量)主要作用是控制访问特定资源的线程数目,底层依赖AQS的状态State,实现逻辑与ReentrantLock很相似,有很多共用的方法,里面也使用到相同的CLH队列。
重要方法
public void acquire() throws InterruptedException
acquire() 表示阻塞并获取许可
public void release()
release() 表示释放许可
# 1 获取许可(非公平)
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
//线程已被中断,则抛出异常
if (Thread.interrupted())
throw new InterruptedException();
//如果获取不到许可(state < 0),则去CLH队列排队
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
提示
Semaphore也支持公平和非公平两种方式,tryAcquireShared
下有两种实现。
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
//自旋直到返回结果
for (;;) {
int available = getState();
int remaining = available - acquires;
//返回state小于0
if (remaining < 0 ||
//或者state更新成功(大于等于0)
compareAndSetState(available, remaining))
return remaining;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
//新增一个共享类型的节点到队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
//如果当前节点的前驱节点时头结点
if (p == head) {
//去尝试获取许可
int r = tryAcquireShared(arg);
if (r >= 0) {
//获取许可成功则把当前节点设置成头结点,并且尝试唤醒队列中下一节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//1.前驱节点状态为可唤醒,则当前节点所属线程可以park住
//2.park住线程;清除线程的中断标识,并返回清除之前的状态
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//该判断代码块与ReentrantLock的实现一致,
// 不同的是ReentrantLock只更新中断标识变量,Semaphore直接抛出中断异常
throw new InterruptedException();
}
} finally {
if (failed)
//与ReentrantLock实现一致
cancelAcquire(node);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
注意
以下逻辑较难理解,目前未理解透彻。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
//当前节点设置成头结点
setHead(node);
/*
* 尝试去唤醒队列中的下一个节点,如果满足如下条件:
* 调用者明确表示"传递"(propagate > 0),
* 或者h.waitStatus为PROPAGATE(被上一个操作设置)
* 并且
* 下一个节点处于共享模式或者为null。
*
* 这两项检查中的保守主义可能会导致不必要的唤醒,但只有在有
* 有在多个线程争取获得/释放同步状态时才会发生,所以大多
* 数情况下会立马获得需要的信号
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private void doReleaseShared() {
/*
* 保证释放动作(向同步等待队列尾部)传递,即使没有其他正在进行的
* 请求或释放动作。如果头节点的后继节点需要唤醒,那么执行唤醒
* 动作;如果不需要,将头结点的等待状态设置为PROPAGATE保证
* 唤醒传递。另外,为了防止过程中有新节点进入(队列),这里必
* 需做循环,所以,和其他unparkSuccessor方法使用方式不一样
* 的是,如果(头结点)等待状态设置失败,重新检测。
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
//唤醒状态更新回0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
//唤醒头结点的后继节点
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
// 如果头结点发生变化,则继续循环。否则,退出循环。
if (h == head)
break;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 2 释放许可
public void release() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
//更新state
if (tryReleaseShared(arg)) {
//唤醒队列中的节点
doReleaseShared();
return true;
}
return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
//给state加上释放的许可
int next = current + releases;
//超出Integer.MAX_VALUE,即2^31,抛异常
if (next < current)
throw new Error("Maximum permit count exceeded");
//cas方式更新state
if (compareAndSetState(current, next))
return true;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
# 3 获取许可(公平)
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
公平锁与非公平锁的实现基本一致,只是在更新 state
之前会先去检查下队列中是否其他节点在排队,有的话则直接去排队。
if (hasQueuedPredecessors())
return -1;
1
2
2
上次更新: 2022/11/24, 17:59:25