【图解Java多线程设计模式】Active Object模式

主动对象会通过自己特有的线程在合适的时机处理从外部接收到的异步消息。

示例

实现具有“生成字符串”(makeString)和“显示字符串”(displayString)这两种功能(可以处理两种异步消息)的主动对象。

类图

kD1UE9.png

时序图

kD16De.png

Main.java

1
2
3
4
5
6
7
8
9
10
11
12
import activeobject.ActiveObject;
import activeobject.ActiveObjectFactory;

public class Main {

public static void main(String[] args) {
ActiveObject activeObject = ActiveObjectFactory.createActiveObject();
new MakerClientThread("Alice", activeObject).start();
new MakerClientThread("Bobby", activeObject).start();
new DisplayClientThread("Chris", activeObject).start();
}
}

MakerClientThread.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import activeobject.ActiveObject;
import activeobject.Result;

public class MakerClientThread extends Thread {
private final ActiveObject activeObject;
private final char fillchar;

public MakerClientThread(String name, ActiveObject activeObject) {
super(name);
this.activeObject = activeObject;
this.fillchar = name.charAt(0);
}

@Override
public void run() {
try {
for (int i = 0; true; i++) {
Result<String> result = activeObject.makeString(i, fillchar);
Thread.sleep(10);
String value = result.getResultValue();
System.out.println(Thread.currentThread().getName() + ": value = " + value);
}
} catch (InterruptedException e) {
}
}
}

DisplayClientThread.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import activeobject.ActiveObject;

public class DisplayClientThread extends Thread {
private final ActiveObject activeObject;

public DisplayClientThread(String name, ActiveObject activeObject) {
super(name);
this.activeObject = activeObject;
}

@Override
public void run() {
try {
for (int i = 0; true; i++) {
String string = Thread.currentThread().getName() + " " + i;
activeObject.displayString(string);
Thread.sleep(200);
}
} catch (InterruptedException e) {
}
}
}

ActiveObject.java

1
2
3
4
5
6
package activeobject;

public interface ActiveObject {
public abstract Result<String> makeString(int count, char fillchar);
public abstract void displayString(String string);
}

ActiveObjectFactory.java

1
2
3
4
5
6
7
8
9
10
11
12
package activeobject;

public class ActiveObjectFactory {
public static ActiveObject createActiveObject() {
Servant servant = new Servant();
ActivationQueue queue = new ActivationQueue();
SchedulerThread scheduler = new SchedulerThread(queue);
Proxy proxy = new Proxy(scheduler, servant);
scheduler.start();
return proxy;
}
}

Proxy.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package activeobject;

class Proxy implements ActiveObject {
private final SchedulerThread scheduler;
private final Servant servant;

public Proxy(SchedulerThread scheduler, Servant servant) {
this.scheduler = scheduler;
this.servant = servant;
}

@Override
public Result<String> makeString(int count, char fillchar) {
FutureResult<String> future = new FutureResult<String>();
scheduler.invoke(new MakeStringRequest(servant, future, count, fillchar));
return future;
}

@Override
public void displayString(String string) {
scheduler.invoke(new DisplayStringRequest(servant, string));
}
}

SchedulerThread.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package activeobject;

class SchedulerThread extends Thread {
private final ActivationQueue queue;

public SchedulerThread(ActivationQueue queue) {
this.queue = queue;
}

public void invoke(MethodRequest request) {
queue.putRequest(request);
}

@Override
public void run() {
while (true) {
MethodRequest request = queue.takeRequest();
request.execute();
}
}
}

ActivationQueue.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package activeobject;

class ActivationQueue {
private static final int MAX_METHOD_REQUEST = 100;
private final MethodRequest[] requestQueue;
private int head;
private int tail;
private int count;

public ActivationQueue() {
this.requestQueue = new MethodRequest[MAX_METHOD_REQUEST];
this.head = 0;
this.tail = 0;
this.count = 0;
}

public synchronized void putRequest(MethodRequest request){
while (count >= requestQueue.length) {
try {
wait();
} catch (InterruptedException e) {
}
}

requestQueue[tail] = request;
tail = (tail + 1) % requestQueue.length;
count++;
notifyAll();
}

public synchronized MethodRequest takeRequest() {
while (count <= 0) {
try {
wait();
} catch (InterruptedException e) {
}
}

MethodRequest request = requestQueue[head];
head = (head + 1) % requestQueue.length;
count--;
notifyAll();
return request;
}
}

