HTTP endpoints with Kafka Streams Interactive Queries

Published in kafka, kafka streams, kotlin on April 3, 2022 last edited on April 12, 2022

Interactive queries are a somewhat advanced topic in the context of Kafka Streams application, so this article assumes the reader knows the basics of Kafka Streams.

Interactive queries enable Kafka Streams application to query their persistent local stores. The Kafka Streams API also provides a mechanics to query the state of remote application instances. The feature enables us to build HTTP endpoints with interesting properties. But as any architecture has its trade-offs, we’ll discuss them again once we’ve build a simple interactive query endpoint. It’s easier to go over pros and cons of this approach with a concrete example in mind. So what are we building here? For the sake of the discussion, we’ll imagine a purposely trivial application: we collects words from a Kafka topic and then expose a /search endpoint that takes a word and returns its count. This requirement gives us enough to discuss the underlining architecture of this approach.

To build this application, we’ll use Kotlin and Spring Boot. I’m not trying to make some point about these tools by using them here. There’s nothing special about them in the context of Kafka Streams applications. It’s worth mentioning though that HTTP endpoints with interactive queries only make sense on JVM as they’re a feature of the official Kafka Streams library.

Here’s the code of our topology:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
private fun buildTopology(): Topology {
    val builder = StreamsBuilder()

    builder.stream<String, String>("words", Consumed.`as`("words_input_topic"))
        .groupByKey(Grouped.`as`("group_by_word"))
        .count(Materialized.`as`("words_count"))

    val topology = builder.build()

    log.info(topology.describe().toString())

    return topology
}

Even though it’s a simple topology, there are a few things to note:

  • Each node of the topology is named. It’s good practice: the official documentation does a great job explaining why.
  • The log statement on line 10 is helpful especially in the early phases of developing a new Kafka Streams application.
  • The highlighted line is where we’re telling Kafka “hey I’m going to ask you about this store later by calling it words_count. We cannot write the search method without it.

Speaking of the search method, here’s the code:

@PostMapping("/search")
fun search(@RequestBody input: SearchRequest): ResponseEntity<SearchResponse> {
    val store = kafkaStreams.store(
        StoreQueryParameters.fromNameAndType(
            "words_count",
            QueryableStoreTypes.keyValueStore<String, Long>()
        )
    )

    val count = store.get(input.query) ?: return ResponseEntity.notFound().build()

    return ResponseEntity.ok(SearchResponse(input.query, count))
}

//I like to put these classes at the end of the controller file when they're so small.
data class SearchRequest(val query: String)

data class SearchResponse(val word: String, val count: Long)

Easy enough. We’re telling Kafka Streams “hey remember that store I asked you to name for me? I’m going to need it now”. Then store.get(input.query) gives us the count of the word we’re looking for. The operator ?: is called the elvis operator and is Kotlin idiomatic way of saying “if X is not null then X otherwise Y”. We’re using it here to return 404 when queried for a word we do not know anything about.

Let’s test this, shall we?

The first thing we need it to create the topic:

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic words --replication-factor 1 --partitions 3
Created topic words.
Warning: neither the code nor the settings presented in this article are recommended for production use. Production readiness code and configuration would add too much detail and reduce clarity. At the end of the article, I will provide a list of recommendations for production-ready Kafka Streams applications.

Now let’s add some words to the topic:

# using https://github.com/httpie/httpie to send a POST request
# to an endpoint that I added to our application (you'll find the code in IngestionController.kt)
http localhost:8080/accept word=kafka

HTTP/1.1 200
Connection: keep-alive
Content-Length: 0
Date: Sat, 02 Apr 2022 07:16:16 GMT
Keep-Alive: timeout=60
# more of this...

After playing around with the endpoint a little, the topic looked like this:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic words --from-beginning
kafka
kafka
kafka
kafka
kafka
kafka
franz
franz
franz
caste
caste
kafka
kafka
trail
castle

Now we can check the interactive query. It should give us 1 for trail and 8 for kafka:

http localhost:8080/search query=trail
HTTP/1.1 200
Connection: keep-alive
Content-Type: application/json
Date: Sat, 02 Apr 2022 07:19:26 GMT
Keep-Alive: timeout=60
Transfer-Encoding: chunked

{
    "count": 1,
    "word": "trail"
}

http localhost:8080/search query=kafka
HTTP/1.1 200
Connection: keep-alive
Content-Type: application/json
Date: Sat, 02 Apr 2022 07:19:35 GMT
Keep-Alive: timeout=60
Transfer-Encoding: chunked

{
    "count": 8,
    "word": "kafka"
}

It works, right?!? Well… not quite 😒

For this test, I run one application instance therefore the local store contained all of the state. Let’s see what happens if we add one more instance. To keep it simple, I run both instances on the same machine so the INSTANCE_ID determines the name of the state directory:

SERVER_PORT=8081 INSTANCE_ID=1 java -jar build/libs/interactive-queries-0.0.1-SNAPSHOT.jar

