Site Search:

BetterLogService

Back>

In BetterLogService, we delegate management of the logging thread to the ExecutorService returned by Executors.newSingleThreadExecutor(). There is still a single log consumer: the executor's one worker thread. To log a message, a producer submits a Runnable, () -> writer.println(msg), to the ExecutorService.

The ExecutorService returned by Executors.newSingleThreadExecutor() has good support for orderly shutdown -- when shutdown() is called, it stops accepting new task submissions and processes all the tasks already submitted before exiting -- which is exactly what we want. Any task submitted after shutdown() is rejected with a RejectedExecutionException, as the program output below shows. The producers no longer deal with a blocking queue directly; they submit a task to the ExecutorService instead (leaving the ExecutorService to manage its own work queue), so they don't need to worry about getting blocked.

CHAP7>cat BetterLogService.java 
import java.io.PrintWriter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;


/**
 * A log service that hands every message to a single-threaded
 * {@link ExecutorService}, so that exactly one thread ever writes to the
 * underlying {@link PrintWriter}.
 *
 * <p>Producers call {@link #log(String)}, which enqueues a task and returns
 * without waiting for the message to be written. Calling {@link #stop()}
 * shuts the executor down, waits for the already-submitted messages to be
 * written, and then closes the writer.
 */
public class BetterLogService {
    private final ExecutorService exec = Executors.newSingleThreadExecutor();
    private final PrintWriter writer;

    /**
     * Creates a log service that writes to the given writer.
     *
     * @param writer destination for log messages; closed by {@link #stop()}
     * @throws NullPointerException if {@code writer} is {@code null}
     */
    public BetterLogService(PrintWriter writer) {
        // Fail fast here instead of later inside the executor thread or in stop().
        if (writer == null) {
            throw new NullPointerException("writer");
        }
        this.writer = writer;
    }

    /**
     * Does nothing; kept so callers can treat this class like a service with
     * a start/stop life cycle. The executor is created together with this
     * object, so there is no thread to start explicitly.
     */
    public void start() {}

    /**
     * Stops accepting new messages, waits up to 10 seconds for the messages
     * already submitted to be written, and then closes the writer.
     *
     * <p>If the wait times out or the calling thread is interrupted, the
     * executor is shut down forcibly so that its worker thread does not keep
     * writing to a writer that is about to be closed.
     *
     * @throws InterruptedException if the calling thread is interrupted while
     *         waiting for the pending messages to be written
     */
    public void stop() throws InterruptedException {
        exec.shutdown();
        try {
            if (!exec.awaitTermination(10, TimeUnit.SECONDS)) {
                // Timed out: drop whatever is still queued and interrupt the worker.
                exec.shutdownNow();
            }
        } catch (InterruptedException e) {
            exec.shutdownNow();
            throw e;
        } finally {
            writer.close();
        }
    }

    /**
     * Submits a message to be written by the logging thread.
     *
     * <p>If the service has already been stopped the executor rejects the
     * task; the rejection is reported and the message is dropped, so this
     * method never throws {@link RejectedExecutionException} to the caller.
     *
     * @param msg the message to write as one line
     */
    public void log(String msg) {
        try {
            exec.execute(() -> writer.println(msg));
        } catch (RejectedExecutionException rejected) {
            // In the demo below the writer wraps System.out, which stop() has
            // already closed by the time a task is rejected, so this line is lost.
            System.out.println("won't see me because the System.out is closed.");
            rejected.printStackTrace();
        }
    }

    /**
     * Demo: a producer thread logs one message per second while the main
     * thread stops the service after two seconds, so the later messages are
     * rejected.
     *
     * @param args unused
     */
    public static void main(String...args) {
        PrintWriter writer = new PrintWriter(System.out);
        BetterLogService log = new BetterLogService(writer);
        log.start();

        Thread th = new Thread(() -> {
            try {
                for (int i = 1; i < 5; i++) {
                    log.log("msg " + i);
                    System.out.println("submited msg " + i);
                    // Pace the producer; log() itself never blocks on a full queue.
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        });

        th.start();

        try {
            log.log("start to log");
            Thread.sleep(2000);
        } catch (InterruptedException e1) {
            Thread.currentThread().interrupt();
            e1.printStackTrace();
        }

        try {
            log.stop();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
CHAP7>javac BetterLogService.java 
CHAP7>java BetterLogService
submited msg 1
submited msg 2
start to log
msg 1
msg 2
java.util.concurrent.RejectedExecutionException: Task BetterLogService$$Lambda$2/1392838282@4225e37 rejected from java.util.concurrent.ThreadPoolExecutor@a6c152f[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 3]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
at BetterLogService.log(BetterLogService.java:29)
at BetterLogService.lambda$main$1(BetterLogService.java:44)
at BetterLogService$$Lambda$1/925858445.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
java.util.concurrent.RejectedExecutionException: Task BetterLogService$$Lambda$2/1392838282@72488ae0 rejected from java.util.concurrent.ThreadPoolExecutor@a6c152f[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 3]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
at BetterLogService.log(BetterLogService.java:29)
at BetterLogService.lambda$main$1(BetterLogService.java:44)
at BetterLogService$$Lambda$1/925858445.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)

CHAP7>