Kafka
Implmentation

Implemnetation of Kafka in Node.js

We will use the kafkajs library to interact with the Kafka cluster. The kafkajs library is a modern Apache Kafka client for Node.js.

Installation

npm install kafkajs

This will install the kafkajs library in your project.

Client Configuration

client.js
const { Kafka } = require("kafkajs");
 
const kafka = new Kafka({
  clientId: "my-app",
  brokers: ["<YOUR_IP>:9092"],
});

The 9092 is the default port for Kafka. You can change it if you have a different port.

This Client Configuration must be present at every place where you want to interact with the Kafka cluster.

Admin Client

The Admin Client is used to manage topics, partitions, and other Kafka resources.

admin.js
const { kafka } = require("./client.js");
 
async function init() {
  const admin = kafka.admin(); // Create an admin client
  console.log("Connecting...");
  await admin.connect(); // Connect the admin client to the Kafka cluster
  console.log("Admin Connected!");
 
  // Create a topic
  await admin.createTopics({
    topics: [
      {
        topic: "rider-updates", // Topic name
        numPartitions: 2, // Number of partitions
      },
    ],
  });
  console.log("Topic created! [rider-updates]");
  await admin.disconnect(); // Disconnect the admin client after use to free up resources
}
 
init(); // Call the function
🚫

If you are trying to create a topic that already exists, you will get an error. You can ignore the error or check if the topic exists before creating it.

Producer

The Producer is used to send messages to a Kafka topic.

producer.js
const { kafka } = require("./client.js");
 
async function init() {
  const producer = kafka.producer(); // Create a producer client
  console.log("Connecting...");
  await producer.connect(); // Connect the producer client to the Kafka cluster
  console.log("Producer Connected!");
 
  // Send a message
  await producer.send({
    topic: "rider-updates", // Topic name
    messages: [
      {
        key: "location-update",
        value: JSON.stringify({
          rider_id: 1,
          latitude: 34.22,
          longitude: -118.55,
          timestamp: new Date(),
          name: "Sunny",
          location: "SOUTH",
        }),
      },
    ],
  });
  console.log("Message sent!");
  await producer.disconnect(); // Disconnect the producer client after use to free up resources
}
💡

The key is optional. It is used to distribute messages to partitions.

Consumer

The Consumer is used to read messages from a Kafka topic.

consumer.js
const { kafka } = require("./client.js");
 
async function init() {
  const consumer = kafka.consumer({ groupId: "user-1" });
  console.log("Connecting...");
  await consumer.connect();
  console.log("Consumer Connected!");
 
  await consumer.subscribe({ topic: "rider-updates", fromBeginning: true });
 
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log(
        `TOPIC: ${topic} \n, PART:${partition}\n`,
        message.value.toString()
      );
    },
  });
}
 
init();
💡

The groupId is used to group consumers. If you have multiple consumers with the same groupId, they will share the messages.

💡

You dont have to disconnect the consumer as it will keep running and listening to the messages.