Skip to content

Commit

Permalink
deliver subject check, pull sub fix, use IsNullOrWhiteSpace, more tes…
Browse files Browse the repository at this point in the history
…ting (#491)
  • Loading branch information
scottf authored Sep 1, 2021
1 parent c9ca356 commit 70acbf5
Show file tree
Hide file tree
Showing 11 changed files with 513 additions and 232 deletions.
9 changes: 0 additions & 9 deletions src/NATS.Client/JetStream/IJetStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,6 @@ public interface IJetStream
/// <returns>A publish acknowedgement.</returns>
Task<PublishAck> PublishAsync(Msg message, PublishOptions publishOptions);

/// <summary>
/// Creates a JetStream pull subscription. Pull subscriptions fetch messages
/// from the server in batches.
/// </summary>
/// <param name="subject">The subject on which to listen for messages.
/// The subject can have wildcards (partial: <c>*</c>, full: <c>&gt;</c>).</param>
/// <returns>a JetStreamPullSubscription</returns>
IJetStreamPullSubscription PullSubscribe(string subject);

/// <summary>
/// Creates a JetStream pull subscription. Pull subscriptions fetch messages
/// from the server in batches.
Expand Down
81 changes: 44 additions & 37 deletions src/NATS.Client/JetStream/JetStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
using System.Threading.Tasks;
using NATS.Client.Internals;
using static NATS.Client.Connection;
using static NATS.Client.JetStream.ConsumerConfiguration;

namespace NATS.Client.JetStream
{
Expand Down Expand Up @@ -68,7 +67,7 @@ private PublishAck ProcessPublishResponse(Msg resp, PublishOptions options) {
String ackStream = ack.Stream;
String pubStream = options?.Stream;
// stream specified in options but different than ack should not happen but...
if (pubStream != null && !pubStream.Equals(ackStream)) {
if (!string.IsNullOrWhiteSpace(pubStream) && pubStream != ackStream) {
throw new NATSJetStreamException("Expected ack from stream " + pubStream + ", received from: " + ackStream);
}
return ack;
Expand Down Expand Up @@ -142,103 +141,116 @@ Subscription CreateSubscription(string subject, string queueName,

// setup the configuration, use a default.
string stream;
ConsumerConfigurationBuilder ccBuilder;
ConsumerConfiguration.ConsumerConfigurationBuilder ccBuilder;
SubscribeOptions so;

if (isPullMode) {
so = pullOpts; // options must have already been checked to be non null
stream = pullOpts.Stream;
ccBuilder = Builder(pullOpts.ConsumerConfiguration);
ccBuilder = ConsumerConfiguration.Builder(pullOpts.ConsumerConfiguration);
ccBuilder.WithDeliverSubject(null); // pull mode can't have a deliver subject
// queueName is already null
ccBuilder.WithDeliverGroup(null); // pull mode can't have a deliver group
}
else {
so = pushOpts ?? PushSubscribeOptions.Builder().Build();
stream = so.Stream; // might be null, that's ok (see direct)
ccBuilder = Builder(so.ConsumerConfiguration);
ccBuilder = ConsumerConfiguration.Builder(so.ConsumerConfiguration);
ccBuilder.WithMaxPullWaiting(0); // this does not apply to push, in fact will error b/c deliver subject will be set
// deliver subject does not have to be cleared
// figure out the queue name
queueName = Validator.ValidateMustMatchIfBothSupplied(ccBuilder.DeliverGroup, queueName,
"Consumer Configuration DeliverGroup", "Queue Name");
"[SUB-Q01] Consumer Configuration DeliverGroup", "Queue Name");
ccBuilder.WithDeliverGroup(queueName); // and set it in case the deliver group was null
}

//
bool bindMode = so.Bind;

string durable = ccBuilder.Durable;
string inbox = ccBuilder.DeliverSubject;
string inboxDeliver = ccBuilder.DeliverSubject;
string filterSubject = ccBuilder.FilterSubject;

bool createConsumer = true;

// 1. Did they tell me what stream? No? look it up.
// subscribe options will have already validated that stream is present for direct mode
if (stream == null) {
if (string.IsNullOrWhiteSpace(stream)) {
stream = LookupStreamBySubject(subject);
}

// 2. Is this a durable or ephemeral
if (durable != null) {
if (!string.IsNullOrWhiteSpace(durable)) {
ConsumerInfo lookedUpInfo =
LookupConsumerInfo(stream, durable);

if (lookedUpInfo != null) { // the consumer for that durable already exists
createConsumer = false;
ConsumerConfiguration lookedUpConfig = lookedUpInfo.Configuration;

string lookedUp = lookedUpConfig.DeliverSubject;
if (isPullMode) {
if (!string.IsNullOrWhiteSpace(lookedUp)) {
throw new ArgumentException($"[SUB-DS01] Consumer is already configured as a push consumer with deliver subject '{lookedUp}'.");
}
}
else if (string.IsNullOrWhiteSpace(lookedUp)) {
throw new ArgumentException("[SUB-DS02] Consumer is already configured as a pull consumer with no deliver subject.");
}
else if (!string.IsNullOrWhiteSpace(inboxDeliver) && inboxDeliver != lookedUp) {
throw new ArgumentException($"[SUB-DS03] Existing consumer deliver subject '{lookedUp}' does not match requested deliver subject '{inboxDeliver}'.");
}

// durable already exists, make sure the filter subject matches
string lookedUp = Validator.EmptyAsNull(lookedUpConfig.FilterSubject);
if (filterSubject != null && !filterSubject.Equals(lookedUp)) {
lookedUp = Validator.EmptyAsNull(lookedUpConfig.FilterSubject);
if (!string.IsNullOrWhiteSpace(filterSubject) && !filterSubject.Equals(lookedUp)) {
throw new ArgumentException(
$"Subject {subject} mismatches consumer configuration {filterSubject}.");
$"[SUB-FS01] Subject {subject} mismatches consumer configuration {filterSubject}.");
}
filterSubject = lookedUp;

lookedUp = Validator.EmptyAsNull(lookedUpConfig.DeliverGroup);
if (lookedUp == null) {
if (string.IsNullOrWhiteSpace(lookedUp)) {
// lookedUp was null, means existing consumer is not a queue consumer
if (queueName == null) {
if (string.IsNullOrWhiteSpace(queueName)) {
// ok fine, no queue requested and the existing consumer is also not a queue consumer
// we must check if the consumer is in use though
if (lookedUpInfo.PushBound) {
throw new ArgumentException($"Consumer [{durable}] is already bound to a subscription.");
throw new ArgumentException($"[SUB-Q02] Consumer [{durable}] is already bound to a subscription.");
}
}
else { // else they requested a queue but this durable was not configured as queue
throw new ArgumentException($"Existing consumer [{durable}] is not configured as a queue / deliver group.");
throw new ArgumentException($"[SUB-Q03] Existing consumer [{durable}] is not configured as a queue / deliver group.");
}
}
else if (queueName == null) {
throw new ArgumentException($"Existing consumer [{durable}] is configured as a queue / deliver group.");
else if (string.IsNullOrWhiteSpace(queueName)) {
throw new ArgumentException($"[SUB-Q04] Existing consumer [{durable}] is configured as a queue / deliver group.");
}
else if (lookedUp != queueName) {
throw new ArgumentException(
$"Existing consumer deliver group {lookedUp} does not match requested queue / deliver group {queueName}.");
$"[SUB-Q05] Existing consumer deliver group {lookedUp} does not match requested queue / deliver group {queueName}.");
}

inbox = lookedUpConfig.DeliverSubject; // use the deliver subject as the inbox. It may be null, that's ok
inboxDeliver = lookedUpConfig.DeliverSubject; // use the deliver subject as the inbox. It may be null, that's ok
}
else if (bindMode) {
throw new ArgumentException("Consumer not found for durable. Required in bind mode.");
throw new ArgumentException("[SUB-B01] Consumer not found for durable. Required in bind mode.");
}
}

// 3. If no deliver subject (inbox) provided or found, make an inbox.
if (inbox == null) {
inbox = Conn.NewInbox();
if (string.IsNullOrWhiteSpace(inboxDeliver)) {
inboxDeliver = Conn.NewInbox();
}

// 4. create the subscription
Subscription sub;
if (isPullMode)
{
sub = ((Connection) Conn).subscribeSync(inbox, queueName, PullSubDelegate);
sub = ((Connection) Conn).subscribeSync(inboxDeliver, queueName, PullSubDelegate);
}
else if (handler == null) {
sub = ((Connection) Conn).subscribeSync(inbox, queueName, PushSyncSubDelegate);
sub = ((Connection) Conn).subscribeSync(inboxDeliver, queueName, PushSyncSubDelegate);
}
else if (autoAck)
{
Expand All @@ -258,21 +270,21 @@ void AutoAckHandler(object sender, MsgHandlerEventArgs args)
}
}

sub = ((Connection) Conn).subscribeAsync(inbox, queueName, AutoAckHandler, PushAsyncSubDelegate);
sub = ((Connection) Conn).subscribeAsync(inboxDeliver, queueName, AutoAckHandler, PushAsyncSubDelegate);
}
else {
sub = ((Connection) Conn).subscribeAsync(inbox, queueName, handler, PushAsyncSubDelegate);
sub = ((Connection) Conn).subscribeAsync(inboxDeliver, queueName, handler, PushAsyncSubDelegate);
}

// 5-Consumer didn't exist. It's either ephemeral or a durable that didn't already exist.
if (createConsumer) {
// Pull mode doesn't maintain a deliver subject. It's actually an error if we send it.
if (!isPullMode) {
ccBuilder.WithDeliverSubject(inbox);
ccBuilder.WithDeliverSubject(inboxDeliver);
}

// being discussed if this is correct, but leave it for now.
ccBuilder.WithFilterSubject(filterSubject == null ? subject : filterSubject);
ccBuilder.WithFilterSubject(string.IsNullOrWhiteSpace(filterSubject) ? subject : filterSubject);

// createOrUpdateConsumer can fail for security reasons, maybe other reasons?
ConsumerInfo ci;
Expand All @@ -284,11 +296,11 @@ void AutoAckHandler(object sender, MsgHandlerEventArgs args)
sub.Unsubscribe();
throw;
}
((IJetStreamSubscriptionInternal)sub).SetupJetStream(this, ci.Name, ci.Stream, inbox);
((IJetStreamSubscriptionInternal)sub).SetupJetStream(this, ci.Name, ci.Stream, inboxDeliver);
}
// 5-Consumer did exist.
else {
((IJetStreamSubscriptionInternal)sub).SetupJetStream(this, durable, stream, inbox);
((IJetStreamSubscriptionInternal)sub).SetupJetStream(this, durable, stream, inboxDeliver);
}

return sub;
Expand Down Expand Up @@ -328,15 +340,10 @@ internal string LookupStreamBySubject(string subject)
return snr.Strings[0];
}

public IJetStreamPullSubscription PullSubscribe(string subject)
{
Validator.ValidateSubject(subject, true);
return (IJetStreamPullSubscription) CreateSubscription(subject, null, null, false, null, null);
}

public IJetStreamPullSubscription PullSubscribe(string subject, PullSubscribeOptions options)
{
Validator.ValidateSubject(subject, true);
Validator.ValidateNotNull(options, "PullSubscribeOptions");
return (IJetStreamPullSubscription) CreateSubscription(subject, null, null, false, null, options);
}

Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client/JetStream/JetStreamBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ internal ConsumerInfo GetConsumerInfoInternal(string streamName, string consumer

internal ConsumerInfo AddOrUpdateConsumerInternal(string streamName, ConsumerConfiguration config)
{
string subj = config.Durable == null
string subj = string.IsNullOrWhiteSpace(config.Durable)
? string.Format(JetStreamConstants.JsapiConsumerCreate, streamName)
: string.Format(JetStreamConstants.JsapiDurableCreate, streamName, config.Durable);

Expand Down
8 changes: 4 additions & 4 deletions src/NATS.Client/JetStream/JsPrefixManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@

namespace NATS.Client.JetStream
{
internal static class JsPrefixManager
public static class JsPrefixManager
{
private static ConcurrentDictionary<string, string> JsPrefixes = new ConcurrentDictionary<string, string>();
public static ConcurrentDictionary<string, string> JsPrefixes = new ConcurrentDictionary<string, string>();

internal static string AddPrefix(string prefix) {
public static string AddPrefix(string prefix) {
if (string.IsNullOrWhiteSpace(prefix) || prefix.Equals(JetStreamConstants.JsapiPrefix)) {
return JetStreamConstants.JsapiPrefix;
}
Expand All @@ -34,7 +34,7 @@ internal static string AddPrefix(string prefix) {
return JsPrefixes.GetOrAdd(prefix, prefix);
}

internal static bool HasPrefix(string replyTo)
public static bool HasPrefix(string replyTo)
{
if (replyTo == null) return false;

Expand Down
6 changes: 3 additions & 3 deletions src/NATS.Client/JetStream/PushSubscribeOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public static PushSubscribeOptions ForStream(string stream) {
/// <param name="durable">the durable name</param>
/// <returns>the PushSubscribeOptions</returns>
public static PushSubscribeOptions BindTo(string stream, string durable) {
return new PushSubscribeOptionsBuilder().WithStream(stream).WithDurable(durable).Bind().Build();
return new PushSubscribeOptionsBuilder().WithStream(stream).WithDurable(durable).Bind(true).Build();
}

/// <summary>
Expand Down Expand Up @@ -90,9 +90,9 @@ public PushSubscribeOptionsBuilder WithStream(string stream)
/// Set as a direct subscribe
/// </summary>
/// <returns>The builder</returns>
public PushSubscribeOptionsBuilder Bind()
public PushSubscribeOptionsBuilder Bind(bool b)
{
_bind = true;
_bind = b;
return this;
}

Expand Down
2 changes: 2 additions & 0 deletions src/NATS.Client/JetStream/SubscribeOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ internal SubscribeOptions(string stream, string durable, bool pull, bool bind,

deliverGroup = Validator.ValidateMustMatchIfBothSupplied(deliverGroup, cc?.DeliverGroup, "Builder Deliver Group", "Consumer Configuration Deliver Group");

deliverSubject = Validator.ValidateMustMatchIfBothSupplied(deliverSubject, cc?.DeliverSubject, "Builder Deliver Subject", "Consumer Configuration Deliver Subject");

ConsumerConfiguration = ConsumerConfiguration.Builder(cc)
.WithDurable(durable)
.WithDeliverSubject(deliverSubject)
Expand Down
Loading

0 comments on commit 70acbf5

Please sign in to comment.