线程通信

概述

线程通信的目标是使线程间能够互相发送信号。另一方面,线程通信使线程能够等待其他线程的信号。

例如,线程B可以等待线程A的一个信号,这个信号会通知线程B数据已经准备好了。本文将讲解以下几个JAVA线程间通信的主题:

通过共享对象通信

线程间发送信号的一个简单方式是在共享对象的变量里设置信号值。线程A在一个同步块里设置boolean型成员变量hasDataToProcess为true,线程B也在同步块里读取hasDataToProcess这个成员变量。这个简单的例子使用了一个持有信号的对象,并提供了set和check方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class MySignal{

protected boolean hasDataToProcess = false;

public synchronized boolean hasDataToProcess(){
return this.hasDataToProcess;
}

public synchronized void setHasDataToProcess(boolean hasData){
this.hasDataToProcess = hasData;
}

}

线程A和B必须获得指向一个MySignal共享实例的引用,以便进行通信。如果它们持有的引用指向不同的MySingal实例,那么彼此将不能检测到对方的信号。需要处理的数据可以存放在一个共享缓存区里,它和MySignal实例是分开存放的。

忙等待

准备处理数据的线程B正在等待数据变为可用。换句话说,它在等待线程A的一个信号,这个信号使hasDataToProcess()返回true。线程B运行在一个循环里,以等待这个信号:

1
2
3
4
5
6
7
protected MySignal sharedSignal = ...

...

while(!sharedSignal.hasDataToProcess()){
//do nothing... busy waiting
}

wait(),notify()和notifyAll()

忙等待没有对运行等待线程的CPU进行有效的利用,除非平均等待时间非常短。否则,让等待线程进入睡眠或者非运行状态更为明智,直到它接收到它等待的信号。

Java有一个内建的等待机制来允许线程在等待信号的时候变为非运行状态。java.lang.Object 类定义了三个方法,wait()、notify()和notifyAll()来实现这个等待机制。

一个线程一旦调用了任意对象的wait()方法,就会变为非运行状态,直到另一个线程调用了同一个对象 的notify()方法。为了调用wait()或者notify(),线程必须先获得那个对象的锁。也就是说,线程必须在同步块里调用wait()或者notify()。以下是MySingal的修改版本——使用了wait()和notify()的MyWaitNotify:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class MonitorObject{
}

public class MyWaitNotify{

MonitorObject myMonitorObject = new MonitorObject();

public void doWait(){
synchronized(myMonitorObject){
try{
myMonitorObject.wait();
} catch(InterruptedException e){...}
}
}

public void doNotify(){
synchronized(myMonitorObject){
myMonitorObject.notify();
}
}
}

等待线程将调用doWait(),而唤醒线程将调用doNotify()。当一个线程调用一个对象的notify()方法,正在等待该对象的所有线程中将有一个线程被唤醒并允许执行 (校注:这个将被唤醒的线程是随机的,不可以指定唤醒哪个线程)。 同时也提供了一个notifyAll()方法来唤醒正在等待一个给定对象的所有线程。

如你所见,不管是等待线程还是唤醒线程都在同步块里调用wait()和notify()。这是强制性的!一个线程如果没有持有对象锁,将不能调用wait(),notify()或者notifyAll()。 否则,会抛出IllegalMonitorStateException异常。

(校注:JVM是这么实现的,当你调用wait时候它首先要检查下当前线程是否是锁的拥有者,不是则抛出IllegalMonitorStateExcept,参考JVM源码的 1422行。)

但是,这怎么可能?等待线程在同步块里面执行的时候,不是一直持有监视器对象(myMonitor对象)的锁吗?等待线程不能阻塞唤醒线程进入doNotify()的同步块吗?答案是:的确不能。一旦线程调用了wait()方法,它就释放了所持有的监视器对象上的锁。这将允许其他线程也可以调用wait()或者notify()。

一旦一个线程被唤醒,不能立刻就退出wait()的方法调用,直到调用notify()的线程退出了它自己的同步块。换句话说:被唤醒的线程必须重新获得监视器对象的锁,才可以退出wait()的方法调用,因为wait方法调用运行在同步块里面。如果多个线程被notifyAll()唤醒,那么在同一时刻将只有一个线程可以退出wait()方法,因为每个线程在退出wait()前必须获得监视器对象的锁。

丢失的信号

notify()和notifyAll()方法不会保存调用它们的方法,因为当这两个方法被调用时,有可能没有线程处于等待状态。通知信号过后便丢弃了。因此,如果一个线程先于被通知线程调用wait()前调用了notify(),等待的线程将错过这个信号。这可能是也可能不是个问题。不过,在某些情况下,这可能使等待线程永远在等待,不再醒来,因为线程错过了唤醒信号。

