How to assign partitions to your consumers?

Introduction to Kafka assignors

Posted by Łukasz Chrząszcz on Friday, January 7, 2022

Using Kafka is usually fairly simple. You just create consumers, subscribe them to the topic of your choice, and your messages are being automagically consumed. Usually that is all you need to be “successful” with Kafka. Unfortunately, sometimes things do not go as planned, and you end up with your throughput looking like in one of the diagrams below.

throughput-1 throughput-2

Although it is never an easy answer what is causing such behavior, nevertheless it might be the case your consumers are suffering from using the wrong assignor. What is the assignor? It is the piece of code that is assigning concrete partitions to concrete consumers during the rebalancing. There are different implementations of assignors, each of which having different pros and cons, and if that is not enough you can write your own.

Why do we need assignor and when it is used?

Assignor is an implementation of the ConsumerPartitionAssignor interface. It is used by the leader (one of the consumers) to assign partitions to the consumers according to their subscriptions. Like in the diagram below - assignor takes data about consumer’s subscriptions and returns concrete partitions for each of the consumers.

kafka-assignors

It is particularly intriguing that the assignor is part of the consumer logic. You might wonder - why it is not a broker’s job to assign partitions? The answer is simple - new algorithms can be added by updating consumers without touching kafka brokers. You just roll out new version of your service with the new algorithm.

Currently (Kafka version 3.0) we have 4 implementations of assignors available:

  • RangeAssignor
  • RoundRobinAssignor
  • StickyAssignor
  • CooperativeStickyAssignor

You can choose any assignor by simply changing the partition.assignment.strategy property in your consumer. Value of this property is a list of assignors to choose from. Consumers must have at least one common assignor. Currently, the default assignor is a RangeAssignor, but CooperativeStickyAssignor is also added to the default list in case you need to switch to cooperative rebalancing in one deployment.

If you want to change the assignor to - for example RoundRobinAssignor - take a look at this snippet:

1
2
3
4
5
6
7
val consumer = KafkaConsumer<String, String>(
    mapOf(
        "partition.assignment.strategy" to 
            listOf("org.apache.kafka.clients.consumer.RoundRobinAssignor"),
        // ... other properties
    )
)

Knowing what assignors are and how to select them, let’s go ahead and discuss all aforementioned assignor implementations!

RangeAssignor

This is the first ever assignor available in Kafka. What is important is that it works on a per-topic basis which a contrary to all other assignors. For each topic this algorithm lays out partitions and consumers after sorting them and then divides them in ranges putting one range for each consumer. In case all partitions do not divide evenly by the number of consumers, a few first consumers will receive one extra partition.

Take a look at the example of assignment. Given one topic and 3 consumers the assignment might look like in this diagram:

kafka-assignors

As you see RangeAssignor produces good results. Additionally, if we increase the number of subscribed topics, the algorithm will analyze each of the topic separately, so it also outputs good assignment:

kafka-assignors

However, this algorithm has some flaws which come up when the number of partitions does not divide evenly. In our simple scenario, if we increase the number of partitions to 10 instead of 9, the first consumer will have one extra partition per topic, so 3 extra partitions overall. With such configuration one consumer might end up doing all the work, while the others are doing nothing. This is an unacceptable waist of resources!

kafka-assignors

Beside aforementioned uneven assignment, one of the drawbacks of this assignor is the lack of stickiness. It is not designed in a way to minimize movement of partitions between consumers. Why does that matter? In your application you might build some cache building upon assigned partitions. If that partition is taken away from you, and assigned to another consumer, your cache is useless as it has to be recreated by the other consumer.

So, when should we use RangeAssignor? Of course, it depends, nevertheless, from my experience one good reason to use RangeAssignor is when you have multiple topics, but you expect occasionally spikes of events on one of them. In such case you need to assign every topic evenly to all consumers. Due to this assignor treating every topic separately, you can expect that all consumers consume from all of the topics. In different assignors, one consumer might end up having all partitions from one topic and no partition from the others.

In the nutshell

Pros:

  • Works on per topic basis, so every consumer has a chance to get a partition for every topic, which is good for some workloads.

Cons:

  • Since the consumers and partitions are sorted before assignment, the first few consumers might receive one extra partition which might result in very imbalanced assignment if the group subscribes many topics.
  • In case of multiple topics, if there are more consumers than partitions in single topic the extra consumers receive no partitions to consume from.
  • By default, the assignment might change drastically when scaling up or down consumers.