After a quick rebalancing process, both apps are in running state and we can test the endpoint again:

http localhost:8080/search query=kafka
HTTP/1.1 404
Connection: keep-alive
Content-Length: 0
Date: Sat, 02 Apr 2022 08:18:13 GMT
Keep-Alive: timeout=60

http localhost:8081/search query=kafka
HTTP/1.1 200
Connection: keep-alive
Content-Type: application/json
Date: Sat, 02 Apr 2022 08:18:18 GMT
Keep-Alive: timeout=60
Transfer-Encoding: chunked

{
    "count": 8,
    "word": "kafka"
}

If we’d put a load balancer in front of our two instances right now, we’d get a 404 for around half the requests. Obviously the endpoint isn’t really working. Why?

In order to achieve data parallelism, Kafka Streams relies on the same ideas (and implementation) of consumer groups. If we run a Kafka Streams application with multiple instances, the local state of each instance will contain only a a slice of the state (based on the partitions it has assigned). This is what we need to care when building an HTTP endpoint with interactive queries. To make this work, we need to rewrite out search endpoint using the RPC mechanism Kafka Streams provides us.

It’s a two-step process:

  • we configure the RPC endpoint via StreamsConfig.APPLICATION_SERVER_CONFIG so that each instance tells the coordinator how other instances can reach them.

  • we change the search endpoint to fetch the word count via the RPC endpoint when we detect the word requested is not available in the local store.

Here’s the new search endpoint:

@PostMapping("/search")
    fun search(@RequestBody input: SearchRequest): ResponseEntity<SearchResponse> {
        val store = kafkaStreams.store(
            StoreQueryParameters.fromNameAndType(
                "words_count", QueryableStoreTypes.keyValueStore<String, Long>()
            )
        )

        val keyQueryMetadata =
            kafkaStreams.queryMetadataForKey("words_count", input.query, Serdes.String().serializer())

        return when (val activeHost = keyQueryMetadata.activeHost()) {
            HostInfo(config.rpcHost, config.rpcPort) -> {
                val count = store.get(input.query) ?: return ResponseEntity.notFound().build()
                ResponseEntity.ok(SearchResponse(input.query, count))
            }
            HostInfo.unavailable() -> ResponseEntity.notFound().build()
            else -> fetchViaRPC(activeHost, input)
        }
    }

It’s a little more complicated now because it knows about other instances. Let’s see if it works:

# First we start two instances
SERVER_PORT=8080 INSTANCE_ID=0 java -jar build/libs/interactive-queries-0.0.1-SNAPSHOT.jar
SERVER_PORT=8081 INSTANCE_ID=1 java -jar build/libs/interactive-queries-0.0.1-SNAPSHOT.jar
# logs not shown... there's a lot of it in Kafka Streams applications :)

# Once both apps are in running state, we can test the endpoint again
http localhost:8080/search query=kafka
HTTP/1.1 200
Connection: keep-alive
Content-Type: application/json
Date: Sun, 03 Apr 2022 07:08:26 GMT
Keep-Alive: timeout=60
Transfer-Encoding: chunked

{
    "count": 8,
    "word": "kafka"
}
http localhost:8081/search query=kafka
HTTP/1.1 200
Connection: keep-alive
Content-Type: application/json
Date: Sun, 03 Apr 2022 07:08:30 GMT
Keep-Alive: timeout=60
Transfer-Encoding: chunked

{
    "count": 8,
    "word": "kafka"
}

It works now 🎉🎉🎉

We get a count no matter which instance we hit. So now we could put a load balancer in front of them and clients could start querying for words.

Now that we know how an interactive query endpoint looks like, we’re ready to go over trade-offs again.

An endpoint that uses interactive queries is first of all a Kafka Streams application so most considerations that apply for a Kafka Stream application apply here. Such applications have a pretty peculiar deployment strategy. For example, there’s no way to do rolling restart. This strategy conflicts with how Kafka Streams applications do parallelism. You can’t really start a new version of an app with new code but the same StreamsConfig.APPLICATION_ID_CONFIG: the app would immediately go into rebalancing mode to reassign some of the partitions. So what are the options?

If your endpoint can afford a little downtime, you can just take it down and start the updated application. Generally, restoration is really fast (thanks to those persistent stores we use for interactive queries). Often though even a little downtime might be problematic. In such cases, the easiest way out is to deploy a new version of the application all together (meaning with a new app id). While I don’t really advocate for “immutable deployment” in general, that’s probably the simplest strategy here. Also, it’s worth noticing you must deploy an new application anyway in case you change the topology of your app. Since you know upfront you will need to recompute the whole state sometimes, how long this process takes is very relevant to the way you operate the endpoint. Fortunately, it can be estimated with great confidence. You need to know your end to end throughput (how long it takes a record to travel the whole topology from input nodes till the terminal) and how many records you have in your input topics. That’s the whole formula.

