1 Fork/Join介绍

​ ForkJoin是实现多线程”分而治之”思想的框架,将一个大任务分成多个一个个小任务,然后对每个小任务执行并行计算,最后将结果合并起来。

Fork_Join分治思想.png

1.2 Fork/Join使用

​ 在使用方面必须首先创建一个ForkJoin 任务。它提供在任务中执行fork 和join 的操作机制,通常不直接继承ForkjoinTask 类,只需要直接继承其子类。

  • RecursiveAction,用于没有返回结果的任务

  • RecursiveTask,用于有返回值的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// 继承RecursiveAction的方式,不用返回值的方式
public class FindDirsFiles extends RecursiveAction {
private File path;

public FindDirsFiles(File path) {
this.path = path;
}

@Override
protected void compute() {
List<FindDirsFiles> subTasks = new ArrayList<>();

File[] files = path.listFiles();
if (files!=null){
for (File file : files) {
if (file.isDirectory()) {
// 对每个子目录都新建一个子任务。
subTasks.add(new FindDirsFiles(file));
} else {
// 遇到文件,检查。
if (file.getAbsolutePath().endsWith("txt")){
System.out.println("文件:" + file.getAbsolutePath());
}
}
}
if (!subTasks.isEmpty()) {
// 在当前的 ForkJoinPool 上调度所有的子任务。
for (FindDirsFiles subTask : invokeAll(subTasks)) {
subTask.join();
}
}
}
}

public static void main(String [] args){
try {
// 用一个 ForkJoinPool 实例调度总任务
ForkJoinPool pool = new ForkJoinPool();
FindDirsFiles task = new FindDirsFiles(new File("F:/"));

// 异步提交
pool.execute(task);

System.out.println("Task is Running......");
Thread.sleep(1);
int otherWork = 0;
for(int i=0;i<100;i++){
otherWork = otherWork+i;
}
System.out.println("Main Thread done sth......,otherWork="+otherWork);
// 阻塞方法,
task.join();
System.out.println("Task end");
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 继承RecursiveTask有返回值
public class MyRecursiveTask extends RecursiveTask<Integer> {

private static final int THRESHOLD = MakeArray.ARRAY_LENGTH / 10;
private int[] src;
private int from;
private int to;

public MyRecursiveTask(int[] src, int from, int to) {
this.src = src;
this.from = from;
this.to = to;
}

@Override
protected Integer compute() {
// 判断任务位置是否合适。
// 如果合适就就直接运算返回结果,否则就细分任务
if (to - from < THRESHOLD) {
int sum = 0;
System.out.println("start work! fromIndex:" + from + ",toIndex:" + to);
for (int i = from; i <= to; i++) {
sum += src[i];
}
return sum;
} else {
int mid = (from + to) / 2;
MyRecursiveTask left = new MyRecursiveTask(src, from, mid);
MyRecursiveTask right = new MyRecursiveTask(src, mid + 1, to);
invokeAll(left, right);
return left.join() + left.join();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) {
// 创建运算数组
int[] src = MakeArray.makeArray();
// 创建ForkJoin池
ForkJoinPool pool = new ForkJoinPool();

// 创建任务
MyRecursiveTask task = new MyRecursiveTask(src, 0, src.length - 1);

long start = System.currentTimeMillis();
// 执行任务得到结果
Integer invoke = pool.invoke(task);

System.out.println("comsume: " + (System.currentTimeMillis() - start) + "ms");
}

2. CountDownLatch

2.1 CountDownLatch是什么?

​ CountDownLatch这个类使一个线程等待其他线程各自执行完毕后再执行。

​ 它是通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作了。

​ 如图所示,一个程序初始化需要五步,那就初始化CountDownLatch为5,每个线程初始化结束就对其进行”减一”操作,直到值减为零之后,等待线程被唤醒,继续执行主流程代码。

CountDownLatch.png

2.2 CountDownLatch使用场景

​ 一个APP初始化引入很多框架的时候,需要初始化,为了加快初始化速度,可以使用多线程来对各个框架进行初始化,当各个模块初始化结束之后,CountDownLatch值减成0后,说明所有线程初始化结束,可以开始主流程任务。

2.3 使用案例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class UseCountDownLatch {

public static CountDownLatch latch = new CountDownLatch(5);

private static class InitThread implements Runnable {

@Override
public void run() {
System.out.println("Thread_" + Thread.currentThread().getId()
+ " ready init work......");
latch.countDown();
for (int i = 0; i < 2; i++) {
System.out.println("Thread_" + Thread.currentThread().getId()
+ " ........continue do its work");
}
}
}

private static class BusiThread implements Runnable {

@Override
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < 3; i++) {
System.out.println("BusiThread_" + Thread.currentThread().getId()
+ " do business-----");
}
}
}

public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
@Override
public void run() {
SleepTools.ms(1);
System.out.println("Thread_" + Thread.currentThread().getId()
+ " ready init work step 1st......");
latch.countDown();
System.out.println("begin step 2nd.......");
SleepTools.ms(1);
System.out.println("Thread_" + Thread.currentThread().getId()
+ " ready init work step 2nd......");
latch.countDown();
}
}).start();
new Thread(new BusiThread()).start();
for (int i = 0; i < 3; i++) {
Thread thread = new Thread(new InitThread());
thread.start();
}

// 多少时间之后,没有执行就不阻塞,继续执行下去
latch.await(10, TimeUnit.NANOSECONDS);
System.out.println("Main do ites work........");
}
}

测试用例代码见: git@github.com:oujie123/UnderstandingOfThread.git