java.util.concurrent包和Active Object模式

类图

kshiTS.png

Main.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import activeobject.ActiveObject;
import activeobject.ActiveObjectFactory;

/**
 * Demo entry point: starts three client threads that share one
 * {@link ActiveObject}, lets them run for a fixed time, then asks the
 * active object to shut down.
 */
public class Main {

    /** How long the clients run before shutdown is requested, in milliseconds. */
    private static final long RUN_TIME_MILLIS = 5000;

    /**
     * Starts two {@code MakerClientThread}s and one {@code DisplayClientThread}
     * against the same active object, sleeps for {@link #RUN_TIME_MILLIS}, and
     * then calls {@code shutdown()} on the active object.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        ActiveObject activeObject = ActiveObjectFactory.createActiveObject();

        try {
            new MakerClientThread("Alice", activeObject).start();
            new MakerClientThread("Bobby", activeObject).start();
            new DisplayClientThread("Chris", activeObject).start();
            Thread.sleep(RUN_TIME_MILLIS);
        } catch (InterruptedException e) {
            // Nothing to recover here; keep the interrupt visible to whoever
            // runs after us instead of silently discarding it.
            Thread.currentThread().interrupt();
        } finally {
            // Always request shutdown, even if the wait above was cut short.
            System.out.println("*** shutdown ***");
            activeObject.shutdown();
        }
    }
}

MakerClientThread.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import activeobject.ActiveObject;

import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;

/**
 * Client thread that repeatedly asks an {@link ActiveObject} to build a
 * string and prints each result.
 *
 * <p>The fill character is the first character of the thread name, and the
 * requested length grows by one on every iteration, starting at zero.
 */
public class MakerClientThread extends Thread {
    private final ActiveObject activeObject;
    private final char fillchar;

    /**
     * @param name         thread name; must be non-empty because its first
     *                     character is used as the fill character
     * @param activeObject the active object that receives the requests
     */
    public MakerClientThread(String name, ActiveObject activeObject) {
        super(name);
        this.activeObject = activeObject;
        this.fillchar = name.charAt(0);
    }

    /**
     * Loops forever: submits a {@code makeString} request, sleeps 10 ms, then
     * blocks on the returned future and prints the value. The loop ends only
     * when one of the caught exceptions is raised; the exception is printed
     * prefixed with the thread name.
     */
    @Override
    public void run() {
        try {
            for (int i = 0; true; i++) {
                Future<String> future = activeObject.makeString(i, fillchar);
                // Do something else while the request is being processed.
                Thread.sleep(10);
                String value = future.get();
                System.out.println(Thread.currentThread().getName() + ": value = " + value);
            }
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + ":" + e);
            // Restore the flag so the interruption is not lost.
            Thread.currentThread().interrupt();
        } catch (ExecutionException | RejectedExecutionException | CancellationException e) {
            // All three are reported the same way and end the loop.
            System.out.println(Thread.currentThread().getName() + ":" + e);
        }
    }
}

DisplayClientThread.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import activeobject.ActiveObject;

import java.util.concurrent.CancellationException;
import java.util.concurrent.RejectedExecutionException;

/**
 * Client thread that repeatedly asks an {@link ActiveObject} to display a
 * string of the form {@code "<thread name> <counter>"}.
 */
public class DisplayClientThread extends Thread {
    private final ActiveObject activeObject;

    /**
     * @param name         thread name, also used as the prefix of every
     *                     displayed string
     * @param activeObject the active object that receives the requests
     */
    public DisplayClientThread(String name, ActiveObject activeObject) {
        super(name);
        this.activeObject = activeObject;
    }

    /**
     * Loops forever: sends one {@code displayString} request, then sleeps
     * 200 ms. The loop ends only when one of the caught exceptions is raised;
     * the exception is printed prefixed with the thread name.
     */
    @Override
    public void run() {
        try {
            for (int i = 0; true; i++) {
                String string = Thread.currentThread().getName() + " " + i;
                activeObject.displayString(string);
                Thread.sleep(200);
            }
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + ":" + e);
            // Restore the flag so the interruption is not lost.
            Thread.currentThread().interrupt();
        } catch (RejectedExecutionException | CancellationException e) {
            // Both are reported the same way and end the loop.
            System.out.println(Thread.currentThread().getName() + ":" + e);
        }
    }
}

ActiveObject.java

1
2
3
4
5
6
7
8
9
package activeobject;

import java.util.concurrent.Future;

/**
 * Operations a client can request from the active object.
 */
public interface ActiveObject {

    /**
     * Requests a string made of {@code count} copies of {@code fillchar}.
     *
     * @param count    number of characters in the resulting string
     * @param fillchar character to repeat
     * @return a future through which the resulting string is obtained
     */
    Future<String> makeString(int count, char fillchar);

