Distributed Tracing with Apache Kafka and Jaeger

If you are using Apache Kafka, you are almost certainly dealing with many applications that need to work together to accomplish some big-picture goal. In other words, you are working within a distributed system. And because Kafka decouples consumers and producers (meaning applications do not directly communicate with each other), it can be a challenge to illustrate exactly how data flows through your system. I plan to demonstrate how Jaeger is up to that challenge while navigating the pitfalls of an example project.

The source code for that example project is available on GitHub here: https://github.com/burkaa01/jaeger-tracing-kafka

What is Jaeger? What is OpenTracing?

Jaeger (https://www.jaegertracing.io) is an open source distributed tracer. It records and illustrates the life cycle of transactions as they propagate through a distributed system. Jaeger does the heavy lifting and ultimately paints the pretty picture, while OpenTracing (https://opentracing.io) provides the APIs I will use to interact with it.

Two vocabulary words to get straight when working with Jaeger and OpenTracing: span and trace. A span represents an individual unit of work done in a distributed system. In Jaeger a span has an operation name, start time, and duration. A trace is made up of and defined by one or more spans and in that way the trace is meant to represent the life cycle of an entire transaction.
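
To make the vocabulary concrete, here is a minimal sketch in plain Java, with no Jaeger or OpenTracing dependency, that models a span and groups spans into traces by a shared trace id. The class SpanSketch and its id fields (traceId, spanId, parentSpanId) are hypothetical names chosen for illustration; they are not the OpenTracing API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of spans and traces; not the OpenTracing API. */
final class SpanSketch {
    final String traceId;        // shared by every span that belongs to the same trace
    final String spanId;         // identifies this individual unit of work
    final String parentSpanId;   // null for the first span of a trace
    final String operationName;  // for example "send" or "receive"
    final long startMicros;
    final long durationMicros;

    SpanSketch(String traceId, String spanId, String parentSpanId,
               String operationName, long startMicros, long durationMicros) {
        this.traceId = traceId;
        this.spanId = spanId;
        this.parentSpanId = parentSpanId;
        this.operationName = operationName;
        this.startMicros = startMicros;
        this.durationMicros = durationMicros;
    }

    /** Groups spans by trace id, keeping traces in the order they first appear. */
    static Map<String, List<SpanSketch>> groupByTrace(List<SpanSketch> spans) {
        Map<String, List<SpanSketch>> traces = new LinkedHashMap<>();
        for (SpanSketch span : spans) {
            traces.computeIfAbsent(span.traceId, id -> new ArrayList<>()).add(span);
        }
        return traces;
    }

    public static void main(String[] args) {
        List<SpanSketch> spans = Arrays.asList(
                new SpanSketch("trace-1", "span-a", null, "send", 0L, 500L),
                new SpanSketch("trace-1", "span-b", "span-a", "receive", 700L, 200L),
                new SpanSketch("trace-2", "span-c", null, "send", 100L, 300L));
        groupByTrace(spans).forEach((traceId, members) ->
                System.out.println(traceId + " has " + members.size() + " span(s)"));
    }
}
```

A span that does not carry the trace id of the work that preceded it ends up in a trace of its own, which is the failure mode the rest of this post works through.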

Example project

The example project, diagrammed above, consists of five standalone Spring Boot applications. One of the applications (topic-configuration) simply configures all of our Kafka topics and exits upon completion, another (rest-app) defines an HTTP endpoint that will respond with a random number, and the other three (stream-app, spring-consumer-app, consumer-app) all consume and produce messages with Kafka. Ultimately, what this project accomplishes is arbitrary. Given a text file of numbers, it will take each even number, add a random number to it, and produce sentence messages like: 6 + 1 = 7 where 6 is the initial input number from the text file. The important part, for the purposes of demonstrating distributed tracing with Kafka and Jaeger, is that the example project makes use of a Kafka Stream (in the stream-app), a Kafka Consumer/Producer (in the consumer-app), and a Spring Kafka Consumer/Producer (in the spring-consumer-app). That makes it a good example for demonstrating how the tracing configuration differs in all three of these cases.
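
As a rough illustration of the end-to-end result, the sketch below collapses the whole pipeline into a single plain Java method. In the real project this work is spread across several applications and Kafka topics; PipelineSketch and toSentence are hypothetical names, and the random number is passed in as a parameter so the output is predictable.

```java
import java.util.Optional;

/** Hypothetical one-method summary of the example project's data flow. */
final class PipelineSketch {

    /** Returns the sentence for an even input; an odd input yields no sentence. */
    static Optional<String> toSentence(int number, int randomNumber) {
        if (number % 2 != 0) {
            return Optional.empty();
        }
        int sum = number + randomNumber;
        return Optional.of(number + " + " + randomNumber + " = " + sum);
    }

    public static void main(String[] args) {
        System.out.println(toSentence(6, 1).orElse("(no sentence)")); // prints "6 + 1 = 7"
        System.out.println(toSentence(5, 1).orElse("(no sentence)")); // prints "(no sentence)"
    }
}
```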

The example project also has two branches of code. The final result is in master, while what you have after initially configuring Jaeger tracing is in the init-tracing branch. That is to say, the init-tracing branch represents the state of the codebase after I make the changes outlined in the “Configuring Jaeger tracing” sections below, but before I make the changes outlined in the “Finish tracing configuration” sections below, and I highly recommend comparing the two branches.

Reminder, the source code for this example project is available on GitHub here: https://github.com/burkaa01/jaeger-tracing-kafka

Configuring Jaeger tracing

In order to get started configuring Jaeger tracing for each of the Spring Boot applications, you’ll need to include the appropriate dependencies. I used Jaeger’s jaeger-client along with a few of OpenTracing’s Kafka instrumentations (see each application’s build.gradle for specifics).

For each application the next step is the same: configure an OpenTracing Tracer and register it as the GlobalTracer. Take a look at the class com.github.burkaa01.stream.config.TracingConfig in the stream-app for an example of doing this and feel free to play with the sampler type, the flush interval, the max queue size, etc. as you see fit. Here is a snippet:

    @Bean
    public Tracer tracer() {
        return io.jaegertracing.Configuration.fromEnv(applicationName)
                .withSampler(
                        io.jaegertracing.Configuration.SamplerConfiguration.fromEnv()
                                .withType(ConstSampler.TYPE)
                                .withParam(1))
                .withReporter(
                        io.jaegertracing.Configuration.ReporterConfiguration.fromEnv()
                                .withLogSpans(true)
                                .withFlushInterval(1000)
                                .withMaxQueueSize(10000)
                                .withSender(
                                        io.jaegertracing.Configuration.SenderConfiguration.fromEnv()
                                                .withAgentHost(jaegerHost)
                                                .withAgentPort(jaegerPort)
                                ))
                .getTracer();
    }

    @PostConstruct
    public void registerToGlobalTracer() {
        if (!GlobalTracer.isRegistered()) {
            GlobalTracer.register(tracer());
        }
    }

The @PostConstruct above registers the Tracer as the GlobalTracer after initialization. This is important because many of the tracing interceptors make use of the GlobalTracer behind the scenes whenever an instance of a Tracer is not explicitly provided.
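
The fallback behavior described above can be sketched with a tiny register-once holder in plain Java. This is only a stand-in for the pattern, not the GlobalTracer implementation: GlobalHolderSketch, register, and resolve are hypothetical names, and a String stands in for a Tracer.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a register-once global holder; not the OpenTracing GlobalTracer. */
final class GlobalHolderSketch {
    private static final AtomicReference<String> GLOBAL = new AtomicReference<>();

    /** Registers the instance only if nothing is registered yet; returns true on success. */
    static boolean register(String tracer) {
        return GLOBAL.compareAndSet(null, tracer);
    }

    static boolean isRegistered() {
        return GLOBAL.get() != null;
    }

    /** Mimics an interceptor: prefer an explicitly supplied instance, else use the global one. */
    static String resolve(String explicitTracer) {
        return explicitTracer != null ? explicitTracer : GLOBAL.get();
    }

    public static void main(String[] args) {
        register("stream-app-tracer");
        System.out.println(resolve(null));            // falls back to the registered instance
        System.out.println(resolve("custom-tracer")); // the explicit instance wins
    }
}
```

If nothing has been registered, a component that relies on the fallback has nothing useful to report to, which is why the registration step matters.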

Configuring Jaeger tracing: Kafka Streams

There is one more important @PostConstruct annotation in com.github.burkaa01.stream.config.TracingConfig for the stream-app specifically:

    @PostConstruct
    public void setClientSupplierForStreams() {
        streamsBuilderFactory.setClientSupplier(new TracingKafkaClientSupplier(tracer()));
    }

Because the stream-app makes use of Kafka Streams’ StreamsBuilder, I am also providing the instance of the Tracer to the TracingKafkaClientSupplier when I set it as the StreamsBuilderFactoryBean’s KafkaClientSupplier.

Configuring Jaeger tracing: Spring Kafka Consumer/Producer

In the spring-consumer-app, I needed to add the following class to the list of interceptor.classes when configuring the Spring Kafka ConsumerFactory properties in com.github.burkaa01.springconsumer.config.NumberConsumerConfig like so:

    properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, Collections.singletonList(TracingConsumerInterceptor.class));

I did the same when configuring the Spring Kafka ProducerFactory properties:

    properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Collections.singletonList(TracingProducerInterceptor.class));

Configuring Jaeger tracing: Kafka Consumer/Producer

The idea is the same in the consumer-app, but the actual configuration differs slightly because for this example I am intentionally not using org.springframework.kafka. In this case, there is an instance of a TracingKafkaConsumer and a TracingKafkaProducer that, given the instance of the Tracer and an instance of a KafkaConsumer/KafkaProducer, wraps the KafkaConsumer/KafkaProducer. It looks like this on the consumer side of things (see com.github.burkaa01.consumer.config.SentenceConsumerConfig):

    KafkaConsumer consumer = new KafkaConsumer(properties);
    consumer.subscribe(Collections.singletonList(sentenceTopic));
    return new TracingKafkaConsumer(consumer, tracer);

And it looks like this on the producer side of things:

    KafkaProducer producer = new KafkaProducer(properties);
    return new TracingKafkaProducer(producer, tracer);

Initial Jaeger tracing result

If you are following along on GitHub (https://github.com/burkaa01/jaeger-tracing-kafka), switch to the init-tracing branch and run this command from the base directory of the project to start everything up:

    docker-compose up -d

Wait a moment and then run the following commands:

    cd ./kafka-connect/file-source
    curl -X POST -H "Content-Type: application/json" --data @file-source.json http://localhost:8083/connectors
    curl http://localhost:8083/connectors
 
    cd ../elastic-sink
    curl -X POST -H "Content-Type: application/json" --data @elastic-sink.json http://localhost:8083/connectors
    curl http://localhost:8083/connectors

Wait another moment and navigate to http://localhost:16686. You should see something like this:

The good news is that we have Jaeger up and running now and each of the applications is communicating with it. Along with each of the application level tracing configurations outlined above, that is made possible thanks to the following lines that start up Jaeger in the docker-compose.yml:

    jaeger:
      image: jaegertracing/all-in-one
      container_name: jaeger
      ports:
        - "6831:6831/udp"
        - "16686:16686"

The first port under ports (6831:6831/udp) is used by the Jaeger reporter we configured for each application to send the captured spans via UDP to the Jaeger agent. The second port under ports (16686:16686) is there for the Jaeger UI. Thus, we also need lines like the following for each of our applications in that same docker-compose.yml:

    environment:
      - jaeger.tracer.host=jaeger
      - jaeger.tracer.port=6831

The bad news is that the stream-app has at most 2 spans in a trace and each of our other apps in the “Services” list tells a disconnected story as well. Here is the picture for the spring-consumer-app, for example:

Though each of our applications is able to communicate with Jaeger just fine, the spans are not connecting with each other in a single trace.

Finish tracing configuration: Headers

Propagating correlation ids through a distributed system to tie a series of related events together is not a concept that is unique to Jaeger or OpenTracing. And whether or not you use Jaeger and OpenTracing, when applying that concept to Kafka, Kafka message headers are the perfect place to put this correlation information.

In this example project, thanks to the OpenTracing Kafka instrumentation I outlined in the “Configuring Jaeger tracing” sections above, the correlation information is already in the Kafka message headers. The problem right now is that my applications are not passing on this information properly as they consume and produce new messages.

Take, for example, how messages are produced in the spring-consumer-app right now (see the init-tracing branch):

    ProducerRecord producerRecord = new ProducerRecord(sentenceTopic, null, sentence);

The ProducerRecord only knows the topic, the key (null), and the value. In order to continue the trace and associate the spans with the same trace, the message being produced needs the headers from the message the application consumed (with Jaeger, one header the next message needs to include is “uber-trace-id”). Thus, that ProducerRecord initialization will have to change to this:

    ProducerRecord producerRecord = new ProducerRecord(sentenceTopic, null, null, sentence, headers);

And similarly in the consumer-app the record produced in com.github.burkaa01.consumer.answer.AnswerProducer changes to this:

    ProducerRecord record = new ProducerRecord(answerTopic, null, null, jsonString, headers);
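
The header-forwarding idea above can be sketched without Kafka, using a plain map in place of the record headers. Jaeger encodes the “uber-trace-id” value as {trace-id}:{span-id}:{parent-span-id}:{flags}, so the trace id can be read back from a forwarded header. HeaderForwardingSketch and its methods are hypothetical names, and the sample header value is made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: a Map stands in for Kafka record headers. */
final class HeaderForwardingSketch {
    static final String TRACE_HEADER = "uber-trace-id";

    /** Copies every consumed header onto the outgoing message so the trace context survives the hop. */
    static Map<String, byte[]> forwardHeaders(Map<String, byte[]> consumedHeaders) {
        return new LinkedHashMap<>(consumedHeaders);
    }

    /** Returns the trace id from a "{trace-id}:{span-id}:{parent-span-id}:{flags}" value, or null if absent or malformed. */
    static String traceIdOf(Map<String, byte[]> headers) {
        byte[] raw = headers.get(TRACE_HEADER);
        if (raw == null) {
            return null;
        }
        String[] parts = new String(raw, StandardCharsets.UTF_8).split(":");
        return parts.length == 4 ? parts[0] : null;
    }

    public static void main(String[] args) {
        Map<String, byte[]> consumed = new LinkedHashMap<>();
        // Made-up sample value in Jaeger's header format.
        consumed.put(TRACE_HEADER, "5af7183fb1d4cf5f:6b221d5bc9e6496c:0:1".getBytes(StandardCharsets.UTF_8));

        Map<String, byte[]> withHeaders = forwardHeaders(consumed);
        Map<String, byte[]> withoutHeaders = new LinkedHashMap<>(); // what the init-tracing branch effectively sends

        System.out.println("forwarded trace id: " + traceIdOf(withHeaders));
        System.out.println("dropped trace id:   " + traceIdOf(withoutHeaders));
    }
}
```

When the headers are dropped there is no trace id to continue, so the next span cannot be attached to the trace that came before it.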

Finish tracing configuration: Kafka Streams first consumer

In the stream-app that uses Kafka Streams, when a message is produced with the following line of code, the KStream takes care of passing on any headers that were on the consumer record:

    stream.to(targetTopic, Produced.with(Serdes.String(), Serdes.Integer()));

That is great news, but that’s not it. Take a closer look at the detail for the stream-app that came from the initial Jaeger tracing result:

There is a “send” (produce) for the stream-app and a “receive” (consume) in the spring-consumer-app, but there is no initial “receive” in the stream-app.

After digging through OpenTracing’s source, I discovered this is because the io.opentracing.contrib.kafka.TracingKafkaConsumer only builds and injects a span when there is a parent context, whereas the io.opentracing.contrib.kafka.TracingKafkaProducer builds and injects a span no matter what. Therefore, I expect only the first consumer in the project to have this problem, because every consumer thereafter will read records from topics that were produced by an io.opentracing.contrib.kafka.TracingKafkaProducer and thus those records have a parent context defined.

The following method in com.github.burkaa01.stream.divider.DividerStream is my workaround for the first Kafka consumer in the project:

    private void injectFirstKafkaConsumerSpan(String number, ProcessorContext context) {
        Tracer tracer = GlobalTracer.get();
        Tracer.SpanBuilder spanBuilder = tracer.buildSpan("receive")
                .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER);
 
        Span span = spanBuilder.start();
        Tags.COMPONENT.set(span, TRACING_COMPONENT_NAME);
        Tags.PEER_SERVICE.set(span, TRACING_SERVICE_NAME);
        span.setTag("partition", context.partition());
        span.setTag("topic", context.topic());
        span.setTag("offset", context.offset());
        /* add the number (aka the first consumer record value) as a span tag */
        span.setTag(TRACING_NUMBER_VALUE, number);
        span.finish();
 
        TextMap headersMapInjectAdapter = new TextMap() {
            @Override
            public Iterator<Map.Entry<String, String>> iterator() {
                throw new UnsupportedOperationException("iterator should never be used with Tracer.inject()");
            }
 
            @Override
            public void put(String key, String value) {
                context.headers().add(key, value.getBytes(StandardCharsets.UTF_8));
            }
        };
        tracer.inject(span.context(), Format.Builtin.TEXT_MAP, headersMapInjectAdapter);
    }

It builds and injects the first span manually.

Finish tracing configuration: Kafka Streams dead letter queue

Earlier in the “Example project” outline I mentioned that given a text file of numbers, only even numbers get a random number added to them. This is because I wanted to simulate an error with input data and demonstrate how to trace a message that interrupts the normal data flow and instead writes any bad input data to a dead letter queue. This dead letter technique is common practice when building data pipelines and for this example, any input number that is odd is considered bad data.

I already have a Kafka ProducerFactory configured in the stream-app, but the ProducerRecord initialized in com.github.burkaa01.stream.divider.DividerMapper has the same problem that the other Consumer/Producer applications had: the record needs the headers from the message the application consumed.

In order to get the ConsumerRecord headers in the context of a KStream, I have to retrieve the headers from the ProcessorContext, create a composite header value object, and pass that composite object as the value argument to the KStream map like this:

    .transform(tracingTransformerSupplier())
    .map(dividerMapper::map)

The transform gave me access to the ProcessorContext and allowed me to transform the String value into the composite ValueWithHeaders object. This is what tracingTransformerSupplier() looks like in com.github.burkaa01.stream.divider.DividerStream:

    private TransformerSupplier<String, String, KeyValue<String, ValueWithHeaders>> tracingTransformerSupplier() {
        return new TransformerSupplier<String, String, KeyValue<String, ValueWithHeaders>>() {
            @Override
            public Transformer<String, String, KeyValue<String, ValueWithHeaders>> get() {
                return new Transformer<String, String, KeyValue<String, ValueWithHeaders>>() {

                    private ProcessorContext context;

                    @Override
                    public void init(ProcessorContext context) {
                        this.context = context;
                    }

                    @Override
                    public KeyValue<String, ValueWithHeaders> transform(String key, String value) {
                        if (value == null || value.isEmpty()) {
                            return KeyValue.pair(key, null);
                        }
 
                        injectFirstKafkaConsumerSpan(value, context);
 
                        return KeyValue.pair(key, new ValueWithHeaders(value, context.headers()));
                    }
 
                    @Override
                    public void close() {
                    }
                };
            }
        };
    }

Now map in com.github.burkaa01.stream.divider.DividerMapper has both the value and the headers and will produce the following dead letter record when the input number is odd:

    ProducerRecord deadLetterRecord = new ProducerRecord(deadLetterTopic, null, key, value, headers);
    deadLetterTemplate.send(deadLetterRecord);
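
The routing decision above can be sketched in plain Java, with a map standing in for the Kafka headers. The point of the sketch is that the dead letter record carries the same headers as the record that was consumed, so the trace continues on either path. DeadLetterRoutingSketch, Routed, and the topic name "dead-letter-topic" are hypothetical; only "even-topic" appears in the example project as described here.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of even/odd routing that keeps the consumed headers on both paths. */
final class DeadLetterRoutingSketch {
    static final String EVEN_TOPIC = "even-topic";
    static final String DEAD_LETTER_TOPIC = "dead-letter-topic"; // assumed name, for illustration only

    /** Minimal stand-in for an outgoing record: destination topic, value, and forwarded headers. */
    static final class Routed {
        final String topic;
        final int value;
        final Map<String, String> headers;

        Routed(String topic, int value, Map<String, String> headers) {
            this.topic = topic;
            this.value = value;
            this.headers = headers;
        }
    }

    /** Even numbers continue down the pipeline; odd numbers are treated as bad data. */
    static Routed route(int number, Map<String, String> consumedHeaders) {
        String topic = (number % 2 == 0) ? EVEN_TOPIC : DEAD_LETTER_TOPIC;
        return new Routed(topic, number, new LinkedHashMap<>(consumedHeaders));
    }

    public static void main(String[] args) {
        Map<String, String> headers = Collections.singletonMap("uber-trace-id", "made-up-context");
        for (int number : new int[] {6, 7}) {
            Routed routed = route(number, headers);
            System.out.println(number + " -> " + routed.topic + " with headers " + routed.headers.keySet());
        }
    }
}
```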

Finish tracing configuration: Kafka and HTTP

At this point, all of the applications officially pass on all the headers from the Kafka records they consume to the Kafka records they produce, but there is a little bit more left to do still.

The spring-consumer-app makes an HTTP request to get the random number that it adds to the even input number. That rest-app is also configured to trace with Jaeger, but it isn’t enough just to do the following inside our RestController:

    Tracer tracer = GlobalTracer.get();
    Span span = tracer.activeSpan();
    span.setTag(RANDOM_NUMBER_TAG, randomNumber);

The HTTP response headers need to include the span context too:

    HttpHeaders responseHeaders = new HttpHeaders();
    tracer.inject(span.context(), Format.Builtin.HTTP_HEADERS, new HttpHeadersCarrier(responseHeaders));
    return new ResponseEntity(Integer.toString(randomNumber), responseHeaders, HttpStatus.OK);

On the spring-consumer-app side of things (the side that issues the HTTP request) there is an added challenge: OpenTracing’s io.opentracing.contrib.spring.web.client.TracingRestTemplateInterceptor does not address that this HTTP request span could actually be a child span of a Kafka message the application consumed.

So, instead of sending the request to the “/random” endpoint like this in com.github.burkaa01.springconsumer.sentence.SentenceProducer:

    private HttpEntity sendToRestApp(String url) {
        RestTemplate restTemplate = new RestTemplate();
        restTemplate.setInterceptors(Collections.singletonList(new TracingRestTemplateInterceptor()));
        return restTemplate.exchange(url, HttpMethod.POST, null, String.class);
    }

I needed to send the request to the “/random” endpoint like this:

    private HttpEntity sendToRestApp(String url, ConsumerRecord record) {
        RestTemplate restTemplate = new RestTemplate();
        restTemplate.setInterceptors(Collections.singletonList(new TracingChildRestTemplateInterceptor(record.headers())));
        return restTemplate.exchange(url, HttpMethod.POST, null, String.class);
    }

Where the class com.github.burkaa01.springconsumer.config.TracingChildRestTemplateInterceptor in the code snippet above is my own custom implementation of the public interface ClientHttpRequestInterceptor and differs from OpenTracing’s io.opentracing.contrib.spring.web.client.TracingRestTemplateInterceptor because it supports headers that represent a parent span context and it makes sure the span that it builds is a child of that parent span:

    SpanContext parentContext = TracingKafkaUtils.extractSpanContext(headers, tracer);
    tracer.buildSpan(httpRequest.getMethod().toString())
      .asChildOf(parentContext)
      .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT)
      .startActive(true);

Then when I get the HTTP headers back from the response I need to update the headers I will pass on to my producer to include any span context we gained calling the “/random” endpoint:

    private void updateTracingHeaders(ConsumerRecord record, HttpHeaders httpHeaders) {
        String spanContext = spanContextFromHttpHeaders(httpHeaders);
        if (spanContext != null) {
            record.headers().remove(TRACING_SPAN_CONTEXT_KEY);
            record.headers().remove(TRACING_SECOND_SPAN_CONTEXT_KEY);
            record.headers().add(TRACING_SPAN_CONTEXT_KEY, spanContext.getBytes(StandardCharsets.UTF_8));
        }
    }
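
The remove-then-add sequence in the method above matters because Kafka record headers can hold several entries under the same key, and adding a header appends a new entry rather than overwriting an existing one. The sketch below models that with a plain list; HeaderReplaceSketch and its methods are hypothetical names, not Kafka classes.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: a list of entries stands in for append-only record headers. */
final class HeaderReplaceSketch {

    /** Appends a header without touching existing entries, so duplicate keys are possible. */
    static void add(List<Map.Entry<String, byte[]>> headers, String key, String value) {
        headers.add(new SimpleEntry<>(key, value.getBytes(StandardCharsets.UTF_8)));
    }

    /** Removes every entry for the key, then appends the new value, leaving exactly one entry. */
    static void replace(List<Map.Entry<String, byte[]>> headers, String key, String value) {
        headers.removeIf(header -> header.getKey().equals(key));
        add(headers, key, value);
    }

    /** Counts how many entries exist for the key. */
    static long count(List<Map.Entry<String, byte[]>> headers, String key) {
        return headers.stream().filter(header -> header.getKey().equals(key)).count();
    }

    public static void main(String[] args) {
        List<Map.Entry<String, byte[]>> headers = new ArrayList<>();
        add(headers, "uber-trace-id", "old-context");
        add(headers, "uber-trace-id", "new-context");
        System.out.println("after add only: " + count(headers, "uber-trace-id") + " entries");

        replace(headers, "uber-trace-id", "new-context");
        System.out.println("after replace:  " + count(headers, "uber-trace-id") + " entry");
    }
}
```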

Final Jaeger tracing result

Now, if we start everything up again (as described in the “Initial Jaeger tracing result” section above), using the master branch of code this time (which represents the changes outlined in each of the “Finish tracing configuration” sections above), this is what the final Jaeger tracing result looks like:

I see eight spans for the stream-app trace where I used to see two, but I also still only see two spans when the stream-app simply consumed an input record and sent it to the dead letter queue because the number was odd, which is exactly what I expect!

Take a look at the trace detail view now:

Note: in order to see the topic information, click on one of the spans in the detail view and look for the “message_bus.destination” tag when it is a send and the “topic” tag when it is a receive.

Jaeger is able to clearly illustrate that all of the following occurred:

(1) the stream-app received data from the number-topic
(2) the stream-app sent data to the even-topic
(3) the spring-consumer-app received data from the even-topic
(4) the spring-consumer-app sent a POST request to the rest-app /random endpoint
(5) the rest-app responded with data
(6) the spring-consumer-app sent data to the sentence-topic after it received the response from the /random endpoint
(7) the consumer-app received data from the sentence-topic
(8) the consumer-app sent data to the answer-topic

If you compare this sequence to the initial example project diagram, it lines up perfectly.

One thought on “Distributed Tracing with Apache Kafka and Jaeger”

  1. Ricardo Ferreira says:

    Hi Aaron,

    That is a nice blog, congratulations!

    If you ever need to deal with distributed tracing in Kafka-related technologies like Kafka Connect, REST Proxy, or KSQL, where you don’t have the source code and thus there is no way to write the tracing code, take a look at this open-source project I have built:

    https://github.com/riferrei/jaeger-tracing-support

    It uses interceptors to create the tracing code automatically, and it uses Jaeger as the tracer. Conceptually, it can even be used in Java-based applications like the one you described here. Nonetheless, please give it a try and perhaps blog about it =)
