On Scala's futures 21-10-2015

This is a write-up of a lunch session on Scala's Future and Promise abstractions. We'll implement parts of these abstractions and gloss over details like synchronization to focus on the abstractions themselves.

You can clone fgeller/futures.scala and follow the examples via tags, but this write-up should be self-contained as well.

Background

The JVM typically runs as a single OS process, and we use threads to perform parallel or asynchronous computation. Scala offers thin abstractions over such threads called Future and Promise. They grew out of SIP-14, which incorporated lessons from Akka as well as Scala's own, by now deprecated, actor implementation. Compared to actors, a Future is a thinner abstraction with more emphasis on composability.

Basics

Let's look at a simple multi-threaded application in Java:

  public class Runner {
    public static void main(String[] args) {
      new Thread() {
        public void run() {
          System.out.println("hello, from elsewhere.");
        }
      }.start();
      try { Thread.sleep(1); } catch (InterruptedException ex) {}
      System.out.println("hello, from runner.");
    }
  }

We create a new Thread object and override the run method to print a simple greeting. We start this thread right away and briefly block the main thread by sleeping for one millisecond, before we print a greeting from the main thread. It prints the following:

hello, from elsewhere.
hello, from runner.

What would this look like in Scala?

  new Thread() {
    override def run(): Unit = {
      println("hello, from elsewhere.")
    }
  }.start()
  Thread.sleep(1)
  println("hello, from runner.")

We drop some of the boilerplate code, but we're doing the same thing as in the Java example above. It still prints:

hello, from elsewhere.
hello, from runner.

How would we achieve this with futures?

  import scala.concurrent._
  import ExecutionContext.Implicits._
  val f: Future[Unit] =
    Future { println("hello, from elsewhere.") }
  Thread.sleep(1)
  println("hello, from runner.")

This still prints:

hello, from elsewhere.
hello, from runner.

Some differences to notice when compared to the examples using Java's thread representation:

  • We need to import the desired abstractions from scala.concurrent
  • We import a global execution context, implicitly defining that our futures should be scheduled on this default thread pool.
  • The Future is parameterized, identifying the type of the value that our computation produces. In our case println "produces" a Unit.
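
To see the type parameter at work, here is a small sketch with a computation that produces an Int instead of a Unit. It assumes the standard library's Await.result and the duration syntax from scala.concurrent.duration, which the examples above don't use; the names answer and result are made up for illustration.

```scala
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// A computation that yields an Int gives us a Future[Int].
val answer: Future[Int] = Future { 21 * 2 }

// Await.result blocks the calling thread until the future completes or the
// timeout expires; it is used here only to make the result visible.
val result: Int = Await.result(answer, 1.second)
println(s"The answer is $result")
```

Blocking like this defeats the purpose of a future in real code, but it is handy for small experiments.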

Let's do this ourselves.

So let's try to implement this:

  class Future[T](value: T)
  object Future {
    def apply[T](v: T): Future[T] = { new Future(v) }
  }
  println("hello, world.")
  val of: Future[Unit] = Future(println("In the future!"))

We create a class Future that holds a given value and an apply method in its companion object to construct instances of this class. It prints the following:

hello, world.
In the future!

Looks good, right?

Really?

Ok, ok, let's check whether it's multi-threaded:

  def log(msg: String) = println(s"${Thread.currentThread}: $msg")
  class Future[T](value: T)
  object Future {
    def apply[T](v: T): Future[T] = { new Future(v) }
  }
  log("hello, world.")
  val of: Future[Unit] = Future(log("In the future!"))

So we wrap the println call and include the current thread. This should tell us what thread a given println expression is evaluated on. It prints the following:

Thread[main,5,main]: hello, world.
Thread[main,5,main]: In the future!

So I guess that didn't work. Let's actually add multi-threading:

  def log(msg: String) = println(s"${Thread.currentThread}: $msg")
  class Future[T]() { var value: T = _ }
  object Future {
    def apply[T](v: T): Future[T] = {
      val result = new Future[T]()
      val thread = new Thread() {
        override def run(): Unit = { result.value = v }
      }
      thread.start()
      result
    }
  }
  log("hello, world.")
  val of: Future[Unit] = Future(log("In the future!"))

So we create a new Thread instance in our apply method and change the container class to allow us to assign the value from the outside by making the instance variable re-assignable. That should do it, right? Let's see what it prints:

Thread[main,5,main]: hello, world.
Thread[main,5,main]: In the future!

Well, not really. What's missing? Scala is an eager (applicative-order) language: it evaluates the arguments of a function call before it evaluates the function body. So the call to log is evaluated on the main thread before we even enter the apply method, let alone start a new thread.
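
The evaluation order is easy to observe in isolation. The following sketch records events in a buffer; the names events, argument and eager are hypothetical and only serve the illustration.

```scala
import scala.collection.mutable.ListBuffer

// Records the order in which things happen.
val events = ListBuffer.empty[String]

def argument(): Int = { events += "argument evaluated"; 42 }

// An ordinary (by-value) parameter: x is already a plain Int
// by the time the body starts running.
def eager(x: Int): Int = { events += "body entered"; x }

val returned = eager(argument())
println(events.mkString(", "))
// prints: argument evaluated, body entered
```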

What to do? Compiler magic to the rescue!

  def log(msg: String) = println(s"${Thread.currentThread}: $msg")
  class Future[T]() { var value: T = _ }
  object Future {
    def apply[T](v: ⇒ T): Future[T] = {
      val result = new Future[T]()
      val thread = new Thread() {
        override def run(): Unit = { result.value = v }
      }
      thread.start()
      result
    }
  }
  log("hello, world.")
  val of: Future[Unit] = Future(log("In the future!"))

The only thing that changes in the above example is the following line:

    def apply[T](v: ⇒ T): Future[T] = {

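We added a ⇒ in front of the parameter type, which turns v into a by-name parameter: the argument expression is no longer evaluated at the call site but each time v is referenced in the method body. We could do this ourselves by declaring the parameter as a function of type () ⇒ T, invoking it as v() inside run, and wrapping the argument in a closure at the call site like this: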

  val of: Future[Unit] = Future.apply({ () ⇒ log("In the future!")})

This would delay the execution of the call to log until the closure is actually evaluated. The ⇒ type annotation is essentially syntactic sugar for wrapping function arguments in closures.

So did that actually work? This is what it prints:

Thread[main,5,main]: hello, world.
Thread[Thread-0,5,main]: In the future!

Looks like we're successfully printing from different threads now!
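
The relationship between a by-name parameter and an explicit closure can also be checked without any threads. This is a minimal sketch with hypothetical names (expensive, byName, thunk), written with the ASCII arrow =>, which is equivalent to the ⇒ used in this post:

```scala
var evaluations = 0
def expensive(): Int = { evaluations += 1; 42 }

// By-name parameter: the compiler wraps the argument for us and
// re-evaluates it on every reference to v.
def byName(v: => Int): Int = v + v

// Explicit closure: the caller wraps the argument and the callee
// has to invoke it with v().
def thunk(v: () => Int): Int = v() + v()

val a = byName(expensive())       // expensive() runs twice, inside byName
val afterByName = evaluations
val b = thunk(() => expensive())  // expensive() runs twice, inside thunk
val afterThunk = evaluations

println(s"$a after $afterByName evaluations, $b after $afterThunk evaluations")
// prints: 84 after 2 evaluations, 84 after 4 evaluations
```

Both variants defer the evaluation in the same way. Our apply method references v exactly once, inside run, so the computation is evaluated once and on the new thread.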

Futures and Promises

Let's step back and look at what the SIP says about a future value:

  • A future is an abstraction which represents a value which may become available at some point.
  • A Future object either holds a result of a computation or an exception in the case that the computation failed.
  • An important property of a future is that it is in effect immutable - it can never be written to or failed by the holder of the Future object.

Our implementation seems to work for the first part, but we're missing the second and third statements. Let's first focus on the second statement:

We currently neglect the fact that a computation might fail and only implicitly encode that a computation hasn't finished through the null value that we use to initialize the container. Let's use Option[A] to encode a possibly unfinished computation and Try[B] to encode a possibly failed or succeeded computation:

  import scala.util._
  def log(msg: String) = println(s"${Thread.currentThread}: $msg")
  class Future[T]() { var value = Option.empty[Try[T]] }
  object Future {
    def apply[T](v: ⇒ T): Future[T] = {
      val result = new Future[T]()
      val thread = new Thread() {
        override def run(): Unit = { result.value = Some(Try(v)) }
      }
      thread.start()
      result
    }
  }
  log("hello, world.")
  val of: Future[Unit] = Future(log("In the future!"))

The type of the contained value changes to Option[Try[T]] and we wrap the evaluation of a computation in a Try to capture possible failures.
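
To make the encoding concrete, this sketch spells out the three states that an Option[Try[T]] can be in. The helper describe and the example values are hypothetical.

```scala
import scala.util.{Try, Success, Failure}

// Describes the three states that Option[Try[T]] can encode.
def describe[T](value: Option[Try[T]]): String = value match {
  case None              => "not completed yet"
  case Some(Success(v))  => s"completed with $v"
  case Some(Failure(ex)) => s"failed with ${ex.getMessage}"
}

val pending: Option[Try[Int]] = None
val succeeded: Option[Try[Int]] = Some(Try(21 * 2))
// Try captures the NumberFormatException instead of letting it propagate.
val failed: Option[Try[Int]] = Some(Try("not a number".toInt))

println(describe(pending))
println(describe(succeeded))
println(describe(failed))
```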

That should tick two boxes. How about that third statement:

  • An important property of a future is that it is in effect immutable - it can never be written to or failed by the holder of the Future object.

That's certainly not true - we use a var to enable setting the container from a different thread. That's where promises come in. If we continue reading the SIP we get to this part:

  • While futures are defined as a type of read-only placeholder object created for a result which doesn’t yet exist, a promise can be thought of as a writeable, single-assignment container, which completes a future.

Turns out we were writing a promise all along (ignoring the single-assignment part for now)! But still, how do these immutable futures work? The REPL offers help:

  import scala.concurrent._
  import ExecutionContext.Implicits.global
  println(s"Our future is ${Future(42)}")

This prints the following:

Our future is scala.concurrent.impl.Promise$DefaultPromise@87aac27

So a future is a promise, is a future, is a promise? Can we do this then?

  import scala.concurrent._
  import ExecutionContext.Implicits.global
  println(s"Our future is ${Future(42): Promise[Int]}")

The typer doesn't like that:

found   : scala.concurrent.Future[Int]
required: scala.concurrent.Promise[Int]

How do we get a Future from a Promise in Scala? We generally call future on the promise - so how is that implemented?

  private[concurrent] trait Promise[T]
      extends scala.concurrent.Promise[T] with scala.concurrent.Future[T] {
    def future: this.type = this
  }

So a promise is a promise is a future! The same object implements both traits: the Promise type exposes the mutating operations, while the Future type exposes only the read-only ones. That looks doable, let's try!
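
Before we do, here is a small sketch of how the two views behave with the standard library's own Promise and Future. It only uses Promise.apply, future, success, trySuccess and value from scala.concurrent; the value names are made up.

```scala
import scala.concurrent.{Future, Promise}
import scala.util.Success

val promise: Promise[Int] = Promise[Int]()
val future: Future[Int] = promise.future  // read-only view of the same container

val before = future.value                 // None: nothing written yet
promise.success(42)                       // write side: completes the promise
val after = future.value                  // Some(Success(42))

// A second write is rejected; trySuccess reports that instead of throwing.
val secondWrite = promise.trySuccess(43)

println(s"before=$before after=$after secondWrite=$secondWrite")
// prints: before=None after=Some(Success(42)) secondWrite=false
```

The holder of future has no method to write a value; only code that holds promise can complete it.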

First, let's switch everything we have so far to a Promise:

  import scala.util._
  def log(msg: String) = println(s"${Thread.currentThread}: $msg")
  class Promise[T]() { var value = Option.empty[Try[T]] }
  object Promise {
    def apply[T](v: ⇒ T): Promise[T] = {
      val result = new Promise[T]()
      val thread = new Thread() {
        override def run(): Unit = { result.value = Some(Try(v)) }
      }
      thread.start()
      result
    }
  }
  log("hello, world.")
  val of: Promise[Unit] = Promise(log("In the future!"))

This still prints:

Thread[main,5,main]: hello, world.
Thread[Thread-3,5,main]: In the future!

Let's try the rest in parts. First we wrap our implementation in a namespace that we can restrict access to – I'll use objects in this example, but packages would work the same:

  import scala.util._
  object our {
    trait Promise[T] { def complete(value: Try[T]): Promise[T] }
    trait Future[T] {}
  }

We define traits to represent our two abstractions to the outside and add a complete method on promises that allows an external caller to write a value to the container. Let's add the implementation of apply using this complete method:

  import scala.util._
  object our {
    trait Promise[T] { def complete(value: Try[T]): Promise[T] }
    trait Future[T] {}
    object Future {
      def apply[T](v: ⇒ T): Future[T] = {
        val result = new impl.Promise[T]()
        val thread = new Thread() {
          override def run(): Unit = { result.complete(Try(v)) }
        }
        thread.start()
        result.future
      }
    }
  }

And then we add the implementation of our promise:

  import scala.util._
  object our {
    trait Promise[T] { def complete(value: Try[T]): Promise[T] }
    trait Future[T] {}
    object Future {
      // left out for brevity
    }
    object impl {
      private[our] class Promise[T] extends our.Promise[T] with our.Future[T] {
        def future: Future[T] = this
        private var value = Option.empty[Try[T]]
        def complete(v: Try[T]): Promise[T] = {
          if (this.value.isDefined)
            throw new IllegalStateException("Can only complete a promise once.")

          this.value = Some(v)
          this
        }
      }
    }
  }

We provide an accessor to view the underlying promise as a future and protect against completing the promise multiple times.

And the full example:

  import scala.util._
  object our {
    trait Promise[T] { def complete(value: Try[T]): Promise[T] }
    trait Future[T] {}
    object Future {
      def apply[T](v: ⇒ T): Future[T] = {
        val result = new impl.Promise[T]()
        val thread = new Thread() {
          override def run(): Unit = { result.complete(Try(v)) }
        }
        thread.start()
        result.future
      }
    }
    object impl {
      private[our] class Promise[T] extends our.Promise[T] with our.Future[T] {
        def future: Future[T] = this
        private var value = Option.empty[Try[T]]
        def complete(v: Try[T]): Promise[T] = {
          if (this.value.isDefined)
            throw new IllegalStateException("Can only complete a promise once.")

          this.value = Some(v)
          this
        }
      }
    }
  }
  import our._
  def log(msg: String) = println(s"${Thread.currentThread}: $msg")
  log("hello, world")
  val of: Future[Unit] = Future(log("In the future!"))

This will print the following:

Thread[main,5,main]: hello, world
Thread[Thread-0,5,main]: In the future!
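
The single-assignment guard is easiest to check without threads. The sketch below uses a hypothetical, stripped-down stand-in for our impl.Promise (called OncePromise here) and completes it twice:

```scala
import scala.util.{Try, Success}

// Hypothetical, stripped-down stand-in for impl.Promise from this post;
// no threads are involved, so the behaviour is deterministic.
class OncePromise[T] {
  private var value = Option.empty[Try[T]]
  def complete(v: Try[T]): OncePromise[T] = {
    if (value.isDefined)
      throw new IllegalStateException("Can only complete a promise once.")
    value = Some(v)
    this
  }
}

val once = new OncePromise[Int]
val first = Try(once.complete(Success(1)))   // succeeds
val second = Try(once.complete(Success(2)))  // fails: already completed

println(first.isSuccess)
println(second.isFailure)
```

Note that without synchronization two threads could still race past the isDefined check; that is one of the details this post glosses over.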

Now let's provide access to the value of a future. Our first attempt is to install a callback that will be evaluated when the promise is completed. We add the ability to add a callback on the future:

  trait Future[T] { def onComplete(fun: Try[T] ⇒ Unit): Unit }

The callback will receive a Try[T] which encodes the result of the computation and we'll ignore the result of the provided closure. To our implementation we add a Set[Try[T] ⇒ Unit] to keep track of the installed callbacks. We'll use a set because the order of these callbacks is not guaranteed.

  private[our] class Promise[T] extends our.Promise[T] with Future[T] {
    def future: Future[T] = this
    private var value = Option.empty[Try[T]]
    private var onCompletes = mutable.Set.empty[Try[T] ⇒ Unit]
    def onComplete(fun: Try[T] ⇒ Unit): Unit = {
      this.value match {
        case Some(v) ⇒ fun(v)
        case None    ⇒ onCompletes += fun
      }
    }
  }

And then we need to make sure that we execute the installed callbacks on completion of our promise:

  private[our] class Promise[T] extends our.Promise[T] with Future[T] {
    def complete(v: Try[T]): Promise[T] = {
      if (this.value.isDefined)
        throw new IllegalStateException("Can only complete a promise once.")

      this.value = Some(v)
      this.onCompletes.foreach(_(v))
      this
    }
  }

Here's the full example:

  import scala.util._
  import scala.collection.mutable
  object our {
    trait Promise[T] { def complete(value: Try[T]): Promise[T] }
    trait Future[T] { def onComplete(fun: Try[T] ⇒ Unit): Unit }
    object Future {
      def apply[T](v: ⇒ T): Future[T] = {
        val result = new impl.Promise[T]()
        val thread = new Thread() {
          override def run(): Unit = { result.complete(Try(v)) }
        }
        thread.start()
        result.future
      }
    }
    object impl {
      private[our] class Promise[T] extends our.Promise[T] with Future[T] {
        def future: Future[T] = this
        private var value = Option.empty[Try[T]]
        private var onCompletes = mutable.Set.empty[Try[T] ⇒ Unit]
        def onComplete(fun: Try[T] ⇒ Unit): Unit = {
          this.value match {
            case Some(v) ⇒ fun(v)
            case None    ⇒ onCompletes += fun
          }
        }
        def complete(v: Try[T]): Promise[T] = {
          if (this.value.isDefined)
            throw new IllegalStateException("Can only complete a promise once.")

          this.value = Some(v)
          this.onCompletes.foreach(_(v))
          this
        }
      }
    }
  }
  import our._
  def log(msg: String) = println(s"${Thread.currentThread}: $msg")
  val of: Future[Unit] = Future(log("In the future!"))
  of.onComplete { value ⇒ log(s"Our future: ${value}") }

It prints the following:

Thread[Thread-0,5,main]: In the future!
Thread[Thread-0,5,main]: Our future: Success(())
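
Both branches of onComplete can be exercised deterministically in a single thread. This sketch uses a hypothetical stand-in for our implementation (CallbackPromise) and installs one callback before and one after completion:

```scala
import scala.collection.mutable
import scala.util.{Try, Success}

// Hypothetical single-threaded stand-in for impl.Promise from this post.
class CallbackPromise[T] {
  private var value = Option.empty[Try[T]]
  private val onCompletes = mutable.Set.empty[Try[T] => Unit]

  def onComplete(fun: Try[T] => Unit): Unit = value match {
    case Some(v) => fun(v)             // already completed: run right away
    case None    => onCompletes += fun // not yet: remember for later
  }

  def complete(v: Try[T]): Unit = {
    if (value.isDefined)
      throw new IllegalStateException("Can only complete a promise once.")
    value = Some(v)
    onCompletes.foreach(_(v))
  }
}

val seen = mutable.ListBuffer.empty[String]
val cp = new CallbackPromise[Int]

cp.onComplete(v => seen += s"early: $v")  // installed before completion
val sizeBefore = seen.size                // 0: nothing has run yet
cp.complete(Success(42))                  // runs the stored callback
cp.onComplete(v => seen += s"late: $v")   // runs immediately

println(seen.mkString(", "))
// prints: early: Success(42), late: Success(42)
```

A callback installed after completion runs on the thread that calls onComplete, while a stored callback runs on the thread that calls complete.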

We started out by claiming that Scala's abstractions over threads have an emphasis on composability, and our current API doesn't allow for that. We throw away the result of installed callbacks and don't allow chaining when we want to install callbacks. Let's add a map combinator that enables composability through chaining of deferred computations. The only required change is the implementation of the combinator on Future:

  object our {
    trait Future[T] {
      def onComplete(fun: Try[T] ⇒ Unit): Unit
      def map[U](fun: T ⇒ U): Future[U] = {
        val result = new impl.Promise[U]()
        this.onComplete {
          case Success(v)  ⇒ result.complete(Try(fun(v)))
          case Failure(th) ⇒ result.complete(Failure(th)) // pass the failure on
        }
        result.future
      }
    }
  }

The combinator creates a new promise and returns the corresponding future. It also installs a callback on the future that it is invoked on. In the callback we complete the new promise with the result of applying the closure given to map to the value of the first future. So we "thread" the result from the first future through to the next future, applying the closure given to map along the way. This allows for ordered chaining of closures like we do in this modified version of our example:

  import scala.util._
  import scala.collection.mutable
  object our {
    trait Promise[T] { def complete(value: Try[T]): Promise[T] }
    trait Future[T] {
      def onComplete(fun: Try[T] ⇒ Unit): Unit
      def map[U](fun: T ⇒ U): Future[U] = {
        val result = new impl.Promise[U]()
        this.onComplete {
          case Success(v)  ⇒ result.complete(Try(fun(v)))
          case Failure(th) ⇒ result.complete(Failure(th)) // pass the failure on
        }
        result.future
      }
    }
    object Future {
      def apply[T](v: ⇒ T): Future[T] = {
        val result = new impl.Promise[T]()
        val thread = new Thread() {
          override def run(): Unit = { result.complete(Try(v)) }
        }
        thread.start()
        result.future
      }
    }
    object impl {
      private[our] class Promise[T] extends our.Promise[T] with Future[T] {
        def future: Future[T] = this
        private var value = Option.empty[Try[T]]
        private var onCompletes = mutable.Set.empty[Try[T] ⇒ Unit]
        def onComplete(fun: Try[T] ⇒ Unit): Unit = {
          this.value match {
            case Some(v) ⇒ fun(v)
            case None    ⇒ onCompletes += fun
          }
        }
        def complete(v: Try[T]): Promise[T] = {
          if (this.value.isDefined)
            throw new IllegalStateException("Can only complete a promise once.")

          this.value = Some(v)
          this.onCompletes.foreach(_(v))
          this
        }
      }
    }
  }
  import our._
  def log(msg: String) = println(s"${Thread.currentThread}: $msg")
  Future(23).map(_ + 23).map(_.toString).map(log)

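Which prints something like the following (if a future has already completed by the time map is called, the callback runs on the calling thread instead):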

Thread[Thread-0,5,main]: 46

Now that we have the map combinator, the next step would be to add the flatMap combinator for composing functions that produce futures. It would also let us use for-comprehensions to compose such functions concisely. But I'll leave that and the other combinators as an exercise for the reader or a second part to this post :)
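
For reference, this is roughly what flatMap and a for-comprehension look like with the standard library's Future (not our implementation). The functions fetchPrice and applyDiscount are hypothetical placeholders for any functions that return futures.

```scala
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical functions that each produce a future.
def fetchPrice(item: String): Future[Int] = Future { item.length * 10 }
def applyDiscount(price: Int): Future[Int] = Future { price - 5 }

// flatMap chains a function that itself returns a future...
val viaFlatMap: Future[Int] = fetchPrice("book").flatMap(applyDiscount)

// ...and a for-comprehension is sugar for the same flatMap/map calls.
val viaFor: Future[Int] = for {
  price      <- fetchPrice("book")
  discounted <- applyDiscount(price)
} yield discounted

println(Await.result(viaFlatMap, 1.second))
println(Await.result(viaFor, 1.second))
```

With map alone the first expression would have the type Future[Future[Int]]; flatMap is what flattens the nesting.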

Conclusion

Please note that the multi-threaded nature of our examples means that the main thread might finish before all other threads have finished. You can get around that by either blocking the main thread until the work is done or implementing an abstraction similar to Await using a polling mechanism.
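
A polling-based wait could be sketched as follows. PollableFuture and awaitResult are hypothetical names, and this only illustrates the polling idea; it is not a description of how the standard library's Await works.

```scala
import java.util.concurrent.TimeoutException
import scala.util.{Try, Success, Failure}

// Hypothetical minimal future; @volatile makes the write from the worker
// thread visible to the polling thread.
class PollableFuture[T] {
  @volatile var value: Option[Try[T]] = None
}

// Polls until the future is completed or the timeout elapses.
def awaitResult[T](f: PollableFuture[T], timeoutMillis: Long, pollMillis: Long = 5): Try[T] = {
  val deadline = System.currentTimeMillis() + timeoutMillis
  while (f.value.isEmpty && System.currentTimeMillis() < deadline)
    Thread.sleep(pollMillis)
  f.value.getOrElse(Failure(new TimeoutException(s"no result after $timeoutMillis ms")))
}

val pf = new PollableFuture[Int]
val worker = new Thread() {
  override def run(): Unit = { Thread.sleep(20); pf.value = Some(Success(42)) }
}
worker.start()

val awaited = awaitResult(pf, timeoutMillis = 1000)
println(awaited)
```

Polling wastes a little time between checks; a callback that releases a latch would avoid that, at the cost of more machinery.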

Have a look at Scala's implementation or the docs on scala-lang.org for more details. Happy hacking!