Skip to content

Commit

Permalink
Simplification Review, Tuning, Examples, Testing (#790)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Jun 20, 2023
1 parent d76ad19 commit 0fd815a
Show file tree
Hide file tree
Showing 24 changed files with 300 additions and 244 deletions.
56 changes: 47 additions & 9 deletions src/NATS.Client/JetStream/ConsumerContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
// limitations under the License.

using System;
using System.Threading.Tasks;
using NATS.Client.Internals;
using static NATS.Client.JetStream.BaseConsumeOptions;
using static NATS.Client.JetStream.ConsumeOptions;
using static NATS.Client.JetStream.JetStreamPullSubscription;

namespace NATS.Client.JetStream
{
Expand Down Expand Up @@ -50,19 +52,16 @@ public ConsumerInfo GetCachedConsumerInfo()
}

public Msg Next() {
return Next(DefaultExpiresInMillis);
return new NextSub(js, bindPso, DefaultExpiresInMillis).Next();
}

public Msg Next(int maxWaitMillis) {
if (maxWaitMillis < MinExpiresMills) {
public Msg Next(int maxWaitMillis)
{
if (maxWaitMillis < MinExpiresMills)
{
throw new ArgumentException($"Max wait must be at least {MinExpiresMills} milliseconds.");
}

long expires = maxWaitMillis - JetStreamPullSubscription.ExpireAdjustment;
JetStreamPullSubscription sub
= (JetStreamPullSubscription)new SubscriptionMaker(js, bindPso).MakeSubscription();
sub.pullImpl.Pull(false, null, PullRequestOptions.Builder(1).WithExpiresIn(expires).Build());
return sub.NextMessage(maxWaitMillis);
return new NextSub(js, bindPso, maxWaitMillis).Next();
}

public IFetchConsumer FetchMessages(int maxMessages) {
Expand Down Expand Up @@ -99,6 +98,45 @@ public IMessageConsumer consume(EventHandler<MsgHandlerEventArgs> handler, Consu
}
}

internal class NextSub
{
private int maxWaitMillis;
private JetStreamPullSubscription sub;

public NextSub(IJetStream js, PullSubscribeOptions pso, int maxWaitMillis)
{
sub = (JetStreamPullSubscription)new SubscriptionMaker(js, pso).MakeSubscription();
this.maxWaitMillis = maxWaitMillis;
sub.pullImpl.Pull(PullRequestOptions.Builder(1).WithExpiresIn(maxWaitMillis - ExpireAdjustment).Build(), false, null);
}

internal Msg Next()
{
try
{
return sub.NextMessage(maxWaitMillis);
}
catch (NATSTimeoutException)
{
return null;
}
finally
{
Task.Run(() =>
{
try
{
sub.Unsubscribe();
}
catch (Exception)
{
// intentionally ignored, nothing we can do anyway
}
});
}
}
}

internal class SubscriptionMaker
{
private readonly IJetStream js;
Expand Down
40 changes: 24 additions & 16 deletions src/NATS.Client/JetStream/FetchConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,30 +32,38 @@ internal FetchConsumer(SubscriptionMaker subscriptionMaker, FetchConsumeOptions
.WithExpiresIn(opts.ExpiresIn)
.WithIdleHeartbeat(opts.IdleHeartbeat)
.Build();
((JetStreamPullSubscription)sub).pullImpl.Pull(false, null, pro);
((JetStreamPullSubscription)sub).pullImpl.Pull(pro, false, null);
}

public Msg NextMessage()
{
int timeLeftMillis;
if (sw == null) {
sw = Stopwatch.StartNew();
timeLeftMillis = maxWaitMillis;
}
else
try
{
timeLeftMillis = maxWaitMillis - (int)sw.ElapsedMilliseconds;
}
int timeLeftMillis;
if (sw == null)
{
sw = Stopwatch.StartNew();
timeLeftMillis = maxWaitMillis;
}
else
{
timeLeftMillis = maxWaitMillis - (int)sw.ElapsedMilliseconds;
}

// if the manager thinks it has received everything in the pull, it means
// that all the messages are already in the internal queue and there is
// no waiting necessary
if (timeLeftMillis < 1 | pmm.pendingMessages < 1 || (pmm.trackingBytes && pmm.pendingBytes < 1))
{
return ((JetStreamPullSubscription)sub).NextMessage(1); // 1 is the shortest time I can give
}

// if the manager thinks it has received everything in the pull, it means
// that all the messages are already in the internal queue and there is
// no waiting necessary
if (timeLeftMillis < 1 | pmm.pendingMessages < 1 || (pmm.trackingBytes && pmm.pendingBytes < 1))
return ((JetStreamPullSubscription)sub).NextMessage(timeLeftMillis);
}
catch (NATSTimeoutException)
{
return ((JetStreamPullSubscription)sub).NextMessage(1); // 1 is the shortest time I can give
return null;
}

return ((JetStreamPullSubscription)sub).NextMessage(timeLeftMillis);
}
}
}
7 changes: 7 additions & 0 deletions src/NATS.Client/JetStream/IFetchConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ namespace NATS.Client.JetStream
/// </summary>
public interface IFetchConsumer : IMessageConsumer
{
/// <summary>
/// Read the next message. Return null if the fetch has been fulfilled either
/// because max messages or bytes max bytes have been reached,
/// or because the fetch was not fulfilled in the timeout set byt the fetch options.
/// @return the next message for this subscriber or null if there is a timeout
/// </summary>
/// <returns>the next message or null if there is a timeout</returns>
Msg NextMessage();
}
}
3 changes: 1 addition & 2 deletions src/NATS.Client/JetStream/IMessageConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// limitations under the License.

