The Real-Time Garbage Collector (RTGC)
The RTGC in Java RTS is a fully concurrent, parallel, mark-and-sweep garbage collector (GC) with no stop-the-world pauses. By parallel, we mean that it does GC work in multiple worker threads; by concurrent, we mean that it does its work concurrently with application threads. In Java SE, the collector needs to pause all application threads to collect dead objects, which results in unpredictable delays in application execution. With the real-time garbage collector, this doesn't happen.
To do its work efficiently, Java RTS doesn't divide the heap into spaces or generations. The entire heap is one large space where application threads and RTGC threads operate side by side. Its theory of operation is simple: RTGC threads run at a lower priority than application RTTs, so they never preempt an RTT that is eligible to run. Your application RTTs will be dispatched ahead of RTGC threads when competing for processor time.
Although there are no stop-the-world pauses, some per-thread pauses can occur when the RTGC needs to scan an application thread's stack. This amounts to less than 200 microseconds of latency, and only when the RTGC is in its marking phase. Overall, the RTGC should never interfere with your application threads, and it adjusts its own running characteristics to ensure that enough free memory is produced continuously for your application.
Non-real-time threads (JLTs) run at lower priority than the RTGC, and may experience GC-related latency. To gain true real-time application behavior, you must use RTTs with the heap. In cases where no RTGC-related latency in application execution can be tolerated, the RTSJ and Java RTS allow you to use memory areas other than the garbage-collected heap. Let's take a look at these now.
New Memory Models
Java RTS defines two main types of non-heap memory: immortal memory and scoped memory. These memory areas are outside of the control of the garbage collector, and hence objects and their memory are not reclaimed automatically. You must manage these areas and associated objects yourself. Let's begin with immortal memory, represented by the ImmortalMemory class in Java RTS.
ImmortalMemory
There is only one immortal memory region in Java RTS, and objects created within it live for the lifetime of the virtual machine. You must take care when placing objects in this region; once it's filled, the space is never reclaimed. To use immortal memory, you have two choices:
- Specify it as the memory area in the Schedulable object's constructor (see Listing 4).
- Obtain a reference to the ImmortalMemory singleton object, call its enter method, and provide a Runnable object as the parameter (see Listing 5).
Listing 4: Creating a Schedulable object to execute in immortal memory.
    import javax.realtime.*;

    public class MyApp {
        static {
            // Create an RTT whose allocation context is immortal memory.
            // The six-argument RealtimeThread constructor is the one
            // that accepts a MemoryArea.
            RealtimeThread rt = new RealtimeThread(
                    null, null, null, ImmortalMemory.instance(), null, null ) {
                public void run() {
                    // Execute real-time code in immortal memory here...
                }
            };
            rt.start();
        }

        public static void main(String[] args) {
            // ...
        }
    }
As these code examples show, the ImmortalMemory singleton object represents the immortal memory region. Also, to enter immortal memory, not only must you provide a Runnable, but you must do so from another Schedulable object, as shown in Listing 5.
Listing 5: Entering Immortal Memory with a Runnable.
    import javax.realtime.*;

    public class Main extends RealtimeThread {
        class TempGauge implements Runnable {
            public TempGauge() {
                System.out.println("TempGauge class created");
            }

            public void run() {
                MemoryArea memArea = RealtimeThread.getCurrentMemoryArea();
                if ( memArea == ImmortalMemory.instance() )
                    System.out.println("Running in Immortal Memory");
                else
                    System.out.println("NOT running in Immortal Memory!");
                // ...
                System.out.println("TempGauge terminating");
            }
        }

        public void run() {
            TempGauge temp = new TempGauge();
            ImmortalMemory.instance().enter(temp);
        }

        public static void main(String[] args) {
            Main app = new Main();
            app.start();
        }
    }
You can use these two common patterns to enter any memory region, including scoped memory. Let's examine this possibility.
ScopedMemory
In the RTSJ and Java RTS, scoped memory regions are those that you define and create in your application. When a Schedulable object enters scoped memory, all objects created are allocated in this region. Since scoped memory isn't garbage collected, Schedulable objects running in this allocation context are not subject to any RTGC-related latency whatsoever. Also, the objects created within a scoped memory region live for the lifetime of the region. Hence, you must ensure that the region is large enough for the time your Schedulable executes within it.
A scoped memory region is reclaimed once a Schedulable exits it, making it completely available for use again. Let me reiterate: The objects within the scoped memory region are not garbage collected. Instead, the entire region is simply marked as available, and its objects are "forgotten." As a result, scoped memory regions are useful when you need to execute time-critical logic in response to an event or a periodic timer, and you clearly understand the allocation needs of the associated code.
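The reclaim-on-exit behavior can be pictured with a small model in plain Java. This is only a sketch of the idea and does not use the javax.realtime classes: ToyRegion, allocate, used, and enter are hypothetical names invented for the illustration, and the "allocation" is just a byte budget rather than real object storage.

```java
// Toy model of a scoped region: a fixed budget that is handed out while
// logic runs inside the region and released all at once on exit.
// This is NOT javax.realtime.ScopedMemory; every name here is hypothetical.
class ToyRegion {
    private final int capacity;
    private int used;

    ToyRegion(int capacity) {
        this.capacity = capacity;
    }

    // Reserves `bytes` from the region; fails if the region is too small.
    void allocate(int bytes) {
        if (bytes < 0 || bytes > capacity - used) {
            throw new IllegalStateException("region exhausted");
        }
        used += bytes;
    }

    int used() {
        return used;
    }

    // Runs the logic "inside" the region, then forgets every allocation.
    void enter(Runnable logic) {
        try {
            logic.run();
        } finally {
            used = 0; // no per-object collection: the whole region is reset
        }
    }
}

class ToyRegionDemo {
    public static void main(String[] args) {
        ToyRegion region = new ToyRegion(1024);
        region.enter(() -> {
            region.allocate(600);
            System.out.println("inside: " + region.used() + " bytes used");
        });
        System.out.println("after exit: " + region.used() + " bytes used");
    }
}
```

The model also shows why sizing matters: a request that exceeds the remaining budget fails immediately, because nothing is freed until the logic exits the region.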
The NoHeapRealtimeThread Class
A NoHeapRealtimeThread (NHRT) is just as its name suggests: a Schedulable object that cannot access the Java heap in any way. This means that code executing within an NHRT cannot allocate from the heap, reference objects that reside in the heap, or even exist within the heap itself. Instead, NHRTs must be created within and execute within immortal memory or a scoped memory region.
Since the NoHeapRealtimeThread class extends RealtimeThread, you create and use objects of this class as you would RTTs. Listing 6 shows a simple example. Here, an NHRT is created in a static initializer. By default, all Class objects, static initializers, and interned strings are created in immortal memory. This makes the code in this listing safe to execute, since it will do so in immortal memory by default.
Listing 6: Creating and starting an NHRT in a static initializer.
    import javax.realtime.*;

    public class Main {
        static {
            // Create NHRT (within immortal memory)
            NoHeapRealtimeThread nhrt = new NoHeapRealtimeThread(
                    null, ImmortalMemory.instance() ) {
                public void run() {
                    // Execute NHRT code here...
                    System.out.println("NHRT running");
                }
            };
            nhrt.start();
        }

        public static void main(String[] args) {
        }
    }
The code within the static block in Listing 6 is executed when the Main class is initialized, which the Java RTS virtual machine does at startup, before it calls main. As a result, the NHRT is started immediately, and the code within its run method executes within immortal memory.
Asynchronous Events
Event processing is a common task in many applications. Similar to JavaBeans events, the Java RTS asynchronous event handler (AEH) facility is designed for system- and application-specific events that a real-time application may need to handle. Events that are external to Java RTS are often referred to as happenings. Examples of events include OS-level events (such as POSIX signals), hardware interrupts, and related functionality (such as file and network I/O), as well as custom events fired by the application itself.
In Java RTS, two main classes make up the AEH facility:
- AsyncEvent. An object of this type represents the event itself. In the current implementation of Java RTS, this type of object doesn't contain application-specific event data; it needs to be delivered through other means.
- AsyncEventHandler. A Schedulable object that's executed by the AEH facility within Java RTS when the related event is fired. This object has associated ReleaseParameters, SchedulingParameters, and MemoryParameters objects, which you populate to indicate how the event handler should be scheduled.
Let's look at a sample application event handler for Java RTS. In this example, we have an RTT that simulates a stock-quote data feed that sends quote updates to interested listeners. The listeners, in this case, are asynchronous event handler objects. The class Stock contains the asynchronous event object (see Listing 7).
Listing 7: The Stock asynchronous event.
    class Stock {
        public Stock (String s, double v) {
            symbol = s;
            value = v;
            event = new AsyncEvent();
        }

        public String symbol;
        public double value;
        public AsyncEvent event;
    }
When an instance of the Stock class is created, an individual AsyncEvent object is created also. Application code that's interested in a particular stock's updates needs to create an AEH and call addHandler on the appropriate stock's AsyncEvent object (see Listing 8).
Listing 8: Adding an AEH to an AsyncEvent object.
    public double addListener(String symbol, AsyncEventHandler aeh) {
        for ( int s = 0; s < stocks.length; s++ ) {
            if ( stocks[s].symbol.equalsIgnoreCase(symbol)) {
                stocks[s].event.addHandler(aeh);
                return stocks[s].value;
            }
        }
        return -1; // symbol not found
    }
When a stock update occurs (as received by the simulated data feed), the event is fired, which in turn notifies each AEH object of the event (see Listing 9).
Listing 9: Generating an asynchronous event.
    class DataFeed implements Runnable {
        Random random = new Random(System.currentTimeMillis());

        public void run() {
            int updates = 0;
            while ( updates++ < 10000 ) {
                // Generate a stock update
                int stock = ...

                // Notify handler(s)
                stocks[stock].event.fire();

                RealtimeThread.waitForNextPeriod();
            }
        }
    }
This code, being a simulation, generates a random update every period (much of the code has been omitted here for brevity). The important point is that every AEH that added itself as a handler for a particular stock object will be notified when the call to fire is made. The AEH code itself is straightforward, as shown in Listing 10.
Listing 10: An asynchronous event-handler class.
    class Listener extends AsyncEventHandler {
        String symbol;

        public Listener(String symbol) {
            this.symbol = symbol;
        }

        public void handleAsyncEvent() {
            if ( getPendingFireCount() > 1 )
                System.out.println("** Updates Queued ***");

            do {
                double value = getValue(symbol);
                System.out.println(
                    "Listener(" + symbol + "):\tUpdate for "
                    + symbol + "=" + value + " on thread: "
                    + RealtimeThread.currentThread().getName());
            } while ( getAndDecrementPendingFireCount() > 0 );
        }
    }
When the associated event object is fired, the AEH object's handleAsyncEvent method is called by the Java RTS virtual machine. All of this code is executed deterministically at a real-time priority to ensure the timely delivery of events.
If events fire faster than your handler code can process them, they queue up inside the virtual machine. As a result, it's a good idea to implement a do..while loop, as in Listing 10, to ensure that your code doesn't miss a single event. A call to getPendingFireCount that returns a value greater than 1 indicates that events are queued. As each event is processed, a call to getAndDecrementPendingFireCount decrements the queued event count, and the loop continues for as long as the value it returns is greater than zero.
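To make the counting logic concrete, here is a minimal plain-Java model of a pending fire count being drained. It does not use the javax.realtime API: PendingFires, DrainDemo, and their methods are hypothetical names, and the model assumes that the decrement returns the value the counter held before it was decremented.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy stand-in for the pending fire count kept for an event handler.
// This is NOT the javax.realtime API; all names here are hypothetical.
class PendingFires {
    private final AtomicInteger count = new AtomicInteger();

    // Called by the event source each time the event fires.
    void fire() {
        count.incrementAndGet();
    }

    int pending() {
        return count.get();
    }

    // Decrements the count if it is positive and returns the value
    // it held before the decrement (assumption of this model).
    int getAndDecrement() {
        return count.getAndUpdate(c -> c > 0 ? c - 1 : 0);
    }
}

class DrainDemo {
    // Processes every queued firing and returns how many were handled.
    static int drain(PendingFires fires) {
        int handled = 0;
        while (fires.getAndDecrement() > 0) {
            handled++; // a real handler would process one update here
        }
        return handled;
    }

    public static void main(String[] args) {
        PendingFires fires = new PendingFires();
        for (int i = 0; i < 3; i++) {
            fires.fire(); // three firings arrive before the handler runs
        }
        System.out.println("handled " + drain(fires)); // prints "handled 3"
    }
}
```

Because the decrement is atomic, a firing that arrives while the loop is running is either seen by the current pass or left in the count for the next one; it is never lost.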
A Simple Message Application
As a starting point to more practical examples of how to use Java RTS to solve real-world problems, let's look at a simple HTTP message-processing application. In this example, a very basic HTTP server is implemented with Java RTS, and uses RealtimeThread listener objects to process HTTP requests. As a result, HTTP responses are sent deterministically when each request is received. This example should form the basis for many real-world solutions, as many real-time applications perform some sort of message-processing function.
We begin with the RequestProcessor class, which implements Runnable, and responds to all HTTP requests (see Listing 11).
Listing 11: The HTTP request processor.
    import java.io.*;
    import java.net.Socket;
    import java.util.*;

    public class RequestProcessor implements Runnable {
        // Shared request queue; also the monitor that idle processors wait on
        private static final List<Socket> pool = new LinkedList<Socket>();
        private int id = 0;

        public RequestProcessor(int id) {
            this.id = id;
        }

        public void run() {
            while ( true ) {
                Socket conn;
                synchronized ( pool ) {
                    // Wait for a request
                    while ( pool.isEmpty() ) {
                        try {
                            pool.wait();
                        }
                        catch ( InterruptedException e ) {
                            return; // interrupted: stop this processor
                        }
                    }
                    conn = pool.remove(0);
                }
                sendHTTPResponse(conn);
            }
        }

        private void sendHTTPResponse( Socket conn ) {
            try {
                // Read and respond to the request
                OutputStream os = new BufferedOutputStream( conn.getOutputStream() );
                Writer out = new OutputStreamWriter( os );
                InputStream is = new BufferedInputStream( conn.getInputStream() );
                Reader in = new InputStreamReader( is, "ASCII" );

                // Read the request line, stopping at end of line or end of stream
                StringBuffer reqLine = new StringBuffer();
                int c;
                while ( (c = in.read()) != -1 ) {
                    if ( c == '\r' || c == '\n' )
                        break;
                    reqLine.append((char)c);
                }

                long tid = Thread.currentThread().getId();
                int pri = Thread.currentThread().getPriority();

                // Send a message acknowledgement to the sender
                String resp = "MESSAGE ACK";
                out.write("HTTP/1.1 200 OK\r\n");
                out.write("Date: " + new Date() + "\r\n");
                out.write("Server: MSGPROC 1.0\r\n");
                out.write("Content-length: " + resp.length() + "\r\n");
                out.write("Content-type: text/plain\r\n\r\n");
                out.write(resp);
                out.flush();
            }
            catch ( Exception e ) {
                e.printStackTrace();
            }
            finally {
                // Release the connection once the response has been sent
                try { conn.close(); } catch ( IOException e ) { /* ignore */ }
            }
        }

        public static void processRequest(Socket request) {
            synchronized ( pool ) {
                // Make sure that the request gets queued
                pool.add( request );
                // Wake all blocked threads in the pool. The first
                // to acquire the monitor will process the request
                pool.notifyAll();
            }
        }
    }
Each instance of RequestProcessor, which can run in a Thread or RealtimeThread, waits on a shared monitor. This monitor, the static member pool, is also a list of requests. When signaled, the thread wakes up and attempts to remove a request from the list. If none exists (because another RequestProcessor instance got to it first), the thread loops back and waits on the monitor again. Otherwise, the request is handled, and a response is sent to the requester.
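For readers who want to check the shape of the response itself, the following standalone sketch assembles a minimal acknowledgement as bytes. AckResponse and build are hypothetical names that are not part of the article's listings, and the header set shown is just one reasonable choice. The sketch relies only on two standard HTTP details: the status line begins with "HTTP/1.1", and Content-Length counts the bytes of the body.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the article's listings.
class AckResponse {
    // Builds a complete HTTP/1.1 response carrying a short plain-text body.
    static byte[] build(String body) {
        byte[] payload = body.getBytes(StandardCharsets.US_ASCII);
        String head = "HTTP/1.1 200 OK\r\n"
                + "Server: MSGPROC 1.0\r\n"
                + "Content-Type: text/plain; charset=US-ASCII\r\n"
                + "Content-Length: " + payload.length + "\r\n"
                + "Connection: close\r\n"
                + "\r\n"; // blank line separates headers from body
        byte[] header = head.getBytes(StandardCharsets.US_ASCII);

        // Concatenate header bytes and body bytes into one buffer
        byte[] out = new byte[header.length + payload.length];
        System.arraycopy(header, 0, out, 0, header.length);
        System.arraycopy(payload, 0, out, header.length, payload.length);
        return out;
    }

    public static void main(String[] args) {
        System.out.print(new String(build("MESSAGE ACK"), StandardCharsets.US_ASCII));
    }
}
```

Building the whole response up front means the processing thread can hand it to the socket in a single write, which keeps the time spent per request easier to reason about.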
The main class that creates the RequestProcessor objects also creates the associated RealtimeThread instances running at a specific priority, and adds them to a queue (see Listing 12).
Listing 12: A pool of RequestProcessor objects is created.
    for ( int i = 0; i < 25; i++ ) {
        PriorityParameters pri = new PriorityParameters(boosted + 1);
        RealtimeThread rtt = new RealtimeThread(
                null, null, null, null, null, new RequestProcessor(i) );
        rtt.setSchedulingParameters(pri);
        listeners.add(rtt);
        rtt.start();
    }
Next, a separate RealtimeThread is created to listen on a port (port 80 for HTTP), queue each request as it arrives, and then notify the listening RequestProcessor objects of the new request (see Listing 13).
Listing 13: Processing new requests and handing them off to a RequestProcessor object.
    PriorityParameters HiPri = new PriorityParameters(
            PriorityScheduler.instance().getMaxPriority() );

    RealtimeThread rt = new RealtimeThread(HiPri) {
        public void run() {
            // Listen on server socket here, and assign
            // requests to pooled RTTs
            while ( run ) {
                try {
                    Socket request = server.accept();
                    RequestProcessor.processRequest(request);
                }
                catch ( Exception e ) {
                    e.printStackTrace();
                }
            }
        }
    };
    rt.start();
The call to the static method RequestProcessor.processRequest signals all objects waiting on the shared monitor that a new request is pending. The first available thread to process the signal will get the request and send a response deterministically.
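The hand-off between the accepting thread and the pooled processors can be tried out without Java RTS. The sketch below is a plain-Java model that uses ordinary threads, with strings standing in for sockets. WorkQueue, WorkQueueDemo, and the "STOP" marker are hypothetical names introduced only for this illustration. On a standard JVM, which worker wins each request is not deterministic; only the total is.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Plain-Java model of the shared-monitor hand-off (names are hypothetical).
class WorkQueue {
    private final List<String> pool = new LinkedList<>();

    // Producer side: queue the item, then wake the waiting workers.
    void submit(String item) {
        synchronized (pool) {
            pool.add(item);
            pool.notifyAll();
        }
    }

    // Consumer side: block until an item is available, re-checking the
    // condition after every wake-up because another worker may have won.
    String take() throws InterruptedException {
        synchronized (pool) {
            while (pool.isEmpty()) {
                pool.wait();
            }
            return pool.remove(0);
        }
    }
}

class WorkQueueDemo {
    // Starts the workers, submits the items, and returns what was processed.
    static List<String> run(int workers, int items) {
        WorkQueue queue = new WorkQueue();
        List<String> done = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        String item = queue.take();
                        if (item.equals("STOP")) return; // shutdown marker
                        synchronized (done) { done.add(item); }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads.add(t);
            t.start();
        }
        for (int i = 0; i < items; i++) queue.submit("req-" + i);
        for (int w = 0; w < workers; w++) queue.submit("STOP"); // one per worker
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return done;
    }

    public static void main(String[] args) {
        System.out.println(run(4, 20).size() + " requests processed");
    }
}
```

The while loop around wait is the important detail: notifyAll wakes every waiting worker, so each one must re-check that the list is still non-empty before removing a request.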