java.util.concurrent包与线程同步

java.util.concurrent.CountDownLatch类

当想让某个线程等待指定的线程终止时,可以使用java.lang.Thread类的join方法。但是,由于join方法可以等待的只是“线程终止”这个一次性的操作,所以无法使用它实现“等待指定次数的某种操作发生”。
使用java.util.concurrent.CountDownLatch类可以实现“等待指定次数的CountDown方法被调用”这一功能。

Main.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
private static final int TASKS = 10;

public static void main(String[] args) {
System.out.println("BEGIN");
ExecutorService service = Executors.newFixedThreadPool(5);
CountDownLatch doneLatch = new CountDownLatch(TASKS);

try {
for (int t = 0; t < TASKS; t++) {
service.execute(new MyTask(doneLatch, t));
}

System.out.println("AWAIT");
doneLatch.await();
} catch (InterruptedException e) {
} finally {
service.shutdown();
System.out.println("END");
}
}
}

MyTask.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import java.util.Random;
import java.util.concurrent.CountDownLatch;

public class MyTask implements Runnable {
private final CountDownLatch doneLatch;
private final int context;
private static final Random random = new Random(314159);

public MyTask(CountDownLatch doneLatch, int context) {
this.doneLatch = doneLatch;
this.context = context;
}

@Override
public void run() {
doTask();
doneLatch.countDown();
}

protected void doTask() {
String name = Thread.currentThread().getName();
System.out.println(name + ":MyTask:BEGIN:context = " + context);

try {
Thread.sleep(random.nextInt(3000));
} catch (InterruptedException e) {
} finally {
System.out.println(name + ":MyTask:END:context = " + context);
}
}
}

运行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
BEGIN
pool-1-thread-2:MyTask:BEGIN:context = 1
pool-1-thread-1:MyTask:BEGIN:context = 0
pool-1-thread-3:MyTask:BEGIN:context = 2
pool-1-thread-4:MyTask:BEGIN:context = 3
AWAIT
pool-1-thread-5:MyTask:BEGIN:context = 4
pool-1-thread-4:MyTask:END:context = 3
pool-1-thread-4:MyTask:BEGIN:context = 5
pool-1-thread-1:MyTask:END:context = 0
pool-1-thread-1:MyTask:BEGIN:context = 6
pool-1-thread-1:MyTask:END:context = 6
pool-1-thread-1:MyTask:BEGIN:context = 7
pool-1-thread-4:MyTask:END:context = 5
pool-1-thread-4:MyTask:BEGIN:context = 8
pool-1-thread-2:MyTask:END:context = 1
pool-1-thread-2:MyTask:BEGIN:context = 9
pool-1-thread-1:MyTask:END:context = 7
pool-1-thread-5:MyTask:END:context = 4
pool-1-thread-3:MyTask:END:context = 2
pool-1-thread-4:MyTask:END:context = 8
pool-1-thread-2:MyTask:END:context = 9
END

时序图

k8OsKg.png

java.util.concurrent.CyclicBarrier类

CountDownLatch类只能进行倒数计数。也就是说,一旦计数值变为0后,即使调用await方法,主线程也会立即返回。
当想多次重复进行线程同步时,使用java.util.concurrent.CyclicBarrier类会很方便。
CyclicBarrier可以周期性地(cyclic)创建出屏障(barrier)。在屏障解除之前,碰到屏障的线程是无法继续前进的。屏障的解除条件是到达屏障处的线程个数达到了构造函数指定的个数。也就是说,当指定个数的线程到达屏障处后,屏障就会被解除,然后这些线程就会一起冲出去。

Main.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
private static final int THREADS = 3;

