java.util.concurrent包和Worker Thread模式

ThreadPoolExecutor类

java.util.concurrent.ThreadPoolExecutor类是管理工人线程的类。ThreadPoolExecutor可以轻松地实现Worker Thread模式。ThreadPoolExecutor类可以对线程池(保存和管理工人线程的场所)进行以下设置。

  • 指定线程池的大小
  • 指定提前创建线程还是按需创建线程
  • 指定创建线程的工厂(java.util.concurrent.ThreadFactory)
  • 指定多长时间终止不需要的线程
  • 指定传递要执行的工作时的排队方式
  • 指定拒绝工作的方式
  • 指定执行工作前和执行工作后的处理

通过java.util.concurrent包创建线程池

java.util.concurrent.Executors类是用于创建线程池的工具类。

Executors.newFixedThreadPool方法

Executors.newFixedThreadPool方法会创建一个线程池,该线程池会创建个数由参数指定的工人线程,而且创建出的线程会被重复利用。如果在这个方法的参数中加上ThreadFactory对象,则线程池会使用该ThreadFactory来创建新的工人线程。

Executors.newCachedThreadPool方法

Executors.newCachedThreadPool方法会创建一个线程池,该线程池可以根据需要自动创建工人线程,而且工人线程会被重复利用。没有工作的工人线程会在缓存约60秒后自动终止。如果向这个方法的参数中传入ThreadFactory对象,则线程池会使用这个ThreadFactory来创建新的工人线程。

Executors.newScheduledThreadPool方法

Executors.newScheduledThreadPool方法会创建一个线程池,该线程池可以在一定时间后执行请求或是反复执行请求。即使在没有请求时也需要保存的线程数量可以通过参数指定。此外,如果在这个方法的参数中加上ThreadFactory对象,则线程池会使用这个ThreadFactory来创建新的工人线程。

Main.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

public class Main {

public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(5);

try {
new ClientThread("Alice", executorService).start();
new ClientThread("Bobby", executorService).start();
new ClientThread("Chris", executorService).start();

Thread.sleep(5000);
} catch (InterruptedException e) {
} finally {
executorService.shutdown();
}
}
}

ClientThread.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

public class ClientThread extends Thread {
private final ExecutorService executorService;
private static final Random random = new Random();

public ClientThread(String name, ExecutorService executorService) {
super(name);
this.executorService = executorService;
}

public void run() {
try {
for (int i = 0; ; i++) {
Request request = new Request(getName(), i);
executorService.execute(request);
Thread.sleep(random.nextInt(1000));
}
} catch (InterruptedException e) {
} catch (RejectedExecutionException e) {
System.out.println(getName() + " : " + e);
}
}
}

Request.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import java.util.Random;

public class Request implements Runnable {
private final String name;
private final int number;
private static final Random random = new Random();

public Request(String name, int number) {
this.name = name;
this.number = number;
}

public void run() {
System.out.println(Thread.currentThread().getName() + " executes " + this);

try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
}
}

public String toString() {
return "[ Request from " + name + " No." + number + " ]";
}
}

运行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
pool-1-thread-1 executes [ Request from Chris No.0 ]
pool-1-thread-2 executes [ Request from Bobby No.0 ]
pool-1-thread-3 executes [ Request from Alice No.0 ]
pool-1-thread-4 executes [ Request from Bobby No.1 ]
pool-1-thread-5 executes [ Request from Chris No.1 ]
pool-1-thread-3 executes [ Request from Bobby No.2 ]
pool-1-thread-5 executes [ Request from Bobby No.3 ]
pool-1-thread-2 executes [ Request from Alice No.1 ]
pool-1-thread-1 executes [ Request from Chris No.2 ]
pool-1-thread-3 executes [ Request from Chris No.3 ]
pool-1-thread-4 executes [ Request from Alice No.2 ]
...
pool-1-thread-1 executes [ Request from Chris No.8 ]
pool-1-thread-2 executes [ Request from Alice No.6 ]
pool-1-thread-5 executes [ Request from Bobby No.8 ]
pool-1-thread-4 executes [ Request from Alice No.7 ]
pool-1-thread-3 executes [ Request from Alice No.8 ]
pool-1-thread-1 executes [ Request from Chris No.9 ]
pool-1-thread-4 executes [ Request from Bobby No.9 ]
Bobby : java.util.concurrent.RejectedExecutionException: Task [ Request from Bobby No.10 ] rejected from java.util.concurrent.ThreadPoolExecutor@442f5c95[Shutting down, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 28]
Alice : java.util.concurrent.RejectedExecutionException: Task [ Request from Alice No.9 ] rejected from java.util.concurrent.ThreadPoolExecutor@442f5c95[Shutting down, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 28]
Chris : java.util.concurrent.RejectedExecutionException: Task [ Request from Chris No.10 ] rejected from java.util.concurrent.ThreadPoolExecutor@442f5c95[Shutting down, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 28]