From d112848d42c687110825d00cb09fae20484f7293 Mon Sep 17 00:00:00 2001 From: Scott Fauerbach Date: Sat, 13 Nov 2021 07:44:33 -0500 Subject: [PATCH] issue 67, subscription improvements, unit tests (#512) --- README.md | 23 +++ src/NATS.Client/Conn.cs | 14 +- src/NATS.Client/Exceptions.cs | 1 - .../JetStream/AutoStatusManager.cs | 4 +- .../JetStream/ConsumerConfiguration.cs | 3 +- src/NATS.Client/JetStream/IJetStream.cs | 10 +- src/NATS.Client/JetStream/JetStream.cs | 107 +++++++---- src/NATS.Client/JetStream/JetStreamBase.cs | 7 + .../JetStream/JetStreamManagement.cs | 4 +- .../JetStream/PullSubscribeOptions.cs | 5 - .../JetStream/PushSubscribeOptions.cs | 15 -- src/NATS.Client/JetStream/SubscribeOptions.cs | 25 ++- src/NATS.Client/Subscription.cs | 42 +++-- src/Tests/IntegrationTests/TestJetStream.cs | 166 +++++++++++------- .../TestJetStreamManagement.cs | 95 +++++----- .../IntegrationTests/TestJetStreamPublish.cs | 3 +- .../IntegrationTests/TestJetStreamPull.cs | 46 +++++ .../TestJetStreamPushAsync.cs | 7 +- .../IntegrationTests/TestJetStreamPushSync.cs | 105 ++++++++--- .../UnitTests/Internals/TestValidator.cs | 32 ++++ .../JetStream/TestPushPullSubscribeOptions.cs | 2 + 21 files changed, 462 insertions(+), 254 deletions(-) diff --git a/README.md b/README.md index 4009a1f04..d9963152b 100755 --- a/README.md +++ b/README.md @@ -934,6 +934,29 @@ messages, one for each message the previous batch was short. You can just ignore See `JetStreamPullSubExpire` (TODO) and `JetStreamPullSubExpireUseCases` (TODO) in the JetStream samples for detailed and runnable samples. +### Subscription Creation + +Subscription creation has many checks to make sure that a valid, operable subscription can be made. + +| Name | Group | Code | Description | +| --- | --- | --- | --- | +| JsSubPullCantHaveDeliverGroup | SUB | 90001 | Pull subscriptions can't have a deliver group. | +| JsSubPullCantHaveDeliverSubject | SUB | 90002 | Pull subscriptions can't have a deliver subject. | +| JsSubPushCantHaveMaxPullWaiting | SUB | 90003 | Push subscriptions cannot supply max pull waiting. | +| JsSubQueueDeliverGroupMismatch | SUB | 90004 | Queue / deliver group mismatch. | +| JsSubFcHbNotValidPull | SUB | 90005 | Flow Control and/or heartbeat is not valid with a pull subscription. | +| JsSubFcHbHbNotValidQueue | SUB | 90006 | Flow Control and/or heartbeat is not valid in queue mode. | +| JsSubNoMatchingStreamForSubject | SUB | 90007 | No matching streams for subject. | +| JsSubConsumerAlreadyConfiguredAsPush | SUB | 90008 | Consumer is already configured as a push consumer. | +| JsSubConsumerAlreadyConfiguredAsPull | SUB | 90009 | Consumer is already configured as a pull consumer. | +| JsSubSubjectDoesNotMatchFilter | SUB | 90011 | Subject does not match consumer configuration filter. | +| JsSubConsumerAlreadyBound | SUB | 90012 | Consumer is already bound to a subscription. | +| JsSubExistingConsumerNotQueue | SUB | 90013 | Existing consumer is not configured as a queue / deliver group. | +| JsSubExistingConsumerIsQueue | SUB | 90014 | Existing consumer is configured as a queue / deliver group. | +| JsSubExistingQueueDoesNotMatchRequestedQueue | SUB | 90015 | Existing consumer deliver group does not match requested queue / deliver group. | +| JsSubExistingConsumerCannotBeModified | SUB | 90016 | Existing consumer cannot be modified. | +| JsSubConsumerNotFoundRequiredInBind | SUB | 90017 | Consumer not found, required in bind mode. | + ### Message Acknowledgements There are multiple types of acknowledgements in JetStream: diff --git a/src/NATS.Client/Conn.cs b/src/NATS.Client/Conn.cs index d460e6652..0826de161 100644 --- a/src/NATS.Client/Conn.cs +++ b/src/NATS.Client/Conn.cs @@ -742,11 +742,7 @@ internal Connection(Options options) callbackScheduler.Start(); globalRequestInbox = NewInbox(); - - BeforeQueueProcessor = msg => msg; } - - internal Func BeforeQueueProcessor; private void buildPublishProtocolBuffers(int size) { @@ -2195,15 +2191,7 @@ internal void processMsg(byte[] msgBytes, long length) ? new JetStreamMsg(this, msgArgs, s, msgBytes, length) : new Msg(msgArgs, s, msgBytes, length); - // BeforeQueueProcessor returns null if the message - // does not need to be queued, for instance heartbeats - // that are not flow control and are already seen by the - // auto status manager - msg = BeforeQueueProcessor.Invoke(msg); - if (msg != null) - { - s.addMessage(msg, opts.subChanLen); - } + s.addMessage(msg, opts.subChanLen); } // maxreached == false } // lock s.mu diff --git a/src/NATS.Client/Exceptions.cs b/src/NATS.Client/Exceptions.cs index e32c150c5..9deabe57b 100644 --- a/src/NATS.Client/Exceptions.cs +++ b/src/NATS.Client/Exceptions.cs @@ -235,7 +235,6 @@ public sealed class ClientExDetail public static readonly ClientExDetail JsSubNoMatchingStreamForSubject = new ClientExDetail(Sub, 90007, "No matching streams for subject."); public static readonly ClientExDetail JsSubConsumerAlreadyConfiguredAsPush = new ClientExDetail(Sub, 90008, "Consumer is already configured as a push consumer."); public static readonly ClientExDetail JsSubConsumerAlreadyConfiguredAsPull = new ClientExDetail(Sub, 90009, "Consumer is already configured as a pull consumer."); - public static readonly ClientExDetail JsSubExistingDeliverSubjectMismatch = new ClientExDetail(Sub, 90010, "Existing consumer deliver subject does not match requested deliver subject."); public static readonly ClientExDetail JsSubSubjectDoesNotMatchFilter = new ClientExDetail(Sub, 90011, "Subject does not match consumer configuration filter."); public static readonly ClientExDetail JsSubConsumerAlreadyBound = new ClientExDetail(Sub, 90012, "Consumer is already bound to a subscription."); public static readonly ClientExDetail JsSubExistingConsumerNotQueue = new ClientExDetail(Sub, 90013, "Existing consumer is not configured as a queue / deliver group."); diff --git a/src/NATS.Client/JetStream/AutoStatusManager.cs b/src/NATS.Client/JetStream/AutoStatusManager.cs index 1be4066b1..bb5a623f7 100644 --- a/src/NATS.Client/JetStream/AutoStatusManager.cs +++ b/src/NATS.Client/JetStream/AutoStatusManager.cs @@ -114,7 +114,7 @@ public void SetSub(Subscription sub) { _sub = sub; if (Hb) { - conn.BeforeQueueProcessor = BeforeQueueProcessor; + _sub.BeforeChannelAddCheck = BeforeChannelAddCheck; asmTimer = new AsmTimer(this); } } @@ -192,7 +192,7 @@ public bool Manage(Msg msg) { return false; } - private Msg BeforeQueueProcessor(Msg msg) + private Msg BeforeChannelAddCheck(Msg msg) { LastMsgReceived = DateTimeOffset.Now.ToUnixTimeMilliseconds(); if (msg.HasStatus diff --git a/src/NATS.Client/JetStream/ConsumerConfiguration.cs b/src/NATS.Client/JetStream/ConsumerConfiguration.cs index c182d3fb2..8ac9d1ceb 100644 --- a/src/NATS.Client/JetStream/ConsumerConfiguration.cs +++ b/src/NATS.Client/JetStream/ConsumerConfiguration.cs @@ -153,9 +153,9 @@ internal bool WouldBeChangeTo(ConsumerConfiguration original) || WouldBeChange(AckWait, original.AckWait) || WouldBeChange(IdleHeartbeat, original.IdleHeartbeat) - || WouldBeChange(StartTime, original.StartTime) + || WouldBeChange(FilterSubject, original.FilterSubject) || WouldBeChange(Description, original.Description) || WouldBeChange(SampleFrequency, original.SampleFrequency) || WouldBeChange(DeliverSubject, original.DeliverSubject) @@ -163,7 +163,6 @@ internal bool WouldBeChangeTo(ConsumerConfiguration original) ; // do not need to check Durable because the original is retrieved by the durable name - // do not need to check FilterSubject because it's already validated against the original } private static bool WouldBeChange(string request, string original) diff --git a/src/NATS.Client/JetStream/IJetStream.cs b/src/NATS.Client/JetStream/IJetStream.cs index fbe888d16..ab4956755 100644 --- a/src/NATS.Client/JetStream/IJetStream.cs +++ b/src/NATS.Client/JetStream/IJetStream.cs @@ -150,7 +150,7 @@ public interface IJetStream /// The subject on which to listen for messages. /// The subject can have wildcards (partial: *, full: >). /// Pull Subscribe options for this subscription. - /// a JetStreamPullSubscription + /// An IJetStreamPullSubscription IJetStreamPullSubscription PullSubscribe(string subject, PullSubscribeOptions options); /// @@ -168,7 +168,7 @@ public interface IJetStream /// The invoked when messages are received /// on the returned . /// Whether or not to auto ack the message - /// An to use to read any messages received + /// An to use to read any messages received /// from the NATS Server on the given . /// /// A JetStream push subscription @@ -215,7 +215,7 @@ public interface IJetStream /// from the NATS Server on the given . /// /// - /// A JetStream push subscription + /// An IJetStreamPushAsyncSubscription IJetStreamPushAsyncSubscription PushSubscribeAsync(string subject, string queue, EventHandler handler, bool autoAck); /// @@ -238,8 +238,8 @@ public interface IJetStream /// /// /// Whether or not to auto ack the message - /// JetStream pull subscription options. - /// A JetStream push subscription + /// JetStream push subscription options. + /// An IJetStreamPushAsyncSubscription IJetStreamPushAsyncSubscription PushSubscribeAsync(string subject, string queue, EventHandler handler, bool autoAck, PushSubscribeOptions options); /// diff --git a/src/NATS.Client/JetStream/JetStream.cs b/src/NATS.Client/JetStream/JetStream.cs index 8ac2b6fb1..e61505e81 100644 --- a/src/NATS.Client/JetStream/JetStream.cs +++ b/src/NATS.Client/JetStream/JetStream.cs @@ -132,6 +132,8 @@ public Task PublishAsync(Msg msg, PublishOptions publishOptions) // ---------------------------------------------------------------------------------------------------- // Subscribe // ---------------------------------------------------------------------------------------------------- + private static readonly PushSubscribeOptions DefaultPushOpts = PushSubscribeOptions.Builder().Build(); + Subscription CreateSubscription(string subject, string queueName, EventHandler userHandler, bool autoAck, PushSubscribeOptions pushSubscribeOptions, @@ -156,7 +158,7 @@ Subscription CreateSubscription(string subject, string queueName, ValidateNotSupplied(userCC.DeliverSubject, JsSubPullCantHaveDeliverSubject); } else { - so = pushSubscribeOptions ?? PushSubscribeOptions.Builder().Build(); + so = pushSubscribeOptions ?? DefaultPushOpts; stream = so.Stream; // might be null, that's ok (see directBind) userCC = so.ConsumerConfiguration; @@ -165,11 +167,6 @@ Subscription CreateSubscription(string subject, string queueName, // figure out the queue name qgroup = ValidateMustMatchIfBothSupplied(userCC.DeliverGroup, queueName, JsSubQueueDeliverGroupMismatch); - - if (qgroup != null && string.IsNullOrWhiteSpace(userCC.DeliverGroup)) { - // the queueName was provided versus the config deliver group, so the user config must be set - userCC = ConsumerConfiguration.Builder(userCC).WithDeliverGroup(qgroup).Build(); - } } // 2A. Flow Control / heartbeat not always valid @@ -190,7 +187,7 @@ Subscription CreateSubscription(string subject, string queueName, } } - ConsumerConfiguration serverCc = null; + ConsumerConfiguration serverCC = null; String consumerName = userCC.Durable; String inboxDeliver = userCC.DeliverSubject; @@ -199,27 +196,24 @@ Subscription CreateSubscription(string subject, string queueName, ConsumerInfo serverInfo = LookupConsumerInfo(stream, consumerName); if (serverInfo != null) { // the consumer for that durable already exists - serverCc = serverInfo.ConsumerConfiguration; + serverCC = serverInfo.ConsumerConfiguration; + + // check to see if the user sent a different version than the server has + // modifications are not allowed + if (userCC.WouldBeChangeTo(serverCC)) { + throw JsSubExistingConsumerCannotBeModified.Instance(); + } if (isPullMode) { - if (!string.IsNullOrWhiteSpace(serverCc.DeliverSubject)) { + if (!string.IsNullOrWhiteSpace(serverCC.DeliverSubject)) { throw JsSubConsumerAlreadyConfiguredAsPush.Instance(); } } - else if (string.IsNullOrWhiteSpace(serverCc.DeliverSubject)) { + else if (string.IsNullOrWhiteSpace(serverCC.DeliverSubject)) { throw JsSubConsumerAlreadyConfiguredAsPull.Instance(); } - else if (inboxDeliver != null && !inboxDeliver.Equals(serverCc.DeliverSubject)) { - throw JsSubExistingDeliverSubjectMismatch.Instance(); - } - // durable already exists, make sure the filter subject matches - String userFilterSubject = userCC.FilterSubject; - if (!string.IsNullOrWhiteSpace(userFilterSubject) && !userFilterSubject.Equals(serverCc.FilterSubject)) { - throw JsSubSubjectDoesNotMatchFilter.Instance(); - } - - if (string.IsNullOrWhiteSpace(serverCc.DeliverGroup)) { + if (string.IsNullOrWhiteSpace(serverCC.DeliverGroup)) { // lookedUp was null/empty, means existing consumer is not a queue consumer if (qgroup == null) { // ok fine, no queue requested and the existing consumer is also not a queue consumer @@ -235,19 +229,21 @@ Subscription CreateSubscription(string subject, string queueName, else if (qgroup == null) { throw JsSubExistingConsumerIsQueue.Instance(); } - else if (!serverCc.DeliverGroup.Equals(qgroup)) { + else if (!serverCC.DeliverGroup.Equals(qgroup)) { throw JsSubExistingQueueDoesNotMatchRequestedQueue.Instance(); } - // check to see if the user sent a different version than the server has - // modifications are not allowed - // previous checks for deliver subject and filter subject matching are now - // in the changes function - if (userCC.WouldBeChangeTo(serverCc)) { - throw JsSubExistingConsumerCannotBeModified.Instance(); + // durable already exists, make sure the filter subject matches + if (string.IsNullOrWhiteSpace(subject)) + { + subject = userCC.FilterSubject; + } + else if (!IsFilterMatch(subject, serverCC.FilterSubject, stream)) + { + throw JsSubSubjectDoesNotMatchFilter.Instance(); } - inboxDeliver = serverCc.DeliverSubject; // use the deliver subject as the inbox. It may be null, that's ok, we'll fix that later + inboxDeliver = serverCC.DeliverSubject; // use the deliver subject as the inbox. It may be null, that's ok, we'll fix that later } else if (so.Bind) { throw JsSubConsumerNotFoundRequiredInBind.Instance(); @@ -260,7 +256,7 @@ Subscription CreateSubscription(string subject, string queueName, } // 5. If consumer does not exist, create - if (serverCc == null) { + if (serverCC == null) { ConsumerConfiguration.ConsumerConfigurationBuilder ccBuilder = ConsumerConfiguration.Builder(userCC); // Pull mode doesn't maintain a deliver subject. It's actually an error if we send it. @@ -268,19 +264,26 @@ Subscription CreateSubscription(string subject, string queueName, ccBuilder.WithDeliverSubject(inboxDeliver); } - string userFilterSubject = userCC.FilterSubject; - ccBuilder.WithFilterSubject(string.IsNullOrWhiteSpace(userFilterSubject) ? subject : userFilterSubject); + if (string.IsNullOrWhiteSpace(userCC.FilterSubject)) + { + ccBuilder.WithFilterSubject(subject); + } + + if (string.IsNullOrWhiteSpace(userCC.DeliverGroup) && !string.IsNullOrWhiteSpace(qgroup)) + { + ccBuilder.WithDeliverGroup(qgroup); + } // createOrUpdateConsumer can fail for security reasons, maybe other reasons? ConsumerInfo ci = AddOrUpdateConsumerInternal(stream, ccBuilder.Build()); consumerName = ci.Name; - serverCc = ci.ConsumerConfiguration; + serverCC = ci.ConsumerConfiguration; } // 6. create the subscription IAutoStatusManager asm = isPullMode ? (IAutoStatusManager)new PullAutoStatusManager() - : new PushAutoStatusManager((Connection) Conn, so, serverCc, qgroup != null, userHandler == null); + : new PushAutoStatusManager((Connection) Conn, so, serverCC, qgroup != null, userHandler == null); Subscription sub; if (isPullMode) @@ -303,7 +306,7 @@ SyncSubscription CreateSubDelegate(Connection lConn, string lSubject, string lQu else { EventHandler handler; - if (autoAck && serverCc.AckPolicy != AckPolicy.None) + if (autoAck && serverCC.AckPolicy != AckPolicy.None) { handler = (sender, args) => { @@ -357,11 +360,35 @@ private string LookupStreamBySubject(string subject) snr.Process(resp); return snr.Strings.Count == 1 ? snr.Strings[0] : null; } + + private string LookupStreamSubject(string stream) + { + StreamInfo si = GetStreamInfoInternal(stream); + return si.Config.Subjects.Count == 1 ? si.Config.Subjects[0] : null; + } + + private Boolean IsFilterMatch(String subscribeSubject, String filterSubject, String stream) { + + // subscribeSubject guaranteed to not be empty or null + // filterSubject may be null or empty or have value + + if (subscribeSubject.Equals(filterSubject)) { + return true; + } + + if (string.IsNullOrWhiteSpace(filterSubject) || filterSubject.Equals(">")) { + // lookup stream subject returns null if there is not exactly one subject + String streamSubject = LookupStreamSubject(stream); + return subscribeSubject.Equals(streamSubject); + } + + return false; + } public IJetStreamPullSubscription PullSubscribe(string subject, PullSubscribeOptions options) { - ValidateSubject(subject, true); - ValidateNotNull(options, "PullSubscribeOptions"); + ValidateNotNull(options, "Pull Subscribe Options"); + ValidateSubject(subject, IsSubjectRequired(options)); return (IJetStreamPullSubscription) CreateSubscription(subject, null, null, false, null, options); } @@ -382,7 +409,7 @@ public IJetStreamPushAsyncSubscription PushSubscribeAsync(string subject, string public IJetStreamPushAsyncSubscription PushSubscribeAsync(string subject, EventHandler handler, bool autoAck, PushSubscribeOptions options) { - ValidateSubject(subject, true); + ValidateSubject(subject, IsSubjectRequired(options)); ValidateNotNull(handler, "Handler"); return (IJetStreamPushAsyncSubscription) CreateSubscription(subject, null, handler, autoAck, options, null); } @@ -403,7 +430,7 @@ public IJetStreamPushSyncSubscription PushSubscribeSync(string subject) public IJetStreamPushSyncSubscription PushSubscribeSync(string subject, PushSubscribeOptions options) { - ValidateSubject(subject, true); + ValidateSubject(subject, IsSubjectRequired(options)); return (IJetStreamPushSyncSubscription) CreateSubscription(subject, null, null, false, options, null); } @@ -416,9 +443,11 @@ public IJetStreamPushSyncSubscription PushSubscribeSync(string subject, string q public IJetStreamPushSyncSubscription PushSubscribeSync(string subject, string queue, PushSubscribeOptions options) { - ValidateSubject(subject, true); + ValidateSubject(subject, IsSubjectRequired(options)); queue = EmptyAsNull(ValidateQueueName(queue, false)); return (IJetStreamPushSyncSubscription) CreateSubscription(subject, queue, null, false, options, null); } + + private bool IsSubjectRequired(SubscribeOptions options) => options == null || !options.Bind; } } diff --git a/src/NATS.Client/JetStream/JetStreamBase.cs b/src/NATS.Client/JetStream/JetStreamBase.cs index e40e85bf6..689d0c03a 100644 --- a/src/NATS.Client/JetStream/JetStreamBase.cs +++ b/src/NATS.Client/JetStream/JetStreamBase.cs @@ -50,6 +50,13 @@ internal ConsumerInfo AddOrUpdateConsumerInternal(string streamName, ConsumerCon return new ConsumerInfo(m, true); } + public StreamInfo GetStreamInfoInternal(string streamName) + { + string subj = string.Format(JetStreamConstants.JsapiStreamInfo, streamName); + Msg m = RequestResponseRequired(subj, null, Timeout); + return new StreamInfo(m, true); + } + // ---------------------------------------------------------------------------------------------------- // Request Utils // ---------------------------------------------------------------------------------------------------- diff --git a/src/NATS.Client/JetStream/JetStreamManagement.cs b/src/NATS.Client/JetStream/JetStreamManagement.cs index 50767f74f..40fa33671 100644 --- a/src/NATS.Client/JetStream/JetStreamManagement.cs +++ b/src/NATS.Client/JetStream/JetStreamManagement.cs @@ -57,9 +57,7 @@ public bool DeleteStream(string streamName) public StreamInfo GetStreamInfo(string streamName) { Validator.ValidateStreamName(streamName, true); - string subj = string.Format(JetStreamConstants.JsapiStreamInfo, streamName); - Msg m = RequestResponseRequired(subj, null, Timeout); - return new StreamInfo(m, true); + return GetStreamInfoInternal(streamName); } public PurgeResponse PurgeStream(string streamName) diff --git a/src/NATS.Client/JetStream/PullSubscribeOptions.cs b/src/NATS.Client/JetStream/PullSubscribeOptions.cs index d7932ed10..8c342e1d4 100644 --- a/src/NATS.Client/JetStream/PullSubscribeOptions.cs +++ b/src/NATS.Client/JetStream/PullSubscribeOptions.cs @@ -15,11 +15,6 @@ namespace NATS.Client.JetStream { public sealed class PullSubscribeOptions : SubscribeOptions { - /// - /// Gets the Durable name - /// - public string Durable => ConsumerConfiguration.Durable; - // Validation is done by base class private PullSubscribeOptions(ISubscribeOptionsBuilder builder) : base(builder, true, null, null) {} diff --git a/src/NATS.Client/JetStream/PushSubscribeOptions.cs b/src/NATS.Client/JetStream/PushSubscribeOptions.cs index 2fc914540..6f9e0f4a6 100644 --- a/src/NATS.Client/JetStream/PushSubscribeOptions.cs +++ b/src/NATS.Client/JetStream/PushSubscribeOptions.cs @@ -15,21 +15,6 @@ namespace NATS.Client.JetStream { public sealed class PushSubscribeOptions : SubscribeOptions { - /// - /// Gets the durable name - /// - public string Durable => ConsumerConfiguration.Durable; - - /// - /// Gets the deliver subject - /// - public string DeliverSubject => ConsumerConfiguration.DeliverSubject; - - /// - /// Gets the deliver group - /// - public string DeliverGroup => ConsumerConfiguration.DeliverGroup; - // Validation is done by base class private PushSubscribeOptions(ISubscribeOptionsBuilder builder, string deliverSubject, string deliverGroup) : base(builder, false, deliverSubject, deliverGroup) {} diff --git a/src/NATS.Client/JetStream/SubscribeOptions.cs b/src/NATS.Client/JetStream/SubscribeOptions.cs index fd0436a75..57251dd1b 100644 --- a/src/NATS.Client/JetStream/SubscribeOptions.cs +++ b/src/NATS.Client/JetStream/SubscribeOptions.cs @@ -22,11 +22,26 @@ namespace NATS.Client.JetStream /// public abstract class SubscribeOptions { - internal string Stream { get; } - internal bool Pull { get; } - internal bool Bind { get; } - internal ConsumerConfiguration ConsumerConfiguration { get;} - internal int MessageAlarmTime; + public string Stream { get; } + public bool Pull { get; } + public bool Bind { get; } + public ConsumerConfiguration ConsumerConfiguration { get; } + internal int MessageAlarmTime { get; } + + /// + /// Gets the durable name + /// + public string Durable => ConsumerConfiguration.Durable; + + /// + /// Gets the deliver subject + /// + public string DeliverSubject => ConsumerConfiguration.DeliverSubject; + + /// + /// Gets the deliver group + /// + public string DeliverGroup => ConsumerConfiguration.DeliverGroup; protected SubscribeOptions(ISubscribeOptionsBuilder builder, bool pull, string deliverSubject, string deliverGroup) { diff --git a/src/NATS.Client/Subscription.cs b/src/NATS.Client/Subscription.cs index 5a222f0ad..157a851fa 100644 --- a/src/NATS.Client/Subscription.cs +++ b/src/NATS.Client/Subscription.cs @@ -57,11 +57,15 @@ public class Subscription : ISubscription, IDisposable // than the received subject inside a Msg if this is a wildcard. private string subject = null; + internal Func BeforeChannelAddCheck; + internal Subscription(Connection conn, string subject, string queue) { this.conn = conn; this.subject = subject; this.queue = queue; + + BeforeChannelAddCheck = msg => msg; } internal virtual void close() @@ -181,28 +185,38 @@ internal bool addMessage(Msg msg, int maxCount) pBytesMax = pBytes; } - // Check for a Slow Consumer - if ((pMsgsLimit > 0 && pMsgs > pMsgsLimit) - || (pBytesLimit > 0 && pBytes > pBytesLimit)) - { - // slow consumer - handleSlowConsumer(msg); - return false; - } - - if (mch != null) + // BeforeChannelAddCheck returns null if the message + // does not need to be queued, for instance heartbeats + // that are not flow control and are already seen by the + // auto status manager + msg = BeforeChannelAddCheck.Invoke(msg); + + if (msg != null) { - if (mch.Count >= maxCount) + // Check for a Slow Consumer + if ((pMsgsLimit > 0 && pMsgs > pMsgsLimit) + || (pBytesLimit > 0 && pBytes > pBytesLimit)) { + // slow consumer handleSlowConsumer(msg); return false; } - else + + if (mch != null) { - sc = false; - mch.add(msg); + if (mch.Count >= maxCount) + { + handleSlowConsumer(msg); + return false; + } + else + { + sc = false; + mch.add(msg); + } } } + return true; } diff --git a/src/Tests/IntegrationTests/TestJetStream.cs b/src/Tests/IntegrationTests/TestJetStream.cs index c4a55af66..ffcd6588e 100644 --- a/src/Tests/IntegrationTests/TestJetStream.cs +++ b/src/Tests/IntegrationTests/TestJetStream.cs @@ -18,6 +18,7 @@ using NATS.Client.Internals; using NATS.Client.JetStream; using Xunit; +using Xunit.Abstractions; using static UnitTests.TestBase; using static IntegrationTests.JetStreamTestBase; using static NATS.Client.ClientExDetail; @@ -26,7 +27,12 @@ namespace IntegrationTests { public class TestJetStream : TestSuite { - public TestJetStream(JetStreamSuiteContext context) : base(context) {} + private readonly ITestOutputHelper output; + + public TestJetStream(ITestOutputHelper output, JetStreamSuiteContext context) : base(context) + { + this.output = output; + } [Fact] public void TestJetStreamContextCreate() @@ -251,64 +257,6 @@ public void TestFilterSubjectEphemeral() { }); } - [Fact] - public void TestFilterSubjectDurable() { - Context.RunInJsServer(c => - { - // Create our JetStream context. - IJetStream js = c.CreateJetStreamContext(); - - string subjectWild = SUBJECT + ".*"; - string subjectA = SUBJECT + ".A"; - string subjectB = SUBJECT + ".B"; - - // create the stream. - CreateMemoryStream(c, STREAM, subjectWild); - - JsPublish(js, subjectA, 1); - JsPublish(js, subjectB, 1); - JsPublish(js, subjectA, 1); - JsPublish(js, subjectB, 1); - - ConsumerConfiguration cc = ConsumerConfiguration.Builder().WithFilterSubject(subjectA).WithAckPolicy(AckPolicy.None).Build(); - PushSubscribeOptions pso = PushSubscribeOptions.Builder().WithDurable(DURABLE).WithConfiguration(cc).Build(); - IJetStreamPushSyncSubscription sub = js.PushSubscribeSync(subjectWild, pso); - c.Flush(1000); - - Msg m = sub.NextMessage(1000); - Assert.Equal(subjectA, m.Subject); - Assert.Equal(1U, m.MetaData.StreamSequence); - m = sub.NextMessage(1000); - Assert.Equal(subjectA, m.Subject); - Assert.Equal(3U, m.MetaData.StreamSequence); - Assert.Throws(() => sub.NextMessage(1000)); - sub.Unsubscribe(); - - JsPublish(js, subjectA, 1); - JsPublish(js, subjectB, 1); - JsPublish(js, subjectA, 1); - JsPublish(js, subjectB, 1); - - sub = js.PushSubscribeSync(subjectWild, pso); - c.Flush(1000); - - m = sub.NextMessage(1000); - Assert.Equal(subjectA, m.Subject); - Assert.Equal(5U, m.MetaData.StreamSequence); - m = sub.NextMessage(1000); - Assert.Equal(7U, m.MetaData.StreamSequence); - sub.Unsubscribe(); - - ConsumerConfiguration cc1 = ConsumerConfiguration.Builder().WithFilterSubject(subjectWild).Build(); - PushSubscribeOptions pso1 = PushSubscribeOptions.Builder().WithDurable(DURABLE).WithConfiguration(cc1).Build(); - Assert.Throws(() => js.PushSubscribeSync(subjectWild, pso1)); - - ConsumerConfiguration cc2 = ConsumerConfiguration.Builder().WithFilterSubject(subjectB).Build(); - PushSubscribeOptions pso2 = PushSubscribeOptions.Builder().WithDurable(DURABLE).WithConfiguration(cc2).Build(); - Assert.Throws(() => js.PushSubscribeSync(subjectWild, pso2)); - }); - } - class JetStreamTestImpl : JetStream { public JetStreamTestImpl(IConnection connection) : base(connection, null) {} @@ -454,6 +402,99 @@ public void TestBindErrors() }); } + private static readonly Random Rndm = new Random(); + + [Fact] + public void TestFilterMismatchErrors() + { + Context.RunInJsServer(c => + { + IJetStreamManagement jsm = c.CreateJetStreamManagementContext(); + IJetStream js = c.CreateJetStreamContext(); + + // single subject + CreateMemoryStream(jsm, STREAM, SUBJECT); + + // will work as SubscribeSubject equals Filter Subject + SubscribeOk(js, jsm, SUBJECT, SUBJECT); + SubscribeOk(js, jsm, ">", ">"); + SubscribeOk(js, jsm, "*", "*"); + + // will work as SubscribeSubject != empty Filter Subject, + // b/c Stream has exactly 1 subject and is a match. + SubscribeOk(js, jsm, "", SUBJECT); + + // will work as SubscribeSubject != Filter Subject of '>' + // b/c Stream has exactly 1 subject and is a match. + SubscribeOk(js, jsm, ">", SUBJECT); + + // will not work + SubscribeEx(js, jsm, "*", SUBJECT); + + // multiple subjects no wildcards + jsm.DeleteStream(STREAM); + CreateMemoryStream(jsm, STREAM, SUBJECT, Subject(2)); + + // will work as SubscribeSubject equals Filter Subject + SubscribeOk(js, jsm, SUBJECT, SUBJECT); + SubscribeOk(js, jsm, ">", ">"); + SubscribeOk(js, jsm, "*", "*"); + + // will not work because stream has more than 1 subject + SubscribeEx(js, jsm, "", SUBJECT); + SubscribeEx(js, jsm, ">", SUBJECT); + SubscribeEx(js, jsm, "*", SUBJECT); + + // multiple subjects via '>' + jsm.DeleteStream(STREAM); + CreateMemoryStream(jsm, STREAM, SUBJECT_GT); + + // will work, exact matches + SubscribeOk(js, jsm, SubjectDot("1"), SubjectDot("1")); + SubscribeOk(js, jsm, ">", ">"); + + // will not work because mismatch / stream has more than 1 subject + SubscribeEx(js, jsm, "", SubjectDot("1")); + SubscribeEx(js, jsm, ">", SubjectDot("1")); + SubscribeEx(js, jsm, SUBJECT_GT, SubjectDot("1")); + + // multiple subjects via '*' + jsm.DeleteStream(STREAM); + CreateMemoryStream(jsm, STREAM, SUBJECT_STAR); + + // will work, exact matches + SubscribeOk(js, jsm, SubjectDot("1"), SubjectDot("1")); + SubscribeOk(js, jsm, ">", ">"); + + // will not work because mismatch / stream has more than 1 subject + SubscribeEx(js, jsm, "", SubjectDot("1")); + SubscribeEx(js, jsm, ">", SubjectDot("1")); + SubscribeEx(js, jsm, SUBJECT_STAR, SubjectDot("1")); + }); + } + + private void SubscribeOk(IJetStream js, IJetStreamManagement jsm, string fs, string ss) + { + int i = Rndm.Next(); // just want a unique number + SetupConsumer(jsm, i, fs); + js.PushSubscribeSync(ss, ConsumerConfiguration.Builder().WithDurable(Durable(i)).BuildPushSubscribeOptions()).Unsubscribe(); + } + + private void SubscribeEx(IJetStream js, IJetStreamManagement jsm, string fs, string ss) + { + int i = Rndm.Next(); // just want a unique number + SetupConsumer(jsm, i, fs); + NATSJetStreamClientException e = Assert.Throws( + () => js.PushSubscribeSync(ss, ConsumerConfiguration.Builder().WithDurable(Durable(i)).BuildPushSubscribeOptions())); + Assert.Contains(JsSubSubjectDoesNotMatchFilter.Id, e.Message); + } + + private void SetupConsumer(IJetStreamManagement jsm, int i, String fs) + { + jsm.AddOrUpdateConsumer(STREAM, + ConsumerConfiguration.Builder().WithDeliverSubject(Deliver(i)).WithDurable(Durable(i)).WithFilterSubject(fs).Build()); + } + [Fact] public void TestBindDurableDeliverSubject() { @@ -501,13 +542,6 @@ public void TestBindDurableDeliverSubject() () => js.PushSubscribeSync(SUBJECT, PushSubscribeOptions.BindTo(STREAM, Durable(2)))); Assert.Contains(JsSubConsumerAlreadyConfiguredAsPull.Id, e.Message); - // try to push subscribe but mismatch the deliver subject - ConsumerConfiguration ccMis = ConsumerConfiguration.Builder().WithDeliverSubject("not-match").Build(); - PushSubscribeOptions psoMis = PushSubscribeOptions.Builder().WithDurable(Durable(1)) - .WithConfiguration(ccMis).Build(); - e = Assert.Throws(() => js.PushSubscribeSync(SUBJECT, psoMis)); - Assert.Contains(JsSubExistingDeliverSubjectMismatch.Id, e.Message); - // this one is okay js.PushSubscribeSync(SUBJECT, PushSubscribeOptions.Builder().WithDurable(Durable(1)).Build()); }); diff --git a/src/Tests/IntegrationTests/TestJetStreamManagement.cs b/src/Tests/IntegrationTests/TestJetStreamManagement.cs index b27d5a657..409c397bc 100644 --- a/src/Tests/IntegrationTests/TestJetStreamManagement.cs +++ b/src/Tests/IntegrationTests/TestJetStreamManagement.cs @@ -18,7 +18,6 @@ using NATS.Client.JetStream; using Xunit; using static IntegrationTests.JetStreamTestBase; -using static NATS.Client.ClientExDetail; using static UnitTests.TestBase; namespace IntegrationTests @@ -373,54 +372,79 @@ public void TestAddDeleteConsumer() } [Fact] - public void TestInvalidConsumerUpdates() + public void TestValidConsumerUpdates() { Context.RunInJsServer(c => { IJetStreamManagement jsm = c.CreateJetStreamManagementContext(); CreateMemoryStream(jsm, STREAM, SUBJECT_GT); - ConsumerConfiguration cc = ConsumerConfiguration.Builder() - .WithDurable(Durable(1)) - .WithAckPolicy(AckPolicy.Explicit) - .WithDeliverSubject(Deliver(1)) - .WithMaxDeliver(3) - .WithFilterSubject(SUBJECT_GT) - .Build(); + ConsumerConfiguration cc = PrepForUpdateTest(jsm); + cc = ConsumerConfiguration.Builder(cc).WithDeliverSubject(Deliver(2)).Build(); + AssertValidAddOrUpdate(jsm, cc); + cc = PrepForUpdateTest(jsm); + cc = ConsumerConfiguration.Builder(cc).WithAckWait(Duration.OfSeconds(5)).Build(); AssertValidAddOrUpdate(jsm, cc); - cc = ConsumerConfiguration.Builder(cc).WithDeliverSubject(Deliver(2)).Build(); + cc = PrepForUpdateTest(jsm); + cc = ConsumerConfiguration.Builder(cc).WithRateLimit(100).Build(); AssertValidAddOrUpdate(jsm, cc); - cc = ConsumerConfiguration.Builder(cc).WithDeliverPolicy(DeliverPolicy.New).Build(); - AssertInvalidConsumerUpdate(jsm, cc); + cc = PrepForUpdateTest(jsm); + cc = ConsumerConfiguration.Builder(cc).WithMaxAckPending(100).Build(); + AssertValidAddOrUpdate(jsm, cc); - cc = ConsumerConfiguration.Builder(cc).WithAckWait(Duration.OfSeconds(5)).Build(); - AssertInvalidConsumerUpdate(jsm, cc); + cc = PrepForUpdateTest(jsm); + cc = ConsumerConfiguration.Builder(cc).WithMaxDeliver(4).Build(); + AssertValidAddOrUpdate(jsm, cc); + }); + } - cc = ConsumerConfiguration.Builder(cc).WithFilterSubject(SUBJECT_STAR).Build(); - AssertInvalidConsumerUpdate(jsm, cc); + [Fact] + public void TestInvalidConsumerUpdates() + { + Context.RunInJsServer(c => + { + IJetStreamManagement jsm = c.CreateJetStreamManagementContext(); + CreateMemoryStream(jsm, STREAM, SUBJECT_GT); - cc = ConsumerConfiguration.Builder(cc).WithRateLimit(100).Build(); + ConsumerConfiguration cc = PrepForUpdateTest(jsm); + cc = ConsumerConfiguration.Builder(cc).WithDeliverPolicy(DeliverPolicy.New).Build(); AssertInvalidConsumerUpdate(jsm, cc); - cc = ConsumerConfiguration.Builder(cc).WithMaxAckPending(100).Build(); + cc = PrepForUpdateTest(jsm); + cc = ConsumerConfiguration.Builder(cc).WithFilterSubject(SUBJECT_STAR).Build(); AssertInvalidConsumerUpdate(jsm, cc); + cc = PrepForUpdateTest(jsm); cc = ConsumerConfiguration.Builder(cc).WithIdleHeartbeat(Duration.OfMillis(111)).Build(); AssertInvalidConsumerUpdate(jsm, cc); - - cc = ConsumerConfiguration.Builder(cc).WithMaxDeliver(4).Build(); - AssertInvalidConsumerUpdate(jsm, cc); }); } + + private ConsumerConfiguration PrepForUpdateTest(IJetStreamManagement jsm) + { + try { + jsm.DeleteConsumer(STREAM, Durable(1)); + } + catch (Exception) { /* ignore */ } + + ConsumerConfiguration cc = ConsumerConfiguration.Builder() + .WithDurable(Durable(1)) + .WithAckPolicy(AckPolicy.Explicit) + .WithDeliverSubject(Deliver(1)) + .WithMaxDeliver(3) + .WithFilterSubject(SUBJECT_GT) + .Build(); + AssertValidAddOrUpdate(jsm, cc); + return cc; + } private void AssertInvalidConsumerUpdate(IJetStreamManagement jsm, ConsumerConfiguration cc) { NATSJetStreamException e = Assert.Throws(() => jsm.AddOrUpdateConsumer(STREAM, cc)); - // 10013 consumer name already in use - // 10105 consumer already exists and is still active - Assert.True(e.ApiErrorCode == 10013 || e.ApiErrorCode == 10105); + Assert.Equal(10012, e.ApiErrorCode); + Assert.Equal(500, e.ErrorCode); } private void AssertValidAddOrUpdate(IJetStreamManagement jsm, ConsumerConfiguration cc) { @@ -478,29 +502,6 @@ public void TestCreateConsumersWithFilters() .WithFilterSubject(SubjectDot("F")) .Build() ); - - ConsumerConfiguration ccBadFilter = ConsumerConfiguration.Builder() - .WithDurable(Durable(42)).WithFilterSubject("x").Build(); - - PullSubscribeOptions pullOptsBadFilter = PullSubscribeOptions.Builder() - .WithConfiguration(ccBadFilter).Build(); - - NATSJetStreamClientException e = Assert.Throws(() => js.PullSubscribe(SubjectDot("F"), pullOptsBadFilter)); - Assert.Contains(JsSubSubjectDoesNotMatchFilter.Id, e.Message); - - // try to filter against durable with mismatch, push - jsm.AddOrUpdateConsumer(STREAM, ConsumerConfiguration.Builder() - .WithDurable(Durable(43)) - .WithDeliverSubject(Deliver(43)) - .WithFilterSubject(SubjectDot("F")) - .Build() - ); - - ccBadFilter = ConsumerConfiguration.Builder().WithDurable(Durable(43)).WithFilterSubject("x").Build(); - - PushSubscribeOptions pushOptsBadFilter = PushSubscribeOptions.Builder().WithConfiguration(ccBadFilter).Build(); - e = Assert.Throws(() => js.PushSubscribeSync(SubjectDot("F"), pushOptsBadFilter)); - Assert.Contains(JsSubSubjectDoesNotMatchFilter.Id, e.Message); }); } diff --git a/src/Tests/IntegrationTests/TestJetStreamPublish.cs b/src/Tests/IntegrationTests/TestJetStreamPublish.cs index 010b12975..dc9dbdb3a 100644 --- a/src/Tests/IntegrationTests/TestJetStreamPublish.cs +++ b/src/Tests/IntegrationTests/TestJetStreamPublish.cs @@ -283,8 +283,7 @@ public void TestPublishAckJson() [Fact] public void TestPublishNoAck() { - TestServerInfo testServerInfo = new TestServerInfo(4222); - Context.RunInJsServer(testServerInfo, c => + Context.RunInJsServer(c => { CreateDefaultTestStream(c); diff --git a/src/Tests/IntegrationTests/TestJetStreamPull.cs b/src/Tests/IntegrationTests/TestJetStreamPull.cs index b745f9b41..84d698cc7 100644 --- a/src/Tests/IntegrationTests/TestJetStreamPull.cs +++ b/src/Tests/IntegrationTests/TestJetStreamPull.cs @@ -452,5 +452,51 @@ public void TestAckWaitTimeout() AssertNoMoreMessages(sub); }); } + + [Fact] + public void TestDurable() + { + Context.RunInJsServer(c => + { + // create the stream. + CreateDefaultTestStream(c); + + // Create our JetStream context. + IJetStream js = c.CreateJetStreamContext(); + + // Build our subscription options normally + PullSubscribeOptions options1 = PullSubscribeOptions.Builder().WithDurable(DURABLE).Build(); + _testDurable(js, () => js.PullSubscribe(SUBJECT, options1)); + + // bind long form + PullSubscribeOptions options2 = PullSubscribeOptions.Builder() + .WithStream(STREAM) + .WithDurable(DURABLE) + .WithBind(true) + .Build(); + _testDurable(js, () => js.PullSubscribe(null, options2)); + + // bind short form + PullSubscribeOptions options3 = PullSubscribeOptions.BindTo(STREAM, DURABLE); + _testDurable(js, () => js.PullSubscribe(null, options3)); + }); + } + + private void _testDurable(IJetStream js, PullSubSupplier supplier) + { + JsPublish(js, SUBJECT, 2); + + IJetStreamPullSubscription sub = supplier.Invoke(); + + // start the pull + sub.PullNoWait(4); + + IList messages = ReadMessagesAck(sub); + ValidateRedAndTotal(2, messages.Count, 2, 2); + + sub.Unsubscribe(); + } + + delegate IJetStreamPullSubscription PullSubSupplier(); } } diff --git a/src/Tests/IntegrationTests/TestJetStreamPushAsync.cs b/src/Tests/IntegrationTests/TestJetStreamPushAsync.cs index 137c4d451..36c30d62d 100644 --- a/src/Tests/IntegrationTests/TestJetStreamPushAsync.cs +++ b/src/Tests/IntegrationTests/TestJetStreamPushAsync.cs @@ -54,12 +54,7 @@ public void TestHandlerSub() void TestHandler(object sender, MsgHandlerEventArgs args) { received++; - - if (args.Message.IsJetStream) - { - args.Message.Ack(); - } - + args.Message.Ack(); latch.Signal(); } diff --git a/src/Tests/IntegrationTests/TestJetStreamPushSync.cs b/src/Tests/IntegrationTests/TestJetStreamPushSync.cs index 23a88e27d..d69da4165 100644 --- a/src/Tests/IntegrationTests/TestJetStreamPushSync.cs +++ b/src/Tests/IntegrationTests/TestJetStreamPushSync.cs @@ -103,46 +103,93 @@ public void TestJetStreamPushDurable(string deliverSubject) // Create our JetStream context. IJetStream js = c.CreateJetStreamContext(); - // publish some messages - JsPublish(js, SUBJECT, 1, 5); + // Build our subscription options normally + PushSubscribeOptions options1 = PushSubscribeOptions.Builder() + .WithDurable(DURABLE) + .WithDeliverSubject(deliverSubject) + .Build(); - // use ackWait so I don't have to wait forever before re-subscribing - ConsumerConfiguration cc = ConsumerConfiguration.Builder().WithAckWait(3000).Build(); + _testPushDurableSubSync(deliverSubject, c, js, () => js.PushSubscribeSync(SUBJECT, options1)); + _testPushDurableSubAsync(js, h => js.PushSubscribeAsync(SUBJECT, h, false, options1)); - // Build our subscription options. - PushSubscribeOptions options = PushSubscribeOptions.Builder() + // bind long form + PushSubscribeOptions options2 = PushSubscribeOptions.Builder() + .WithStream(STREAM) .WithDurable(DURABLE) - .WithConfiguration(cc) + .WithBind(true) .WithDeliverSubject(deliverSubject) .Build(); + _testPushDurableSubSync(deliverSubject, c, js, () => js.PushSubscribeSync(null, options2)); + _testPushDurableSubAsync(js, h => js.PushSubscribeAsync(null, h, false, options2)); - // Subscribe. - IJetStreamPushSyncSubscription sub = js.PushSubscribeSync(SUBJECT, options); - AssertSubscription(sub, STREAM, DURABLE, deliverSubject, false); - c.Flush(DefaultTimeout); // flush outgoing communication with/to the server + // bind short form + PushSubscribeOptions options3 = PushSubscribeOptions.BindTo(STREAM, DURABLE); + _testPushDurableSubSync(deliverSubject, c, js, () => js.PushSubscribeSync(null, options3)); + _testPushDurableSubAsync(js, h => js.PushSubscribeAsync(null, h, false, options3)); + }); + } + + delegate IJetStreamPushSyncSubscription PushSyncSubSupplier(); + delegate IJetStreamPushAsyncSubscription PushAsyncSubSupplier(EventHandler handler); - // read what is available - IList messages = ReadMessagesAck(sub); - int total = messages.Count; - ValidateRedAndTotal(5, messages.Count, 5, total); + private void _testPushDurableSubSync(string deliverSubject, IConnection nc, IJetStream js, PushSyncSubSupplier supplier) + { + // publish some messages + JsPublish(js, SUBJECT, 1, 5); - // read again, nothing should be there - messages = ReadMessagesAck(sub); - total += messages.Count; - ValidateRedAndTotal(0, messages.Count, 5, total); + IJetStreamPushSyncSubscription sub = supplier.Invoke(); + AssertSubscription(sub, STREAM, DURABLE, deliverSubject, false); - sub.Unsubscribe(); - c.Flush(DefaultTimeout); // flush outgoing communication with/to the server + // read what is available + IList messages = ReadMessagesAck(sub); + int total = messages.Count; + ValidateRedAndTotal(5, messages.Count, 5, total); - // re-subscribe - sub = js.PushSubscribeSync(SUBJECT, options); - c.Flush(DefaultTimeout); // flush outgoing communication with/to the server + // read again, nothing should be there + messages = ReadMessagesAck(sub); + total += messages.Count; + ValidateRedAndTotal(0, messages.Count, 5, total); - // read again, nothing should be there - messages = ReadMessagesAck(sub); - total += messages.Count; - ValidateRedAndTotal(0, messages.Count, 5, total); - }); + sub.Unsubscribe(); + nc.Flush(1000); // flush outgoing communication with/to the server + + // re-subscribe + sub = supplier.Invoke(); + nc.Flush(1000); // flush outgoing communication with/to the server + + // read again, nothing should be there + messages = ReadMessagesAck(sub); + total += messages.Count; + ValidateRedAndTotal(0, messages.Count, 5, total); + + sub.Unsubscribe(); + nc.Flush(1000); // flush outgoing communication with/to the server + } + + private void _testPushDurableSubAsync(IJetStream js, PushAsyncSubSupplier supplier) + { + // publish some messages + JsPublish(js, SUBJECT, 5); + + CountdownEvent latch = new CountdownEvent(5); + int received = 0; + + void TestHandler(object sender, MsgHandlerEventArgs args) + { + received++; + args.Message.Ack(); + latch.Signal(); + } + + // Subscribe using the handler + IJetStreamPushAsyncSubscription sub = supplier.Invoke(TestHandler); + + // Wait for messages to arrive using the countdown latch. + latch.Wait(10000); + + sub.Unsubscribe(); + + Assert.Equal(5, received); } [Fact] diff --git a/src/Tests/UnitTests/Internals/TestValidator.cs b/src/Tests/UnitTests/Internals/TestValidator.cs index a2c46796b..83bd9120c 100644 --- a/src/Tests/UnitTests/Internals/TestValidator.cs +++ b/src/Tests/UnitTests/Internals/TestValidator.cs @@ -231,6 +231,18 @@ public void TestValidateDurationNotRequiredGtOrEqZero() Duration.OfSeconds(-1))); } + [Fact] + public void TestValidateDurationNotRequiredNotLessThanMin() { + Duration min = Duration.OfMillis(99); + Duration less = Duration.OfMillis(9); + Duration more = Duration.OfMillis(9999); + + Assert.Null(Validator.ValidateDurationNotRequiredNotLessThanMin(null, min)); + Assert.Equal(more, Validator.ValidateDurationNotRequiredNotLessThanMin(more, min)); + + Assert.Throws(() => Validator.ValidateDurationNotRequiredNotLessThanMin(less, min)); + } + [Fact] public void TestValidateJetStreamPrefix() { @@ -240,6 +252,19 @@ public void TestValidateJetStreamPrefix() Assert.Throws(() => Validator.ValidateJetStreamPrefix(HasSpace)); Assert.Throws(() => Validator.ValidateJetStreamPrefix(HasLow)); } + + [Fact] + public void TestValidateNotSupplied() { + ClientExDetail err = new ClientExDetail("TEST", 999999, "desc"); + + // string version + Validator.ValidateNotSupplied((string)null, err); + Validator.ValidateNotSupplied("", err); + Assert.Throws(() => Validator.ValidateNotSupplied("notempty", err)); + + Validator.ValidateNotSupplied(0, 0, err); + Assert.Throws(() => Validator.ValidateNotSupplied(1, 0, err)); + } [Fact] public void TestValidateMustMatchIfBothSupplied() @@ -297,6 +322,13 @@ private void NotAllowedRequired(Func test, params String[] Assert.Throws(() => test.Invoke(s, true)); } } + + [Fact] + public void TestNatsJetStreamClientError() + { + ClientExDetail err = new ClientExDetail("TEST", 999999, "desc"); + Assert.Equal("[TEST-999999] desc", err.Message); + } } } diff --git a/src/Tests/UnitTests/JetStream/TestPushPullSubscribeOptions.cs b/src/Tests/UnitTests/JetStream/TestPushPullSubscribeOptions.cs index 810ca82e9..20ba1ac68 100644 --- a/src/Tests/UnitTests/JetStream/TestPushPullSubscribeOptions.cs +++ b/src/Tests/UnitTests/JetStream/TestPushPullSubscribeOptions.cs @@ -43,6 +43,7 @@ public void TestPushAffirmative() Assert.Null(so.Stream); Assert.Null(so.Durable); Assert.Null(so.DeliverSubject); + Assert.False(so.Pull); } [Fact] @@ -54,6 +55,7 @@ public void TestPullAffirmative() .Build(); Assert.Equal(STREAM, so.Stream); Assert.Equal(DURABLE, so.Durable); + Assert.True(so.Pull); } [Fact]