为了避免丢失信号,必须把它们保存在信号类里。在MyWaitNotify的例子中,通知信号应被存储在MyWaitNotify实例的一个成员变量里。以下是MyWaitNotify的修改版本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class MyWaitNotify2{

MonitorObject myMonitorObject = new MonitorObject();
boolean wasSignalled = false;

public void doWait(){
synchronized(myMonitorObject){
if(!wasSignalled){
try{
myMonitorObject.wait();
} catch(InterruptedException e){...}
}
//clear signal and continue running.
wasSignalled = false;
}
}

public void doNotify(){
synchronized(myMonitorObject){
wasSignalled = true;
myMonitorObject.notify();
}
}
}

留意doNotify()方法在调用notify()前把wasSignalled变量设为true。同时,留意doWait()方法在调用wait()前会检查wasSignalled变量。事实上,如果没有信号在前一次doWait()调用和这次doWait()调用之间的时间段里被接收到,它将只调用wait()。

(校注:为了避免信号丢失, 用一个变量来保存是否被通知过。在notify前,设置自己已经被通知过。在wait后,设置自己没有被通知过,需要等待通知。)

自己的理解,wasSignalled标注的功能就是限制Wait和Notify的顺序

假唤醒

由于莫名其妙的原因,线程有可能在没有调用过notify()和notifyAll()的情况下醒来。这就是所谓的假唤醒(spurious wakeups)。无端端地醒过来了。

如果在MyWaitNotify2的doWait()方法里发生了假唤醒,等待线程即使没有收到正确的信号,也能够执行后续的操作。这可能导致你的应用程序出现严重问题。

为了防止假唤醒,保存信号的成员变量将在一个while循环里接受检查,而不是在if表达式里。这样的一个while循环叫做自旋锁 (校注:这种做法要慎重,目前的JVM实现自旋会消耗CPU,如果长时间不调用doNotify方法,doWait方法会一直自旋,CPU会消耗太大)。 被唤醒的线程会自旋直到自旋锁(while循环)里的条件变为false。以下MyWaitNotify2的修改版本展示了这点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class MyWaitNotify3{

MonitorObject myMonitorObject = new MonitorObject();
boolean wasSignalled = false;

public void doWait(){
synchronized(myMonitorObject){
while(!wasSignalled){
try{
myMonitorObject.wait();
} catch(InterruptedException e){...}
}
//clear signal and continue running.
wasSignalled = false;
}
}

public void doNotify(){
synchronized(myMonitorObject){
wasSignalled = true;
myMonitorObject.notify();
}
}
}

留意wait()方法是在while循环里,而不在if表达式里。如果等待线程没有收到信号就唤醒,wasSignalled变量将变为false,while循环会再执行一次,促使醒来的线程回到等待状态。

多线程等待相同信号

如果你有多个线程在等待,被notifyAll()唤醒,但只有一个被允许继续执行,使用while循环也是个好方法。每次只有一个线程可以获得监视器对象锁,意味着只有一个线程可以退出wait()调用并清除wasSignalled标志(设为false)。一旦这个线程退出doWait()的同步块,其他线程退出wait()调用,并在while循环里检查wasSignalled变量值。但是,这个标志已经被第一个唤醒的线程清除了,所以其余醒来的线程将回到等待状态,直到下次信号到来。

不要对常量字符串或全局对象调用wait()

(校注:本章说的字符串常量指的是值为常量的变量)

本文早期的一个版本在MyWaitNotify例子里使用字符串常量(””)作为管程对象。以下是那个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class MyWaitNotify3{

String myMonitorObject = "";
boolean wasSignalled = false;

public void doWait(){
synchronized(myMonitorObject){
while(!wasSignalled){
try{
myMonitorObject.wait();
} catch(InterruptedException e){...}
}
//clear signal and continue running.
wasSignalled = false;
}
}

public void doNotify(){
synchronized(myMonitorObject){
wasSignalled = true;
myMonitorObject.notify();
}
}
}

在空字符串作为锁的同步块(或者其他常量字符串)里调用wait()和notify()产生的问题是,JVM/编译器内部会把常量字符串转换成同一个对象。这意味着,即使你有2个不同的MyWaitNotify实例,它们都引用了相同的空字符串实例。同时也意味着存在这样的风险:在第一个MyWaitNotify实例上调用doWait()的线程会被在第二个MyWaitNotify实例上调用doNotify()的线程唤醒。这种情况可以画成以下这张图:

