Implementation of Kafka in Node.js
We will use the kafkajs library to interact with the Kafka cluster. The kafkajs library is a modern Apache Kafka client for Node.js.
Installation
npm install kafkajs
This will install the kafkajs library in your project.
Client Configuration
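const { Kafka } = require("kafkajs");

const kafka = new Kafka({
  clientId: "my-app", // Name that identifies this application to the brokers
  brokers: ["<YOUR_IP>:9092"], // Replace <YOUR_IP> with your broker's address
});

module.exports = { kafka }; // Exported so other files can require("./client.js")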
9092 is the default port for Kafka. Change it if your broker listens on a different port.
Save this configuration as client.js and export the kafka instance. Every file that interacts with the Kafka cluster imports the client from there.
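Hard-coding the broker address works for a local test, but the same client file often has to run against different clusters. A minimal sketch of one way to handle that, assuming two hypothetical environment variables, KAFKA_CLIENT_ID and KAFKA_BROKERS (kafkajs does not define or read either of them), and a hypothetical helper name buildKafkaConfig:

```javascript
// Builds the options object that is passed to `new Kafka(...)`.
// KAFKA_CLIENT_ID and KAFKA_BROKERS are hypothetical variable names chosen
// for this sketch; kafkajs itself does not read them.
function buildKafkaConfig(env) {
  // Accept a comma-separated broker list such as "10.0.0.1:9092,10.0.0.2:9092".
  const brokers = (env.KAFKA_BROKERS || "localhost:9092")
    .split(",")
    .map((broker) => broker.trim())
    .filter((broker) => broker.length > 0);

  return {
    clientId: env.KAFKA_CLIENT_ID || "my-app",
    brokers,
  };
}

// Usage in client.js (requires kafkajs to be installed):
//   const { Kafka } = require("kafkajs");
//   const kafka = new Kafka(buildKafkaConfig(process.env));
//   module.exports = { kafka };
```

The fallback values ("localhost:9092" and "my-app") are only defaults for this sketch; pick whatever suits your setup.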
Admin Client
The Admin Client is used to manage topics, partitions, and other Kafka resources.
const { kafka } = require("./client.js");

async function init() {
  const admin = kafka.admin(); // Create an admin client
  console.log("Connecting...");
  await admin.connect(); // Connect the admin client to the Kafka cluster
  console.log("Admin Connected!");

  // Create a topic
  await admin.createTopics({
    topics: [
      {
        topic: "rider-updates", // Topic name
        numPartitions: 2, // Number of partitions
      },
    ],
  });
  console.log("Topic created! [rider-updates]");

  await admin.disconnect(); // Disconnect the admin client after use to free up resources
}

init(); // Call the function
If the topic already exists, kafkajs does not create it again: createTopics resolves to false instead of true, so the "Topic created!" log above can be misleading. You can inspect that return value, or check whether the topic exists before creating it.
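Checking first can be sketched as follows. This assumes admin is an already-connected kafkajs admin client; ensureTopic is a hypothetical helper name, not part of the library. It relies only on the admin client's listTopics() and createTopics() methods.

```javascript
// Creates `topic` only when it is not already present in the cluster.
// `admin` is expected to be a connected kafkajs admin client.
// Returns true when the topic was created, false when it already existed.
async function ensureTopic(admin, topic, numPartitions) {
  const existing = await admin.listTopics(); // Resolves to an array of topic names
  if (existing.includes(topic)) {
    return false; // Nothing to do
  }
  await admin.createTopics({
    topics: [{ topic, numPartitions }],
  });
  return true;
}

// Usage inside init(), after admin.connect():
//   const created = await ensureTopic(admin, "rider-updates", 2);
//   console.log(created ? "Topic created!" : "Topic already exists.");
```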
Producer
The Producer is used to send messages to a Kafka topic.
const { kafka } = require("./client.js");

async function init() {
  const producer = kafka.producer(); // Create a producer client
  console.log("Connecting...");
  await producer.connect(); // Connect the producer client to the Kafka cluster
  console.log("Producer Connected!");

  // Send a message
  await producer.send({
    topic: "rider-updates", // Topic name
    messages: [
      {
        key: "location-update",
        // The value is sent as a JSON string
        value: JSON.stringify({
          rider_id: 1,
          latitude: 34.22,
          longitude: -118.55,
          timestamp: new Date(),
          name: "Sunny",
          location: "SOUTH",
        }),
      },
    ],
  });
  console.log("Message sent!");

  await producer.disconnect(); // Disconnect the producer client after use to free up resources
}

init(); // Call the function
The key is optional. Kafka uses it to choose the partition: messages with the same key are sent to the same partition, which keeps them in order relative to each other.
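To see why the key matters, here is a simplified illustration of key-based partitioning. The hash below is a toy function chosen for readability; it is not the hash that kafkajs or Kafka actually uses, so the partition numbers it produces will not match a real cluster. The only point it makes is that the same key always maps to the same partition. The name toyPartitionForKey and the "rider-1" key are made up for this example.

```javascript
// Toy partitioner: NOT the algorithm used by kafkajs, only an illustration
// of how a key can be mapped deterministically to a partition number.
function toyPartitionForKey(key, numPartitions) {
  let hash = 0;
  for (const char of String(key)) {
    // Keep the running hash within the unsigned 32-bit range.
    hash = (hash * 31 + char.codePointAt(0)) >>> 0;
  }
  return hash % numPartitions;
}

// Hashing the same key twice gives the same partition, so every update
// keyed by one rider would land in one partition.
const first = toyPartitionForKey("rider-1", 2);
const second = toyPartitionForKey("rider-1", 2);
console.log(first === second); // true
```

If per-rider ordering matters, using something like the rider's id as the key (instead of a constant such as "location-update") is one way to get it, since ordering is only guaranteed within a partition.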
Consumer
The Consumer is used to read messages from a Kafka topic.
const { kafka } = require("./client.js");

async function init() {
  const consumer = kafka.consumer({ groupId: "user-1" }); // Create a consumer in group "user-1"
  console.log("Connecting...");
  await consumer.connect(); // Connect the consumer client to the Kafka cluster
  console.log("Consumer Connected!");

  // Read the topic from its earliest available message
  await consumer.subscribe({ topic: "rider-updates", fromBeginning: true });

  // Called once for every message received
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log(
        `TOPIC: ${topic}, PART: ${partition}\n`,
        message.value.toString()
      );
    },
  });
}

init(); // Call the function
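Because the producer in the previous section serializes each update with JSON.stringify, the consumer can turn message.value back into an object. A minimal sketch, assuming every payload on the topic is JSON; decodeRiderUpdate is a hypothetical helper name:

```javascript
// Converts a consumed message back into the object the producer sent.
// In kafkajs, message.value is a Buffer, or null for a message without a value.
function decodeRiderUpdate(message) {
  if (message.value === null) {
    return null;
  }
  return JSON.parse(message.value.toString());
}

// Inside consumer.run it could be used like this:
//   eachMessage: async ({ message }) => {
//     const update = decodeRiderUpdate(message);
//     if (update) console.log(update.rider_id, update.location);
//   }
```

Note that the timestamp field arrives as an ISO string, since that is how JSON.stringify serializes a Date.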
The groupId is used to group consumers. If several consumers use the same groupId, the topic's partitions are divided among them, so each message is handled by only one consumer in the group.
You don't have to disconnect the consumer, because it keeps running and listening for new messages.
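When the process does need to stop (for example on Ctrl+C), disconnecting the consumer first lets it shut down cleanly. A minimal sketch, assuming SIGINT and SIGTERM are the signals you want to handle; registerShutdown is a hypothetical helper, and its proc parameter (which defaults to the Node.js process object) exists only to make the function easy to test:

```javascript
// Disconnects the consumer when the process receives a termination signal.
// `consumer` is expected to be a kafkajs consumer; only disconnect() is used.
function registerShutdown(consumer, proc = process) {
  for (const signal of ["SIGINT", "SIGTERM"]) {
    proc.once(signal, async () => {
      try {
        await consumer.disconnect(); // Stop consuming and close the connection
        proc.exit(0);
      } catch (err) {
        console.error("Failed to disconnect consumer:", err);
        proc.exit(1);
      }
    });
  }
}

// Usage, after consumer.run(...) has been started:
//   registerShutdown(consumer);
```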