Site Search:

CooperatingNoDeadlock.java

CooperatingNoDeadlock.java

import java.util.*;
import java.util.concurrent.*;
/**
 * CooperatingNoDeadlock
 * <p/>
 * Using open calls to avoid deadlock between cooperating objects.
 * <p/>
 * {@code Taxi.setLocation} and {@code Dispatcher.getImage} hold their own lock
 * only while touching their own state and release it before calling into the
 * other object, so no thread holds a Taxi lock and the Dispatcher lock at the
 * same time.
 */
public class CooperatingNoDeadlock {
    private static final int NUM_THREADS = 20;
    private static final int NUM_ACCOUNTS = 5;
    private static final int NUM_EVENTS = 10000;
    private static final int NUM_SCREEN_UPDATES = 10000;
    private static final int MAP_SIZE = 10;

    /** Unit moves as {dx, dy}: up, down, right, left. */
    private static final int[][] DIRS = {{0, 1}, {0, -1}, {1, 0}, {-1, 0}};

    /**
     * Wraps a coordinate onto the map so the result is always in
     * {@code [0, MAP_SIZE)}, including for negative inputs (unlike {@code %},
     * which keeps the sign of the dividend).
     *
     * @param coordinate raw coordinate, possibly one step outside the map
     * @return the coordinate wrapped into {@code [0, MAP_SIZE)}
     */
    static int wrapToMap(int coordinate) {
        return Math.floorMod(coordinate, MAP_SIZE);
    }

    /**
     * Runs the simulation: creates the taxis, pre-generates a queue of random
     * GPS events, then starts worker threads that apply those events to the
     * taxis while a separate thread repeatedly asks the dispatcher for an image.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final Random rnd = new Random();
        final Taxi[] accounts = new Taxi[NUM_ACCOUNTS];
        final CooperatingNoDeadlock cndl = new CooperatingNoDeadlock();
        final Dispatcher dispatcher = cndl.new Dispatcher();

        // Each event is encoded as "taxiIndex,x,y".
        final BlockingDeque<String> gpsEvents = new LinkedBlockingDeque<>();

        for (int i = 0; i < accounts.length; i++) {
            accounts[i] = cndl.new Taxi(dispatcher);
            accounts[i].setLocation(cndl.new Point(i, i));
            accounts[i].setDestination(cndl.new Point(i, i + 1));
            dispatcher.addTaxi(accounts[i]);
        }

        // simulate gps location time sequences
        // xp/yp track each taxi's simulated position as events are generated;
        // the Taxi objects themselves are only moved later by the workers.
        int[] xp = new int[NUM_ACCOUNTS];
        int[] yp = new int[NUM_ACCOUNTS];
        for (int i = 0; i < NUM_ACCOUNTS; i++) {
            xp[i] = accounts[i].getLocation().x;
            yp[i] = accounts[i].getLocation().y;
        }
        for (int i = 0; i < NUM_EVENTS; i++) {
            int index = rnd.nextInt(NUM_ACCOUNTS);
            Taxi taxi = accounts[index];
            Point p = cndl.new Point(xp[index], yp[index]);
            int[] v = DIRS[rnd.nextInt(DIRS.length)]; // random move
            if (p.equals(taxi.getDestination())) {
                // Already at the destination: repeat the position so the
                // worker that handles it reports the taxi as available.
                gpsEvents.offer(index + "," + p.x + "," + p.y);
            } else {
                // Wrap BOTH axes; the sum must be parenthesised before wrapping.
                Point np = cndl.new Point(wrapToMap(p.x + v[0]), wrapToMap(p.y + v[1]));
                gpsEvents.offer(index + "," + np.x + "," + np.y);
                xp[index] = np.x;
                yp[index] = np.y;
            }
        }

        System.out.println("Done simulate gps update events");

        class TaxiMoveThread extends Thread {
            @Override
            public void run() {
                // in uber case, never exit, keep checking
                // pollFirst() checks for and removes the head atomically and
                // returns null once the queue is drained. A separate isEmpty()
                // check followed by a blocking takeFirst() would let a worker
                // block forever if another worker took the last event in between.
                String event;
                while ((event = gpsEvents.pollFirst()) != null) {
                    String[] info = event.split(",");
                    int id = Integer.parseInt(info[0]);
                    int x = Integer.parseInt(info[1]);
                    int y = Integer.parseInt(info[2]);

                    Taxi t = accounts[id];
                    t.setLocation(cndl.new Point(x, y));
                }
            }
        }

        // event queue workers
        for (int i = 0; i < NUM_THREADS; i++) {
            new TaxiMoveThread().start();
        }

        // gui updater
        new Thread(() -> {
            for (int i = 0; i < NUM_SCREEN_UPDATES; i++) {
                dispatcher.getImage();
            }
        }).start();

        // Printed once the threads have been started, not when they finish.
        System.out.println("Done.");
    }

    // Next taxi id. Incremented without synchronization in the Taxi constructor.
    private static int uid = 0;

    /**
     * A taxi with a current location and a destination.
     * <p/>
     * deadlock-prone code fixed! Location and destination are guarded by this
     * taxi's intrinsic lock; the dispatcher is notified only after that lock
     * has been released.
     */
    class Taxi {
        private Point location, destination;
        private final Dispatcher dispatcher;
        public final int id;