MethodRequest.java

1
2
3
4
5
6
7
8
9
10
11
12
13
package activeobject;

abstract class MethodRequest<T> {
protected final Servant servant;
protected final FutureResult<T> future;

protected MethodRequest(Servant servant, FutureResult<T> future) {
this.servant = servant;
this.future = future;
}

public abstract void execute();
}

MakeStringRequest.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package activeobject;

class MakeStringRequest extends MethodRequest<String> {
private final int count;
private final char fillchar;

public MakeStringRequest(Servant servant, FutureResult<String> future, int count, char fillchar) {
super(servant, future);
this.count = count;
this.fillchar = fillchar;
}

@Override
public void execute() {
Result<String> result = servant.makeString(count, fillchar);
future.setResult(result);
}
}

DisplayStringRequest.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package activeobject;

class DisplayStringRequest extends MethodRequest<Object> {
private final String string;

public DisplayStringRequest(Servant servant, String string) {
super(servant, null);
this.string = string;
}

@Override
public void execute() {
servant.displayString(string);
}
}

Result.java

1
2
3
4
5
package activeobject;

public abstract class Result<T> {
public abstract T getResultValue();
}

FutureResult.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package activeobject;

class FutureResult<T> extends Result<T> {
private Result<T> result;
private boolean ready = false;

public synchronized void setResult(Result<T> result) {
this.result = result;
this.ready = true;
notifyAll();
}

@Override
public synchronized T getResultValue() {
while (!ready) {
try {
wait();
} catch (InterruptedException e) {
}
}

return result.getResultValue();
}
}

RealResult.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package activeobject;

class RealResult<T> extends Result<T> {
private final T resultValue;

public RealResult(T resultValue) {
this.resultValue = resultValue;
}

@Override
public T getResultValue() {
return resultValue;
}
}

Servant.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package activeobject;

class Servant implements ActiveObject {

@Override
public Result<String> makeString(int count, char fillchar) {
char[] buffer = new char[count];

for (int i = 0; i < count; i++) {
buffer[i] = fillchar;

try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}

return new RealResult<String>(new String(buffer));
}

@Override
public void displayString(String string) {
try {
System.out.println("displayString: " + string);
Thread.sleep(10);
} catch (InterruptedException e) {
}
}
}

运行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
displayString: Chris 0
Alice: value =
Bobby: value =
Alice: value = A
Bobby: value = B
displayString: Chris 1
Alice: value = AA
displayString: Chris 2
Bobby: value = BB
displayString: Chris 3
Alice: value = AAA
displayString: Chris 4
Bobby: value = BBB
displayString: Chris 5
Alice: value = AAAA
displayString: Chris 6
displayString: Chris 7
Bobby: value = BBBB
displayString: Chris 8
displayString: Chris 9
Alice: value = AAAAA
displayString: Chris 10
displayString: Chris 11
Bobby: value = BBBBB
displayString: Chris 12
displayString: Chris 13
displayString: Chris 14
Alice: value = AAAAAA
displayString: Chris 15
displayString: Chris 16
Bobby: value = BBBBBB
displayString: Chris 17
displayString: Chris 18
displayString: Chris 19
displayString: Chris 20

登场角色

Client(委托者)

Client角色调用ActiveObject角色的方法来委托处理,它能够调用的只有ActiveObject角色提供的方法。调用这些方法后,(如果ActivationQueue角色没有满)程序控制权会立即返回。
虽然Client角色只知道ActiveObject角色,但它实际调用的是Proxy角色。
Client角色在获取处理结果时,会调用VirtualResult角色的getResultValue方法。这里使用了Future模式。
在示例程序中,由MakerClientThread类和DisplayClientThread类扮演此角色。

ActiveObject(主动对象)