RoundRobinAssignor

Although name of this assignor depicts pretty much its logic it is crucial to know that this assignor takes all the topics into consideration at the same time. It does not analyze each of the topic separately as RangeAssignor.

The first step is to lay out all the available partitions from all topics and all interested consumers. After that the algorithm assigns partitions to consumers in a round-robin fashion.

The great advantage of this assignor is the uniform assignment it produces. Assignments might differ by only one partition in case the number of partitions does not divide evenly by the number of consumers.

Take a look at the example below. Here we have 3 topics with 10 partitions each and 3 consumers that subscribe them. Each of the consumer received 10 partitions spanning all of the topics.

kafka-assignors

In the nutshell

Pros:

  • Produces uniform assignment.
  • All consumer will get fair share of partitions even if there are more consumers than partition in single topic (in case of subscribing to multiple topics)

Cons:

  • Addition or removal of a consumer results in a huge reassignment of partitions which might cause extra processing due to the partitions changing their owner.

StickyAssignor

Imagine a situation in which you have many topics and one consumer group that subscribes them all. Using RangeAssignor is not a good choice, since one of the consumer will have more partitions than the others. RoundRobinAssignor sounds like a better choice, but let’s assume you have a cache in your application that tracks some events in each partition. Moving such partition to other consumer means loosing such cache, thus decreasing performance. Ideally, after rebalancing you would like to preserve ownership of as many partitions as it is possible. Here comes the StickyAssignor that was created especially for this purpose.

This assignor has two goals. It tries to create as balanced assignment as possible, as well as it tries to preserve as many assignments as possible.

When you have an empty consumer group, and new consumers appear, the first rebalancing works similarly to RoundRobinAssignor. So in case of 3 topics (10 partitions each) and 3 consumers, our diagram would look like this:

kafka-assignors

What would happen next in case of removal of consumer 2? Of course its partitions would become available for assignment, but partitions of consumer 1 and 3 would be preserved.

kafka-assignors

Partitions that were released would be equally spread across other consumers:

kafka-assignors

As you see, even after one rebalancing the result is quite different from RoundRobinAssignor. In this case all consumers have the same number of partitions, but the first consumer has a majority of partitions in first topic, while the second consumer has a majority of partitions in the third topic.

In the nutshell:

Pros:

  • Minimizes the number of partitions movement between consumers, saving additional processing overhead.
  • Produces balanced results.

Cons:

  • After a few iterations the consumers might end up having been assigned the whole topic for themselves. Although it will still be a balanced assignment, that is a thing to keep in mind if you have uneven load on topics, as it might result in one consumer having to do all the work.

CooperativeStickyAssignor

The big issue with all aforementioned assignors is the stop the world problem. In case of any failure, removal, additional of a consumer the whole group suffers as every consumer has to give up their partitions, close connections to brokers, clean up some data. Even if using StickyAssignor it is still the case, as formally consumers still clear their assignment before joining rebalancing. Suddenly, because of the one failing node, the whole group stops.

This sounds bad, and it is bad. If you have a huge group of consumers you probably already had issues with one failing node causing mayhem in your rebalancing.

Here comes the CooperativeStickyAssignor that leverages a brand-new rebalancing protocol COOPERATIVE. In this protocol each of the consumer instead of revoking all partitions before joining rebalancing it might retain them and release them only after being told to do so.

Let’s assume we have two consumers, 3 topics, each having 10 partitions:

kafka-assignors

Adding a new consumers in any other assignor would require those two to revoke all of their partitions. But on cooperative rebalancing we will perform rebalancing twice. First to revoke partitions that will actually move to the new consumer:

kafka-assignors

After that rebalancing, the another one is instantly triggered. This one assigns revoked partitions to the new consumer:

kafka-assignors

While running those two rebalancing events, consumer 1 and 3 for partitions that were not touched:

  • Did not close connections
  • Did not clean up resources
  • Were able to still consumer from them

In the nutshell

Pros:

  • Rebalancing is much more lightweight and does not stop consumers from consuming from most of their partitions.
  • Provides a balanced results.

Cons:

  • Similarly to StickyAssignor - After a few iterations the consumers might end up having been assigned the whole topic for themselves. Although it will still be a balanced assignment, that is a thing to keep in mind if you have uneven load on topics, as it might result in one consumer having to do all the work.

Further reading

If you are interested in reading more about assignors, here are a few resources I have used to prepare this post:

Photo by Ryan Ancill on Unsplash


comments powered by Disqus