ReaderThread.java

ReaderThread.java shows a technique for encapsulating nonstandard cancellation. ReaderThread extends Thread, takes a socket connection in its constructor, and reads from the socket's InputStream in its run() method. Because a blocking read() on a socket's InputStream is not responsive to interruption, ReaderThread overrides Thread's interrupt() method. The override calls socket.close(), which makes any thread blocked in a read or write on that socket throw a SocketException and so gives it a chance to leave the blocking state. Note that the override also calls super.interrupt() to deliver a standard interrupt.
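
The claim that a blocked socket read ignores a plain interrupt but reacts to close() can be checked in isolation. The following is a minimal sketch, not part of the original program: the class name SocketCloseDemo, the method name, and the sleep and join timeouts are all assumptions chosen for illustration. It uses an ordinary (platform) thread and an in-process loopback client that never sends data.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

/** Hypothetical demo class; names and timeouts are illustrative assumptions. */
public class SocketCloseDemo {

    /**
     * Blocks a thread in read(), sends it a plain interrupt, then closes the socket.
     * Returns true if the reader was still blocked after the interrupt
     * and had exited shortly after the close.
     */
    public static boolean plainInterruptIgnoredButCloseWorks() {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket accepted = server.accept()) {

            Thread reader = new Thread(() -> {
                try {
                    accepted.getInputStream().read(); // blocks: the client never sends data
                } catch (IOException e) {
                    // closing the socket lands here; let the thread exit
                }
            });
            reader.start();
            Thread.sleep(300);          // give the reader time to block in read()

            reader.interrupt();         // standard interrupt only
            reader.join(500);
            boolean stillBlocked = reader.isAlive();

            accepted.close();           // the nonstandard cancellation step
            reader.join(2000);
            return stillBlocked && !reader.isAlive();
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException("demo could not run", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("interrupt ignored, close unblocks: "
                + plainInterruptIgnoredButCloseWorks());
    }
}
```

The result depends on timing, so the timeouts may need adjusting on a heavily loaded machine.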

The program's timeline is as follows. The main thread creates a ServerSocket listening on port 8090 and then blocks in socket2.accept(). On the client side, the netcat command "nc localhost 8090" is issued. It opens a TCP connection to localhost port 8090, which unblocks socket2.accept() on the server. The main thread then constructs a new ReaderThread with the accepted socket and starts it. Once started, ReaderThread blocks on int count = in.read(buf); in its run() method. The netcat client neither sends any input nor closes the connection, so ReaderThread keeps waiting for input from the socket's InputStream and cannot unblock itself. In the meantime, the main thread submits the runnable () -> rt.interrupt() to a ScheduledExecutorService to run 3 seconds later. It then busy-spins on while(!rt.isInterrupted()), trying to capture the short period after ReaderThread has been interrupted but before it runs to completion and exits. Finally, the main thread sleeps for 5 seconds, wakes up to check the status of ReaderThread again, and shuts down the ScheduledExecutorService.
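
The same timeline can be reproduced without netcat. The sketch below is a hedged variant, not the author's program: TimelineSketch, ClosingReader, and readerExitsAfterScheduledInterrupt are hypothetical names, an in-process loopback socket stands in for the silent netcat client, and join() with a timeout replaces the busy spin (so it does not try to observe the alive-and-interrupted window).

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of the timeline; all names here are illustrative. */
public class TimelineSketch {

    /** Compact stand-in for ReaderThread: interrupt() closes the socket first. */
    static class ClosingReader extends Thread {
        private final Socket socket;

        ClosingReader(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void interrupt() {
            try {
                socket.close();          // unblocks a pending read()
            } catch (IOException ignored) {
                // nothing useful to do if close fails
            } finally {
                super.interrupt();       // still deliver the standard interrupt
            }
        }

        @Override
        public void run() {
            try {
                InputStream in = socket.getInputStream();
                byte[] buf = new byte[512];
                while (in.read(buf) >= 0) {
                    // discard input; the silent client never sends any
                }
            } catch (IOException e) {
                // allow the thread to exit
            }
        }
    }

    /** Returns true if the reader has exited after the scheduled interrupt fires. */
    public static boolean readerExitsAfterScheduledInterrupt(long delayMillis) {
        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket silentClient = new Socket(server.getInetAddress(), server.getLocalPort())) {
            ClosingReader rt = new ClosingReader(server.accept());
            rt.start();
            exec.schedule(rt::interrupt, delayMillis, TimeUnit.MILLISECONDS);
            rt.join(delayMillis + 5000); // wait for the reader instead of spinning
            return !rt.isAlive();
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException("sketch could not run", e);
        } finally {
            exec.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("reader exited: " + readerExitsAfterScheduledInterrupt(1000));
    }
}
```

Binding to port 0 lets the operating system pick a free port, which avoids clashing with anything already listening on 8090.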


CHAP7>cat ReaderThread.java 
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * ReaderThread
 * <p/>
 * Encapsulating nonstandard cancellation in a Thread by overriding interrupt
 */
public class ReaderThread extends Thread {
    private static final int BUFSZ = 512;
    private final Socket socket;
    private final InputStream in;
    private static ServerSocket socket2;
    
    public static void main(String[] args) {
        try {
            ScheduledExecutorService exec = Executors.newScheduledThreadPool(2);
            
            socket2 = new ServerSocket(8090);
            System.out.println("block on socket.accept() until a request arrives");
            ReaderThread rt = new ReaderThread(socket2.accept()); 
            rt.start();
            exec.schedule(() -> rt.interrupt(), 3, TimeUnit.SECONDS);
            
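            /* Busy-spin to reveal the very short period in which rt is both alive and
               interrupted. The inner check prints only if the flag is set between the
               loop condition and the if; otherwise the loop ends without printing. */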
            while(!rt.isInterrupted()) {
                if(rt.isInterrupted()) {
                    System.out.println(rt + " rt.isAlive() : " + rt.isAlive());
                    System.out.println(rt + " rt.isInterrupted(): " + rt.isInterrupted()); 
                    break;
                }
            }
            
            Thread.sleep(5000);
            System.out.println(rt + " rt.isAlive() : " + rt.isAlive());
            System.out.println(rt + " rt.isInterrupted(): " + rt.isInterrupted()); // rt has already exited
            
            exec.shutdown();
            exec.awaitTermination(10, TimeUnit.SECONDS);
            socket2.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public ReaderThread(Socket socket) throws IOException {
        this.socket = socket;
        this.in = socket.getInputStream();
    }

    @Override
    public void interrupt() {
        try {
            socket.close();
        } catch (IOException ignored) {
            ignored.printStackTrace();
        } finally {
            System.out.println(Thread.currentThread() + ": called interrupt()");
            System.out.println(this.hashCode() - super.hashCode() + " this and super refer to the same thread that gets interrupted");
            System.out.println(super.getName() + " rt.isInterrupted(): " + super.isInterrupted());
            super.interrupt(); //call thread's interrupt()
            //this.interrupt(); //call this override method itself. Don't do it
            System.out.println(super.getName() + " rt.isInterrupted(): " + super.isInterrupted());
        }
    }

    @Override
    public void run() {
        try {
            byte[] buf = new byte[BUFSZ];
            while (true) {
                int count = in.read(buf);
                if (count < 0)
                    break;
                else if (count > 0)
                    processBuffer(buf, count);
            }
        } catch (IOException e) { /* Allow thread to exit */
            System.out.println(Thread.currentThread() + " exits");
            e.printStackTrace();
        } 
    }

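    public void processBuffer(byte[] buf, int count) {
        // Print only the bytes actually read, decoded with the platform default charset
        System.out.println(new String(buf, 0, count));
    }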
}
CHAP7>javac ReaderThread.java 
CHAP7>java ReaderThread
block on socket.accept() until a request arrives
Thread[Thread-0,5,main] exits
Thread[pool-1-thread-1,5,main]: called interrupt()
0 this and super refer to the same thread that gets interrupted
Thread-0 rt.isInterrupted(): false
Thread-0 rt.isInterrupted(): true
Thread[Thread-0,5,main] rt.isAlive() : true
Thread[Thread-0,5,main] rt.isInterrupted(): true
java.net.SocketException: Socket closed
	at java.net.SocketInputStream.socketRead0(Native Method)
	at java.net.SocketInputStream.read(SocketInputStream.java:150)
	at java.net.SocketInputStream.read(SocketInputStream.java:121)
	at java.net.SocketInputStream.read(SocketInputStream.java:107)
	at ReaderThread.run(ReaderThread.java:75)
Thread[Thread-0,5,] rt.isAlive() : false
Thread[Thread-0,5,] rt.isInterrupted(): false

CHAP7>




CHAP7-client>nc localhost 8090

CHAP7-client>