FutureTask


FutureTask可用于异步获取执行结果或取消执行任务的场景。通过传入Runnable或者Callable的任务给FutureTask,直接调用其run方法或者放入线程池执行,
之后可以在外部通过FutureTask的get方法异步获取执行结果,因此,FutureTask非常适合用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。
另外,FutureTask还可以确保即使调用了多次run方法,它都只会执行一次Runnable或者Callable任务,或者通过cancel取消FutureTask的执行等。
FutureTask执行多任务计算的使用场景。利用FutureTask和ExecutorService,可以用多线程的方式提交计算任务,主线程继续执行其他任务,当主线程需要子线程的计算结果时,
在异步获取子线程的执行结果。
例子1:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.kongzhong.gw2.ccc.service;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class Test {

public static void main(String[] args) {
ExecutorService exec=Executors.newCachedThreadPool();

FutureTask<String> task=new FutureTask<String>(new Callable<String>() {

@Override
public String call() throws Exception {
Thread.sleep(200000);
return Thread.currentThread().getName();//这里可以是一个异步操作
}
});
try {
exec.execute(task);//FutureTask实际上也是一个线程
String result=task.get();//取得异步计算的结果,如果没有返回,就会一直阻塞等待
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
}

}
}

例子2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package com.kongzhong.gw2.ccc.service;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class FutureTaskForMultiCompute {
public static void main(String[] args) {

FutureTaskForMultiCompute inst=new FutureTaskForMultiCompute();
// 创建任务集合
List<FutureTask<Integer>> taskList = new ArrayList<FutureTask<Integer>>();
// 创建线程池
ExecutorService exec = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
// 传入Callable对象创建FutureTask对象
FutureTask<Integer> ft = new FutureTask<Integer>(inst.new ComputeTask(i, ""+i));
taskList.add(ft);
// 提交给线程池执行任务,也可以通过exec.invokeAll(taskList)一次性提交所有任务;
exec.submit(ft);
}

System.out.println("所有计算任务提交完毕, 主线程接着干其他事情!");

// 开始统计各计算线程计算结果
Integer totalResult = 0;
for (FutureTask<Integer> ft : taskList) {
try {
//FutureTask的get方法会自动阻塞,直到获取计算结果为止
totalResult = totalResult + ft.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}

// 关闭线程池
exec.shutdown();
System.out.println("多任务计算后的总结果是:" + totalResult);

}

private class ComputeTask implements Callable<Integer> {

private Integer result = 0;
private String taskName = "";

public ComputeTask(Integer iniResult, String taskName){
result = iniResult;
this.taskName = taskName;
System.out.println("生成子线程计算任务: "+taskName);
}

public String getTaskName(){
return this.taskName;
}
@Override
public Integer call() throws Exception {
// TODO Auto-generated method stub

for (int i = 0; i < 100; i++) {
result =+ i;
}
// 休眠5秒钟,观察主线程行为,预期的结果是主线程会继续执行,到要取得FutureTask的结果是等待直至完成。
Thread.sleep(5000);
System.out.println("子线程计算任务: "+taskName+" 执行完成!");
return result;
}
}
}

Exchanger


Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。
这两个线程通过exchange方法交换数据, 如果第一个线程先执行exchange方法,它会一直等待第二个线程也执行exchange,当两个线程都到达同步点时,这两个线程就可以交换数据,
将本线程生产出来的数据传递给对方。如果两个线程有一个没有到达exchange方法,则会一直等待,如果担心有特殊情况发生,避免一直等待,
可以使用exchange(V x, long timeout, TimeUnit unit)设置最大等待时长。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 有时我们需要对元素进行配对和交换线程的同步点,使用exchange方法 返回其伙伴的对象,这时我们就需要使用线程类中的Exchanger类了
* @author Mobim-Client
*
*/
public class TestExchanger {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool(); //线程池

final Exchanger<String> exchanger = new Exchanger<String>(); //Exchanger
executor.execute(new Runnable() {
String data = "abc";
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() +"正在把数据 "+ data+ " 交换出去" );
Thread.sleep((long) (Math.random()*1000));
//此处是同步点
data = exchanger.exchange(data); //该线程把"abc"给另外一个线程。同时自己也获取了另外一个线程的"def"
System.out.println(Thread.currentThread().getName() + "交换数据 得到 "+ data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

executor.execute(new Runnable() {
String data = "def";
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() +"正在把数据 "+ data+ " 交换出去" );
Thread.sleep((long) (Math.random()*1000));
//此处是同步点
data =exchanger.exchange(data); //该线程把"def"给另外一个线程。同时自己也获取了另外一个线程的"abc"
System.out.println(Thread.currentThread().getName() + "交换数据 得到 "+ data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
例子2:
import java.util.concurrent.Exchanger;

public class ExchangerTest {
// 描述一个装水的杯子
public static class Cup{
// 标识杯子是否有水
private boolean full = false;
public Cup(boolean full){
this.full = full;
}
// 添水,假设需要5s
public void addWater(){
if (!this.full){
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
this.full = true;
}
}
// 喝水,假设需要10s
public void drinkWater(){
if (this.full){
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
}
this.full = false;
}
}
}

public static void testExchanger() {
// 初始化一个Exchanger,并规定可交换的信息类型是杯子
final Exchanger<Cup> exchanger = new Exchanger<Cup>();
// 初始化一个空的杯子和装满水的杯子
final Cup initialEmptyCup = new Cup(false);
final Cup initialFullCup = new Cup(true);

//服务生线程
class Waiter implements Runnable {
public void run() {
Cup currentCup = initialEmptyCup;
try {
int i=0;
while (i < 2){
System.out.println("服务生开始往杯子中添水:"+ System.currentTimeMillis());
// 往空的杯子里加水
currentCup.addWater();
System.out.println("服务生添水完毕:"+ System.currentTimeMillis());
// 杯子满后和顾客的空杯子交换
System.out.println("服务生等待与顾客交换杯子:"+ System.currentTimeMillis());
currentCup = exchanger.exchange(currentCup);
System.out.println("服务生与顾客交换杯子完毕:"+ System.currentTimeMillis());
i++;
}

} catch (InterruptedException ex) {
}
}
}

//顾客线程
class Customer implements Runnable {
public void run() {
Cup currentCup = initialFullCup;
try {
int i=0;
while (i < 2){
System.out.println("顾客开始喝水:"+ System.currentTimeMillis());
//把杯子里的水喝掉
currentCup.drinkWater();
System.out.println("顾客喝水完毕:"+ System.currentTimeMillis());
//将空杯子和服务生的满杯子交换
System.out.println("顾客等待与服务生交换杯子:"+ System.currentTimeMillis());
currentCup = exchanger.exchange(currentCup);
System.out.println("顾客与服务生交换杯子完毕:"+ System.currentTimeMillis());
i++;
}
} catch (InterruptedException ex) {
}
}
}

new Thread(new Waiter()).start();
new Thread(new Customer()).start();
}

public static void main(String[] args) {
ExchangerTest.testExchanger();
}
}