Taming our distributed systems

At Trovit, we’re big fans of distributed systems. We have quite hard problems to solve, which require complex infrastructures. We run Hadoop, Storm, HBase, Zookeeper, Kafka… you name it. It’s a lot of fun, but it’s also a lot of pain.

We have lots of batch processing jobs and more recently we started using some real time systems inside our architecture. This means that the system needs to run continuously and provide fast answers to lots of simultaneous queries from other systems. We’re not going to get into much detail here, but mark these words:

  • System cannot go down.
  • System needs to be fast. Fast as in “few microseconds”.

So far so good, we had a great time implementing it. It hit production with very good responses and the development team was happy. But it was the calm before the storm. Of course.

Before we go on with the gory details, here’s a bit more info.

This stuff depends on some other systems, as in any real distributed system out there. This one, in particular, relies quite heavily on Netty, Redis, which is the main datastore and in smaller measure on Statsd and Kafka, where it’s basically writing stats and data. The error handling in the program was done considering two kinds of errors (no big surprises here, I hope):

  • Real errors: There is a big problem. The system can’t run correctly and goes down in flames, spiralling out of control. Dev team receives a dreaded SMS in the middle of the night.
  • Small errors: The system has some errors but its main functionality is not affected. We’ll look at those next Monday morning. No rush.

The problems started at peak hours (as usual), what we saw was that under heavy stress the system started throwing some of the small errors, which was expected, and then the system started getting slower and slower until we couldn’t meet the performance SLA’s, which resulted in lots of timeouts from our clients and some angry faces, and this, was something that was definitely NOT expected.

What happened here is that the system was getting slow because of what we call small errors. We had a look at the offending code and we saw that the problem was when we were writing to Kafka, which is a non-critical operation.

The code was rather simple, an object was received, it was serialised and it was written into Kafka. This is not a critical operation because those messages are not critical for the system. It’s nice to have them, we get some useful information from them, but it’s not the end of the world if we lose some. So, the code writes it to Kafka, and if it fails, it complains, but it continues to run. The problem here is that the code was synchronous and we were flooding Kafka with connections. Eventually it stopped responding, so it blocked until the sockets timed out.

So, what happened here is that a non-critical part of the system was failing and it was bringing down the whole system with it.

The solution was quite clear, we had to improve the code that writes to Kafka (which was far from an optimal piece of code, we had to add threads, pools…) and we had to isolate it, which is not rocket science.

But wait, before doing anything let’s think for a second, because that rung a bell.

Maybe you’ve heard of this book:

Release it.

If you haven’t heard of it, or haven’t read it yet and you are programming anything that has to do with distributed systems you should stop right now, get the book and read it.

This book, amongst other interesting things, talks about two very simple and logic architectural patterns for software: so simple and logical that they sometimes are forgotten. Which is exactly what happened to us. These patterns are:

  • Bulkhead pattern: If a software component fails it should not affect other components (if possible). A simile would be an oil tanker. The ship has several compartments, if one has a problem and floods, well, it’s just one. The ship keeps sailing.
  • Circuitbreaker pattern: The component detects that there is an error and reacts to it, it might be by changing behaviour or by communicating with other software components to tell them that it is unreliable.

So, we needed both patterns in our system.

If you have any experience building software systems you will surely understand the idea, and it will strike as easy and logical, and it is. But sometimes you just forget to implement them in a proper way.

So, we had identified the problem, and we had the solutions. We just needed to implement them, three options:

  1. Implement the solution fast and without too much hassle.
  2. Use a resilience library such as Hystrix, from Netflix.
  3. Implement our own solution for this case, but taking into account both patterns in quite a strict way.

1 was out of the question, we really don’t want to be doing this over and over. 2 was a bit of an overkill for this system (we really love Hystrix, but this was not the best place to use it) so we went with option 3.

And what does this mean? It basically means that you get the code, and make the required changes strictly following these rules:

  1. Code needs to be totally asynchronous. The client just sends the object to the Kafka writer. This operation never fails and it’s always atomic, no matter what’s happening inside the Kafka writer. (Bulkhead pattern).
  2. The Kafka writer has a buffer (or queue) of the objects to send to Kafka and it’s able to detect timeouts and problems in its workers. When there are timeouts and the buffer is full it just discards the new objects (circuit breaker). We are basically decreasing the load on Kafka, letting it recover.
  3. After a given time, the writer tries to write to Kafka again to see if the problem is solved.
  4. When everything is back to normal the Kafka writer starts queuing objects again.

So, nothing new, everybody would have done that, but thinking about it, taking into account these design patterns, we are totally sure that the code is isolated and that there is a circuit breaker, meaning that if this part of the system fails the rest of the system will continue working as usual and that this affected subsystem is able “to heal itself and react” in case of stress.

Let’s see what happened next. Here you can see some metrics from Graphite:


This image is just after a stress situation starts. We can see the blue line, which represents the messages we ignore in the Kafka Writer. The green line reflects Kafka errors. So, problems start and we ignore nearly everything that comes, this gives some time to Kafka to recover, after some time (and retries) the situation starts to get stable, until the green lines drops completely.

Some more info and resources about these amazing patterns:


  1. This is pretty cool! I’ve been recently implementing something very similar. Not with Kafka (which is working pretty well for us) but with Cassandra.

    In my case I’m fetching some non-vital information, and if Cassandra was busy and taking some time to return my service was increasing latency, so we defined a Cassandra response time threshold and when we detect it’s slow, we disable it for some time.

    Nice article, it was interesting to read ;) !

  2. Marc

    Twitter is not really great for long explanations, so I’ll answer this tweet: https://twitter.com/dani_sola/status/491285912694034432 here.

    Why Hystrix was an overkill?

    Basically there are two points.

    First one is that we thought that Hystrix provided lots of features that we really did not need. This reason is not really “valid” anymore as we find ourselves implementing these features into our small “resilience wrappers”. Everything started really simple, just providing one or two features, but as time goes by we realise that we really need others.

    So, if this was the only reason we really wouldn’t discard switching to Hystrix, as we don’t aim to implement yet another framework.

    Second reason, which is the big one is that Hystrix does not offer a full control of the Circuitbreaker. If I recall correctly the framework decides when the circuitbreaker should be tripped and when everything should go back to normal. I guess this works fine for Netflix, but our use cases are quite different here. We decide when the circuitbreaker is open, and when it’s closed, and this is a big blocker for us.

    As I said, in the first point we are starting to implement (maybe) too many features, so I don’t discard having a go at Hystrix code to adapt it to our more general needs.

    Maybe at the next Trovit Hackaton :P

Leave a Reply

Your email address will not be published. Required fields are marked *

Time limit is exhausted. Please reload CAPTCHA.