WebCrawler.java

WebCrawler.java demonstrates the technique of abruptly shutting down an executor service with shutdownNow. After the main thread calls shutdownNow on the executor service, it attempts to cancel the tasks currently in progress and returns a list of tasks that were submitted but never started, so that they can be run later. Sounds good, but wait a minute: what happens to the tasks that were already started but don't finish within the exec.awaitTermination(TIMEOUT, UNIT) window? shutdownNow doesn't care. Those tasks are simply interrupted. Upon interruption, they can behave badly by finishing their work before exiting (which prevents the program from shutting down quickly), or they can behave nicely by dropping whatever they are doing and exiting quickly but prematurely, leaving a mess behind. If a task chooses the latter, it never gets a chance to clean up that mess later, because the executor service only returns the tasks that were awaiting execution; it does not return tasks that started but never finished.
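
The gap is easy to reproduce with a plain ExecutorService. Below is a small standalone sketch of my own (ShutdownNowDemo is not part of WebCrawler.java): the first task is interrupted mid-run and exits politely, yet only the second, never-started task shows up in shutdownNow's return value.

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownNowDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newSingleThreadExecutor();

        // Task 1 starts immediately and exits politely when interrupted,
        // but nothing records that it never finished its work.
        exec.execute(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                // pretend to do one unit of work
            }
        });

        // Task 2 sits in the queue behind task 1 and never starts.
        exec.execute(() -> System.out.println("I never ran"));

        Thread.sleep(100);
        List<Runnable> notStarted = exec.shutdownNow();   // interrupts task 1, drains task 2
        exec.awaitTermination(1, TimeUnit.SECONDS);

        // Only the never-started task is reported; the interrupted one is lost.
        System.out.println("returned by shutdownNow: " + notStarted.size());   // prints 1
    }
}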

TrackingExecutor fixes this bad scenario by encapsulating an ExecutorService and instrumenting execute to remember which tasks were in progress at shutdown time, using the check if (isShutdown() && Thread.currentThread().isInterrupted()). When stop() is called, it not only saves the tasks that were awaiting execution, it also saves the tasks that got interrupted mid-run -- better than the default ExecutorService behavior.

Using TrackingExecutor, WebCrawler.java is a program that crawls xyzcode.blogspot.com's html pages and counts the occurrences of a particular word. You can specify how long the crawl runs, which html page to start from, what word to search for, and what kind of urls to follow.

The program's thread safety is achieved with the following considerations:

  1. The state variables of WebCrawler are kept under control. The static final Strings are immutable and therefore thread safe. The HashSet urlsToCrawl is not a thread-safe class, but we create only one WebCrawler instance, and urlsToCrawl is only referenced from the main thread (in the constructor and the synchronized start and stop methods), so it is never shared unsafely. The other state variables are either volatile (exec, Delay) or instances of thread-safe classes (seen, found, problem).
  2. exec has to be volatile. After submitCrawlTask runs exec.execute(new CrawlTask(u)), a new task goes into the executor's internal work queue, and a worker thread is later allocated to run that CrawlTask. The CrawlTask's run() method calls submitCrawlTask again, which issues more exec.execute(new CrawlTask(u)) calls and queues more tasks for worker threads to run, and so on. So exec is shared among multiple threads (most of them worker threads of exec itself). To keep those threads from seeing a stale reference, exec has to be volatile, so that once exec = new TrackingExecutor(Executors.newCachedThreadPool()); executes, the new executor is immediately visible to every thread that reads exec.
  3. WebCrawler's start and stop methods are synchronized, because a WebCrawler instance could potentially be started and stopped multiple times from multiple threads. The start and stop actions have to be atomic; otherwise you could run into very messy race conditions.
  4. The putIfAbsent and merge methods of ConcurrentHashMap are both atomic and concurrent. (We stay away from bulk operations such as forEach and replaceAll, which are not performed atomically as a whole.)
  5. The utility class HTMLLinkExtractor is not thread safe; however, we never share an HTMLLinkExtractor instance among threads. We always create it as a local variable, and local variables are confined to the executing thread's stack, so an instance that is never published stays thread confined.
  6. The method getContainedUrls is static; the only mutable non-local state it touches is the volatile long Delay and the thread-safe ConcurrentMap found, so it is close to a stateless block of code.
  7. A CrawlTask instance is submitted to the executor service and is run by a single worker thread from the beginning to the end of its run method. Even though that run method causes more worker threads to be allocated to run more CrawlTask instances, the invoking CrawlTask instance itself is never passed around among multiple worker threads.
  8. CrawlTask's run method has a mechanism that prevents the same url from being processed multiple times by different threads; it is thread safe, even if calling it a mutex is not strictly accurate. The check if (alreadyCrawled()) { return; } acts like a mutex: even if the executor service receives two different CrawlTask instances with exactly the same url (bad timing), only one thread gets "permission" to execute the bulk of the run method guarded by this check. The first thread to process the url records it in the seen ConcurrentMap and then goes ahead and runs the meat of run(); the other thread's only remaining choice is to return at the top of its own run method. A standalone sketch of this claim-the-key idiom appears right after this list.
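
Points 4 and 8 boil down to one idiom: putIfAbsent as a one-shot claim and merge as an atomic counter. Below is a small standalone sketch of my own (the ClaimDemo class and its url are made up, not part of WebCrawler.java): even when two threads race on the same url, only one gets past the claim and the count ends up at exactly 1.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ClaimDemo {
    private static final ConcurrentMap<String, Boolean> seen = new ConcurrentHashMap<>();
    private static final ConcurrentMap<String, Integer> found = new ConcurrentHashMap<>();

    // Returns true only for the single thread that claims this url first;
    // putIfAbsent is atomic, so it returns null exactly once per key.
    static boolean claim(String url) {
        return seen.putIfAbsent(url, Boolean.TRUE) == null;
    }

    public static void main(String[] args) throws InterruptedException {
        String url = "http://example.com/page.html";
        Runnable worker = () -> {
            if (!claim(url))
                return;                            // the loser bails out, like alreadyCrawled()
            found.merge(url, 1, Integer::sum);     // only the winner counts, atomically
        };

        Thread t1 = new Thread(worker);
        Thread t2 = new Thread(worker);
        t1.start(); t2.start();
        t1.join(); t2.join();

        System.out.println(found);                 // {http://example.com/page.html=1}
    }
}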


CHAP7>cat WebCrawler.java 
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * WebCrawler
 * <p/>
 * Using TrackingExecutorService to save unfinished tasks for later execution
 */
public abstract class WebCrawler {
    private static final long SEARCH_TIME = 10000;
    public static final String SEARCH = "Honorificabilitudinitatibus";
    //public static final String SEARCH = "star war";
    //public static final String SEARCH = "java";
    public static final String startpage = "http://xyzcode.blogspot.com/2017/08/test-crawler.html";

    private static final Predicate<String> urlPred = i -> i.contains("html")
            && i.contains("xyzcode");

    public static void main(String... args) {
        try {
            WebCrawler crawler = new WebCrawler(startpage) {
                @Override
                protected Set<String> processPage(String url) {
                    if (url != null) {
                        Set<String> urls = getContainedUrls(url);
                        return urls;
                    }
                    return new HashSet<String>();
                }
            };

            System.out.println("Will finish searching " + SEARCH
                    + " start from " + startpage + " in: " + SEARCH_TIME / 1000
                    + " seconds");

            crawler.start();
            Thread.sleep(SEARCH_TIME);
            crawler.stop();

            System.out.println("url visited: " + crawler.getSeenURLs().size());
            System.out.println("Delay: " + Delay);
            System.out.println("crawler.urlsToCrawl.size(): "
                    + crawler.urlsToCrawl.size());
            System.out
                    .println("occurance of "
                            + SEARCH
                            + ": "
                            + found.values().stream()
                                    .mapToInt(i -> i.intValue()).sum());

            // found.keySet().stream()
            // .forEach(i -> System.out.println(i + ": " + found.get(i)));

            System.out.println("problem page retry:"
                    + crawler.getProblemURLs().size());

            // crawler.getProblemURLs()
            // .keySet()
            // .stream()
            // .forEach(
            // i -> System.out.println(i + " --- "
            // + crawler.getProblemURLs().get(i)));

/*resume last search
            crawler.start();
            Thread.sleep(SEARCH_TIME);

            crawler.stop();
*/
        } catch (Exception e) {
            if (e instanceof InterruptedException)
                Thread.currentThread().interrupt();
            // System.out.println(e.getMessage());
        }

    }

    private volatile TrackingExecutor exec;
    private final Set<String> urlsToCrawl = new HashSet<>();

    private final ConcurrentMap<String, Boolean> seen = new ConcurrentHashMap<>();
    private final static ConcurrentMap<String, Integer> found = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, Integer> problem = new ConcurrentHashMap<>();
    private static final long TIMEOUT = 10;
    private static final TimeUnit UNIT = TimeUnit.SECONDS;


    private volatile static long Delay = 100;

    private static final long LIMIT = 5000;  //max delay time

    private static Set<String> getContainedUrls(String url) {
        InputStream is = null;
        BufferedReader br;
        String line;
        Set<String> urls = new HashSet<>();

        try {
            is = new URL(url).openStream(); // throws an IOException
            br = new BufferedReader(new InputStreamReader(is));
            while ((line = br.readLine()) != null) {
                Pattern matcher = Pattern.compile(SEARCH);
                Matcher m = matcher.matcher(line);  //waste memory, yes, but thread safe
                while (m.find())
                    found.merge(url, 1, Integer::sum);

                HTMLLinkExtractor htmlLinkExtractor = new HTMLLinkExtractor();

                Set<String> links = htmlLinkExtractor.grabHTMLLinks(line);
                links.stream().filter(urlPred).forEach(i -> urls.add(i));
            }
        } catch (IOException ioe) {
            if (ioe.getMessage() != null && ioe.getMessage().contains(
                    "Server returned HTTP response code: 503")) {

                Delay = Delay < LIMIT ? Delay * 2 : LIMIT;
                try {
                    Thread.sleep(Delay);
                } catch (InterruptedException e) {
                    // e.printStackTrace();
                }
            }
            // System.out.println(ioe.getMessage());
        } finally {
            try {
                if (is != null)
                    is.close();
            } catch (IOException ioe) {
                // nothing to see here
            }
        }
        // System.out.println(Thread.currentThread() + " urls on page " + url
        // + " , urls.size() = " + urls.size());
        return urls;
    }

    public WebCrawler(String startUrl) {
        urlsToCrawl.add(startUrl);
    }

    public final Set<String> getSeenURLs() {
        return this.seen.keySet();
    }

    public final Map<String, Integer> getProblemURLs() {
        return this.problem;
    }

    public synchronized void start() {
        exec = new TrackingExecutor(Executors.newCachedThreadPool());
        for (String url : urlsToCrawl)
            submitCrawlTask(url);
        urlsToCrawl.clear();
    }

    public synchronized void stop() throws InterruptedException {
        try {
            saveUncrawled(exec.shutdownNow());
            if (exec.awaitTermination(TIMEOUT, UNIT))
                saveUncrawled(exec.getCancelledTasks());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // exec = null;
        }
    }

    protected abstract Set<String> processPage(String url);

    private void saveUncrawled(List<Runnable> uncrawled) {
        for (Runnable task : uncrawled)
            urlsToCrawl.add(((CrawlTask) task).getPage());
    }

    private void submitCrawlTask(String u) {
        try {
            Thread.sleep(Delay);
            exec.execute(new CrawlTask(u));
        } catch (Exception e) {
            if (e instanceof InterruptedException)
                Thread.currentThread().interrupt();
            // System.out.println(e.getMessage());
        }
    }

    private class CrawlTask implements Runnable {
        private final String url;

        CrawlTask(String url) {
            this.url = url;
        }

        boolean alreadyCrawled() {
            return seen.putIfAbsent(url, true) != null;
        }

        void markUncrawled() {
            seen.remove(url);
            // System.out.printf("marking %s uncrawled%n", url);
        }

        public void run() {
            if (alreadyCrawled()) {
                return;
            }
            Set<String> list = processPage(url);
            if (url.toString().contains("xyzcode")
                    && url.toString().contains("html") && list.size() == 0) {
                markUncrawled();
                problem.merge(url.toString(), 1, (v1, v2) -> v1 = v1 + 1);
                return;
            }
            for (String link : list) {
                if (Thread.currentThread().isInterrupted())
                    return;
                submitCrawlTask(link);
            }
        }

        public String getPage() {
            return url;
        }
    }
}

class TrackingExecutor extends AbstractExecutorService {
    private final ExecutorService exec;
    private final Set<Runnable> tasksCancelledAtShutdown = Collections
            .synchronizedSet(new HashSet<Runnable>());

    public TrackingExecutor(ExecutorService exec) {
        this.exec = exec;
    }

    public void shutdown() {
        exec.shutdown();
    }

    public List<Runnable> shutdownNow() {
        return exec.shutdownNow();
    }

    public boolean isShutdown() {
        return exec.isShutdown();
    }

    public boolean isTerminated() {
        return exec.isTerminated();
    }

    public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
        return exec.awaitTermination(timeout, unit);
    }

    public List<Runnable> getCancelledTasks() {
        if (!exec.isTerminated())
            throw new IllegalStateException(/* ... */);
        return new ArrayList<Runnable>(tasksCancelledAtShutdown);
    }

    public void execute(final Runnable runnable) {
        exec.execute(new Runnable() {
            public void run() {
                try {
                    runnable.run();
                } finally {
                    if (isShutdown() && Thread.currentThread().isInterrupted())
                        tasksCancelledAtShutdown.add(runnable);
                }
            }
        });
    }
}

class HTMLLinkExtractor {

    private Pattern patternTag, patternLink;
    private Matcher matcherTag, matcherLink;

    private static final String HTML_A_TAG_PATTERN = "(?i)<a([^>]+)>(.+?)</a>";
    private static final String HTML_A_HREF_TAG_PATTERN = "\\s*(?i)href\\s*=\\s*(\"([^\"]*\")|'[^']*'|([^'\">\\s]+))";

    public HTMLLinkExtractor() {
        patternTag = Pattern.compile(HTML_A_TAG_PATTERN);
        patternLink = Pattern.compile(HTML_A_HREF_TAG_PATTERN);
    }

    /**
     * Extract href String with regular expression
     *
     */
    public Set<String> grabHTMLLinks(final String html) {
        Set<String> result = new HashSet<String>();
        matcherTag = patternTag.matcher(html);
        while (matcherTag.find()) {
            String href = matcherTag.group(1); // href
//            String linkText = matcherTag.group(2); // link text
            matcherLink = patternLink.matcher(href);
            while (matcherLink.find()) {
                String link = matcherLink.group(1); // link
                link = link.replaceAll("'", "");
                link = link.replaceAll("\"", "");
                result.add(link);
            }
        }
        return result;
    }
}
CHAP7>
CHAP7>javac WebCrawler.java 
CHAP7>java WebCrawler
Will finish searching Honorificabilitudinitatibus start from http://xyzcode.blogspot.com/2017/08/test-crawler.html in: 10 seconds
url visited: 186
Delay: 5000
crawler.urlsToCrawl.size(): 159
occurance of Honorificabilitudinitatibus: 7
problem page retry:17

CHAP7>