Integrating Messaging in VB.NET
This article assumes the reader has some experience with VB, the Windows environment, event-based programming, basic HTML, and scripting. This material is based on the Beta2 release version of Microsoft's .NET technology.
One particularly effective way to increase the scalability and reliability of a distributed application is to move from a model where application requests are processed synchronously to one where some or all of the requests are processed asynchronously. As discussed in Chapter 8, Microsoft includes the Microsoft Message Queue (MSMQ) product as a service in its server operating systems. MSMQ provides the queueing infrastructure that applications use to create and manipulate queues and to send messages to and receive messages from those queues.
TIP
MSMQ is not installed by default in Windows 2000 Server. You can install it by using the "Configure Your Server" utility found in the Administrative Tools group and looking under the Advanced option.
Not surprisingly, the Services Framework provides a namespace, System.Messaging, that encapsulates the functionality of MSMQ. This section will examine the System.Messaging namespace, first in how queues are programmatically referenced and administered and secondly in how messages are serialized, sent, and received by application programs.
Administering Queues
Although the System.Messaging namespace contains over 20 classes, the central class is MessageQueue. This class contains both shared and instance methods to allow you to query for queues contained on a particular machine or across the network in addition to manipulating individual queues.
At the most basic level, the set of shared members includes Create, Delete, Exists, and several methods prefixed by Get that allow you to query for queues. For example, the following code uses the Exists method to determine whether a queue identified by the mstrPath variable exists and, if not, creates it. In either case the queue is then referenced by creating an instance of MessageQueue and passing in the identifier of the queue:
If Not MessageQueue.Exists(mstrPath) Then
    MessageQueue.Create(mstrPath, False)
End If

Dim oQueue As MessageQueue
oQueue = New MessageQueue(mstrPath, False)
Note that in the Create method, the second argument indicates whether the queue should be transactional, in other words, whether it uses the Microsoft Distributed Transaction Coordinator (MSDTC) service to ensure message delivery. In the constructor of the MessageQueue object, the second argument specifies whether the first application to access the queue receives exclusive read access to it. Note also that creating a new MessageQueue object using the New keyword does not create a new queue; it simply references one that already exists.
You'll notice that the path passed into these methods is simply a string that identifies the queue. In fact, the string can take one of three forms:
The path to the queue as returned by the Path property of the MessageQueue object. This is the typical approach in the form MachineName\QueueName. For private queues, it is MachineName\private$\QueueName. System queues such as Deadletter$ and Journal$ can also be accessed this way.
The format name returned by the FormatName property prefixed with "FormatName:". This is typically used for offline access to queues.
The label of the queue as returned by the Label property prefixed with "Label:". The Label property can be set for a queue to provide a description. Using the Label is not recommended since labels are not required to be unique and can thus cause exceptions to be thrown when sending messages or referencing a queue.
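As a minimal sketch, the three forms might look like the following. The machine name server01 and the queue name and label Registrations are placeholders chosen for illustration and do not come from the listings in this chapter. No message is sent or read here; the code only builds references to the queue.

```vbnet
Imports System
Imports System.Messaging

Module QueueReferenceForms
    Sub Main()
        ' 1. Path form: MachineName\private$\QueueName ("." is the local machine)
        Dim oByPath As New MessageQueue(".\private$\Registrations")

        ' 2. Format name form, prefixed with "FormatName:"
        '    (server01 is a hypothetical machine name)
        Dim oByFormat As New MessageQueue( _
            "FormatName:DIRECT=OS:server01\private$\Registrations")

        ' 3. Label form, prefixed with "Label:" (not recommended,
        '    because labels are not required to be unique)
        Dim oByLabel As New MessageQueue("Label:Registrations")

        Console.WriteLine(oByPath.Path)
        Console.WriteLine(oByFormat.Path)
        Console.WriteLine(oByLabel.Path)
    End Sub
End Module
```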
In addition to creating and deleting queues, the MessageQueue class also provides query and enumeration methods to list the queues on a machine and the messages within a queue. To illustrate these capabilities, review the SetupQueues method shown in Listing 13.10.
Listing 13.10. Manipulating Queues. This method manipulates the private queues on the given machine.
Imports System.Messaging

Public Sub SetupQueues(ByVal pMachine As String)
    Dim oQueue As MessageQueue
    Dim arQueues() As MessageQueue

    ' Enable the connection cache
    MessageQueue.EnableConnectionCache = True

    Try
        ' List the private queues on the machine
        arQueues = MessageQueue.GetPrivateQueuesByMachine(pMachine)

        For Each oQueue In arQueues
            ' Skip system queues, whose paths end in $
            If Right(oQueue.Path, 1) <> "$" Then
                With oQueue
                    .MaximumQueueSize = 2048
                    .MaximumJournalSize = 4192
                    .UseJournalQueue = True
                    .EncryptionRequired = EncryptionRequired.None
                End With

                ' Purge the journals
                Dim oJournal As MessageQueue
                If MessageQueue.Exists(oQueue.Path & "\Journal$") Then
                    oJournal = New MessageQueue(oQueue.Path & "\Journal$")
                    oJournal.Purge()
                End If

                ' Delete acknowledgement messages
                Dim enMessages As MessageEnumerator = oQueue.GetEnumerator()
                While (enMessages.MoveNext())
                    If enMessages.Current.MessageType = MessageType.Acknowledgment Then
                        enMessages.RemoveCurrent()
                    End If
                End While
            End If
        Next
    Catch e As MessageQueueException
        ' Log a message
    End Try
End Sub
First, you'll notice that the SetupQueues method uses the EnableConnectionCache shared property to enable the MessageQueue class to reuse read and write handles to queues, thereby increasing performance. You can clear the cache using the ClearConnectionCache shared method.
Next, the GetPrivateQueuesByMachine method is invoked with the name of the computer on which to retrieve the queues. The result is an array of MessageQueue objects stored in the arQueues variable, which can then be iterated over. Note that the Path property is parsed to determine whether the queue is a system queue (denoted with a $ at the end). If it is not, the code sets properties that put storage limits on the queue, ensure that copies of all messages are saved to a journal queue, and specify that encryption is not required. These changes take effect immediately and will throw exceptions if the code does not have the required permissions.
NOTE
Each queue has a set of permissions, for example, "Set Properties," and "Get Properties" permissions that can be set in the MSMQ UI by right-clicking on the queue and selecting Properties. MSMQ can be administered in the Computer Management MMC console under Services and Applications. Permissions can also be programmatically manipulated using the SetPermissions and ResetPermissions methods of the MessageQueue class.
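A short sketch of the programmatic route follows. The queue path and the choice of the Everyone group are assumptions made for illustration, the queue is assumed to exist already, and the calling account is assumed to have the right to change the queue's permissions.

```vbnet
Imports System.Messaging

Module QueuePermissionsSketch
    Sub Main()
        ' Hypothetical queue path; the queue must already exist
        Dim oQueue As New MessageQueue(".\private$\Registrations")

        ' Allow members of the Everyone group to send messages to the queue
        oQueue.SetPermissions("Everyone", _
            MessageQueueAccessRights.WriteMessage, _
            AccessControlEntryType.Allow)

        ' Revert the permission list to the operating system defaults
        oQueue.ResetPermissions()
    End Sub
End Module
```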
Using the Path of the current queue, the journal queue, if it exists, is then referenced using its path name by the oJournal object. The entire collection of messages in the queue is then deleted using the Purge method.
Finally, the messages in a particular queue are traversed using a MessageEnumerator object. Simply put, the MessageEnumerator exposes a forward-only cursor of messages populated by one of several Get methods in the MessageQueue class. In this case, the GetEnumerator instance method simply returns an enumerator that traverses all the messages in the queue. The MessageEnumerator itself returns the current message (Message object) in the Current property and exposes methods such as MoveNext and RemoveCurrent to manipulate the list. In this case, the MessageType of each message is checked; if it is an acknowledgement message, it is deleted.
NOTE
Acknowledgement messages are special types of messages where the body of the message is empty. Acknowledgements can be sent automatically when the message reaches its destination queue or is successfully retrieved from the queue. MSMQ can send both positive and negative acknowledgements, for example a negative acknowledgement when the message is not retrieved within a set amount of time. To enable acknowledgement, you specify the queue to which acknowledgements should be sent in the AdministrationQueue property of the Message object and then set the AcknowledgeType property to one of the AcknowledgeTypes enumeration constants. You then check the administration queue as you would any other queue. We will discuss this later in the chapter.
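To make the two properties concrete, here is a hedged sketch of the sending side. Both queue paths and the message body are hypothetical, and both queues are assumed to exist. The combination of acknowledgement types is only one possibility.

```vbnet
Imports System.Messaging

Module AcknowledgementSketch
    Sub Main()
        ' Hypothetical destination and administration queues
        Dim oQueue As New MessageQueue(".\private$\Registrations")
        Dim oAdmin As New MessageQueue(".\private$\RegistrationsAdmin")

        Dim oMessage As New Message("Registration batch")
        With oMessage
            ' Queue that receives the acknowledgement messages
            .AdministrationQueue = oAdmin
            ' Positive acknowledgement on arrival, negative one if
            ' the message is not received from the queue in time
            .AcknowledgeType = AcknowledgeTypes.PositiveArrival Or _
                AcknowledgeTypes.NegativeReceive
            .TimeToBeReceived = New TimeSpan(0, 10, 0)
        End With

        oQueue.Send(oMessage)
    End Sub
End Module
```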
The interesting aspect of the enumerator is that it is dynamic. In other words, if new messages with a lower priority than the Current message are added to the queue, they will be included in the cursor. To retrieve a static list of messages, you can use the GetAllMessages method.
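By way of contrast with the enumerator, a static snapshot might be taken as in this sketch. The queue path is a placeholder. GetAllMessages copies the messages into an array and leaves them on the queue.

```vbnet
Imports System
Imports System.Messaging

Module StaticSnapshotSketch
    Sub Main()
        ' Hypothetical queue path
        Dim oQueue As New MessageQueue(".\private$\Registrations")

        ' Copy every message currently in the queue into an array;
        ' messages that arrive later do not appear in this array
        Dim arMessages() As Message = oQueue.GetAllMessages()

        Dim oMessage As Message
        For Each oMessage In arMessages
            Console.WriteLine(oMessage.Label)
        Next
    End Sub
End Module
```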
Not only does the MessageQueue class support querying for private queues, it can also query for publicly available queues. The GetPublicQueues, GetPublicQueuesByCategory, GetPublicQueuesByLabel, and GetPublicQueuesByMachine methods all return an array of MessageQueue objects. However, the first method in this list is also overloaded to accept a MessageQueueCriteria object in which you can specify multiple criteria like those exposed by the other methods (Category, Label, and MachineName) in addition to when the queue was created (CreatedAfter, CreatedBefore) and last modified (ModifiedAfter, ModifiedBefore).
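A sketch of the overload that takes a MessageQueueCriteria object is shown here. The machine name and the seven-day window are invented for illustration. Querying public queues assumes the machine can reach the directory service in which public queues are published.

```vbnet
Imports System
Imports System.Messaging

Module PublicQueueQuerySketch
    Sub Main()
        Dim crQueues As New MessageQueueCriteria()

        ' Combine several criteria in one query
        crQueues.MachineName = "server01"                ' hypothetical machine
        crQueues.CreatedAfter = DateTime.Now.AddDays(-7) ' created in the last week

        ' Returns a static array of the matching public queues
        Dim arQueues() As MessageQueue = MessageQueue.GetPublicQueues(crQueues)

        Dim oQueue As MessageQueue
        For Each oQueue In arQueues
            Console.WriteLine(oQueue.Path & " created " & oQueue.CreateTime.ToString())
        Next
    End Sub
End Module
```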
Alternatively, rather than return a static array of queues, the GetMessageQueueEnumerator method can be used to create a dynamic cursor that can query the public queues based on criteria specified in a MessageQueueCriteria object. For example, the following code queries the network for queues that match the given category and adds the Path and CreateTime properties to a list to display to the user:
Dim enQueues As MessageQueueEnumerator
Dim crMessage As New MessageQueueCriteria()

crMessage.Category = New Guid("00000000-0000-0000-0000-000000000002")
enQueues = MessageQueue.GetMessageQueueEnumerator(crMessage)

While enQueues.MoveNext()
    AddToList(enQueues.Current.Path, enQueues.Current.CreateTime)
End While
Note that the Category is simply a Guid that needn't be unique. As the name implies, you can use this property to categorize your queues.
Installing Queues
As shown in the previous discussion, the MessageQueue class can be used to create and delete queues programmatically. However, as with event logs, performance counters, and other system resources, the recommended technique is to install the resource if needed along with the application that uses it. Not surprisingly the System.Messaging namespace contains the MessageQueueInstaller class to do just that.
The MessageQueueInstaller class works like the installer classes discussed in Chapter 12. In order to use it, you first need to derive a class from Installer that will be run by the Installutil.exe utility when its RunInstaller attribute is set to True. Next you can declare a variable to hold an instance of the MessageQueueInstaller class and use it in the constructor of the derived Installer class to specify how the queue will be installed. For example, the following code would be contained in the New method of the derived Installer class:
Imports System.Configuration.Install
Imports System.Messaging

mMSMQInstaller = New MessageQueueInstaller()

With mMSMQInstaller
    .Label = "QuilogyDocQueue"
    .UninstallAction = UninstallAction.Remove
    .UseJournalQueue = True
    .Transactional = True
    .Path = ".\Private$\QuilogyDocs"
End With

Installers.Add(mMSMQInstaller)
In this example, the variable mMSMQInstaller is instantiated and its properties are set to those required for this queue. Note that only the Path property is required and here uses the "." to indicate that the queue will be installed on the local machine. In addition, the UninstallAction property is used to make sure the queue is removed from the system when this application is uninstalled. As with other installers, you must add it to the Installers collection using the Add method as shown here.
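For context, the fragment above might sit inside a derived Installer class along the following lines. The class name QueueInstaller is an assumption made for this sketch. The property values repeat those in the fragment.

```vbnet
Imports System.ComponentModel
Imports System.Configuration.Install
Imports System.Messaging

' RunInstaller(True) marks the class so that Installutil.exe runs it
<RunInstaller(True)> _
Public Class QueueInstaller
    Inherits Installer

    Private mMSMQInstaller As MessageQueueInstaller

    Public Sub New()
        MyBase.New()

        mMSMQInstaller = New MessageQueueInstaller()
        With mMSMQInstaller
            .Path = ".\Private$\QuilogyDocs"   ' the only required property
            .Label = "QuilogyDocQueue"
            .UseJournalQueue = True
            .Transactional = True
            .UninstallAction = UninstallAction.Remove
        End With

        ' Register the queue installer with this installer
        Installers.Add(mMSMQInstaller)
    End Sub
End Class
```

Once the assembly is compiled, running Installutil.exe against it should create the queue, and running it again with the /u switch should remove the queue.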
TIP
Rather than setting all of the properties manually as is done in this case, you also have the option of passing a MessageQueue object to the overloaded constructor of MessageQueueInstaller. Doing so copies the queue properties to the newly created queue.
Sending and Receiving Messages
Once a queue has been referenced, applications use the MessageQueue object to place (send) messages on the queue and read (receive) messages from the queue.
Sending (Placing) Messages
To send (place) a message, all you need to do is call the overloaded Send instance method, passing it the object to place on the queue. The object is then automatically serialized to either XML or a binary representation and placed in the Body of the Message object. How the serialization takes place is determined by the Formatter property of either the MessageQueue object or the Message being sent to the queue.
The System.Messaging namespace supports three types of formatters: XmlMessageFormatter (the default), ActiveXMessageFormatter, and BinaryMessageFormatter. By default, an instance of XmlMessageFormatter is created with the MessageQueue object and is assigned to the Formatter property. It is then used to serialize the message to a Stream and place it in the Body property of the Message object when the message is sent and again to deserialize it when the message is read from the queue. The ActiveXMessageFormatter can be used to serialize COM objects and allows interoperability with VB 6.0, while the BinaryMessageFormatter can be used to serialize a more compact (and complete) binary representation of the type.
By using this approach, the Send method can simply accept the System.Object data type. As long as the object can be serialized to XML or to a binary format (the latter by marking the class with the Serializable attribute), it can be saved in the message body. For example, an instance of the QStudents class shown in Listing 13.10 can be sent to a queue like so:
Dim o As New QStudents()
o.AddStudent("Sammy", "Sossa", 1233, "Cubs")
o.AddStudent("Kerry", "Wood", 232, "Cubs")

Dim oQueue As New MessageQueue(".\private$\Registrations")
oQueue.Formatter = New BinaryMessageFormatter()
oQueue.Send(o)
In this case, the default XmlMessageFormatter is replaced with a BinaryMessageFormatter. If the Formatter property is not explicitly set, the QStudents object is instead serialized to XML. The message body as viewed from the MSMQ snap-in can be seen in Figure 13.1.
Figure 13.1 A Serialized Message. This dialog shows the serialized MSMQ message in XML format.
NOTE
Note that the QStudents, Student, and Name classes each must have the Serializable attribute set, which is not shown in Listing 13.10.
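To show what applying the attribute looks like, here is a hypothetical pair of classes. StudentSketch and QStudentsSketch are invented for this illustration and are not the QStudents, Student, and Name classes referred to in the text. They only demonstrate that every class in the object graph carries the Serializable attribute.

```vbnet
Imports System
Imports System.Collections

' Hypothetical classes, not the author's QStudents implementation
<Serializable()> _
Public Class StudentSketch
    Public FirstName As String
    Public LastName As String
End Class

<Serializable()> _
Public Class QStudentsSketch
    ' ArrayList is itself serializable, so the whole graph can be
    ' written by the BinaryMessageFormatter
    Private mStudents As New ArrayList()

    Public Sub AddStudent(ByVal pFirst As String, ByVal pLast As String)
        Dim oStudent As New StudentSketch()
        oStudent.FirstName = pFirst
        oStudent.LastName = pLast
        mStudents.Add(oStudent)
    End Sub

    Public ReadOnly Property Count() As Integer
        Get
            Return mStudents.Count
        End Get
    End Property
End Class
```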
Although using the automatic serialization provided by the formatters is the easiest way to place objects into a message, you can also directly populate the Body or BodyStream properties of the Message object itself. This is useful when you are placing data from files or other sources into the queue. To illustrate how this works, review the ProcessDocs method shown in Listing 13.11.
Listing 13.11. Writing to a Message. This method opens files and writes their contents directly to the Body of a Message object using the BodyStream property.
Imports System.IO
Imports System.Messaging

Public Sub ProcessDocs(ByVal pPath As String, ByVal pQueue As String)
    ' Loop through all docs in a directory
    Dim oFile As FileInfo
    Dim strFile As String
    Dim oQueue As MessageQueue
    Dim fs As FileStream

    Try
        ' Ensure queue exists
        If Not MessageQueue.Exists(pQueue) Then
            Throw New ArgumentException("Queue " & pQueue & " does not exist.")
        End If

        ' Reference the queue
        oQueue = New MessageQueue(pQueue)

        ' Go get each file
        For Each strFile In Directory.GetFileSystemEntries(pPath, "*.xml")
            ' Open the file
            oFile = New FileInfo(strFile)
            fs = oFile.OpenRead

            ' Send the contents to the queue
            Dim oMessage As New Message()
            With oMessage
                .BodyStream = fs
                .Label = oFile.FullName
                .UseDeadLetterQueue = True
                .TimeToBeReceived = New TimeSpan(1, 0, 0, 0)
                .Priority = MessagePriority.Normal
            End With

            oQueue.Send(oMessage)
            fs.Close()
        Next
    Catch e As MessageQueueException
        ' Log the fact that an error occurred
    Catch e As Exception
        ' Log the fact that an error occurred
    End Try
End Sub
You'll notice in Listing 13.11 that the method accepts a file path and the path to a queue. After determining that the queue is available and referencing it as oQueue, the directory is traversed for files with the .xml extension. As each file is encountered, it is opened for reading using the OpenRead method of the FileInfo object. This method returns a FileStream that can then be placed directly into the BodyStream property of the Message object. In addition, this method sets some of the properties of the Message, including the Priority and Label, the latter of which can be used as a description and a property to query on.
NOTE
The Message object also supports the AppSpecific and Extension properties that can be used to store application-specific data along with the message. A typical use for these properties is the storage of properties that describe the Body of the message but that are separate from it.
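As a rough sketch of the two properties just mentioned, AppSpecific holds an integer and Extension holds a byte array. The queue path, the meaning given to the integer, and the text stored in Extension are all assumptions made for this example.

```vbnet
Imports System.Messaging
Imports System.Text

Module MessageMetadataSketch
    Sub Main()
        ' Hypothetical queue path
        Dim oQueue As New MessageQueue(".\private$\QuilogyDocs")

        Dim oMessage As New Message()
        With oMessage
            .Body = "<Students />"
            .Label = "Sample document"
            ' Application-defined integer, here an invented document version
            .AppSpecific = 2
            ' Application-defined bytes kept separate from the body
            .Extension = Encoding.UTF8.GetBytes("source=nightly-import")
        End With

        oQueue.Send(oMessage)
    End Sub
End Module
```

A receiving application would need to request these two properties through the queue's property filter before reading them, since they are not among the properties returned by default.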
The UseDeadLetterQueue and TimeToBeReceived properties work together to ensure that if the message is not read by another application before the time elapses as specified by the TimeSpan object (in this case 1 day), the message will be sent to the dead letter queue (MachineName\Deadletter$).
When the Send method is invoked, the Stream (in this case the FileStream) is read and the Body property of the Message is populated.
Receiving (Reading) Messages
The MessageQueue class also supports several methods for receiving (reading) messages from a queue. These methods fall into two categories: "peek" methods and "receive" methods.
The peek methods include Peek, PeekByCorrelationId, PeekById, BeginPeek, and EndPeek. In the first three cases, the method returns a Message object from the queue without removing it. In this way, an application can read a message before determining whether it needs to be processed. While Peek returns the first message in the queue, PeekByCorrelationId and PeekById search the queue to find the first message whose CorrelationId and Id properties match the given criteria, respectively.
NOTE
The CorrelationId property is used by acknowledgement, response, and report messages to reference the original message. In other words, it allows you to link an originating message with messages created in response.
All three methods are synchronous, and so they block the current thread until a message is received. In order to avoid blocking indefinitely, they are also overloaded to accept a TimeSpan argument that releases the thread and throws a MessageQueueException when the time expires. The BeginPeek and EndPeek methods allow asynchronous access to the first message in the queue so that the current thread is not blocked.
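The timeout overload might be used as in the following sketch. The queue path is a placeholder. The check against MessageQueueErrorCode.IOTimeout separates an expired timeout from other queue errors.

```vbnet
Imports System
Imports System.Messaging

Module PeekWithTimeoutSketch
    Sub Main()
        ' Hypothetical queue path
        Dim oQueue As New MessageQueue(".\private$\Registrations")

        Try
            ' Wait at most five seconds; the message stays on the queue
            Dim oMessage As Message = oQueue.Peek(New TimeSpan(0, 0, 5))
            Console.WriteLine("Next message: " & oMessage.Label)
        Catch e As MessageQueueException
            If e.MessageQueueErrorCode = MessageQueueErrorCode.IOTimeout Then
                Console.WriteLine("No message arrived within five seconds.")
            Else
                Console.WriteLine("Queue error: " & e.Message)
            End If
        End Try
    End Sub
End Module
```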
The collection of "receive" methods includes the analogous Receive, ReceiveByCorrelationId, ReceiveById, BeginReceive, and EndReceive methods. As you might imagine, the first three in this list behave like their peek counterparts but have the effect of removing the message from the queue once it is read. The latter two methods are used to read messages asynchronously.
While reading simple messages synchronously is fairly straightforward, when receiving serialized objects as messages you must be aware of the type you wish to deserialize to. For example, in an earlier code snippet, an instance of QStudents was serialized to a Message using the BinaryMessageFormatter. To read this message, your code needs to set the Formatter property appropriately and then cast the Body of the Message to the appropriate type like so:
Dim oQueue As New MessageQueue(".\private$\Registrations")
Dim oNew As QStudents

oQueue.Formatter = New BinaryMessageFormatter()
oNew = CType(oQueue.Receive.Body, QStudents)
The oNew variable now contains the deserialized QStudents object.
TIP
To improve performance, you can set properties of the MessagePropertyFilter object exposed in the MessagePropertyFilter property of the MessageQueue instance you are reading from. Each property represents one of the message properties and can be set to True or False to specify whether the property is returned. By default only nine of the approximately 40 properties are returned.
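A sketch of trimming the returned properties follows. In the released .NET Framework the filter is exposed through the MessageReadPropertyFilter property of the queue, which is the name used here. The name may differ in the Beta 2 build this material is based on. The queue path and the choice of properties are illustrative only.

```vbnet
Imports System
Imports System.Messaging

Module PropertyFilterSketch
    Sub Main()
        ' Hypothetical queue path
        Dim oQueue As New MessageQueue(".\private$\Registrations")

        With oQueue.MessageReadPropertyFilter
            .ClearAll()        ' start with no properties selected
            .Label = True      ' then request only what is needed
            .Priority = True
        End With

        Try
            Dim oMessage As Message = oQueue.Receive(New TimeSpan(0, 0, 5))
            Console.WriteLine(oMessage.Label & " (" & oMessage.Priority.ToString() & ")")
        Catch e As MessageQueueException
            Console.WriteLine("Nothing received: " & e.Message)
        End Try
    End Sub
End Module
```

Reading a property that the filter did not request, such as Body in this sketch, raises an exception, so the filter should match what the code actually reads.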
While the previous code snippet used the BinaryMessageFormatter to serialize the object, using the XmlMessageFormatter is more flexible. The reason is that when using the BinaryMessageFormatter, the object is serialized using the type definition (including the version) of the class. This implies that when the object is deserialized, it must be cast to exactly the same type. In other words, the QStudents class must be publicly available in the Global Assembly Cache (GAC) or included as a private assembly and referenced by the receiving application.
By using the XmlMessageFormatter, the receiving application can create its own proxy class to handle the deserialized object. This class can be manually created or generated from XSD. However, when using this approach, the TargetTypes or TargetTypeNames property must be populated before receiving the message. This is required so that the XmlMessageFormatter knows into which object to deserialize the message body. For example, if the QStudents object was serialized to XML, as shown in Figure 13.1, it could be deserialized using the code
Dim oNew As QStudentsNew

oQueue.Formatter = New XmlMessageFormatter(New String() {"QStudentsNew"})
oNew = CType(oQueue.Receive.Body, QStudentsNew)
where QStudentsNew is a new class built from the same schema. In this case, the TargetTypeNames property (an array of Strings) is populated in the constructor. The overloaded constructor can also accept an array of Type objects.
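The Type-array overload could be used as sketched here. The QStudentsNew class below is a hypothetical stand-in with a single invented field, and the XmlRoot attribute assumes that the root element of the serialized message is named QStudents. Both would need to match the real schema.

```vbnet
Imports System
Imports System.Messaging
Imports System.Xml.Serialization

' Hypothetical proxy class; the real one would mirror the schema
<XmlRoot("QStudents")> _
Public Class QStudentsNew
    Public Count As Integer
End Class

Module ReceiveXmlSketch
    Sub Main()
        Dim oQueue As New MessageQueue(".\private$\Registrations")

        ' Populate TargetTypes through the constructor
        oQueue.Formatter = New XmlMessageFormatter( _
            New Type() {GetType(QStudentsNew)})

        Dim oNew As QStudentsNew
        oNew = CType(oQueue.Receive(New TimeSpan(0, 0, 5)).Body, QStudentsNew)
        Console.WriteLine(oNew.Count)
    End Sub
End Module
```

Passing Type objects rather than names means the formatter does not have to resolve a type name at run time, which can be convenient when the proxy class lives in the receiving application's own assembly.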
One of the subtle advantages to serializing objects directly to the queue is that the objects can encapsulate required behavior. For example, the QStudents class can expose a SaveToDatabase method that knows how to persist the contents of the object to SQL Server using the System.Data.SqlClient namespace. This makes working with the object simple for the receiving application, which can simply deserialize the object and invoke the SaveToDatabase method. In this way, the receiving application needn't understand the internal structure of the QStudents object.
However, in other scenarios, you may wish to parse the message yourself, for example if it contains an XML document. To that end, just as the BodyStream property of the Message object can be populated with a Stream, it can also be used to access the Body of the message. For example, the RetrieveDocs method shown in Listing 13.12 drains the given queue of its messages and passes the BodyStream of the Message to the ExtractStudents method.
Listing 13.12. Draining the queue. This method drains the given queue by calling the Receive method and then passes the BodyStream to a method that processes it.
Imports System.Messaging

Public Sub RetrieveDocs(ByVal pQueue As String)
    Dim oQueue As MessageQueue
    Dim oMessage As Message
    Dim flDone As Boolean = False

    Try
        ' Ensure queue exists
        If Not MessageQueue.Exists(pQueue) Then
            Throw New ArgumentException("Queue " & pQueue & " does not exist.")
        End If

        ' Reference the queue
        oQueue = New MessageQueue(pQueue)

        ' Drain the queue
        While Not flDone
            Try
                oMessage = oQueue.Receive(New TimeSpan(0, 0, 5))
                Call ExtractStudents(oMessage.BodyStream)
            Catch e As MessageQueueException
                ' Timeout expired, so the queue is considered drained
                flDone = True
            End Try
        End While
    Catch e As MessageQueueException
        ' Log the fact that an error occurred
    Catch e As Exception
        ' Log the fact that an error occurred
    End Try
End Sub
Note that the Receive method is called with a timeout value of 5 seconds so that as soon as no new messages are received in a 5-second interval, the loop is exited. The ExtractStudents method is similar to that shown in Listing 13.1, with the exception that it has been modified to accept a Stream object rather than a file name like so:
Public Sub ExtractStudents(ByVal pStream As Stream)
In this way, you can easily take advantage of the many classes in the Services Framework that rely on streams.
Finally, the MessageQueue class follows the Services Framework pattern for asynchronous operations by exposing the BeginPeek and BeginReceive methods along with the matching EndPeek and EndReceive methods. The Begin methods are overloaded and, as expected, return immediately without blocking the current thread. The methods spawn a background thread that waits until a message is found. When one is found, the application is notified through either the PeekCompleted or ReceiveCompleted event or an explicit AsyncCallback object passed to the method.
For example, to add the event handler for the ReceiveCompleted event and initiate the asynchronous process, the following code could be used:
oQueue = New MessageQueue(pQueue)
AddHandler oQueue.ReceiveCompleted, AddressOf MessageFound
oQueue.BeginReceive(New TimeSpan(0, 0, 5))
You'll notice that the BeginReceive method can also be passed a TimeSpan object that fires the ReceiveCompleted event if the time expires before a message is found. Although not shown here, an Object that contains state information can also be passed to the Begin methods; it is returned in the AsyncState property of the AsyncResult object.
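The state object and the AsyncCallback route can be combined, as in this sketch, which passes the queue itself as the state so that the callback can call EndReceive. The queue path and the routine name OnReceive are placeholders.

```vbnet
Imports System
Imports System.Messaging

Module AsyncStateSketch
    ' Invoked on a background thread when a message arrives
    ' or the timeout expires
    Private Sub OnReceive(ByVal ar As IAsyncResult)
        ' The state object passed to BeginReceive comes back in AsyncState
        Dim oQueue As MessageQueue = CType(ar.AsyncState, MessageQueue)
        Try
            Dim oMessage As Message = oQueue.EndReceive(ar)
            Console.WriteLine("Received: " & oMessage.Label)
        Catch e As MessageQueueException
            Console.WriteLine("Timeout expired or queue error: " & e.Message)
        End Try
    End Sub

    Sub Main()
        ' Hypothetical queue path
        Dim oQueue As New MessageQueue(".\private$\Registrations")

        ' Timeout, state object, and explicit callback
        oQueue.BeginReceive(New TimeSpan(0, 0, 5), oQueue, _
            New AsyncCallback(AddressOf OnReceive))

        ' Keep the process alive while the background wait completes
        Console.ReadLine()
    End Sub
End Module
```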
Within the event handlers, the MessageQueue from which the message is returned is populated in the first argument while the arguments are encapsulated in the ReceiveCompletedEventArgs object. The actual Message object can then be accessed by calling the EndReceive method, as illustrated by the template code that follows:
Public Sub MessageFound(ByVal s As Object, _
  ByVal args As ReceiveCompletedEventArgs)

    Dim oQueue As MessageQueue
    Dim oMessage As Message
    Dim oState As Object

    ' Retrieve the state if needed
    oState = args.AsyncResult.AsyncState()

    ' Retrieve the queue from which the message originated
    oQueue = CType(s, MessageQueue)

    Try
        oMessage = oQueue.EndReceive(args.AsyncResult)
        ' Process the message here
    Catch e As MessageQueueException
        ' Timeout expired
    End Try
End Sub
When using the asynchronous peek methods, there is an analogous PeekCompletedEventArgs object for use in the PeekCompleted event handler.
TIP
If you wish to continue receiving documents in the background, you can call BeginReceive at the end of the event handler.
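One way to arrange this is sketched below, with a placeholder queue path. The handler calls BeginReceive again after each completion, whether a message arrived or the timeout expired, so the background wait continues until the process ends.

```vbnet
Imports System
Imports System.Messaging

Module ContinuousReceiveSketch
    Public Sub MessageFound(ByVal s As Object, _
      ByVal args As ReceiveCompletedEventArgs)

        Dim oQueue As MessageQueue = CType(s, MessageQueue)

        Try
            Dim oMessage As Message = oQueue.EndReceive(args.AsyncResult)
            Console.WriteLine("Processing: " & oMessage.Label)
        Catch e As MessageQueueException
            ' Timeout expired or the receive failed; fall through and wait again
        End Try

        ' Start waiting for the next message
        oQueue.BeginReceive(New TimeSpan(0, 0, 5))
    End Sub

    Sub Main()
        ' Hypothetical queue path
        Dim oQueue As New MessageQueue(".\private$\Registrations")
        AddHandler oQueue.ReceiveCompleted, AddressOf MessageFound
        oQueue.BeginReceive(New TimeSpan(0, 0, 5))

        ' Press Enter to stop the sample
        Console.ReadLine()
    End Sub
End Module
```

A production handler would probably want to stop re-arming on errors other than a timeout, for example when the queue has been deleted.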