The Fork/Join framework is an implementation of the ExecutorService interface. It is designed for work that can be broken recursively into smaller pieces. The general pattern looks like this:
if (my portion of the work is small enough)
    do the work directly
else
    split my work into two pieces
    invoke the two pieces and wait for the results
As with any ExecutorService implementation, the fork/join framework distributes tasks to worker threads in a thread pool. The pool class is ForkJoinPool.
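Because ForkJoinPool implements ExecutorService, it can also be used through that interface. The following is a minimal sketch, not part of the original examples: the class name PoolAsExecutor and the helper answer() are made up for illustration, and the call to shutdown() is an addition.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

public class PoolAsExecutor {
    // Hypothetical helper: submits an ordinary Callable through the
    // ExecutorService interface and waits for its result.
    static int answer() {
        // The no-argument constructor sizes the pool to the number of available processors.
        ExecutorService executor = new ForkJoinPool();
        try {
            Future<Integer> future = executor.submit(() -> 6 * 7);
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(answer()); // prints 42
    }
}
```

The exam-style examples use pool.invoke(task) with a ForkJoinTask instead, which is the usual way to start recursive work.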
Within the scope of the OCPJP exam, the "Use Fork/Join Framework" objective tests the following two classes:
- public abstract class RecursiveAction extends ForkJoinTask<Void>
- public abstract class RecursiveTask<V> extends ForkJoinTask<V>
Example of RecursiveAction:
OCPJP>cat IncrementTask.java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
public class IncrementTask extends RecursiveAction {
    final long[] array;
    final int lo, hi;
    final static long THRESHOLD = 100L;

    IncrementTask(long[] array, int lo, int hi) {
        this.array = array;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected void compute() {
        if (hi - lo < THRESHOLD) {
            // Small enough: increment the range [lo, hi) directly.
            for (int i = lo; i < hi; ++i)
                array[i]++;
        }
        else {
            // Too large: split the range in half and run both halves as subtasks.
            int mid = (lo + hi) >>> 1;
            invokeAll(new IncrementTask(array, lo, mid),
                      new IncrementTask(array, mid, hi));
        }
    }

    public static void main(String...args) {
        long[] array = new long[100_000];
        //Arrays.setAll(array, i -> i + 1);
        ForkJoinTask<?> task = new IncrementTask(array, 0, array.length);
        ForkJoinPool pool = new ForkJoinPool();
        pool.invoke(task);
        System.out.println(array[1] + array[10] + array[101] + array[1011]);
    }
}
OCPJP>
OCPJP>
OCPJP>javac IncrementTask.java
OCPJP>java IncrementTask
4
OCPJP>
Note that RecursiveAction is an abstract class: you need to extend it and override
protected void compute()
Inside compute, invokeAll is called to fork the two subtasks. This overload of invokeAll accepts any two ForkJoinTask instances (here, two IncrementTask objects), returns nothing, and does not return until both subtasks have completed.
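To make the behaviour of invokeAll more concrete, here is a sketch that spells out roughly the same steps by hand with fork(), compute() and join(). The class DoubleTask, its THRESHOLD value and the helper doubled() are hypothetical names chosen for this illustration; the exact order in which invokeAll handles its two arguments is an implementation detail and may differ.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class DoubleTask extends RecursiveAction {
    static final int THRESHOLD = 100; // hypothetical cutoff for direct work
    final long[] array;
    final int lo, hi;

    DoubleTask(long[] array, int lo, int hi) {
        this.array = array;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected void compute() {
        if (hi - lo < THRESHOLD) {
            for (int i = lo; i < hi; i++)
                array[i] *= 2;
        } else {
            int mid = (lo + hi) >>> 1;
            DoubleTask left = new DoubleTask(array, lo, mid);
            DoubleTask right = new DoubleTask(array, mid, hi);
            left.fork();     // schedule the left half asynchronously
            right.compute(); // run the right half in the current thread
            left.join();     // wait until the left half has finished
        }
    }

    // Hypothetical helper: fills an array with 1s and doubles every element.
    static long[] doubled(int size) {
        long[] array = new long[size];
        Arrays.fill(array, 1L);
        new ForkJoinPool().invoke(new DoubleTask(array, 0, size));
        return array;
    }

    public static void main(String[] args) {
        long[] result = doubled(1_000);
        System.out.println(result[0] + result[999]); // prints 4
    }
}
```

With invokeAll the three calls collapse into one line, which is why it is usually preferred when the subtasks return no result.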
Example of RecursiveTask:
OCPJP>cat Fibonacci.java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
public class Fibonacci extends RecursiveTask<Integer> {
    final int n;

    Fibonacci(int n) { this.n = n; }

    @Override
    protected Integer compute() {
        if (n <= 1)
            return n;
        Fibonacci f1 = new Fibonacci(n - 1);
        f1.fork();                          // run f1 asynchronously
        Fibonacci f2 = new Fibonacci(n - 2);
        return f2.compute() + f1.join();    // compute f2 here, then wait for f1
    }

    public static void main(String[] args) {
        ForkJoinTask<Integer> task = new Fibonacci(10);
        ForkJoinPool pool = new ForkJoinPool();
        Integer sum = pool.invoke(task);
        System.out.println(sum);
    }
}
OCPJP>
OCPJP>javac Fibonacci.java
OCPJP>java Fibonacci
55
OCPJP>
Note that RecursiveTask is also an abstract class. You need to override its abstract method
protected abstract V compute()
The call f1.fork() schedules f1 to run asynchronously. The current thread then computes f2 directly with f2.compute() and finally calls f1.join(), which makes the current thread wait for the result of the f1 subtask and returns it.
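The Fibonacci task forks a new subtask for almost every call. A RecursiveTask more commonly combines the same fork, compute, join order with a threshold, as in the pattern at the top of this section. The sketch below sums an array that way; SumTask, THRESHOLD and sumOfFirst() are hypothetical names, and the cutoff of 1,000 is an arbitrary assumption rather than a tuned value.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class SumTask extends RecursiveTask<Long> {
    static final int THRESHOLD = 1_000; // hypothetical cutoff for direct work
    final long[] array;
    final int lo, hi;

    SumTask(long[] array, int lo, int hi) {
        this.array = array;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {
            // Small enough: add up the range [lo, hi) directly.
            long sum = 0;
            for (int i = lo; i < hi; i++)
                sum += array[i];
            return sum;
        }
        int mid = (lo + hi) >>> 1;
        SumTask left = new SumTask(array, lo, mid);
        SumTask right = new SumTask(array, mid, hi);
        left.fork();                     // start the left half asynchronously
        long rightSum = right.compute(); // do the right half in this thread
        return rightSum + left.join();   // then wait for the left half
    }

    // Hypothetical helper: sums the numbers 1..n with a fresh pool.
    static long sumOfFirst(int n) {
        long[] array = new long[n];
        for (int i = 0; i < n; i++)
            array[i] = i + 1;
        return new ForkJoinPool().invoke(new SumTask(array, 0, n));
    }

    public static void main(String[] args) {
        System.out.println(sumOfFirst(10_000)); // prints 50005000
    }
}
```

Calling left.join() immediately after left.fork(), before right.compute(), would still give the right answer but would leave the current thread waiting instead of working on the other half.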
In both the RecursiveAction and the RecursiveTask example, the main method performs the same three steps:
- Create a ForkJoinTask
- Create a ForkJoinPool
- Invoke the ForkJoinTask on the pool
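The three steps can be sketched in one method. This is only an illustration: MaxTask, THRESHOLD and largest() are hypothetical names, and the pool.shutdown() call in the finally block is an addition that the examples above leave out. The sketch also shows that invokeAll works with RecursiveTask subtasks, whose results are then read with join().

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class MaxTask extends RecursiveTask<Integer> {
    static final int THRESHOLD = 4; // hypothetical cutoff for direct work
    final int[] data;
    final int lo, hi;

    MaxTask(int[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Integer compute() {
        if (hi - lo <= THRESHOLD) {
            // Small enough: scan the non-empty range [lo, hi) directly.
            int max = data[lo];
            for (int i = lo + 1; i < hi; i++)
                max = Math.max(max, data[i]);
            return max;
        }
        int mid = (lo + hi) >>> 1;
        MaxTask left = new MaxTask(data, lo, mid);
        MaxTask right = new MaxTask(data, mid, hi);
        invokeAll(left, right); // returns once both subtasks are done
        return Math.max(left.join(), right.join());
    }

    // Hypothetical helper; assumes data is not empty.
    static int largest(int[] data) {
        ForkJoinTask<Integer> task = new MaxTask(data, 0, data.length); // 1. create the task
        ForkJoinPool pool = new ForkJoinPool();                         // 2. create the pool
        try {
            return pool.invoke(task);                                   // 3. invoke and wait for the result
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(largest(new int[] {3, 9, 4, 27, 1, 8, 15, 2, 6, 11})); // prints 27
    }
}
```

From Java 8 onward, ForkJoinPool.commonPool() offers a shared pool as an alternative to creating a new one in step 2.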