3.4 Using Concurrency Control Utilities
Built-in waiting and notification methods provide sufficient mechanisms for implementing any kind of state-dependent coordination scheme. But they present three related obstacles:
- The requirements and properties of waiting and notification methods often intrude into apparently unrelated aspects of class design, leading to unnecessary conceptual overhead and code complexity. For example, while the template-method version of Readers and Writers in 3.3.3 is sound and flexible, using it requires more understanding of the underlying design than does the version supporting the ReadWriteLock interface in 2.5.2.
- While simple applications of monitor methods are indeed simple, the chances for error (for example slipped conditions) can increase dramatically when additional factors are addressed, especially performance and robustness in the face of thread cancellation. When solutions are encapsulated as utility classes, the hard work of putting them together need be done only once. This may be worthwhile even when the resulting classes impose additional programming obligations on their users, as long as reusing classes is not more difficult and error-prone than re-inventing them. To improve software quality, utility classes (of any sort) should encapsulate not-so-simple implementations of simple ideas and impose minimal obstacles surrounding their use.
- While an unbounded number of designs can in principle be implemented via guarded methods, a large fraction of those used in practice fall under a small number of general categories. Much of the code for these can be reused, rather than rewritten from scratch inside each class using them. This also provides a clearer separation between the choice of a particular concurrency control policy and its implementation.
This section discusses four representative examples of utilities and their applications, including the construction of larger utilities out of more basic ones. A few others are introduced later in this book. For concreteness, descriptions focus on the versions available in the util.concurrent package, but nearly all discussions apply to any others you could construct. Most implementation details surrounding these classes are postponed to 3.7 (which is probably of interest only to developers creating their own customized versions of such utilities).
3.4.1 Semaphores
Semaphores (specifically, counting semaphores) are classic concurrency control constructs. Like many other utilities, they conform to an acquire-release protocol, and thus support the same Sync interface as class Mutex in 2.5.
Conceptually, a semaphore maintains a set of permits initialized in a constructor. Each acquire blocks if necessary until a permit is available, and then takes it. Method attempt is the same except that it fails upon time-out. Each release adds a permit. However, no actual permit-objects are used; the semaphore just keeps track of the number available and acts accordingly.
There are other ways to describe semaphores as well, including those based on their original motivating metaphor: the signaling flags used to prevent railroad collisions.
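To make the permit-counting semantics concrete, here is a minimal sketch of such a class built directly from waiting and notification. This is only an illustration, not the util.concurrent implementation (see 3.7 for real versions, which must address further issues); the class name is invented for this sketch:

class SimpleSemaphore implements Sync {    // illustrative sketch only
  protected long permits;                  // current number of available permits

  public SimpleSemaphore(long initialPermits) { permits = initialPermits; }

  public synchronized void acquire() throws InterruptedException {
    if (Thread.interrupted()) throw new InterruptedException();
    while (permits <= 0)
      wait();                              // block until some release adds a permit
    --permits;                             // take one
  }

  public synchronized void release() {
    ++permits;
    notify();                              // wake one waiting acquirer, if any
  }

  public synchronized boolean attempt(long msecs) throws InterruptedException {
    if (Thread.interrupted()) throw new InterruptedException();
    if (permits > 0) {                     // same as acquire, but fail on time-out
      --permits;
      return true;
    }
    long waitTime = msecs;
    long start = System.currentTimeMillis();
    for (;;) {
      if (waitTime <= 0) return false;     // timed out (or balked when msecs == 0)
      wait(waitTime);
      if (permits > 0) {
        --permits;
        return true;
      }
      waitTime = msecs - (System.currentTimeMillis() - start);
    }
  }
}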
3.4.1.1 Mutual exclusion locks
Semaphores can be used to implement mutual exclusion locks simply by initializing the number of permits to 1. For example, a Mutex class could be defined as:
class Mutex implements Sync {
  private Semaphore s = new Semaphore(1);

  public void acquire() throws InterruptedException { s.acquire(); }

  public void release() { s.release(); }

  public boolean attempt(long ms) throws InterruptedException {
    return s.attempt(ms);
  }
}
This kind of lock is also known as a binary semaphore, since the counter should only take on the values zero and one. One minor detail that is not (but could be) addressed here is that by the most common convention, releasing a Mutex that is not held has no effect. (A less common alternative convention is to throw an exception.) Otherwise, there is no strict need to define a Mutex class at all. A Semaphore initialized to 1 can be used directly as a lock, in which case “extra” releases are remembered and thus allow extra acquires. While this property is not at all desirable here, in contexts unrelated to locking it can be exploited as a cure for missed signals (see 3.2.4.1).
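For instance, in a hedged sketch (with invented class and method names), the memory of releases means that a signal issued before any thread waits for it is not lost, unlike a notify performed when no thread is in the wait set:

class SignalMemo {                   // sketch; names are invented
  protected final Semaphore signal = new Semaphore(0); // no permits initially

  void announce() {                  // called in one thread
    // ... establish some condition ...
    signal.release();                // remembered even if no thread is waiting yet
  }

  void awaitAnnouncement() throws InterruptedException {
    signal.acquire();                // proceeds even if the release came first
    // ... rely on the established condition ...
  }
}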
Because semaphores can be used as locks as well as other forms of concurrency control, they suffice as a single primitive concurrency control construct. For example, it is possible to implement the equivalents of synchronized method locks, wait, notify, and notifyAll operations out of semaphores rather than vice versa. (For details, see for example Andrews's book listed in the Further Readings in 1.2.5.)
Several systems and languages have in fact offered semaphores as their sole concurrency control construct. However, overreliance on bare semaphores for mutual exclusion purposes tends to be more complex and error-prone than block-structured locking, as enforced by synchronized methods and blocks and assisted by before/after constructions surrounding the use of Mutex. Semaphores are much more valuable in contexts that exploit their counting and signaling capabilities rather than their use as locks.
3.4.1.2 Resource pools
Semaphores are specialized counters, and so are natural choices for concurrency control in many classes involving counts. For example, pool classes of various kinds normally keep counts of resource items (e.g., file descriptors, printers, buffers, large graphical objects) that clients can check out and later check back in.
The following Pool class illustrates the basic structure of most resource pools. This class contains only one of several common and useful safeguards, ensuring that items checked back into the pool had actually been checked out. Others could be added, for example, checks to make sure that callers are eligible to obtain items.
To aid conformance to this check-out/check-in protocol, users of pools should normally employ before/after constructions, as in:
try {
  Object r = pool.getItem();
  try {
    use(r);
  }
  finally {
    pool.returnItem(r);
  }
}
catch (InterruptedException ie) {
  // deal with interrupt while trying to obtain item
}
The Pool class displays a layered structure characteristic of nearly all classes using concurrency control utilities: public unsynchronized control methods surround internal synchronized helper methods. Exclusion is needed in methods doGet and doReturn because multiple clients may pass available.acquire. Without locking, several threads could operate concurrently on the underlying lists. On the other hand, it would be a mistake to declare the getItem and returnItem methods as synchronized. Not only would this make no sense, it can also cause a form of nested monitor lockout (see 3.3.4) when a thread waiting in acquire holds the lock needed by any thread that could perform a release.
class Pool {                                   // Incomplete
  protected java.util.ArrayList items = new java.util.ArrayList();
  protected java.util.HashSet busy = new java.util.HashSet();
  protected final Semaphore available;

  public Pool(int n) {
    available = new Semaphore(n);
    initializeItems(n);
  }

  public Object getItem() throws InterruptedException {
    available.acquire();
    return doGet();
  }

  public void returnItem(Object x) {
    if (doReturn(x))
      available.release();
  }

  protected synchronized Object doGet() {
    Object x = items.remove(items.size() - 1);
    busy.add(x);                               // put in set to check returns
    return x;
  }

  protected synchronized boolean doReturn(Object x) {
    if (busy.remove(x)) {
      items.add(x);                            // put back into available item list
      return true;
    }
    else
      return false;
  }

  protected void initializeItems(int n) {
    // Somehow create the resource objects
    //  and place them in items list.
  }
}
Note that the use of HashSet here requires that the classes defining resource items not override method equals in a way that disrupts the identity-based comparisons (see 2.1.1) needed for pool maintenance.
3.4.1.3 Bounded buffers
Semaphores are useful tools whenever you can conceptualize a design in terms of permits. For example, we can design a BoundedBuffer based on the idea that:
- Initially, for a buffer of size n, there are n put-permits and 0 take-permits.
- A take operation must acquire a take-permit and then release a put-permit.
- A put operation must acquire a put-permit and then release a take-permit.
To exploit this, it is convenient to isolate the underlying array operations in a simple BufferArray helper class. (In fact, as illustrated in 4.3.4, a completely different underlying data structure such as a linked list can be used without otherwise altering the logic of this design.) The BufferArray class uses synchronized methods, maintaining exclusion when multiple clients receive permits and could otherwise insert or extract elements concurrently.
class BufferArray {
  protected final Object[] array;      // the elements
  protected int putPtr = 0;            // circular indices
  protected int takePtr = 0;

  BufferArray(int n) { array = new Object[n]; }

  synchronized void insert(Object x) { // put mechanics
    array[putPtr] = x;
    putPtr = (putPtr + 1) % array.length;
  }

  synchronized Object extract() {      // take mechanics
    Object x = array[takePtr];
    array[takePtr] = null;
    takePtr = (takePtr + 1) % array.length;
    return x;
  }
}
The corresponding BoundedBufferWithSemaphores class surrounds buffer operations with semaphore operations to implement put and take. Even though each method starts with an acquire and ends with a release, they follow a different usage pattern than seen with locks in 2.5. The release is on a different semaphore from the acquire, and is performed only after the element is successfully inserted or extracted. So, among other consequences, these releases are not placed in finally clauses: If there were any chance that buffer operations could fail, some recovery actions would be needed, but these trailing release statements are not among them.
class BoundedBufferWithSemaphores {
  protected final BufferArray buff;
  protected final Semaphore putPermits;
  protected final Semaphore takePermits;

  public BoundedBufferWithSemaphores(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    buff = new BufferArray(capacity);
    putPermits = new Semaphore(capacity);
    takePermits = new Semaphore(0);
  }

  public void put(Object x) throws InterruptedException {
    putPermits.acquire();
    buff.insert(x);
    takePermits.release();
  }

  public Object take() throws InterruptedException {
    takePermits.acquire();
    Object x = buff.extract();
    putPermits.release();
    return x;
  }

  public Object poll(long msecs) throws InterruptedException {
    if (!takePermits.attempt(msecs))
      return null;
    Object x = buff.extract();
    putPermits.release();
    return x;
  }

  public boolean offer(Object x, long msecs) throws InterruptedException {
    if (!putPermits.attempt(msecs))
      return false;
    buff.insert(x);
    takePermits.release();
    return true;
  }
}
This class also includes variants of put and take, called offer and poll, that support balking (when msecs is 0) or time-out policies. These methods are implemented using Semaphore.attempt, which handles the messy time-based constructions described in 3.2.5. Methods offer and poll allow clients to choose the guarding policy most appropriate for their needs. However, clients must still pick compatible policies. For example, if a producer relied solely on offer(x, 0) and its only consumer used poll(0), items would rarely be transferred.
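As an illustration of choosing a policy, here is a hedged sketch of a consumer that uses poll with a time-out, falling back to other work rather than blocking indefinitely (the class and helper methods are invented):

class PollingConsumer implements Runnable { // sketch; names are invented
  protected final BoundedBufferWithSemaphores buffer;
  PollingConsumer(BoundedBufferWithSemaphores b) { buffer = b; }

  public void run() {
    try {
      for (;;) {
        Object x = buffer.poll(1000);   // wait at most one second
        if (x != null)
          handle(x);                    // process the obtained item
        else
          doOtherWork();                // timed out; do something else, then retry
      }
    }
    catch (InterruptedException ie) {}  // die

  }

  protected void handle(Object x) { /* ... */ }
  protected void doOtherWork()    { /* ... */ }
}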
The BoundedBufferWithSemaphores class is likely to run more efficiently than the BoundedBufferWithStateTracking class in 3.3.1 when there are many threads all using the buffer. BoundedBufferWithSemaphores relies on two different underlying wait sets. The BoundedBufferWithStateTracking class gets by with only one, so any empty-to-partial or full-to-partial state transition causes all waiting threads to wake up, including those waiting for the other logical condition and those that will immediately rewait because some other thread took the only item or filled the only available slot.
The BoundedBufferWithSemaphores class isolates the monitors for these two conditions. This can be exploited by the underlying Semaphore implementation (see 3.7.1) to eliminate unnecessary context switching by using notify instead of notifyAll. This reduces the worst-case number of wakeups from being a quadratic function of the number of invocations to being linear. More generally, whenever you can isolate a condition using a semaphore, you can usually improve performance as compared to notifyAll-based solutions.
3.4.1.4 Synchronous channels
As mentioned in 3.3.1, the interface for BoundedBuffer can be broadened to describe any kind of Channel that supports a put and a take operation:
interface Channel {                    // Repeated
  void put(Object x) throws InterruptedException;
  Object take() throws InterruptedException;
}
(The util.concurrent version of this interface also includes the offer and poll methods that support time-outs, and declares it to extend interfaces Puttable and Takable to allow type enforcement of one-sided usages.)
There are many possible semantics that you might attach to a Channel. For example, the queue class in 2.4.2 has unbounded capacity (at least conceptually — failing only when a system runs out of memory), while bounded buffers have finite predetermined capacity. A limiting case is the idea of a synchronous channel that has no internal capacity. With synchronous channels, every thread attempting a put must wait for a thread attempting a take, and vice versa. This allows the precise control over thread interaction needed in several of the design frameworks and patterns discussed in 4.1.4 and 4.5.1.
Semaphores can be used to implement synchronous channels. Here, we can use the same approach as with bounded buffers, adding another semaphore that permits a put to continue only after its offered item has been taken. However, this introduces a new problem. So far, we have used only blocking constructions that can throw InterruptedExceptions as the first lines of methods, allowing simple clean exit upon interruption. But here, we need to do a second acquire at the end of the put method. Aborting at this point of no return would break the protocol. While it is possible to define a version of this class that performs full rollback, the simplest solution here is to roll forward (see 3.1.1.4), ignoring any interrupt until after the second acquire completes:
class SynchronousChannel implements Channel {
  protected Object item = null;        // to hold while in transit
  protected final Semaphore putPermit;
  protected final Semaphore takePermit;
  protected final Semaphore taken;

  public SynchronousChannel() {
    putPermit = new Semaphore(1);
    takePermit = new Semaphore(0);
    taken = new Semaphore(0);
  }

  public void put(Object x) throws InterruptedException {
    putPermit.acquire();
    item = x;
    takePermit.release();

    // Must wait until signaled by taker
    InterruptedException caught = null;
    for (;;) {
      try {
        taken.acquire();
        break;
      }
      catch (InterruptedException ie) { caught = ie; }
    }
    if (caught != null) throw caught;  // can now rethrow
  }

  public Object take() throws InterruptedException {
    takePermit.acquire();
    Object x = item;
    item = null;
    putPermit.release();
    taken.release();
    return x;
  }
}
3.4.1.5 Fairness and scheduling
Built-in waiting and notification methods do not provide any fairness guarantees. They make no promises about which of the threads in a wait set will be chosen in a notify operation, or which thread will grab the lock first and be able to proceed (thus excluding others) in a notifyAll operation.
This flexibility, permitted by the JLS, makes it all but impossible to prove particular liveness properties of a system. But this is not a practical concern in most contexts. For example, in most buffer applications, it doesn't matter at all which of the several threads trying to take an item actually does so. On the other hand, in a resource pool management class, it is prudent to ensure that threads waiting for needed resource items don't continually get pushed aside by others because of unfairness in how the underlying notify operations choose which threads to unblock. Similar concerns arise in many applications of synchronous channels.
It is not possible to change the semantics of notify, but it is possible to implement Semaphore (sub)class acquire operations to provide stronger fairness properties. A range of policies can be supported, varying in exactly how fairness is defined.
The best-known policy is First-In-First-Out (FIFO), in which the thread that has been waiting the longest is always selected. This is intuitively desirable, but can be unnecessarily demanding and even somewhat arbitrary on multiprocessors where different threads on different processors start waiting at (approximately) the same time. However, various weakenings of and approximations to FIFO are available that provide sufficient fairness for applications that need to avoid indefinite postponement.
There are, however, some intrinsic limitations to such guarantees: There is no way to ensure that an underlying system will ever actually execute a given runnable process or thread unless the system provides guarantees that go beyond the minimal requirements stated in the JLS. However, this is unlikely to be a significant pragmatic issue. Most if not all JVM implementations strive to provide sensible scheduling policies that extend well beyond minimal requirements. They display some sort of weak, restricted, or probabilistic fairness properties with respect to executing runnable threads. However, it is difficult for a language specification to state all the reasonable ways in which this may occur. The matter is left as a quality-of-implementation issue in the JLS.
Utility classes such as semaphores are convenient vehicles for establishing different fairness policies, modulo these scheduling caveats. For example, 3.7.3 describes implementation of a FIFOSemaphore class that maintains FIFO notification order. Applications such as the Pool class can use this or other implementations of semaphores that provide any supported fairness properties, at the potential cost of additional overhead.
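For example, in a hedged sketch (not code from this book), the policy choice could be wired into a Pool variant's constructor. FIFOSemaphore here is the util.concurrent class described in 3.7.3; the omitted methods are identical to those of the Pool class in 3.4.1.2:

class SelectablePool {                 // sketch: Pool with a selectable fairness policy
  protected final java.util.ArrayList items = new java.util.ArrayList();
  protected final java.util.HashSet busy = new java.util.HashSet();
  protected final Semaphore available;

  public SelectablePool(int n, boolean fifo) {
    available = fifo ? new FIFOSemaphore(n) : new Semaphore(n);
    initializeItems(n);
  }

  // getItem, returnItem, doGet, doReturn, and initializeItems
  //  are the same as in the Pool class shown in 3.4.1.2.
  protected void initializeItems(int n) { /* ... */ }
}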
3.4.1.6 Priorities
In addition to addressing fairness, semaphore implementation classes can pay attention to thread priorities. The notify method is not guaranteed to do so, but it is of course allowed to, and does so on some JVM implementations.
Priority settings (see 1.1.2.3) tend to be of value only when there may be many more runnable threads than CPUs, and the tasks running in these threads intrinsically have different urgencies or importances. This occurs most commonly in embedded (soft) real-time systems where a single small processor must carry out many tasks that interact with its environment.
Reliance on priority settings can complicate notification policies. Even if notifications unblock (and run) threads in highest-priority-first order, systems may still encounter priority inversions. A priority inversion occurs when a high-priority thread becomes blocked waiting for a low-priority thread to complete and then release a lock or change a condition needed by the high-priority thread. In a system using strict priority scheduling, this can cause the high-priority thread to starve if the low-priority thread does not get a chance to run.
One solution is to use special semaphore classes or lock classes constructed via such semaphores. Here, the concurrency control objects themselves manipulate priorities. When a high-priority thread becomes blocked, the concurrency control object can temporarily raise the priority of a low-priority thread that could unblock it. This reflects the fact that proceeding to a release point is a high-priority action (see Further Readings in 1.2.5). For this to work, all relevant synchronization and locking must rely on such priority-adjusting utility classes.
Further, this tactic is guaranteed to maintain the intended properties only on particular JVM implementations that use strict priority scheduling. In practice, any usable JVM implementation supporting strict priority scheduling is sure to apply priority adjustment for built-in lock and monitor operations. Doing otherwise would defeat most of the rationale for adopting strict priority scheduling in the first place.
The main practical consequence is that programs that absolutely rely on strict priority scheduling sacrifice portability. They need additional JVM implementation-specific guarantees that may be bolstered via construction and use of additional concurrency control utilities. In other more portable programs, semaphore classes and related utilities that prefer higher-priority threads can still be used occasionally as devices for heuristically improving responsiveness.
3.4.2 Latches
A latching variable or condition is one that eventually receives a value from which it never again changes. A binary latching variable or condition (normally just called a latch, also known as a one-shot) can change value only once, from its initial state to its final state.
Concurrency control techniques surrounding latches can be encapsulated using a simple Latch class that again obeys the usual acquire-release interface, but with the semantics that a single release permits all previous and future acquire operations to proceed.
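A minimal sketch of these semantics might look as follows. This is only an illustration, not the util.concurrent implementation; in particular, the time-out handling in attempt is simplified, and the class name is invented:

class SimpleLatch implements Sync {    // illustrative sketch only
  protected boolean latched = false;

  public synchronized void acquire() throws InterruptedException {
    while (!latched)
      wait();                          // once released, all acquires fall through
  }

  public synchronized boolean attempt(long msecs) throws InterruptedException {
    if (latched) return true;
    if (msecs <= 0) return false;      // balk
    wait(msecs);                       // simplified: a full version would loop,
    return latched;                    //  tracking elapsed time across wakeups
  }

  public synchronized void release() { // one-shot: latch and wake everyone
    latched = true;
    notifyAll();
  }
}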
Latches help structure solutions to initialization problems (see 2.4.1) where you do not want a set of activities to proceed until all objects and threads have been completely constructed. For example, a more ambitious game-playing application than shown in 3.2.4 might need to ensure that all players wait until the game officially begins. This could be arranged using code such as:
class Player implements Runnable {     // Code sketch
  // ...
  protected final Latch startSignal;

  Player(Latch l) { startSignal = l; }

  public void run() {
    try {
      startSignal.acquire();
      play();
    }
    catch (InterruptedException ie) { return; }
  }
  // ...
}

class Game {
  // ...
  void begin(int nplayers) {
    Latch startSignal = new Latch();
    for (int i = 0; i < nplayers; ++i)
      new Thread(new Player(startSignal)).start();
    startSignal.release();
  }
}
Extended forms of latches include countdowns, which allow acquire to proceed when a fixed number of releases occur, not just one. Latches, countdowns, and other simple utilities built on top of them can be used to coordinate responses to conditions involving:
- Completion indicators. For example, to force a set of threads to wait until some other activity completes (see the countdown sketch after this list).
- Timing thresholds. For example, to trigger a set of threads at a certain date.
- Event indications. For example, to trigger processing that cannot occur until a certain packet is received or button is clicked.
- Error indications. For example, to trigger a set of threads to proceed with global shut-down tasks.
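The countdown sketch promised above: it uses the util.concurrent CountDown class, whose constructor takes the number of releases that must occur before acquires can proceed. The driver class and method names here are invented:

class TaskBarrierDemo {                // sketch with invented names
  void runAll(Runnable[] tasks) throws InterruptedException {
    final CountDown done = new CountDown(tasks.length);
    for (int i = 0; i < tasks.length; ++i) {
      final Runnable task = tasks[i];
      new Thread(new Runnable() {
        public void run() {
          try     { task.run(); }
          finally { done.release(); }  // count one completion, even upon failure
        }
      }).start();
    }
    done.acquire();                    // proceed only after all tasks complete
  }
}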
3.4.2.1 Latching variables and predicates
While utility classes are convenient for most one-shot triggering applications, latching fields (also known as permanent variables) and predicates can improve reliability, simplify usage, and improve efficiency in other contexts as well.
Among their other properties, latching predicates (including the common special case of threshold indicators) are among the very few conditions for which unsynchronized busy-wait loops (see 3.2.6) may be a possible (although rarely taken) implementation option for guarded methods. If a predicate is known to latch, then there is no risk that it will slip (see 3.2.4.1). Its value cannot change between the check to see if it is true and a subsequent action that requires it to remain true. For example:
class LatchingThermometer {            // Seldom useful
  private volatile boolean ready;      // latching
  private volatile float temperature;

  public double getReading() {
    while (!ready)
      Thread.yield();
    return temperature;
  }

  void sense(float t) {                // called from sensor
    temperature = t;
    ready = true;
  }
}
Note that this kind of construction is confined to classes in which all relevant variables are either declared as volatile or are read and written only under synchronization (see 2.2.7).
3.4.3 Exchangers
An exchanger acts as a synchronous channel (see 3.4.1.4) except that instead of supporting two methods, put and take, it supports only one method, rendezvous (sometimes just called exchange) that combines their effects (see 2.3.4). This operation takes an argument representing an Object offered by one thread to another, and returns the Object offered by the other thread.
Exchangers can be generalized to more than two parties, and can be further generalized to apply arbitrary functions on arguments rather than simply exchanging them. These capabilities are supported by the Rendezvous class in util.concurrent. But the majority of applications are restricted to the exchange of resource objects between two threads (as arranged below by constructing a two-party Rendezvous).
Exchange-based protocols extend those described in 2.3.4 to serve as alternatives to resource pools (see 3.4.1.2). They can be used when two or more tasks running in different threads each maintain one resource at all times. When one thread is finished with one resource and needs another, it exchanges with another thread. The most common application of this protocol is buffer exchange. Here, one thread fills up a buffer (for example by reading in data). When the buffer is full, it exchanges it with a thread that processes the buffer, thereby emptying it. In this way, only two buffers are ever used, no copying is needed, and a resource management pool becomes unnecessary.
The following FillAndEmpty class gives a glimpse of the additional exception-handling obligations required with exchangers. Because the protocol is symmetric, cancellation or time-out of one party in the midst of an attempted exchange must lead to an exception (here, BrokenBarrierException) in the other party. In the example below, this is handled simply by returning from the run method. A more realistic version would entail further cleanup, including additional adjustments to deal with incompletely filled or emptied buffers upon termination, as well as to deal with IO exceptions and end-of-file conditions surrounding the readByte method.
class FillAndEmpty {                   // Incomplete
  static final int SIZE = 1024;        // buffer size, for demo
  protected Rendezvous exchanger = new Rendezvous(2);

  protected byte readByte()      { /* ... */ return 0; } // stub for demo
  protected void useByte(byte b) { /* ... */ }

  public void start() {
    new Thread(new FillingLoop()).start();
    new Thread(new EmptyingLoop()).start();
  }

  class FillingLoop implements Runnable {       // inner class
    public void run() {
      byte[] buffer = new byte[SIZE];
      int position = 0;
      try {
        for (;;) {
          if (position == SIZE) {
            buffer = (byte[])(exchanger.rendezvous(buffer));
            position = 0;
          }
          buffer[position++] = readByte();
        }
      }
      catch (BrokenBarrierException ex) {}      // die
      catch (InterruptedException ie) {}        // die
    }
  }

  class EmptyingLoop implements Runnable {      // inner class
    public void run() {
      byte[] buffer = new byte[SIZE];
      int position = SIZE;             // force exchange first time through
      try {
        for (;;) {
          if (position == SIZE) {
            buffer = (byte[])(exchanger.rendezvous(buffer));
            position = 0;
          }
          useByte(buffer[position++]);
        }
      }
      catch (BrokenBarrierException ex) {}      // die
      catch (InterruptedException ex) {}        // die
    }
  }
}
The use of exchangers here illustrates one of the design advantages of utility classes that replace concerns surrounding the fields of objects with those surrounding the passing of messages. This can be much easier to deal with as coordination schemes scale up (see 4).
3.4.4 Condition Variables
Monitor operations in the Java programming language maintain a single wait set for each object. Some other languages and thread libraries (in particular POSIX pthreads) include support for multiple wait sets associated with multiple condition variables managed under a common object or lock.
While any design requiring multiple wait sets can be implemented using other constructions such as semaphores, it is possible to create utilities that mimic the condition variables found in other systems. In fact, support for pthreads-style condvars leads to usage patterns that are almost identical to those in concurrent C and C++ programs.
A CondVar class can be used to represent a condition variable that is managed in conjunction with a given Mutex, where this Mutex is also (unenforceably) used for all exclusion locking in the associated class(es). Thus, classes using CondVar must also rely on the “manual” locking techniques discussed in 2.5.1. More than one CondVar can use the same Mutex.
The class supports analogs of the standard waiting and notification methods, here given names based on those in pthreads:
class CondVar {                        // Implementation omitted
  protected final Sync mutex;
  public CondVar(Sync lock) { mutex = lock; }

  public void await() throws InterruptedException;
  public boolean timedwait(long ms) throws InterruptedException;
  public void signal();                // analog of notify
  public void broadcast();             // analog of notifyAll
}
(In the util.concurrent version, the nuances of these operations also mirror those in pthreads. For example, unlike notify, signal does not require the lock to be held.)
The main applications of such a class lie not in original design efforts, but in adapting code originally written using other languages and systems. In other respects, a CondVar may be employed in the same design patterns, encountering the same design issues, as discussed in 3.3. For example, here is another bounded buffer class. Except for the structured exception handling, this version almost looks as if it came out of a pthreads programming book (see Further Readings in 1.2.5).
class PThreadsStyleBuffer {
  private final Mutex mutex = new Mutex();
  private final CondVar notFull = new CondVar(mutex);
  private final CondVar notEmpty = new CondVar(mutex);
  private int count = 0;
  private int takePtr = 0;
  private int putPtr = 0;
  private final Object[] array;

  public PThreadsStyleBuffer(int capacity) {
    array = new Object[capacity];
  }

  public void put(Object x) throws InterruptedException {
    mutex.acquire();
    try {
      while (count == array.length)
        notFull.await();
      array[putPtr] = x;
      putPtr = (putPtr + 1) % array.length;
      ++count;
      notEmpty.signal();
    }
    finally {
      mutex.release();
    }
  }

  public Object take() throws InterruptedException {
    Object x = null;
    mutex.acquire();
    try {
      while (count == 0)
        notEmpty.await();
      x = array[takePtr];
      array[takePtr] = null;
      takePtr = (takePtr + 1) % array.length;
      --count;
      notFull.signal();
    }
    finally {
      mutex.release();
    }
    return x;
  }
}
3.4.5 Further Readings
Additional discussions and examples of semaphores and condition variables can be found in almost any book on concurrent programming (see 1.2.5).
Resource pools can be extended into more general Object Manager classes. See:
Sommerlad, Peter. “Manager”, in Robert Martin, Dirk Riehle, and Frank Buschmann (eds.), Pattern Languages of Program Design, Volume 3, Addison-Wesley, 1998.
Exchangers are described in more detail in:
Sane, Aamod, and Roy Campbell. “Resource Exchanger”, in John Vlissides, James Coplien, and Norman Kerth (eds.), Pattern Languages of Program Design, Volume 2, Addison-Wesley, 1996.
The approximate fairness of some commonly used scheduling policies is discussed in:
Epema, Dick H. J. “Decay-Usage Scheduling in Multiprocessors”, ACM Transactions on Computer Systems, Vol. 16, 367-415, 1998.