Back in March, Felipe Oliveira wrote about Klout’s new Sexy API. We had just released the Scala Play! Framework API infrastructure that we had been writing the previous few months. Not only did it represent a big step forward on the tech side, but it was also an important cultural change for Klout. Previously, disparate teams were responsible for their own serving infrastructure; now, having a central platform has empowered Klout to scale to a billion API requests per day and export powerful new functionality to partners.
But we still had a lot of work to do back then. By now, six months after launch, we’ve made some serious improvements to the API’s scalability and availability using Akka’s rich toolset for concurrent programming. Though Akka is mostly famous for its implementation of the Actor Model, I’m going to talk about two other Akka features, Futures and Agents.
Scalability with Akka Futures
For some background on the scalability problems we face, consider that serving a simple profile page like mine (see below) requires hundreds of lookups to several different datastores. Because of the scale of Klout’s data pipeline (expect a future blog post by Sreevatsan Raman to shed more light), we need to store users’ Scores, Moments, Topics and other data all in different datastores. As a result, our app is heavily IO-bound, and optimizing our IO usage was one of our biggest priorities. We needed to do our IO concurrently.
Akka Futures (soon to be part of the Scala Standard Library) have proven to be the ideal tool for concurrent work. A Future represents an asynchronous computation, and many Futures can be created in parallel. The Future API in Akka is very rich, but the key for us is its monadic nature. If you don’t know what a monad is, in Scala it is something that has the map and flatMap methods. This allows Futures to be composed into new Futures with the syntactic sugar of a for expression. Compare this to Java Futures (java.util.concurrent.*), which have no means of composition.
Consider the following example: three methods that each call a different datastore and return a Future, plus a result type we’d like to combine their results into:
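As a minimal sketch of that setup, the block below uses scala.concurrent.Future from the standard library in place of Akka's Future. The method names, the placeholder result types and the returned values are all assumptions made for illustration, not Klout's actual code.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object ProfileData {
  // Hypothetical datastore calls. Each one starts its work as soon as it is
  // called and returns immediately with a Future of the result.
  // The result types (Double, Seq[String]) are placeholders.
  def getScore(userId: Long): Future[Double] = Future(63.5)
  def getMoments(userId: Long): Future[Seq[String]] = Future(Seq("moment-1"))
  def getTopics(userId: Long): Future[Seq[String]] = Future(Seq("scala", "akka"))

  // The type we would like to assemble from the three results.
  final case class Profile(score: Double, moments: Seq[String], topics: Seq[String])
}
```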
Now, how should we do this? The non-monadic way, similar to how we would do it in Java, is to start each of the tasks, wait for them, and then build the result:
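A sketch of this blocking style might look like the following. It is written against scala.concurrent rather than Akka's own Future, and the datastore methods, types and the five-second timeout are hypothetical stand-ins.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BlockingProfile {
  final case class Profile(score: Double, moments: Seq[String], topics: Seq[String])

  // Hypothetical datastore calls with placeholder results.
  def getScore(userId: Long): Future[Double] = Future(63.5)
  def getMoments(userId: Long): Future[Seq[String]] = Future(Seq("moment-1"))
  def getTopics(userId: Long): Future[Seq[String]] = Future(Seq("scala", "akka"))

  def getProfile(userId: Long): Profile = {
    // Start all three tasks first so they run concurrently.
    val scoreF = getScore(userId)
    val momentsF = getMoments(userId)
    val topicsF = getTopics(userId)

    // Then block the calling thread until each result is available.
    val score = Await.result(scoreF, 5.seconds)
    val moments = Await.result(momentsF, 5.seconds)
    val topics = Await.result(topicsF, 5.seconds)

    Profile(score, moments, topics)
  }
}
```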
This is not ideal for a few reasons:
1. We are blocking on the execution of the concurrent tasks, which means the thread running this code must wait idly, wasting resources while the app is performing network IO.
2. It is rather verbose and difficult to maintain.
3. This function violates the Single Responsibility Principle, because it is responsible for both waiting on the Futures and the business logic for combining the results.
A better way to do this is with Future composition:
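A sketch of the composed version, again using scala.concurrent.Future with hypothetical datastore methods and placeholder types:

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object ComposedProfile {
  final case class Profile(score: Double, moments: Seq[String], topics: Seq[String])

  // Hypothetical datastore calls with placeholder results.
  def getScore(userId: Long): Future[Double] = Future(63.5)
  def getMoments(userId: Long): Future[Seq[String]] = Future(Seq("moment-1"))
  def getTopics(userId: Long): Future[Seq[String]] = Future(Seq("scala", "akka"))

  def getProfile(userId: Long): Future[Profile] = {
    // Create the Futures before the for expression so that all three
    // lookups are already in flight; creating them inside the generators
    // would run them one after another.
    val scoreF = getScore(userId)
    val momentsF = getMoments(userId)
    val topicsF = getTopics(userId)

    // Desugars to flatMap/flatMap/map. Nothing here blocks a thread.
    for {
      score <- scoreF
      moments <- momentsF
      topics <- topicsF
    } yield Profile(score, moments, topics)
  }
}
```

The caller receives a Future[Profile] and decides what to do with it, typically composing it further rather than waiting on it.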
Notice how much more readable the code is. The for expression is sugar for calling the map and flatMap methods on the Futures, and the benefit is that we can refer to the results of the Futures in the yield block without waiting for them. This makes the method read more like a workflow, and it is no longer concerned with waiting for the completion of the tasks.
One difference between the two methods is that the first returns a raw Profile and the second returns a Future[Profile]. This led us to a few simple rules as we added Futures to our code:
1. All methods that do IO should return a Future
2. Never block a thread waiting on a Future
3. Therefore, all methods that call other methods that return Futures must themselves return Futures.
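To illustrate how these rules propagate through layers, here is a small sketch in which every function name and value is hypothetical. Each layer transforms the Future it receives with map instead of waiting on it, so the Future type travels up the call stack.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object FutureLayers {
  // Rule 1: the method that does IO returns a Future (stubbed here).
  def fetchScore(userId: Long): Future[Double] = Future(63.5)

  // Rules 2 and 3: the caller never blocks, so it must return a Future too.
  def scoreSummary(userId: Long): Future[String] =
    fetchScore(userId).map(score => s"user $userId has score $score")

  // One layer further up, still a Future. Only the outermost layer
  // (for example the web framework) deals with completion.
  def scoreResponse(userId: Long): Future[Map[String, String]] =
    scoreSummary(userId).map(text => Map("status" -> "ok", "body" -> text))
}
```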
In this way, we use Future almost like an IO monad (see this post for an introduction to functional IO). This allows us to push the Futures all the way up our call stack, finally wrapping them in Play’s AsyncResult in controller methods. Play handles these results in a non-blocking way, so we can be as efficient with IO as possible. (See the Play documentation for more detail).
Overall, the strategy of using Akka Futures allows us to write more efficient and more readable code. I suggest becoming very familiar with the methods on Futures and the different ways to compose them, especially since they will be shipped with Scala 2.10 and later. The ability to write concurrent IO so easily is the key to our API’s performance and scalability.
High Availability with Apache ZooKeeper and Akka Agents
Another lesson from the last six months is that dynamic service discovery is key. For example, our MySQL cluster has one master and several slaves, and we spread read requests across the slaves as much as possible. Sometimes we need to dynamically remove slave nodes from the pool because of degraded performance or scheduled maintenance. Since we run a large production cluster of API nodes, we need to be able to make updates without re-deploying or downtime, giving our clients the best experience possible and guaranteeing that all nodes update within seconds.
Apache ZooKeeper was an obvious solution for distributed configuration. To start, we created a simple wrapper for ZooKeeper on top of Twitter’s ZooKeeper client written in Scala. We use this wrapper to watch ZooKeeper nodes, issuing a callback inside our application whenever a modification happens:
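The shape of such a wrapper can be sketched without a ZooKeeper dependency. Everything below is hypothetical: the NodeWatcher trait, its watchNode signature, the in-memory stand-in and the node path are assumptions used only to show the callback pattern, not the API of Twitter's client or of our wrapper.

```scala
import scala.collection.mutable

object WatchExample {
  // Hypothetical interface: register a callback that fires whenever the
  // data at `path` changes. None stands for "node deleted".
  trait NodeWatcher {
    def watchNode(path: String)(onChange: Option[String] => Unit): Unit
  }

  // In-memory stand-in so the sketch runs without a ZooKeeper ensemble.
  final class InMemoryWatcher extends NodeWatcher {
    private val callbacks = mutable.Map.empty[String, List[Option[String] => Unit]]

    def watchNode(path: String)(onChange: Option[String] => Unit): Unit =
      synchronized {
        callbacks(path) = onChange :: callbacks.getOrElse(path, Nil)
      }

    // Simulates a modification arriving from the coordination service.
    def set(path: String, data: Option[String]): Unit = {
      val registered = synchronized(callbacks.getOrElse(path, Nil))
      registered.foreach(callback => callback(data))
    }
  }

  // Registers one watch, simulates three changes, and returns what the
  // callback observed, in order.
  def demo(): List[String] = {
    val watcher = new InMemoryWatcher
    var seen = List.empty[String]
    watcher.watchNode("/mysql/slaves/db2") { data =>
      seen = data.getOrElse("<deleted>") :: seen
    }
    watcher.set("/mysql/slaves/db2", Some("up"))
    watcher.set("/mysql/slaves/db2", Some("down"))
    watcher.set("/mysql/slaves/db2", None)
    seen.reverse
  }
}
```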
But where should these callbacks go? One solution would be to create a service like this:
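A minimal sketch of such a service, assuming the state is just the set of readable hosts (the class name, method names and host names are illustrative):

```scala
object NaiveMysqlService {
  // Business logic and state updates live in the same class, and the
  // mutable field is not protected against concurrent writers.
  class MysqlService(initial: Set[String]) {
    private var readNodes: Set[String] = initial

    // Meant to be called from ZooKeeper callbacks, but nothing stops
    // any other client of the service from calling it.
    def updateState(node: String, up: Boolean): Unit = {
      readNodes = if (up) readNodes + node else readNodes - node
    }

    // Used by read requests to pick a node.
    def nodesForRead: Set[String] = readNodes
  }
}
```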
This service would keep track of nodes to read from, and we could use the ZooKeeper client to call updateState on the service. Any read request for MySQL would use the service to determine the pool of nodes to read from.
Again, there are a couple of problems with this:
1. The same class that deals with business logic is responsible for making updates, so the interface is not safe. Clients of this service would be able to make changes when we only want ZooKeeper callbacks to make these changes.
2. This service would be prone to concurrency issues when multiple threads are making updates. These issues only get worse if we want our “MySQL State” to track more than just “up” or “down”.
At first, we thought that Actors would be a good solution for this problem. However, we soon learned that because actors process all messages one at a time, the reads would get backed up. Also, the interface to the actors is Futures, which is more complex than necessary.
Akka thankfully provides an implementation of Agents, based on the concept of the same name from Clojure. Agents wrap an instance of some type of state and support asynchronous single-threaded updaters and synchronous getters. For an agent of type T, the updater is a function T => T, and the agent updates its state by changing its value to the result of the function applied to its previous value.
Here’s a similar implementation of the service above but using an Agent:
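To keep the sketch runnable without Akka on the classpath, the block below uses a small stand-in, SimpleAgent, that mimics the behavior described above: updates are applied one at a time on a single thread and reads return immediately. It is not Akka's implementation, and with Akka available its Agent (with send and get) would take its place. The event types, the register hook and the host names are likewise assumptions.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object AgentExample {
  // Stand-in for an Agent: single-threaded async updates, non-blocking reads.
  final class SimpleAgent[T](initial: T) {
    private val ec: ExecutionContext = ExecutionContext.fromExecutor(
      Executors.newSingleThreadExecutor(new ThreadFactory {
        def newThread(r: Runnable): Thread = {
          val t = new Thread(r, "simple-agent")
          t.setDaemon(true) // do not keep the JVM alive
          t
        }
      }))
    @volatile private var state: T = initial

    def get: T = state
    def send(f: T => T): Unit =
      ec.execute(new Runnable { def run(): Unit = { state = f(state) } })
    // Completes after all updates queued so far have been applied.
    def future: Future[T] = Future(state)(ec)
  }

  sealed trait NodeEvent
  final case class NodeUp(host: String) extends NodeEvent
  final case class NodeDown(host: String) extends NodeEvent

  // Immutable state that only knows business logic.
  final case class MysqlState(readNodes: Set[String]) {
    def updated(event: NodeEvent): MysqlState = event match {
      case NodeUp(host)   => copy(readNodes = readNodes + host)
      case NodeDown(host) => copy(readNodes = readNodes - host)
    }
  }

  // The agent is private; updates can only arrive through the callback
  // that the service hands to `register` (e.g. a ZooKeeper watch).
  final class MysqlService(initial: Set[String], register: (NodeEvent => Unit) => Unit) {
    private val agent = new SimpleAgent(MysqlState(initial))
    register(event => agent.send(_.updated(event)))

    def readNodes: Set[String] = agent.get.readNodes
    def readNodesAfterPendingUpdates: Future[Set[String]] =
      agent.future.map(_.readNodes)(ExecutionContext.global)
  }

  def demo(): Set[String] = {
    var fire: NodeEvent => Unit = _ => ()
    val service = new MysqlService(Set("db1", "db2"), callback => { fire = callback })
    fire(NodeDown("db2")) // simulated watch callbacks
    fire(NodeUp("db3"))
    Await.result(service.readNodesAfterPendingUpdates, 2.seconds)
  }
}
```

Blocking with Await appears only in the demo helper, to make the result observable; request-handling code would call readNodes, which returns the current value without waiting.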
As you can see, the service is much simpler and the interface hides the updating access. Also, because the update functions are applied one at a time and simply add or remove nodes from the set, there is no concurrency issue. The biggest win, however, is that this allows us to think about our state as an immutable data structure, only responsible for dealing with business logic, and wrap the updating logic in the Agent. This gives us an elegant structure to our code.
We like this pattern so much that we use it wherever we have dynamic configuration depending on ZooKeeper. It’s a great abstraction that allows for both more reliable and more readable code. And having this mechanism for dynamic service discovery gives the API the fault tolerance and high availability it needs to meet its SLA.
Even if you are not using the Actor Model, Akka provides many tools to improve large concurrent enterprise systems. In some cases, other abstractions are simpler to use and require less boilerplate than Actors. At Klout, we believe in using the right tool for the job, so to help the Klout API meet its SLA, we have used Futures and Agents heavily. In doing so, we hope to push more and more of Klout’s data into the world. Let us know if you want to help.