- What Are Threads?
- Interrupting Threads
- Thread Properties
- Thread Priorities
- Selfish Threads
- Synchronization
- Deadlocks
- User Interface Programming with Threads
- Using Pipes for Communication Between Threads
Using Pipes for Communication Between Threads
Sometimes, the communication pattern between threads is very simple. A producer thread generates a stream of bytes. A consumer thread reads and processes that byte stream. If no bytes are available for reading, the consumer thread blocks. If the producer generates data much more quickly than the consumer can handle it, then the write operation of the producer thread blocks. The Java programming language has a convenient set of classes, PipedInputStream and PipedOutputStream, to implement this communication pattern. (There is another pair of classes, PipedReader and PipedWriter, if the producer thread generates a stream of Unicode characters instead of bytes.)
The principal reason to use pipes is to keep each thread simple. The producer thread simply sends its results to a stream and forgets about them. The consumer simply reads the data from a stream, without having to care where it comes from. By using pipes, you can connect multiple threads with each other without worrying about thread synchronization.
Example 113 is a program that shows off piped streams. We have a producer thread that emits random numbers at random times, a filter thread that reads the input numbers and continuously computes the average of the data, and a consumer thread that prints out the answers. (You'll need to use ctrl+c to stop this program.) Figure 118 shows the threads and the pipes that connect them. UNIX users will recognize these pipe streams as the equivalent of pipes connecting processes in UNIX.
Piped streams are only appropriate if the communication between the threads is on a low level, such as a sequence of numbers as in this example. In other situations, you can use queues. The producing thread inserts objects into the queue, and the consuming thread removes them.
Figure 118: A sequence of pipes
Example 113: PipeTest.java
1. import java.util.*; 2. import java.io.*; 3. 4. /** 5. This program demonstrates how multiple threads communicate 6. through pipes. 7. */ 8. public class PipeTest 9. { 10. public static void main(String args[]) 11. { 12. try 13. { 14. /* set up pipes */ 15. PipedOutputStream pout1 = new PipedOutputStream(); 16. PipedInputStream pin1 = new PipedInputStream(pout1); 17. 18. PipedOutputStream pout2 = new PipedOutputStream(); 19. PipedInputStream pin2 = new PipedInputStream(pout2); 20. 21. /* construct threads */ 22. 23. Producer prod = new Producer(pout1); 24. Filter filt = new Filter(pin1, pout2); 25. Consumer cons = new Consumer(pin2); 26. 27. /* start threads */ 28. 29. prod.start(); 30. filt.start(); 31. cons.start(); 32. } 33. catch (IOException e){} 34. } 35. } 36. 37. /** 38. A thread that writes random numbers to an output stream. 39. */ 40. class Producer extends Thread 41. { 42. /** 43. Constructs a producer thread. 44. @param os the output stream 45. */ 46. public Producer(OutputStream os) 47. { 48. out = new DataOutputStream(os); 49. } 50. 51. public void run() 52. { 53. while (true) 54. { 55. try 56. { 57. double num = rand.nextDouble(); 58. out.writeDouble(num); 59. out.flush(); 60. sleep(Math.abs(rand.nextInt() % 1000)); 61. } 62. catch(Exception e) 63. { 64. System.out.println("Error: " + e); 65. } 66. } 67. } 68. 69. private DataOutputStream out; 70. private Random rand = new Random(); 71. } 72. 73. /** 74. A thread that reads numbers from a stream and writes their 75. average to an output stream. 76. */ 77. class Filter extends Thread 78. { 79. /** 80. Constructs a filter thread. 81. @param is the output stream 82. @param os the output stream 83. */ 84. public Filter(InputStream is, OutputStream os) 85. { 86. in = new DataInputStream(is); 87. out = new DataOutputStream(os); 88. } 89. 90. public void run() 91. { 92. for (;;) 93. { 94. try 95. { 96. double x = in.readDouble(); 97. total += x; 98. count++; 99. if (count != 0) out.writeDouble(total / count); 100. } 101. catch(IOException e) 102. { 103. System.out.println("Error: " + e); 104. } 105. } 106. } 107. 108. private DataInputStream in; 109. private DataOutputStream out; 110. private double total = 0; 111. private int count = 0; 112. } 113. 114. /** 115. A thread that reads numbers from a stream and 116. prints out those that deviate from previous inputs 117. by a threshold value. 118. */ 119. class Consumer extends Thread 120. { 121. /** 122. Constructs a consumer thread. 123. @param is the input stream 124. */ 125. public Consumer(InputStream is) 126. { 127. in = new DataInputStream(is); 128. } 129. 130. public void run() 131. { 132. for(;;) 133. { 134. try 135. { 136. double x = in.readDouble(); 137. if (Math.abs(x - oldx) > THRESHOLD) 138. { 139. System.out.println(x); 140. oldx = x; 141. } 142. } 143. catch(IOException e) 144. { 145. System.out.println("Error: " + e); 146. } 147. } 148. } 149. 150. private double oldx = 0; 151. private DataInputStream in; 152. private static final double THRESHOLD = 0.01; 153. }
java.io.PipedInputStream
PipedInputStream()
creates a new piped input stream that is not yet connected to a piped output stream.PipedInputStream(PipedOutputStream out)
creates a new piped input stream that reads its data from a piped output stream.Parameters:
out
the source of the data
void connect(PipedOutputStream out)
attaches a piped output stream from which the data will be read.Parameters:
out
the source of the data
java.io.PipedOutputStream
PipedOutputStream()
creates a new piped output stream that is not yet connected to a piped input stream.PipedOutputStream(PipedInputStream in)
creates a new piped output stream that writes its data to a piped input stream.Parameters:
in
the destination of the data
void connect(PipedInputStream in) attaches a piped input stream to which the data will be written.
Parameters:
in
the destination of the data