Finagle is Twitter’s RPC system. This blog post explains its motivations and core design tenets, the finagle README contains more detailed documentation. Finagle aims to make it easy to build robust clients and servers.

Futures

Finagle uses com.twitter.util.Future1 to express delayed operations. Futures are highly expressive and composable, allowing for the succinct expression of concurrent and sequential operations with great clarity. Futures are a handle for a value not yet available, with methods to register callbacks to be invoked when the value becomes available. They invert the “traditional” model of asynchronous computing which typically expose APIs similar to this:

Callback<R> cb = new Callback<R>() {
  void onComplete(R result) { … }
  void onFailure(Throwable error) { … }
}

dispatch(req, cb);

Here, the Callback.onComplete is invoked when the result of the dispatch operation is available, and Callback.onFailure if the operation fails. With futures, we instead invert this control flow:

val future = dispatch(req)
future onSuccess { value => … }
future onFailure { error => … }

Futures themselves have combinators similar to those we’ve encountered before in the various collections APIs. Combinators work by exploiting a uniform API, wrapping some underlying Future with new behavior without modifying that underlying Future.

Sequential composition

The most important Future combinator is flatMap2:

def Future[A].flatMap[B](f: A => Future[B]): Future[B]

flatMap sequences two features. The method signature tells the story: given the succesful value of the future f must provide the next Future. The result of this operation is another Future that is complete only when both of these futures have completed. If either Future fails, the given Future will also fail. This implicit interleaving of errors allow us to handle errors only in those places where they are semantically significant. flatMap is the standard name for the combinator with these semantics. Scala also has syntactic shorthand to invoke it: the for comprehension.

As an example, let’s assume we have methods authenticate: Request -> User, and rateLimit: User -> Boolean, then the following code:

