In this tutorial, we will implement a task queue for sending email notifications to users. Our task queue will handle sending emails to users for various events, such as account activation, password reset, and promotional campaigns. This will allow our application to offload the email-sending process to the task queue, improving the overall performance and responsiveness.
Why Kafka?
This question accrued to me way too many times before starting to use Kafka why would write extra code just to accomplish something I can do directly? well, the answer is quite simple! IT'S A GAME CHANGER... why you may ask? I'll answer you.
Using Kafka to handle tasks instead of sending them directly offers several benefits:
- Scalability: Kafka enables you to distribute tasks across multiple instances and threads, which allows for better load balancing and parallel processing of tasks. This means that your application can handle a higher volume of tasks more efficiently.
- Resilience: Kafka provides built-in support for fault tolerance and failover. If one instance or thread fails, the remaining active instances can pick up the task assignment(s) of the downed instance, ensuring that your application continues to process tasks without interruption.
- Decoupling: Using Kafka as a message broker allows you to decouple the task-producing and task-consuming parts of your application. This means that changes or updates to one part of the system do not require changes to other parts, making it easier to maintain and evolve your application over time.
- Asynchronous processing: Kafka allows for asynchronous processing of tasks, which means that your application can continue to function while tasks are being processed in the background. This can improve the responsiveness and performance of your application, especially in situations where tasks may take a long time to complete.
- Durability: Kafka stores messages in a distributed, fault-tolerant, and highly available manner. This ensures that your tasks are not lost in case of failures and can be reprocessed if needed.
Now that you are convinced that kafka can solve the problems of the world (I wish) we can start with the tutorial.
we will implement a task queue in NestJS using Kafka as the message broker. We will cover the following steps:
- Setting up a new NestJS project
- Installing and configuring Kafka
- Setting up Kafka producers and consumers
- Testing the task queue
- creating our email service
- Testing the email task queue
Step 1: Setting up a new NestJS project
First, create a new NestJS project using the Nest CLI:
npm i -g @nestjs/cli nest new nestjs-kafka-task-queue
Navigate to the project directory:
cd nestjs-kafka-task-queue
Step 2: Installing and configuring Kafka
Install the required Kafka dependencies:
npm install kafkajs @nestjs/microservices
Create a kafka.config.ts
file in the src
directory to configure Kafka:
import { KafkaOptions } from '@nestjs/microservices';
export const kafkaConfig: KafkaOptions = {
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'nestjs-kafka-task-queue-group',
},
},
};
In this configuration, we set the transport to Kafka and specify the broker address and consumer group ID. Adjust the broker address if needed.
Step 3: Setting up Kafka producers and consumers
Create a new service called task.service.ts
in the src
directory:
nest generate service task
In task.service.ts
, import the required modules and inject the Kafka client:
import { Injectable, OnModuleInit } from '@nestjs/common';
import { ClientKafka, MessagePattern, Payload } from '@nestjs/microservices';
import { kafkaConfig } from './kafka.config';
@Injectable()
export class TaskService implements OnModuleInit {
constructor(private readonly client: ClientKafka) {}
async onModuleInit() {
this.client.subscribeToResponseOf('nestjs-kafka-task-queue.addTask');
await this.client.connect();
}
async addTask(payload: any) {
return this.client.send('nestjs-kafka-task-queue.addTask', payload);
}
@MessagePattern('nestjs-kafka-task-queue.addTask')
async handleTask(payload: any) {
console.log('Received task:', payload);
// Process the task here
}
}
In the onModuleInit
method, we subscribe to the addTask
topic and connect to the Kafka broker. The addTask
method sends a message to the topic, and the handleTask
method listens for messages on the topic and processes the tasks.
Now, update the app.module.ts
file to include the Kafka configuration and the task service:
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { TaskService } from './task.service';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { kafkaConfig } from './kafka.config';
@Module({
imports: [ClientsModule.register([kafkaConfig])],
controllers: [AppController],
providers: [AppService, TaskService],
})
export class AppModule {}
Step 4: Testing the task queue
To test the task queue, update the app.controller.ts
file to include an endpoint for adding tasks:
import { Controller, Get, Param } from '@nestjs/common';
import { AppService } from './app.service';
import { TaskService } from './task.service';
@Controller()
export class AppController {
constructor(private readonly appService: AppService, private readonly taskService: TaskService) {}
@Get('addTask/:task')
async addTask(@Param('task') task: string) {
await this.taskService.addTask(task);
return 'Task added';
}
}
Now, start the NestJS application:
npm run start
Open a new terminal and use curl
or any API client to send a request to the addTask
endpoint:
curl http://localhost:3000/addTask/sample-task
You should see the "Task added" response, and in the NestJS application console, you should see the "Received task" message with the task payload.
Step 5: creating our email service
Install the nodemailer
library to handle sending emails:
npm install nodemailer
Update the task.service.ts
file to include the email-sending functionality. Import the required modules and create a sendEmail
method:
import * as nodemailer from 'nodemailer';
async sendEmail(emailData: { to: string; subject: string; text: string }) {
// Configure the email transporter
const transporter = nodemailer.createTransport({
host: 'smtp.example.com',
port: 587,
secure: false,
auth: {
user: 'your_email@example.com',
pass: 'your_email_password',
},
});
// Send the email
await transporter.sendMail({
from: 'no-reply@example.com',
to: emailData.to,
subject: emailData.subject,
text: emailData.text,
});
}
@MessagePattern('nestjs-kafka-task-queue.addTask')
async handleTask(payload: any) {
console.log('Received task:', payload);
// Send email based on the task payload
await this.sendEmail(payload);
}
Replace the host
, user
, and pass
fields with your email server's SMTP settings.
Step 6: Testing the email task queue
Update the app.controller.ts
file to include an endpoint for adding email tasks:
import { Controller, Get, Param } from '@nestjs/common';
import { AppService } from './app.service';
import { TaskService } from './task.service';
@Controller()
export class AppController {
constructor(private readonly appService: AppService, private readonly taskService: TaskService) {}
@Get('sendEmail/:email')
async sendEmail(@Param('email') email: string) {
const emailData = {
to: email,
subject: 'Welcome to our application',
text: 'Thank you for joining our application!',
};
await this.taskService.addTask(emailData);
return 'Email task added';
}
}
Now, start the NestJS application, and use curl
or any API client to send a request to the sendEmail
endpoint:
curl http://localhost:3000/sendEmail/test@example.com
You should see the "Email task added" response, and the email should be sent to the specified address.
And that's it!!! You have successfully implemented a task queue for sending email notifications. You can now build upon this foundation to create more complex task-processing systems for various use cases.