public static void main(String[] args) {
System.out.println("BEGIN");
ExecutorService service = Executors.newFixedThreadPool(THREADS);

Runnable barrierAction = new Runnable() {
@Override
public void run() {
System.out.println("Barrier Action!");
}
};

CyclicBarrier phaseBarrier = new CyclicBarrier(THREADS, barrierAction);
CountDownLatch doneLatch = new CountDownLatch(THREADS);

try {
for (int t = 0; t < THREADS; t++) {
service.execute(new MyTask(phaseBarrier, doneLatch, t));
}

System.out.println("AWAIT");
doneLatch.await();
} catch (InterruptedException e) {
} finally {
service.shutdown();
System.out.println("END");
}
}
}

MyTask.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;

public class MyTask implements Runnable {
private static final int PHASE = 5;
private final CyclicBarrier phaseBarrier;
private final CountDownLatch doneLatch;
private final int context;
private static final Random random = new Random(314159);

public MyTask(CyclicBarrier phaseBarrier, CountDownLatch doneLatch, int context) {
this.phaseBarrier = phaseBarrier;
this.doneLatch = doneLatch;
this.context = context;
}

@Override
public void run() {
try {
for (int phase = 0; phase < PHASE; phase++) {
doPhase(phase);
phaseBarrier.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
} finally {
doneLatch.countDown();
}
}

protected void doPhase(int phase) {
String name = Thread.currentThread().getName();
System.out.println(name + ":MyTask:BEGIN:context = " + context + ", phase = " + phase);

try {
Thread.sleep(random.nextInt(3000));
} catch (InterruptedException e) {
} finally {
System.out.println(name + ":MyTask:END:context = " + context + ", phase = " + phase);
}
}
}

运行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
BEGIN
AWAIT
pool-1-thread-2:MyTask:BEGIN:context = 1, phase = 0
pool-1-thread-1:MyTask:BEGIN:context = 0, phase = 0
pool-1-thread-3:MyTask:BEGIN:context = 2, phase = 0
pool-1-thread-1:MyTask:END:context = 0, phase = 0
pool-1-thread-2:MyTask:END:context = 1, phase = 0
pool-1-thread-3:MyTask:END:context = 2, phase = 0
Barrier Action!
pool-1-thread-3:MyTask:BEGIN:context = 2, phase = 1
pool-1-thread-1:MyTask:BEGIN:context = 0, phase = 1
pool-1-thread-2:MyTask:BEGIN:context = 1, phase = 1
pool-1-thread-3:MyTask:END:context = 2, phase = 1
pool-1-thread-2:MyTask:END:context = 1, phase = 1
pool-1-thread-1:MyTask:END:context = 0, phase = 1
Barrier Action!
pool-1-thread-1:MyTask:BEGIN:context = 0, phase = 2
pool-1-thread-3:MyTask:BEGIN:context = 2, phase = 2
pool-1-thread-2:MyTask:BEGIN:context = 1, phase = 2
pool-1-thread-1:MyTask:END:context = 0, phase = 2
pool-1-thread-3:MyTask:END:context = 2, phase = 2
pool-1-thread-2:MyTask:END:context = 1, phase = 2
Barrier Action!
pool-1-thread-2:MyTask:BEGIN:context = 1, phase = 3
pool-1-thread-1:MyTask:BEGIN:context = 0, phase = 3
pool-1-thread-3:MyTask:BEGIN:context = 2, phase = 3
pool-1-thread-1:MyTask:END:context = 0, phase = 3
pool-1-thread-3:MyTask:END:context = 2, phase = 3
pool-1-thread-2:MyTask:END:context = 1, phase = 3
Barrier Action!
pool-1-thread-2:MyTask:BEGIN:context = 1, phase = 4
pool-1-thread-1:MyTask:BEGIN:context = 0, phase = 4
pool-1-thread-3:MyTask:BEGIN:context = 2, phase = 4
pool-1-thread-3:MyTask:END:context = 2, phase = 4
pool-1-thread-1:MyTask:END:context = 0, phase = 4
pool-1-thread-2:MyTask:END:context = 1, phase = 4
Barrier Action!
END

Timethreads图

k8XdeJ.png