10 changed files with 495 additions and 0 deletions
-
1api/package.json
-
35api/pnpm-lock.yaml
-
4api/src/app.module.ts
-
41api/src/webhook/schemas/webhook-notification.schema.ts
-
43api/src/webhook/schemas/webhook-subscription.schema.ts
-
3api/src/webhook/webhook-event.enum.ts
-
68api/src/webhook/webhook.controller.ts
-
14api/src/webhook/webhook.dto.ts
-
35api/src/webhook/webhook.module.ts
-
251api/src/webhook/webhook.service.ts
@ -0,0 +1,41 @@ |
|||
import { Prop, Schema, SchemaFactory } from '@nestjs/mongoose' |
|||
import { Document, Types } from 'mongoose' |
|||
import { WebhookSubscription } from './webhook-subscription.schema' |
|||
import { SMS } from 'src/gateway/schemas/sms.schema' |
|||
|
|||
export type WebhookNotificationDocument = WebhookNotification & Document |
|||
|
|||
@Schema({ timestamps: true }) |
|||
export class WebhookNotification { |
|||
_id?: Types.ObjectId |
|||
|
|||
@Prop({ type: Types.ObjectId, ref: WebhookSubscription.name, required: true }) |
|||
webhookSubscription: WebhookSubscription |
|||
|
|||
@Prop({ type: String, required: true }) |
|||
event: string |
|||
|
|||
@Prop({ type: Object, required: true }) |
|||
payload: object |
|||
|
|||
@Prop({ type: Types.ObjectId, ref: SMS.name }) |
|||
sms: SMS |
|||
|
|||
@Prop({ type: String }) |
|||
deliveredAt: Date |
|||
|
|||
@Prop({ type: Date }) |
|||
lastDeliveryAttemptAt: Date |
|||
|
|||
@Prop({ type: Date }) |
|||
nextDeliveryAttemptAt: Date |
|||
|
|||
@Prop({ type: Number, default: 0 }) |
|||
deliveryAttemptCount: number |
|||
|
|||
@Prop({ type: Date }) |
|||
deliveryAttemptAbortedAt: Date |
|||
} |
|||
|
|||
export const WebhookNotificationSchema = |
|||
SchemaFactory.createForClass(WebhookNotification) |
|||
@ -0,0 +1,43 @@ |
|||
import { Prop, Schema, SchemaFactory } from '@nestjs/mongoose' |
|||
import { Document, Types } from 'mongoose' |
|||
import { User } from 'src/users/schemas/user.schema' |
|||
import { WebhookEvent } from '../webhook-event.enum' |
|||
|
|||
export type WebhookSubscriptionDocument = WebhookSubscription & Document |
|||
|
|||
@Schema({ timestamps: true }) |
|||
export class WebhookSubscription { |
|||
_id?: Types.ObjectId |
|||
|
|||
@Prop({ type: Types.ObjectId, ref: User.name }) |
|||
user: User |
|||
|
|||
@Prop({ type: Boolean, default: true }) |
|||
isActive: boolean |
|||
|
|||
@Prop({ type: [String], default: [WebhookEvent.MESSAGE_RECEIVED] }) |
|||
events: string[] |
|||
|
|||
@Prop({ type: String, required: true }) |
|||
deliveryUrl: string |
|||
|
|||
@Prop({ type: String, required: true }) |
|||
signingSecret: string |
|||
|
|||
@Prop({ type: Number, default: 0 }) |
|||
successfulDeliveryCount: number |
|||
|
|||
@Prop({ type: Number, default: 0 }) |
|||
deliveryAttemptCount: number |
|||
|
|||
@Prop({ type: Date }) |
|||
lastDeliveryAttemptAt: Date |
|||
|
|||
@Prop({ type: Date }) |
|||
lastDeliverySuccessAt: Date |
|||
} |
|||
|
|||
export const WebhookSubscriptionSchema = |
|||
SchemaFactory.createForClass(WebhookSubscription) |
|||
|
|||
WebhookSubscriptionSchema.index({ user: 1, events: 1 }, { unique: true }) |
|||
@ -0,0 +1,3 @@ |
|||
export enum WebhookEvent { |
|||
MESSAGE_RECEIVED = 'MESSAGE_RECEIVED', |
|||
} |
|||
@ -0,0 +1,68 @@ |
|||
import { |
|||
Body, |
|||
Request, |
|||
Param, |
|||
Post, |
|||
Patch, |
|||
Controller, |
|||
Get, |
|||
UseGuards, |
|||
} from '@nestjs/common' |
|||
import { WebhookService } from './webhook.service' |
|||
import { ApiBearerAuth, ApiTags } from '@nestjs/swagger' |
|||
import { CreateWebhookDto, UpdateWebhookDto } from './webhook.dto' |
|||
import { AuthGuard } from 'src/auth/guards/auth.guard' |
|||
|
|||
@ApiTags('webhooks') |
|||
@ApiBearerAuth() |
|||
@Controller('webhooks') |
|||
export class WebhookController { |
|||
constructor(private readonly webhookService: WebhookService) {} |
|||
|
|||
@Get() |
|||
@UseGuards(AuthGuard) |
|||
async getWebhooks(@Request() req) { |
|||
const data = await this.webhookService.findWebhooksForUser({ |
|||
user: req.user, |
|||
}) |
|||
return { data } |
|||
} |
|||
|
|||
@Get(':webhookId') |
|||
@UseGuards(AuthGuard) |
|||
async getWebhook(@Request() req, @Param('webhookId') webhookId: string) { |
|||
const data = await this.webhookService.findOne({ |
|||
user: req.user, |
|||
webhookId, |
|||
}) |
|||
return { data } |
|||
} |
|||
|
|||
@Post() |
|||
@UseGuards(AuthGuard) |
|||
async createWebhook( |
|||
@Request() req, |
|||
@Body() createWebhookDto: CreateWebhookDto, |
|||
) { |
|||
const data = await this.webhookService.create({ |
|||
user: req.user, |
|||
createWebhookDto, |
|||
}) |
|||
return { data } |
|||
} |
|||
|
|||
@Patch(':webhookId') |
|||
@UseGuards(AuthGuard) |
|||
async updateWebhook( |
|||
@Request() req, |
|||
@Param('webhookId') webhookId: string, |
|||
@Body() updateWebhookDto: UpdateWebhookDto, |
|||
) { |
|||
const data = await this.webhookService.update({ |
|||
user: req.user, |
|||
webhookId, |
|||
updateWebhookDto, |
|||
}) |
|||
return { data } |
|||
} |
|||
} |
|||
@ -0,0 +1,14 @@ |
|||
import { WebhookEvent } from './webhook-event.enum' |
|||
|
|||
export class CreateWebhookDto { |
|||
deliveryUrl: string |
|||
signingSecret?: string |
|||
events: WebhookEvent[] |
|||
} |
|||
|
|||
export class UpdateWebhookDto { |
|||
isActive: boolean |
|||
deliveryUrl: string |
|||
signingSecret: string |
|||
events: WebhookEvent[] |
|||
} |
|||
@ -0,0 +1,35 @@ |
|||
import { Module } from '@nestjs/common' |
|||
import { MongooseModule } from '@nestjs/mongoose' |
|||
import { WebhookController } from './webhook.controller' |
|||
import { WebhookService } from './webhook.service' |
|||
import { |
|||
WebhookSubscription, |
|||
WebhookSubscriptionSchema, |
|||
} from './schemas/webhook-subscription.schema' |
|||
import { |
|||
WebhookNotification, |
|||
WebhookNotificationSchema, |
|||
} from './schemas/webhook-notification.schema' |
|||
import { AuthModule } from 'src/auth/auth.module' |
|||
import { UsersModule } from 'src/users/users.module' |
|||
|
|||
@Module({ |
|||
imports: [ |
|||
MongooseModule.forFeature([ |
|||
{ |
|||
name: WebhookSubscription.name, |
|||
schema: WebhookSubscriptionSchema, |
|||
}, |
|||
{ |
|||
name: WebhookNotification.name, |
|||
schema: WebhookNotificationSchema, |
|||
}, |
|||
]), |
|||
AuthModule, |
|||
UsersModule, |
|||
], |
|||
controllers: [WebhookController], |
|||
providers: [WebhookService], |
|||
exports: [MongooseModule, WebhookService], |
|||
}) |
|||
export class WebhookModule {} |
|||
@ -0,0 +1,251 @@ |
|||
import { HttpException, HttpStatus, Injectable } from '@nestjs/common' |
|||
import { Model } from 'mongoose' |
|||
import { |
|||
WebhookSubscription, |
|||
WebhookSubscriptionDocument, |
|||
} from './schemas/webhook-subscription.schema' |
|||
import { InjectModel } from '@nestjs/mongoose' |
|||
import { WebhookEvent } from './webhook-event.enum' |
|||
import { |
|||
WebhookNotification, |
|||
WebhookNotificationDocument, |
|||
} from './schemas/webhook-notification.schema' |
|||
import axios from 'axios' |
|||
import { v4 as uuidv4 } from 'uuid' |
|||
import { Cron } from '@nestjs/schedule' |
|||
import { CronExpression } from '@nestjs/schedule' |
|||
import * as crypto from 'crypto' |
|||
|
|||
@Injectable() |
|||
export class WebhookService { |
|||
constructor( |
|||
@InjectModel(WebhookSubscription.name) |
|||
private webhookSubscriptionModel: Model<WebhookSubscriptionDocument>, |
|||
@InjectModel(WebhookNotification.name) |
|||
private webhookNotificationModel: Model<WebhookNotificationDocument>, |
|||
) {} |
|||
|
|||
async findOne({ user, webhookId }) { |
|||
const webhook = await this.webhookSubscriptionModel.findOne({ |
|||
_id: webhookId, |
|||
user: user._id, |
|||
}) |
|||
|
|||
if (!webhook) { |
|||
throw new HttpException('Subscription not found', HttpStatus.NOT_FOUND) |
|||
} |
|||
return webhook |
|||
} |
|||
|
|||
async findWebhooksForUser({ user }) { |
|||
return await this.webhookSubscriptionModel.find({ user: user._id }) |
|||
} |
|||
|
|||
async create({ user, createWebhookDto }) { |
|||
const { events, deliveryUrl } = createWebhookDto |
|||
|
|||
// Add URL validation
|
|||
try { |
|||
new URL(deliveryUrl) |
|||
} catch (e) { |
|||
throw new HttpException('Invalid delivery URL', HttpStatus.BAD_REQUEST) |
|||
} |
|||
|
|||
const existingSubscription = await this.webhookSubscriptionModel.findOne({ |
|||
user: user._id, |
|||
events, |
|||
}) |
|||
|
|||
if (existingSubscription) { |
|||
throw new HttpException( |
|||
'You have already subscribed to this event', |
|||
HttpStatus.BAD_REQUEST, |
|||
) |
|||
} |
|||
|
|||
if (!events.every((event) => Object.values(WebhookEvent).includes(event))) { |
|||
throw new HttpException('Invalid event type', HttpStatus.BAD_REQUEST) |
|||
} |
|||
|
|||
const signingSecret = uuidv4() |
|||
|
|||
// TODO: Encrypt signing secret
|
|||
// const webhookSignatureKey = process.env.WEBHOOK_SIGNATURE_KEY
|
|||
// const encryptedSigningSecret = encrypt(signingSecret, webhookSignatureKey)
|
|||
|
|||
const webhookSubscription = await this.webhookSubscriptionModel.create({ |
|||
user: user._id, |
|||
events, |
|||
deliveryUrl, |
|||
signingSecret, |
|||
}) |
|||
|
|||
return webhookSubscription |
|||
} |
|||
|
|||
async update({ user, webhookId, updateWebhookDto }) { |
|||
const webhookSubscription = await this.webhookSubscriptionModel.findOne({ |
|||
_id: webhookId, |
|||
user: user._id, |
|||
}) |
|||
|
|||
if (!webhookSubscription) { |
|||
throw new HttpException('Subscription not found', HttpStatus.NOT_FOUND) |
|||
} |
|||
|
|||
if (updateWebhookDto.hasOwnProperty('isActive')) { |
|||
webhookSubscription.isActive = updateWebhookDto.isActive |
|||
} |
|||
|
|||
if (updateWebhookDto.hasOwnProperty('deliveryUrl')) { |
|||
webhookSubscription.deliveryUrl = updateWebhookDto.deliveryUrl |
|||
} |
|||
|
|||
// if there is a valid uuid signing secret, update it
|
|||
if ( |
|||
updateWebhookDto.hasOwnProperty('signingSecret') && |
|||
updateWebhookDto.signingSecret.length < 20 |
|||
) { |
|||
throw new HttpException('Invalid signing secret', HttpStatus.BAD_REQUEST) |
|||
} else if (updateWebhookDto.hasOwnProperty('signingSecret')) { |
|||
webhookSubscription.signingSecret = updateWebhookDto.signingSecret |
|||
} |
|||
|
|||
await webhookSubscription.save() |
|||
|
|||
return webhookSubscription |
|||
} |
|||
|
|||
async deliverNotification({ sms, user, event }) { |
|||
console.log('deliverNotification') |
|||
console.log(sms) |
|||
console.log(user) |
|||
console.log(event) |
|||
const webhookSubscription = await this.webhookSubscriptionModel.findOne({ |
|||
user: user._id, |
|||
events: { $in: [event] }, |
|||
}) |
|||
|
|||
if (!webhookSubscription || !webhookSubscription.isActive) { |
|||
return |
|||
} |
|||
|
|||
if (event === WebhookEvent.MESSAGE_RECEIVED) { |
|||
const payload = { |
|||
smsId: sms._id, |
|||
sender: sms.sender, |
|||
message: sms.message, |
|||
receivedAt: sms.receivedAt, |
|||
deviceId: sms.device, |
|||
webhookSubscriptionId: webhookSubscription._id, |
|||
webhookEvent: event, |
|||
} |
|||
const webhookNotification = await this.webhookNotificationModel.create({ |
|||
webhookSubscription: webhookSubscription._id, |
|||
event, |
|||
payload, |
|||
sms, |
|||
}) |
|||
|
|||
await this.attemptWebhookDelivery(webhookNotification) |
|||
} else { |
|||
throw new HttpException('Invalid event type', HttpStatus.BAD_REQUEST) |
|||
} |
|||
} |
|||
|
|||
private async attemptWebhookDelivery( |
|||
webhookNotification: WebhookNotificationDocument, |
|||
) { |
|||
const now = new Date() |
|||
|
|||
const webhookSubscription = await this.webhookSubscriptionModel.findById( |
|||
webhookNotification.webhookSubscription, |
|||
) |
|||
const deliveryUrl = webhookSubscription?.deliveryUrl |
|||
const signingSecret = webhookSubscription?.signingSecret |
|||
|
|||
const signature = crypto |
|||
.createHmac('sha256', signingSecret) |
|||
.update(JSON.stringify(webhookNotification.payload)) |
|||
.digest('hex') |
|||
|
|||
try { |
|||
await axios.post(deliveryUrl, webhookNotification.payload, { |
|||
headers: { |
|||
'X-Signature': signature, |
|||
}, |
|||
timeout: 10000, |
|||
}) |
|||
webhookNotification.deliveryAttemptCount += 1 |
|||
webhookNotification.lastDeliveryAttemptAt = now |
|||
webhookNotification.nextDeliveryAttemptAt = this.getNextDeliveryAttemptAt( |
|||
webhookNotification.deliveryAttemptCount, |
|||
) |
|||
webhookNotification.deliveredAt = now |
|||
await webhookNotification.save() |
|||
|
|||
|
|||
webhookSubscription.successfulDeliveryCount += 1 |
|||
webhookSubscription.lastDeliverySuccessAt = now |
|||
} catch (e) { |
|||
console.error( |
|||
`Failed to deliver webhook notification ${webhookNotification._id}: received response status code ${e.response.status}`, |
|||
) |
|||
webhookNotification.deliveryAttemptCount += 1 |
|||
webhookNotification.lastDeliveryAttemptAt = now |
|||
webhookNotification.nextDeliveryAttemptAt = this.getNextDeliveryAttemptAt( |
|||
webhookNotification.deliveryAttemptCount, |
|||
) |
|||
await webhookNotification.save() |
|||
|
|||
} finally { |
|||
webhookSubscription.deliveryAttemptCount += 1 |
|||
await webhookSubscription.save() |
|||
} |
|||
} |
|||
|
|||
private getNextDeliveryAttemptAt(deliveryAttemptCount: number): Date { |
|||
// Delays in minutes
|
|||
const delaySequence = [ |
|||
1, // 1 minute
|
|||
5, // 5 minutes
|
|||
30, // 30 minutes
|
|||
60, // 1 hour
|
|||
360, // 6 hours
|
|||
1440, // 1 day
|
|||
4320, // 3 days
|
|||
10080, // 7 days
|
|||
43200, // 30 days
|
|||
] |
|||
|
|||
// Get the delay in minutes (use last value if attempt count exceeds sequence length)
|
|||
const delayInMinutes = |
|||
delaySequence[ |
|||
Math.min(deliveryAttemptCount - 1, delaySequence.length - 1) |
|||
] || delaySequence[delaySequence.length - 1] |
|||
|
|||
// Convert minutes to milliseconds and add to current time
|
|||
return new Date(Date.now() + delayInMinutes * 60 * 1000) |
|||
} |
|||
|
|||
// Check for notifications that need to be delivered every minute
|
|||
@Cron(CronExpression.EVERY_MINUTE) |
|||
async checkForNotificationsToDeliver() { |
|||
const now = new Date() |
|||
const notifications = await this.webhookNotificationModel |
|||
.find({ |
|||
nextDeliveryAttemptAt: { $lte: now }, |
|||
deliveredAt: null, |
|||
deliveryAttemptCount: { $lt: 10 }, |
|||
deliveryAttemptAbortedAt: null, |
|||
}) |
|||
.sort({ nextDeliveryAttemptAt: 1 }) |
|||
.limit(50) |
|||
|
|||
console.log(`delivering ${notifications.length} webhook notifications`) |
|||
|
|||
for (const notification of notifications) { |
|||
await this.attemptWebhookDelivery(notification) |
|||
} |
|||
} |
|||
} |
|||
Write
Preview
Loading…
Cancel
Save
Reference in new issue