Browse Source

Merge pull request #61 from vernu/job-queue

Implement optional job queue for sending sms with delay
pull/64/head
Israel Abebe 12 months ago
committed by GitHub
parent
commit
5233c56021
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 4
      api/.env.example
  2. 6
      api/package.json
  3. 536
      api/pnpm-lock.yaml
  4. 43
      api/src/app.module.ts
  5. 21
      api/src/gateway/gateway.module.ts
  6. 202
      api/src/gateway/gateway.service.ts
  7. 102
      api/src/gateway/queue/sms-queue.processor.ts
  8. 66
      api/src/gateway/queue/sms-queue.service.ts
  9. 15
      api/src/gateway/schemas/sms-batch.schema.ts
  10. 7
      api/src/gateway/schemas/sms.schema.ts

4
api/.env.example

@ -19,3 +19,7 @@ MAIL_USER=
MAIL_PASS=
MAIL_FROM=
MAIL_REPLY_TO=textbee.dev@gmail.com
# SMS Queue Configuration
USE_SMS_QUEUE=false
REDIS_URL=redis://localhost:6379 # if queue is enabled, redis url is required

6
api/package.json

@ -21,9 +21,12 @@
},
"dependencies": {
"@nest-modules/mailer": "^1.3.22",
"@nestjs/bull": "^11.0.2",
"@nestjs/common": "^10.4.5",
"@nestjs/config": "^4.0.1",
"@nestjs/core": "^10.4.5",
"@nestjs/jwt": "^10.2.0",
"@nestjs/mapped-types": "^2.1.0",
"@nestjs/mongoose": "^10.0.10",
"@nestjs/passport": "^10.0.3",
"@nestjs/platform-express": "^10.4.5",
@ -33,10 +36,13 @@
"@polar-sh/sdk": "^0.30.0",
"axios": "^1.8.2",
"bcryptjs": "^2.4.3",
"bull": "^4.16.5",
"class-validator": "^0.14.1",
"dotenv": "^16.4.5",
"express": "^4.21.2",
"firebase-admin": "^12.6.0",
"handlebars": "^4.7.8",
"ioredis": "^5.6.0",
"mongoose": "^8.12.1",
"nodemailer": "^6.10.0",
"passport": "^0.7.0",

536
api/pnpm-lock.yaml
File diff suppressed because it is too large
View File

43
api/src/app.module.ts

@ -1,4 +1,9 @@
import { Module } from '@nestjs/common'
import {
MiddlewareConsumer,
Module,
NestModule,
RequestMethod,
} from '@nestjs/common'
import { MongooseModule } from '@nestjs/mongoose'
import { GatewayModule } from './gateway/gateway.module'
import { AuthModule } from './auth/auth.module'
@ -7,12 +12,29 @@ import { ThrottlerModule } from '@nestjs/throttler'
import { APP_GUARD } from '@nestjs/core/constants'
import { WebhookModule } from './webhook/webhook.module'
import { ThrottlerByIpGuard } from './auth/guards/throttle-by-ip.guard'
import { Injectable, NestMiddleware } from '@nestjs/common'
import { Request, Response, NextFunction } from 'express'
import { ScheduleModule } from '@nestjs/schedule'
import { BillingModule } from './billing/billing.module';
import { BillingModule } from './billing/billing.module'
import { ConfigModule, ConfigService } from '@nestjs/config'
import { BullModule } from '@nestjs/bull'
@Injectable()
export class LoggerMiddleware implements NestMiddleware {
use(req: Request, res: Response, next: NextFunction) {
console.log('req.originalUrl: ', req.originalUrl)
if (next) {
next()
}
}
}
@Module({
imports: [
MongooseModule.forRoot(process.env.MONGO_URI),
ConfigModule.forRoot({
isGlobal: true,
}),
ThrottlerModule.forRoot([
{
ttl: 60000,
@ -20,6 +42,15 @@ import { BillingModule } from './billing/billing.module';
},
]),
ScheduleModule.forRoot(),
BullModule.forRootAsync({
imports: [ConfigModule],
inject: [ConfigService],
useFactory: async (configService: ConfigService) => {
return {
redis: configService.get('REDIS_URL'),
}
},
}),
AuthModule,
UsersModule,
GatewayModule,
@ -34,4 +65,10 @@ import { BillingModule } from './billing/billing.module';
},
],
})
export class AppModule {}
export class AppModule implements NestModule {
configure(consumer: MiddlewareConsumer) {
consumer
.apply(LoggerMiddleware)
.forRoutes({ path: '*', method: RequestMethod.ALL })
}
}