ActiveObject角色定义了主动对象向Client角色提供的接口。
在示例程序中,由ActiveObject接口扮演此角色。

Proxy(代理人)

Proxy角色负责将方法调用转换为MethodRequest角色的对象。转换后的MethodRequest角色会被传递给Scheduler角色。
Proxy角色实现了ActiveObject角色提供的接口。
调用Proxy角色的方法的是Client角色。将方法调用转换为MethodRequest角色,并传递给Scheduler角色的操作都是使用Client角色的线程进行的。
在示例程序中,由Proxy类扮演此角色。

Scheduler

Scheduler角色负责将Proxy角色传递来的MethodRequest传递给ActivationQueue角色,以及从ActivationQueue角色取出并执行MethodRequest角色这两项工作。
Client角色的线程负责将MethodRequest角色传递给ActivationQueue角色。
而从ActivationQueue角色取出并执行MethodRequest角色这项工作则是使用Scheduler角色自己的线程进行的。在ActiveObject模式中,只有使用Client角色和Scheduler角色时才会启动新线程。
Scheduler角色会把MethodRequest角色放入ActivationQueue角色或者从ActivationQueue角色取出MethodRequest角色。因此,Scheduler角色可以判断下次要执行哪个请求。
在示例程序中,由SchedulerThread类扮演此角色。SchedulerThread并没有进行特殊的调度,而只是执行FIFO(First In First Out,先进先出)处理。

MethodRequest

MethodRequest角色是与来自Client角色的请求对应的角色。MethodRequest定义了负责执行处理的Servant角色,以及负责设置返回值的Future角色和负责执行请求的方法(execute)。
MethodRequest角色为主动对象的接口赋予了对象的表象形式。
在示例程序中,由MethodRequest类扮演此角色。

ConcreteMethodRequest

ConcreteMethodRequest角色是使MethodRequest角色与具体的方法相对应的角色。对于ActiveObject角色中定义的每个方法,会有各个类与之对应,比如MethodAlphaRequest、MethodBetaRequest…。
各个ConcreteMethodRequest角色中的字段分别与方法的参数相对应。
在示例程序中,由MakeStringRequest类和DisplayStringRequest类扮演此角色。其中,MakeStringRequest类对应makeString方法(生成字符串),DisplayStringRequest类对应displayString方法(显示字符串)。

Servant(仆人)

Servant角色负责实际地处理请求。
调用Servant角色的是Scheduler角色的线程。Scheduler角色会从ActivationQueue角色取出一个MethodRequest角色(实际上是ConcreteMethodRequest角色)并执行它。此时,Scheduler角色调用的就是Servant角色的方法。
Servant角色实现了ActiveObject角色定义的接口。
Proxy角色会将请求转换为MethodRequest角色,而Servant角色则会实际地执行该请求。Scheduler角色介于Proxy角色和Servant角色之间,负责管理按照什么顺序执行请求。
在示例程序中,由Servant类扮演此角色。

ActivationQueue(主动队列)

ActivationQueue角色是保存MethodRequest角色的类。
调用putRequest方法的是Client角色的线程,而调用takeRequest方法的是Scheduler角色的线程。这里使用了Producer-Consumer模式。
在示例程序中,由ActivationQueue类扮演此角色。

VirtualResult(虚拟结果)

VirtualResult角色与Future角色、RealResult角色共同构成了Future模式。
Client角色在获取处理结果时会调用VirtualResult角色(实际上是Future角色)的getResultValue方法。
在示例程序中,由Result类扮演此角色。

Future(期货)

Future角色是Client角色在获取处理结果时实际调用的角色。当处理结果还没有出来的时候,它会使用Guarded Suspension模式让Client角色的线程等待结果出来。
在示例程序中,由FutureResult类扮演此角色。

RealResult(真实结果)

RealResult角色是表示处理结果的角色。Servant角色会创建一个RealResult角色作为处理结果,然后调用Future角色的setRealResult方法将其设置到Future角色中。
在示例程序中,由RealResult类扮演此角色。

类图

kslR6f.png

时序图

ksl7hn.png

Timethreads图

kslvBF.png