    /**
     * Requests that the given string be displayed.
     *
     * @param string text to display
     */
    void displayString(String string);

    /**
     * Requests that the active object shut down.
     */
    void shutdown();
}

ActiveObjectFactory.java

1
2
3
4
5
6
7
package activeobject;

/**
 * Creates {@link ActiveObject} instances so that callers never have to name
 * the implementing class.
 */
public class ActiveObjectFactory {

    /**
     * Builds a new active object.
     *
     * @return a freshly constructed {@code ActiveObjectImpl}, exposed through
     *         the {@link ActiveObject} interface
     */
    public static ActiveObject createActiveObject() {
        final ActiveObject instance = new ActiveObjectImpl();
        return instance;
    }
}

ActiveObjectImpl.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package activeobject;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * {@link ActiveObject} implementation that hands every request to a
 * single-threaded {@link ExecutorService}, so requests are executed one at a
 * time on that executor's worker thread rather than on the caller's thread.
 */
class ActiveObjectImpl implements ActiveObject {
    /** Executor whose single worker thread runs all queued requests. */
    private final ExecutorService service = Executors.newSingleThreadExecutor();

    /**
     * Queues a request that builds a string of {@code count} copies of
     * {@code fillchar}, pausing 100 ms after each character.
     *
     * <p>If the worker thread is interrupted while pausing, the interrupt flag
     * is restored; the remaining pauses then return immediately and the
     * complete string is still produced.
     *
     * @param count    length of the string to build; a negative value makes
     *                 the request fail when it runs, which surfaces through
     *                 the returned future
     * @param fillchar character to repeat
     * @return future that yields the built string
     */
    @Override
    public Future<String> makeString(final int count, final char fillchar) {
        class MakeStringRequest implements Callable<String> {

            @Override
            public String call() throws Exception {
                char[] buffer = new char[count];

                for (int i = 0; i < count; i++) {
                    buffer[i] = fillchar;

                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        // Do not swallow the interrupt: keep it visible to the
                        // executor and to later blocking calls.
                        Thread.currentThread().interrupt();
                    }
                }

                return new String(buffer);
            }
        }

        return service.submit(new MakeStringRequest());
    }

    /**
     * Queues a request that prints {@code "displayString: " + string} to
     * standard output and then pauses 10 ms.
     *
     * @param string text to print
     */
    @Override
    public void displayString(final String string) {
        // Kept as a named local class: its name is what appears in the
        // executor's message when a request is rejected.
        class DisplayStringRequest implements Runnable {

            @Override
            public void run() {
                System.out.println("displayString: " + string);

                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    // Preserve the interrupt instead of discarding it.
                    Thread.currentThread().interrupt();
                }
            }
        }

        service.execute(new DisplayStringRequest());
    }

    /**
     * Calls {@code shutdown()} on the underlying executor service.
     */
    @Override
    public void shutdown() {
        service.shutdown();
    }
}

运行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
displayString: Chris 0
Alice: value =
Bobby: value =
Alice: value = A
Bobby: value = B
displayString: Chris 1
Alice: value = AA
displayString: Chris 2
Bobby: value = BB
Alice: value = AAA
displayString: Chris 3
Bobby: value = BBB
displayString: Chris 4
displayString: Chris 5
Alice: value = AAAA
displayString: Chris 6
Bobby: value = BBBB
displayString: Chris 7
displayString: Chris 8
Alice: value = AAAAA
displayString: Chris 9
displayString: Chris 10
Bobby: value = BBBBB
displayString: Chris 11
displayString: Chris 12
displayString: Chris 13
Alice: value = AAAAAA
displayString: Chris 14
displayString: Chris 15
displayString: Chris 16
Bobby: value = BBBBBB
displayString: Chris 17
displayString: Chris 18
displayString: Chris 19
*** shutdown ***
Chris:java.util.concurrent.RejectedExecutionException: Task activeobject.ActiveObjectImpl$1DisplayStringRequest@16d7ece6 rejected from java.util.concurrent.ThreadPoolExecutor@1810004c[Shutting down, pool size = 1, active threads = 1, queued tasks = 6, completed tasks = 34]
Alice: value = AAAAAAA
displayString: Chris 20
Alice:java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@5ddf2842 rejected from java.util.concurrent.ThreadPoolExecutor@1810004c[Shutting down, pool size = 1, active threads = 1, queued tasks = 5, completed tasks = 35]
displayString: Chris 21
displayString: Chris 22
Bobby: value = BBBBBBB
displayString: Chris 23
Bobby:java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@5bfd5176 rejected from java.util.concurrent.ThreadPoolExecutor@1810004c[Shutting down, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 39]
displayString: Chris 24