import java.util.*;
import java.util.concurrent.*;
public abstract class TransformingSequential {
public static void main(String[] args) {
TransformingSequential ts = new TransformingSequential() {
@Override
public void process(Element e) {
System.out.println(e);
}
};
try {
List<Node<Element>> nodes = new ArrayList<>();
nodes.add(new Node<Element>() {
@Override
public Element compute() {
return new Element(){public String toString() {return "a";}};
}
@Override
public List<Node<Element>> getChildren() {
List<Node<Element>> l = new ArrayList<>();
l.add(new Node<Element>(){
@Override
public Element compute() {
return new Element() {public String toString() {return "b";}};
}
@Override
public List<Node<Element>> getChildren() {
return new ArrayList<>();
}});
return l;
}});
Collection<Element> c = ts.getParallelResults(nodes);
System.out.println(c);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
void processSequentially(List<Element> elements) {
for (Element e : elements)
process(e);
}
void processInParallel(Executor exec, List<Element> elements) {
for (final Element e : elements)
exec.execute(new Runnable() {
public void run() {
process(e);
}
});
}
public abstract void process(Element e);
public <T> void sequentialRecursive(List<Node<T>> nodes,
Collection<T> results) {
for (Node<T> n : nodes) {
results.add(n.compute());
sequentialRecursive(n.getChildren(), results);
}
}
public <T> void parallelRecursive(final Executor exec,
List<Node<T>> nodes,
final Collection<T> results) {
for (final Node<T> n : nodes) {
exec.execute(new Runnable() {
public void run() {
results.add(n.compute());
}
});
parallelRecursive(exec, n.getChildren(), results);
}
}
public <T> Collection<T> getParallelResults(List<Node<T>> nodes)
throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
parallelRecursive(exec, nodes, resultQueue);
exec.shutdown();
exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
return resultQueue;
}
interface Element {
}
interface Node <T> {
T compute();
List<Node<T>> getChildren();
}
}
TransformSequential.java change a depth-first traversal from sequential one to parallel one. During the traversal, instead of compute the node result immediately, it submits a task to Executor for parallel computation. A thread safe ConcurrentLinkedQueue is used to collect the results from multiple worker threads.