using System;
using System.Threading.Tasks;

namespace NATS.Client.JetStream
{
Expand All @@ -34,6 +33,6 @@ public interface IMessageConsumer : IDisposable
/// <param name="timeout">The time to wait for the stop to succeed, pass 0 to wait forever.
/// Stop involves moving messages to and from the server so a very short timeout is not recommended.</param>
/// <returns>A task so you could wait for the stop to know when there are no more messages.</returns>
Task Stop(int timeout);
void Stop(int timeout);
}
}
2 changes: 1 addition & 1 deletion src/NATS.Client/JetStream/IStreamContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public interface IStreamContext
/// </summary>
/// <param name="config">the consumer configuration to use.</param>
/// <returns>consumer information.</returns>
IConsumerContext AddConsumer(ConsumerConfiguration config);
IConsumerContext CreateOrUpdateConsumer(ConsumerConfiguration config);

/// <summary>
/// Management function to deletes a consumer.
Expand Down
12 changes: 6 additions & 6 deletions src/NATS.Client/JetStream/IterableConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ public Msg NextMessage(int timeoutMillis)
{
return ((JetStreamPullSubscription)sub).NextMessage(timeoutMillis);
}
catch (NATSBadSubscriptionException e)
catch (NATSTimeoutException)
{
return null;
}
catch (NATSBadSubscriptionException)
{
// this happens if the consumer is stopped, since it is
// drained/unsubscribed, so don't pass it on if it's expected
if (stopped)
{
throw new NATSTimeoutException();
}
throw e;
return null;
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/NATS.Client/JetStream/JetStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ Subscription CreateSubscription(string subject, string queueName,
}
}

Conn.FlushBuffer();
return sub;
}

Expand Down Expand Up @@ -494,8 +495,9 @@ public IJetStreamPullSubscription PullSubscribe(string subject, PullSubscribeOpt

public IJetStreamPullAsyncSubscription PullSubscribeAsync(string subject, EventHandler<MsgHandlerEventArgs> handler, PullSubscribeOptions options)
{
ValidateNotNull(options, "Pull Subscribe Options");
ValidateSubject(subject, IsSubjectRequired(options));
ValidateNotNull(handler, "Handler");
ValidateNotNull(options, "Pull Subscribe Options");
return (IJetStreamPullAsyncSubscription) CreateSubscription(subject, null, handler, false, null, options);
}

Expand Down
29 changes: 24 additions & 5 deletions src/NATS.Client/JetStream/JetStreamAbstractSyncSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,24 @@ public override void Unsubscribe()
base.Unsubscribe();
}

