Site Search:

SocketUsingTask.java

Back>

ReaderThread encapsulates cancellation of socket-using threads by overriding interrupt; the same can be done for tasks by overriding Future.cancel.

When a Callable is submitted to an ExecutorService, submit returns a Future that can be used to cancel the task. The newTaskFor hook of ThreadPoolExecutor is a factory method that creates the Future representing the task. It returns a RunnableFuture, an interface that extends both Future and Runnable (and is implemented by FutureTask). We can override the Future.cancel() method of FutureTask class in order to provide logging or gather statistics or cancel activities that are not responsive to interruption.

The main thread created a ServerSocket that listen on localhost port 8090. The main thread then blocks on socket2.accept(), until the client issued a "nc localhost 8090" command to open a socket connection to localhost port 8090. The unblocked main thread then created an instance of SocketUsingTask and submitted it to the class CancellingExecutor extends ThreadPoolExecutor. The SocketUsingTask is a callable, so it can be submitted to a ThreadPoolExecutor. The ThreadPool allocated a worker thread to run SocketUsingTask. The call() method is called, and the worker thread is blocked on int count = socket.getInputStream().read(buf);. The main thread wakes up after sleeping 5 seconds and called the f.cancel(true);. Calling socket.close(); in cancel allows int count = socket.getInputStream().read(buf); to throw IOException and exits the loop.

CHAP7>cat SocketUsingTask.java 
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.*;

/**
 * SocketUsingTask
 * <p/>
 * Encapsulating nonstandard cancellation in a task with newTaskFor
 */

public abstract class SocketUsingTask<T> implements CancellableTask<T> {
    private static final int BUFSZ = 512;
    private static ServerSocket socket2;
    private static Socket socket;

    public static void main(String args[]) throws Exception {
        socket2 = new ServerSocket(8090);
        System.out.println("block on socket.accept() until a request arrives");
        SocketUsingTask<?> sut = new SocketUsingTask<Object>() {
            @Override
            public Socket call() throws Exception {
                try {
                    System.out.println(Thread.currentThread() + " called call()");
                    byte[] buf = new byte[BUFSZ];
                    while (true) {
                        int count = socket.getInputStream().read(buf);
                        if (count < 0)
                            break;
                        else if (count > 0)
                            System.out.println(buf);
                    }
                } catch (IOException e) { /* Allow thread to exit */
                    System.out.println(Thread.currentThread() + " exits");
                    e.printStackTrace();
                }
                return null;
            }
        };

        sut.setSocket(socket2.accept());

        CancellingExecutor exec = new CancellingExecutor(2, 2, 10,
                TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        
        Future<?> f = exec.submit(sut);
        Thread.sleep(5000);
        f.cancel(true);
        exec.shutdown();
        exec.awaitTermination(10, TimeUnit.SECONDS);
        socket2.close();
    }

    protected synchronized void setSocket(Socket s) {
        socket = s;
    }

    public synchronized void cancel() {
        try {
            if (socket != null)
                socket.close();
        } catch (IOException ignored) {
            System.out.println(Thread.currentThread() + " called cancel()");
        }
    }

    public RunnableFuture<T> newTask() {
        return new FutureTask<T>(this) {
            public boolean cancel(boolean mayInterruptIfRunning) {
                try {
                    SocketUsingTask.this.cancel();
                } finally {
                    System.out.println(Thread.currentThread() + " called newTask()");
                    return super.cancel(mayInterruptIfRunning);
                }
            }
        };
    }
}

interface CancellableTask<T> extends Callable<T> {
    void cancel();

    RunnableFuture<T> newTask();
}

class CancellingExecutor extends ThreadPoolExecutor {
    public CancellingExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public CancellingExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                threadFactory);
    }

    public CancellingExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                handler);
    }

    public CancellingExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                threadFactory, handler);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        if (callable instanceof CancellableTask)
            return ((CancellableTask<T>) callable).newTask();
        else
            return super.newTaskFor(callable);
    }
}
CHAP7>javac SocketUsingTask.java 
CHAP7>java SocketUsingTask
block on socket.accept() until a request arrives
Thread[pool-1-thread-1,5,main] called call()
Thread[main,5,main] called newTask()
Thread[pool-1-thread-1,5,main] exits
java.net.SocketException: Socket closed
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.read(SocketInputStream.java:150)
at java.net.SocketInputStream.read(SocketInputStream.java:121)
at java.net.SocketInputStream.read(SocketInputStream.java:107)
at SocketUsingTask$1.call(SocketUsingTask.java:27)
at SocketUsingTask$1.call(SocketUsingTask.java:20)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

CHAP7>



CHAP7-CLINET>nc localhost 8090
CHAP7-CLINET>