- 26.1 Correctness and Performance Issues with Blocking
- 26.2 Callbacks
- 26.3 Higher-Order Functions on Futures
- 26.4 Function flatMap on Futures
- 26.5 Illustration: Parallel Server Revisited
- 26.6 Functional-Concurrent Programming Patterns
- 26.7 Summary
26.6 Functional-Concurrent Programming Patterns
Futures, on the one hand, and higher-order functions, on the other, are both powerful abstractions. Together, they form a potent combination, though one that can take some effort to master. Even so, the effort is worthwhile. This section illustrates a few guidelines to keep in mind as you venture into functional-concurrent programming.
flatMap as an Alternative to Blocking
Higher-order functions are abstractions for code you don’t have to write. They are convenient but could often be replaced with handwritten implementations. If opt is an option, for instance, opt.map(f) could also be written:
Scala
opt match
  case Some(value) => Some(f(value))
  case None => None
In the case of futures, however, higher-order functions are an alternative to computations that would be hard to implement directly. If fut is a future, what can you replace fut.map(f) with? A future cannot simply be “opened” to access its value, since the value may not yet exist. Short of creating—and blocking—additional threads, there is no alternative to using higher-order functions to act inside a future.
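To make the contrast concrete, the following sketch compares the blocking alternative, which extracts the value with Await, to the non-blocking use of map; the future fut and the function f are placeholders:
Scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

val fut: Future[Int] = Future(42)     // placeholder asynchronous computation
def f(n: Int): String = n.toString    // placeholder function

// Blocking: the calling thread waits until the future's value exists.
val blocked: String = f(Await.result(fut, Duration.Inf))

// Non-blocking: f is applied to the value inside the future, once it exists.
val mapped: Future[String] = fut.map(f)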
You can leverage your functional programming skills with higher-order functions when working with futures. Earlier, for instance, we used flatMap on options to chain computations that may or may not produce a value. You can use flatMap in a similar way on futures to chain computations that may or may not be asynchronous. Instead of “optional” stages, from A to Option[B], you define asynchronous stages, as functions from A to Future[B].
As an illustration, the three optional functions used in Section 10.3 can be changed to represent asynchronous steps:
Scala
def parseRequest(request: Request): Future[User] = ...
def getAccount(user: User): Future[Account] = ...
def applyOperation(account: Account, op: Operation): Future[Int] = ...
The steps can then be chained using flatMap:
Scala
Listing 26.9: A pipeline of futures using flatMap.
parseRequest(request)
  .flatMap(user => getAccount(user))
  .flatMap(account => applyOperation(account, op))
The expression in Listing 26.9 is exactly the same as that in Listing 10.5, except that it produces a value of type Future[Int] instead of Option[Int].
Uniform Treatment of Synchronous and Asynchronous Computations
You could mix synchronous and asynchronous operations by combining steps of type A => B—using map—and steps of type A => Future[B]—using flatMap. Instead, it is often more convenient to use only steps of the form A => Future[B] combined with flatMap. When needed, synchronous steps can be implemented as already completed futures. This design increases flexibility: It makes it easier to replace synchronous steps with asynchronous steps, and vice versa.
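For comparison, here is a sketch of the mixed style, assuming a hypothetical synchronous getAccount of type User => Account:
Scala
parseRequest(request)
  .map(user => getAccount(user))                    // synchronous step, chained with map
  .flatMap(account => applyOperation(account, op))  // asynchronous step, chained with flatMap
By keeping the signature User => Future[Account] instead, every step is chained with flatMap, and the pipeline in Listing 26.9 never needs to change.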
For instance, if accounts are simply stored in a map, the getAccount function from the earlier example can be implemented synchronously, within the calling thread:
Scala
val allAccounts: Map[User, Account] = ...

def getAccount(user: User): Future[Account] = Future.successful(allAccounts(user))
This function returns an already completed future and does not involve any additional thread. If a need to fetch accounts asynchronously then arises, you can reimplement the function without modifying its signature, and leave all the code that uses it—such as Listing 26.9—unchanged.
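For instance, a later version of the function might run the lookup on another thread; the sketch below assumes a hypothetical blocking function fetchAccountFromDB:
Scala
// Same signature as before, but the lookup now runs asynchronously.
def getAccount(user: User): Future[Account] = Future(fetchAccountFromDB(user))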
Functional Handling of Failures
Exceptions are typically thrown and caught within a thread. They don’t naturally travel from thread to thread, and they are ill suited for multithreaded programming. Instead, you are better off following the functional approach to error handling discussed in Chapter 13.
An added benefit of relying on computations of type A => Future[B] instead of A => B is that futures can also carry failures—in Scala, you can think of Future as an asynchronous Try. For example, you can improve the getAccount function by making sure it always produces a future, even when a user is not found:
Scala
def getAccount(user: User): Future[Account] = Future.fromTry(Try(allAccounts(user)))
This way, an expression like getAccount(user).onComplete(...) still executes a callback action, which is not true if getAccount throws an exception. Failed futures can be handled functionally, using dedicated functions such as recover in Scala or exceptionally in Java.
For simplicity, the connection-handling function from Listing 26.8 does not deal with errors. You could use standard future functions to add robustness to the server. For instance, failure to create a page could be handled by transforming the pageF future:
Scala
val safePageF: Future[Page] =
  pageF.recover { case ex: PageException => errorPage(ex) }
or by adding a failure callback:
Scala
pageF.failed.foreach { ex =>
  connection.write(errorPage(ex))
  connection.close()
}
Either the callback actions specified using pageF.foreach or those specified using pageF.failed.foreach will run, but not both.
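Registered together, the two callbacks cover both outcomes, as in the following sketch, which assumes that the success callback from Listing 26.8 writes the page and closes the connection:
Scala
// Runs only if pageF completes successfully.
pageF.foreach { page =>
  connection.write(page)
  connection.close()
}

// Runs only if pageF fails.
pageF.failed.foreach { ex =>
  connection.write(errorPage(ex))
  connection.close()
}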
Non-Blocking “Join” Pattern
In the server example, pageF is created by combining two futures, dataF and adF, using flatMap. You can use the same approach to combine three or more futures:
Scala
val f1: Future[Int] = ...
val f2: Future[String] = ...
val f3: Future[Double] = ...

val f: Future[(Int, String, Double)] =
  f1.flatMap(n => f2.flatMap(s => f3.map(d => (n, s, d))))
This won’t scale to larger numbers of futures, though. An interesting and not uncommon case is to combine N futures of the same type into a single one, for an arbitrary number N. In the server example, a client might obtain data from N database queries, which are executed in parallel:
Scala
def queryDB(requests: List[Request]): Future[Page] =
  val futures: List[Future[Data]] = requests.map(request => Future(dbLookup(request)))
  val dataListF: Future[List[Data]] = Future.sequence(futures)
  dataListF.map(makeBigPage)
The first line uses map to create a list of database-querying tasks, one for each request. These tasks, which run in parallel, form a list of futures. The key step in queryDB is the call to Future.sequence. This function uses an input of type List[Future[A]] to produce an output of type Future[List[A]]. The future it returns is completed when all the input futures are completed, and it contains all their values as a list (assuming no errors). Invoking Future.sequence serves the same purpose as the “join” part of a fork-join pattern, but does so without blocking. The last step uses a function makeBigPage from List[Data] to Page to build the final page.
As of this writing, there is no standard sequence function for CompletableFuture, but you can implement your own using thenCompose (equivalent to flatMap) and thenApply (equivalent to map):
Scala
Listing 26.10: Joining a list of CompletableFutures into one without blocking.
def sequence[A](futures: List[CompletableFuture[A]]): CompletableFuture[List[A]] =
  futures match
    case Nil => CompletableFuture.completedFuture(List.empty)
    case future :: more =>
      future.thenCompose(first => sequence(more).thenApply(others => first :: others))
This function uses recursion to nest calls to thenCompose (flatMap). In the recursive branch, sequence(more) is a future that will contain the values of all the input futures, except the first. This future and the first input future are then combined using thenCompose and thenApply (flatMap and map), according to the pattern used earlier to merge two futures (as in Listings 26.5, 26.7, and 26.8).
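As an illustrative usage sketch, the input futures below are created with supplyAsync, and the joined future is consumed with thenAccept:
Scala
// Three independent asynchronous computations.
val futures: List[CompletableFuture[Int]] =
  List(1, 2, 3).map(n => CompletableFuture.supplyAsync(() => n * n))

// A single future, completed when all three inputs are completed.
val joined: CompletableFuture[List[Int]] = sequence(futures)

joined.thenAccept(values => println(values))   // eventually prints List(1, 4, 9)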
Non-Blocking “Fork-Join” Pattern
Function queryDB uses a fork-join pattern in which sequence implements the “join” part without blocking. Fork-join is a common enough pattern that Scala defines a function traverse that implements both the “fork” and “join” parts of a computation. You can use it for a simpler implementation of queryDB:
Scala
def queryDB(requests: List[Request]): Future[Page] =
  Future.traverse(requests)(request => Future(dbLookup(request))).map(makeBigPage)
Instead of working on a list of futures, as sequence does, function traverse uses a list of inputs and a function from input to future output. It “forks” a collection of tasks by applying the function to all inputs, and then “joins” the tasks into a single future, as in sequence, without blocking.
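In other words, traverse fuses the map-then-sequence combination used in the earlier implementation of queryDB into a single call; the two formulations below produce equivalent futures:
Scala
// "Fork" with map, then "join" with sequence:
val viaSequence: Future[List[Data]] =
  Future.sequence(requests.map(request => Future(dbLookup(request))))

// "Fork" and "join" in a single call to traverse:
val viaTraverse: Future[List[Data]] =
  Future.traverse(requests)(request => Future(dbLookup(request)))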