public override void AutoUnsubscribe(int max)
{
MessageManager.Shutdown();
base.AutoUnsubscribe(max);
}

internal override void close()
{
MessageManager.Shutdown();
base.close();
}

protected override void Dispose(bool disposing)
{
MessageManager.Shutdown();
base.Dispose(disposing);
}

public override Msg NextMessage()
{
return _nextUnmanagedWaitForever(null);
Expand Down Expand Up @@ -93,9 +105,12 @@ protected Msg _nextUnmanagedWaitForever(String expectedPullSubject)
{
throw new NATSJetStreamStatusException(msg.Status, this);
}

break;
}
// StatusHandled, StatusTerminus and StatusError that isn't for expected pullSubject: check again since waiting forever
// Check again since waiting forever when:
// 1. Any StatusHandled or StatusTerminus
// 2. StatusError that aren't for expected pullSubject
}
}

Expand All @@ -108,18 +123,22 @@ protected Msg _nextUnmanagedNoWait(string expectedPullSubject)
return msg;
case ManageResult.StatusTerminus:
// if the status applies return null, otherwise it's ignored, fall through
if (expectedPullSubject == null || expectedPullSubject.Equals(msg.Subject)) {
if (expectedPullSubject == null || expectedPullSubject.Equals(msg.Subject))
{
throw new NATSTimeoutException();
}
break;
case ManageResult.StatusError:
// if the status applies throw exception, otherwise it's ignored, fall through
if (expectedPullSubject == null || expectedPullSubject.Equals(msg.Subject)) {
if (expectedPullSubject == null || expectedPullSubject.Equals(msg.Subject))
{
throw new NATSJetStreamStatusException(msg.Status, this);
}
break;
}
// StatusHandled and StatusTerminus / StatusError that aren't for expected pullSubject: check again
// Check again when, regular messages might have arrived
// 1. Any StatusHandled
// 2. StatusTerminus or StatusError that aren't for expected pullSubject
}
}

Expand Down Expand Up @@ -149,7 +168,7 @@ protected Msg _nextUnmanaged(int timeout, string expectedPullSubject)
}
break;
}
// StatusHandled and StatusTerminus / StatusError that aren't for expected pullSubject: check again while have time
// anything else, try again while we have time
timeLeft = timeout - (int)sw.ElapsedMilliseconds;
}
throw new NATSTimeoutException();
Expand Down
1 change: 1 addition & 0 deletions src/NATS.Client/JetStream/JetStreamManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public PurgeResponse PurgeStream(string streamName)
public PurgeResponse PurgeStream(string streamName, PurgeOptions options)
{
Validator.ValidateStreamName(streamName, true);
Validator.ValidateNotNull(options, nameof(options));
string subj = string.Format(JetStreamConstants.JsapiStreamPurge, streamName);
Msg m = RequestResponseRequired(subj, options.Serialize(), Timeout);
return new PurgeResponse(m, true);
Expand Down
3 changes: 1 addition & 2 deletions src/NATS.Client/JetStream/JetStreamPullApiImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ internal void UpdateConsumer(string consumer)
_consumer = consumer;
}

