In BetterLogService, we delegate management of the logging thread to the ExecutorService returned by Executors.newSingleThreadExecutor(). There is still a single log consumer. To log a message, a producer submits a Runnable, () -> writer.println(msg), to the ExecutorService.
The ExecutorService returned by Executors.newSingleThreadExecutor() has good support for orderly shutdown -- when shutdown() is called, it stops accepting new tasks and processes all the tasks already submitted before exiting -- which is exactly what we want. The producers no longer deal with a blocking queue directly; they submit a task to the ExecutorService instead (letting the ExecutorService worry about the queue), so they don't need to worry about getting blocked.
CHAP7>cat BetterLogService.java
import java.io.PrintWriter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
/**
 * Log service that hands every message to a single-threaded
 * {@link ExecutorService}.
 *
 * <p>Producers call {@link #log(String)}, which submits a task that prints the
 * message to the supplied {@link PrintWriter}. The executor's single worker
 * thread is the only consumer, so messages are written in submission order.
 * {@link #stop()} shuts the executor down and then closes the writer.
 */
public class BetterLogService {
    /** Maximum time, in seconds, that {@link #stop()} waits for pending messages. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 10;

    private final ExecutorService exec = Executors.newSingleThreadExecutor();
    private final PrintWriter writer;

    /**
     * Creates a log service that writes to the given writer.
     *
     * @param writer destination for log messages; it is closed by {@link #stop()}
     */
    public BetterLogService(PrintWriter writer) {
        this.writer = writer;
    }

    /** No-op: the executor creates its worker thread on demand. */
    public void start() {}

    /**
     * Stops accepting new messages, waits up to
     * {@value #SHUTDOWN_TIMEOUT_SECONDS} seconds for the already-submitted ones
     * to be written, and then closes the writer.
     *
     * <p>If the wait times out or is interrupted, the remaining tasks are
     * cancelled with {@code shutdownNow()} before the writer is closed, so the
     * worker thread is not left running against a closed writer.
     *
     * @throws InterruptedException if the calling thread is interrupted while
     *         waiting for the executor to terminate
     */
    public void stop() throws InterruptedException {
        try {
            exec.shutdown();
            if (!exec.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                // Timed out: drop whatever is still queued instead of leaving it behind.
                exec.shutdownNow();
            }
        } catch (InterruptedException e) {
            exec.shutdownNow();
            throw e;
        } finally {
            // Closing also flushes the messages buffered by the PrintWriter.
            writer.close();
        }
    }

    /**
     * Submits {@code msg} for asynchronous logging.
     *
     * <p>If the service has already been stopped the executor rejects the task;
     * the rejection is reported here rather than propagated to the caller.
     *
     * @param msg the message to write as one line
     */
    public void log(String msg) {
        try {
            exec.execute(() -> writer.println(msg));
        } catch (RejectedExecutionException rejected) {
            // In the demo below the writer wraps System.out, which stop() has
            // closed by the time a rejection can happen, so this line produces
            // no output there; the stack trace still reaches System.err.
            System.out.println("won't see me because the System.out is closed.");
            rejected.printStackTrace();
        }
    }

    /**
     * Demo: a producer thread logs one message per second while the main thread
     * stops the service after two seconds, so the later submissions are rejected.
     *
     * @param args unused
     */
    public static void main(String... args) {
        PrintWriter writer = new PrintWriter(System.out);
        BetterLogService log = new BetterLogService(writer);
        log.start();

        Thread th = new Thread(() -> {
            try {
                for (int i = 1; i < 5; i++) {
                    log.log("msg " + i);
                    System.out.println("submited msg " + i);
                    // Pace the producer so that some submissions arrive after
                    // stop(); submitting itself never blocks.
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        });
        th.start();

        try {
            log.log("start to log");
            Thread.sleep(2000);
        } catch (InterruptedException e1) {
            // Keep the interrupt visible so that stop() below returns promptly.
            Thread.currentThread().interrupt();
            e1.printStackTrace();
        }

        try {
            log.stop();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
CHAP7>javac BetterLogService.java
CHAP7>java BetterLogService
submited msg 1
submited msg 2
start to log
msg 1
msg 2
java.util.concurrent.RejectedExecutionException: Task BetterLogService$$Lambda$2/1392838282@4225e37 rejected from java.util.concurrent.ThreadPoolExecutor@a6c152f[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 3]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
at BetterLogService.log(BetterLogService.java:29)
at BetterLogService.lambda$main$1(BetterLogService.java:44)
at BetterLogService$$Lambda$1/925858445.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
java.util.concurrent.RejectedExecutionException: Task BetterLogService$$Lambda$2/1392838282@72488ae0 rejected from java.util.concurrent.ThreadPoolExecutor@a6c152f[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 3]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
at BetterLogService.log(BetterLogService.java:29)
at BetterLogService.lambda$main$1(BetterLogService.java:44)
at BetterLogService$$Lambda$1/925858445.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
CHAP7>