Kafka consumer implementation using Golang

In the previous post, we used Golang to build a Kafka producer that reads a log file and stores the data in Kafka, which acts as a centralized log server for all IoT devices.

(Image: Kafka consumer using Golang)

See the previous post: Kafka producer implementation using Golang

Kafka consumers are the subscribers responsible for reading records from one or more topics, or from specific partitions of a topic. A consumer can subscribe to a topic manually (by assigning partitions) or automatically (as part of a consumer group). Once subscribed, whenever a producer pushes data into the topic, every subscribed consumer (or consumer group) receives those new records.

To read those records from the topic, we implement a program in our programming language of choice using the Apache Kafka consumer API.
The majority of Kafka's theoretical aspects have already been covered in prior posts, so let's get started with the implementation.

So, in this post, we'll show you how to use Golang to create a Kafka consumer program that consumes data from LogTopic. We're using the following Golang library for this:

"github.com/segmentio/kafka-go"

We used the same library for our producer implementation.


package main

import (
	"context"
	"fmt"

	"github.com/segmentio/kafka-go"
)

func main() {
	topic := "LogTopic"
	partition := 0

	// Connect directly to the leader broker for LogTopic, partition 0.
	conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
	if err != nil {
		fmt.Println("failed to dial leader:", err)
		return
	}

	// Optional: read in batches instead of one message at a time.
	//conn.SetReadDeadline(time.Now().Add(10 * time.Second))
	//batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max
	//b := make([]byte, 10e3) // 10KB max per message

	// Read messages one at a time (maxBytes = 1000) and print each value.
	for {
		msg, err := conn.ReadMessage(1000)
		if err != nil {
			fmt.Println(err)
			break
		}
		fmt.Println(string(msg.Value))
	}

	if err := conn.Close(); err != nil {
		fmt.Println("failed to close connection:", err)
	}
}
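
The commented-out lines in the listing hint at an alternative approach: instead of reading one message at a time, the connection can fetch messages in batches. Below is a minimal sketch of that approach, assuming the same broker, topic, and partition; the 10-second read deadline and the batch sizes are illustrative values, not requirements.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Same leader connection as above: LogTopic, partition 0.
	conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", "LogTopic", 0)
	if err != nil {
		fmt.Println("failed to dial leader:", err)
		return
	}
	defer conn.Close()

	// Stop waiting for new messages after 10 seconds of inactivity.
	conn.SetReadDeadline(time.Now().Add(10 * time.Second))

	// Fetch at least 10KB and at most 1MB of messages per request.
	batch := conn.ReadBatch(10e3, 1e6)
	defer batch.Close()

	b := make([]byte, 10e3) // 10KB max per message
	for {
		n, err := batch.Read(b)
		if err != nil {
			break // io.EOF or a read-deadline error ends the batch
		}
		fmt.Println(string(b[:n]))
	}
}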


The above program connects to partition 0 of LogTopic, so anytime the publisher publishes data to that partition, the consumer fetches the new records from the broker and receives the data.

So, basically, whenever a new record arrives in the topic, the consumer will consume the data and print it to stdout. There are many use cases for the data coming in from an Apache Kafka topic: we can implement a live log watcher, or, if any sensor fails to send data for a specified period of time, notify the administrator that the sensor is down or has a hardware problem.
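
For something closer to a production setup, the same kafka-go library also provides a higher-level Reader that handles consumer groups, offsets, and reconnects for you. Here is a minimal sketch of a live log watcher built on it; the group ID "log-watchers" is a made-up name, and the broker address and topic are the same ones used above.

package main

import (
	"context"
	"fmt"

	"github.com/segmentio/kafka-go"
)

func main() {
	// The Reader manages group membership and offset commits for us.
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{"localhost:9092"},
		GroupID:  "log-watchers", // hypothetical consumer group name
		Topic:    "LogTopic",
		MinBytes: 10e3, // 10KB
		MaxBytes: 10e6, // 10MB
	})
	defer r.Close()

	for {
		// ReadMessage blocks until a new record arrives; with a GroupID set,
		// it also commits the offset after a successful read.
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			fmt.Println(err)
			break
		}
		fmt.Printf("partition %d offset %d: %s\n", m.Partition, m.Offset, string(m.Value))
	}
}

Because offsets are tracked per consumer group, you can run several copies of this watcher and Kafka will spread the partitions of LogTopic across them.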


In real life, there are many places where Apache Kafka fits. Check out the other Kafka-related posts as well; I will post more details as I find them useful.
