We’re going to build a very simple distributed search engine using Finagle. First, create a skeleton project using scala-bootstrapper.

scala-bootstrapper will create a very simple Finagle-based Scala service that exports an in-memory key-value store. We’ll extend this to support searching the values, and then extend it again to search multiple in-memory stores spread over several processes.

$ mkdir searchbird; cd searchbird
$ scala-bootstrapper -p searchbird
$ find . -type f
./Capfile
./config/development.scala
./config/production.scala
./config/staging.scala
./config/test.scala
./Gemfile
./project/build/SearchbirdProject.scala
./project/build.properties
./project/plugins/Plugins.scala
./run
./src/main/scala/com/twitter/searchbird/config/SearchbirdServiceConfig.scala
./src/main/scala/com/twitter/searchbird/Main.scala
./src/main/scala/com/twitter/searchbird/SearchbirdServiceImpl.scala
./src/main/thrift/searchbird.thrift
./src/scripts/console
./src/scripts/searchbird.sh
./src/test/scala/com/twitter/searchbird/AbstractSpec.scala
./src/test/scala/com/twitter/searchbird/SearchbirdServiceSpec.scala

Exploring the default bootstrapper project

Let’s first explore the default project scala-bootstrapper creates for us. This is meant as a template. You’ll end up replacing most of it, but it serves as a convenient scaffold. It defines a simple (but complete) key-value store. Configuration, a thrift interface, stats export and logging are all included.

Since searchbird is a thrift service (like most of our services), its external interface is defined in the thrift IDL.

src/main/thrift/searchbird.thrift
service SearchbirdService {
  string get(1: string key) throws(1: SearchbirdException ex)
  void put(1: string key, 2: string value)
}

This is pretty straightforward: our service SearchbirdService exports 2 RPC methods, get and put. They comprise a simple interface to a key-value store.

Now let’s run the default service, and explore it through this interface.

First build the project, and run the service (which is also the default “main” method that sbt will run).

$ sbt
…
> compile
> run -f config/development.scala

We’ve also written a simple client library that you can run from the sbt console:

$ sbt console
+ exec java -Djava.net.preferIPv4Stack=true -Dhttp.connection.timeout=2 -Dhttp.connection-manager.timeout=2 -Dhttp.socket.timeout=6 -Dcom.sun.management.jmxremote.port=0 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote '-Xmx512m -XX:MaxPermSize=256m' -XX:+AggressiveOpts -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256m -XX:SurvivorRatio=128 -XX:MaxTenuringThreshold=0 -Xss200M -Xms512M -Xmx2G -ea -server -jar /Users/stevej/bin/sbt-launch-0.7.4.jar console
[info] Standard project rules 0.12.7 loaded (2011-05-24).
[info] Building project searchbird 1.0.0-SNAPSHOT against Scala 2.8.1
[info]    using SearchbirdProject with sbt 0.7.4 and Scala 2.7.7
[info]
[info] == copy-resources ==
[info] == copy-resources ==
[info]
[info] == write-build-properties ==
[info] == write-build-properties ==
[info]
[info] == console ==
[info] Starting scala interpreter...
[info]
Welcome to Scala version 2.8.1.final (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_26).
Type in expressions to have them evaluated.
Type :help for more information.

scala> import com.twitter.searchbird.Client
import com.twitter.searchbird.Client

scala> val client = new Client()
client: com.twitter.searchbird.Client = com.twitter.searchbird.Client@24759265

scala> client.put("marius", "Marius Eriksen")
res0: ...

scala> client.put("stevej", "Steve Jenson")
res1: ...

The server also exports runtime statistics. These are convenient both for inspecting individual servers and for aggregating into global service statistics (a machine-readable JSON interface is also provided).

$ curl localhost:9900/stats.txt
counters:
  Searchbird/connects: 2
  Searchbird/requests: 5
  Searchbird/success: 5
  jvm_gc_ConcurrentMarkSweep_cycles: 2
  jvm_gc_ConcurrentMarkSweep_msec: 102
  jvm_gc_ParNew_cycles: 9
  jvm_gc_ParNew_msec: 210
  jvm_gc_cycles: 11
  jvm_gc_msec: 312
gauges:
  Searchbird/connections: 0
  Searchbird/pending: 0
  jvm_fd_count: 147
  jvm_fd_limit: 10240
  jvm_heap_committed: 588251136
  jvm_heap_max: 3220570112
  jvm_heap_used: 39530208
  jvm_nonheap_committed: 81481728
  jvm_nonheap_max: 1124073472
  jvm_nonheap_used: 69312424
  jvm_num_cpus: 4
  jvm_post_gc_CMS_Old_Gen_used: 5970824
  jvm_post_gc_CMS_Perm_Gen_used: 46407832
  jvm_post_gc_Par_Eden_Space_used: 0
  jvm_post_gc_Par_Survivor_Space_used: 0
  jvm_post_gc_used: 52378656
  jvm_start_time: 1314124442749
  jvm_thread_count: 14
  jvm_thread_daemon_count: 8
  jvm_thread_peak_count: 14
  jvm_uptime: 404221
labels:
metrics:
  Searchbird/connection_duration: (average=25115, count=2, maximum=52068, minimum=142, p25=142, p50=142, p75=52068, p90=52068, p95=52068, p99=52068, p999=52068, p9999=52068, sum=50230)
  Searchbird/connection_received_bytes: (average=84, count=2, maximum=142, minimum=29, p25=29, p50=29, p75=142, p90=142, p95=142, p99=142, p999=142, p9999=142, sum=169)
  Searchbird/connection_requests: (average=2, count=2, maximum=4, minimum=1, p25=1, p50=1, p75=4, p90=4, p95=4, p99=4, p999=4, p9999=4, sum=5)
  Searchbird/connection_sent_bytes: (average=61, count=2, maximum=95, minimum=23, p25=23, p50=23, p75=95, p90=95, p95=95, p99=95, p999=95, p9999=95, sum=123)
  Searchbird/request_latency_ms: (average=20, count=5, maximum=95, minimum=1, p25=1, p50=2, p75=8, p90=95, p95=95, p99=95, p999=95, p9999=95, sum=103)

In addition to our own service statistics, we are also given some generic JVM stats that are often useful.
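As a rough illustration of aggregating these statistics across servers, the sketch below sums the counters section of several stats.txt dumps. It assumes the plain-text layout shown above (a counters: header followed by indented name: value lines). StatsSketch, counters and aggregate are hypothetical names chosen for this example, not part of the stats library.

```scala
object StatsSketch {
  // Parses the indented "name: value" lines that follow the "counters:" header.
  def counters(dump: String): Map[String, Long] =
    dump.split("\n").toList
      .dropWhile(_.trim != "counters:")
      .drop(1)
      .takeWhile(_.startsWith(" ")) // entries of a section are indented
      .flatMap { line =>
        line.trim.split(": ", 2) match {
          case Array(name, value) => Some(name -> value.trim.toLong)
          case _                  => None // skip lines that are not "name: value"
        }
      }
      .toMap

  // Sums each counter across the dumps collected from several servers.
  def aggregate(dumps: Seq[String]): Map[String, Long] =
    dumps.flatMap(d => counters(d).toSeq)
      .groupBy(_._1)
      .map { case (name, entries) => name -> entries.map(_._2).sum }
}
```

In practice the machine-readable JSON interface is probably the better source for this kind of aggregation; the text format is used here only because it is the one shown.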

…/config/SearchbirdServiceConfig.scala

A configuration is simply any Scala trait that has a method apply: RuntimeEnvironment => T for some T we want to create. In this sense, configurations are “factories”. At runtime, a configuration file is evaluated as a script (by using the Scala compiler as a library), and is expected to produce such a configuration object. A RuntimeEnvironment is an object that can be queried for various runtime parameters (command line flags, JVM version, build timestamps, etc.).

SearchbirdServiceConfig is such a configuration class. It declares the configuration parameters together with their defaults.

class SearchbirdServiceConfig extends ServerConfig[SearchbirdServiceServer] {
  var thriftPort: Int = 9999

  def apply(runtime: RuntimeEnvironment) = new SearchbirdServiceImpl(this)
}

In our case, we want to create a SearchbirdServiceServer. This is the server type generated by the thrift code generator [1].
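To make the “configuration as factory” idea concrete, here is a minimal sketch that uses simplified stand-in types. RuntimeEnv, Config, EchoServer and EchoServerConfig are hypothetical names invented for this example; they only mimic the shape of the real configuration classes and are not their API.

```scala
// Hypothetical stand-in for RuntimeEnvironment: just the parsed arguments.
final case class RuntimeEnv(arguments: Map[String, String])

// A configuration is a factory: given the runtime environment, build a T.
trait Config[T] {
  def apply(runtime: RuntimeEnv): T
}

final class EchoServer(val port: Int)

class EchoServerConfig extends Config[EchoServer] {
  var port: Int = 9999 // default, overridable from a config file

  def apply(runtime: RuntimeEnv): EchoServer = new EchoServer(port)
}

object ConfigSketch {
  // Mirrors what a config file does: subclass anonymously, override a
  // parameter, then let the loader call apply with the environment.
  def build(): EchoServer = {
    val config = new EchoServerConfig { port = 9000 }
    config(RuntimeEnv(Map.empty))
  }
}
```

Because parameters are plain vars with defaults, a config file only needs to mention the values it wants to change.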

…/Main.scala

The main function is very simple: it reads the configuration, creates a SearchbirdServiceServer and starts it. RuntimeEnvironment.loadRuntimeConfig evaluates the configuration and calls the resulting object’s apply method, passing the RuntimeEnvironment itself as the argument [2].

object Main {
  def main(args: Array[String]) {
    val env = RuntimeEnvironment(this, args)
    val service = env.loadRuntimeConfig[SearchbirdServiceServer]
    service.start()
  }
}

…/SearchbirdServiceImpl.scala

This is the meat of the service: we extend the SearchbirdServiceServer with our custom implementation. Recall that SearchbirdServiceServer has been created for us by the thrift code generator. It generates a scala method per thrift method. In our example so far, the generated interface is:

trait SearchbirdService {
  def put(key: String, value: String): Future[Void]
  def get(key: String): Future[String]
}

Future[Value]s are returned instead of the values directly so that their computation may be deferred.
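The sketch below shows what working with deferred results looks like. It uses scala.concurrent.Future from the Scala standard library as a stand-in so that it runs without Finagle; the service itself uses Twitter’s own Future type, whose method names differ in places. FutureSketch and its members are hypothetical names.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureSketch {
  private val store = Map("marius" -> "Marius Eriksen")

  // Same shape as the generated get: the value (or the failure) arrives later.
  def get(key: String): Future[String] =
    store.get(key) match {
      case Some(value) => Future.successful(value)
      case None        => Future.failed(new NoSuchElementException("No such key"))
    }

  // Callers transform the eventual value without blocking on it...
  def nameLength(key: String): Future[Int] = get(key).map(_.length)

  // ...and can turn a failure into a fallback value.
  def getOrDefault(key: String, default: String): Future[String] =
    get(key).recover { case _: NoSuchElementException => default }

  // Blocking is only for this demo; a server would keep composing instead.
  def demo(): (Int, String) =
    (Await.result(nameLength("marius"), 1.second),
     Await.result(getOrDefault("nobody", "unknown"), 1.second))
}
```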

The default implementation provided by scala-bootstrapper is quite simple:

class SearchbirdServiceImpl(config: SearchbirdServiceConfig) extends SearchbirdServiceServer {
  val serverName = "Searchbird"
  val thriftPort = config.thriftPort

  val database = new mutable.HashMap[String, String]()

  def get(key: String) = {
    database.get(key) match {
      case None =>
        log.debug("get %s: miss", key)
        Future.exception(new SearchbirdException("No such key"))
      case Some(value) =>
        log.debug("get %s: hit", key)
        Future(value)
    }
  }

  def put(key: String, value: String) = {
    log.debug("put %s", key)
    database(key) = value
    Future.void
  }
}

It implements a simple thrift interface to a Scala HashMap.

A simple search engine

Now we’ll extend our example so far to create a simple search engine. We’ll then extend it further to become a distributed search engine consisting of multiple shards, so that we can handle a corpus larger than what fits in the memory of a single machine. To keep things simple, we’ll extend our current thrift service minimally in order to support a search operation.

src/main/thrift/searchbird.thrift
service SearchbirdService {
  string get(1: string key) throws(1: SearchbirdException ex)
  void put(1: string key, 2: string value)
  list<string> search(1: string query)
}

We’ve added a search method that searches the current hashtable, returning the list of keys whose values match the query. The implementation is also straightforward:

…/SearchbirdServiceImpl.scala

We first add another HashMap to hold the reverse index, giving us maps in both the forward (key to document) and reverse (token to set of documents) directions.

  val forward = new mutable.HashMap[String, String]
    with mutable.SynchronizedMap[String, String]
  val reverse = new mutable.HashMap[String, Set[String]]
    with mutable.SynchronizedMap[String, Set[String]]

The get method remains the same (it only performs forward lookups), but we need to populate the reverse index on put.

  def put(key: String, value: String) = {
    log.debug("put %s", key)
    
    forward(key) = value

    // admit only one updater.
    synchronized {
      (Set() ++ value.split(" ")) foreach { token =>
        val current = reverse.get(token) getOrElse Set()
        reverse(token) = current + key
      }
    }

    Future.void
  }

This is simple: we tokenize by splitting the value, and then store a reference to the key for each token. This will allow us to perform lookups by the tokens themselves.

Note that since we do a retrieve-modify-update, we need to synchronize the update even though the underlying HashMap is synchronized. Also, this implementation has a bug: when a key is overwritten, references to its old value are not removed from the reverse index. Fixing this is an exercise for the reader.
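One possible approach to that exercise is sketched below with plain Scala collections and no Futures or logging. ReindexingStore is a hypothetical class, not the service implementation, and unlike the code above it also drops empty tokens. The idea is to look up the value being replaced and remove the key from each of its tokens before indexing the new value.

```scala
import scala.collection.mutable

class ReindexingStore {
  private val forward = mutable.HashMap.empty[String, String]
  private val reverse = mutable.HashMap.empty[String, Set[String]]

  private def tokenize(value: String): Set[String] =
    value.split(" ").filter(_.nonEmpty).toSet

  def put(key: String, value: String): Unit = synchronized {
    // Remove the key from every token of the value being replaced.
    forward.get(key).foreach { old =>
      tokenize(old).foreach { token =>
        val remaining = reverse.getOrElse(token, Set.empty[String]) - key
        if (remaining.isEmpty) reverse.remove(token)
        else reverse.update(token, remaining)
      }
    }
    forward(key) = value
    // Index the new value as before.
    tokenize(value).foreach { token =>
      reverse.update(token, reverse.getOrElse(token, Set.empty[String]) + key)
    }
  }

  def search(query: String): List[String] = synchronized {
    val hits = tokenize(query).toList.map(t => reverse.getOrElse(t, Set.empty[String]))
    hits.reduceLeftOption(_ & _).getOrElse(Set.empty[String]).toList
  }
}
```

Holding one lock around both maps keeps the forward and reverse indices consistent with each other, at the cost of serializing all writers.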

Now to the meat of the search engine: the new search method. It should tokenize its query, look up all of the matching documents and then intersect these lists. This will yield the list of documents that contain all of the tokens in the query. This is also pretty straightforward to express in Scala:

  def search(query: String) = Future.value {
    val tokens = query.split(" ")
    val hits = tokens map { token => reverse.getOrElse(token, Set()) }
    val intersected = hits reduceLeftOption { _ & _ } getOrElse Set()
    intersected.toList
  }

A few things are worth calling out in this short piece of code. When constructing the hit list, getOrElse supplies the value given as its second parameter if the key (token) is not found. We perform the actual intersection using a left-reduce. The particular flavor, reduceLeftOption, will not attempt to perform the reduce if hits is empty, returning None instead. This allows us to supply a default value instead of experiencing an exception. In fact, this is equivalent to:

  def search(query: String) = Future.value {
    val tokens = query.split(" ")
    val hits = tokens map { token => reverse.getOrElse(token, Set()) }
    if (hits.isEmpty)
      Nil
    else
      hits reduceLeft { _ & _ } toList
  }

Which to use is mostly a matter of taste, though functional style often eschews conditionals for sensible defaults.
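The difference between the two reduce flavors is easy to check in isolation. The following sketch (ReduceSketch is a hypothetical name) applies the same intersection to an empty and a non-empty sequence of sets, and shows that plain reduceLeft fails on the empty case.

```scala
import scala.util.Try

object ReduceSketch {
  // Intersect all sets, falling back to the empty set when there are none.
  def intersectAll(hits: Seq[Set[String]]): Set[String] =
    hits.reduceLeftOption(_ & _).getOrElse(Set.empty[String])

  val none: Set[String] = intersectAll(Seq.empty)
  val both: Set[String] = intersectAll(Seq(Set("a", "b"), Set("b", "c")))

  // reduceLeft has no default to fall back on, so it throws on empty input.
  val plainReduceFails: Boolean =
    Try(Seq.empty[Set[String]].reduceLeft(_ & _)).isFailure
}
```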

We can now experiment with our store using the console.

$ src/scripts/console
Hint: the client is in the variable `$client`
No servers specified, using 127.0.0.1:9999
>

Paste the lecture descriptions in.

$client.put("basics", " values functions classes methods inheritance try catch finally expression oriented")
$client.put("basics", " case classes objects packages apply update functions are objects (uniform access principle) pattern")
$client.put("collections", " lists maps functional combinators (map foreach filter zip")
$client.put("pattern", " more functions! partialfunctions more pattern")
$client.put("type", " basic types and type polymorphism type inference variance bounds")
$client.put("advanced", " advanced types view bounds higher kinded types recursive types structural")
$client.put("simple", " all about sbt the standard scala build")
$client.put("more", " tour of the scala collections")
$client.put("testing", " write tests with specs a bdd testing framework for")
$client.put("concurrency", " runnable callable threads futures twitter")
$client.put("java", " java interop using scala from")
$client.put("searchbird", " building a distributed search engine using")

We can now perform some searches.

> $client.search("functions")
res0: Seq("basics")
> $client.search("java")
res1: Seq("java")
> $client.search("java scala")
res2: Seq("java")
> $client.search("functional")
res3: Seq("collections")
> $client.search("sbt")
res4: Seq("simple")
> $client.search("types")
res5: Seq("type", "advanced")

Distributing our service

Our simple in-memory search engine won’t be able to search corpuses larger than the size of memory on a single machine. We’ll now venture to remedy this by distributing nodes with a simple sharding scheme.

Abstracting

To aid our work, we’ll first introduce another abstraction, an Index, in order to decouple the index implementation from the SearchbirdService. This is a straightforward refactor.

…/Index.scala
package com.twitter.searchbird

import scala.collection.mutable
import com.twitter.util._
import com.twitter.logging.Logger

trait Index {
  def get(key: String): Future[String]
  def put(key: String, value: String): Future[Unit]
  def search(query: String): Future[List[String]]
}

class ResidentIndex extends Index {
  val log = Logger.get(getClass)

  val forward = new mutable.HashMap[String, String]
    with mutable.SynchronizedMap[String, String]
  val reverse = new mutable.HashMap[String, Set[String]]
    with mutable.SynchronizedMap[String, Set[String]]

  def get(key: String) = {
    forward.get(key) match {
      case None =>
        log.debug("get %s: miss", key)
        Future.exception(new SearchbirdException("No such key"))
      case Some(value) =>
        log.debug("get %s: hit", key)
        Future(value)
    }
  }

  def put(key: String, value: String) = {
    log.debug("put %s", key)
    
    forward(key) = value

    // admit only one updater.
    synchronized {
      (Set() ++ value.split(" ")) foreach { token =>
        val current = reverse.get(token) getOrElse Set()
        reverse(token) = current + key
      }
    }

    Future.Unit
  }

  def search(query: String) = Future.value {
    val tokens = query.split(" ")
    val hits = tokens map { token => reverse.getOrElse(token, Set()) }
    val intersected = hits reduceLeftOption { _ & _ } getOrElse Set()
    intersected.toList
  }
}

We now convert our thrift service to a simple dispatch mechanism: it provides a thrift interface to any Index instance. The power of this will soon be apparent.

…/SearchbirdServiceImpl.scala
class SearchbirdServiceImpl(config: SearchbirdServiceConfig, index: Index) extends SearchbirdServiceServer {
  val serverName = "Searchbird"
  val thriftPort = config.thriftPort

  def get(key: String) = index.get(key)
  def put(key: String, value: String) =
    index.put(key, value) flatMap { _ => Future.void }
  def search(query: String) = index.search(query)
}

Finally we adjust our configuration class to match the new convention.

…/config/SearchbirdServiceConfig.scala
class SearchbirdServiceConfig extends ServerConfig[SearchbirdServiceServer] {
  var thriftPort: Int = 9999
  def apply(runtime: RuntimeEnvironment) = new SearchbirdServiceImpl(this, new ResidentIndex)
}

We’ll set up our simple distributed system so that there is one distinguished node that coordinates queries to its child nodes. In order to achieve this, we’ll need two new Index types. One represents a remote index, the other is a composite index over several other Index instances. This way we can construct the distributed index by instantiating a composite index of the remote indices.

First we define a CompositeIndex.

class CompositeIndex(indices: Seq[Index]) extends Index {
  require(!indices.isEmpty)

  def get(key: String) = {
    val queries = indices.map { idx =>
      idx.get(key) map { r => Some(r) } handle { case e => None }
    }

    Future.collect(queries) flatMap { results =>
      results.find { _.isDefined } map { _.get } match {
        case Some(v) => Future.value(v)
        case None => Future.exception(new SearchbirdException("No such key"))
      }
    }
  }

  def put(key: String, value: String) =
    Future.exception(new SearchbirdException("put() not supported by CompositeIndex"))

  def search(query: String) = {
    val queries = indices.map { _.search(query) rescue { case _ => Future.value(Nil) } }
    Future.collect(queries) map { results => (Set() ++ results.flatten) toList }
  }
}

The composite index works over a set of underlying Index instances. Note that it doesn’t care how these are actually implemented. This type of composition allows for great flexibility in constructing various querying schemes. We don’t define a sharding scheme, and so the composite index doesn’t support put operations. These are instead issued directly to the child nodes. get is implemented by querying all of our child nodes and picking the first successful result. If there are none, we fail the Future with an exception. Note that since the absence of a value is communicated by throwing an exception, we handle this on the Future, converting any exception into a None value.

In a real system, we’d probably have proper error codes for missing values rather than using exceptions. Exceptions are convenient and expedient for prototyping, but compose poorly: in order to distinguish between a real exception and a missing value, we have to examine the exception itself. It is better style to embed this distinction directly in the type of the returned value.
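As a sketch of what embedding the distinction in the type could look like, the example below returns Future[Option[String]], so a miss is an ordinary value and a failed Future always means a real error. It uses scala.concurrent.Future from the standard library as a stand-in for Twitter’s Future, and Lookup, MapLookup and FirstHit are hypothetical names rather than part of the project.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object OptionalGetSketch {
  // Hypothetical variant of Index.get: a miss is None, not an exception.
  trait Lookup {
    def get(key: String): Future[Option[String]]
  }

  final class MapLookup(data: Map[String, String]) extends Lookup {
    def get(key: String): Future[Option[String]] =
      Future.successful(data.get(key))
  }

  // No exception handling is needed to detect misses: flatten drops the
  // Nones and headOption picks the first hit, if any.
  final class FirstHit(children: Seq[Lookup]) extends Lookup {
    def get(key: String): Future[Option[String]] =
      Future.sequence(children.map(_.get(key))).map(_.flatten.headOption)
  }

  def demo(key: String): Option[String] = {
    val composite = new FirstHit(Seq(
      new MapLookup(Map("a" -> "1")),
      new MapLookup(Map("b" -> "2"))))
    Await.result(composite.get(key), 1.second)
  }
}
```

With this shape a failing child would fail the composite lookup, which is arguably what should happen for a real error; whether to tolerate such failures then becomes an explicit decision rather than a side effect of how misses are encoded.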

search works in a similar way. Instead of picking the first result, we combine them, ensuring their uniqueness by using a Set construction.
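The fan-out-and-merge pattern used by search can be tried without any servers. This sketch again substitutes scala.concurrent.Future for Twitter’s Future (recover and Future.sequence play the roles of rescue and Future.collect here), and models each shard as a plain function. FanOutSketch and Shard are hypothetical names.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FanOutSketch {
  type Shard = String => Future[List[String]]

  // Query every shard; a failed shard contributes no results instead of
  // failing the whole search. distinct removes keys reported twice.
  def searchAll(shards: Seq[Shard], query: String): Future[List[String]] = {
    val queries = shards.map { shard =>
      shard(query).recover { case _: Exception => List.empty[String] }
    }
    Future.sequence(queries).map(_.flatten.distinct.toList)
  }

  def demo(): List[String] = {
    val shards: Seq[Shard] = Seq(
      _ => Future.successful(List("fromShardA", "shared")),
      _ => Future.failed(new RuntimeException("shard down")),
      _ => Future.successful(List("shared", "fromShardB")))
    Await.result(searchAll(shards, "value"), 1.second)
  }
}
```

Using distinct instead of a Set keeps the results in shard order, which makes the output deterministic; the Set construction in CompositeIndex gives the same uniqueness guarantee without any ordering promise.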

RemoteIndex provides an Index interface over a number of hosts.

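// Imports assumed for the Finagle and util versions this project is built against.
import com.twitter.conversions.time._ // enables 500.milliseconds
import com.twitter.finagle.builder.ClientBuilder
import com.twitter.finagle.thrift.ThriftClientFramedCodec
import org.apache.thrift.protocol.TBinaryProtocol

class RemoteIndex(hosts: String) extends Index {
  // One connection per host and a 500 ms timeout.
  val transport = ClientBuilder()
    .name("remoteIndex")
    .hosts(hosts)
    .codec(ThriftClientFramedCodec())
    .hostConnectionLimit(1)
    .timeout(500.milliseconds)
    .build()
  val client = new SearchbirdServiceClientAdapter(
      new thrift.SearchbirdService.ServiceToClient(
        transport, new TBinaryProtocol.Factory))

  // Proxy each call, adapting the generated types to the Index interface.
  def get(key: String) = client.get(key)
  def put(key: String, value: String) = client.put(key, value) map { _ => () }
  def search(query: String) = client.search(query) map { _.toList }
}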

This constructs a Finagle thrift client with some sensible defaults, and just proxies the calls, adjusting the types slightly.

Putting it all together

We now have all the pieces we need. We’ll need to adjust the configuration in order to be able to invoke a given node as either a distinguished node or a data shard node. In order to do so, we’ll enumerate the shards in our system by creating a new config item for it. We’ll then use command line arguments (recall that the Config has access to these) to start the server up in either mode.

class SearchbirdServiceConfig extends ServerConfig[SearchbirdServiceServer] {
  var thriftPort: Int = 9999
  var shards: Seq[String] = Seq()

  def apply(runtime: RuntimeEnvironment) = {
    val index = runtime.arguments.get("shard") match {
      case Some(arg) =>
        val which = arg.toInt
        if (which >= shards.size || which < 0)
          throw new Exception("invalid shard number %d".format(which))

        // override with the shard port
        val Array(_, port) = shards(which).split(":")
        thriftPort = port.toInt

        new ResidentIndex

      case None =>
        require(!shards.isEmpty)
        val remotes = shards map { new RemoteIndex(_) }
        new CompositeIndex(remotes)
    }

    new SearchbirdServiceImpl(this, index)
  }
}

And finally we’ll adjust the configuration itself:

new SearchbirdServiceConfig {
  shards = Seq(
    "localhost:9000",
    "localhost:9001",
    "localhost:9002"
  )
  …

Now if we invoke our server without any arguments, it starts a distinguished node that speaks to all of the given shards. If we specify a shard argument, it starts a server on the port belonging to that shard index.
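The mode selection can be exercised on its own, without starting a server. The sketch below reproduces the same decision with plain values: Mode, Shard, Coordinator and select are hypothetical names, and the arguments are passed as a simple Map rather than taken from the RuntimeEnvironment.

```scala
object ShardSelectSketch {
  sealed trait Mode
  final case class Shard(port: Int) extends Mode                 // serve local data on this port
  final case class Coordinator(remotes: Seq[String]) extends Mode // fan out to all shards

  def select(shards: Seq[String], args: Map[String, String]): Mode =
    args.get("shard") match {
      case Some(arg) =>
        val which = arg.toInt
        require(which >= 0 && which < shards.size, s"invalid shard number $which")
        // The shard's own entry ("host:port") tells it which port to listen on.
        shards(which).split(":") match {
          case Array(_, port) => Shard(port.toInt)
          case _ =>
            throw new IllegalArgumentException(s"expected host:port, got ${shards(which)}")
        }
      case None =>
        require(shards.nonEmpty, "no shards configured")
        Coordinator(shards)
    }
}
```

For example, select(Seq("localhost:9000", "localhost:9001"), Map("shard" -> "1")) yields Shard(9001), while an empty argument map yields a Coordinator over both addresses.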

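Let’s try it! We’ll launch 2 shards and 1 distinguished node. The third configured shard, localhost:9002, is not started, so the distinguished node’s requests to it fail and are handled by the composite index.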

$ sbt 'run -f config/development.scala -D shard=0'&
$ sbt 'run -f config/development.scala -D shard=1'&
$ sbt 'run -f config/development.scala'&

Then interact with it through the console. First let’s populate some data in the two shard nodes.

$ src/scripts/console localhost:9000
> $client.put("fromShardA", "a value from SHARD_A")
> $client.put("hello", "world")
^D
$ src/scripts/console localhost:9001
> $client.put("fromShardB", "a value from SHARD_B")
> $client.put("hello", "world again")

And now let’s query our database from the distinguished node.

$ src/scripts/console
No servers specified, using 127.0.0.1:9999
> $client.get("hello")
"world"
> $client.get("fromShardC")
Searchbird::SearchbirdService::Client::ApplicationException: Searchbird::SearchbirdService::Client::ApplicationException
…
> $client.get("fromShardA")
"a value from SHARD_A"
> $client.search("hello")
[]
> $client.search("world")
["hello"]
> $client.search("value")
["fromShardA", "fromShardB"]

[1] In target/gen-scala/com/twitter/searchbird/SearchbirdService.scala

[2] See Ostrich’s README: https://github.com/twitter/ostrich/blob/master/README.md for more information.