        /**
         * @param dispatcher the dispatcher to notify when this taxi reaches its destination
         */
        public Taxi(Dispatcher dispatcher) {
            this.dispatcher = dispatcher;
            this.id = uid;
            uid += 1;
        }

        /** @return the most recently set location */
        public synchronized Point getLocation() {
            return location;
        }

        /**
         * Updates the location and, if it equals the destination, tells the
         * dispatcher this taxi is available.
         * <p/>
         * The deadlock-prone variant was a synchronized method that called
         * {@code dispatcher.notifyAvailable(this)} while still holding the
         * taxi lock. Here the call is made after the synchronized block.
         *
         * @param location the new location
         */
        public void setLocation(Point location) {
            boolean reachedDestination;
            synchronized (this) {
                this.location = location;
                reachedDestination = location.equals(destination);
            }
            // Open call: the taxi lock is no longer held here.
            if (reachedDestination) {
                dispatcher.notifyAvailable(this);
            }
        }

        /** @return the most recently set destination */
        public synchronized Point getDestination() {
            return destination;
        }

        /** @param destination the new destination */
        public synchronized void setDestination(Point destination) {
            this.destination = destination;
        }
    }

    /**
     * Tracks all taxis and those that have reported themselves available.
     * Both sets are guarded by this dispatcher's intrinsic lock.
     */
    class Dispatcher {
        private final Set<Taxi> taxis;
        private final Set<Taxi> availableTaxis;

        public Dispatcher() {
            taxis = new HashSet<Taxi>();
            availableTaxis = new HashSet<Taxi>();
        }

        /** @param taxi the taxi to start tracking */
        public synchronized void addTaxi(Taxi taxi) {
            this.taxis.add(taxi);
        }

        /**
         * Records the taxi as available and prints a message.
         *
         * @param taxi the taxi that reached its destination
         */
        public synchronized void notifyAvailable(Taxi taxi) {
            System.out.println(taxi.id + " is available.");
            availableTaxis.add(taxi);
        }

        /**
         * Draws a marker for every tracked taxi.
         * <p/>
         * The deadlock-prone variant was a synchronized method that called
         * {@code t.getLocation()} for each taxi while still holding the
         * dispatcher lock. Here the set is copied under the lock and the
         * taxis are queried after it is released.
         *
         * @return a new image with one marker drawn per taxi in the snapshot
         */
        public Image getImage() {
            Set<Taxi> copy;
            synchronized (this) {
                copy = new HashSet<>(taxis);
            }
            // Open calls: the dispatcher lock is no longer held here.
            Image image = new Image();
            for (Taxi t : copy) {
                System.out.print("draw taxi " + t.id + " ");
                image.drawMarker(t.getLocation());
            }
            return image;
        }
    }

    /** Stand-in for a rendered map; drawing a marker just prints it. */
    class Image {
        /** @param p the position to print */
        public void drawMarker(Point p) {
            System.out.println("at location " + p.x + ", " + p.y);
        }
    }

    /** Immutable integer grid position with value-based equality. */
    class Point {
        public final int x, y;

        public Point(int x, int y) {
            this.x = x;
            this.y = y;
        }

        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof Point)) {
                return false;
            }
            Point p = (Point) o;
            return this.x == p.x && this.y == p.y;
        }

        @Override
        public int hashCode() {
            int result = 17;
            result = 31 * result + x;
            result = 31 * result + y;
            return result;
        }
    }
}

This program simulates a taxi dispatch system in which multiple threads update taxi locations while the dispatcher renders a live map, just like the earlier CooperatingDeadlock version. However, it fixes the deadlock by using open calls: a method calls into another object only when it is not holding a lock of its own. Specifically, Taxi.setLocation updates the location and checks whether the taxi has reached its destination inside a synchronized block, but calls dispatcher.notifyAvailable only after leaving that block, so the Taxi lock is never held while the Dispatcher lock is being acquired. Similarly, Dispatcher.getImage first copies the set of taxis while holding its own lock and then reads each Taxi's location after releasing it. Because no thread ever holds both locks at once, a circular wait cannot occur, and the cooperating objects can work together safely without risking deadlock.

No comments:

Post a Comment