This repository is a fork of p-fedyukovich/nestjs-google-pubsub-microservice
Pub/Sub is an asynchronous messaging service that decouples services that produce events from services that process events.
You can use Pub/Sub as messaging-oriented middleware or event ingestion and delivery for streaming analytics pipelines.
Pub/Sub offers durable message storage and real-time message delivery with high availability and consistent performance at scale
To start building Pub/Sub-based microservices, first install the required packages:
$ npm i --save @google-cloud/pubsub nestjs-google-pubsub-microservice
To use the Pub/Sub transporter, pass the following options object to the createMicroservice()
method:
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
ApplicationModule,
{
strategy: new GCPubSubServer({
topic: 'cats_topic',
subscription: 'cats_subscription',
client: {
projectId: 'microservice',
},
}),
},
);
The options
property is specific to the chosen transporter. The GCloud Pub/Sub transporter exposes the properties described below.
topic |
Topic name which your server subscription will belong to |
subscription |
Subscription name which your server will listen to |
replyTopic |
Topic name which your client subscription will belong to |
replySubscription |
Subscription name which your client will listen to |
noAck |
If false , manual acknowledgment mode enabled |
init |
If false , topics and subscriptions will not be created, only validated |
checkExistence |
If false , topics and subscriptions will not be checked, only used. This only applies when init is false |
client |
Additional client options (read more here) |
publisher |
Additional topic publisher options (read more here) |
subscriber |
Additional subscriber options (read more here) |
createSubscriptionOptions |
Options to create subscription if init is set to true and a subscription is needed to be created (read more here) |
autoResume |
Automatically resume publishing a message with ordering key if it fails (read more here) |
autoDeleteSubscriptionOnShutdown |
Automatically delete the subscription that is connected by the client on shutdown. If the deletion fails, it will close the subscription |
clientIdFilter |
Allows a client to only receive the response from its own request |
appendClientIdtoSubscription |
Append client id to the name of the subscription on init |
Client can be instantiated by importing GCPubSubClientModule
to the root module. The clients can be registered with both the register
method or the registerAsync
method via useFactory
.
@Module({
imports: [
GCPubSubClientModule.register([{
name: 'client-1'.
config: options
}]),
GCPubSubClientModule.registerAsync([{
name: 'client-2',
imports: [ConfigModule],
inject: [ConfigService],
useFactory: (configService: ConfigService) => {
return this.configService.get('GCPubSubClientOptions')
}
}])
]
})
export class AppModule {}
The client can then be injected with @InjectGCPubSubClient
decorator
@Injectable()
export class AppService {
constructor(
@InjectGCPubSubClient('client-1') private readonly client: GCPubSUbCLient,
) {}
}
If the token for the client is needed for tests, the package provides a utility function getGCPubSubClientToken
to retrive the provider token of the client.
const token = getGCPubSubClientToken('client-1');
To fully utilize the features provided by Google PubSub, the message needs to be serialized and deserialized in a certain way to ensure the integrity of the data. Therefore, a helper class, GCPubSubMessageBuilder
is available to build messages with features such as attributes, ordering key and timeouts.
Usage
this.client.send(
'pattern',
new GCPubSubMessageBuilder(data)
.setAttributes(attrs)
.setOrderingKey('orderingKey')
.setTimeout(12000)
.build(),
);
data? | TData | Data of the message payload |
attributes? | TAttrs | Attributes of the payload |
orderingKey? | string | Ordering key of the message |
timeout? | number | Timeout of the message, the request will not be processed if the request exceeds the timeout when it reaches the server |
setData | (data: TData) => this | Setting the data of the message |
setAttributes | (attributes: TAttrs) => this | Setting the attributes of the payload |
setOrderingKey | (orderingKey: string) => this | Setting the ordering key of the message |
setTimeout | (timeout: number) => this | Setting the timeout value of the payload |
build | () => GCPubSubMessage | Build the message, throws error if data is empty |
In more sophisticated scenarios, you may want to access more information about the incoming request. When using the Pub/Sub transporter, you can access the GCPubSubContext
object.
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: GCPubSubContext) {
console.log(`Pattern: ${context.getPattern()}`);
}
To access the original Pub/Sub message (with the attributes
, data
, ack
and nack
), use the getMessage()
method of the GCPubSubContext
object, as follows:
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: GCPubSubContext) {
console.log(context.getMessage());
}
To make sure a message is never lost, Pub/Sub supports message acknowledgements. An acknowledgement is sent back by the consumer to tell Pub/Sub that a particular message has been received, processed and that Pub/Sub is free to delete it. If a consumer dies (its subscription is closed, connection is closed, or TCP connection is lost) without sending an ack, Pub/Sub will understand that a message wasn't processed fully and will re-deliver it.
To enable manual acknowledgment mode, set the noAck
property to false
:
{
replyTopic: 'cats_topic_reply',
replySubscription: 'cats_subscription_reply',
noAck: false,
client: {
projectId: 'microservice',
},
},
When manual consumer acknowledgements are turned on, we must send a proper acknowledgement from the worker to signal that we are done with a task.
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: GCPubSubContext) {
const originalMsg = context.getMessage();
originalMsg.ack();
}