- 26.1 Correctness and Performance Issues with Blocking
- 26.2 Callbacks
- 26.3 Higher-Order Functions on Futures
- 26.4 Function flatMap on Futures
- 26.5 Illustration: Parallel Server Revisited
- 26.6 Functional-Concurrent Programming Patterns
- 26.7 Summary
26.5 Illustration: Parallel Server Revisited
Equipped with standard higher-order functions on futures, we can now go back to the server example. First, you can replace the callback action in Listing 26.3 with a call to map to produce a Future[Page] out of a Future[Ad]:
Scala
Listing 26.6: Ad-fetching example with map and foreach on futures.
val futureAd: Future[Ad] = Future(fetchAd(request))
val data: Data = dbLookup(request)
val futurePage: Future[Page] = futureAd.map(ad => makePage(data, ad))
futurePage.foreach(page => connection.write(page))
You use map to transform an ad into a full page by combining the ad with data already retrieved from the database. You now have a Future[Page], which you can use wherever the page is needed. In particular, sending the page back to the client is a no-value action, for which a callback fits naturally. For illustration purposes, this code registers the callback with foreach instead of onComplete. The two methods differ in that foreach does not deal with errors: Its action is not run if the future fails.
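The difference between the two registration methods can be illustrated with a small self-contained sketch. The failing ad-fetch and the names `failedAd` and `outcome` are hypothetical stand-ins used only for this illustration:

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

// A future that fails: foreach skips its action, while onComplete
// still runs and can inspect the failure.
val failedAd: Future[String] = Future(throw new Exception("ad service down"))

// Never runs: the future did not complete with a value.
failedAd.foreach(ad => println(s"got $ad"))

// Runs in both cases; here it takes the Failure branch.
val outcome = Promise[String]()
failedAd.onComplete {
  case Success(ad) => outcome.success(s"got $ad")
  case Failure(ex) => outcome.success(s"failed: ${ex.getMessage}")
}
```

Awaiting `outcome.future` yields the string `"failed: ad service down"`, confirming that only the `onComplete` callback observed the failed future.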
In Listing 26.6, the database is queried by the connection-handling thread while a customized ad is fetched in the background. Alternatively, database lookup can be turned over to another thread, resulting in a value of type Future[Data]. You can then combine the two futures using flatMap/map:
Scala
Listing 26.7: Ad-fetching example with flatMap and foreach on futures.
val futureAd: Future[Ad] = Future(fetchAd(request))
val futureData: Future[Data] = Future(dbLookup(request))
val futurePage: Future[Page] =
  futureData.flatMap(data => futureAd.map(ad => makePage(data, ad)))
futurePage.foreach(page => connection.write(page))
What is interesting about this code is that the thread that executes it does not perform any database lookup, ad fetching, or page assembling and writing. It simply creates futures and invokes non-blocking higher-order methods on them. If you write the remainder of the connection-handling code in the same style, you end up with a handleConnection function that is entirely asynchronous and non-blocking:
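The flatMap/map nesting of Listing 26.7 is exactly what Scala's for-comprehensions desugar to, so the same combination can also be written with `for`/`yield`. The sketch below uses hypothetical stand-ins for the book's `Ad`, `Data`, `Page`, `fetchAd`, `dbLookup`, and `makePage`:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical stand-ins for the book's types and functions.
type Ad = String; type Data = String; type Page = String
def fetchAd(request: String): Ad = s"ad($request)"
def dbLookup(request: String): Data = s"data($request)"
def makePage(data: Data, ad: Ad): Page = s"$data+$ad"

def futurePage(request: String): Future[Page] =
  // Create both futures before the for-comprehension so the two tasks
  // run in parallel; moving Future(...) into the comprehension would
  // sequence the lookup and the ad fetch.
  val futureAd = Future(fetchAd(request))
  val futureData = Future(dbLookup(request))
  // Desugars to: futureData.flatMap(data => futureAd.map(ad => makePage(data, ad)))
  for data <- futureData; ad <- futureAd yield makePage(data, ad)
```

Note the placement of the two `Future(...)` calls: creating the futures first is what preserves the parallelism of Listing 26.7.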
Scala
Listing 26.8: A fully non-blocking parallel server.
given exec: ExecutionContextExecutorService =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(16))

val server = ServerSocket(port)

def handleConnection(connection: Connection): Unit =
  val requestF = Future(connection.read())
  val adF = requestF.map(request => fetchAd(request))
  val dataF = requestF.map(request => dbLookup(request))
  val pageF = dataF.flatMap(data => adF.map(ad => makePage(data, ad)))
  dataF.foreach(data => addToLog(data))
  pageF.foreach(page => updateStats(page))
  pageF.foreach(page => { connection.write(page); connection.close() })

while true do handleConnection(Connection(server.accept()))
The handleConnection function starts by submitting to the thread pool a task that reads a request from a socket and produces a future, requestF. From then on, the code proceeds by calling higher-order functions on futures. First, using map, an ad-fetching task is scheduled to run once the request has been read. This call produces a future adF. A database lookup future, dataF, is created in the same way. The two futures dataF and adF are combined into a future pageF using flatMap and map, as before. Finally, three callback actions are registered: one on dataF for logging, and two on pageF, one to record statistics and one to send the reply to the client.
No actual connection-handling work is performed by the thread that runs function handleConnection. The thread simply creates futures and invokes non-blocking functions on them. The time it takes to run the entire body of handleConnection is negligible. In particular, you could make the listening thread itself do it, in contrast to Listing 25.2, where a separate task is created for this purpose.
The various computations that need to happen when handling a request depend on each other, as depicted in Figure 26.2. The server implemented in Listing 26.8 executes a task as soon as its dependencies have been completed, unless all 16 threads in the pool are busy. Indeed, the 16 threads jump from computation to computation—fetching ads, logging, building pages, and so on—as the tasks become eligible to run, across request boundaries. They never block, unless there is no task at all to run. This implementation maximizes parallelism and is deadlock-free.
Figure 26.2 Activity dependencies in the server example.
Instead of Scala futures, you could implement the server of Listing 26.8 using Java’s CompletableFuture (see Appendix A.14 for a pure Java implementation). Note, however, that CompletableFuture tends to use less standard names: thenApply, thenCompose, and thenAccept are equivalent to map, flatMap, and foreach, respectively.
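The naming correspondence can be sketched directly from Scala, since CompletableFuture is an ordinary Java class (this assumes Java 9 or later; the values `adF`, `pageF`, and `composedF` are hypothetical):

```scala
import java.util.concurrent.CompletableFuture

val adF: CompletableFuture[String] = CompletableFuture.supplyAsync(() => "ad")

// thenApply is CompletableFuture's map:
val pageF: CompletableFuture[String] = adF.thenApply(ad => s"page[$ad]")

// thenCompose is its flatMap:
val composedF: CompletableFuture[String] =
  adF.thenCompose(ad => CompletableFuture.supplyAsync(() => s"page[$ad]"))

// thenAccept is its foreach: an action, producing no useful value.
pageF.thenAccept(page => println(page))
```

Blocking on `pageF.get()` or `composedF.get()` returns `"page[ad]"` in both cases, showing that the two formulations compute the same value.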
There is one aspect of concurrent programming that a non-blocking approach tends to make more difficult: handling timeouts. For instance, you could decide that it is undesirable for the server to wait more than 0.5 seconds for a customized ad after data has been retrieved from the database. In a blocking style, you can achieve this easily by adding a timeout argument when invoking futureAd.get, as in Listing 25.3. It can be somewhat more challenging in a non-blocking style.
Here, CompletableFuture has the advantage over Scala’s Future. It defines a method completeOnTimeout to complete a future with an alternative value after a given timeout. If the future is already finished, completeOnTimeout has no effect. You can use it to fetch a default ad:
Scala
val adF: CompletableFuture[Ad] = ...
...
adF.completeOnTimeout(timeoutAd, 500, MILLISECONDS)
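As a runnable sketch of this behavior, the snippet below uses a hypothetical slow ad computation (two seconds) with a 500-millisecond fallback:

```scala
import java.util.concurrent.{CompletableFuture, TimeUnit}

// A deliberately slow ad computation, standing in for a real fetchAd call.
val slowAdF: CompletableFuture[String] =
  CompletableFuture.supplyAsync(() => { Thread.sleep(2000); "custom ad" })

// If slowAdF is still pending after 500 ms, complete it with a default ad.
slowAdF.completeOnTimeout("default ad", 500, TimeUnit.MILLISECONDS)
```

Because the custom ad takes two seconds but the timeout fires after half a second, `slowAdF.get()` returns `"default ad"`.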
Scala’s Future type has no such method, which makes the implementation of a timeout ad more difficult. You can follow a do-it-yourself approach by creating a promise and relying on an external timer to complete it if needed. First, you create a timer as a scheduling thread pool:
Scala
val timer = Executors.newScheduledThreadPool(1)
Then, you create a promise when the database lookup finishes:
Scala
val pageF = dataF.flatMap { data =>
  val safeAdF =
    if adF.isCompleted then adF
    else
      val promise = Promise[Ad]()
      val timerF = timer.schedule(() => promise.trySuccess(timeoutAd), 500, MILLISECONDS)
      adF.foreach { ad =>
        timerF.cancel(false)
        promise.trySuccess(ad)
      }
      promise.future
  safeAdF.map(ad => makePage(data, ad))
}
This code runs when future dataF completes—when data from the database becomes available. If, at that point, the ad is ready—adF.isCompleted is true—you can use it. Otherwise, you need to make sure that an ad will be available quickly. For this purpose, you create a promise, and you use the timer to fulfill this promise with a default ad after 0.5 seconds. You also add a callback action to adF, which tries to fulfill the same promise. Whichever runs first—the timer task or the customized ad task—will set the promise with its value.2 The call timerF.cancel is not strictly necessary, but it is used to avoid creating an unnecessary default ad if the customized ad is available in time.
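The promise-and-timer pattern can also be packaged as a reusable helper. The sketch below introduces a hypothetical `withTimeout` function; like the snippet above, it completes with the fallback value on timeout and, also like the snippet above, it does not propagate a failure of the original future once the timer is armed:

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration.*
import scala.concurrent.ExecutionContext.Implicits.global

val timer = Executors.newScheduledThreadPool(1)

// Hypothetical helper: complete `future` with `fallback` if it has not
// finished within `delay`.
def withTimeout[A](future: Future[A], fallback: A, delay: FiniteDuration): Future[A] =
  if future.isCompleted then future
  else
    val promise = Promise[A]()
    val timerF =
      timer.schedule(() => promise.trySuccess(fallback), delay.toMillis, TimeUnit.MILLISECONDS)
    future.foreach { value =>
      timerF.cancel(false)       // avoid computing a useless fallback
      promise.trySuccess(value)  // the first trySuccess wins
    }
    promise.future
```

For example, `withTimeout(Future { Thread.sleep(2000); "custom ad" }, "default ad", 100.millis)` completes with `"default ad"`, while an already completed future is returned unchanged.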
As part of the last case study, Listing 28.4 uses a similar strategy to extend Scala futures with a completeOnTimeout method.