Thinking In Java

Published by jack.zhang, 2014-07-28 04:28:47

Description: “He gave man speech, and speech created thought, Which is the
measure of the Universe”—Prometheus Unbound, Shelley
Human beings ... are very much at the mercy of the particular language which has
become the medium of expression for their society. It is quite an illusion to imagine
that one adjusts to reality essentially without the use of language and that language
is merely an incidental means of solving specific problems of communication and
reflection. The fact of the matter is that the “real world” is to a large extent
unconsciously built up on the language habits of the group.
The Status of Linguistics as a Science, 1929, Edward Sapir
Like any human language, Java provides a way to express concepts. If successful, this
medium of expression will be significantly easier and more flexible than the alternatives as
problems grow larger and more complex.
You can’t look at Java as just a collection of features—some of the features make no sense in
isolation. You can use the


New library components

The java.util.concurrent library in Java SE5 introduces a significant number of new classes designed to solve concurrency problems. Learning to use these can help you produce simpler and more robust concurrent programs.

This section includes a representative set of examples of various components, but a few of the components (ones that you may be less likely to use and encounter) are not discussed here. Because these components solve various problems, there is no clear way to organize them, so I shall attempt to start with simpler examples and proceed through examples of increasing complexity.

CountDownLatch

This is used to synchronize one or more tasks by forcing them to wait for the completion of a set of operations being performed by other tasks.

You give an initial count to a CountDownLatch object, and any task that calls await( ) on that object will block until the count reaches zero. Other tasks may call countDown( ) on the object to reduce the count, presumably when a task finishes its job. A CountDownLatch is designed to be used in a one-shot fashion; the count cannot be reset. If you need a version that resets the count, you can use a CyclicBarrier instead.

The tasks that call countDown( ) are not blocked when they make that call. Only the call to await( ) is blocked until the count reaches zero.

A typical use is to divide a problem into n independently solvable tasks and create a CountDownLatch with a value of n. When each task is finished it calls countDown( ) on the latch. Tasks waiting for the problem to be solved call await( ) on the latch to hold themselves back until it is completed.
Here’s a skeleton example that demonstrates this technique:

//: concurrency/CountDownLatchDemo.java
import java.util.concurrent.*;
import java.util.*;
import static net.mindview.util.Print.*;

// Performs some portion of a task:
class TaskPortion implements Runnable {
  private static int counter = 0;
  private final int id = counter++;
  private static Random rand = new Random(47);
  private final CountDownLatch latch;
  TaskPortion(CountDownLatch latch) {
    this.latch = latch;
  }
  public void run() {
    try {
      doWork();
      latch.countDown();
    } catch(InterruptedException ex) {
      // Acceptable way to exit
    }
  }
  public void doWork() throws InterruptedException {
    TimeUnit.MILLISECONDS.sleep(rand.nextInt(2000));
    print(this + "completed");
  }
  public String toString() {
    return String.format("%1$-3d ", id);
  }
}

// Waits on the CountDownLatch:
class WaitingTask implements Runnable {
  private static int counter = 0;
  private final int id = counter++;
  private final CountDownLatch latch;
  WaitingTask(CountDownLatch latch) {
    this.latch = latch;
  }
  public void run() {
    try {
      latch.await();
      print("Latch barrier passed for " + this);
    } catch(InterruptedException ex) {
      print(this + " interrupted");
    }
  }
  public String toString() {
    return String.format("WaitingTask %1$-3d ", id);
  }
}

public class CountDownLatchDemo {
  static final int SIZE = 100;
  public static void main(String[] args) throws Exception {
    ExecutorService exec = Executors.newCachedThreadPool();
    // All must share a single CountDownLatch object:
    CountDownLatch latch = new CountDownLatch(SIZE);
    for(int i = 0; i < 10; i++)
      exec.execute(new WaitingTask(latch));
    for(int i = 0; i < SIZE; i++)
      exec.execute(new TaskPortion(latch));
    print("Launched all tasks");
    exec.shutdown(); // Quit when all tasks complete
  }
} /* (Execute to see output) *///:~

TaskPortion sleeps for a random period to simulate the completion of part of the task, and WaitingTask indicates a part of the system that must wait until the initial portion of the problem is complete. All tasks work with the same single CountDownLatch, which is defined in main( ).

Exercise 32: (7) Use a CountDownLatch to solve the problem of correlating the results from the Entrances in OrnamentalGarden.java. Remove the unnecessary code from the new version of the example.

Library thread safety

Notice that TaskPortion contains a static Random object, which means that multiple tasks may be calling Random.nextInt( ) at the same time. Is this safe?

If there is a problem, it can be solved in this case by giving TaskPortion its own Random object, that is, by removing the static specifier.
But the question remains for Java standard library methods in general: Which ones are thread-safe and which ones aren’t? 880 Thinking in Java Bruce Eckel

Unfortunately, the JDK documentation is not forthcoming on this point. It happens that Random.nextInt( ) is thread-safe, but alas, you shall have to discover this on a case-by-case basis, either by using a Web search or by inspecting the Java library code. This is not a particularly good situation for a programming language that was, at least in theory, designed to support concurrency.

CyclicBarrier

A CyclicBarrier is used in situations where you want to create a group of tasks to perform work in parallel, and then wait until they are all finished before moving on to the next step (something like join( ), it would seem). It brings all the parallel tasks into alignment at the barrier so you can move forward in unison. This is very similar to the CountDownLatch, except that a CountDownLatch is a one-shot event, whereas a CyclicBarrier can be reused over and over.

I’ve been fascinated with simulations from the beginning of my experience with computers, and concurrency is a key factor in making simulations possible. The very first program that I can remember writing [22] was a simulation: a horse-racing game written in BASIC called (because of the file name limitations) HOSRAC.BAS. Here is the object-oriented, threaded version of that program, utilizing a CyclicBarrier:

[22] As a freshman in high school; the classroom had an ASR-33 teletype with a 110-baud acoustic-coupler modem accessing an HP-1000.

//: concurrency/HorseRace.java
// Using CyclicBarriers.
import java.util.concurrent.*;
import java.util.*;
import static net.mindview.util.Print.*;

class Horse implements Runnable {
  private static int counter = 0;
  private final int id = counter++;
  private int strides = 0;
  private static Random rand = new Random(47);
  private static CyclicBarrier barrier;
  public Horse(CyclicBarrier b) { barrier = b; }
  public synchronized int getStrides() { return strides; }
  public void run() {
    try {
      while(!Thread.interrupted()) {
        synchronized(this) {
          strides += rand.nextInt(3); // Produces 0, 1 or 2
        }
        barrier.await();
      }
    } catch(InterruptedException e) {
      // A legitimate way to exit
    } catch(BrokenBarrierException e) {
      // This one we want to know about
      throw new RuntimeException(e);
    }
  }
  public String toString() { return "Horse " + id + " "; }
  public String tracks() {
    StringBuilder s = new StringBuilder();
    for(int i = 0; i < getStrides(); i++)
      s.append("*");
    s.append(id);
    return s.toString();
  }
}

public class HorseRace {
  static final int FINISH_LINE = 75;
  private List<Horse> horses = new ArrayList<Horse>();
  private ExecutorService exec =
    Executors.newCachedThreadPool();
  private CyclicBarrier barrier;
  public HorseRace(int nHorses, final int pause) {
    barrier = new CyclicBarrier(nHorses, new Runnable() {
      public void run() {
        StringBuilder s = new StringBuilder();
        for(int i = 0; i < FINISH_LINE; i++)
          s.append("="); // The fence on the racetrack
        print(s);
        for(Horse horse : horses)
          print(horse.tracks());
        for(Horse horse : horses)
          if(horse.getStrides() >= FINISH_LINE) {
            print(horse + "won!");
            exec.shutdownNow();
            return;
          }
        try {
          TimeUnit.MILLISECONDS.sleep(pause);
        } catch(InterruptedException e) {
          print("barrier-action sleep interrupted");
        }
      }
    });
    for(int i = 0; i < nHorses; i++) {
      Horse horse = new Horse(barrier);
      horses.add(horse);
      exec.execute(horse);
    }
  }
  public static void main(String[] args) {
    int nHorses = 7;
    int pause = 200;
    if(args.length > 0) { // Optional argument
      int n = new Integer(args[0]);
      nHorses = n > 0 ? n : nHorses;
    }
    if(args.length > 1) { // Optional argument
      int p = new Integer(args[1]);
      pause = p > -1 ? p : pause;
    }
    new HorseRace(nHorses, pause);
  }
} /* (Execute to see output) *///:~

A CyclicBarrier can be given a "barrier action," which is a Runnable that is automatically executed when the count reaches zero; this is another distinction between CyclicBarrier and CountDownLatch. Here, the barrier action is created as an anonymous class that is handed to the constructor of CyclicBarrier.

I tried having each horse print itself, but then the order of display was dependent on the task manager. The CyclicBarrier allows each horse to do whatever it needs to do in order to move forward, and then it has to wait at the barrier until all the other horses have moved forward. When all horses have moved, the CyclicBarrier automatically calls its Runnable barrier-action task to display the horses in order, along with the fence.
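The reuse of a barrier across rounds, together with its barrier action, can also be seen in a much smaller setting. The following is only a minimal sketch and is not part of the book's code: the class name BarrierRounds and the method run( ) are hypothetical names chosen for illustration. Each of a fixed number of threads calls await( ) once per round, and the barrier action counts how many times the barrier has tripped.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class; not from the original listing.
public class BarrierRounds {
  // Sends `parties` threads through `rounds` barrier cycles and
  // returns how many times the barrier action was executed.
  public static int run(final int parties, final int rounds) {
    final AtomicInteger actionRuns = new AtomicInteger();
    final CyclicBarrier barrier =
      new CyclicBarrier(parties, new Runnable() {
        // Executed once each time all parties have arrived:
        public void run() { actionRuns.incrementAndGet(); }
      });
    Thread[] threads = new Thread[parties];
    for(int i = 0; i < parties; i++) {
      threads[i] = new Thread(new Runnable() {
        public void run() {
          try {
            for(int r = 0; r < rounds; r++)
              barrier.await(); // Blocks until all parties arrive
          } catch(InterruptedException e) {
            Thread.currentThread().interrupt();
          } catch(BrokenBarrierException e) {
            throw new RuntimeException(e);
          }
        }
      });
      threads[i].start();
    }
    try {
      for(Thread t : threads)
        t.join();
    } catch(InterruptedException e) {
      throw new RuntimeException(e);
    }
    return actionRuns.get();
  }
  public static void main(String[] args) {
    // Three threads, four rounds: the same barrier is reused
    System.out.println(
      "Barrier action ran " + run(3, 4) + " times");
  }
}
```

The same CyclicBarrier object serves every round without being reset by hand, which is the property that a CountDownLatch does not offer.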

Once all the tasks have passed the barrier, it is automatically ready for the next round.

To give it the effect of very simple animation, make the size of your console window small enough so that only the horses show.

DelayQueue

This is an unbounded BlockingQueue of objects that implement the Delayed interface. An object can only be taken from the queue when its delay has expired. The queue is sorted so that the object at the head has a delay that has expired for the longest time. If no delay has expired, then there is no head element and poll( ) will return null (because of this, you cannot place null elements in the queue).

Here’s an example where the Delayed objects are themselves tasks, and the DelayedTaskConsumer takes the most "urgent" task (the one that has been expired for the longest time) off the queue and runs it. Note that DelayQueue is thus a variation of a priority queue.

//: concurrency/DelayQueueDemo.java
import java.util.concurrent.*;
import java.util.*;
import static java.util.concurrent.TimeUnit.*;
import static net.mindview.util.Print.*;

class DelayedTask implements Runnable, Delayed {
  private static int counter = 0;
  private final int id = counter++;
  private final int delta;
  private final long trigger;
  protected static List<DelayedTask> sequence =
    new ArrayList<DelayedTask>();
  public DelayedTask(int delayInMilliseconds) {
    delta = delayInMilliseconds;
    trigger = System.nanoTime() +
      NANOSECONDS.convert(delta, MILLISECONDS);
    sequence.add(this);
  }
  public long getDelay(TimeUnit unit) {
    return unit.convert(
      trigger - System.nanoTime(), NANOSECONDS);
  }
  public int compareTo(Delayed arg) {
    DelayedTask that = (DelayedTask)arg;
    if(trigger < that.trigger) return -1;
    if(trigger > that.trigger) return 1;
    return 0;
  }
  public void run() { printnb(this + " "); }
  public String toString() {
    return String.format("[%1$-4d]", delta) +
      " Task " + id;
  }
  public String summary() {
    return "(" + id + ":" + delta + ")";
  }
  public static class EndSentinel extends DelayedTask {
    private ExecutorService exec;
    public EndSentinel(int delay, ExecutorService e) {
      super(delay);
      exec = e;
    }
    public void run() {
      for(DelayedTask pt : sequence) {
        printnb(pt.summary() + " ");
      }
      print();
      print(this + " Calling shutdownNow()");
      exec.shutdownNow();
    }
  }
}

class DelayedTaskConsumer implements Runnable {
  private DelayQueue<DelayedTask> q;
  public DelayedTaskConsumer(DelayQueue<DelayedTask> q) {
    this.q = q;
  }
  public void run() {
    try {
      while(!Thread.interrupted())
        q.take().run(); // Run task with the current thread
    } catch(InterruptedException e) {
      // Acceptable way to exit
    }
    print("Finished DelayedTaskConsumer");
  }
}

public class DelayQueueDemo {
  public static void main(String[] args) {
    Random rand = new Random(47);
    ExecutorService exec = Executors.newCachedThreadPool();
    DelayQueue<DelayedTask> queue =
      new DelayQueue<DelayedTask>();
    // Fill with tasks that have random delays:
    for(int i = 0; i < 20; i++)
      queue.put(new DelayedTask(rand.nextInt(5000)));
    // Set the stopping point
    queue.add(new DelayedTask.EndSentinel(5000, exec));
    exec.execute(new DelayedTaskConsumer(queue));
  }
} /* Output:
[128 ] Task 11 [200 ] Task 7 [429 ] Task 5 [520 ] Task 18
[555 ] Task 1 [961 ] Task 4 [998 ] Task 16 [1207] Task 9
[1693] Task 2 [1809] Task 14 [1861] Task 3 [2278] Task 15
[3288] Task 10 [3551] Task 12 [4258] Task 0 [4258] Task 19
[4522] Task 8 [4589] Task 13 [4861] Task 17 [4868] Task 6
(0:4258) (1:555) (2:1693) (3:1861) (4:961) (5:429) (6:4868)
(7:200) (8:4522) (9:1207) (10:3288) (11:128) (12:3551)
(13:4589) (14:1809) (15:2278) (16:998) (17:4861) (18:520)
(19:4258) (20:5000)
[5000] Task 20 Calling shutdownNow()
Finished DelayedTaskConsumer
*///:~

DelayedTask contains a List<DelayedTask> called sequence that preserves the order in which the tasks were created, so that we can see that sorting does in fact take place.

The Delayed interface has one method, getDelay( ), which tells how long it is until the delay time expires or how long ago the delay time has expired. This method forces us to use the TimeUnit class because that’s the argument type.
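The unit handling that getDelay( ) depends on can be tried in isolation. This is a small sketch under stated assumptions rather than part of the book's code: the class UnitConversion and the method remaining( ) are hypothetical names, while TimeUnit.convert( ) is the real library call. Note that converting to a coarser unit truncates.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; not from the original listing.
public class UnitConversion {
  // Expresses a delay held in nanoseconds in whatever unit the
  // caller asks for, in the same way getDelay() receives a unit.
  public static long remaining(long nanos, TimeUnit unit) {
    return unit.convert(nanos, TimeUnit.NANOSECONDS);
  }
  public static void main(String[] args) {
    // Say what unit the value is in and what unit you want:
    long nanos =
      TimeUnit.NANOSECONDS.convert(1500, TimeUnit.MILLISECONDS);
    System.out.println(nanos); // 1500000000
    System.out.println(
      remaining(nanos, TimeUnit.MILLISECONDS)); // 1500
    System.out.println(
      remaining(nanos, TimeUnit.SECONDS)); // 1 (truncated)
  }
}
```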
TimeUnit turns out to be a very convenient class because you can easily convert units without doing any calculations. For example, the value of delta is stored in milliseconds, but the Java SE5 method System.nanoTime( ) produces time in nanoseconds. You can convert the value of delta by saying what units it is in and what units you want it to be in, like this:

NANOSECONDS.convert(delta, MILLISECONDS);

In getDelay( ), the desired units are passed in as the unit argument, and you use this to convert the time difference from the trigger time to the units requested by the caller, without even knowing what those units are (this is a simple example of the Strategy design pattern, where part of the algorithm is passed in as an argument).

For sorting, the Delayed interface also inherits the Comparable interface, so compareTo( ) must be implemented so that it produces a reasonable comparison. toString( ) and summary( ) provide output formatting, and the nested EndSentinel class provides a way to shut everything down by placing it as the last element in the queue.

Note that because DelayedTaskConsumer is itself a task, it has its own Thread which it can use to run each task that comes out of the queue. Since the tasks are being performed in queue priority order, there’s no need in this example to start separate threads to run the DelayedTasks.

You can see from the output that the order in which the tasks are created has no effect on execution order; instead, the tasks are executed in delay order as expected.

PriorityBlockingQueue

This is basically a priority queue that has blocking retrieval operations. Here’s an example where the objects in the priority queue are tasks that emerge from the queue in priority order.
A PrioritizedTask is given a priority number to provide this order:

//: concurrency/PriorityBlockingQueueDemo.java
import java.util.concurrent.*;
import java.util.*;
import static net.mindview.util.Print.*;

class PrioritizedTask implements
Runnable, Comparable<PrioritizedTask> {
  private Random rand = new Random(47);
  private static int counter = 0;
  private final int id = counter++;
  private final int priority;
  protected static List<PrioritizedTask> sequence =
    new ArrayList<PrioritizedTask>();
  public PrioritizedTask(int priority) {
    this.priority = priority;
    sequence.add(this);
  }
  public int compareTo(PrioritizedTask arg) {
    return priority < arg.priority ? 1 :
      (priority > arg.priority ? -1 : 0);
  }
  public void run() {
    try {
      TimeUnit.MILLISECONDS.sleep(rand.nextInt(250));
    } catch(InterruptedException e) {
      // Acceptable way to exit
    }
    print(this);
  }
  public String toString() {
    return String.format("[%1$-3d]", priority) +
      " Task " + id;
  }
  public String summary() {
    return "(" + id + ":" + priority + ")";
  }
  public static class EndSentinel extends PrioritizedTask {
    private ExecutorService exec;
    public EndSentinel(ExecutorService e) {
      super(-1); // Lowest priority in this program
      exec = e;
    }
    public void run() {
      int count = 0;
      for(PrioritizedTask pt : sequence) {
        printnb(pt.summary());
        if(++count % 5 == 0)
          print();
      }
      print();
      print(this + " Calling shutdownNow()");
      exec.shutdownNow();
    }
  }
}

class PrioritizedTaskProducer implements Runnable {
  private Random rand = new Random(47);
  private Queue<Runnable> queue;
  private ExecutorService exec;
  public PrioritizedTaskProducer(
    Queue<Runnable> q, ExecutorService e) {
    queue = q;
    exec = e; // Used for EndSentinel
  }
  public void run() {
    // Unbounded queue; never blocks.
    // Fill it up fast with random priorities:
    for(int i = 0; i < 20; i++) {
      queue.add(new PrioritizedTask(rand.nextInt(10)));
      Thread.yield();
    }
    // Trickle in highest-priority jobs:
    try {
      for(int i = 0; i < 10; i++) {
        TimeUnit.MILLISECONDS.sleep(250);
        queue.add(new PrioritizedTask(10));
      }
      // Add jobs, lowest priority first:
      for(int i = 0; i < 10; i++)
        queue.add(new PrioritizedTask(i));
      // A sentinel to stop all the tasks:
      queue.add(new PrioritizedTask.EndSentinel(exec));
    } catch(InterruptedException e) {
      // Acceptable way to exit
    }
    print("Finished PrioritizedTaskProducer");
  }
}

class PrioritizedTaskConsumer implements Runnable {
  private PriorityBlockingQueue<Runnable> q;
  public PrioritizedTaskConsumer(
    PriorityBlockingQueue<Runnable> q) {
    this.q = q;
  }
  public void run() {
    try {
      while(!Thread.interrupted())
        // Use current thread to run the task:
        q.take().run();
    } catch(InterruptedException e) {
      // Acceptable way to exit
    }
    print("Finished PrioritizedTaskConsumer");
  }
}

public class PriorityBlockingQueueDemo {
  public static void main(String[] args) throws Exception {
    Random rand = new Random(47);
    ExecutorService exec = Executors.newCachedThreadPool();
    PriorityBlockingQueue<Runnable> queue =
      new PriorityBlockingQueue<Runnable>();
    exec.execute(new PrioritizedTaskProducer(queue, exec));
    exec.execute(new PrioritizedTaskConsumer(queue));
  }
} /* (Execute to see output) *///:~

As with the previous example, the creation sequence of the PrioritizedTask objects is remembered in the sequence List, for comparison with the actual order of execution. The run( ) method sleeps for a short random time and prints the object information, and the EndSentinel provides the same functionality as before while guaranteeing that it is the last object in the queue.

The PrioritizedTaskProducer and PrioritizedTaskConsumer connect to each other through a PriorityBlockingQueue. The blocking nature of the queue provides all the necessary synchronization, so no explicit synchronization is needed: you don’t have to think about whether the queue has any elements in it when you’re reading from it, because the queue will simply block the reader when it is out of elements.

The greenhouse controller with ScheduledExecutor

The Inner Classes chapter introduced the example of a control system applied to a hypothetical greenhouse, turning various facilities on or off or otherwise adjusting them. This can be seen as a kind of concurrency problem, with each desired greenhouse event as a task that is run at a predefined time. The ScheduledThreadPoolExecutor provides just the service necessary to solve the problem.
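Before the full greenhouse program, the two scheduling calls of ScheduledThreadPoolExecutor, schedule( ) for a one-shot task and scheduleAtFixedRate( ) for a repeating task, can be sketched on their own. This is only an illustration: the class ScheduleSketch and its run( ) method are hypothetical names, and the CountDownLatch objects are used here simply to let the caller wait until the tasks have actually fired.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; not from the original listing.
public class ScheduleSketch {
  // Schedules one delayed task and one repeating task; returns
  // true if the one-shot task ran and the repeating task ran at
  // least `ticks` times before a five-second timeout.
  public static boolean run(int ticks, long periodMs) {
    ScheduledThreadPoolExecutor scheduler =
      new ScheduledThreadPoolExecutor(2);
    final CountDownLatch oneShot = new CountDownLatch(1);
    final CountDownLatch repeated = new CountDownLatch(ticks);
    // Runs once, after the given delay:
    scheduler.schedule(new Runnable() {
      public void run() { oneShot.countDown(); }
    }, periodMs, TimeUnit.MILLISECONDS);
    // Runs immediately, then again every periodMs:
    scheduler.scheduleAtFixedRate(new Runnable() {
      public void run() { repeated.countDown(); }
    }, 0, periodMs, TimeUnit.MILLISECONDS);
    try {
      return oneShot.await(5, TimeUnit.SECONDS)
        && repeated.await(5, TimeUnit.SECONDS);
    } catch(InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      scheduler.shutdownNow(); // Cancels the repeating task
    }
  }
  public static void main(String[] args) {
    System.out.println("Both tasks fired: " + run(3, 50));
  }
}
```

A repeating task keeps running until the scheduler is shut down or the task is cancelled, which is why the sketch calls shutdownNow( ) in a finally clause.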
Using either schedule( ) (to run a task once) or scheduleAtFixedRate( ) (to repeat a task at a regular interval), you set up Runnable objects to be executed at some time in the future. Compare the following with the approach used in the Inner Classes chapter to notice how much simpler it is when you can use a predefined tool like ScheduledThreadPoolExecutor:

//: concurrency/GreenhouseScheduler.java
// Rewriting innerclasses/GreenhouseController.java
// to use a ScheduledThreadPoolExecutor.
// {Args: 5000}
import java.util.concurrent.*;
import java.util.*;

public class GreenhouseScheduler {
  private volatile boolean light = false;
  private volatile boolean water = false;
  private String thermostat = "Day";
  public synchronized String getThermostat() {
    return thermostat;
  }
  public synchronized void setThermostat(String value) {
    thermostat = value;
  }
  ScheduledThreadPoolExecutor scheduler =
    new ScheduledThreadPoolExecutor(10);
  public void schedule(Runnable event, long delay) {
    scheduler.schedule(event,delay,TimeUnit.MILLISECONDS);
  }
  public void
  repeat(Runnable event, long initialDelay, long period) {
    scheduler.scheduleAtFixedRate(
      event, initialDelay, period, TimeUnit.MILLISECONDS);
  }
  class LightOn implements Runnable {
    public void run() {
      // Put hardware control code here to
      // physically turn on the light.
      System.out.println("Turning on lights");
      light = true;
    }
  }
  class LightOff implements Runnable {
    public void run() {
      // Put hardware control code here to
      // physically turn off the light.
      System.out.println("Turning off lights");
      light = false;
    }
  }
  class WaterOn implements Runnable {
    public void run() {
      // Put hardware control code here.
      System.out.println("Turning greenhouse water on");
      water = true;
    }
  }
  class WaterOff implements Runnable {
    public void run() {
      // Put hardware control code here.
      System.out.println("Turning greenhouse water off");
      water = false;
    }
  }
  class ThermostatNight implements Runnable {
    public void run() {
      // Put hardware control code here.
      System.out.println("Thermostat to night setting");
      setThermostat("Night");
    }
  }
  class ThermostatDay implements Runnable {
    public void run() {
      // Put hardware control code here.
      System.out.println("Thermostat to day setting");
      setThermostat("Day");
    }
  }
  class Bell implements Runnable {
    public void run() { System.out.println("Bing!"); }
  }
  class Terminate implements Runnable {
    public void run() {
      System.out.println("Terminating");
      scheduler.shutdownNow();
      // Must start a separate task to do this job,
      // since the scheduler has been shut down:
      new Thread() {
        public void run() {
          for(DataPoint d : data)
            System.out.println(d);
        }
      }.start();
    }
  }
  // New feature: data collection
  static class DataPoint {
    final Calendar time;
    final float temperature;
    final float humidity;
    public DataPoint(Calendar d, float temp, float hum) {
      time = d;
      temperature = temp;
      humidity = hum;
    }
    public String toString() {
      return time.getTime() +
        String.format(
          " temperature: %1$.1f humidity: %2$.2f",
          temperature, humidity);
    }
  }
  private Calendar lastTime = Calendar.getInstance();
  { // Adjust date to the half hour
    lastTime.set(Calendar.MINUTE, 30);
    lastTime.set(Calendar.SECOND, 00);
  }
  private float lastTemp = 65.0f;
  private int tempDirection = +1;
  private float lastHumidity = 50.0f;
  private int humidityDirection = +1;
  private Random rand = new Random(47);
  List<DataPoint> data = Collections.synchronizedList(
    new ArrayList<DataPoint>());
  class CollectData implements Runnable {
    public void run() {
      System.out.println("Collecting data");
      synchronized(GreenhouseScheduler.this) {
        // Pretend the interval is longer than it is:
        lastTime.set(Calendar.MINUTE,
          lastTime.get(Calendar.MINUTE) + 30);
        // One in 5 chances of reversing the direction:
        if(rand.nextInt(5) == 4)
          tempDirection = -tempDirection;
        // Store previous value:
        lastTemp = lastTemp +
          tempDirection * (1.0f + rand.nextFloat());
        if(rand.nextInt(5) == 4)
          humidityDirection = -humidityDirection;
        lastHumidity = lastHumidity +
          humidityDirection * rand.nextFloat();
        // Calendar must be cloned, otherwise all
        // DataPoints hold references to the same lastTime.
        // For a basic object like Calendar, clone() is OK.
        data.add(new DataPoint((Calendar)lastTime.clone(),
          lastTemp, lastHumidity));
      }
    }
  }
  public static void main(String[] args) {
    GreenhouseScheduler gh = new GreenhouseScheduler();
    gh.schedule(gh.new Terminate(), 5000);
    // Former "Restart" class not necessary:
    gh.repeat(gh.new Bell(), 0, 1000);
    gh.repeat(gh.new ThermostatNight(), 0, 2000);
    gh.repeat(gh.new LightOn(), 0, 200);
    gh.repeat(gh.new LightOff(), 0, 400);
    gh.repeat(gh.new WaterOn(), 0, 600);
    gh.repeat(gh.new WaterOff(), 0, 800);
    gh.repeat(gh.new ThermostatDay(), 0, 1400);
    gh.repeat(gh.new CollectData(), 500, 500);
  }
} /* (Execute to see output) *///:~

This version reorganizes the code and adds a new feature: collecting temperature and humidity readings in the greenhouse. A DataPoint holds and displays a single piece of data, while CollectData is the scheduled task that generates simulated data and adds it to the List<DataPoint> in Greenhouse each time it is run.

Notice the use of both volatile and synchronized in appropriate places to prevent tasks from interfering with each other. All the methods in the List that holds DataPoints are synchronized using the java.util.Collections utility synchronizedList( ) when the List is created.

Exercise 33: (7) Modify GreenhouseScheduler.java so that it uses a DelayQueue instead of a ScheduledExecutor.

Semaphore

A normal lock (from concurrent.locks or the built-in synchronized lock) only allows one task at a time to access a resource. A counting semaphore allows n tasks to access the resource at the same time. You can also think of a semaphore as handing out "permits" to use a resource, although no actual permit objects are used.

As an example, consider the concept of the object pool, which manages a limited number of objects by allowing them to be checked out for use, and then checked back in again when the user is finished.
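The claim that a counting semaphore lets at most n tasks into a region at once can be checked with a small experiment. The following is a sketch under assumptions, not part of the book's code: the class PermitSketch and the method maxConcurrent( ) are hypothetical names, and the 20-millisecond sleep merely stands in for real work on the guarded resource.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class; not from the original listing.
public class PermitSketch {
  // Starts `tasks` threads sharing `permits` permits and returns
  // the largest number of threads observed inside the guarded
  // region at the same moment.
  public static int maxConcurrent(int permits, int tasks) {
    final Semaphore available = new Semaphore(permits, true);
    final AtomicInteger inside = new AtomicInteger();
    final AtomicInteger max = new AtomicInteger();
    Thread[] threads = new Thread[tasks];
    for(int i = 0; i < tasks; i++) {
      threads[i] = new Thread(new Runnable() {
        public void run() {
          try {
            available.acquire(); // Blocks when no permit is free
          } catch(InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
          }
          try {
            int now = inside.incrementAndGet();
            // Record the high-water mark:
            int seen = max.get();
            while(now > seen && !max.compareAndSet(seen, now))
              seen = max.get();
            Thread.sleep(20); // Simulated use of the resource
          } catch(InterruptedException e) {
            Thread.currentThread().interrupt();
          } finally {
            inside.decrementAndGet();
            available.release(); // Hand the permit back
          }
        }
      });
      threads[i].start();
    }
    try {
      for(Thread t : threads)
        t.join();
    } catch(InterruptedException e) {
      throw new RuntimeException(e);
    }
    return max.get();
  }
  public static void main(String[] args) {
    System.out.println("Most tasks inside at once: "
      + maxConcurrent(3, 10)); // Never more than 3
  }
}
```

With a single permit the semaphore behaves like an ordinary mutual-exclusion lock, so maxConcurrent(1, n) reports 1 for any n.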
This object-pool functionality can be encapsulated in a generic class:

//: concurrency/Pool.java
// Using a Semaphore inside a Pool, to restrict
// the number of tasks that can use a resource.
import java.util.concurrent.*;
import java.util.*;

public class Pool<T> {
  private int size;
  private List<T> items = new ArrayList<T>();
  private volatile boolean[] checkedOut;
  private Semaphore available;
  public Pool(Class<T> classObject, int size) {
    this.size = size;
    checkedOut = new boolean[size];
    available = new Semaphore(size, true);
    // Load pool with objects that can be checked out:
    for(int i = 0; i < size; ++i)
      try {
        // Assumes a default constructor:
        items.add(classObject.newInstance());
      } catch(Exception e) {
        throw new RuntimeException(e);
      }
  }
  public T checkOut() throws InterruptedException {
    available.acquire();
    return getItem();
  }
  public void checkIn(T x) {
    if(releaseItem(x))
      available.release();
  }
  private synchronized T getItem() {
    for(int i = 0; i < size; ++i)
      if(!checkedOut[i]) {
        checkedOut[i] = true;
        return items.get(i);
      }
    return null; // Semaphore prevents reaching here
  }
  private synchronized boolean releaseItem(T item) {
    int index = items.indexOf(item);
    if(index == -1) return false; // Not in the list
    if(checkedOut[index]) {
      checkedOut[index] = false;
      return true;
    }
    return false; // Wasn't checked out
  }
} ///:~

In this simplified form, the constructor uses newInstance( ) to load the pool with objects. If you need a new object, you call checkOut( ), and when you’re finished with an object, you hand it to checkIn( ).

The boolean checkedOut array keeps track of the objects that are checked out, and is managed by the getItem( ) and releaseItem( ) methods. These, in turn, are guarded by the Semaphore available, so that, in checkOut( ), available blocks the progress of the call if there are no more semaphore permits available (which means there are no more objects in the pool). In checkIn( ), if the object being checked in is valid, a permit is returned to the semaphore.

To create an example, we can use Fat, a type of object that is expensive to create because its constructor takes time to run:

//: concurrency/Fat.java
// Objects that are expensive to create.

public class Fat {
  private volatile double d; // Prevent optimization
  private static int counter = 0;
  private final int id = counter++;
  public Fat() {
    // Expensive, interruptible operation:
    for(int i = 1; i < 10000; i++) {
      d += (Math.PI + Math.E) / (double)i;
    }
  }
  public void operation() { System.out.println(this); }
  public String toString() { return "Fat id: " + id; }
} ///:~

We’ll pool these objects to limit the impact of this constructor. We can test the Pool class by creating a task that will check out Fat objects, hold them for a while, and then check them back in:

//: concurrency/SemaphoreDemo.java
// Testing the Pool class
import java.util.concurrent.*;
import java.util.*;
import static net.mindview.util.Print.*;

// A task to check a resource out of a pool:
class CheckoutTask<T> implements Runnable {
  private static int counter = 0;
  private final int id = counter++;
  private Pool<T> pool;
  public CheckoutTask(Pool<T> pool) {
    this.pool = pool;
  }
  public void run() {
    try {
      T item = pool.checkOut();
      print(this + "checked out " + item);
      TimeUnit.SECONDS.sleep(1);
      print(this + "checking in " + item);
      pool.checkIn(item);
    } catch(InterruptedException e) {
      // Acceptable way to terminate
    }
  }
  public String toString() {
    return "CheckoutTask " + id + " ";
  }
}

public class SemaphoreDemo {
  final static int SIZE = 25;
  public static void main(String[] args) throws Exception {
    final Pool<Fat> pool =
      new Pool<Fat>(Fat.class, SIZE);
    ExecutorService exec = Executors.newCachedThreadPool();
    for(int i = 0; i < SIZE; i++)
      exec.execute(new CheckoutTask<Fat>(pool));
    print("All CheckoutTasks created");
    List<Fat> list = new ArrayList<Fat>();
    for(int i = 0; i < SIZE; i++) {
      Fat f = pool.checkOut();
      printnb(i + ": main() thread checked out ");
      f.operation();
      list.add(f);
    }
    Future<?> blocked = exec.submit(new Runnable() {
      public void run() {
        try {
          // Semaphore prevents additional checkout,
          // so call is blocked:
          pool.checkOut();
        } catch(InterruptedException e) {
          print("checkOut() Interrupted");
        }
      }
    });
    TimeUnit.SECONDS.sleep(2);
    blocked.cancel(true); // Break out of blocked call
    print("Checking in objects in " + list);
    for(Fat f : list)
      pool.checkIn(f);
    for(Fat f : list)
      pool.checkIn(f); // Second checkIn ignored
    exec.shutdown();
  }
} /* (Execute to see output) *///:~

In main( ), a Pool is created to hold Fat objects, and a set of CheckoutTasks begins exercising the Pool. Then the main( ) thread begins checking out Fat objects, and not checking them back in. Once it has checked out all the objects in the pool, no more checkouts will be allowed by the Semaphore. The run( ) method of blocked is thus blocked, and after two seconds the cancel( ) method is called to break out of the Future. Note that redundant checkins are ignored by the Pool.

This example relies on the client of the Pool to be rigorous and to voluntarily check items back in, which is the simplest solution when it works. If you cannot always rely on this, Thinking in Patterns (at www.MindView.net) contains further explorations of ways to manage the objects that have been checked out of object pools.

Exchanger

An Exchanger is a barrier that swaps objects between two tasks. When the tasks enter the barrier, they have one object, and when they leave, they have the object that was formerly held by the other task. Exchangers are typically used when one task is creating objects that are expensive to produce and another task is consuming those objects; this way, more objects can be created at the same time as they are being consumed.

To exercise the Exchanger class, we’ll create producer and consumer tasks which, via generics and Generators, will work with any kind of object, and then we’ll apply these to the Fat class.
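The swap itself can be seen with just two threads and two Strings. This is a minimal sketch for illustration only: the class SwapSketch and the method swap( ) are hypothetical names, while Exchanger.exchange( ) is the real library call, which blocks until the partner thread arrives.

```java
import java.util.concurrent.Exchanger;

// Hypothetical illustration class; not from the original listing.
public class SwapSketch {
  // The calling thread offers `a` and a helper thread offers `b`.
  // Returns { what the caller received, what the helper received }.
  public static String[] swap(final String a, final String b) {
    final Exchanger<String> exchanger = new Exchanger<String>();
    final String[] received = new String[2];
    Thread other = new Thread(new Runnable() {
      public void run() {
        try {
          // Blocks until the partner calls exchange():
          received[1] = exchanger.exchange(b);
        } catch(InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    });
    other.start();
    try {
      received[0] = exchanger.exchange(a);
      other.join(); // Makes the helper's write visible here
    } catch(InterruptedException e) {
      throw new RuntimeException(e);
    }
    return received;
  }
  public static void main(String[] args) {
    String[] r = swap("from main", "from other");
    System.out.println(r[0]); // from other
    System.out.println(r[1]); // from main
  }
}
```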
The ExchangerProducer and ExchangerConsumer use a List<T> as the object to be exchanged; each one contains an Exchanger for this List<T>. When you call the Exchanger.exchange( ) method, it blocks until the partner task calls its exchange( ) method, and when both exchange( ) methods have completed, the List<T> has been swapped:

//: concurrency/ExchangerDemo.java
import java.util.concurrent.*;
import java.util.*;
import net.mindview.util.*;

class ExchangerProducer<T> implements Runnable {
  private Generator<T> generator;
  private Exchanger<List<T>> exchanger;
  private List<T> holder;
  ExchangerProducer(Exchanger<List<T>> exchg,
  Generator<T> gen, List<T> holder) {
    exchanger = exchg;
    generator = gen;
    this.holder = holder;
  }
  public void run() {
    try {
      while(!Thread.interrupted()) {
        for(int i = 0; i < ExchangerDemo.size; i++)
          holder.add(generator.next());
        // Exchange full for empty:
        holder = exchanger.exchange(holder);
      }
    } catch(InterruptedException e) {
      // OK to terminate this way.
    }
  }
}

class ExchangerConsumer<T> implements Runnable {
  private Exchanger<List<T>> exchanger;
  private List<T> holder;
  private volatile T value;
  ExchangerConsumer(Exchanger<List<T>> ex, List<T> holder){
    exchanger = ex;
    this.holder = holder;
  }
  public void run() {
    try {
      while(!Thread.interrupted()) {
        holder = exchanger.exchange(holder);
        for(T x : holder) {
          value = x; // Fetch out value
          holder.remove(x); // OK for CopyOnWriteArrayList
        }
      }
    } catch(InterruptedException e) {
      // OK to terminate this way.
    }
    System.out.println("Final value: " + value);
  }
}

public class ExchangerDemo {
  static int size = 10;
  static int delay = 5; // Seconds
  public static void main(String[] args) throws Exception {
    if(args.length > 0)
      size = new Integer(args[0]);
    if(args.length > 1)
      delay = new Integer(args[1]);
    ExecutorService exec = Executors.newCachedThreadPool();
    Exchanger<List<Fat>> xc = new Exchanger<List<Fat>>();
    List<Fat>
      producerList = new CopyOnWriteArrayList<Fat>(),
      consumerList = new CopyOnWriteArrayList<Fat>();
    exec.execute(new ExchangerProducer<Fat>(xc,
      BasicGenerator.create(Fat.class), producerList));
    exec.execute(
      new ExchangerConsumer<Fat>(xc, consumerList));
    TimeUnit.SECONDS.sleep(delay);
    exec.shutdownNow();
  }
} /* Output: (Sample)
Final value: Fat id: 29999
*///:~

In main( ), a single Exchanger is created for both tasks to use, and two CopyOnWriteArrayLists are created for swapping. This particular variant of List can tolerate the remove( ) method being called while the list is being traversed, without throwing a ConcurrentModificationException. The ExchangerProducer fills a List, then swaps the full list for the empty one that the ExchangerConsumer hands it. Because of the Exchanger, the filling of one list and consuming of the other list can happen simultaneously.

Exercise 34: (1) Modify ExchangerDemo.java to use your own class instead of Fat.
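If the Generator and Fat machinery obscures the basic hand-off, a stripped-down sketch may help. It assumes nothing beyond java.util.concurrent.Exchanger; the class name ExchangerSketch and its swap( ) method are hypothetical, invented only for this illustration. Two tasks each bring one String to the barrier and each leaves with the other’s:

```java
import java.util.concurrent.Exchanger;

// ExchangerSketch is a hypothetical name used only for this illustration.
class ExchangerSketch {
  // Starts two tasks that each bring one String to the Exchanger.
  // Returns { what the first task received, what the second received }.
  static String[] swap(final String first, final String second) {
    final Exchanger<String> exchanger = new Exchanger<String>();
    final String[] received = new String[2];
    Thread a = new Thread(new Runnable() {
      public void run() {
        try {
          // Blocks until the partner task also calls exchange():
          received[0] = exchanger.exchange(first);
        } catch(InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    });
    Thread b = new Thread(new Runnable() {
      public void run() {
        try {
          received[1] = exchanger.exchange(second);
        } catch(InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    });
    a.start();
    b.start();
    try {
      // join() makes the writes to received[] visible here:
      a.join();
      b.join();
    } catch(InterruptedException e) {
      throw new RuntimeException(e);
    }
    return received;
  }
  public static void main(String[] args) {
    String[] r = swap("full list", "empty list");
    System.out.println("First task now holds: " + r[0]);
    System.out.println("Second task now holds: " + r[1]);
  }
} /* Output:
First task now holds: empty list
Second task now holds: full list
*/
```

Neither call to exchange( ) returns until both tasks have arrived, which is why the result does not depend on which thread happens to start first.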

Simulation

One of the most interesting and exciting uses of concurrency is to create simulations. Using concurrency, each component of a simulation can be its own task, and this makes a simulation much easier to program. Many video games and CGI animations in movies are simulations, and HorseRace.java and GreenhouseScheduler.java, shown earlier, could also be considered simulations.

Bank teller simulation

This classic simulation can represent any situation where objects appear randomly and require a random amount of time to be served by a limited number of servers. It’s possible to build the simulation to determine the ideal number of servers.

In this example, each bank customer requires a certain amount of service time, which is the number of time units that a teller must spend on the customer to serve that customer’s needs. The amount of service time will be different for each customer and will be determined randomly. In addition, you won’t know how many customers will be arriving in each interval, so this will also be determined randomly.

//: concurrency/BankTellerSimulation.java
// Using queues and multithreading.
// {Args: 5}
import java.util.concurrent.*;
import java.util.*;

// Read-only objects don't require synchronization:
class Customer {
  private final int serviceTime;
  public Customer(int tm) { serviceTime = tm; }
  public int getServiceTime() { return serviceTime; }
  public String toString() {
    return "[" + serviceTime + "]";
  }
}

// Teach the customer line to display itself:
class CustomerLine extends ArrayBlockingQueue<Customer> {
  public CustomerLine(int maxLineSize) {
    super(maxLineSize);
  }
  public String toString() {
    if(this.size() == 0)
      return "[Empty]";
    StringBuilder result = new StringBuilder();
    for(Customer customer : this)
      result.append(customer);
    return result.toString();
  }
}

// Randomly add customers to a queue:
class CustomerGenerator implements Runnable {
  private CustomerLine customers;
  private static Random rand = new Random(47);
  public CustomerGenerator(CustomerLine cq) {
    customers = cq;
  }
  public void run() {
    try {
      while(!Thread.interrupted()) {
        TimeUnit.MILLISECONDS.sleep(rand.nextInt(300));
        customers.put(new Customer(rand.nextInt(1000)));
      }
    } catch(InterruptedException e) {
      System.out.println("CustomerGenerator interrupted");
    }
    System.out.println("CustomerGenerator terminating");
  }
}

class Teller implements Runnable, Comparable<Teller> {
  private static int counter = 0;
  private final int id = counter++;
  // Customers served during this shift:
  private int customersServed = 0;
  private CustomerLine customers;
  private boolean servingCustomerLine = true;
  public Teller(CustomerLine cq) { customers = cq; }
  public void run() {
    try {
      while(!Thread.interrupted()) {
        Customer customer = customers.take();
        TimeUnit.MILLISECONDS.sleep(
          customer.getServiceTime());
        synchronized(this) {
          customersServed++;
          while(!servingCustomerLine)
            wait();
        }
      }
    } catch(InterruptedException e) {
      System.out.println(this + "interrupted");
    }
    System.out.println(this + "terminating");
  }
  public synchronized void doSomethingElse() {
    customersServed = 0;
    servingCustomerLine = false;
  }
  public synchronized void serveCustomerLine() {
    assert !servingCustomerLine:"already serving: " + this;
    servingCustomerLine = true;
    notifyAll();
  }
  public String toString() { return "Teller " + id + " "; }
  public String shortString() { return "T" + id; }
  // Used by priority queue:
  public synchronized int compareTo(Teller other) {
    return customersServed < other.customersServed ? -1 :
      (customersServed == other.customersServed ? 0 : 1);
  }
}

class TellerManager implements Runnable {
  private ExecutorService exec;
  private CustomerLine customers;
  private PriorityQueue<Teller> workingTellers =
    new PriorityQueue<Teller>();
  private Queue<Teller> tellersDoingOtherThings =
    new LinkedList<Teller>();
  private int adjustmentPeriod;
  private static Random rand = new Random(47);
  public TellerManager(ExecutorService e,
    CustomerLine customers, int adjustmentPeriod) {
    exec = e;
    this.customers = customers;
    this.adjustmentPeriod = adjustmentPeriod;
    // Start with a single teller:
    Teller teller = new Teller(customers);
    exec.execute(teller);
    workingTellers.add(teller);
  }
  public void adjustTellerNumber() {
    // This is actually a control system. By adjusting
    // the numbers, you can reveal stability issues in
    // the control mechanism.
    // If line is too long, add another teller:
    if(customers.size() / workingTellers.size() > 2) {
      // If tellers are on break or doing
      // another job, bring one back:
      if(tellersDoingOtherThings.size() > 0) {
        Teller teller = tellersDoingOtherThings.remove();
        teller.serveCustomerLine();
        workingTellers.offer(teller);
        return;
      }
      // Else create (hire) a new teller
      Teller teller = new Teller(customers);
      exec.execute(teller);
      workingTellers.add(teller);
      return;
    }
    // If line is short enough, remove a teller:
    if(workingTellers.size() > 1 &&
      customers.size() / workingTellers.size() < 2)
        reassignOneTeller();
    // If there is no line, we only need one teller:
    if(customers.size() == 0)
      while(workingTellers.size() > 1)
        reassignOneTeller();
  }
  // Give a teller a different job or a break:
  private void reassignOneTeller() {
    Teller teller = workingTellers.poll();
    teller.doSomethingElse();
    tellersDoingOtherThings.offer(teller);
  }
  public void run() {
    try {
      while(!Thread.interrupted()) {
        TimeUnit.MILLISECONDS.sleep(adjustmentPeriod);
        adjustTellerNumber();
        System.out.print(customers + " { ");
        for(Teller teller : workingTellers)
          System.out.print(teller.shortString() + " ");
        System.out.println("}");
      }
    } catch(InterruptedException e) {
      System.out.println(this + "interrupted");
    }
    System.out.println(this + "terminating");
  }
  public String toString() { return "TellerManager "; }
}

public class BankTellerSimulation {
  static final int MAX_LINE_SIZE = 50;
  static final int ADJUSTMENT_PERIOD = 1000;
  public static void main(String[] args) throws Exception {
    ExecutorService exec = Executors.newCachedThreadPool();
    // If line is too long, customers will leave:
    CustomerLine customers =
      new CustomerLine(MAX_LINE_SIZE);
    exec.execute(new CustomerGenerator(customers));
    // Manager will add and remove tellers as necessary:
    exec.execute(new TellerManager(
      exec, customers, ADJUSTMENT_PERIOD));
    if(args.length > 0) // Optional argument
      TimeUnit.SECONDS.sleep(new Integer(args[0]));
    else {
      System.out.println("Press 'Enter' to quit");
      System.in.read();
    }
    exec.shutdownNow();
  }
} /* Output: (Sample)
[429][200][207] { T0 T1 }
[861][258][140][322] { T0 T1 }
[575][342][804][826][896][984] { T0 T1 T2 }
[984][810][141][12][689][992][976][368][395][354] { T0 T1 T2 T3 }
Teller 2 interrupted
Teller 2 terminating
Teller 1 interrupted
Teller 1 terminating
TellerManager interrupted
TellerManager terminating
Teller 3 interrupted
Teller 3 terminating
Teller 0 interrupted
Teller 0 terminating
CustomerGenerator interrupted
CustomerGenerator terminating
*///:~

The Customer objects are very simple, containing only a final int field. Because these objects never change, they are read-only objects and they do not require synchronization or the use of volatile. On top of that, each Teller task only removes one Customer at a time from the input queue, and works on that Customer until it is complete, so a Customer will only be accessed by one task at a time, anyway.

CustomerLine represents a single line that the customers wait in before being served by a Teller. This is just an ArrayBlockingQueue that has a toString( ) that prints the results in the desired fashion.

A CustomerGenerator is attached to a CustomerLine and puts Customers onto the queue at randomized intervals.
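One detail of a bounded queue is worth a small experiment. CustomerGenerator calls put( ), which blocks when an ArrayBlockingQueue is full, so in the listing above a full line makes the generator wait. If you wanted arriving customers to be turned away instead, the non-blocking offer( ) is the usual alternative, since it returns false when there is no room. The following is only a sketch of that difference; BoundedLineSketch and turnedAway( ) are hypothetical names, and Integers stand in for Customers:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// BoundedLineSketch is a hypothetical name used only for this illustration.
class BoundedLineSketch {
  // Tries to add 'arrivals' customers to a line with the given
  // capacity and returns how many of them were turned away.
  static int turnedAway(int capacity, int arrivals) {
    BlockingQueue<Integer> line =
      new ArrayBlockingQueue<Integer>(capacity);
    int rejected = 0;
    for(int i = 0; i < arrivals; i++)
      if(!line.offer(i)) // Returns false instead of blocking
        rejected++;
    return rejected;
  }
  public static void main(String[] args) {
    System.out.println("Turned away: " + turnedAway(2, 5));
  }
} /* Output:
Turned away: 3
*/
```

Nothing removes customers in this sketch, so the first two arrivals fill the line and the remaining three are rejected. With put( ) in place of offer( ), the third call would simply block.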
A Teller takes Customers off of the CustomerLine and processes them one at a time, keeping track of the number of Customers it has served during that particular shift. It can be told to doSomethingElse( ) when there aren’t enough customers, and to serveCustomerLine( ) when lots of customers show up. To choose which working teller to reassign when the line is short, the compareTo( ) method looks at the number of customers served so that a PriorityQueue can automatically put the least-worked teller at the forefront.
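To see how a Comparable element type steers a PriorityQueue, here is a minimal sketch that uses the same three-way comparison as Teller.compareTo( ). The names LeastWorkedSketch and Worker are hypothetical stand-ins, and the served counts are made up for the illustration:

```java
import java.util.PriorityQueue;

// LeastWorkedSketch and Worker are hypothetical names for illustration.
class LeastWorkedSketch {
  // Stand-in for Teller: just a name and a served count.
  static class Worker implements Comparable<Worker> {
    final String name;
    final int served;
    Worker(String name, int served) {
      this.name = name;
      this.served = served;
    }
    // Same comparison logic as Teller.compareTo():
    public int compareTo(Worker other) {
      return served < other.served ? -1 :
        (served == other.served ? 0 : 1);
    }
  }
  static String leastWorked() {
    PriorityQueue<Worker> queue = new PriorityQueue<Worker>();
    queue.add(new Worker("T0", 7));
    queue.add(new Worker("T1", 2));
    queue.add(new Worker("T2", 5));
    // poll() removes the smallest element per compareTo():
    return queue.poll().name;
  }
  public static void main(String[] args) {
    System.out.println("Head of queue: " + leastWorked());
  }
} /* Output:
Head of queue: T1
*/
```

Keep in mind that a PriorityQueue positions an element when it is inserted or removed. It does not re-sort when a field used by compareTo( ) changes afterward, so a count that keeps changing while the object sits in the queue is only approximately reflected in the ordering.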

The TellerManager is the hub of activity. It keeps track of all the tellers and what’s going on with the customers. One of the interesting things about this simulation is that it attempts to discover the optimum number of tellers for a given customer flow. You can see this in adjustTellerNumber( ), which is a control system to add and remove tellers in a stable fashion. All control systems have stability issues; if they react too quickly to a change, they are unstable, and if they react too slowly, the system moves to one of its extremes.

Exercise 35: (8) Modify BankTellerSimulation.java so that it represents Web clients making requests of a fixed number of servers. The goal is to determine the load that the group of servers can handle.

The restaurant simulation

This simulation fleshes out the simple Restaurant.java example shown earlier in this chapter by adding more simulation components, such as Orders and Plates, and it reuses the menu classes from the Enumerated Types chapter.

It also introduces the Java SE5 SynchronousQueue, which is a blocking queue that has no internal capacity, so each put( ) must wait for a take( ), and vice versa. It’s as if you were handing an object to someone: there’s no table to put it on, so it only works if that person is holding a hand out, ready to receive the object. In this example, the SynchronousQueue represents the place setting in front of a diner, to enforce the idea that only one course can be served at a time.
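The "no table to put it on" behavior is easy to check in isolation. The sketch below is an illustration under my own naming (HandOffSketch, offerWithoutTaker( ), and handOff( ) are hypothetical): a non-blocking offer( ) fails when nobody is waiting to receive, while a put( ) in one thread completes as soon as another thread calls take( ):

```java
import java.util.concurrent.SynchronousQueue;

// HandOffSketch is a hypothetical name used only for this illustration.
class HandOffSketch {
  // With no capacity and nobody waiting in take(),
  // a non-blocking offer() has nowhere to put the element:
  static boolean offerWithoutTaker() {
    SynchronousQueue<String> q = new SynchronousQueue<String>();
    return q.offer("soup");
  }
  // put() in one thread completes when this thread calls take():
  static String handOff(final String course) {
    final SynchronousQueue<String> placeSetting =
      new SynchronousQueue<String>();
    Thread waiter = new Thread(new Runnable() {
      public void run() {
        try {
          placeSetting.put(course); // Blocks until taken
        } catch(InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    });
    waiter.start();
    try {
      String received = placeSetting.take();
      waiter.join();
      return received;
    } catch(InterruptedException e) {
      throw new RuntimeException(e);
    }
  }
  public static void main(String[] args) {
    System.out.println(
      "offer with nobody waiting: " + offerWithoutTaker());
    System.out.println("received: " + handOff("soup"));
  }
} /* Output:
offer with nobody waiting: false
received: soup
*/
```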
The rest of the classes and functionality of this example either follow from the structure of Restaurant.java or are intended to be a fairly direct mapping from the operations of an actual restaurant:

//: concurrency/restaurant2/RestaurantWithQueues.java
// {Args: 5}
package concurrency.restaurant2;
import enumerated.menu.*;
import java.util.concurrent.*;
import java.util.*;
import static net.mindview.util.Print.*;

// This is given to the waiter, who gives it to the chef:
class Order { // (A data-transfer object)
  private static int counter = 0;
  private final int id = counter++;
  private final Customer customer;
  private final WaitPerson waitPerson;
  private final Food food;
  public Order(Customer cust, WaitPerson wp, Food f) {
    customer = cust;
    waitPerson = wp;
    food = f;
  }
  public Food item() { return food; }
  public Customer getCustomer() { return customer; }
  public WaitPerson getWaitPerson() { return waitPerson; }
  public String toString() {
    return "Order: " + id + " item: " + food +
      " for: " + customer +
      " served by: " + waitPerson;
  }
}

// This is what comes back from the chef:
class Plate {
  private final Order order;
  private final Food food;
  public Plate(Order ord, Food f) {
    order = ord;
    food = f;
  }
  public Order getOrder() { return order; }
  public Food getFood() { return food; }
  public String toString() { return food.toString(); }
}

class Customer implements Runnable {
  private static int counter = 0;
  private final int id = counter++;
  private final WaitPerson waitPerson;
  // Only one course at a time can be received:
  private SynchronousQueue<Plate> placeSetting =
    new SynchronousQueue<Plate>();
  public Customer(WaitPerson w) { waitPerson = w; }
  public void
  deliver(Plate p) throws InterruptedException {
    // Only blocks if customer is still
    // eating the previous course:
    placeSetting.put(p);
  }
  public void run() {
    for(Course course : Course.values()) {
      Food food = course.randomSelection();
      try {
        waitPerson.placeOrder(this, food);
        // Blocks until course has been delivered:
        print(this + "eating " + placeSetting.take());
      } catch(InterruptedException e) {
        print(this + "waiting for " +
          course + " interrupted");
        break;
      }
    }
    print(this + "finished meal, leaving");
  }
  public String toString() {
    return "Customer " + id + " ";
  }
}

class WaitPerson implements Runnable {
  private static int counter = 0;
  private final int id = counter++;
  private final Restaurant restaurant;
  BlockingQueue<Plate> filledOrders =
    new LinkedBlockingQueue<Plate>();
  public WaitPerson(Restaurant rest) { restaurant = rest; }
  public void placeOrder(Customer cust, Food food) {
    try {
      // Shouldn't actually block because this is
      // a LinkedBlockingQueue with no size limit:
      restaurant.orders.put(new Order(cust, this, food));
    } catch(InterruptedException e) {
      print(this + " placeOrder interrupted");
    }
  }
  public void run() {
    try {
      while(!Thread.interrupted()) {
        // Blocks until a course is ready
        Plate plate = filledOrders.take();
        print(this + "received " + plate +
          " delivering to " +
          plate.getOrder().getCustomer());
        plate.getOrder().getCustomer().deliver(plate);
      }
    } catch(InterruptedException e) {
      print(this + " interrupted");
    }
    print(this + " off duty");
  }
  public String toString() {
    return "WaitPerson " + id + " ";
  }
}

class Chef implements Runnable {
  private static int counter = 0;
  private final int id = counter++;
  private final Restaurant restaurant;
  private static Random rand = new Random(47);
  public Chef(Restaurant rest) { restaurant = rest; }
  public void run() {
    try {
      while(!Thread.interrupted()) {
        // Blocks until an order appears:
        Order order = restaurant.orders.take();
        Food requestedItem = order.item();
        // Time to prepare order:
        TimeUnit.MILLISECONDS.sleep(rand.nextInt(500));
        Plate plate = new Plate(order, requestedItem);
        order.getWaitPerson().filledOrders.put(plate);
      }
    } catch(InterruptedException e) {
      print(this + " interrupted");
    }
    print(this + " off duty");
  }
  public String toString() { return "Chef " + id + " "; }
}

class Restaurant implements Runnable {
  private List<WaitPerson> waitPersons =
    new ArrayList<WaitPerson>();
  private List<Chef> chefs = new ArrayList<Chef>();
  private ExecutorService exec;
  private static Random rand = new Random(47);
  BlockingQueue<Order>
    orders = new LinkedBlockingQueue<Order>();
  public Restaurant(ExecutorService e, int nWaitPersons,
    int nChefs) {
    exec = e;
    for(int i = 0; i < nWaitPersons; i++) {
      WaitPerson waitPerson = new WaitPerson(this);
      waitPersons.add(waitPerson);
      exec.execute(waitPerson);
    }
    for(int i = 0; i < nChefs; i++) {
      Chef chef = new Chef(this);
      chefs.add(chef);
      exec.execute(chef);
    }
  }
  public void run() {
    try {
      while(!Thread.interrupted()) {
        // A new customer arrives; assign a WaitPerson:
        WaitPerson wp = waitPersons.get(
          rand.nextInt(waitPersons.size()));
        Customer c = new Customer(wp);
        exec.execute(c);
        TimeUnit.MILLISECONDS.sleep(100);
      }
    } catch(InterruptedException e) {
      print("Restaurant interrupted");
    }
    print("Restaurant closing");
  }
}

public class RestaurantWithQueues {
  public static void main(String[] args) throws Exception {
    ExecutorService exec = Executors.newCachedThreadPool();
    Restaurant restaurant = new Restaurant(exec, 5, 2);
    exec.execute(restaurant);
    if(args.length > 0) // Optional argument
      TimeUnit.SECONDS.sleep(new Integer(args[0]));
    else {
      print("Press 'Enter' to quit");
      System.in.read();
    }
    exec.shutdownNow();
  }
} /* Output: (Sample)
WaitPerson 0 received SPRING_ROLLS delivering to Customer 1
Customer 1 eating SPRING_ROLLS
WaitPerson 3 received SPRING_ROLLS delivering to Customer 0
Customer 0 eating SPRING_ROLLS
WaitPerson 0 received BURRITO delivering to Customer 1
Customer 1 eating BURRITO
WaitPerson 3 received SPRING_ROLLS delivering to Customer 2
Customer 2 eating SPRING_ROLLS
WaitPerson 1 received SOUP delivering to Customer 3
Customer 3 eating SOUP
WaitPerson 3 received VINDALOO delivering to Customer 0
Customer 0 eating VINDALOO
WaitPerson 0 received FRUIT delivering to Customer 1
...
*///:~

One very important thing to observe about this example is the management of complexity using queues to communicate between tasks. This single technique greatly simplifies the process of concurrent programming by inverting the control: The tasks do not directly interfere with each other. Instead, the tasks send objects to each other via queues. The receiving task handles the object, treating it as a message rather than having the message inflicted upon it. If you follow this technique whenever you can, you stand a much better chance of building robust concurrent systems.
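Reduced to its bare bones, the technique looks something like the following sketch. It is not taken from the restaurant example: QueueMessagingSketch, sumViaQueue( ), and the mailbox queue are hypothetical names. The receiving task owns its running total, and the sender influences it only by placing messages on a LinkedBlockingQueue:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

// QueueMessagingSketch is a hypothetical name for this illustration.
class QueueMessagingSketch {
  // Sends the numbers 1..n to a receiving task through a queue
  // and returns the total that the receiver computed.
  static int sumViaQueue(final int n) {
    final BlockingQueue<Integer> mailbox =
      new LinkedBlockingQueue<Integer>();
    ExecutorService exec = Executors.newSingleThreadExecutor();
    try {
      Future<Integer> total = exec.submit(
        new Callable<Integer>() {
          public Integer call() throws InterruptedException {
            int sum = 0; // Touched only by the receiving task
            for(int i = 0; i < n; i++)
              sum += mailbox.take(); // Blocks until a message arrives
            return sum;
          }
        });
      for(int i = 1; i <= n; i++)
        mailbox.put(i); // The sender never touches 'sum' directly
      return total.get();
    } catch(InterruptedException e) {
      throw new RuntimeException(e);
    } catch(ExecutionException e) {
      throw new RuntimeException(e);
    } finally {
      exec.shutdownNow();
    }
  }
  public static void main(String[] args) {
    System.out.println("Total: " + sumViaQueue(10));
  }
} /* Output:
Total: 55
*/
```

Because sum is a local variable of the receiving task, no synchronized keyword or Lock appears anywhere; the queue does all the coordination.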
Exercise 36: (10) Modify RestaurantWithQueues.java so there’s one OrderTicket object per table. Change order to orderTicket, and add a Table class, with multiple Customers per table.

Distributing work

Here’s a simulation example that brings together many of the concepts in this chapter. Consider a hypothetical robotic assembly line for automobiles. Each Car will be built in several stages, starting with chassis creation, followed by the attachment of the engine, drive train, and wheels.

//: concurrency/CarBuilder.java
// A complex example of tasks working together.
import java.util.concurrent.*;
import java.util.*;
import static net.mindview.util.Print.*;

class Car {
  private final int id;
  private boolean
    engine = false, driveTrain = false, wheels = false;
  public Car(int idn) { id = idn; }
  // Empty Car object:
  public Car() { id = -1; }
  public synchronized int getId() { return id; }
  public synchronized void addEngine() { engine = true; }
  public synchronized void addDriveTrain() {
    driveTrain = true;
  }
  public synchronized void addWheels() { wheels = true; }
  public synchronized String toString() {
    return "Car " + id + " [" + " engine: " + engine
      + " driveTrain: " + driveTrain
      + " wheels: " + wheels + " ]";
  }
}

class CarQueue extends LinkedBlockingQueue<Car> {}

class ChassisBuilder implements Runnable {
  private CarQueue carQueue;
  private int counter = 0;
  public ChassisBuilder(CarQueue cq) { carQueue = cq; }
  public void run() {
    try {
      while(!Thread.interrupted()) {
        TimeUnit.MILLISECONDS.sleep(500);
        // Make chassis:
        Car c = new Car(counter++);
        print("ChassisBuilder created " + c);
        // Insert into queue
        carQueue.put(c);
      }
    } catch(InterruptedException e) {
      print("Interrupted: ChassisBuilder");
    }
    print("ChassisBuilder off");
  }
}

class Assembler implements Runnable {
  private CarQueue chassisQueue, finishingQueue;
  private Car car;
  private CyclicBarrier barrier = new CyclicBarrier(4);
  private RobotPool robotPool;
  public Assembler(CarQueue cq, CarQueue fq, RobotPool rp){
    chassisQueue = cq;
    finishingQueue = fq;
    robotPool = rp;
  }
  public Car car() { return car; }
  public CyclicBarrier barrier() { return barrier; }
  public void run() {
    try {
      while(!Thread.interrupted()) {
        // Blocks until chassis is available:
        car = chassisQueue.take();
        // Hire robots to perform work:
        robotPool.hire(EngineRobot.class, this);
        robotPool.hire(DriveTrainRobot.class, this);
        robotPool.hire(WheelRobot.class, this);
        barrier.await(); // Until the robots finish
        // Put car into finishingQueue for further work
        finishingQueue.put(car);
      }
    } catch(InterruptedException e) {
      print("Exiting Assembler via interrupt");
    } catch(BrokenBarrierException e) {
      // This one we want to know about
      throw new RuntimeException(e);
    }
    print("Assembler off");
  }
}

class Reporter implements Runnable {
  private CarQueue carQueue;
  public Reporter(CarQueue cq) { carQueue = cq; }
  public void run() {
    try {
      while(!Thread.interrupted()) {
        print(carQueue.take());
      }
    } catch(InterruptedException e) {
      print("Exiting Reporter via interrupt");
    }
    print("Reporter off");
  }
}

abstract class Robot implements Runnable {
  private RobotPool pool;
  public Robot(RobotPool p) { pool = p; }
  protected Assembler assembler;
  public Robot assignAssembler(Assembler assembler) {
    this.assembler = assembler;
    return this;
  }
  private boolean engage = false;
  public synchronized void engage() {
    engage = true;
    notifyAll();
  }
  // The part of run() that's different for each robot:
  abstract protected void performService();
  public void run() {
    try {
      powerDown(); // Wait until needed
      while(!Thread.interrupted()) {
        performService();
        assembler.barrier().await(); // Synchronize
        // We're done with that job...
        powerDown();
      }
    } catch(InterruptedException e) {
      print("Exiting " + this + " via interrupt");
    } catch(BrokenBarrierException e) {
      // This one we want to know about
      throw new RuntimeException(e);
    }
    print(this + " off");
  }
  private synchronized void
  powerDown() throws InterruptedException {
    engage = false;
    assembler = null; // Disconnect from the Assembler
    // Put ourselves back in the available pool:
    pool.release(this);
    while(engage == false) // Power down
      wait();
  }
  public String toString() { return getClass().getName(); }
}

class EngineRobot extends Robot {
  public EngineRobot(RobotPool pool) { super(pool); }
  protected void performService() {
    print(this + " installing engine");
    assembler.car().addEngine();
  }
}

class DriveTrainRobot extends Robot {
  public DriveTrainRobot(RobotPool pool) { super(pool); }
  protected void performService() {
    print(this + " installing DriveTrain");
    assembler.car().addDriveTrain();
  }
}

class WheelRobot extends Robot {
  public WheelRobot(RobotPool pool) { super(pool); }
  protected void performService() {
    print(this + " installing Wheels");
    assembler.car().addWheels();
  }
}

class RobotPool {
  // Quietly prevents identical entries:
  private Set<Robot> pool = new HashSet<Robot>();
  public synchronized void add(Robot r) {
    pool.add(r);
    notifyAll();
  }
  public synchronized void
  hire(Class<? extends Robot> robotType, Assembler d)
  throws InterruptedException {
    for(Robot r : pool)
      if(r.getClass().equals(robotType)) {
        pool.remove(r);
        r.assignAssembler(d);
        r.engage(); // Power it up to do the task
        return;
      }
    wait(); // None available
    hire(robotType, d); // Try again, recursively
  }
  public synchronized void release(Robot r) { add(r); }
}

public class CarBuilder {
  public static void main(String[] args) throws Exception {
    CarQueue chassisQueue = new CarQueue(),
             finishingQueue = new CarQueue();
    ExecutorService exec = Executors.newCachedThreadPool();
    RobotPool robotPool = new RobotPool();
    exec.execute(new EngineRobot(robotPool));
    exec.execute(new DriveTrainRobot(robotPool));
    exec.execute(new WheelRobot(robotPool));
    exec.execute(new Assembler(
      chassisQueue, finishingQueue, robotPool));
    exec.execute(new Reporter(finishingQueue));
    // Start everything running by producing chassis:
    exec.execute(new ChassisBuilder(chassisQueue));
    TimeUnit.SECONDS.sleep(7);
    exec.shutdownNow();
  }
} /* (Execute to see output) *///:~

The Cars are transported from one place to another via a CarQueue, which is a type of LinkedBlockingQueue. A ChassisBuilder creates an unadorned Car and places it on a CarQueue. The Assembler takes the Car off a CarQueue and hires Robots to work on it. A CyclicBarrier allows the Assembler to wait until all the Robots are finished, at which time it puts the Car onto the outgoing CarQueue to be transported to the next operation. The consumer of the final CarQueue is a Reporter object, which just prints the Car to show that the tasks have been properly completed.

The Robots are managed in a pool, and when work needs to be done, the appropriate Robot is hired from the pool. After the work is completed, the Robot returns to the pool.

In main( ), all the necessary objects are created and the tasks are initialized, with the ChassisBuilder begun last to start the process. (However, because of the behavior of the LinkedBlockingQueue, it wouldn’t matter if it were started first.)
Note that this program follows all the guidelines regarding object and task lifetime presented in this chapter, and so the shutdown process is safe.

You’ll notice that Car has all of its methods synchronized. As it turns out, in this example this is redundant, because within the factory the Cars move through the queues and only one task can work on a car at a time. Basically, the queues force serialized access to the Cars. But this is exactly the kind of trap you can fall into: you can say, "Let’s try to optimize by not synchronizing the Car class because it doesn’t look like it needs it here." But later, when this system is connected to another which does need the Car to be synchronized, it breaks.

Brian Goetz comments:

It’s much easier to say, "Car might be used from multiple threads, so let’s make it thread-safe in the obvious way." The way I characterize this approach is: At public parks, you will find guard rails where there is a steep drop, and you may find signs that say, "Don’t lean on the guard rail." Of course, the real purpose of this rule is not to prevent you from leaning on the rail; it is to prevent you from falling off the cliff. But "Don’t lean on the rail" is a much easier rule to follow than "Don’t fall off the cliff."

Exercise 37: (2) Modify CarBuilder.java to add another stage to the car-building process, whereby you add the exhaust system, body, and fenders. As with the second stage, assume these processes can be performed simultaneously by robots.

Exercise 38: (3) Using the approach in CarBuilder.java, model the house-building story that was given in this chapter.
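Before attempting those exercises, it may help to isolate the part of CarBuilder.java that the Assembler and the Robots share: a CyclicBarrier with four parties (one coordinator plus three workers) that is reused for every car. The sketch below keeps only that skeleton. BarrierSketch and runRounds( ) are hypothetical names, and an AtomicInteger counter stands in for the parts being installed:

```java
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// BarrierSketch is a hypothetical name used only for this illustration.
class BarrierSketch {
  // For each round, three workers each "install" one part, then
  // everyone meets at the barrier. Returns the running part count
  // that the coordinator observed after each round.
  static int[] runRounds(int rounds) {
    final int workers = 3;
    // Parties: the workers plus the coordinating thread.
    final CyclicBarrier barrier = new CyclicBarrier(workers + 1);
    final AtomicInteger parts = new AtomicInteger();
    ExecutorService exec = Executors.newFixedThreadPool(workers);
    int[] partsAfterRound = new int[rounds];
    try {
      for(int r = 0; r < rounds; r++) {
        for(int w = 0; w < workers; w++)
          exec.execute(new Runnable() {
            public void run() {
              parts.incrementAndGet(); // Do the work first
              try {
                barrier.await(); // Then synchronize
              } catch(Exception e) {
                throw new RuntimeException(e);
              }
            }
          });
        barrier.await(); // Until all three workers finish
        partsAfterRound[r] = parts.get();
        // The barrier resets itself and can be used again.
      }
    } catch(Exception e) {
      throw new RuntimeException(e);
    } finally {
      exec.shutdown();
    }
    return partsAfterRound;
  }
  public static void main(String[] args) {
    int[] counts = runRounds(2);
    System.out.println("Parts after round 1: " + counts[0]);
    System.out.println("Parts after round 2: " + counts[1]);
  }
} /* Output:
Parts after round 1: 3
Parts after round 2: 6
*/
```

Each worker increments the counter before it calls await( ), so when the coordinator is released it is guaranteed to see all three parts for that round. This is the same ordering that lets the Assembler safely move the Car to the finishingQueue.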

Performance tuning

A significant number of classes in Java SE5’s java.util.concurrent library exist to provide performance improvements. When you peruse the concurrent library, it can be difficult to discern which classes are intended for regular use (such as BlockingQueues) and which ones are only for improving performance. In this section we will look at some of the issues and classes surrounding performance tuning.

Comparing mutex technologies

Now that Java includes the old synchronized keyword along with the new Java SE5 Lock and Atomic classes, it is interesting to compare the different approaches so that we can understand more about the value of each and where to use them.

The naive approach is to try a simple test on each approach, like this:

//: concurrency/SimpleMicroBenchmark.java
// The dangers of microbenchmarking.
import java.util.concurrent.locks.*;

abstract class Incrementable {
  protected long counter = 0;
  public abstract void increment();
}

class SynchronizingTest extends Incrementable {
  public synchronized void increment() { ++counter; }
}

class LockingTest extends Incrementable {
  private Lock lock = new ReentrantLock();
  public void increment() {
    lock.lock();
    try {
      ++counter;
    } finally {
      lock.unlock();
    }
  }
}

public class SimpleMicroBenchmark {
  static long test(Incrementable incr) {
    long start = System.nanoTime();
    for(long i = 0; i < 10000000L; i++)
      incr.increment();
    return System.nanoTime() - start;
  }
  public static void main(String[] args) {
    long synchTime = test(new SynchronizingTest());
    long lockTime = test(new LockingTest());
    System.out.printf("synchronized: %1$10d\n", synchTime);
    System.out.printf("Lock: %1$10d\n", lockTime);
    System.out.printf("Lock/synchronized = %1$.3f",
      (double)lockTime/(double)synchTime);
  }
} /* Output: (75% match)
synchronized: 244919117
Lock: 939098964
Lock/synchronized = 3.834
*///:~

You can see from the output that calls to the synchronized method appear to be faster than using a ReentrantLock. What’s happened here? 23

This example demonstrates the dangers of so-called "microbenchmarking." This term generally refers to performance testing a feature in isolation, out of context. Of course, you must still write tests to verify assertions like "Lock is much faster than synchronized." But you need an awareness of what’s really happening during compilation and run time when you write these kinds of tests.

There are a number of problems with the above example. First and foremost, we will only see the true performance difference if the mutexes are under contention, so there must be multiple tasks trying to access the mutexed code sections. In the above example, each mutex is tested by the single main( ) thread, in isolation.

Secondly, it’s possible that the compiler can perform special optimizations when it sees the synchronized keyword, and perhaps even notice that this program is single-threaded. The compiler might even identify that the counter is simply being incremented a fixed number of times, and just precalculate the result. Different compilers and runtime systems vary, so it’s hard to know exactly what will happen, but we need to prevent the possibility that the compiler can predict the outcome.

To create a valid test, we must make the program more complex. First we need multiple tasks, and not just tasks that change internal values, but also tasks that read those values (otherwise the optimizer may recognize that the values are never being used). In addition, the calculation must be complex and unpredictable enough that the compiler will have no chance to perform aggressive optimizations.
This will be accomplished by pre-loading a large array of random ints (pre-loading to reduce the impact of calls to Random.nextInt( ) on the main loops) and using those values in a summation: //: concurrency/SynchronizationComparisons.java // Comparing the performance of explicit Locks // and Atomics versus the synchronized keyword. import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.concurrent.locks.*; import java.util.*; import static net.mindview.util.Print.*; abstract class Accumulator { public static long cycles = 50000L; // Number of Modifiers and Readers during each test: private static final int N = 4; public static ExecutorService exec = Executors.newFixedThreadPool(N*2); private static CyclicBarrier barrier = new CyclicBarrier(N*2 + 1); protected volatile int index = 0; protected volatile long value = 0; protected long duration = 0; protected String id = \"error\"; protected final static int SIZE = 100000; protected static int[] preLoaded = new int[SIZE]; static { // Load the array of random numbers:                                                              23 Brian Goetz was very helpful in explaining these issues to me. See his article at www- 128.ibm.com/developerworks/library/j-jtp12214 for more about performance measurement. 910 Thinking in Java Bruce Eckel

    Random rand = new Random(47);
    for(int i = 0; i < SIZE; i++)
      preLoaded[i] = rand.nextInt();
  }
  public abstract void accumulate();
  public abstract long read();
  private class Modifier implements Runnable {
    public void run() {
      for(long i = 0; i < cycles; i++)
        accumulate();
      try {
        barrier.await();
      } catch(Exception e) {
        throw new RuntimeException(e);
      }
    }
  }
  private class Reader implements Runnable {
    private volatile long value;
    public void run() {
      for(long i = 0; i < cycles; i++)
        value = read();
      try {
        barrier.await();
      } catch(Exception e) {
        throw new RuntimeException(e);
      }
    }
  }
  public void timedTest() {
    long start = System.nanoTime();
    for(int i = 0; i < N; i++) {
      exec.execute(new Modifier());
      exec.execute(new Reader());
    }
    try {
      barrier.await();
    } catch(Exception e) {
      throw new RuntimeException(e);
    }
    duration = System.nanoTime() - start;
    printf("%-13s: %13d\n", id, duration);
  }
  public static void report(Accumulator acc1, Accumulator acc2) {
    printf("%-22s: %.2f\n", acc1.id + "/" + acc2.id,
      (double)acc1.duration/(double)acc2.duration);
  }
}

class BaseLine extends Accumulator {
  { id = "BaseLine"; }
  public void accumulate() {
    value += preLoaded[index++];
    if(index >= SIZE) index = 0;
  }
  public long read() { return value; }
}

class SynchronizedTest extends Accumulator {
  { id = "synchronized"; }
  public synchronized void accumulate() {
    value += preLoaded[index++];

    if(index >= SIZE) index = 0;
  }
  public synchronized long read() {
    return value;
  }
}

class LockTest extends Accumulator {
  { id = "Lock"; }
  private Lock lock = new ReentrantLock();
  public void accumulate() {
    lock.lock();
    try {
      value += preLoaded[index++];
      if(index >= SIZE) index = 0;
    } finally {
      lock.unlock();
    }
  }
  public long read() {
    lock.lock();
    try {
      return value;
    } finally {
      lock.unlock();
    }
  }
}

class AtomicTest extends Accumulator {
  { id = "Atomic"; }
  private AtomicInteger index = new AtomicInteger(0);
  private AtomicLong value = new AtomicLong(0);
  public void accumulate() {
    // Oops! Relying on more than one Atomic at
    // a time doesn’t work. But it still gives us
    // a performance indicator:
    int i = index.getAndIncrement();
    value.getAndAdd(preLoaded[i]);
    if(++i >= SIZE)
      index.set(0);
  }
  public long read() { return value.get(); }
}

public class SynchronizationComparisons {
  static BaseLine baseLine = new BaseLine();
  static SynchronizedTest synch = new SynchronizedTest();
  static LockTest lock = new LockTest();
  static AtomicTest atomic = new AtomicTest();
  static void test() {
    print("============================");
    printf("%-12s : %13d\n", "Cycles", Accumulator.cycles);
    baseLine.timedTest();
    synch.timedTest();
    lock.timedTest();
    atomic.timedTest();
    Accumulator.report(synch, baseLine);
    Accumulator.report(lock, baseLine);
    Accumulator.report(atomic, baseLine);
    Accumulator.report(synch, lock);
    Accumulator.report(synch, atomic);
    Accumulator.report(lock, atomic);

  }
  public static void main(String[] args) {
    int iterations = 5; // Default
    if(args.length > 0) // Optionally change iterations
      iterations = new Integer(args[0]);
    // The first time fills the thread pool:
    print("Warmup");
    baseLine.timedTest();
    // Now the initial test doesn’t include the cost
    // of starting the threads for the first time.
    // Produce multiple data points:
    for(int i = 0; i < iterations; i++) {
      test();
      Accumulator.cycles *= 2;
    }
    Accumulator.exec.shutdown();
  }
} /* Output: (Sample)
Warmup
BaseLine : 34237033
============================
Cycles : 50000
BaseLine : 20966632
synchronized : 24326555
Lock : 53669950
Atomic : 30552487
synchronized/BaseLine : 1.16
Lock/BaseLine : 2.56
Atomic/BaseLine : 1.46
synchronized/Lock : 0.45
synchronized/Atomic : 0.79
Lock/Atomic : 1.76
============================
Cycles : 100000
BaseLine : 41512818
synchronized : 43843003
Lock : 87430386
Atomic : 51892350
synchronized/BaseLine : 1.06
Lock/BaseLine : 2.11
Atomic/BaseLine : 1.25
synchronized/Lock : 0.50
synchronized/Atomic : 0.84
Lock/Atomic : 1.68
============================
Cycles : 200000
BaseLine : 80176670
synchronized : 5455046661
Lock : 177686829
Atomic : 101789194
synchronized/BaseLine : 68.04
Lock/BaseLine : 2.22
Atomic/BaseLine : 1.27
synchronized/Lock : 30.70
synchronized/Atomic : 53.59
Lock/Atomic : 1.75
============================
Cycles : 400000
BaseLine : 160383513
synchronized : 780052493
Lock : 362187652
Atomic : 202030984
synchronized/BaseLine : 4.86

Lock/BaseLine : 2.26
Atomic/BaseLine : 1.26
synchronized/Lock : 2.15
synchronized/Atomic : 3.86
Lock/Atomic : 1.79
============================
Cycles : 800000
BaseLine : 322064955
synchronized : 336155014
Lock : 704615531
Atomic : 393231542
synchronized/BaseLine : 1.04
Lock/BaseLine : 2.19
Atomic/BaseLine : 1.22
synchronized/Lock : 0.47
synchronized/Atomic : 0.85
Lock/Atomic : 1.79
============================
Cycles : 1600000
BaseLine : 650004120
synchronized : 52235762925
Lock : 1419602771
Atomic : 796950171
synchronized/BaseLine : 80.36
Lock/BaseLine : 2.18
Atomic/BaseLine : 1.23
synchronized/Lock : 36.80
synchronized/Atomic : 65.54
Lock/Atomic : 1.78
============================
Cycles : 3200000
BaseLine : 1285664519
synchronized : 96336767661
Lock : 2846988654
Atomic : 1590545726
synchronized/BaseLine : 74.93
Lock/BaseLine : 2.21
Atomic/BaseLine : 1.24
synchronized/Lock : 33.84
synchronized/Atomic : 60.57
Lock/Atomic : 1.79
*///:~

This program uses the Template Method design pattern [24] to put all the common code in the base class and isolate all the varying code in the derived-class implementations of accumulate( ) and read( ). In each of the derived classes SynchronizedTest, LockTest, and AtomicTest, you can see how accumulate( ) and read( ) express different ways of implementing mutual exclusion.

In this program, tasks are executed via a FixedThreadPool in an attempt to keep all the thread creation at the beginning, and prevent any extra cost during the tests. Just to make sure, the initial test is duplicated and the first result is discarded because it includes the initial thread creation.

A CyclicBarrier is necessary because we want to make sure all the tasks have completed before declaring each test complete.

A static clause is used to pre-load the array of random numbers, before any tests begin. This way, if there is any overhead to generating random numbers, we won’t see it during the test.
[24] See Thinking in Patterns at www.MindView.net.
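The role of the CyclicBarrier in timedTest( ) can be isolated in a smaller sketch. This is not the book's code: the class BarrierSketch and the method runAndCount( ) are hypothetical names, and the lambda syntax assumes Java 8 or later. The barrier is created with one party per worker plus one for the coordinating thread, so the coordinator's await( ) cannot return until every worker has arrived.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierSketch {
  // Runs nTasks workers and returns how many had finished their
  // work at the moment the coordinating thread passed the barrier.
  static int runAndCount(int nTasks) {
    final AtomicInteger finished = new AtomicInteger();
    // One party per worker, plus one for the coordinating thread:
    final CyclicBarrier barrier = new CyclicBarrier(nTasks + 1);
    // The pool must hold all workers at once, because each one
    // blocks in await() until the others arrive:
    ExecutorService exec = Executors.newFixedThreadPool(nTasks);
    try {
      for (int i = 0; i < nTasks; i++) {
        exec.execute(() -> {
          finished.incrementAndGet(); // Stand-in for the real work
          try {
            barrier.await();
          } catch (InterruptedException | BrokenBarrierException e) {
            throw new RuntimeException(e);
          }
        });
      }
      barrier.await(); // Blocks until every worker has arrived
      return finished.get();
    } catch (InterruptedException | BrokenBarrierException e) {
      throw new RuntimeException(e);
    } finally {
      exec.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(runAndCount(8));
  }
}
```

If the thread pool were smaller than the number of workers, the queued workers could never reach the barrier and the program would hang, which is why Accumulator sizes both the pool and the barrier from the same constant N.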

Each time accumulate( ) is called, it moves to the next place in the array preLoaded (wrapping to the beginning of the array) and adds another randomly generated number to value. The multiple Modifier and Reader tasks provide contention on the Accumulator object.

Notice that in AtomicTest, I observe that the situation is too complex to try to use Atomic objects. Basically, if more than one Atomic object is involved, you will probably be forced to give up and use more conventional mutexes (the JDK documentation specifically states that using Atomic objects only works when the critical updates for an object are confined to a single variable). However, the test is left in place so that you can still get a feel for the performance benefit of Atomic objects.

In main( ), the test is run repeatedly and you can decide to ask for more than five repetitions (the default). For each repetition, the number of test cycles is doubled, so you can see how the different mutexes behave when running for longer and longer times. As you can see from the output, the results are rather surprising. In some iterations of the sample output (50000, 100000, and 800000 cycles), the synchronized keyword seems to be more efficient than using a Lock, and even an Atomic. In the other iterations, synchronized suddenly becomes quite inefficient (as much as 80 times slower than BaseLine), while Lock and Atomic roughly maintain their proportion to the BaseLine test throughout, and therefore become much more efficient than synchronized.

Keep in mind that this program only gives an indication of the differences between the various mutex approaches, and the output above only indicates these differences on my particular machine under my particular circumstances. As you can see if you experiment with it, there can be significant shifts in behavior when different numbers of threads are used and when the program is run for longer periods of time.
Some HotSpot runtime optimizations are not invoked until a program has been running for several minutes, and in the case of server programs, several hours.

That said, it is fairly clear that using Lock is usually significantly more efficient than using synchronized, and it also appears that the overhead of synchronized varies widely, while Locks are relatively consistent.

Does this mean you should never use the synchronized keyword? There are two factors to consider: First, in SynchronizationComparisons.java, the bodies of the mutexed methods are extremely small. In general, this is a good practice: only mutex the sections that you absolutely must. However, in practice the mutexed sections may be larger than those in the above example, and so the percentage of time in the body will probably be significantly bigger than the overhead of entering and exiting the mutex, and could overwhelm any benefit of speeding up the mutex. Of course, the only way to know is to try the different approaches and see what impact they have (when you’re tuning for performance, and no sooner).

Second, it’s clear from reading the code in this chapter that the synchronized keyword produces much more readable code than the lock try/finally-unlock idiom that Locks require, and that’s why this chapter primarily uses the synchronized keyword. As I’ve stated elsewhere in this book, code is read much more than it is written (when programming, it is more important to communicate with other humans than it is to communicate with the computer), and so readability of code is critical. As a result, it makes sense to start with the synchronized keyword and only change to Lock objects when you are tuning for performance.
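To make the readability comparison concrete, here is a minimal sketch that places the two idioms side by side on the same kind of counter. The class IdiomSketch and its members are hypothetical names chosen for illustration, and the lambda assumes Java 8 or later; it says nothing about relative speed.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class IdiomSketch {
  private long syncCount = 0;
  private long lockCount = 0;
  private final Lock lock = new ReentrantLock();

  // The synchronized keyword: the mutex is acquired and
  // released for you at method entry and exit.
  synchronized void incrementSynchronized() { syncCount++; }
  synchronized long syncCount() { return syncCount; }

  // An explicit Lock: you must pair every lock() with an
  // unlock() in a finally clause yourself.
  void incrementLocked() {
    lock.lock();
    try {
      lockCount++;
    } finally {
      lock.unlock();
    }
  }
  long lockCount() {
    lock.lock();
    try {
      return lockCount;
    } finally {
      lock.unlock();
    }
  }

  // Hammers both counters from several threads and returns
  // { synchronized total, Lock total }.
  static long[] run(int nThreads, int perThread) {
    IdiomSketch s = new IdiomSketch();
    Thread[] threads = new Thread[nThreads];
    for (int t = 0; t < nThreads; t++) {
      threads[t] = new Thread(() -> {
        for (int i = 0; i < perThread; i++) {
          s.incrementSynchronized();
          s.incrementLocked();
        }
      });
      threads[t].start();
    }
    for (Thread th : threads) {
      try {
        th.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return new long[] { s.syncCount(), s.lockCount() };
  }

  public static void main(String[] args) {
    long[] totals = run(4, 10000);
    System.out.println(totals[0] + " " + totals[1]);
  }
}
```

Both versions protect the counter equally well; the difference is that the Lock version needs four extra lines per method, and forgetting the finally clause would leave the lock held if the body threw an exception.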
Finally, it’s nice when you can use the Atomic classes in your concurrent program, but be aware that, as we saw in SynchronizationComparisons.java, Atomic objects are only useful in very simple cases, generally when you only have one Atomic object that’s being modified and when that object is independent from all other objects. It’s safer to start with more traditional mutexing approaches and only attempt to change to Atomic later, if performance requirements dictate.
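As an illustration of "one Atomic object that is independent of the others," the sketch below reworks the idea behind AtomicTest so that each update touches a single atomic variable in one step. It is not the book's code: IndependentAtomicsSketch, DATA, and run( ) are hypothetical names, and AtomicInteger.getAndUpdate( ) assumes Java 8 or later. The increment and the wrap-around are folded into one atomic operation, so the index can never run past the end of the array.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

class IndependentAtomicsSketch {
  static final int SIZE = 10;
  static final int[] DATA = new int[SIZE];
  static {
    for (int i = 0; i < SIZE; i++)
      DATA[i] = i; // Predictable data so the total can be checked
  }

  private final AtomicInteger index = new AtomicInteger(0);
  private final AtomicLong value = new AtomicLong(0);

  void accumulate() {
    // Increment and wrap-around happen as a single atomic step,
    // so the returned index is always within bounds:
    int i = index.getAndUpdate(n -> (n + 1) % SIZE);
    // A second, separate atomic step on an independent variable:
    value.addAndGet(DATA[i]);
  }

  long read() { return value.get(); }

  // Calls accumulate() from several threads and returns the total.
  static long run(int nThreads, int callsPerThread) {
    IndependentAtomicsSketch acc = new IndependentAtomicsSketch();
    Thread[] threads = new Thread[nThreads];
    for (int t = 0; t < nThreads; t++) {
      threads[t] = new Thread(() -> {
        for (int c = 0; c < callsPerThread; c++)
          acc.accumulate();
      });
      threads[t].start();
    }
    for (Thread th : threads) {
      try {
        th.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return acc.read();
  }

  public static void main(String[] args) {
    System.out.println(run(4, 2500));
  }
}
```

Note the limit of this approach: index and value are still updated in two separate steps, so a reader can observe an index that has advanced before the matching addition has landed. If the program needed the two to change together, a conventional mutex would be the safer choice, as the text says.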

Lock-free containers

As emphasized in the Holding Your Objects chapter, containers are a fundamental tool in all programming, and this includes concurrent programming. For this reason, early containers like Vector and Hashtable had many synchronized methods, which caused unacceptable overhead when they were not being used in multithreaded applications. In Java 1.2, the new containers library was unsynchronized, and the Collections class was given various static "synchronized" decoration methods to synchronize the different types of containers. Although this was an improvement because it gave you a choice about whether you use synchronization with your container, the overhead is still based on synchronized locking. Java SE5 has added new containers specifically to increase thread-safe performance, using clever techniques to eliminate locking, at least for readers.

The general strategy behind these lock-free containers is this: Modifications to the containers can happen at the same time that reads are occurring, as long as the readers can only see the results of completed modifications. A modification is performed on a separate copy of a portion of the data structure (or sometimes a copy of the whole thing), and this copy is invisible during the modification process. Only when the modification is complete is the modified structure atomically swapped with the "main" data structure, and after that readers will see the modification.

In CopyOnWriteArrayList, a write will cause a copy of the entire underlying array to be created. The original array is left in place so that reads can safely occur while the copied array is being modified. When the modification is complete, an atomic operation swaps the new array in so that new reads will see the new information.
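The copy-and-swap behavior described above can be observed even in a single thread. The following is a minimal sketch, not part of the book's examples; SnapshotSketch and visitWhileAppending( ) are hypothetical names. It appends to a list while iterating over it: an ordinary ArrayList detects the modification and throws, while a CopyOnWriteArrayList keeps iterating over the array that existed when the iteration began.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class SnapshotSketch {
  // Appends to the list while iterating over it. Returns the
  // number of elements the iteration visited, or -1 if the
  // list threw ConcurrentModificationException.
  static int visitWhileAppending(List<Integer> list) {
    int visited = 0;
    try {
      for (Integer x : list) {
        list.add(x + 10); // Modify during the traversal
        visited++;
      }
    } catch (ConcurrentModificationException e) {
      return -1;
    }
    return visited;
  }

  public static void main(String[] args) {
    List<Integer> plain =
      new ArrayList<Integer>(Arrays.asList(1, 2, 3));
    List<Integer> cow =
      new CopyOnWriteArrayList<Integer>(Arrays.asList(1, 2, 3));
    System.out.println(visitWhileAppending(plain));
    System.out.println(visitWhileAppending(cow));
    System.out.println(cow);
  }
}
```

The iteration over the CopyOnWriteArrayList visits only the three original elements, because each add( ) builds a new array and leaves the iterator's array untouched; the three appended elements are visible to any traversal started afterward.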
One of the benefits of CopyOnWriteArrayList is that it does not throw ConcurrentModificationException when the list is modified while iterators are traversing it, so you don’t have to write special code to protect against such exceptions, as you’ve had to do in the past.

CopyOnWriteArraySet uses CopyOnWriteArrayList to achieve its lock-free behavior.

ConcurrentHashMap and ConcurrentLinkedQueue use similar techniques to allow concurrent reads and writes, but only portions of the container are copied and modified rather than the entire container. However, readers will still not see any modifications before they are complete. ConcurrentHashMap doesn’t throw ConcurrentModificationExceptions.

Performance issues

As long as you are primarily reading from a lock-free container, it will be much faster than its synchronized counterpart because the overhead of acquiring and releasing locks is eliminated. This is still true for a small number of writes to a lock-free container, but it would be interesting to get an idea of what "small" means. This section will produce a rough idea of the performance differences of these containers under different conditions.

I’ll start with a generic framework for performing tests on any type of container, including Maps. The generic parameter C represents the container type:

//: concurrency/Tester.java
// Framework to test performance of concurrency containers.
import java.util.concurrent.*;
import net.mindview.util.*;

public abstract class Tester<C> {
  static int testReps = 10;
  static int testCycles = 1000;
  static int containerSize = 1000;

  abstract C containerInitializer();
  abstract void startReadersAndWriters();
  C testContainer;
  String testId;
  int nReaders;
  int nWriters;
  volatile long readResult = 0;
  volatile long readTime = 0;
  volatile long writeTime = 0;
  CountDownLatch endLatch;
  static ExecutorService exec =
    Executors.newCachedThreadPool();
  Integer[] writeData;
  Tester(String testId, int nReaders, int nWriters) {
    this.testId = testId + " " +
      nReaders + "r " + nWriters + "w";
    this.nReaders = nReaders;
    this.nWriters = nWriters;
    writeData = Generated.array(Integer.class,
      new RandomGenerator.Integer(), containerSize);
    for(int i = 0; i < testReps; i++) {
      runTest();
      readTime = 0;
      writeTime = 0;
    }
  }
  void runTest() {
    endLatch = new CountDownLatch(nReaders + nWriters);
    testContainer = containerInitializer();
    startReadersAndWriters();
    try {
      endLatch.await();
    } catch(InterruptedException ex) {
      System.out.println("endLatch interrupted");
    }
    System.out.printf("%-27s %14d %14d\n",
      testId, readTime, writeTime);
    if(readTime != 0 && writeTime != 0)
      System.out.printf("%-27s %14d\n",
        "readTime + writeTime =", readTime + writeTime);
  }
  abstract class TestTask implements Runnable {
    abstract void test();
    abstract void putResults();
    long duration;
    public void run() {
      long startTime = System.nanoTime();
      test();
      duration = System.nanoTime() - startTime;
      synchronized(Tester.this) {
        putResults();
      }
      endLatch.countDown();
    }
  }
  public static void initMain(String[] args) {
    if(args.length > 0)
      testReps = new Integer(args[0]);
    if(args.length > 1)
      testCycles = new Integer(args[1]);
    if(args.length > 2)
      containerSize = new Integer(args[2]);
    System.out.printf("%-27s %14s %14s\n",

  \"Type\", \"Read time\", \"Write time\"); } } ///:~ The abstract method containerInitializer( ) returns the initialized container to be tested, which is stored in the field testContainer. The other abstract method, startReadersAndWriters( ), starts the reader and writer tasks that will read and modify the container under test. Different tests are run with varying number of readers and writers to see the effects of lock contention (for the synchronized containers) and writes (for the lock-free containers). The constructor is given various information about the test (the argument identifiers should be self-explanatory), then it calls the runTest( ) method repetitions times. runTest( ) creates a CountDownLatch (so the test can know when all the tasks are complete), initializes the container, then calls startReadersAndWriters( ) and waits until they all complete. Each \"Reader\" or \"Writer\" class is based on TestTask, which measures the duration of its abstract test( ) method, then calls putResults( ) inside a synchronized block to store the results. To use this framework (in which you’ll recognize the Template Method design pattern), we must inherit from Tester for the particular container type we wish to test, and provide appropriate Reader and Writer classes: //: concurrency/ListComparisons.java // {Args: 1 10 10} (Fast verification check during build) // Rough comparison of thread-safe List performance. 
import java.util.concurrent.*;
import java.util.*;
import net.mindview.util.*;

abstract class ListTest extends Tester<List<Integer>> {
  ListTest(String testId, int nReaders, int nWriters) {
    super(testId, nReaders, nWriters);
  }
  class Reader extends TestTask {
    long result = 0;
    void test() {
      for(long i = 0; i < testCycles; i++)
        for(int index = 0; index < containerSize; index++)
          result += testContainer.get(index);
    }
    void putResults() {
      readResult += result;
      readTime += duration;
    }
  }
  class Writer extends TestTask {
    void test() {
      for(long i = 0; i < testCycles; i++)
        for(int index = 0; index < containerSize; index++)
          testContainer.set(index, writeData[index]);
    }
    void putResults() {
      writeTime += duration;
    }
  }
  void startReadersAndWriters() {
    for(int i = 0; i < nReaders; i++)
      exec.execute(new Reader());

    for(int i = 0; i < nWriters; i++)
      exec.execute(new Writer());
  }
}

class SynchronizedArrayListTest extends ListTest {
  List<Integer> containerInitializer() {
    return Collections.synchronizedList(
      new ArrayList<Integer>(
        new CountingIntegerList(containerSize)));
  }
  SynchronizedArrayListTest(int nReaders, int nWriters) {
    super("Synched ArrayList", nReaders, nWriters);
  }
}

class CopyOnWriteArrayListTest extends ListTest {
  List<Integer> containerInitializer() {
    return new CopyOnWriteArrayList<Integer>(
      new CountingIntegerList(containerSize));
  }
  CopyOnWriteArrayListTest(int nReaders, int nWriters) {
    super("CopyOnWriteArrayList", nReaders, nWriters);
  }
}

public class ListComparisons {
  public static void main(String[] args) {
    Tester.initMain(args);
    new SynchronizedArrayListTest(10, 0);
    new SynchronizedArrayListTest(9, 1);
    new SynchronizedArrayListTest(5, 5);
    new CopyOnWriteArrayListTest(10, 0);
    new CopyOnWriteArrayListTest(9, 1);
    new CopyOnWriteArrayListTest(5, 5);
    Tester.exec.shutdown();
  }
} /* Output: (Sample)
Type                             Read time     Write time
Synched ArrayList 10r 0w      232158294700              0
Synched ArrayList 9r 1w       198947618203    24918613399
readTime + writeTime =        223866231602
Synched ArrayList 5r 5w       117367305062   132176613508
readTime + writeTime =        249543918570
CopyOnWriteArrayList 10r 0w      758386889              0
CopyOnWriteArrayList 9r 1w       741305671      136145237
readTime + writeTime =           877450908
CopyOnWriteArrayList 5r 5w       212763075    67967464300
readTime + writeTime =         68180227375
*///:~

In ListTest, the Reader and Writer classes perform the specific actions for a List<Integer>. In Reader.putResults( ), the duration is stored but so is the result, to prevent the calculations from being optimized away. startReadersAndWriters( ) is then defined to create and execute the specific Readers and Writers.

Once ListTest is created, it must be further inherited to override containerInitializer( ) to create and initialize the specific test containers.
In main( ), you can see variations on the tests with different numbers of readers and writers. You can change the test variables using command-line arguments because of the call to Tester.initMain(args).

The default behavior is to run each test 10 times; this helps stabilize the output, which can change because of JVM activities like hotspot optimization and garbage collection. [25] The sample output that you see has been edited to show only the last iteration from each test. From the output, you can see that a synchronized ArrayList has roughly the same performance regardless of the number of readers and writers: readers contend with other readers for locks in the same way that writers do. The CopyOnWriteArrayList, however, is dramatically faster when there are no writers, and is still significantly faster when there are five writers. It would appear that you can be fairly liberal with the use of CopyOnWriteArrayList; the impact of writing to the list does not appear to overtake the impact of synchronizing the entire list for a while. Of course, you must try the two different approaches in your specific application to know for sure which one is best.

Again, note that this isn’t close to being a good benchmark for absolute numbers, and your numbers will almost certainly be different. The goal is just to give you an idea of the relative behaviors of the two types of container.

Since CopyOnWriteArraySet uses CopyOnWriteArrayList, its behavior will be similar and it doesn’t need a separate test here.

Comparing Map implementations

We can use the same framework to get a rough idea of the performance of a synchronized HashMap compared to a ConcurrentHashMap:

//: concurrency/MapComparisons.java
// {Args: 1 10 10} (Fast verification check during build)
// Rough comparison of thread-safe Map performance.
import java.util.concurrent.*;
import java.util.*;
import net.mindview.util.*;

abstract class MapTest
extends Tester<Map<Integer,Integer>> {
  MapTest(String testId, int nReaders, int nWriters) {
    super(testId, nReaders, nWriters);
  }
  class Reader extends TestTask {
    long result = 0;
    void test() {
      for(long i = 0; i < testCycles; i++)
        for(int index = 0; index < containerSize; index++)
          result += testContainer.get(index);
    }
    void putResults() {
      readResult += result;
      readTime += duration;
    }
  }
  class Writer extends TestTask {
    void test() {
      for(long i = 0; i < testCycles; i++)
        for(int index = 0; index < containerSize; index++)
          testContainer.put(index, writeData[index]);
    }
    void putResults() {
      writeTime += duration;
    }

[25] For an introduction to benchmarking under the influence of Java’s dynamic compilation, see www-128.ibm.com/developerworks/library/j-jtp12214.

  }
  void startReadersAndWriters() {
    for(int i = 0; i < nReaders; i++)
      exec.execute(new Reader());
    for(int i = 0; i < nWriters; i++)
      exec.execute(new Writer());
  }
}

class SynchronizedHashMapTest extends MapTest {
  Map<Integer,Integer> containerInitializer() {
    return Collections.synchronizedMap(
      new HashMap<Integer,Integer>(
        MapData.map(
          new CountingGenerator.Integer(),
          new CountingGenerator.Integer(),
          containerSize)));
  }
  SynchronizedHashMapTest(int nReaders, int nWriters) {
    super("Synched HashMap", nReaders, nWriters);
  }
}

class ConcurrentHashMapTest extends MapTest {
  Map<Integer,Integer> containerInitializer() {
    return new ConcurrentHashMap<Integer,Integer>(
      MapData.map(
        new CountingGenerator.Integer(),
        new CountingGenerator.Integer(),
        containerSize));
  }
  ConcurrentHashMapTest(int nReaders, int nWriters) {
    super("ConcurrentHashMap", nReaders, nWriters);
  }
}

public class MapComparisons {
  public static void main(String[] args) {
    Tester.initMain(args);
    new SynchronizedHashMapTest(10, 0);
    new SynchronizedHashMapTest(9, 1);
    new SynchronizedHashMapTest(5, 5);
    new ConcurrentHashMapTest(10, 0);
    new ConcurrentHashMapTest(9, 1);
    new ConcurrentHashMapTest(5, 5);
    Tester.exec.shutdown();
  }
} /* Output: (Sample)
Type                             Read time     Write time
Synched HashMap 10r 0w        306052025049              0
Synched HashMap 9r 1w         428319156207    47697347568
readTime + writeTime =        476016503775
Synched HashMap 5r 5w         243956877760   244012003202
readTime + writeTime =        487968880962
ConcurrentHashMap 10r 0w       23352654318              0
ConcurrentHashMap 9r 1w        18833089400     1541853224
readTime + writeTime =         20374942624
ConcurrentHashMap 5r 5w        12037625732    11850489099
readTime + writeTime =         23888114831
*///:~

The impact of adding writers to a ConcurrentHashMap is even less evident than for a CopyOnWriteArrayList, but the ConcurrentHashMap uses a different technique that clearly minimizes the impact of writes.
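The behavior being measured here, writers and readers working on the same map without a container-wide lock and without ConcurrentModificationException, can be seen in a much smaller sketch. This is an illustration rather than the book's code: ConcurrentMapSketch and fillWhileTraversing( ) are hypothetical names, and the lambda and diamond syntax assume Java 8 or later.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ConcurrentMapSketch {
  // Starts nWriters threads that each put perWriter distinct keys,
  // traverses the map while they may still be running, and returns
  // { keys seen during the traversal, final size of the map }.
  static int[] fillWhileTraversing(int nWriters, int perWriter) {
    Map<Integer,Integer> map = new ConcurrentHashMap<>();
    Thread[] writers = new Thread[nWriters];
    for (int w = 0; w < nWriters; w++) {
      final int base = w * perWriter; // Disjoint key range per writer
      writers[w] = new Thread(() -> {
        for (int k = 0; k < perWriter; k++)
          map.put(base + k, k);
      });
      writers[w].start();
    }
    // Traversal overlaps with the writes. The iterator tolerates
    // this; how many keys it sees depends on timing.
    int seen = 0;
    for (Integer key : map.keySet())
      seen++;
    for (Thread th : writers) {
      try {
        th.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return new int[] { seen, map.size() };
  }

  public static void main(String[] args) {
    int[] r = fillWhileTraversing(4, 1000);
    System.out.println("seen " + r[0] + " of " + r[1]);
  }
}
```

The number of keys seen during the traversal varies from run to run, but the final size is always the full count, and the traversal never fails. The same loop over a Collections.synchronizedMap( ) wrapper would need the caller to hold the map's lock for the whole traversal.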

Optimistic locking

Although Atomic objects perform atomic operations like decrementAndGet( ), some Atomic classes also allow you to perform what is called "optimistic locking." This means that you do not actually use a mutex when you are performing a calculation, but after the calculation is finished and you’re ready to update the Atomic object, you use a method called compareAndSet( ). You hand it the old value and the new value, and if the old value doesn’t agree with the value it finds in the Atomic object, the operation fails, which means that some other task has modified the object in the meantime. Remember that we would ordinarily use a mutex (synchronized or Lock) to prevent more than one task modifying an object at the same time, but here we are "optimistic" by leaving the data unlocked and hoping that no other task comes along and modifies it. Again, all this is done in the name of performance: by using an Atomic instead of synchronized or Lock, you might gain performance benefits.

What happens if the compareAndSet( ) operation fails? This is where it gets tricky, and where you are limited in applying this technique only to problems that can be molded to the requirements. If compareAndSet( ) fails, you must decide what to do; this is very important because if you can’t do something to recover, then you cannot use this technique and must use conventional mutexes instead. Perhaps you can retry the operation and it will be OK if you get it the second time. Or perhaps it’s OK just to ignore the failure: in some simulations, if a data point is lost, it will eventually be made up in the grand scheme of things (of course, you must understand your model well enough to know whether this is true).

Consider a fictitious simulation that consists of 100,000 "genes" of length 30; perhaps this is the beginning of some kind of genetic algorithm.
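Before turning to that simulation, the "retry" policy mentioned above can be sketched on its own. This is a minimal illustration under assumed names (CasRetrySketch, addClamped( ), and run( ) are all hypothetical), and the lambda assumes Java 8 or later. The new value is computed without holding any mutex, and compareAndSet( ) is attempted in a loop until no other task has changed the value in between.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasRetrySketch {
  // Adds delta to target without letting it exceed max.
  // The calculation is done unlocked; if another task changed
  // the value in the meantime, compareAndSet() fails and the
  // whole calculation is simply repeated with the fresh value.
  static int addClamped(AtomicInteger target, int delta, int max) {
    while (true) {
      int oldValue = target.get();
      int newValue = Math.min(oldValue + delta, max);
      if (target.compareAndSet(oldValue, newValue))
        return newValue;
      // Failure: someone else got there first. Retry.
    }
  }

  // Calls addClamped(target, 1, max) from several threads and
  // returns the final value.
  static int run(int nThreads, int perThread, int max) {
    AtomicInteger target = new AtomicInteger(0);
    Thread[] threads = new Thread[nThreads];
    for (int t = 0; t < nThreads; t++) {
      threads[t] = new Thread(() -> {
        for (int i = 0; i < perThread; i++)
          addClamped(target, 1, max);
      });
      threads[t].start();
    }
    for (Thread th : threads) {
      try {
        th.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return target.get();
  }

  public static void main(String[] args) {
    System.out.println(run(4, 1000, 2500));
    System.out.println(run(4, 1000, 1000000));
  }
}
```

Retrying only works because the calculation here is cheap and has no side effects, so repeating it is harmless. When the calculation is expensive or cannot be repeated, ignoring the failure or falling back to a mutex are the remaining choices.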
Suppose that for each "evolution" of the genetic algorithm, some very expensive calculations take place, so you decide to use a multiprocessor machine to distribute the tasks and improve performance. In addition, you use Atomic objects instead of Lock objects to prevent mutex overhead. (Naturally, you only produced this solution after first writing the code in the simplest way that could possibly work, using the synchronized keyword. Once you had the program running, only then did you discover that it was too slow, and begin applying performance techniques!) Because of the nature of your model, if there’s a collision during a calculation, the task that discovers the collision can just ignore it and not update its value. Here’s what it looks like:

//: concurrency/FastSimulation.java
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.*;
import static net.mindview.util.Print.*;

public class FastSimulation {
  static final int N_ELEMENTS = 100000;
  static final int N_GENES = 30;
  static final int N_EVOLVERS = 50;
  static final AtomicInteger[][] GRID =
    new AtomicInteger[N_ELEMENTS][N_GENES];
  static Random rand = new Random(47);
  static class Evolver implements Runnable {
    public void run() {
      while(!Thread.interrupted()) {
        // Randomly select an element to work on:
        int element = rand.nextInt(N_ELEMENTS);
        for(int i = 0; i < N_GENES; i++) {
          int previous = element - 1;
          if(previous < 0) previous = N_ELEMENTS - 1;
          int next = element + 1;
          if(next >= N_ELEMENTS) next = 0;
          int oldvalue = GRID[element][i].get();

          // Perform some kind of modeling calculation:
          int newvalue = oldvalue +
            GRID[previous][i].get() + GRID[next][i].get();
          newvalue /= 3; // Average the three values
          if(!GRID[element][i]
            .compareAndSet(oldvalue, newvalue)) {
            // Policy here to deal with failure. Here, we
            // just report it and ignore it; our model
            // will eventually deal with it.
            print("Old value changed from " + oldvalue);
          }
        }
      }
    }
  }
  public static void main(String[] args) throws Exception {
    ExecutorService exec = Executors.newCachedThreadPool();
    for(int i = 0; i < N_ELEMENTS; i++)
      for(int j = 0; j < N_GENES; j++)
        GRID[i][j] = new AtomicInteger(rand.nextInt(1000));
    for(int i = 0; i < N_EVOLVERS; i++)
      exec.execute(new Evolver());
    TimeUnit.SECONDS.sleep(5);
    exec.shutdownNow();
  }
} /* (Execute to see output) *///:~

The elements are all placed inside an array with the assumption that this will help performance (this assumption will be tested in an exercise). Each Evolver object averages its value with the one before and after it, and if there’s a failure when it goes to update, it simply prints the value and goes on. Note that no mutexes appear in the program.

Exercise 39: (6) Does FastSimulation.java make reasonable assumptions? Try changing the array to ordinary ints instead of AtomicInteger and using Lock mutexes. Compare the performance between the two versions of the program.

ReadWriteLocks

ReadWriteLocks optimize the situation where you write to a data structure relatively infrequently, but multiple tasks read from it often. The ReadWriteLock allows you to have many readers at one time as long as no one is attempting to write. If the write lock is held, then no readers are allowed until the write lock is released.
Whether a ReadWriteLock will improve the performance of your program is completely uncertain; it depends on issues like how often data is being read compared to how often it is being modified, the time of the read and write operations (the lock is more complex, so short operations will not see the benefits), how much thread contention there is, and whether you are running on a multiprocessor machine. Ultimately, the only way to know whether a ReadWriteLock will benefit your program is to try it out.

Here’s an example showing only the most basic use of ReadWriteLocks:

//: concurrency/ReaderWriterList.java
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import java.util.*;
import static net.mindview.util.Print.*;

public class ReaderWriterList<T> {

  private ArrayList<T> lockedList;
  // Make the ordering fair:
  private ReentrantReadWriteLock lock =
    new ReentrantReadWriteLock(true);
  public ReaderWriterList(int size, T initialValue) {
    lockedList = new ArrayList<T>(
      Collections.nCopies(size, initialValue));
  }
  public T set(int index, T element) {
    Lock wlock = lock.writeLock();
    wlock.lock();
    try {
      return lockedList.set(index, element);
    } finally {
      wlock.unlock();
    }
  }
  public T get(int index) {
    Lock rlock = lock.readLock();
    rlock.lock();
    try {
      // Show that multiple readers
      // may acquire the read lock:
      if(lock.getReadLockCount() > 1)
        print(lock.getReadLockCount());
      return lockedList.get(index);
    } finally {
      rlock.unlock();
    }
  }
  public static void main(String[] args) throws Exception {
    new ReaderWriterListTest(30, 1);
  }
}

class ReaderWriterListTest {
  ExecutorService exec = Executors.newCachedThreadPool();
  private final static int SIZE = 100;
  private static Random rand = new Random(47);
  private ReaderWriterList<Integer> list =
    new ReaderWriterList<Integer>(SIZE, 0);
  private class Writer implements Runnable {
    public void run() {
      try {
        for(int i = 0; i < 20; i++) { // 2 second test
          list.set(i, rand.nextInt());
          TimeUnit.MILLISECONDS.sleep(100);
        }
      } catch(InterruptedException e) {
        // Acceptable way to exit
      }
      print("Writer finished, shutting down");
      exec.shutdownNow();
    }
  }
  private class Reader implements Runnable {
    public void run() {
      try {
        while(!Thread.interrupted()) {
          for(int i = 0; i < SIZE; i++) {
            list.get(i);
            TimeUnit.MILLISECONDS.sleep(1);
          }

  } } catch(InterruptedException e) { // Acceptable way to exit } } } public ReaderWriterListTest(int readers, int writers) { for(int i = 0; i < readers; i++) exec.execute(new Reader()); for(int i = 0; i < writers; i++) exec.execute(new Writer()); } } /* (Execute to see output) *///:~ A ReaderWriterList can hold a fixed number of any type. You must give the constructor the desired size of the list and an initial object to populate the list with. The set( ) method acquires the write lock in order to call the underlying ArrayList. set( ), and the get( ) method acquires the read lock in order to call ArrayList.get( ). In addition, get( ) checks to see if more than one reader has acquired the read lock and, if so, displays that number to demonstrate that multiple readers may acquire the read lock. To test the ReaderWriterList, ReaderWriterListTest creates both reader and writer tasks for a ReaderWriterList<Integer>. Notice that there are far fewer writes than reads. If you look at the JDK documentation for ReentrantReadWriteLock, you’ll see that there are a number of other methods available, as well as issues of \"fairness\" and \"policy decisions.\" This is a rather sophisticated tool, and one to use only when you are casting about for ways to improve performance. Your first draft of your program should use straightforward synchronization, and only if necessary should you introduce ReadWriteLock. Exercise 40: (6) Following the example of ReaderWriterList.java, create a ReaderWriterMap using a HashMap. Investigate its performance by modifying MapComparisons.java. How does it compare to a synchronized HashMap and a ConcurrentHashMap? Active objects After working your way through this chapter, you may observe that threading in Java seems very complex and difficult to use correctly. 
In addition, it can seem a bit counterproductive: although tasks work in parallel, you must invest great effort to implement techniques that prevent those tasks from interfering with each other.

If you’ve ever written assembly language, writing threaded programs has a similar feel: Every detail matters, you’re responsible for everything, and there’s no safety net in the form of compiler checking. Could there be a problem with the threading model itself? After all, it comes relatively unchanged from the world of procedural programming. Perhaps there is a different model for concurrency that is a better fit for object-oriented programming.

One alternative approach is called active objects or actors.[26] The reason the objects are called "active" is that each object maintains its own worker thread and message queue, and all requests to that object are enqueued, to be run one at a time. So with active objects, we serialize messages rather than methods, which means we no longer need to guard against problems that happen when a task is interrupted midway through its loop.

[26] Thanks to Allen Holub for taking the time to explain this to me.

When you send a message to an active object, that message is transformed into a task that goes on the object’s queue to be run at some later point. The Java SE5 Future comes in handy for implementing this scheme. Here’s a simple example that has two methods which enqueue method calls:

//: concurrency/ActiveObjectDemo.java
// Can only pass constants, immutables, "disconnected
// objects," or other active objects as arguments
// to asynch methods.
import java.util.concurrent.*;
import java.util.*;
import static net.mindview.util.Print.*;

public class ActiveObjectDemo {
  private ExecutorService ex =
    Executors.newSingleThreadExecutor();
  private Random rand = new Random(47);
  // Insert a random delay to produce the effect
  // of a calculation time:
  private void pause(int factor) {
    try {
      TimeUnit.MILLISECONDS.sleep(
        100 + rand.nextInt(factor));
    } catch(InterruptedException e) {
      print("sleep() interrupted");
    }
  }
  public Future<Integer>
  calculateInt(final int x, final int y) {
    return ex.submit(new Callable<Integer>() {
      public Integer call() {
        print("starting " + x + " + " + y);
        pause(500);
        return x + y;
      }
    });
  }
  public Future<Float>
  calculateFloat(final float x, final float y) {
    return ex.submit(new Callable<Float>() {
      public Float call() {
        print("starting " + x + " + " + y);
        pause(2000);
        return x + y;
      }
    });
  }
  public void shutdown() { ex.shutdown(); }
  public static void main(String[] args) {
    ActiveObjectDemo d1 = new ActiveObjectDemo();
    // Prevents ConcurrentModificationException:
    List<Future<?>> results =
      new CopyOnWriteArrayList<Future<?>>();
    for(float f = 0.0f; f < 1.0f; f += 0.2f)
      results.add(d1.calculateFloat(f, f));
    for(int i = 0; i < 5; i++)
      results.add(d1.calculateInt(i, i));
    print("All asynch calls made");
    while(results.size() > 0) {
      for(Future<?> f : results)
        if(f.isDone()) {
          try {
            print(f.get());
          } catch(Exception e) {
            throw new RuntimeException(e);
          }
          results.remove(f);
        }
    }
    d1.shutdown();
  }
} /* Output: (85% match)
All asynch calls made
starting 0.0 + 0.0
starting 0.2 + 0.2
0.0
starting 0.4 + 0.4
0.4
starting 0.6 + 0.6
0.8
starting 0.8 + 0.8
1.2
starting 0 + 0
1.6
starting 1 + 1
0
starting 2 + 2
2
starting 3 + 3
4
starting 4 + 4
6
8
*///:~

The "single thread executor" produced by the call to Executors.newSingleThreadExecutor( ) maintains its own unbounded blocking queue, and has only one thread taking tasks off the queue and running them to completion. All we need to do in calculateInt( ) and calculateFloat( ) is to submit( ) a new Callable object in response to a method call, thus converting method calls into messages. The method body is contained within the call( ) method in the anonymous inner class. Notice that the return value of each active object method is a Future with a generic parameter that is the actual return type of the method. This way, the method call returns almost immediately, and the caller uses the Future to discover when the task completes and to collect the actual return value. This handles the most complex case, but if the call has no return value, then the process is simplified.

In main( ), a List<Future<?>> is created to capture the Future objects returned by the calculateFloat( ) and calculateInt( ) messages sent to the active object. This list is polled using isDone( ) for each Future, which is removed from the List when it completes and its results are processed. Notice that the use of CopyOnWriteArrayList removes the need to copy the List in order to prevent ConcurrentModificationExceptions.
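As an aside, here is a minimal sketch of how the same message-passing idea might look with lambda syntax. It assumes Java 8 or later, which postdates the Java SE5 code in this chapter, and the names ActiveAdder and add( ) are hypothetical, chosen only for illustration. Because a single-thread executor runs its tasks sequentially in submission order, this version simply blocks on each Future in turn instead of polling with isDone( ):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical class name; not part of the book's code.
class ActiveAdder {
  // One worker thread plus an unbounded queue of messages:
  private final ExecutorService ex =
    Executors.newSingleThreadExecutor();
  // The lambda returns a value, so it is treated as a
  // Callable<Integer> and the call becomes a queued message:
  public Future<Integer> add(int x, int y) {
    return ex.submit(() -> x + y);
  }
  public void shutdown() { ex.shutdown(); }
  public static void main(String[] args) throws Exception {
    ActiveAdder adder = new ActiveAdder();
    List<Future<Integer>> results = new ArrayList<>();
    for(int i = 0; i < 5; i++)
      results.add(adder.add(i, i));
    // get() blocks until the corresponding task has run:
    for(Future<Integer> f : results)
      System.out.println(f.get());
    adder.shutdown();
  }
} /* Output:
0
2
4
6
8
*/
```

Blocking in order avoids both the busy-wait loop and the need for a CopyOnWriteArrayList, at the cost of not reacting to whichever result happens to finish first. With a single worker thread that makes no difference, but it would with a multithreaded executor.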
In order to prevent inadvertent coupling between threads, any arguments passed to an active-object method call must be read-only, other active objects, or disconnected objects (my term), which are objects that have no connection to any other task (this is hard to enforce because there’s no language support for it).

With active objects:

1. Each object has its own worker thread.
2. Each object maintains total control of its own fields (which is somewhat more rigorous than normal classes, which only have the option of guarding their fields).
3. All communication between active objects happens in the form of messages between those objects.
4. All messages between active objects are enqueued.

The results are quite compelling. Since a message from one active object to another can only be blocked by the delay in enqueuing it, and because that delay is always very short and is not dependent on any other objects, the sending of a message is effectively unblockable (the worst that will happen is a short delay). Since an active-object system only communicates via messages, two objects cannot be blocked while contending to call a method on another object, and this means that deadlock cannot occur, which is a big step forward. Because the worker thread within an active object only executes one message at a time, there is no resource contention and you don’t have to worry about synchronizing methods. Synchronization still happens, but it happens on the message level, by enqueuing the method calls so that only one can happen at a time.

Unfortunately, without direct compiler support, the coding approach shown above is too cumbersome. However, progress is occurring in the field of active objects and actors, and more interestingly, in the field called agent-based programming. Agents are effectively active objects, but agent systems also support transparency across networks and machines. It would not surprise me if agent-based programming becomes the eventual successor to object-oriented programming, because it combines objects with a relatively easy concurrency solution.

You can find more information about active objects, actors and agents by searching the Web. In particular, some of the ideas behind active objects come from C.A.R. Hoare’s theory of Communicating Sequential Processes (CSP).
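The four properties listed above can be illustrated with a small sketch. This is not from the book's code: ActiveCounter and its methods are hypothetical names, and the lambda syntax assumes Java 8 or later. The count field is read and written only by the object's own worker thread, so several sender tasks can call increment( ) at once without any synchronized methods:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical example class; not from the book.
class ActiveCounter {
  // The object's own worker thread and message queue:
  private final ExecutorService worker =
    Executors.newSingleThreadExecutor();
  // Only the worker thread touches this field, so it
  // needs no synchronized methods to guard it:
  private int count = 0;
  // A message with no return value:
  public void increment() {
    worker.execute(() -> count++);
  }
  // A message whose reply arrives through a Future:
  public Future<Integer> current() {
    return worker.submit(() -> count);
  }
  public void shutdown() { worker.shutdown(); }
  public static void main(String[] args) throws Exception {
    ActiveCounter counter = new ActiveCounter();
    ExecutorService senders = Executors.newFixedThreadPool(4);
    // Four sender tasks, each sending 1000 messages:
    for(int t = 0; t < 4; t++)
      senders.execute(() -> {
        for(int i = 0; i < 1000; i++)
          counter.increment();
      });
    senders.shutdown();
    if(!senders.awaitTermination(10, TimeUnit.SECONDS))
      throw new IllegalStateException("senders did not finish");
    // This message is queued behind every increment:
    System.out.println(counter.current().get());
    counter.shutdown();
  }
} /* Output:
4000
*/
```

If increment( ) modified count directly from the calling threads, some updates could be lost. Here each call only enqueues a message, and the single worker thread applies the messages one at a time.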
Exercise 41: (6) Add a message handler to ActiveObjectDemo.java that has no return value, and call this within main( ).

Exercise 42: (7) Modify WaxOMatic.java so that it implements active objects.

Project:[27] Use annotations and Javassist to create a class annotation @Active that transforms the target class into an active object.

[27] Projects are suggestions to be used (for example) as term projects. Solutions to projects are not included in the solution guide.

