OpenTelemetry for Kafka and NestJS


How to enable OpenTelemetry for Event-Driven Architectures using KafkaJS (in NestJS)

In this tutorial, I will show you how to set up OpenTelemetry in a Kafka and NestJS microservices architecture. I will use an e-commerce application as a practical example of Event-Driven Architecture, but demonstrate the implementation with the toy example I previously built in https://alexknips.hashnode.dev/open-telemetry-with-multiple-nestjs-apps.

Let’s first discuss what Event Driven Architecture is.

What is Event-Driven Architecture?

Event-Driven Architecture (EDA) is a design paradigm where systems communicate through events: persisted messages that describe completed changes of state. Unlike command-driven approaches, where services send direct requests to perform actions, EDA emphasizes using events to notify services about state changes, though it also allows for commands when necessary. This architecture decouples microservices asynchronously, enabling each service to handle its domain logic independently: a service completes a change within its own domain once it is satisfied with the event or command it received. This decoupling enhances scalability and resilience, allowing microservices to evolve independently and respond dynamically to changes.

Command-driven architecture might not sound familiar, but one microservice calling another's REST interface to command it to do something is exactly that.

In my experience, I have used EDA mostly for its ability to adapt to change and for the better separation of concerns it allows.

An example could be an e-commerce application with a shopping cart and multiple other microservices. Here I kept the command-driven nature of the payment processing but decided to separate the ordering service and shipment fulfillment in an event-driven manner.

Let's investigate this with our much smaller example.

Now we have separated the three microservices nest1, nest2 and nest3 through events instead of commands (REST calls in the implementation). The services become more independent, both in execution and conceptually, which also allows us to evolve each microservice on its own.
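
To make the contrast concrete, here is a minimal sketch of the same interaction expressed as a command versus an event. The service names, URL and topic are illustrative assumptions, not the exact code of the demo:

// Command-driven: nest1 tells nest2 what to do through its REST interface.
// Event-driven:   nest1 only announces what happened; whoever is interested reacts.
import { HttpService } from '@nestjs/axios';
import { ClientKafka } from '@nestjs/microservices';

export async function commandDriven(http: HttpService): Promise<void> {
  // Direct coupling: nest1 must know nest2's API and wait for its response.
  await http.axiosRef.post('http://nest2:3000/shipments', { orderId: '42' });
}

export function eventDriven(kafka: ClientKafka): void {
  // Loose coupling: nest1 publishes a fact; nest2 (and others) subscribe to it.
  kafka.emit('order.placed', { orderId: '42' });
}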

The need for telemetry in Event-Driven Architecture

Similar to how we previously separated functionality into microservices, we want to trace user interactions and all their components. This includes seeing how a client's call leads to subsequent steps, whether our architecture is command-driven or event-driven. This tracing allows us to understand and monitor the complete flow of user actions through the system.

How to achieve this conceptually

Previously, we attached the trace parent (https://opentelemetry.io/docs/concepts/glossary/) completely automatically by using HttpInstrumentation.

Here, HTTP headers are used to attach the current trace ID as the subsequent trace parent when making a REST call to another microservice.

The Open Telemetry documentation explains context propagation in a lot of detail here: https://opentelemetry.io/docs/concepts/signals/traces/#context-propagation
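
For reference, the traceparent that HttpInstrumentation injects as an HTTP header follows the W3C Trace Context format version-traceid-parentspanid-traceflags, for example (illustrative values):

traceparent: 00-7bba9f33312b3dbb8b2c2c62bb7abe2d-086e83747d0e381e-01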

Nested span in Command Driven Architecture

We will do exactly the same with Kafka, except that here the parent span actually completes before the message is consumed. This does not prevent us from propagating the trace through Kafka and its headers.

Let's go through the steps of what's happening here.

  1. A span is created in nest1. This span might itself contain a whole hierarchy of child spans and has a specific spanId. A sample span might look like

     {
       "name": "/kafka",
       "context": {
         "trace_id": "7bba9f33312b3dbb8b2c2c62bb7abe2d",
         "span_id": "086e83747d0e381e"
       },
       "parent_id": "",
       "start_time": "2021-10-22 16:04:01.209458162 +0000 UTC",
       "end_time": "2021-10-22 16:04:01.209514132 +0000 UTC",
       "status_code": "STATUS_CODE_OK",
       "status_message": "",
       "attributes": {
         "net.transport": "IP.TCP",
         // more attributes
       }
     }
    
  2. Now we will publish an event to Kafka with the traceparent described above added as a message header

     {"traceparent":"00-7bba9f33312b3dbb8b2c2c62bb7abe2d-086e83747d0e381e-01"}
    
  3. The subscriber to the topic picks up the message and opens a new span. The new span might look similar to

     {
       "name": "/kafka",
       "context": {
         "trace_id": "7bba9f33312b3dbb8b2c2c62bb7abe2d",
         "span_id": "094e12347a0a211d"
       },
       "parent_id": "086e83747d0e381e",
       "start_time": "2021-10-22 16:04:01.209458162 +0000 UTC",
       "end_time": "2021-10-22 16:04:01.209514132 +0000 UTC",
       "status_code": "STATUS_CODE_OK",
       "status_message": "",
       "attributes": {
         "net.transport": "IP.TCP",
         // more attributes
       }
     }
    

     where the ID of the previous span was added as the parent_id. When this is pushed to the OpenTelemetry backend (e.g. Jaeger or something similar), it can make sense of the spans and how they relate to each other.
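
The KafkaJS instrumentation performs this header injection and extraction for us. Purely for illustration, here is a rough sketch of what doing it by hand would look like with the OpenTelemetry propagation API and a plain kafkajs producer and consumer; the topic and function names are assumptions:

import { context, propagation, trace, SpanKind } from '@opentelemetry/api';
import { Producer, KafkaMessage } from 'kafkajs';

const tracer = trace.getTracer('nest1');

// Producer side: serialize the active span context into the message headers.
export async function publishOrderPlaced(producer: Producer, payload: object): Promise<void> {
  const span = tracer.startSpan('order.placed publish', { kind: SpanKind.PRODUCER });
  const headers: Record<string, string> = {};
  propagation.inject(trace.setSpan(context.active(), span), headers);
  await producer.send({
    topic: 'order.placed',
    messages: [{ value: JSON.stringify(payload), headers }],
  });
  span.end();
}

// Consumer side: rebuild the parent context from the headers and start a child span.
export function handleOrderPlaced(message: KafkaMessage): void {
  const carrier: Record<string, string> = {};
  for (const [key, value] of Object.entries(message.headers ?? {})) {
    if (value !== undefined) carrier[key] = value.toString();
  }
  const parentContext = propagation.extract(context.active(), carrier);
  const span = tracer.startSpan('order.placed process', { kind: SpanKind.CONSUMER }, parentContext);
  // ... domain logic runs here, attributed to the same trace as the producer ...
  span.end();
}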

Implementation

Let's install and configure the OpenTelemetry KafkaJS instrumentation (https://github.com/aspecto-io/opentelemetry-ext-js/tree/master/packages/instrumentation-kafkajs) in our three microservices.
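
The install step should look roughly like this, assuming the npm package name published from the linked repository:

npm install opentelemetry-instrumentation-kafkajs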

We will add the KafkaJsInstrumentation to our tracing configuration.

import { NodeSDK } from '@opentelemetry/sdk-node';
import { Resource } from '@opentelemetry/resources';
import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions';
import { HttpInstrumentation } from '@opentelemetry/instrumentation-http';
import { ExpressInstrumentation } from '@opentelemetry/instrumentation-express';
import { NestInstrumentation } from '@opentelemetry/instrumentation-nestjs-core';
// package name as published from the aspecto repository linked above
import { KafkaJsInstrumentation } from 'opentelemetry-instrumentation-kafkajs';

// traceExporter is the OTLP trace exporter set up in the previous article
const sdk = new NodeSDK({
  traceExporter,
  instrumentations: [
    new NestInstrumentation(),
    new HttpInstrumentation(),
    new ExpressInstrumentation(),
    // creates producer/consumer spans and propagates the trace context via Kafka headers
    new KafkaJsInstrumentation({
      // see the package README for available configuration options
    }),
  ],
  resource: new Resource({
    [SemanticResourceAttributes.SERVICE_NAME]: 'nest1',
  }),
});

Follow the Kafka documentation to publish and subscribe to topics within our demo application, or take a look at the full source code provided in github.com/alexknips/nest-open-telemetry.
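
For orientation, here is a minimal sketch of what publishing and subscribing could look like with the NestJS Kafka transport, which uses KafkaJS under the hood. The topic name, injection token and class names are illustrative assumptions, not the demo's exact code:

import { Controller, Inject, Injectable } from '@nestjs/common';
import { ClientKafka, EventPattern, Payload } from '@nestjs/microservices';

// nest1 side: publish an event. 'KAFKA_CLIENT' is assumed to be registered via
// ClientsModule.register with Transport.KAFKA and the KAFKA_BOOTSTRAP_SERVERS broker.
@Injectable()
export class OrderPublisher {
  constructor(@Inject('KAFKA_CLIENT') private readonly client: ClientKafka) {}

  placeOrder(orderId: string): void {
    // The KafkaJS instrumentation attaches the traceparent header to this message.
    this.client.emit('order.placed', { orderId });
  }
}

// nest2 side: subscribe to the topic and react to the event.
@Controller()
export class OrderConsumer {
  @EventPattern('order.placed')
  handleOrderPlaced(@Payload() data: { orderId: string }): void {
    // The instrumentation reads the traceparent header and continues the trace here.
    console.log(`order ${data.orderId} placed`);
  }
}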

In my example, Kafka runs locally through docker-compose, configured as follows:

version: '3.7'
services:
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - 8080:8080
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: debezium
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://debezium-connect:8083
      DYNAMIC_CONFIG_ENABLED: 'true'
    networks:
      - jaeger-example
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    hostname: zookeeper
    ports:
      - '2181:2181'
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - jaeger-example
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    hostname: kafka
    depends_on:
      - zookeeper
    ports:
      - '29092:29092'
      - '9092:9092'
      - '9101:9101'
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
    healthcheck:
      test: ['CMD', 'kafka-topics', '--bootstrap-server', 'kafka:29092', '--list']
      interval: 10s
      timeout: 10s
      retries: 10
    networks:
      - jaeger-example
  wait-kafka-init:
    image: confluentinc/cp-kafka:7.2.1
    hostname: wait-kafka-init
    depends_on:
      kafka:
        condition: service_healthy
    entrypoint: ['/bin/sh', '-c']
    command: |
      "
      # blocks until kafka is reachable
      kafka-topics --bootstrap-server kafka:29092 --list

      echo -e 'Kafka is reachable'
      "
    networks:
      - jaeger-example
  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686"
      - "4318:4318"
      - "4317:4317"
    environment:
      - LOG_LEVEL=debug
    networks:
      - jaeger-example
  nestapp1:
    image: nestapp1
    build:
      context: nest1
      dockerfile: Dockerfile
    ports:
      - "3001:3000"
    environment:
      - OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318
      - KAFKA_BOOTSTRAP_SERVERS=kafka:29092
    networks:
      - jaeger-example
    depends_on:
      - jaeger
      - nestapp2
      - nestapp3
      - wait-kafka-init
  nestapp2:
    image: nestapp2
    build:
      context: nest2
      dockerfile: Dockerfile
    ports:
      - "3002:3000"
    environment:
      - OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318
      - KAFKA_BOOTSTRAP_SERVERS=kafka:29092
    networks:
      - jaeger-example
    depends_on:
      - jaeger
      - nestapp3
      - wait-kafka-init
  nestapp3:
    image: nestapp3
    build:
      context: nest3
      dockerfile: Dockerfile
    ports:
      - "3003:3000"
    environment:
      - OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318
      - KAFKA_BOOTSTRAP_SERVERS=kafka:29092
    networks:
      - jaeger-example
    depends_on:
      - jaeger
      - wait-kafka-init

networks:
  jaeger-example:

I am aware that this looks quite complex; I can go into more detail at another time if needed.
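
To try it out end to end, something along these lines should work, assuming the compose file above is saved as docker-compose.yml in the repository root:

docker compose up --build
# Jaeger UI: http://localhost:16686
# Kafka UI:  http://localhost:8080
# nest1:     http://localhost:3001

Trigger the flow through nest1 and then open the Jaeger UI to inspect the resulting traces.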

Result

In the Jaeger UI, you will see the full traces of our Event-Driven Architecture example: one application creates an event, pushes it to Kafka, and the next service in line picks it up. This visibility into the complete flow of an interaction makes observability in distributed systems much easier.

As mentioned previously, take a look at the source code provided in github.com/alexknips/nest-open-telemetry.

Good luck playing around and extending this to your use cases.

Further reading

The actual extension is explained in a lot of detail in the following articles:

Distributed Tracing for Kafka with OpenTelemetry in Node | Aspecto
*Learn how to run OpenTelemetry in Node to generate spans for different Kafka operations and ultimately visualize your…*
aspecto.io

OpenTelemetry KafkaJS Instrumentation for Node.js | Aspecto
*Track all Kafka interactions in your collected traces and get a more comprehensive view of your application behavior…*
aspecto.io