Site Search:

ProducerConsumer.java

<Back

The following program scans local drives for documents and indexes them. A BlockingQueue and the producer-consumer pattern are used to decomposition the work. FileCrawler threads are the producer, they put file into the fileQueue; while Indexer threads are the consumer, they took the file from the same fileQueue and index it. Since BlockingQueue's take method blocks until data available and the put method blocks until space available, multiple producers and consumers can access the BlockingQueue concurrently.

By decomposing the producer and consumer, we get some performance benefits. If producer is I/O bound and the consumer is CPU bound, running them in multiple threads can get achieve better parallelization.

So far we have analyzed the program's synchronization policy, last but not least, let's analyze identify the variables that form the object's state and the invariants that constrain the state variables.

  • producerConsumer's  state variable total is safely published with static final keyword and the AtomicLong is a thread safe class. The other state variables are static final primitive type,  which are always thread safe.
  • FileCrawler's state variable fileQueue is safely published with final keyword and the BlockingQueue is a thread Safe class. The other state variables fileFilter and root are effectively immutable, they are safely published with final keyword.
  • Indexer's state variable queue is safely published with final keyword and the BlockingQueue is a thread Safe class.


import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

public class ProducerConsumer {
    static class FileCrawler implements Runnable {
        private final BlockingQueue<File> fileQueue;
        private final FileFilter fileFilter;
        private final File root;
        
        public FileCrawler(BlockingQueue<File> fileQueue,
                           final FileFilter fileFilter,
                           File root) {
            this.fileQueue = fileQueue;
            this.root = root;
            this.fileFilter = new FileFilter() {
                public boolean accept(File f) {
                    return f.isDirectory() || fileFilter.accept(f);
                }
            };
        }

        private boolean alreadyIndexed(File f) {
            return false;
        }

        public void run() {
            try {
                crawl(root);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        private void crawl(File root) throws InterruptedException {
            File[] entries = root.listFiles(fileFilter);
            if (entries != null) {
                for (File entry : entries)
                    if (entry.isDirectory())
                        crawl(entry);
                    else if (!alreadyIndexed(entry))
                        fileQueue.put(entry);
            }
        }
    }

    static class Indexer implements Runnable {
        private final BlockingQueue<File> queue;

        public Indexer(BlockingQueue<File> queue) {
            this.queue = queue;
        }

        public void run() {
            try {
                while (true)
                    indexFile(queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        public void indexFile(File file) {
            // Index the file...
            System.out.println(total.incrementAndGet());
        };
    }

    private static final int BOUND = 10;
    private static final int N_CONSUMERS = Runtime.getRuntime().availableProcessors();
    private static final AtomicLong total = new AtomicLong();

    public static void startIndexing(File[] roots) {
        BlockingQueue<File> queue = new LinkedBlockingQueue<File>(BOUND);
        FileFilter filter = new FileFilter() {
            public boolean accept(File file) {
                return true;
            }
        };

        for (File root : roots)
            new Thread(new FileCrawler(queue, filter, root)).start();

        for (int i = 0; i < N_CONSUMERS; i++)
            new Thread(new Indexer(queue)).start();
    }
    
    public static void main(String...args) {
        File[] roots = new File[]{new File("/etc"), new File("/home")};
        startIndexing(roots);
    }
}