
Chetan Goyal

Kafka Series (Part 2: Basic Example)

In this article, we will go through a basic example: starting our first Kafka broker in KRaft mode and connecting it to client applications (admin, producer, and consumer).

Configuring Kafka for the first time is a very easy task when no security measures are involved. We will cover security and ACLs in upcoming chapters. :)

Github Repo: https://github.com/Chetan-Goyal/Kafka-Series/tree/part2-example

❔ Requirements

  1. Docker must be installed. - https://docs.docker.com/desktop/
  2. Node.js must be installed - for running the admin, producer, and consumer clients. - https://github.com/nvm-sh/nvm?tab=readme-ov-file#installing-and-updating

🎯 Goals

  1. Starting a single Kafka broker in KRaft mode.
  2. Creating topics dynamically via the admin client.
  3. Publishing messages to the created topics using the producer client.
  4. Subscribing to messages from the created topics using the consumer client.
  5. Running the code.

Create a new directory with a name like kafka-deepdive and follow the steps below:

1. 🗄️ Starting Single Kafka broker in KRaft Mode

To start the Kafka broker, run the following command:

docker run --rm -p 9092:9092 apache/kafka:3.9.0

Explanation of the command:

-p 9092:9092 - maps port 9092 in the container to port 9092 on the host machine, so that clients outside the container can reach Kafka.

2. 🛡️ Creating Topics dynamically via admin client.

kafka.admin() gives us an admin instance, which can be used to perform advanced actions that are available only to admin applications. We will look into access control in later articles.

Here, we are creating topics dynamically using Kafka's admin client - think of each topic as representing a new device category. While creating a topic, we have to specify the partition count, which determines how many parts the topic is divided into. Partitions enable benefits like parallel processing, but be cautious while choosing this number: a very large value hurts performance.
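To build intuition for how partitions are used, here is a simplified sketch (not Kafka's actual murmur2 partitioner - the function below is illustrative only) of how a message key can map deterministically to one of the partitions:

```javascript
// Simplified sketch of key-based partition assignment (not Kafka's real
// murmur2 partitioner): hash the key, then take it modulo the partition count.
function toPartition(key, numPartitions) {
  let hash = 0;
  for (const ch of key) {
    hash = (hash * 31 + ch.charCodeAt(0)) >>> 0; // unsigned 32-bit rolling hash
  }
  return hash % numPartitions;
}

// The same key always lands on the same partition,
// so per-key ordering is preserved.
console.log(toPartition("device-42", 2) === toPartition("device-42", 2)); // true
```

Kafka's default partitioner for keyed messages follows the same principle: hashing the key guarantees that all messages with the same key land on the same partition, preserving their relative order.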

topic.js

const { Kafka } = require("kafkajs");

const TOPIC_NAME = "Users";
const NUM_PARTITIONS = 2;

async function createTopicFn() {
  try {
    const kafka = new Kafka({
      brokers: ["localhost:9092"],
      clientId: "kafka-playground",
      connectionTimeout: 10_000,
      authenticationTimeout: 10_000,
    });

    const admin = kafka.admin();
    await admin.connect();


    console.log("Kafka Admin Connected!");

    await admin.createTopics({
      topics: [{
        topic: TOPIC_NAME,
        numPartitions: NUM_PARTITIONS
      }]
    });
    console.log("Topic Created Successfully!");

    await admin.disconnect();
    console.log("Admin Client Disconnected!");

  } catch (e) {
    console.error(`Something bad happened ${e}`);
  } finally {
    process.exit(0);
  }
}

createTopicFn();

3. 📤 Publishing Messages to the created topics using Producer Client.

A producer pushes messages into the Kafka stream on a particular topic (like "heartbeat_sensor" or "temperature_sensor"), along with a partition number and the actual message/value. Here, the partition number is chosen by application-level logic; it starts from 0 and must be less than the number of partitions available in the topic (specified at creation time).

producer.js

const { Kafka } = require("kafkajs");

const msgCount = process.argv[2];
const TOPIC_NAME = "Users";

async function producerFn() {
  try {
    const kafka = new Kafka({
      brokers: ["localhost:9092"],
      clientId: "kafka-playground",
      connectionTimeout: 10_000,
      authenticationTimeout: 10_000,
    });

    const producer = kafka.producer();
    await producer.connect();


    console.log("Kafka Producer Connected!");

    const msgs = ["ABC", "ZYX"]

    for (let i = 0; i < parseInt(msgCount); i++) {
      await producer.send({
        topic: TOPIC_NAME,
        messages: [{
          partition: (i % 2),
          value: msgs[i % 2] + `_${i + 1}`
        }]
      });
      console.log(`[${Date.now()}] Producer: Msg Sent # ${i + 1}`);

      await new Promise(resolve => setTimeout(resolve, 500));
    }

    await producer.disconnect();

  } catch (e) {
    console.error(`Something bad happened ${e}`);
  } finally {
    process.exit(0);
  }
}

producerFn();
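The round-robin message construction inside the loop above can be factored into a small pure helper. This is just an illustrative refactoring - the helper name is not part of producer.js:

```javascript
// Illustrative helper mirroring producer.js: message i (0-based) alternates
// between partition 0 ("ABC") and partition 1 ("ZYX"),
// with a 1-based index appended to the value.
const MSGS = ["ABC", "ZYX"];

function buildMessage(i, numPartitions = 2) {
  const partition = i % numPartitions;
  return { partition, value: `${MSGS[partition]}_${i + 1}` };
}

console.log(buildMessage(0)); // { partition: 0, value: 'ABC_1' }
console.log(buildMessage(1)); // { partition: 1, value: 'ZYX_2' }
```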

4. 📥 Consuming Messages from the created topics using Consumer Client.

Consumers are used to listen to the messages in a particular topic or a group of topics. A Kafka consumer must be part of a consumer group, which essentially tracks the group's position (offset) in the event stream.

For example, let's consider 2 consumers: Consumer#1 and Consumer#2. Both are listening at the same time, but for some reason Consumer#1 goes down. When it comes up again, it will continue from the same offset at which it left off, whereas Consumer#2 will keep advancing its own offset without any outage.
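This offset bookkeeping can be illustrated with a toy model in plain JavaScript (a simulation only - this is not the KafkaJS API):

```javascript
// Toy model of per-consumer-group offset tracking: each group remembers how
// far it has read, so a restarted consumer resumes where it left off.
class PartitionLog {
  constructor() {
    this.messages = [];
    this.offsets = new Map(); // groupId -> next offset to read
  }
  append(msg) { this.messages.push(msg); }
  poll(groupId) {
    const offset = this.offsets.get(groupId) ?? 0;
    const batch = this.messages.slice(offset);
    this.offsets.set(groupId, this.messages.length); // commit the new offset
    return batch;
  }
}

const log = new PartitionLog();
["m1", "m2", "m3"].forEach(m => log.append(m));

console.log(log.poll("group-A")); // [ 'm1', 'm2', 'm3' ]
log.append("m4");
// group-A resumes from its committed offset; group-B starts from the beginning.
console.log(log.poll("group-A")); // [ 'm4' ]
console.log(log.poll("group-B")); // [ 'm1', 'm2', 'm3', 'm4' ]
```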

consumer.js

const { Kafka } = require("kafkajs");

const TOPIC_NAME = "Users";

async function consumerFn() {
  try {
    const kafka = new Kafka({
      brokers: ["localhost:9092"],
      clientId: "kafka-playground-consumer",
      connectionTimeout: 10_000,
      authenticationTimeout: 10_000,
    });

    const consumer = kafka.consumer({
      groupId: "consumer-group-2",
    });

    await consumer.connect();
    console.log("Kafka consumer Connected!");

    await consumer.subscribe({
      topic: TOPIC_NAME,
      fromBeginning: true
    });

    console.log("Kafka consumer Subscribed Successfully!");

    await consumer.run({
      eachMessage: async ({ message }) => {
        console.log(`[${Date.now()}] Consumer: Msg Rcvd - ${message.value.toString()}`);
      },
    });
  } catch (e) {
    console.error(`Something bad happened ${e}`);
  }
}

consumerFn();

5. ⚒️ Running the code.

Note: You must be in the same directory where you created the topic.js, producer.js and consumer.js files.

Now, Let's create a topic named "Users" with 2 partitions. Run the following command to create this topic:

node topic.js

Output:

$ node topic.js
Kafka Admin Connected!
Topic Created Successfully!
Admin Client Disconnected!

Now, let's start our Consumer so that we can listen for all of the incoming messages:

node consumer.js

Finally, we can start publishing on our topic by starting the producer script with the argument 10, to send a total of 10 messages with a 500 ms delay between them:

node producer.js 10

As soon as you run the last command, you will start receiving the messages sent by the producer in the consumer terminal.

Producer Output

$ node producer.js 10
Kafka Producer Connected!
[1715883036583] Producer: Msg Sent # 1
[1715883037089] Producer: Msg Sent # 2
[1715883037602] Producer: Msg Sent # 3
[1715883038117] Producer: Msg Sent # 4
[1715883038630] Producer: Msg Sent # 5
[1715883039145] Producer: Msg Sent # 6
[1715883039663] Producer: Msg Sent # 7
[1715883040178] Producer: Msg Sent # 8
[1715883040687] Producer: Msg Sent # 9
[1715883041205] Producer: Msg Sent # 10

Consumer Output

$ node consumer.js
Kafka consumer Connected!
Kafka consumer Subscribed Successfully!
[1715883056243] Consumer: Msg Rcvd - ABC_1
[1715883056245] Consumer: Msg Rcvd - ABC_3
[1715883056246] Consumer: Msg Rcvd - ABC_5
[1715883056246] Consumer: Msg Rcvd - ABC_7
[1715883056246] Consumer: Msg Rcvd - ABC_9
[1715883056281] Consumer: Msg Rcvd - ZYX_2
[1715883056281] Consumer: Msg Rcvd - ZYX_4
[1715883056281] Consumer: Msg Rcvd - ZYX_6
[1715883056281] Consumer: Msg Rcvd - ZYX_8
[1715883056282] Consumer: Msg Rcvd - ZYX_10

Here, messages starting with ABC come from partition 0 and messages starting with ZYX come from partition 1. The number at the end represents the message index.

You may have noticed that the ordering is different, even though we learned that Kafka maintains order. Kafka maintains order within a partition, but here we are using 2 partitions, so messages from different partitions are interleaved. If you look at the ABC and ZYX messages separately, however, each sequence is in order. :)
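We can verify this per-partition ordering from the consumer output above with a few lines of plain JavaScript:

```javascript
// The consumer output interleaves partitions, but within each partition the
// sequence numbers are strictly increasing.
const received = [
  "ABC_1", "ABC_3", "ABC_5", "ZYX_2", "ABC_7",
  "ZYX_4", "ABC_9", "ZYX_6", "ZYX_8", "ZYX_10",
];

// Check that messages with a given prefix appear in increasing index order.
function isOrderedPerPrefix(msgs, prefix) {
  const nums = msgs
    .filter(m => m.startsWith(prefix))
    .map(m => parseInt(m.split("_")[1], 10));
  return nums.every((n, idx) => idx === 0 || n > nums[idx - 1]);
}

console.log(isOrderedPerPrefix(received, "ABC")); // true (partition 0 is ordered)
console.log(isOrderedPerPrefix(received, "ZYX")); // true (partition 1 is ordered)
```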

📌 Conclusion

There you have it! 🎉 You've just taken the first step on your Kafka journey, and now you're ready to dive even deeper into this fascinating world of event streaming.

Getting the hang of setting up Kafka and tinkering with its features is like opening a door to endless possibilities. So, give yourself a pat on the back for tackling this challenge head-on!

But guess what? We're just scratching the surface here. There's a whole bunch more to explore about Kafka, and we're in for quite the adventure together. 🚀

In our upcoming articles, we'll dig into some really cool stuff, check out how Kafka works in the real world, and maybe even uncover a few Kafka secrets along the way! So, stay tuned and keep that curiosity going strong because the fun is far from over!

Until next time, keep on Kafka-ing! 🌟

📑 References

  1. Hussein Nasser: Hussein Nasser has provided invaluable insights into various system design concepts, inspiring much of the content in this article. You can explore more of his work on his YouTube channel.

  2. KafkaJS: KafkaJS is a modern Apache Kafka client for Node.js. It is compatible with Kafka 0.10+ and is used by many companies in production. You can find more about KafkaJS here.

  3. Kafka Documentation: The official documentation for Apache Kafka, which greatly simplified the initial setup. You can find more about Kafka here.