Creating Multithreaded Applications
This article assumes that the reader has some experience with VB, the Windows environment, event-based programming, basic HTML, and scripting. This material is based on the Beta 2 release of Microsoft's .NET.
One of the most eagerly anticipated features available in VB.NET is the ability to natively create and manage threads. While it was possible to create a multithreaded VB 6 application using the Win32 CreateThread API or by fooling the COM library into creating a component in a separate thread, these techniques were both difficult to debug and maintain.
The main reason for these difficulties, of course, was that VB 6.0 wasn't built to handle multithreaded applications, which resulted in access violations and memory corruption. In stark contrast, the Common Language Runtime (CLR) was designed for a multithreaded environment, and indeed the Services Framework incorporates threads implicitly in the basic architecture of delegates, as we've seen in both Chapter 4 and this chapter. However, the Services Framework also supports explicit use of threading APIs through the System.Threading namespace.
For those readers unfamiliar with threads, simply put, they allow your application to spawn multiple units of execution that are independently scheduled and prioritized by preemptive multitasking operating systems such as Windows 2000. Based on the thread priority and the particular scheduling algorithm, the OS schedules each thread to run for a certain amount of time called a time slice. When the time slice elapses, the thread is suspended and placed back into a queue while another thread is allocated a time slice. When the thread is suspended, its state (or context) is saved so that it can continue its work where it left off. The CLR supports threading by starting each AppDomain with a main thread and allowing it to create multiple worker threads, each with its own exception handlers and context data.
The obvious advantage to using more than one thread in an application is that your app can appear to be performing several activities simultaneously as the various threads are swapped in and out of the CPU. In fact, on machines with multiple processors, the threads from a single AppDomain may be scheduled across any available processor, allowing work to actually be done concurrently. In distributed applications this can increase scalability, since more clients can share the CPU resources available on a server, while desktop applications such as spreadsheets and word processors frequently take advantage of threads to perform background operations such as recalculation and printing. But how does this concept apply to the distributed applications you'll write with VB.NET?
For starters, keep in mind that if you're building distributed applications like those described in this book, you're already getting a multithreaded architecture for free. This is the case because application services such as IIS, Component Services, and SQL Server are all multithreaded. For example, as clients request web pages, their requests are carried out by worker threads controlled by IIS. One of these threads may execute an ASP.NET page that calls a component hosted in Component Services. The component could be configured to run as a Server Application, and so it is executed on a thread allocated from a thread pool for the application. The component, in turn, may use a database connection that is serviced by one of the worker threads allocated by the SQL Server engine. As a result, as multiple users request pages that instantiate components and access the database, their activities are not serialized and therefore are not constrained by single-threaded execution.
Since most of the code you write in distributed applications executes within the middle-tier, there are a limited number of scenarios for which you'll likely need to create threads explicitly. Some of these scenarios include lengthy operations such as file I/O and database maintenance tasks, servicing multiple clients in a Windows service application, and listening for messages from a Microsoft Message Queue. As a result, this section will cover the basics to prepare you for using threads, although for further information and additional samples you should consult the documentation.
A word of warning: Because threads are expensive for the operating system to track and schedule, you should not go wild creating a new thread for everything your application does. Since memory must be allocated for each thread, too many threads can and will decrease the overall performance of the system. In addition, threads introduce problems that VB developers have not had to deal with before, such as synchronizing access to shared resources. As a result, you should only add support for multiple threads after careful consideration.
In the rest of this section we'll discuss using threads and thread pools.
Using Threads
The primary class used to create and manipulate threads is, not surprisingly, the Thread class. It contains methods that allow you to Start, Suspend, Resume, Abort, and Join (wait for) a thread, along with the shared Sleep method, in addition to properties such as IsAlive, IsBackground, Priority, ApartmentState, and ThreadState for querying and setting the thread's state.
NOTE
Keep in mind that most members of Thread are instance members and so can only be accessed through a particular instance of the Thread class. To manipulate a particular thread, you either need to create a new instance of the Thread class or get a reference to the current thread using the shared CurrentThread property. The primary exception to this is the Sleep method, which is shared and suspends the current thread for a specific number of milliseconds.
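For instance, the following minimal sketch (which assumes the System.Threading namespace has been imported) reads a few properties of the current thread through CurrentThread and then calls the shared Sleep method directly:

' Get a reference to the thread this code is running on
Dim tCurrent As Thread = Thread.CurrentThread
MsgBox("Name: " & tCurrent.Name & _
    ", Priority: " & tCurrent.Priority.ToString() & _
    ", Alive: " & CStr(tCurrent.IsAlive))

' Sleep is shared, so it always suspends the thread that calls it
Thread.Sleep(500)   ' pause the current thread for half a second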
In order to start a new thread you must specify an entry point at which the thread will begin its execution. The only requirement is that the method, whether defined on an object or within a module, take no arguments; because the ThreadStart delegate it will be bound to is itself declared as a Sub with no arguments, the method should likewise be defined as a Sub procedure. It is also possible to execute a method of the same object on a separate thread.
For example, consider the code snippet below. In this example, the GetPhotos method of the Instructors class is to be executed on a separate thread. This method (not shown) queries the database for all of the instructor images and saves each one to a file on the file system. Unlike the asynchronous I/O example shown in Listing 11.5, both the database access and the file access are performed on a separate thread.
Dim tPhoto As Thread
Dim tsStart As ThreadStart
Dim objIns As New Instructors()

tsStart = New ThreadStart(AddressOf objIns.GetPhotos)
tPhoto = New Thread(tsStart)
tPhoto.Priority = ThreadPriority.BelowNormal
tPhoto.Name = "SavingPhotos"
tPhoto.Start()

' Wait for the started thread to become alive
While (tPhoto.ThreadState = ThreadState.Unstarted)
    Thread.Sleep(100)
End While

...

If tPhoto.IsAlive Then
    MsgBox("Still processing images...")
    MsgBox("Waiting to finish processing images...")
    tPhoto.Join()
End If

MsgBox("Done processing images.")
You'll notice in this snippet that starting a thread actually involves instantiating a ThreadStart delegate with the address of the entry point using the AddressOf operator. The delegate is then passed to the constructor of the Thread class. Before the thread is actually started, the Priority is set to BelowNormal so that the main thread will continue to service requests more promptly. There are four other priorities that can be set using the ThreadPriority enumeration (AboveNormal, Highest, Lowest, and Normal), even though the Win32 API supports 32 priority levels.
NOTE
As mentioned, the priorities exposed by the ThreadPriority enumeration map to a small subset of the 32 levels of priority available in the Win32 API. In fact, they map from the lowest priority (Lowest) at a level of 6 to the highest (Highest) at 10.
The code then sets the Name property of the thread. While at first glance this seems strange since a thread or its name should never appear in the user interface, the name does appear in the debugger and may also be useful for logging purposes. The Start method is then used to actually begin execution.
TIP
Sometimes it comes in handy to get a numeric identifier for the thread for logging and reporting purposes. One technique for doing so is to call the GetHashCode method on the CurrentThread property or an instance of the Thread class. This will return a number you can log to your application or event log.
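For instance, a sketch along these lines could tag log entries with the thread identifier (the Console output here simply stands in for your own logging code):

Dim intThreadID As Integer = Thread.CurrentThread.GetHashCode()
' Substitute your own logging call for the Console output
Console.WriteLine("Processing on thread " & intThreadID)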
After starting the thread, the snippet goes into a loop to wait until the thread has started by repeatedly checking the ThreadState property for a value other than Unstarted, which is the initial state of the thread. There are nine other states, ranging from Running to Stopped, exposed by the ThreadState enumeration. Note that calling the shared Sleep method of the Thread class tells the thread running the statement to sleep for a specified number of milliseconds, in this case the main thread and not the one represented by tPhoto. Finally, after performing some other work, the main thread checks to see if tPhoto is still executing by checking the IsAlive property. If so, messages to that effect are relayed to the user before calling the Join method. This method synchronizes the two threads by blocking (suspending execution of the current thread) until the thread on which the method was called has stopped.
TIP
Unrelated to the Priority property previously mentioned, the CLR makes a distinction between threads that run in the foreground and those that run in the background. If a thread is marked as a background thread, the CLR does not wait for it to finish in the event that the AppDomain is shut down. As discussed in the previous section, the threads created by the runtime when using asynchronous file I/O are background threads and so you'd want to make sure the main thread of your code did not finish before the I/O was complete. Conversely, by default threads created as shown above are marked as foreground and consequently their IsBackground property is set to False.
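If you do want a worker thread to behave like those background I/O threads, you can set the flag yourself before starting the thread; a one-line sketch using the tPhoto thread from the earlier snippet:

' Mark the worker as a background thread so the runtime will not
' keep the AppDomain alive waiting for it to finish
tPhoto.IsBackground = True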
Although not shown in the snippet, during the execution of the thread it can be suspended using the Suspend method and subsequently resumed with Resume. In addition, the thread can be terminated using the Abort method, which raises a ThreadAbortException inside the thread.
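In code the calls are straightforward, as the following sketch using the same tPhoto thread shows; keep in mind that Abort is best reserved for shutdown scenarios:

tPhoto.Suspend()    ' temporarily pause the worker thread
' ... do something while the worker is paused ...
tPhoto.Resume()     ' let it pick up where it left off

' As a last resort, tear the thread down entirely
tPhoto.Abort()      ' raises a ThreadAbortException inside the thread
tPhoto.Join()       ' wait for the abort to complete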
Synchronizing Access to Resources
Typically you'll want to run processes on separate threads that do not require access to shared resources. The recommended way to do this is as follows:
Encapsulate the process that is to be run in a class that exposes an entry point used to start the process (for example, Public Sub Start()) and instance variables to hold its state.
Create a separate instance of the class.
Set any instance variables required by the process.
Invoke the entry point on a separate thread.
Do not reference the instance variables of the class from the original (or any other) thread while the process is running.
By following this approach, all of the instance variables will be "private" to the thread and can be used without fear of synchronization problems.
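To make the steps concrete, the following sketch applies them to a hypothetical ImageWorker class (the class name and its Folder variable are illustrative only):

Public Class ImageWorker
    ' State is held in instance variables private to this instance
    Public Folder As String

    Public Sub Start()
        ' Entry point run on the worker thread; uses only its own state
        ' ... process files found in Folder ...
    End Sub
End Class

' Client code
Dim objWorker As New ImageWorker()     ' 2. create a separate instance
objWorker.Folder = "C:\Photos"         ' 3. set any required state
Dim tWorker As New Thread(New ThreadStart(AddressOf objWorker.Start))
tWorker.Start()                        ' 4. invoke the entry point on its own thread
' 5. from here on, don't touch objWorker's instance variables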
However, that may not always be possible in the case of database connections or file handles, for example. In order to ensure that threads wait for each other when accessing these resources, you can use the Monitor class and its associated Enter, Exit, TryEnter, Wait, Pulse, and PulseAll methods.
For example, assume the Instructors class used in the previous snippet includes a class-level SqlConnection object used to connect to the database. This connection is an example of a resource shared by all of the methods in the class.
NOTE
Although using connection pooling, as discussed in Chapter 7, would provide a more scalable solution, this example serves our current purpose and allows all database access to go through a single database connection. This approach might be warranted for applications that require a constant database connection but probably not for distributed applications.
In this case assume that after the call to GetPhotos the client continues on and subsequently calls a method that attempts to use the connection object. Since the connection may be in use by GetPhotos, the method may throw an exception if the SqlConnection is busy processing other results.
To avoid this situation, the GetPhotos method can use the shared methods of the Monitor class to create a critical section inside its code. Simply put, a critical section is a block of code, enclosed by calls to the Enter and Exit methods of the Monitor class, through which access must be synchronized based on the object passed to the Enter method. In other words, if the GetPhotos method wishes to use the SqlConnection exclusively for a block of statements, it must create a critical section by passing the SqlConnection to the Enter method of Monitor at the beginning of the section and invoking Exit when it is finished. The object passed can be any object derived from System.Object.
If the object is currently being used by another thread, the Enter method will block until the object is released. Alternatively, you can call the TryEnter method, which does not block and simply returns a Boolean value indicating whether the lock was acquired. Once safely in the critical section, the GetPhotos method can use the SqlConnection to execute a stored procedure and write out the results. After closing the resulting SqlDataReader, the Pulse method of the Monitor class is called to inform the next thread in the wait queue that the object is about to be freed. This moves that thread to the ready queue so that it is ready for processing. The PulseAll method informs all waiting threads that the object is about to be freed. Finally, a call to Exit is made, which releases the monitor and ends the critical section. The skeleton code for GetPhotos with the monitoring code can be seen in Listing 11.8.
Listing 11.8 Synchronizing Resources. This example shows how the GetPhotos method would use the Monitor class to ensure that two threads do not attempt to use the SqlConnection object simultaneously.
Public Sub GetPhotos()
    Dim cmSQL As SqlCommand
    Dim sdrIns As SqlDataReader

    Try
        ' Execute proc
        cmSQL = New SqlCommand("usp_GetPhotos", mcnSQL)
        cmSQL.CommandType = CommandType.StoredProcedure

        ' Enter critical section
        Monitor.Enter(mcnSQL)
        ' Alternate code
        ' Do While Not Monitor.TryEnter(mcnSQL)
        '     Thread.CurrentThread.Sleep(100)
        ' Loop

        sdrIns = cmSQL.ExecuteReader()
    Catch e As Exception
    End Try

    Do While sdrIns.Read
        ' Read the data and write it to a binary stream
    Loop
    sdrIns.Close()

    Monitor.Pulse(mcnSQL)
    Monitor.Exit(mcnSQL)
    ' Exited critical section

    Return
End Sub
Obviously, critical sections should be created only when absolutely necessary, since they block other threads and so reduce overall throughput.
A simpler technique that can be used to synchronize instance variables that are shared between threads is to use the Interlocked class. This class contains shared Increment and Decrement methods that combine the operations of changing the variable and checking the result into a single atomic operation. This is needed since a thread could change the value of a variable and then have its time slice expire before being able to check the result. In the intervening time before the thread was again run the variable could have been changed by another thread.
For example the following code increments the mPhotosProcessed instance level variable of the Instructors class:
Interlocked.Increment(mPhotosProcessed)
The Interlocked class also supports Exchange and CompareExchange to set a variable to a specified value and to do so only if the variable is equal to a specified value, respectively.
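As a brief illustration, both methods take the variable by reference and return its original value; the mStatus variable here is purely illustrative:

Dim mStatus As Integer = 0
Dim intPrevious As Integer

' Unconditionally set mStatus to 1, returning the old value
intPrevious = Interlocked.Exchange(mStatus, 1)

' Set mStatus to 2 only if it currently equals 1
intPrevious = Interlocked.CompareExchange(mStatus, 2, 1)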
Using Thread Local Storage
Although in the ideal case your threads will use instance variables that are in effect private to the thread, there may be times when your thread runs a method in an object that may be shared by other threads. If so your thread may need to store and retrieve its own truly private data. This may be the case, for example, when a thread in a thread pool monitors an MSMQ queue and needs to store data pulled from the queue to be used for later processing.
As it turns out each thread in a Windows operating system contains its own thread local storage (TLS) used to track state information. Luckily the Thread class makes accessing TLS simple by exposing a set of methods to create and manipulate memory areas in the TLS called data slots.
As to the particulars, the Thread class exposes the shared AllocateNamedDataSlot method that creates a new data slot on all threads in the AppDomain with a specific name. This slot can subsequently be populated and read using the SetData and GetData methods. For example, assume there is a class called WorkerClass that performs some processing activity and we want to create a certain number of threads to perform the work. The following code snippet creates a data slot called "ID" for all threads and then spins up the appropriate number of threads on the StartWork method of the objWorker instance:
Dim dssSlot As LocalDataStoreSlot
Dim tNew As Thread
Dim objWorker As New WorkerClass()
Dim i As Integer

dssSlot = Thread.AllocateNamedDataSlot("ID")
For i = 0 To intMaxThreads
    tNew = New Thread(New ThreadStart(AddressOf objWorker.StartWork))
    tNew.Start()
Next
Notice that since all of the new threads will share the instance variables associated with objWorker, the StartWork method and any methods it calls would need to use synchronization to prevent concurrent access to these variables. However, if the threads each require their own data to be shared between methods, they can place a copy in the "ID" slot in TLS like so:
Public Sub StartWork()
    Dim dssIDSlot As LocalDataStoreSlot
    Dim myID As Integer
    ' Do other work
    dssIDSlot = Thread.GetNamedDataSlot("ID")
    Thread.SetData(dssIDSlot, myID)
    Call NextProcess()
End Sub

Private Sub NextProcess()
    Dim myID As Integer
    Dim dssIDSlot As LocalDataStoreSlot
    dssIDSlot = Thread.GetNamedDataSlot("ID")
    myID = CInt(Thread.GetData(dssIDSlot))
    ' Do other work
End Sub
When the NextProcess method is called, the data can once again be read from the slot, using GetData as shown.
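When the slot is no longer needed, it can be released on all threads with the shared FreeNamedDataSlot method, for example:

' Release the named slot once the worker threads are finished with it
Thread.FreeNamedDataSlot("ID")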
Once again it should be pointed out that the design pattern discussed in the previous section is the one that should be employed where possible. Only when your designs are more complicated and require concurrent access to the same object from multiple threads would you consider using TLS.
Using Thread Pools
While you can create and manage your own threads using the Thread class, the System.Threading namespace also provides a simple way to use threads from a pool allocated by the CLR. This is possible since the CLR automatically creates and manages one thread pool per process that it uses to handle asynchronous operations such as file I/O and events, as we've seen earlier. Within the pool, one thread is assigned Highest priority and is used to monitor the status of the other threads on the queue. Using the ThreadPool class, your code can tap into this pool to make more efficient use of this architecture already employed by the runtime. In essence, the ThreadPool class allows you to post work items, i.e. methods to execute, to the pool that are subsequently serviced by worker threads.
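As a minimal, hedged sketch of the mechanics (the DoWork method and its state string are hypothetical and not part of the QueueListener example that follows), a work item is posted with a single shared call and is serviced by whichever pool thread becomes available:

' Post a work item to the runtime's thread pool
ThreadPool.QueueUserWorkItem(New WaitCallback(AddressOf DoWork), "some state")

' The callback must match the WaitCallback delegate: a Sub taking one Object
Private Sub DoWork(ByVal state As Object)
    Console.WriteLine("Processing " & CStr(state))
End Sub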
As mentioned earlier, using threads should be reserved only for applications that require it and only after careful analysis. For example, a good use of the thread pool might be in a Windows service application that listens for new messages entering one or more message queues. Although as we'll see in Chapter 13, the System.Messaging namespace supports asynchronous operations, creating a thread pool allows you to control specifics like how many threads are processing messages and how long the threads live.
To give you an example of using the ThreadPool class the simplified classes in Listing 11.9 will be used to monitor an MSMQ queue.
Listing 11.9 The QueueListener class. This class uses the ThreadPool class to monitor an MSMQ queue.
Option Strict Off

Imports System
Imports System.Threading
Imports System.Messaging
Imports Microsoft.VisualBasic

Public Class QueueListener
    ' Used to listen for MSMQ messages

    Protected Class EventState
        ' Used to store the event and any other state data required by the listener
        Public ResetEvent As ManualResetEvent
        Public ThreadName As String

        Public Overloads Sub New(ByVal myEvent As ManualResetEvent)
            MyBase.New()
            ResetEvent = myEvent
        End Sub

        Public Overloads Sub New(ByVal myEvent As ManualResetEvent, ByVal Name As String)
            MyBase.New()
            ResetEvent = myEvent
            ThreadName = Name
        End Sub
    End Class

    Private mstrMachine As String
    Private mstrQueue As String
    Private mWorkItems As Integer = 7
    Private mFinished As Boolean = False
    Dim mEvs() As ManualResetEvent

    Public Property WorkItems() As Integer
        Get
            Return mWorkItems
        End Get
        Set(ByVal Value As Integer)
            If Value > 15 Then
                mWorkItems = 15
            Else
                mWorkItems = Value
            End If
        End Set
    End Property

    Public Sub New(ByVal Machine As String, ByVal Queue As String)
        ' Constructor accepts the necessary queue information
        mstrMachine = Machine
        mstrQueue = Queue
    End Sub

    Public Sub Listen(ByVal state As Object)
        ' Method that each thread uses to listen for messages

        ' Create a MessageQueue object
        Dim objMQ As System.Messaging.MessageQueue = New System.Messaging.MessageQueue()
        ' Create a Message object
        Dim objMsg As System.Messaging.Message ' = New System.Messaging.Message()
        ' Event from the state
        Dim evs As ManualResetEvent

        ' Cast the state into the event
        evs = state.ResetEvent

        ' Set the priority and name
        Thread.CurrentThread.Priority = ThreadPriority.BelowNormal
        Try
            If Not state.ThreadName Is Nothing Then
                Thread.CurrentThread.Name = state.ThreadName
            End If
        Catch e As Exception
            ' Thread name can only be set once
            ' Don't set it and get out
        End Try

        'Console.WriteLine("Listen {0} ", state.ThreadName)

        Try
            ' Set the path property on the MessageQueue object, assume private in this case
            objMQ.Path = mstrMachine & "\private$\" & mstrQueue

            ' Repeat until Interrupt received
            While True
                Try
                    ' Sleep in order to catch the interrupt if it has been thrown
                    Thread.CurrentThread.Sleep(100)

                    ' Set the Message object equal to the result from the receive function
                    ' Will block for 1 second if a message is not received
                    objMsg = objMQ.Receive(New TimeSpan(0, 0, 0, 1))

                    ' Message found so signal the event to say we're working
                    evs.Reset()

                    ' Processing the message
                    ProcessMsg(objMsg)
                    ' Done processing
                Catch e As ThreadInterruptedException
                    ' Catch the ThreadInterrupt from the main thread and exit
                    Exit While
                Catch excp As MessageQueueException
                    ' Catch any exceptions thrown in receive
                    ' Probable timeout
                Finally
                    ' Console.WriteLine("Setting Event " & Thread.CurrentThread.GetHashCode())
                    ' Done with this iteration of the loop so set the event
                    evs.Set()
                End Try

                ' If finished then exit thread
                If mFinished Then
                    'Console.WriteLine("exiting " & Thread.CurrentThread.GetHashCode())
                    Exit While
                End If
            End While
        Catch e As ThreadInterruptedException
            ' Catch the ThreadInterrupt from the main thread and exit
        End Try
    End Sub

    Private Sub ProcessMsg(ByVal pMsg As Message)
        ' Here is where we would process the message
    End Sub

    Public Sub Monitor()
        Dim intItem As Integer
        Dim objState As EventState

        ' One event per work item (indices 0 to mWorkItems - 1)
        ReDim mEvs(mWorkItems - 1)
        mFinished = False

        'Console.WriteLine("Queuing {0} items to Thread Pool", mWorkItems)
        For intItem = 0 To mWorkItems - 1
            'Console.WriteLine("Queue to Thread Pool {0}", intItem)
            mEvs(intItem) = New ManualResetEvent(False)
            objState = New EventState(mEvs(intItem), "Worker " & intItem)
            ThreadPool.QueueUserWorkItem(New WaitCallback(AddressOf Me.Listen), _
                objState)
        Next
    End Sub

    Public Sub Finish(Optional ByVal pTimeout As Integer = 0)
        'Console.WriteLine("Waiting for Thread Pool to drain")
        ' Make sure everyone gets through the last iteration
        mFinished = True

        ' Block until all have been set
        If pTimeout = 0 Then
            WaitHandle.WaitAll(mEvs)  ' Waiting until all threads signal that they are done.
        Else
            WaitHandle.WaitAll(mEvs, pTimeout, True)
        End If
        'Console.WriteLine("Thread Pool has been drained (Event fired)")
    End Sub
End Class
Notice that the listing contains two classes: EventState, which is a protected child class, and QueueListener. As we'll see, EventState contains a field called ResetEvent of type ManualResetEvent that each worker thread uses to signal what state it is in, ensuring that it can finish its current work without interruption. The class also contains a ThreadName field used to set the name of the thread associated with the class for debugging purposes.
TIP
Figure 11.2 shows VS.NET in debug mode when running this multithreaded listener application. Note that the drop-down window displays each thread with its associated name. By selecting the thread the code window shifts to the point at which that thread is currently executing. Note that a thread can only have its name set once and so in a situation where work items may end up using the same thread, the code must trap for an exception when setting the Name property.
Figure 11.2 Debugging multithreaded applications. This screen shot shows the debugger with the threads drop-down exposed. Clicking on a thread shows the point at which the thread is paused.
The QueueListener class is the class that actually polls the MSMQ queue on multiple threads and contains a constructor that accepts the machine name and queue name to monitor. The public Listen method receives messages from the queue while the public Monitor method initiates the process and creates the thread pool. The private ProcessMsg method is a stub that is used to process a message when it is received. Finally, the public Finish method can be called with an optional timeout argument to allow the threads used by the QueueListener class to complete their work within a specified time period.
To begin, notice that the Listen method accepts a state object as a parameter. This object will contain an instance of EventState that will be used by Listen to signal when the method is in the midst of processing a message and when it is done. Doing so ensures that the Finish method blocks until all the threads finish their current processing. After setting the ThreadPriority and Name and retrieving the EventState, you'll notice that the method simply contains a While loop inside a Try block. This loop repeatedly calls the Receive method of the MessageQueue class, which returns the first available message within a specified timeout period. In this case, a TimeSpan object (discussed in Chapter 12) is used to instruct the Receive method to block for one second before returning if there is no message. In the event that no message is received, a MessageQueueException will be thrown. Note that if a message is received, the method continues and calls the Reset method of the ResetEvent field in the EventState object. In either case, the Finally block calls the Set method of the ResetEvent field to signal that the thread is finished processing for this iteration of the loop.
As mentioned previously, the ResetEvent field of EventState contains an instance of ManualResetEvent, an event object whose signaled and non-signaled states can be changed manually using the Set and Reset methods. When the Reset method is called, the state is changed to non-signaled to indicate that the thread is busy. When the state is set to signaled with the Set method, the thread has finished its useful work for this iteration and can safely be released.
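Outside the context of the listing, the basic handshake can be sketched as follows; the evDone variable is illustrative, and in practice the event object would be shared between the two threads, as it is through EventState in Listing 11.9:

Dim evDone As New ManualResetEvent(False)   ' created in the non-signaled state

' On the worker thread:
evDone.Reset()      ' non-signaled: busy doing work
' ... do the work ...
evDone.Set()        ' signaled: finished for now

' On the thread that needs to wait:
evDone.WaitOne()    ' blocks until evDone is signaled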
The interesting work is actually performed by the Monitor method. In this method, a class level array of ManualResetEvent objects is created of the same size as the number of work items that will be serviced by the pool.
NOTE
Remember that work items are not the same thing as threads in this context. Work items are serviced by threads but your application could conceivably create more work items than there are threads available. Currently the runtime supports a thread pool size of 30, so posting more than 30 work items to the pool will automatically cause some work items to wait until the processing of other work items is completed. In this example, more than 30 work items would never run since each work item invokes Listen, which essentially keeps control of the thread until the Finish method is called. That, and to make sure that the runtime has available threads for other purposes such as firing events, is the reason the WorkItems property is capped at 15.
Note that the number of work items can be set by the WorkItems property of the QueueListener class, although it is defaulted to seven. A For loop is then used to create each of the ManualResetEvent objects and associate them with a new instance of EventState. The resulting object, objState, is then passed as the second argument to the shared QueueUserWorkItem method of the ThreadPool class. As implied by the name, this method queues a work item to the thread pool managed by the runtime to be serviced by the next available worker thread. The first argument specifies the method to call back into when the work item begins execution, in this case Listen. By passing the EventState as the second argument, the Listen method can retrieve the object and use any state information stored inside as discussed previously. In this case, the state consists of the name of the thread to use in debugging and a ManualResetEvent object used to synchronize the thread. Once the loop has completed, the specified number of work items will have been queued for execution by the thread pool. At this point the threads will continually check the specified queue for new messages.
When the client finally calls the Finish method to complete execution, the method first sets the private mFinished variable to True. This variable is checked on each iteration of the loop in the Listen method and, if set to True, the loop is exited, freeing the thread to return to the pool. The Finish method then uses the shared WaitAll method of the WaitHandle class to block until all of the ManualResetEvent objects in the mEvs array have been set to a signaled state (True). The optional second argument, used if a timeout value is passed to the method, specifies the time period to wait for this to occur before unblocking the current thread. Using this approach ensures that the Finish method will block until each worker thread has completed its current iteration of the loop in the Listen method. Note that the threads are actually returned to the pool and not destroyed. As a result, another call to Monitor will reuse the existing threads and will not incur the overhead of recreating them.
For the client using the QueueListener class, the implementation is straightforward as shown in the following snippet:
Dim objQ As New QueueListener("ssosa", "tester")
objQ.WorkItems = 10
objQ.Monitor()

' Do other work here

objQ.Finish()
After instantiating a new object and passing it the machine name and the queue to listen to, the number of work items is set and the Monitor method is called. At some point later, the client can call the Finish method with the optional timeout to clean up the worker threads.
While this example was created to illustrate how to use the ThreadPool class, it is obviously not the only way to create a pool of threads to perform actions like monitoring a message queue. For example, the QueueListener class could easily be modified to create and track an array of Thread objects from within the class to implement the thread pool. After setting the mFinished flag, the Finish method could then execute a loop that checks the IsAlive property of each thread to determine when the thread pool was drained, rather than using the ManualResetEvent objects. In addition, the technique shown in the previous section for using TLS could have been used to pass state information to the threads. In many respects, this architecture would give you more control over the threads and, in fact, for scenarios where the runtime-managed thread pool is already heavily used or where more worker threads are needed, this alternate approach would be preferred.
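As a rough sketch of that alternative, and assuming the class tracked its workers in a hypothetical mThreads() array of Thread objects, the Finish method might look something like this:

Public Sub Finish()
    Dim tWorker As Thread
    ' Signal the workers to exit their loops
    mFinished = True
    ' Poll until every thread in the home-grown pool has terminated
    For Each tWorker In mThreads
        Do While tWorker.IsAlive
            Thread.Sleep(100)
        Loop
    Next
End Sub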