While such an “immutable” strategy may feel too complicated (well it indeed is), it enables interesting testing strategies. For example, I often found myself writing scripts to compare the JSON response of two different versions of an endpoint. That gave me very high confidence that the new version did was working as expected. Another interesting consequence of this strategy is that it often enables “wild” experimentation with new versions. Because the increased confidence often leads to leap changes in a topology, after all you can verify new versions with great accuracy without impacting the production endpoint.

Operating interactive queries endpoint is almost the same operating a regular Kafka Streams application:

  • You definitely want to be looking at lag. After all, the main argument for having an interactive query endpoint is its “freshness”.
  • You want to keep an eye on the disk space of each instance. You’d be getting all kind of “interesting” failures if one of your interactive queries runs out of space.
  • You want to continuously collect throughput metrics about your application. You can do that using the lag of internal topics as a proxy. Often that’s a good indicator. Sometimes, the nature of a topology (especially when there’s a lot of grouping going on) is such that the app will “naturally” slow down its throughput over time.

If you made it this far in the article, I would not be surprised if you’re not sure there are good use cases for interactive queries HTTP endpoints. After all these applications seem pretty tricky to operate and deploy. So here’s a list of considerations that can help you orient yourself better:

  • Despite the funny deployment strategy, a Kafka Streams application is still just a bunch of Kafka consumers/producers. Meaning you can compute and serve pretty complex data, at “Kafka speed”, while not needing to operate any other store. It’s hard to imagine you can achieve comparable performance with just one store.
  • Up to your number of partitions, you can just add more instances to improve parallel processing of your data.
  • The local stores update really fast, so the endpoints we’ll build respond to change quickly. If the endpoint does key lookups, you can keep response time always pretty low. Especially if you employ a fast RPC layer.
  • Your applications are self-sufficient in terms of state. If you’d put all of your data in Kafka then you could stretch this idea quite a bit. You would end up with services that build their own state locally and do not need to talk to each other. Less dependencies, no shared state. One store remote store (Kafka), one local store (RocksDB): both handled kind of automatically by the fluent API Kafka Streams provides. To be fair, this last point deserves to be its own article. Nudge me if you’re interested, it’ll speed me up :)
  • As long as all the information we need is available in some Kafka topic, we can build HTTP endpoints that do not depend on other endpoints (by querying them via HTTP). We avoid the traditional pain of a micro-services architecture where a service calls a service that calls a service and so on.
  • We also avoid the usual headaches of a more traditional architecture where Kafka is used, at best, as a ingestion and transport layer where we move the data into a separate store (for example, Cassandra). Keeping the two stores in sync is a full time job. Relying on interactive queries has a much lighter impact on the infrastructure. There are fewer moving parts so less things will break.
  • Kafka Streams uses persistent local stores anyway for caching (so that restarting a Kafka Stream app does not have to read the whole computed state from internal topics). If you’re building Kafka Streams applications, you can immediately rely on interactive queries (as long as you’re on JVM).

The one use case where I think interactive queries endpoints are actually a bit of a stretch is those situations in which your endpoint always needs all of the state to compute the response. For example, imagine we’d change our /search endpoint so that it also needs to return things like “top X words by count” or “least recurrent words of the month”. In such cases, you’d always need to query all the instances which may turn a little slow (or just make the response time of your endpoint unpredictable). Having said that, I would consider building a separate interactive query for such a use case if you know your system can afford to run the endpoint on a single instance.

As I mentioned, the code and the settings I wrote for this article are not recommended for production usage. So, as promised, here’s a production checklist for interactive queries endpoints:

  • You want to setup your Kafka Streams application so that it’s resilient to brokers outages:
// from kafka streams in action. Great book!
Properties props = new Properties();
props.put(StreamsConfig.producerPrefix(
 ProducerConfig.RETRIES_CONFIG), Integer.MAX_VALUE);
props.put(StreamsConfig.producerPrefix(
 ProducerConfig.MAX_BLOCK_MS_CONFIG), Integer.MAX_VALUE);
props.put(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG, 305000);
props.put(StreamsConfig.consumerPrefix(
 ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), Integer.MAX_VALUE);
  • You want to do some RocksDB tuning.
  • Kafka Streams stateful topologies come with internal topics. Often, it’s helpful to do some manual tuning on them. The pointer here is “now you know that’s an option to consider”.
  • The RPC mechanism we implemented for this article may not be the best for production especially if your fetching a relatively large amount of data (there’s a lot of serialization overhead with a JSON via HTTP endpoint as RPC).
  • You want integration tests. Put some data into input topics, wait a bit (this needs a little care), hit the endpoint and assert results. This will give you high confidence your endpoint is working as intended.
  • The number of partitions you choose of the input topics is always relevant, especially so in an interactive queries endpoint.

I’m using the demo app we built for this article as a Kotlin playground so I will probably work on some of this points myself. If you’re interested, check the issues as I generally use them as a TODO list and maybe give it a star :)