internal string Pull(bool raiseStatusWarnings, ITrackPendingListener trackPendingListener,
PullRequestOptions pullRequestOptions) {
internal string Pull(PullRequestOptions pullRequestOptions, bool raiseStatusWarnings, ITrackPendingListener trackPendingListener) {
string publishSubject = _js.PrependPrefix(string.Format(JetStreamConstants.JsapiConsumerMsgNext, _stream, _consumer));
string pullSubject = _subject.Replace("*", _pullSubjectIdHolder.Increment().ToString());
_mm.StartPullRequest(pullSubject, pullRequestOptions, raiseStatusWarnings, trackPendingListener);
Expand Down
10 changes: 5 additions & 5 deletions src/NATS.Client/JetStream/JetStreamPullAsyncSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,28 +36,28 @@ internal override void UpdateConsumer(string consumer)

public void Pull(int batchSize)
{
pullImpl.Pull(true, null, PullRequestOptions.Builder(batchSize).Build());
pullImpl.Pull(PullRequestOptions.Builder(batchSize).Build(), true, null);
}

public void Pull(PullRequestOptions pullRequestOptions) {
pullImpl.Pull(true, null, pullRequestOptions);
pullImpl.Pull(pullRequestOptions, true, null);
}

public void PullExpiresIn(int batchSize, int expiresInMillis)
{
Validator.ValidateDurationGtZeroRequired(expiresInMillis, "Expires In");
pullImpl.Pull(true, null, PullRequestOptions.Builder(batchSize).WithExpiresIn(expiresInMillis).Build());
pullImpl.Pull(PullRequestOptions.Builder(batchSize).WithExpiresIn(expiresInMillis).Build(), true, null);
}

public void PullNoWait(int batchSize)
{
pullImpl.Pull(true, null, PullRequestOptions.Builder(batchSize).WithNoWait().Build());
pullImpl.Pull(PullRequestOptions.Builder(batchSize).WithNoWait().Build(), true, null);
}

public void PullNoWait(int batchSize, int expiresInMillis)
{
Validator.ValidateDurationGtZeroRequired(expiresInMillis, "NoWait Expires In");
pullImpl.Pull(true, null, PullRequestOptions.Builder(batchSize).WithNoWait().WithExpiresIn(expiresInMillis).Build());
pullImpl.Pull(PullRequestOptions.Builder(batchSize).WithNoWait().WithExpiresIn(expiresInMillis).Build(), true, null);
}
}
}
12 changes: 6 additions & 6 deletions src/NATS.Client/JetStream/JetStreamPullSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,28 +39,28 @@ internal override void UpdateConsumer(string consumer)

public void Pull(int batchSize)
{
pullImpl.Pull(true, null, PullRequestOptions.Builder(batchSize).Build());
pullImpl.Pull(PullRequestOptions.Builder(batchSize).Build(), true, null);
}

public void Pull(PullRequestOptions pullRequestOptions) {
pullImpl.Pull(true, null, pullRequestOptions);
pullImpl.Pull(pullRequestOptions, true, null);
}

public void PullExpiresIn(int batchSize, int expiresInMillis)
{
Validator.ValidateDurationGtZeroRequired(expiresInMillis, "Expires In");
pullImpl.Pull(true, null, PullRequestOptions.Builder(batchSize).WithExpiresIn(expiresInMillis).Build());
pullImpl.Pull(PullRequestOptions.Builder(batchSize).WithExpiresIn(expiresInMillis).Build(), true, null);
}

public void PullNoWait(int batchSize)
{
pullImpl.Pull(true, null, PullRequestOptions.Builder(batchSize).WithNoWait().Build());
pullImpl.Pull(PullRequestOptions.Builder(batchSize).WithNoWait().Build(), true, null);
}

public void PullNoWait(int batchSize, int expiresInMillis)
{
Validator.ValidateDurationGtZeroRequired(expiresInMillis, "NoWait Expires In");
pullImpl.Pull(true, null, PullRequestOptions.Builder(batchSize).WithNoWait().WithExpiresIn(expiresInMillis).Build());
pullImpl.Pull(PullRequestOptions.Builder(batchSize).WithNoWait().WithExpiresIn(expiresInMillis).Build(), true, null);
}

internal const int ExpireAdjustment = 10;
Expand All @@ -77,7 +77,7 @@ public IList<Msg> Fetch(int batchSize, int maxWaitMillis)

Duration expires = Duration.OfMillis(
maxWaitMillis > ExpireAdjustment ? maxWaitMillis - ExpireAdjustment : maxWaitMillis);
string pullSubject = pullImpl.Pull(false, null, PullRequestOptions.Builder(batchLeft).WithExpiresIn(expires).Build());
string pullSubject = pullImpl.Pull(PullRequestOptions.Builder(batchLeft).WithExpiresIn(expires).Build(), false, null);

try
{
Expand Down
Loading

0 comments on commit 0fd815a

Please sign in to comment.