等待/通知机制 总结

相关方法

方法名称 描述
notify() 通知一个在对象上等待的线程,由WAITING状态变为BLOCKING状态,从等待队列移动到同步队列,等待CPU调度获取该对象的锁,当该线程获取到了对象的锁后,该线程从wait()方法返回
notifyAll() 通知所有等待在该对象上的线程,由WAITING状态变为BLOCKING状态,等待CPU调度获取该对象的锁
wait() 调用该方法的线程进入WAITING状态,并将当前线程放置到对象的等待队列,只有等待另外线程的通知或被中断才会返回,需要注意,调用wait()方法后,会释放对象的锁
wait(long) 超时等待一段时间,这里的参数时间是毫秒,也就是等待长达n毫秒,如果没有通知就超时返回
wait(long,int) 对于超时时间更细力度的控制,可以达到纳秒

代码清单

如下代码本人觉得更加清晰

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package cn.caoler.WaitNotify;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;

/**
* Author: Caole
* CreateDateTime: 2017/11/23 12:43
* Description:
*/
public class WaitNotify {
static boolean flag = true;
static Object lock = new Object();

public static void main(String[] args) throws InterruptedException {
Thread waitThread = new Thread(new Wait(), "WaitThread");
waitThread.start();
TimeUnit.SECONDS.sleep(1);
Thread notifyThread = new Thread(new Notify(), "NotifyThread");
notifyThread.start();
}

static class Wait implements Runnable {

@Override
public void run() {
//加锁,拥有lock的Monitor
synchronized(lock){
//当条件不满足时继续wait,同时释放了lock的锁
while (flag){
try {
System.out.println(Thread.currentThread() + "flag is true [email protected]"
+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//条件满足时,完成工作
System.out.println(Thread.currentThread() + "flag is false [email protected]"
+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
}
}
}

static class Notify implements Runnable{

@Override
public void run() {
//加锁,拥有lock的Monitor
synchronized(lock){
//获取lock的锁,然后进行通知,通知时不会释放lock的锁
//直到当前线程释放了lock锁之后,WaitThread才能从wait方法中返回
System.out.println(Thread.currentThread() + "hold lock .notify @"
+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
lock.notifyAll();
flag = false;
SleepUtils.second(5);
}
//再次加锁
synchronized(lock){
System.out.println(Thread.currentThread() + "hold lock again .sleep @"
+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
SleepUtils.second(5);
}
}
}
}

输出如下:

1
2
3
4
Thread[WaitThread,5,main]flag is true .wait@15:54:24
Thread[NotifyThread,5,main]hold lock .notify @15:54:24
Thread[NotifyThread,5,main]hold lock again .sleep @15:54:30
Thread[WaitThread,5,main]flag is false .running@15:54:35

使用wait/notify注意事项和问题

  • 执行完wait后会释放锁, 在执行notify()方法后,当前线程不会立刻释放该对象锁。呈wait状态的线程并不能马上获取该对象锁,要等执行notify()方法的线程将程序执行完,也就是退出synchronized代码块后,当前线程才会释放锁。

  • 在调用wait()或notify()方法之前,必须获得该对象的对象级别锁,即只能在同步方法或者同步块中调用wait()方法,在执行wait()方法之后,当前线程释放锁。

  • 在调用notify通知某线程后,该线程不会立即进入Running状态,而是先进入Runnable状态。

  • notify每次仅通知一个线程,多次调用可将wait线程全部唤醒。

  • 当线程呈wait状态时,调用线程对象的interrupt方法会抛出InterruptedException异常。

  • 通知过早: 如果通知过早,会打乱程序正常运行逻辑。

等待/通知的经典范式

从前断示例中可以提炼出等待/通知经典范式,该范式分为两部分,分别为等待方(消费者)和通知方(生产者)。

  • 等待方遵循如下原则。

    1. 获取对象的锁
    2. 如果条件不满足,那么调用对象的wait()方法,被通知后仍要检查条件。
    3. 条件满足则执行对应的逻辑。

      对应的伪代码如下

      1
      2
      3
      4
      5
      6
      synchronized(对象){
      while(不满足){
      对象.wait();
      }
      对应的处理逻辑
      }
  • 通知方遵循如下原则

    1. 获得对象的锁
    2. 改变条件
    3. 通知所有等待在对象上的线程

      对应的伪代码如下:

      1
      2
      3
      4
      synchronized(对象){
      改变条件
      对象.notifyAll();
      }

参考

0%