21
api/src/gateway/gateway.module.ts

@ -9,6 +9,10 @@ import { SMS, SMSSchema } from './schemas/sms.schema'
import { SMSBatch, SMSBatchSchema } from './schemas/sms-batch.schema'
import { WebhookModule } from 'src/webhook/webhook.module'
import { BillingModule } from 'src/billing/billing.module'
import { BullModule } from '@nestjs/bull'
import { ConfigModule } from '@nestjs/config'
import { SmsQueueService } from './queue/sms-queue.service'
import { SmsQueueProcessor } from './queue/sms-queue.processor'
@Module({
imports: [
@ -26,13 +30,26 @@ import { BillingModule } from 'src/billing/billing.module'
schema: SMSBatchSchema,
},
]),
BullModule.registerQueue({
name: 'sms',
defaultJobOptions: {
attempts: 2,
backoff: {
type: 'exponential',
delay: 1000,
},
removeOnComplete: false,
removeOnFail: false,
},
}),
AuthModule,
UsersModule,
WebhookModule,
forwardRef(() => BillingModule),
ConfigModule,
],
controllers: [GatewayController],
providers: [GatewayService],
exports: [MongooseModule, GatewayService],
providers: [GatewayService, SmsQueueService, SmsQueueProcessor],
exports: [MongooseModule, GatewayService, SmsQueueService],
})
export class GatewayModule {}

202
api/src/gateway/gateway.service.ts

