Site Search:

LogService.java

Back>

LogService fixes the race condition of LogWriter by making the submission of a new log message atomic. To avoid getting blocked by enqueuing a full blocking queue, the log(String msg) code atomically check for shutdown and conditionally increment a counter to "reserve" the right to submit a message.

During shutdown, the Producer threads detect the isShutdown flag and throw new IllegalStateException(/*...*/);, the calling code then get chance to catch the IllegalStateException  and take actions. In this example, the Producer thread exits the for loop earlier and run to the finish (instead of blocking on queue.put and prevent the program to exit like LogWriter did).

The logger thread also takes effort to drain all the messages in the blocking queue before closing the writer and exiting.



CHAP7>cat LogService.java 
import java.io.PrintWriter;
import java.io.Writer;
import java.util.concurrent.*;

/**
 * LogService
 * <p/>
 * Adding reliable cancellation to LogWriter
 */
public class LogService {
    private final BlockingQueue<String> queue;
    private final LoggerThread loggerThread;
    private final PrintWriter writer;
    private boolean isShutdown;
    private int reservations;

    public LogService(Writer writer) {
        this.queue = new LinkedBlockingQueue<String>();
        this.loggerThread = new LoggerThread();
        this.writer = new PrintWriter(writer);
    }

    public void start() {
        loggerThread.start();
    }

    public void stop() {
        synchronized (this) {
            isShutdown = true;
        }
        loggerThread.interrupt();
    }

    public void log(String msg) throws InterruptedException {
        synchronized (this) {
            if (isShutdown)
                throw new IllegalStateException(/*...*/);
            ++reservations;
        }
        queue.put(msg);
    }

    private class LoggerThread extends Thread {
        public void run() {
            try {
                while (true) {
                    try {
                        synchronized (LogService.this) {
                            if (isShutdown && reservations == 0)
                                break;
                        }
                        String msg = queue.take();
                        synchronized (LogService.this) {
                            --reservations;
                        }
                        writer.println(msg);
                    } catch (InterruptedException e) { /* retry */
                    }
                }
            } finally {
                writer.close();
            }
        }
    }
    
    public static void main(String...args) {
        PrintWriter writer = new PrintWriter(System.out);
        LogService log = new LogService(writer);
        log.start();
        
        Thread th = new Thread(() -> {
            try {
                for(int i = 1; i < 10; i++) {
                    log.log("msg " + i);
                    System.out.println("submited msg " + i);
                    //block if queue is full
                    Thread.sleep(1000);
                }
            } catch(Exception e) {
                e.printStackTrace();
            }
            });
        
        th.start();
        
        try {
            log.log("start to log");
            Thread.sleep(2000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        
        log.stop();
    }
}

CHAP7>javac LogService.java 
CHAP7>java LogService
submited msg 1
submited msg 2
start to log
msg 1
msg 2
java.lang.IllegalStateException
at LogService.log(LogService.java:37)
at LogService.lambda$main$0(LogService.java:74)
at LogService$$Lambda$1/925858445.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)

CHAP7>