OpenTelemetry for Kafka and NestJS
How to enable OpenTelemetry for Event-Driven Architectures using KafkaJS (in NestJS)
In this tutorial, I will show you how to set up OpenTelemetry in a Kafka and NestJS microservices architecture. I will use an e-commerce application as a practical example of Event-Driven Architecture, but demonstrate the implementation with the toy example I have previously shown in https://alexknips.hashnode.dev/open-telemetry-with-multiple-nestjs-apps.
Let’s first discuss what Event Driven Architecture is.
What is Event-Driven Architecture?
Event-Driven Architecture (EDA) is a design paradigm where systems communicate through events: persisted messages that describe completed changes of state. Unlike command-driven approaches, where services send direct requests to perform actions, EDA emphasizes using events to notify services about state changes, though it also allows for commands when necessary. This architecture decouples microservices asynchronously, enabling each service to handle its domain logic independently: a service completes a change within its own domain once it is satisfied with the event or command it received. This decoupling enhances scalability and resilience, allowing microservices to evolve independently and respond dynamically to changes.
Command-driven architecture might not sound familiar, but one microservice calling another's REST interface to command it to do something is exactly that.
In my experience, I have used EDA mostly for its ability to adapt to change and for the better separation of concerns it allows.
An example could be an e-commerce application with a shopping cart and multiple other microservices. Here I still kept the command-driven nature of the payment processing but decided to separate the ordering service and shipment fulfillment in an event-driven manner.
Let’s investigate this with our much smaller example.
Now we have separated the three microservices nest1, nest2 and nest3 through events instead of commands (REST calls in the previous implementation). The services are more independent, both in execution and conceptually. This also allows us to evolve the microservices independently.
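To make the difference concrete, here is a minimal sketch of nest1 emitting an event that nest2 reacts to, instead of nest1 calling nest2 over REST. The injection token, topic name and payload are hypothetical and not the exact code from the repository.

// nest1: publish an event instead of calling nest2's REST endpoint.
import { Controller, Get, Inject } from '@nestjs/common';
import { ClientKafka, EventPattern, Payload } from '@nestjs/microservices';

@Controller()
export class Nest1Controller {
  // 'KAFKA_CLIENT' is assumed to be registered via ClientsModule.register(...)
  constructor(@Inject('KAFKA_CLIENT') private readonly kafka: ClientKafka) {}

  @Get()
  handleRequest() {
    // fire-and-forget: nest1 does not wait for nest2 to react
    this.kafka.emit('nest1.processed', { requestedAt: new Date().toISOString() });
    return 'event published';
  }
}

// nest2: react to the event whenever it arrives.
@Controller()
export class Nest2Controller {
  @EventPattern('nest1.processed')
  onNest1Processed(@Payload() message: { requestedAt: string }) {
    console.log('received event from nest1:', message);
  }
}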
The need for telemetry in Event-Driven Architecture
Just as we previously split the functionality into microservices, we want to trace a user interaction across all of those components. This includes seeing how a client's call leads to subsequent steps, whether our architecture is command-driven or event-driven. This tracing allows us to understand and monitor the complete flow of user actions through the system.
How to achieve this conceptually
Previously, we attached the trace parent (https://opentelemetry.io/docs/concepts/glossary/) completely automatically by using the HttpInstrumentation.
With HTTP, headers are used to attach the current trace context as the trace parent of the subsequent span when making a REST call to another microservice.
The OpenTelemetry documentation explains context propagation in a lot of detail here: https://opentelemetry.io/docs/concepts/signals/traces/#context-propagation
We will do exactly the same with Kafka, except that the parent span will typically have completed already by the time the message is consumed. That does not prevent us from propagating the trace through Kafka and its message headers.
Let's go through what is happening here, step by step.
A span is created in nest1. This span might itself contain a whole hierarchy of child spans, and it has a specific span_id. A sample span might look like this:
{
  "name": "/kafka",
  "context": {
    "trace_id": "7bba9f33312b3dbb8b2c2c62bb7abe2d",
    "span_id": "086e83747d0e381e"
  },
  "parent_id": "",
  "start_time": "2021-10-22 16:04:01.209458162 +0000 UTC",
  "end_time": "2021-10-22 16:04:01.209514132 +0000 UTC",
  "status_code": "STATUS_CODE_OK",
  "status_message": "",
  "attributes": {
    "net.transport": "IP.TCP"
    // more attributes
  }
}
Now we will publish an event to Kafka with an additional header that carries the trace context in the W3C traceparent format (version-trace_id-parent_span_id-flags):
{ "traceparent": "00-7bba9f33312b3dbb8b2c2c62bb7abe2d-086e83747d0e381e-01" }
The subscriber to that topic will pick up the message and open a new span. The new span might look similar to this:
{
  "name": "/kafka",
  "context": {
    "trace_id": "7bba9f33312b3dbb8b2c2c62bb7abe2d",
    "span_id": "094e12347a0a211d"
  },
  "parent_id": "086e83747d0e381e",
  "start_time": "2021-10-22 16:04:01.209458162 +0000 UTC",
  "end_time": "2021-10-22 16:04:01.209514132 +0000 UTC",
  "status_code": "STATUS_CODE_OK",
  "status_message": "",
  "attributes": {
    "net.transport": "IP.TCP"
    // more attributes
  }
}
where the span_id of the previous span was added as the parent_id. When this is then pushed to the OpenTelemetry backend (e.g. Jaeger or something similar), the backend can make sense of the spans and how they relate to each other.
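Conceptually, that is all the KafkaJS instrumentation does for us behind the scenes. As a rough sketch (not the extension's actual code), the same propagation could be done by hand with the OpenTelemetry API and a plain KafkaJS client; this assumes an SDK with the default W3C trace context propagator has already been started:

import { context, propagation, trace } from '@opentelemetry/api';
import { Kafka } from 'kafkajs';

const kafka = new Kafka({ brokers: ['localhost:9092'] }); // assumed local broker
const tracer = trace.getTracer('manual-kafka-propagation');

// Producer side: inject the active trace context into the message headers.
async function publish(topic: string, value: string) {
  const producer = kafka.producer();
  await producer.connect();
  await tracer.startActiveSpan(`${topic} send`, async (span) => {
    const headers: Record<string, string> = {};
    propagation.inject(context.active(), headers); // writes e.g. the 'traceparent' header
    await producer.send({ topic, messages: [{ value, headers }] });
    span.end();
  });
  await producer.disconnect();
}

// Consumer side: extract the propagated context and start the new span as its child.
async function consume(topic: string) {
  const consumer = kafka.consumer({ groupId: 'manual-kafka-propagation' });
  await consumer.connect();
  await consumer.subscribe({ topic, fromBeginning: false });
  await consumer.run({
    eachMessage: async ({ message }) => {
      const carrier: Record<string, string> = {};
      for (const [key, val] of Object.entries(message.headers ?? {})) {
        carrier[key] = val?.toString() ?? '';
      }
      const parentContext = propagation.extract(context.active(), carrier);
      const span = tracer.startSpan(`${topic} process`, undefined, parentContext);
      // ... handle the message ...
      span.end();
    },
  });
}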
Implementation
Let’s install and configure the OpenTelemetry KafkaJS instrumentation (https://github.com/aspecto-io/opentelemetry-ext-js/tree/master/packages/instrumentation-kafkajs) in our three microservices.
We will add the KafkaJsInstrumentation to our tracing configuration.
import { NodeSDK } from '@opentelemetry/sdk-node';
import { Resource } from '@opentelemetry/resources';
import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions';
import { HttpInstrumentation } from '@opentelemetry/instrumentation-http';
import { ExpressInstrumentation } from '@opentelemetry/instrumentation-express';
import { NestInstrumentation } from '@opentelemetry/instrumentation-nestjs-core';
// the aspecto extension is published on npm as 'opentelemetry-instrumentation-kafkajs'
import { KafkaJsInstrumentation } from 'opentelemetry-instrumentation-kafkajs';

const sdk = new NodeSDK({
  traceExporter, // the trace exporter configured as in the previous post
  instrumentations: [
    new NestInstrumentation(),
    new HttpInstrumentation(),
    new ExpressInstrumentation(),
    new KafkaJsInstrumentation({
      // see the extension's README for the available configuration options
    }),
  ],
  resource: new Resource({
    [SemanticResourceAttributes.SERVICE_NAME]: 'nest1',
  }),
});

sdk.start();
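One thing to keep in mind: this tracing setup has to run before the rest of the application is loaded, otherwise the instrumentations cannot patch the modules (Nest, Express, HTTP, KafkaJS) they hook into. Importing the tracing file as the very first import of main.ts, or preloading it with node -r, achieves this.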
Follow the NestJS Kafka documentation to publish and subscribe to topics within our demo application. Refer to:
https://docs.nestjs.com/faq/hybrid-application to enable HTTP and Kafka at the same time,
or take a look at the full source code provided at github.com/alexknips/nest-open-telemetry.
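For orientation, here is a minimal sketch of what such a hybrid bootstrap can look like; the tracing import and the consumer group id are assumptions, not the exact code from the repository.

// main.ts (sketch): HTTP and Kafka in one NestJS application
import './tracing'; // hypothetical file containing the OpenTelemetry setup shown above
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';

async function bootstrap() {
  // regular HTTP application
  const app = await NestFactory.create(AppModule);

  // attach a Kafka microservice to the same application instance
  app.connectMicroservice<MicroserviceOptions>({
    transport: Transport.KAFKA,
    options: {
      client: {
        brokers: (process.env.KAFKA_BOOTSTRAP_SERVERS ?? 'localhost:9092').split(','),
      },
      consumer: {
        groupId: 'nest-demo-consumer', // hypothetical consumer group id
      },
    },
  });

  await app.startAllMicroservices();
  await app.listen(3000);
}
bootstrap();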
Kafka runs locally in my example through docker-compose, configured as follows:
version: '3.7'
services:
kafka-ui:
image: provectuslabs/kafka-ui:latest
ports:
- 8080:8080
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: debezium
KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://debezium-connect:8083
DYNAMIC_CONFIG_ENABLED: 'true'
networks:
- jaeger-example
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
hostname: zookeeper
ports:
- '2181:2181'
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
networks:
- jaeger-example
kafka:
image: confluentinc/cp-kafka:7.5.0
hostname: kafka
depends_on:
- zookeeper
ports:
- '29092:29092'
- '9092:9092'
- '9101:9101'
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
healthcheck:
test: ['CMD', 'kafka-topics', '--bootstrap-server', 'kafka:29092', '--list']
interval: 10s
timeout: 10s
retries: 10
networks:
- jaeger-example
wait-kafka-init:
image: confluentinc/cp-kafka:7.2.1
hostname: wait-kafka-init
depends_on:
kafka:
condition: service_healthy
entrypoint: ['/bin/sh', '-c']
command: |
"
# blocks until kafka is reachable
kafka-topics --bootstrap-server kafka:29092 --list
echo -e 'Kafka is reachable'
"
networks:
- jaeger-example
jaeger:
image: jaegertracing/all-in-one:latest
ports:
- "16686:16686"
- "4318:4318"
- "4317:4317"
environment:
- LOG_LEVEL=debug
networks:
- jaeger-example
nestapp1:
image: nestapp1
build:
context: nest1
dockerfile: Dockerfile
ports:
- "3001:3000"
environment:
- OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318
- KAFKA_BOOTSTRAP_SERVERS=kafka:29092
networks:
- jaeger-example
depends_on:
- jaeger
- nestapp2
- nestapp3
- wait-kafka-init
nestapp2:
image: nestapp2
build:
context: nest2
dockerfile: Dockerfile
ports:
- "3002:3000"
environment:
- OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318
- KAFKA_BOOTSTRAP_SERVERS=kafka:29092
networks:
- jaeger-example
depends_on:
- jaeger
- nestapp3
- wait-kafka-init
nestapp3:
image: nestapp3
build:
context: nest3
dockerfile: Dockerfile
ports:
- "3003:3000"
environment:
- OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318
- KAFKA_BOOTSTRAP_SERVERS=kafka:29092
networks:
- jaeger-example
depends_on:
- jaeger
- wait-kafka-init
networks:
jaeger-example:
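In short, this compose file starts a single Kafka broker with Zookeeper, a Kafka UI on http://localhost:8080, Jaeger (UI on http://localhost:16686, OTLP ingestion on ports 4317/4318) and the three NestJS apps on ports 3001 to 3003, all pointed at the broker via KAFKA_BOOTSTRAP_SERVERS.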
I am aware that this looks rather complex; I can go into more detail another time if needed.
Result
In the Jaeger UI, you will see the full traces of our Event-Driven Architecture example: one application creates an event, pushes it to Kafka, and the next service in line picks it up, all connected in a single trace. This visibility into the system's operations makes observability in distributed systems much easier.
As mentioned previously, take a look at the source code provided at github.com/alexknips/nest-open-telemetry.
Good luck playing around and extending this to your use cases.
Further reading
The actual extension is explained in a lot of detail in the following articles:
Distributed Tracing for Kafka with OpenTelemetry in Node | Aspecto (aspecto.io)
OpenTelemetry KafkaJS Instrumentation for Node.js | Aspecto (aspecto.io)