java.util.concurrent包和Producer-Consumer模式

java.util.concurrent包中的队列

java.util.concurrent包提供了BlockingQueue接口及其实现类,它们相当于Producer-Consumer模式中的Channel角色。

类图

FrTQVs.png

BlockingQueue接口——阻塞队列

BlockingQueue接口表示的是在达到合适的状态之前线程一直阻塞(wait)的队列。BlockingQueue是java.util.Queue接口的子接口,拥有offer方法和poll方法等。但实际上,实现“阻塞”功能的方法是BlockingQueue接口固有的put方法和take方法。

ArrayBlockingQueue类——基于数组的BlockingQueue

ArrayBlockingQueue类表示的是元素个数有最大限制的BlockingQueue。该类基于数组,当数组满了但仍要put数据时,或者数组为空但仍要take数据时,线程就会阻塞。

LinkedBlockingQueue类——基于链表的BlockingQueue

LinkedBlockingQueue类表示的是元素个数没有最大限制的BlockingQueue。该类基于链表,如果没有特别指定,元素个数将没有最大限制。只要还有内存,就可以put数据。

PriorityBlockingQueue类——带有优先级的BlockingQueue

PriorityBlockingQueue类表示的是带有优先级的BlockingQueue。数据的“优先级”是依据Comparable接口的自然排序,或者构造函数的Comparator接口决定的顺序指定的。

DelayQueue类——一定时间之后才可以take的BlockingQueue

DelayQueue类表示的是用于存储java.util.concurrent.Delayed对象的队列。当从该队列take时,只有在各元素指定的时间到期后才可以take。另外,到期时间最长的元素将先被take。

SynchronousQueue类——直接传递的BlockingQueue

SynchronousQueue类表示的是BlockingQueue,该BlockingQueue用于执行由Producer角色到Consumer角色的“直接传递”。如果Producer角色先put,在Consumer角色take之前,Producer角色的线程将一直阻塞。相反,如果Consumer角色先take,在Producer角色put之前,Consumer角色的线程将一直阻塞。

ConcurrentLinkedQueue类——元素个数没有最大限制的线程安全队列

ConcurrentLinkedQueue类并不是BlockingQueue的实现类,它表示的是元素个数没有最大限制的线程安全队列。在ConcurrentLinkedQueue中,内部的数据结构是分开的,线程之间互不影响,所以也就无需执行互斥处理。根据线程情况的不同,有时程序的性能也会有所提高。

使用java.util.concurrent.ArrayBlockingQueue

ArrayBlockingQueue是一个泛型类,可以通过参数类型来指定队列中添加的元素类型。

Table.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import java.util.concurrent.ArrayBlockingQueue;

public class Table extends ArrayBlockingQueue<String> {

public Table(int count) {
super(count);
}

public void put(String cake) throws InterruptedException {
System.out.println(Thread.currentThread().getName() + " puts " + cake);
super.put(cake);
}

public String take() throws InterruptedException {
String cake = super.take();
System.out.println(Thread.currentThread().getName() + " takes " + cake);
return cake;
}
}

使用java.util.concurrent.Exchanger类交换缓冲区

使用java.util.concurrent.Exchanger类用于让两个线程安全地交换对象。

Main.java

1
2
3
4
5
6
7
8
9
10
11
12
import java.util.concurrent.Exchanger;

public class Main {

public static void main(String[] args) {
Exchanger<char[]> exchanger = new Exchanger<>();
char[] buffer1 = new char[10];
char[] buffer2 = new char[10];
new ProducerThread(exchanger, buffer1, 314159).start();
new ConsumerThread(exchanger, buffer2, 265358).start();
}
}

ProducerThread.java

ProducerThread循环执行如下操作:

  • 填充字符,直至缓冲区被填满
  • 使用exchange方法将填满的缓冲区传递给ConsumerThread
  • 传递缓冲区后,作为交换,接收空的缓冲区
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import java.util.Random;
import java.util.concurrent.Exchanger;

public class ProducerThread extends Thread {
private final Exchanger<char[]> exchanger;
private char[] buffer = null;
private char index = 0;
private final Random random;

public ProducerThread(Exchanger<char[]> exchanger, char[] buffer, long seed) {
super("ProducerThread");
this.exchanger = exchanger;
this.buffer = buffer;
this.random = new Random(seed);
}

public void run() {
try {
while (true) {
for (int i = 0; i < buffer.length; i++) {
buffer[i] = nextChar();
System.out.println(Thread.currentThread().getName() + ": " + buffer[i] + " -> ");
}

System.out.println(Thread.currentThread().getName() + ": BEFORE exchange");
buffer = exchanger.exchange(buffer);
System.out.println(Thread.currentThread().getName() + ": AFTER exchange");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}

private char nextChar() throws InterruptedException {
char c = (char)('A' + index % 26);
index++;
Thread.sleep(random.nextInt(1000));
return c;
}
}

ConsumerThread.java

ConsumerThread循环执行如下操作:

  • 使用exchange方法将空的缓冲区传递给ProducerThread
  • 传递空的缓冲区后,作为交换,接收被填满字符的缓冲区
  • 使用缓冲区中的字符
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import java.util.Random;
import java.util.concurrent.Exchanger;

public class ConsumerThread extends Thread {
private final Exchanger<char[]> exchanger;
private char[] buffer = null;
private final Random random;

public ConsumerThread(Exchanger<char[]> exchanger, char[] buffer, long seed) {
super("ConsumerThread");
this.exchanger = exchanger;
this.buffer = buffer;
this.random = new Random(seed);
}

public void run() {
try {
while (true) {
System.out.println(Thread.currentThread().getName() + ": BEFORE exchange");
buffer = exchanger.exchange(buffer);
System.out.println(Thread.currentThread().getName() + ": AFTER exchange");

for (int i = 0; i < buffer.length; i++) {
System.out.println(Thread.currentThread().getName() + ": -> " + buffer[i]);
Thread.sleep(random.nextInt(1000));
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

Timethreads图

FyNGPx.png

运行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
ConsumerThread: BEFORE exchange
ProducerThread: A ->
ProducerThread: B ->
ProducerThread: C ->
ProducerThread: D ->
ProducerThread: E ->
ProducerThread: F ->
ProducerThread: G ->
ProducerThread: H ->
ProducerThread: I ->
ProducerThread: J ->
ProducerThread: BEFORE exchange
ProducerThread: AFTER exchange
ConsumerThread: AFTER exchange
ConsumerThread: -> A
ConsumerThread: -> B
ProducerThread: K ->
ConsumerThread: -> C
ProducerThread: L ->
ProducerThread: M ->
ConsumerThread: -> D
ProducerThread: N ->
ConsumerThread: -> E
ProducerThread: O ->
ConsumerThread: -> F
ProducerThread: P ->
ConsumerThread: -> G
ProducerThread: Q ->
ConsumerThread: -> H
ProducerThread: R ->
ConsumerThread: -> I
ConsumerThread: -> J
ConsumerThread: BEFORE exchange
ProducerThread: S ->
ProducerThread: T ->
ProducerThread: BEFORE exchange
ProducerThread: AFTER exchange
ConsumerThread: AFTER exchange
ConsumerThread: -> K
ConsumerThread: -> L
ConsumerThread: -> M
ProducerThread: U ->
ProducerThread: V ->