Kafka in Go

Use the instructions below to instrument Kafka in your Go service.

With your Go tracer provider configured, you can instrument your Kafka consumers and producers to get end-to-end visibility and troubleshooting capabilities in Helios.

Package github.com/Shopify/sarama

To instrument and trace flows through Kafka clients built on github.com/Shopify/sarama, first install the instrumentation package with go get go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama, then wrap your producer and consumer and inject and extract the trace context in their definitions:

import (
    "context"
    "fmt"
    "math/rand"

    "github.com/Shopify/sarama"
    "go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama"
    "go.opentelemetry.io/otel"
)

//producer
producer, err := sarama.NewAsyncProducer(brokerList, config)
if err != nil {
    return nil, fmt.Errorf("starting Sarama producer: %w", err)
}

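// Wrap the producer so that every message it sends is traced
producer = otelsarama.WrapAsyncProducer(config, producer)

// Build the message; example.KafkaTopic stands in for your own topic name
msg := sarama.ProducerMessage{
    Topic: example.KafkaTopic,
    Key:   sarama.StringEncoder("random_number"),
    Value: sarama.StringEncoder(fmt.Sprintf("%d", rand.Intn(1000))),
}
// Inject tracing info into the message headers. In a real handler, pass the
// context that holds the active span; context.Background() is only a placeholder.
otel.GetTextMapPropagator().Inject(context.Background(), otelsarama.NewProducerMessageCarrier(&msg))
// Send the message through the wrapped producer
producer.Input() <- &msg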

//consumer
consumerGroupHandler := Consumer{}
// Wrap the handler, then pass the wrapped handler to your consumer group's Consume call
handler := otelsarama.WrapConsumerGroupHandler(&consumerGroupHandler)
// Inside your ConsumeClaim:

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for message := range claim.Messages() {
        // Extract tracing info from the message headers
        ctx := otel.GetTextMapPropagator().Extract(context.Background(), otelsarama.NewConsumerMessageCarrier(message))
        _ = ctx // use ctx to start child spans while processing the message
        session.MarkMessage(message, "")
    }

    return nil
}
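To make the inject and extract steps above more concrete, here is a minimal, standard-library-only sketch of what a message carrier does: it exposes the message headers as key/value pairs so that a propagator can write the trace context on the producer side and read it back on the consumer side. The types and functions here (Header, Message, HeaderCarrier, SpanContext, Inject, Extract) are hypothetical stand-ins for illustration, not the sarama or otelsarama API, and the sketch assumes the configured propagator uses the W3C Trace Context traceparent header.

```go
package main

import (
	"fmt"
	"strings"
)

// Header is a hypothetical stand-in for a Kafka record header.
type Header struct {
	Key, Value []byte
}

// Message is a hypothetical stand-in for a Kafka message; only headers matter here.
type Message struct {
	Headers []Header
}

// HeaderCarrier exposes message headers as string key/value pairs, which is
// the role the producer and consumer message carriers play in the snippet above.
type HeaderCarrier struct{ msg *Message }

// Get returns the value stored under key, or "" if the header is absent.
func (c HeaderCarrier) Get(key string) string {
	for _, h := range c.msg.Headers {
		if string(h.Key) == key {
			return string(h.Value)
		}
	}
	return ""
}

// Set overwrites the header stored under key, or appends it if absent.
func (c HeaderCarrier) Set(key, value string) {
	for i, h := range c.msg.Headers {
		if string(h.Key) == key {
			c.msg.Headers[i].Value = []byte(value)
			return
		}
	}
	c.msg.Headers = append(c.msg.Headers, Header{Key: []byte(key), Value: []byte(value)})
}

// SpanContext holds the identifiers carried in a traceparent header.
type SpanContext struct {
	TraceID string // 32 hex characters
	SpanID  string // 16 hex characters
	Sampled bool
}

// Inject writes sc into the carrier as "00-<trace-id>-<span-id>-<flags>".
func Inject(sc SpanContext, c HeaderCarrier) {
	flags := "00"
	if sc.Sampled {
		flags = "01"
	}
	c.Set("traceparent", fmt.Sprintf("00-%s-%s-%s", sc.TraceID, sc.SpanID, flags))
}

// Extract reads the traceparent header back. It reports false when the header
// is missing or malformed. Flag handling is simplified to the sampled bit only.
func Extract(c HeaderCarrier) (SpanContext, bool) {
	parts := strings.Split(c.Get("traceparent"), "-")
	if len(parts) != 4 || parts[0] != "00" || len(parts[1]) != 32 || len(parts[2]) != 16 {
		return SpanContext{}, false
	}
	return SpanContext{TraceID: parts[1], SpanID: parts[2], Sampled: parts[3] == "01"}, true
}

func main() {
	// Producer side: attach the trace context to the outgoing message.
	msg := &Message{}
	Inject(SpanContext{
		TraceID: "4bf92f3577b34da6a3ce929d0e0e4736",
		SpanID:  "00f067aa0ba902b7",
		Sampled: true,
	}, HeaderCarrier{msg: msg})

	// Consumer side: recover the trace context from the received message.
	sc, ok := Extract(HeaderCarrier{msg: msg})
	fmt.Println(sc.TraceID, sc.SpanID, sc.Sampled, ok)
}
```

Because the trace context travels in the message headers, the consumer can continue the producer's trace even though the two run in separate processes. A message with no traceparent header yields no parent context, and the consumer would then start a new trace.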

👍

All set

Once setup is complete and the service is up and running, it will show up in the Helios application.

