In this tutorial, we will implement a task queue for sending email notifications to users. Our task queue will handle sending emails to users for various events, such as account activation, password reset, and promotional campaigns. This will allow our application to offload the email-sending process to the task queue, improving the overall performance and responsiveness.

Why Kafka?

This question occurred to me many times before I started using Kafka: why write extra code to accomplish something I can do directly? Well, the answer is quite simple: it's a game changer. Why, you may ask? Let me explain.

Using Kafka to handle tasks instead of sending them directly offers several benefits:

  1. Scalability: Kafka enables you to distribute tasks across multiple instances and threads, which allows for better load balancing and parallel processing of tasks. This means that your application can handle a higher volume of tasks more efficiently.
  2. Resilience: Kafka provides built-in support for fault tolerance and failover. If one instance or thread fails, the remaining active instances can pick up the task assignment(s) of the downed instance, ensuring that your application continues to process tasks without interruption.
  3. Decoupling: Using Kafka as a message broker allows you to decouple the task-producing and task-consuming parts of your application. This means that changes or updates to one part of the system do not require changes to other parts, making it easier to maintain and evolve your application over time.
  4. Asynchronous processing: Kafka allows for asynchronous processing of tasks, which means that your application can continue to function while tasks are being processed in the background. This can improve the responsiveness and performance of your application, especially in situations where tasks may take a long time to complete.
  5. Durability: Kafka stores messages in a distributed, fault-tolerant, and highly available manner. This ensures that your tasks are not lost in case of failures and can be reprocessed if needed.
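
To make the scalability and resilience points a little more concrete, here is a simplified sketch of how partitions of a topic might be shared among consumer instances, and what happens when one instance disappears. This is only a round-robin illustration written for this tutorial. The function name and the worker ids are made up, and in a real deployment the assignment is negotiated by Kafka's consumer group protocol rather than by your own code.

```typescript
// Simplified illustration: spread partition numbers over consumer ids round-robin.
function assignPartitions(partitionCount: number, consumers: string[]): Map<string, number[]> {
  const assignment = new Map<string, number[]>();
  for (const consumer of consumers) {
    assignment.set(consumer, []);
  }
  if (consumers.length === 0) {
    return assignment;
  }
  for (let partition = 0; partition < partitionCount; partition++) {
    const owner = consumers[partition % consumers.length];
    assignment.get(owner)!.push(partition);
  }
  return assignment;
}

// Three hypothetical worker instances share six partitions.
const beforeFailure = assignPartitions(6, ['worker-a', 'worker-b', 'worker-c']);
// worker-a: [0, 3], worker-b: [1, 4], worker-c: [2, 5]

// If worker-b goes down, its partitions are redistributed among the survivors.
const afterFailure = assignPartitions(6, ['worker-a', 'worker-c']);
// worker-a: [0, 2, 4], worker-c: [1, 3, 5]
```

Every partition still has an owner after the failure, which is why the remaining instances can keep processing tasks.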

Now that you are convinced that Kafka can solve the problems of the world (I wish), we can start with the tutorial.

We will implement a task queue in NestJS using Kafka as the message broker. We will cover the following steps:

  1. Setting up a new NestJS project
  2. Installing and configuring Kafka
  3. Setting up Kafka producers and consumers
  4. Testing the task queue
  5. Creating our email service
  6. Testing the email task queue

Step 1: Setting up a new NestJS project


First, create a new NestJS project using the Nest CLI:

npm i -g @nestjs/cli
nest new nestjs-kafka-task-queue

Navigate to the project directory:

cd nestjs-kafka-task-queue

Step 2: Installing and configuring Kafka

Install the required Kafka dependencies:

npm install kafkajs @nestjs/microservices

Create a kafka.config.ts file in the src directory to configure Kafka:

import { KafkaOptions, Transport } from '@nestjs/microservices';

export const kafkaConfig: KafkaOptions = {
  transport: Transport.KAFKA,
  options: {
    client: {
      brokers: ['localhost:9092'],
    },
    consumer: {
      groupId: 'nestjs-kafka-task-queue-group',
    },
  },
};

In this configuration, we set the transport to Kafka and specify the broker address and consumer group ID. The rest of the tutorial assumes a Kafka broker is already running at localhost:9092; adjust the broker address if yours differs.
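
If the broker address changes between environments, one option is to read it from an environment variable instead of hard-coding it. The helper below is a minimal sketch of that idea. Both the parseBrokers function and the KAFKA_BROKERS variable name are assumptions made for this example and are not part of NestJS or KafkaJS.

```typescript
// Parse a comma-separated broker list such as "kafka-1:9092,kafka-2:9092".
// Falls back to the tutorial's default when the value is missing or empty.
function parseBrokers(raw: string | undefined, fallback: string[] = ['localhost:9092']): string[] {
  if (!raw) {
    return fallback;
  }
  const brokers = raw
    .split(',')
    .map((broker) => broker.trim())
    .filter((broker) => broker.length > 0);
  return brokers.length > 0 ? brokers : fallback;
}

const fromList = parseBrokers('kafka-1:9092, kafka-2:9092'); // ['kafka-1:9092', 'kafka-2:9092']
const fromMissing = parseBrokers(undefined); // ['localhost:9092']
```

In kafka.config.ts, the brokers entry could then become something like parseBrokers(process.env.KAFKA_BROKERS), where KAFKA_BROKERS is the hypothetical variable mentioned above.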

Step 3: Setting up Kafka producers and consumers


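Generate a new service in the src directory. The --flat flag keeps task.service.ts directly in src instead of placing it in a task subfolder, which matches the import paths used below:

nest generate service task --flat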

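In task.service.ts, import the required modules and inject the Kafka client:

import { Controller, Inject, Injectable, OnModuleInit } from '@nestjs/common';
import { ClientKafka, EventPattern, Payload } from '@nestjs/microservices';
import { lastValueFrom } from 'rxjs';

const TASK_TOPIC = 'nestjs-kafka-task-queue.addTask';

@Injectable()
export class TaskService implements OnModuleInit {
  // 'KAFKA_CLIENT' must match the name registered in app.module.ts
  constructor(@Inject('KAFKA_CLIENT') private readonly client: ClientKafka) {}

  async onModuleInit() {
    await this.client.connect();
  }

  // Producer side: publish the task and resolve once Kafka has accepted it
  async addTask(payload: any) {
    await lastValueFrom(this.client.emit(TASK_TOPIC, payload));
  }

  // Consumer side: processing logic, called by TaskController
  async handleTask(payload: any) {
    console.log('Received task:', payload);
    // Process the task here
  }
}

// Nest only registers message handlers that are declared on controllers
@Controller()
export class TaskController {
  constructor(private readonly taskService: TaskService) {}

  @EventPattern(TASK_TOPIC)
  async onTask(@Payload() payload: any) {
    await this.taskService.handleTask(payload);
  }
}

In the onModuleInit method, we connect to the Kafka broker. The addTask method publishes a message to the topic with emit(), which is fire-and-forget: the caller does not wait for the task to be processed. The handleTask method holds the processing logic. Nest only registers message handlers that are declared on controller classes, so a small TaskController listens on the topic with @EventPattern and passes each message to handleTask.

Now, update the app.module.ts file to register the Kafka client, the task service, and the task controller:

import { Module } from '@nestjs/common';
import { ClientsModule } from '@nestjs/microservices';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { kafkaConfig } from './kafka.config';
import { TaskController, TaskService } from './task.service';

@Module({
  // The name is the injection token that TaskService passes to @Inject()
  imports: [ClientsModule.register([{ name: 'KAFKA_CLIENT', ...kafkaConfig }])],
  controllers: [AppController, TaskController],
  providers: [AppService, TaskService],
})
export class AppModule {}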
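
Handlers declared with @MessagePattern or @EventPattern only receive messages when the application also runs a Kafka microservice listener. The CLI-generated main.ts starts an HTTP server only, so the following is a sketch of how main.ts might look as a hybrid application. It assumes the kafkaConfig export from Step 2 and the default HTTP port 3000 used by the curl commands in this tutorial.

```typescript
// src/main.ts (sketch of a hybrid HTTP + Kafka application)
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions } from '@nestjs/microservices';
import { AppModule } from './app.module';
import { kafkaConfig } from './kafka.config';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);

  // Attach the Kafka transport so message handlers are subscribed to their topics
  app.connectMicroservice<MicroserviceOptions>(kafkaConfig);
  await app.startAllMicroservices();

  // Keep serving HTTP requests on the port used by the curl examples
  await app.listen(3000);
}
bootstrap();
```

With this in place, the same process serves HTTP requests and consumes tasks. In a larger system you might instead run the consumer as a separate application so that it can be scaled independently.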

Step 4: Testing the task queue

To test the task queue, update the app.controller.ts file to include an endpoint for adding tasks:

import { Controller, Get, Param } from '@nestjs/common';
import { AppService } from './app.service';
import { TaskService } from './task.service';

@Controller()
export class AppController {
  constructor(private readonly appService: AppService, private readonly taskService: TaskService) {}

  @Get('addTask/:task')
  async addTask(@Param('task') task: string) {
    await this.taskService.addTask(task);
    return 'Task added';
  }
}

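Now, start the NestJS application. The Kafka consumer only runs if main.ts attaches the microservice with app.connectMicroservice(kafkaConfig) and calls app.startAllMicroservices() before app.listen():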

npm run start


Open a new terminal and use curl or any API client to send a request to the addTask endpoint:

curl http://localhost:3000/addTask/sample-task

You should see the "Task added" response, and in the NestJS application console, you should see the "Received task" message with the task payload.

Step 5: Creating our email service

Install the nodemailer library to handle sending emails, along with its TypeScript type definitions:

npm install nodemailer
npm install --save-dev @types/nodemailer


Update the task.service.ts file to include the email-sending functionality. Add the nodemailer import at the top of the file, then add a sendEmail method to the TaskService class and call it from handleTask:

import * as nodemailer from 'nodemailer';

// Inside the TaskService class:

  async sendEmail(emailData: { to: string; subject: string; text: string }) {
    // Configure the email transporter
    const transporter = nodemailer.createTransport({
      host: 'smtp.example.com',
      port: 587,
      secure: false, // false for port 587 (STARTTLS); use true for port 465
      auth: {
        user: 'your_email@example.com',
        pass: 'your_email_password',
      },
    });

    // Send the email
    await transporter.sendMail({
      from: 'no-reply@example.com',
      to: emailData.to,
      subject: emailData.subject,
      text: emailData.text,
    });
  }

  // Only the body changes; leave the rest of the method as it was in Step 3
  async handleTask(payload: any) {
    console.log('Received task:', payload);

    // Send email based on the task payload
    await this.sendEmail(payload);
  }

Replace the host, user, and pass fields with your email server's SMTP settings.
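
Because handleTask receives its payload as any, a malformed message (for example the plain string sent in Step 4) would reach sendEmail unchecked. One way to guard against that is a small type guard. The sketch below is an assumption about how you might validate the payload; the EmailTask interface and isEmailTask function are hypothetical names, and the "@" check is deliberately crude rather than a real email validator.

```typescript
interface EmailTask {
  to: string;
  subject: string;
  text: string;
}

// Type guard: narrows an untyped Kafka payload to EmailTask.
function isEmailTask(payload: unknown): payload is EmailTask {
  if (typeof payload !== 'object' || payload === null) {
    return false;
  }
  const { to, subject, text } = payload as Record<string, unknown>;
  return (
    typeof to === 'string' &&
    to.includes('@') &&
    typeof subject === 'string' &&
    typeof text === 'string'
  );
}

const valid = isEmailTask({ to: 'test@example.com', subject: 'Welcome', text: 'Hi' }); // true
const invalid = isEmailTask('sample-task'); // false: a bare string is not an email task
```

Inside handleTask, you could call isEmailTask(payload) first and log and skip the message when it returns false, instead of letting nodemailer fail on missing fields.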

Step 6: Testing the email task queue

Update the app.controller.ts file to include an endpoint for adding email tasks:

import { Controller, Get, Param } from '@nestjs/common';
import { AppService } from './app.service';
import { TaskService } from './task.service';

@Controller()
export class AppController {
  constructor(private readonly appService: AppService, private readonly taskService: TaskService) {}

  @Get('sendEmail/:email')
  async sendEmail(@Param('email') email: string) {
    const emailData = {
      to: email,
      subject: 'Welcome to our application',
      text: 'Thank you for joining our application!',
    };
    await this.taskService.addTask(emailData);
    return 'Email task added';
  }
}
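
The endpoint above always queues the same welcome message. For the other events mentioned in the introduction (account activation, password reset, and promotional campaigns), one possible approach is to describe each event as a typed object and map it to the email fields before calling addTask. The event shapes, field names, and wording below are all hypothetical and only illustrate the idea.

```typescript
type EmailEvent =
  | { kind: 'account-activation'; to: string; activationLink: string }
  | { kind: 'password-reset'; to: string; resetLink: string }
  | { kind: 'promotion'; to: string; campaign: string };

interface EmailData {
  to: string;
  subject: string;
  text: string;
}

// Map an application event to the payload shape that sendEmail expects.
function buildEmail(event: EmailEvent): EmailData {
  switch (event.kind) {
    case 'account-activation':
      return {
        to: event.to,
        subject: 'Activate your account',
        text: `Follow this link to activate your account: ${event.activationLink}`,
      };
    case 'password-reset':
      return {
        to: event.to,
        subject: 'Reset your password',
        text: `Follow this link to reset your password: ${event.resetLink}`,
      };
    case 'promotion':
      return {
        to: event.to,
        subject: `New offer: ${event.campaign}`,
        text: `Check out our latest campaign: ${event.campaign}`,
      };
  }
}

const resetEmail = buildEmail({
  kind: 'password-reset',
  to: 'test@example.com',
  resetLink: 'https://example.com/reset',
});
```

A controller method could then pass the result straight to this.taskService.addTask, so the consumer side stays unchanged no matter which event produced the email.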

Now, start the NestJS application, and use curl or any API client to send a request to the sendEmail endpoint:

curl http://localhost:3000/sendEmail/test@example.com

You should see the "Email task added" response, and the email should be sent to the specified address.
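
SMTP servers occasionally reject or time out a request, in which case the email above would not arrive on the first try. A common way to handle transient failures is to retry with an increasing delay. The helper below is a generic sketch of that pattern. The withRetry name and its default values are assumptions for this example, and it is not a feature of nodemailer or Kafka.

```typescript
// Retry an async operation, doubling the wait between attempts.
async function withRetry<T>(
  operation: () => Promise<T>,
  maxAttempts = 3,
  baseDelayMs = 200,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await operation();
    } catch (error) {
      lastError = error;
      if (attempt < maxAttempts) {
        // With the defaults this waits 200 ms, then 400 ms
        const delayMs = baseDelayMs * 2 ** (attempt - 1);
        await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
      }
    }
  }
  // Every attempt failed: surface the last error to the caller
  throw lastError;
}
```

In handleTask, the call could become await withRetry(() => this.sendEmail(payload)). If all attempts fail, the error still propagates, so you can decide whether to log it or route the task elsewhere.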


And that's it!!! You have successfully implemented a task queue for sending email notifications. You can now build upon this foundation to create more complex task-processing systems for various use cases.