@ -15,13 +15,12 @@ import { AuthService } from 'src/auth/auth.service'
import { SMS } from './schemas/sms.schema'
import { SMSType } from './sms-type.enum'
import { SMSBatch } from './schemas/sms-batch.schema'
import {
BatchResponse,
Message,
} from 'firebase-admin/messaging'
import { BatchResponse, Message } from 'firebase-admin/messaging'
import { WebhookEvent } from 'src/webhook/webhook-event.enum'
import { WebhookService } from 'src/webhook/webhook.service'
import { BillingService } from 'src/billing/billing.service'
import { SmsQueueService } from './queue/sms-queue.service'
@Injectable()
export class GatewayService {
constructor(
@ -31,6 +30,7 @@ export class GatewayService {
private authService: AuthService,
private webhookService: WebhookService,
private billingService: BillingService,
private smsQueueService: SmsQueueService,
) {}
async registerDevice(
@ -151,6 +151,7 @@ export class GatewayService {
message,
recipientCount: recipients.length,
recipientPreview: this.getRecipientsPreview(recipients),
status: 'pending',
})
} catch (e) {
throw new HttpException(
@ -173,6 +174,7 @@ export class GatewayService {
type: SMSType.SENT,
recipient,
requestedAt: new Date(),
status: 'pending',
})
const updatedSMSData = {
smsId: sms._id,
@ -198,6 +200,50 @@ export class GatewayService {
fcmMessages.push(fcmMessage)
}
// Check if we should use the queue
if (this.smsQueueService.isQueueEnabled()) {
try {
// Update batch status to processing
await this.smsBatchModel.findByIdAndUpdate(smsBatch._id, {
$set: { status: 'processing' },
})
// Add to queue
await this.smsQueueService.addSendSmsJob(
deviceId,
fcmMessages,
smsBatch._id.toString(),
)
return {
success: true,
message: 'SMS added to queue for processing',
smsBatchId: smsBatch._id,
recipientCount: recipients.length,
}
} catch (e) {
// Update batch status to failed
await this.smsBatchModel.findByIdAndUpdate(smsBatch._id, {
$set: { status: 'failed', error: e.message },
})
// Update all SMS in batch to failed
await this.smsModel.updateMany(
{ smsBatch: smsBatch._id },
{ $set: { status: 'failed', error: e.message } },
)
throw new HttpException(
{
success: false,
error: 'Failed to add SMS to queue',
additionalInfo: e,
},
HttpStatus.INTERNAL_SERVER_ERROR,
)
}
}
try {
const response = await firebaseAdmin.messaging().sendEach(fcmMessages)
@ -223,8 +269,26 @@ export class GatewayService {
console.log('Failed to update sentSMSCount')
console.log(e)
})
this.smsBatchModel
.findByIdAndUpdate(smsBatch._id, {
$set: { status: 'completed' },
})
.exec()
.catch((e) => {
console.error('failed to update sms batch status to completed')
})
return response
} catch (e) {
this.smsBatchModel
.findByIdAndUpdate(smsBatch._id, {
$set: { status: 'failed', error: e.message },
})
.exec()
.catch((e) => {
console.error('failed to update sms batch status to failed')
})
throw new HttpException(
{
success: false,
@ -249,7 +313,6 @@ export class GatewayService {
)
}
if (
!Array.isArray(body.messages) ||
body.messages.length === 0 ||
@ -281,9 +344,11 @@ export class GatewayService {
recipientPreview: this.getRecipientsPreview(
messages.map((m) => m.recipients).flat(),
),
status: 'pending',
})
const fcmResponses: BatchResponse[] = []
const fcmMessages: Message[] = []
for (const smsData of messages) {
const message = smsData.message
const recipients = smsData.recipients
@ -296,8 +361,6 @@ export class GatewayService {
continue
}
const fcmMessages: Message[] = []
for (const recipient of recipients) {
const sms = await this.smsModel.create({
device: device._id,
@ -306,6 +369,7 @@ export class GatewayService {
type: SMSType.SENT,
recipient,
requestedAt: new Date(),
status: 'pending',
})
const updatedSMSData = {
smsId: sms._id,
@ -330,9 +394,58 @@ export class GatewayService {
}
fcmMessages.push(fcmMessage)
}
}
// Check if we should use the queue
if (this.smsQueueService.isQueueEnabled()) {
try {
const response = await firebaseAdmin.messaging().sendEach(fcmMessages)
// Add to queue
await this.smsQueueService.addSendSmsJob(
deviceId,
fcmMessages,
smsBatch._id.toString(),
)
return {
success: true,
message: 'Bulk SMS added to queue for processing',
smsBatchId: smsBatch._id,
recipientCount: messages.map((m) => m.recipients).flat().length,
}
} catch (e) {
// Update batch status to failed
await this.smsBatchModel.findByIdAndUpdate(smsBatch._id, {
$set: {
status: 'failed',
error: e.message,
successCount: 0,
failureCount: fcmMessages.length,
},
})
// Update all SMS in batch to failed
await this.smsModel.updateMany(
{ smsBatch: smsBatch._id },
{ $set: { status: 'failed', error: e.message } },
)
throw new HttpException(
{
success: false,
error: 'Failed to add bulk SMS to queue',
additionalInfo: e,
},
HttpStatus.INTERNAL_SERVER_ERROR,
)
}
}
const fcmMessagesBatches = fcmMessages.map((m) => [m])
const fcmResponses: BatchResponse[] = []
for (const batch of fcmMessagesBatches) {
try {
const response = await firebaseAdmin.messaging().sendEach(batch)
console.log(response)
fcmResponses.push(response)
@ -346,9 +459,27 @@ export class GatewayService {
console.log('Failed to update sentSMSCount')
console.log(e)
})
this.smsBatchModel
.findByIdAndUpdate(smsBatch._id, {
$set: { status: 'completed' },
})
.exec()
.catch((e) => {
console.error('failed to update sms batch status to completed')
})
} catch (e) {
console.log('Failed to send SMS: FCM')
console.log(e)
this.smsBatchModel
.findByIdAndUpdate(smsBatch._id, {
$set: { status: 'failed', error: e.message },
})
.exec()
.catch((e) => {
console.error('failed to update sms batch status to failed')
})
}
}
@ -438,7 +569,11 @@ export class GatewayService {
return sms
}
async getReceivedSMS(deviceId: string, page = 1, limit = 50): Promise<{ data: any[], meta: any }> {
async getReceivedSMS(
deviceId: string,
page = 1,
limit = 50,
): Promise<{ data: any[]; meta: any }> {
const device = await this.deviceModel.findById(deviceId)
if (!device) {
@ -452,13 +587,13 @@ export class GatewayService {
}
// Calculate skip value for pagination
const skip = (page - 1) * limit;
const skip = (page - 1) * limit
// Get total count for pagination metadata
const total = await this.smsModel.countDocuments({
device: device._id,
type: SMSType.RECEIVED,
});
})
// @ts-ignore
const data = await this.smsModel
@ -471,7 +606,7 @@ export class GatewayService {
{
sort: { receivedAt: -1 },
limit: limit,
skip: skip
skip: skip,
},
)
.populate({
@ -481,7 +616,7 @@ export class GatewayService {
.lean() // Use lean() to return plain JavaScript objects instead of Mongoose documents
// Calculate pagination metadata
const totalPages = Math.ceil(total / limit);
const totalPages = Math.ceil(total / limit)
return {
meta: {
@ -491,10 +626,15 @@ export class GatewayService {
totalPages,
},
data,
};
}
}
async getMessages(deviceId: string, type = '', page = 1, limit = 50): Promise<{ data: any[], meta: any }> {
async getMessages(
deviceId: string,
type = '',
page = 1,
limit = 50,
): Promise<{ data: any[]; meta: any }> {
const device = await this.deviceModel.findById(deviceId)
if (!device) {
@ -508,32 +648,28 @@ export class GatewayService {
}
// Calculate skip value for pagination
const skip = (page - 1) * limit;
const skip = (page - 1) * limit
// Build query based on type filter
const query: any = { device: device._id };
const query: any = { device: device._id }
if (type === 'sent') {
query.type = SMSType.SENT;
query.type = SMSType.SENT
} else if (type === 'received') {
query.type = SMSType.RECEIVED;
query.type = SMSType.RECEIVED
}
// Get total count for pagination metadata
const total = await this.smsModel.countDocuments(query);
const total = await this.smsModel.countDocuments(query)
// @ts-ignore
const data = await this.smsModel
.find(
query,
null,
{
// Sort by the most recent timestamp (receivedAt for received, sentAt for sent)
sort: { createdAt: -1 },
limit: limit,
skip: skip
},
)
.find(query, null, {
// Sort by the most recent timestamp (receivedAt for received, sentAt for sent)
sort: { createdAt: -1 },
limit: limit,
skip: skip,
})
.populate({
path: 'device',
select: '_id brand model buildId enabled',
@ -541,7 +677,7 @@ export class GatewayService {
.lean() // Use lean() to return plain JavaScript objects instead of Mongoose documents
// Calculate pagination metadata
const totalPages = Math.ceil(total / limit);
const totalPages = Math.ceil(total / limit)
return {
meta: {
@ -551,7 +687,7 @@ export class GatewayService {
totalPages,
},
data,
};
}
}
async getStatsForUser(user: User) {

102
api/src/gateway/queue/sms-queue.processor.ts

@ -0,0 +1,102 @@
import { Process, Processor } from '@nestjs/bull'
import { InjectModel } from '@nestjs/mongoose'
import { Job } from 'bull'
import { Model } from 'mongoose'
import * as firebaseAdmin from 'firebase-admin'
import { Device } from '../schemas/device.schema'
import { SMS } from '../schemas/sms.schema'
import { SMSBatch } from '../schemas/sms-batch.schema'
import { WebhookService } from 'src/webhook/webhook.service'
import { Logger } from '@nestjs/common'
@Processor('sms')
export class SmsQueueProcessor {
private readonly logger = new Logger(SmsQueueProcessor.name)
constructor(
@InjectModel(Device.name) private deviceModel: Model<Device>,
@InjectModel(SMS.name) private smsModel: Model<SMS>,
@InjectModel(SMSBatch.name) private smsBatchModel: Model<SMSBatch>,
private webhookService: WebhookService,
) {}
@Process({
name: 'send-sms',
concurrency: 10,
})
async handleSendSms(job: Job<any>) {
this.logger.debug(`Processing send-sms job ${job.id}`)
const { deviceId, fcmMessages, smsBatchId } = job.data
try {
this.smsBatchModel
.findByIdAndUpdate(smsBatchId, {
$set: { status: 'processing' },
})
.exec()
.catch((error) => {
this.logger.error(
`Failed to update sms batch status to processing ${smsBatchId}`,
error,
)
throw error
})
const response = await firebaseAdmin.messaging().sendEach(fcmMessages)
this.logger.debug(
`SMS Job ${job.id} completed, success: ${response.successCount}, failures: ${response.failureCount}`,
)
// Update device SMS count
await this.deviceModel
.findByIdAndUpdate(deviceId, {
$inc: { sentSMSCount: response.successCount },
})
.exec()
// Update batch status
const smsBatch = await this.smsBatchModel.findByIdAndUpdate(
smsBatchId,
{
$inc: {
successCount: response.successCount,
failureCount: response.failureCount,
},
},
{ returnDocument: 'after' },
)
if (smsBatch.successCount === smsBatch.recipientCount) {
await this.smsBatchModel.findByIdAndUpdate(smsBatchId, {
$set: { status: 'completed' },
})
}
return response
} catch (error) {
this.logger.error(`Failed to process SMS job ${job.id}`, error)
const smsBatch = await this.smsBatchModel.findByIdAndUpdate(
smsBatchId,
{
$inc: {
failureCount: fcmMessages.length,
},
},
{ returnDocument: 'after' },
)
const newStatus =
smsBatch.failureCount === smsBatch.recipientCount
? 'failed'
: 'partial_success'
await this.smsBatchModel.findByIdAndUpdate(smsBatchId, {
$set: { status: newStatus },
})
throw error
}
}
}

66
api/src/gateway/queue/sms-queue.service.ts

@ -0,0 +1,66 @@
import { Injectable, Logger } from '@nestjs/common'
import { InjectQueue } from '@nestjs/bull'
import { Queue } from 'bull'
import { ConfigService } from '@nestjs/config'
import { Message } from 'firebase-admin/messaging'
@Injectable()
export class SmsQueueService {
private readonly logger = new Logger(SmsQueueService.name)
private readonly useSmsQueue: boolean
private readonly maxSmsBatchSize: number
constructor(
@InjectQueue('sms') private readonly smsQueue: Queue,
private readonly configService: ConfigService,
) {
this.useSmsQueue = this.configService.get<boolean>('USE_SMS_QUEUE', false)
this.maxSmsBatchSize = this.configService.get<number>(
'MAX_SMS_BATCH_SIZE',
5,
)
}
/**
* Check if queue is enabled based on environment variable
*/
isQueueEnabled(): boolean {
return this.useSmsQueue
}
async addSendSmsJob(
deviceId: string,
fcmMessages: Message[],
smsBatchId: string,
) {
this.logger.debug(`Adding send-sms job for batch ${smsBatchId}`)
// Split messages into batches of max smsBatchSize messages
const batches = []
for (let i = 0; i < fcmMessages.length; i += this.maxSmsBatchSize) {
batches.push(fcmMessages.slice(i, i + this.maxSmsBatchSize))
}
for (const batch of batches) {
await this.smsQueue.add(
'send-sms',
{
deviceId,
fcmMessages: batch,
smsBatchId,
},
{
priority: 1, // TODO: Make this dynamic based on users subscription plan
attempts: 1,
delay: 1000, // 1 second
backoff: {
type: 'exponential',
delay: 5000, // 5 seconds
},
removeOnComplete: false,
removeOnFail: false,
},
)
}
}
}

15
api/src/gateway/schemas/sms-batch.schema.ts

@ -26,6 +26,21 @@ export class SMSBatch {
@Prop({ type: String })
recipientPreview: string
@Prop({ type: Number, default: 0 })
successCount: number
@Prop({ type: Number, default: 0 })
failureCount: number
@Prop({ type: String, default: 'pending', enum: ['pending', 'processing', 'completed', 'partial_success', 'failed'] })
status: string
@Prop({ type: String })
error: string
@Prop({ type: Date })
completedAt: Date
// misc metadata for debugging
@Prop({ type: Object })
metadata: Record<string, any>

7
api/src/gateway/schemas/sms.schema.ts

@ -53,8 +53,11 @@ export class SMS {
// @Prop({ type: String })
// failureReason: string
// @Prop({ type: String })
// status: string
@Prop({ type: String, default: 'pending', enum: ['pending', 'sent', 'delivered', 'failed'] })
status: string
@Prop({ type: String })
error: string
// misc metadata for debugging
@Prop({ type: Object })

Loading…
Cancel
Save