Site Search:

TransformSequential.java

Back>

import java.util.*;
import java.util.concurrent.*;

public abstract class TransformingSequential {
    
    public static void main(String[] args) {
        TransformingSequential ts = new TransformingSequential() {
            @Override
            public void process(Element e) {
                System.out.println(e);
            }
        };
        try {
            List<Node<Element>> nodes = new ArrayList<>();
            nodes.add(new Node<Element>() {
                @Override
                public Element compute() {
                    return new Element(){public String toString() {return "a";}};
                }

                @Override
                public List<Node<Element>> getChildren() {
                    List<Node<Element>> l = new ArrayList<>();
                    l.add(new Node<Element>(){

                        @Override
                        public Element compute() {
                            return new Element() {public String toString() {return "b";}};
                        }

                        @Override
                        public List<Node<Element>> getChildren() {
                            return new ArrayList<>();
                        }});
                    return l;
                }});
            Collection<Element> c = ts.getParallelResults(nodes);
            System.out.println(c);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
    }

    void processSequentially(List<Element> elements) {
        for (Element e : elements)
            process(e);
    }

    void processInParallel(Executor exec, List<Element> elements) {
        for (final Element e : elements)
            exec.execute(new Runnable() {
                public void run() {
                    process(e);
                }
            });
    }

    public abstract void process(Element e);


    public <T> void sequentialRecursive(List<Node<T>> nodes,
                                        Collection<T> results) {
        for (Node<T> n : nodes) {
            results.add(n.compute());
            sequentialRecursive(n.getChildren(), results);
        }
    }

    public <T> void parallelRecursive(final Executor exec,
                                      List<Node<T>> nodes,
                                      final Collection<T> results) {
        for (final Node<T> n : nodes) {
            exec.execute(new Runnable() {
                public void run() {
                    results.add(n.compute());
                }
            });
            parallelRecursive(exec, n.getChildren(), results);
        }
    }

    public <T> Collection<T> getParallelResults(List<Node<T>> nodes)
            throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
        parallelRecursive(exec, nodes, resultQueue);
        exec.shutdown();
        exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
        return resultQueue;
    }

    interface Element {
    }

    interface Node <T> {
        T compute();

        List<Node<T>> getChildren();
    }

}


TransformSequential.java change a depth-first traversal from sequential one to parallel one. During the traversal, instead of compute the node result immediately, it submits a task to Executor for parallel computation. A thread safe ConcurrentLinkedQueue is used to collect the results from multiple worker threads.