val f = authenticate(request) flatMap { u =>
  rateLimit(u) map { r => (u, r)
}

With the help of for-comprehensions, we can write the above as:

val f = for {
  u <- authenticate(request)
  r <- rateLimit(u)
} yield (u, r)

produces a future f: Future[(User, Boolean)] that provides both the user object and and a boolean indicating whether that user has been rate limited. Note how sequential composition is required here: rateLimit takes as an argument the output of authenticate

Concurrent composition

There are also a number of concurrent combinators. Generally these convert a sequence of Future into a Future of sequence, in slightly different ways:

object Future {
  …
  def collect[A](fs: Seq[Future[A]]): Future[Seq[A]]
  def join(fs: Seq[Future[_]]): Future[Unit]
  def select(fs: Seq[Future[A]]) : Future[(Try[A], Seq[Future[A]])]
}

collect is the most straightforward one: given a set of Futures of the same type, we are given a Future of a sequence of values of that type. This future is complete when all of the underlying futures have completed, or when any of them have failed.

join takes a sequence of Futures whose types may be mixed, yielding a Future[Unit] that is completely when all of the underlying futures are (or fails if any of them do). This is useful for indicating the completion of a set of heterogeneous operations.

select returns a Future that is complete when the first of the given Futures complete, together with the remaining uncompleted futures.

In combination, this allows for powerful and concise expression of operations typical of network services. This hypothetical code performs rate limiting (in order to maintain a local rate limit cache) concurrently with dispatching a request on behalf of the user to the backend:

def serve(request: Request): Future[Response] = {
  val userLimit: Future[(User, Boolean)] =
    for {
      user    <- auth(request)
      limited <- isLimit(user)
    } yield (user, limited)
  
  val done = 
    dispatch(request) join userLimit
  
  done flatMap { case (rep, (usr, lim)) =>
    if (lim) {
      updateLocalRateLimitCache(usr)
      Future.exception(new Exception("rate limited"))
    } else {
      Future.value(rep)
    }
  }
}

This hypothetical example combines both sequential and concurrent composition. Also note how there is no explicit error handling other than converting a rate limiting reply to an exception. If any future fails here, it is automatically propagated to the returned Future.

Service

A Service is a function Req => Future[Rep] for some request and reply types. Service is used by both clients and servers: servers implement Service and clients use builders to create one used for querying.

abstract class Service[-Req, +Rep] extends (Req => Future[Rep])

A simple HTTP client might do:

service: Service[HttpRequest, HttpResponse]

val f = service(HttpRequest("/", HTTP_1_1))
f onSuccess { res =>
  println("got response", res)
} onFailure { exc =>
  println("failed :-(", exc)
}

Servers implement Service:

class MyServer 
  extends Service[HttpRequest, HttpResponse]
{
  def apply(request: HttpRequest) = {
    request.path match {
      case "/" => 
        Future.value(HttpResponse("root"))
      case _ => 
        Future.value(HttpResponse("default"))
    }
  }
}

Combining them is easy. A simple proxy might look like this:

class MyServer(client: Service[..])
  extends Service[HttpRequest, HttpResponse]
{
  def apply(request: HttpRequest) = {
    client(rewriteReq(request)) map { res =>
      rewriteRes(res)
    }
  }
}

where rewriteReq and rewriteRes can provide protocol translation, for example.

Filters

Filters are service transformers. They are useful both for providing functionality that’s service generic as well as factoring a given service into distinct phases.

abstract class Filter[-ReqIn, +RepOut, +ReqOut, -RepIn]
  extends ((ReqIn, Service[ReqOut, RepIn]) => Future[RepOut])

Its type is better viewed diagramatically:

    ((ReqIn, Service[ReqOut, RepIn]) 
         => Future[RepOut])


           (*   Service   *)
[ReqIn -> (ReqOut -> RepIn) -> RepOut]

Here’s how you might write a filter that provides a service timeout mechanism.

class TimeoutFilter[Req, Rep](
    timeout: Duration, timer: util.Timer)
  extends Filter[Req, Rep, Req, Rep]
{
  def apply(
    request: Req, service: Service[Req, Rep]
  ): Future[Rep] = {
    service(request).timeout(timer, timeout) {
      Throw(new TimedoutRequestException)
    }
  }
}

This example shows how you might provide authentication (via an authentication service) in order to convert a Service[AuthHttpReq, HttpRep] into Service[HttpReq, HttpRep].

class RequireAuthentication(authService: AuthService)
  extends Filter[HttpReq, HttpRep, AuthHttpReq, HttpRep]
{
  def apply(
    req: HttpReq, 
    service: Service[AuthHttpReq, HttpRep]
  ) = {
    authService.auth(req) flatMap {
      case AuthResult(AuthResultCode.OK, Some(passport), _) =>
        service(AuthHttpReq(req, passport))
      case ar: AuthResult =>
        Future.exception(
          new RequestUnauthenticated(ar.resultCode))
    }
  }
}

Filters compose together with andThen. Providing a Service as an argument to andThen creates a (filtered) Service (types provided for illustration).

val authFilter: Filter[HttpReq, HttpRep, AuthHttpReq, HttpRep]
val timeoutfilter[Req, Rep]: Filter[Req, Rep, Req, Rep]
val serviceRequiringAuth: Service[AuthHttpReq, HttpRep]

val authenticateAndTimedOut: Filter[HttpReq, HttpRep, AuthHttpReq, HttpRep] =
  authFilter andThen timeoutFilter

val authenticatedTimedOutService: Service[HttpReq, HttpRep] =
  authenticateAndTimedOut andThen serviceRequiringAuth

Builders

Finally, builders put it all together. A ClientBuilder produces a Service instance given a set of parameters, and a ServerBuilder takes a Service instance and dispatches incoming requests on it. In order to determine the type of Service, we must provide a Codec. Codecs provide the underlying protocol implementation (eg. HTTP, thrift, memcached). Both builders have many parameters, and require a few.

Here’s an example ClientBuilder invocation (types provided for illustration):

val client: Service[HttpRequest, HttpResponse] = ClientBuilder()
  .codec(Http)
  .hosts("host1.twitter.com:10000,host2.twitter.com:10001,host3.twitter.com:10003")
  .hostConnectionLimit(1)
  .tcpConnectTimeout(1.second)
  .retries(2)
  .reportTo(new OstrichStatsReceiver)
  .build()

This builds a client that load balances over the 3 given hosts, establishing at most 1 connection per host, and giving up only after 2 failures. Stats are reported to ostrich. The following builder options are required (and their presence statically enforced): hosts or cluster, codec and hostConnectionLimit.

val myService: Service[HttpRequest, HttpResponse] = // provided by the user
ServerBuilder()
  .codec(Http)
  .hostConnectionMaxLifeTime(5.minutes)
  .readTimeout(2.minutes)
  .name("myHttpServer")
  .bindTo(new InetSocketAddress(serverPort))
  .build(myService)

This will serve, on port serverPort an HTTP server which dispatches requests to myService. Each connection is allowed to stay alive for up to 5 minutes, and we require a request to be sent within 2 minutes. The required ServerBuilder options are: name, bindTo and codec.

1 distinct from java.util.concurrent.Future

2 this is equivalent to a monadic bind