Message Expiration
If it is possible for a given message to become obsolete or in some way invalid because of a time lapse, use a Message Expiration (218) to control the timeout (see Figure 6.7). While you have already dealt with the process timeouts in the Scatter-Gather (272) implementation, this is different. A Message Expiration is used to determine when a single message has expired, rather than setting a limit on the completion of a larger process.
Figure 6.7 A Message Expiration is attached to a Message that may become stale.
When using message-based middleware, it is possible to ask the messaging system to expire a message before it is ever delivered. Currently Akka does not support a mailbox that automatically detects expired messages. No worries, you can accomplish that on your own quite easily. You could create a custom mailbox type or just place the expiration behavior on the message itself. There are advantages to both. Here I explain how to do this using a trait for messages. Whether or not the mailbox supports expiring messages, the message itself must supply some parts of the solution.
It is the message sender that should determine the possibility of message expiration. After all, the sender is in the best position to set the message time-to-live based on some user or system specification for the type of operation being executed. Here is how it can be done. First design a trait that allows an extending message to specify the timeToLive value.
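trait ExpiringMessage {
  // Timestamp, in milliseconds, captured when the message is created.
  val occurredOn = System.currentTimeMillis()

  // Maximum age in milliseconds; supplied by the concrete message.
  val timeToLive: Long

  def isExpired(): Boolean = {
    val elapsed = System.currentTimeMillis() - occurredOn
    elapsed > timeToLive
  }
}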
The trait initializes occurredOn with the timestamp of when the message was created. The trait also declares an abstract timeToLive, which must be set by the extending concrete class.
The ExpiringMessage trait also provides behavior, through method isExpired(), that indicates whether the message has expired. This operation first gets the system’s current time in milliseconds, subtracts the timestamp recorded when the message was created (occurredOn) to calculate the elapsed time, and then compares the elapsed time to the client-specified timeToLive. The message is considered expired only when the elapsed time is greater than timeToLive.
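The boundary behavior of this comparison is easier to see when the current time is passed in rather than read from the system clock. The following is a minimal sketch under that assumption. The names TestableExpiringMessage, isExpiredAt, CheckInventory, and ExpirationCheck are hypothetical and are not part of the sample code; only the elapsed-time comparison is taken from the trait above.

```scala
import scala.concurrent.duration._

// Hypothetical variant of ExpiringMessage: the current time is passed in,
// so the expiry check can be exercised without waiting.
trait TestableExpiringMessage {
  val occurredOn: Long = System.currentTimeMillis()
  val timeToLive: Long

  def isExpiredAt(now: Long): Boolean = {
    val elapsed = now - occurredOn
    elapsed > timeToLive
  }

  // Same check as the original trait, using the system clock.
  def isExpired(): Boolean = isExpiredAt(System.currentTimeMillis())
}

// Hypothetical message type used only for this illustration.
case class CheckInventory(itemId: String, timeToLive: Long)
  extends TestableExpiringMessage

object ExpirationCheck extends App {
  // The sender chooses the time-to-live, here 250 milliseconds.
  val message = CheckInventory("11", 250.millis.toMillis)

  // Exactly at the time-to-live boundary the message is still valid...
  println(message.isExpiredAt(message.occurredOn + 250)) // false
  // ...and one millisecond later it has expired.
  println(message.isExpiredAt(message.occurredOn + 251)) // true
}
```

Because the comparison uses greater-than, a message whose age equals its timeToLive is still delivered. Whether that boundary should count as expired is a design choice for the sender and receiver to agree on.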
Note that System.currentTimeMillis() counts milliseconds from the UTC epoch, so time zone differences between nodes do not by themselves affect this calculation. Clock differences do. Because occurredOn is recorded on the sending node and isExpired() is evaluated on the receiving node, this approach assumes that the computing nodes hosting the various actors have their system clocks synchronized closely enough to make this sort of calculation successful.
This trait is used in the implementation sample, which defines a PlaceOrder Command Message (202):
package co.vaughnvernon.reactiveenterprise.messageexpiration

import java.util.concurrent.TimeUnit
import java.util.Date
import scala.concurrent._
import scala.concurrent.duration._
import scala.util._
import ExecutionContext.Implicits.global
import akka.actor._
import co.vaughnvernon.reactiveenterprise._

case class PlaceOrder(
    id: String,
    itemId: String,
    price: Money,
    timeToLive: Long)
  extends ExpiringMessage

object MessageExpiration extends CompletableApp(3) {
  val purchaseAgent =
    system.actorOf(
      Props[PurchaseAgent],
      "purchaseAgent")

  val purchaseRouter =
    system.actorOf(
      Props(classOf[PurchaseRouter], purchaseAgent),
      "purchaseRouter")

  // The last argument is the time-to-live in milliseconds.
  purchaseRouter ! PlaceOrder("1", "11", 50.00, 1000)
  purchaseRouter ! PlaceOrder("2", "22", 250.00, 100)
  purchaseRouter ! PlaceOrder("3", "33", 32.95, 10)

  awaitCompletion
  println("MessageExpiration: is completed.")
}
In the MessageExpiration sample runner, you create two actors, a PurchaseAgent and a PurchaseRouter. In a real application, the PurchaseRouter could be a Content-Based Router (228) and route to any number of different purchase agents based on the kind of purchase message. Here you aren’t really concerned about that kind of routing but use the PurchaseRouter to simulate delays in message delivery from various causes.
class PurchaseRouter(purchaseAgent: ActorRef) extends Actor {
  val random = new Random((new Date()).getTime)

  def receive = {
    case message: Any =>
      // Simulate a delivery delay of 1 to 100 milliseconds.
      val millis = random.nextInt(100) + 1
      println(s"PurchaseRouter: delaying delivery of $message for $millis milliseconds")
      val duration =
        Duration.create(millis, TimeUnit.MILLISECONDS)
      // Deliver the message to the agent once the delay has passed.
      context
        .system
        .scheduler
        .scheduleOnce(duration, purchaseAgent, message)
  }
}
To familiarize yourself even more with the Akka Scheduler, you can see another example in Resequencer (264).
Now, more to the point, this is how the actual PurchaseAgent checks for Message Expiration and branches accordingly:
class PurchaseAgent extends Actor {
  def receive = {
    case placeOrder: PlaceOrder =>
      if (placeOrder.isExpired()) {
        // Too old to act on: hand it to the Dead Letter Channel.
        context.system.deadLetters ! placeOrder
        println(s"PurchaseAgent: delivered expired $placeOrder to dead letters")
      } else {
        println(s"PurchaseAgent: placing order for $placeOrder")
      }

      MessageExpiration.completedStep()

    case message: Any =>
      println(s"PurchaseAgent: received unexpected: $message")
  }
}
If the PlaceOrder message is expired, the PurchaseAgent sends the message to the Akka ActorSystem’s special deadLetters actor, which implements the Dead Letter Channel (172). Note that Enterprise Integration Patterns [EIP] discusses the possibility of expired messages being delivered to a different Message Channel (128) for one reason or another, but the motivation is the same. You also have the option to ignore the message altogether.
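If expired messages should be observed rather than silently dropped, one option is to subscribe an actor to DeadLetter events on the Akka event stream. The following is a minimal sketch, assuming Akka classic actors are on the classpath. The names ExpiredMessageListener and DeadLetterMonitoring are hypothetical and are not part of the sample code.

```scala
import akka.actor._

// Hypothetical listener that reports each message reaching dead letters.
class ExpiredMessageListener extends Actor {
  def receive = {
    case DeadLetter(message, sender, recipient) =>
      println(s"ExpiredMessageListener: dead letter $message")
  }
}

object DeadLetterMonitoring {
  // Creates the listener and asks the event stream to forward
  // DeadLetter events to it. Returns the listener's ActorRef.
  def monitor(system: ActorSystem): ActorRef = {
    val listener =
      system.actorOf(
        Props[ExpiredMessageListener],
        "expiredMessageListener")

    system.eventStream.subscribe(listener, classOf[DeadLetter])
    listener
  }
}
```

The deadLetters actor also receives messages that were undeliverable for reasons unrelated to expiration, so a real listener would likely narrow the match, for example to messages that extend ExpiringMessage.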
Here’s the output from running the process:
PurchaseRouter: delaying delivery of PlaceOrder(1,11,50.0,1000) for 87 milliseconds
PurchaseRouter: delaying delivery of PlaceOrder(2,22,250.0,100) for 63 milliseconds
PurchaseRouter: delaying delivery of PlaceOrder(3,33,32.95,10) for 97 milliseconds
PurchaseAgent: placing order for PlaceOrder(2,22,250.0,100)
PurchaseAgent: placing order for PlaceOrder(1,11,50.0,1000)
PurchaseAgent: delivered expired PlaceOrder(3,33,32.95,10) to dead letters
MessageExpiration: is completed.