Skip to content

Commit

Permalink
issue 67, subscription improvements, unit tests (#512)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Nov 13, 2021
1 parent eb74989 commit d112848
Show file tree
Hide file tree
Showing 21 changed files with 462 additions and 254 deletions.
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,29 @@ messages, one for each message the previous batch was short. You can just ignore
See `JetStreamPullSubExpire` (TODO) and `JetStreamPullSubExpireUseCases` (TODO)
in the JetStream samples for detailed and runnable samples.

### Subscription Creation

Subscription creation has many checks to make sure that a valid, operable subscription can be made.

| Name | Group | Code | Description |
| --- | --- | --- | --- |
| JsSubPullCantHaveDeliverGroup | SUB | 90001 | Pull subscriptions can't have a deliver group. |
| JsSubPullCantHaveDeliverSubject | SUB | 90002 | Pull subscriptions can't have a deliver subject. |
| JsSubPushCantHaveMaxPullWaiting | SUB | 90003 | Push subscriptions cannot supply max pull waiting. |
| JsSubQueueDeliverGroupMismatch | SUB | 90004 | Queue / deliver group mismatch. |
| JsSubFcHbNotValidPull | SUB | 90005 | Flow Control and/or heartbeat is not valid with a pull subscription. |
| JsSubFcHbHbNotValidQueue | SUB | 90006 | Flow Control and/or heartbeat is not valid in queue mode. |
| JsSubNoMatchingStreamForSubject | SUB | 90007 | No matching streams for subject. |
| JsSubConsumerAlreadyConfiguredAsPush | SUB | 90008 | Consumer is already configured as a push consumer. |
| JsSubConsumerAlreadyConfiguredAsPull | SUB | 90009 | Consumer is already configured as a pull consumer. |
| JsSubSubjectDoesNotMatchFilter | SUB | 90011 | Subject does not match consumer configuration filter. |
| JsSubConsumerAlreadyBound | SUB | 90012 | Consumer is already bound to a subscription. |
| JsSubExistingConsumerNotQueue | SUB | 90013 | Existing consumer is not configured as a queue / deliver group. |
| JsSubExistingConsumerIsQueue | SUB | 90014 | Existing consumer is configured as a queue / deliver group. |
| JsSubExistingQueueDoesNotMatchRequestedQueue | SUB | 90015 | Existing consumer deliver group does not match requested queue / deliver group. |
| JsSubExistingConsumerCannotBeModified | SUB | 90016 | Existing consumer cannot be modified. |
| JsSubConsumerNotFoundRequiredInBind | SUB | 90017 | Consumer not found, required in bind mode. |

### Message Acknowledgements

There are multiple types of acknowledgements in JetStream:
Expand Down
14 changes: 1 addition & 13 deletions src/NATS.Client/Conn.cs
Original file line number Diff line number Diff line change
Expand Up @@ -742,11 +742,7 @@ internal Connection(Options options)
callbackScheduler.Start();

globalRequestInbox = NewInbox();

BeforeQueueProcessor = msg => msg;
}

internal Func<Msg, Msg> BeforeQueueProcessor;

private void buildPublishProtocolBuffers(int size)
{
Expand Down Expand Up @@ -2195,15 +2191,7 @@ internal void processMsg(byte[] msgBytes, long length)
? new JetStreamMsg(this, msgArgs, s, msgBytes, length)
: new Msg(msgArgs, s, msgBytes, length);

// BeforeQueueProcessor returns null if the message
// does not need to be queued, for instance heartbeats
// that are not flow control and are already seen by the
// auto status manager
msg = BeforeQueueProcessor.Invoke(msg);
if (msg != null)
{
s.addMessage(msg, opts.subChanLen);
}
s.addMessage(msg, opts.subChanLen);
} // maxreached == false

} // lock s.mu
Expand Down
1 change: 0 additions & 1 deletion src/NATS.Client/Exceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,6 @@ public sealed class ClientExDetail
public static readonly ClientExDetail JsSubNoMatchingStreamForSubject = new ClientExDetail(Sub, 90007, "No matching streams for subject.");
public static readonly ClientExDetail JsSubConsumerAlreadyConfiguredAsPush = new ClientExDetail(Sub, 90008, "Consumer is already configured as a push consumer.");
public static readonly ClientExDetail JsSubConsumerAlreadyConfiguredAsPull = new ClientExDetail(Sub, 90009, "Consumer is already configured as a pull consumer.");
public static readonly ClientExDetail JsSubExistingDeliverSubjectMismatch = new ClientExDetail(Sub, 90010, "Existing consumer deliver subject does not match requested deliver subject.");
public static readonly ClientExDetail JsSubSubjectDoesNotMatchFilter = new ClientExDetail(Sub, 90011, "Subject does not match consumer configuration filter.");
public static readonly ClientExDetail JsSubConsumerAlreadyBound = new ClientExDetail(Sub, 90012, "Consumer is already bound to a subscription.");
public static readonly ClientExDetail JsSubExistingConsumerNotQueue = new ClientExDetail(Sub, 90013, "Existing consumer is not configured as a queue / deliver group.");
Expand Down
4 changes: 2 additions & 2 deletions src/NATS.Client/JetStream/AutoStatusManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void SetSub(Subscription sub)
{
_sub = sub;
if (Hb) {
conn.BeforeQueueProcessor = BeforeQueueProcessor;
_sub.BeforeChannelAddCheck = BeforeChannelAddCheck;
asmTimer = new AsmTimer(this);
}
}
Expand Down Expand Up @@ -192,7 +192,7 @@ public bool Manage(Msg msg) {
return false;
}

private Msg BeforeQueueProcessor(Msg msg)
private Msg BeforeChannelAddCheck(Msg msg)
{
LastMsgReceived = DateTimeOffset.Now.ToUnixTimeMilliseconds();
if (msg.HasStatus
Expand Down
3 changes: 1 addition & 2 deletions src/NATS.Client/JetStream/ConsumerConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -153,17 +153,16 @@ internal bool WouldBeChangeTo(ConsumerConfiguration original)

|| WouldBeChange(AckWait, original.AckWait)
|| WouldBeChange(IdleHeartbeat, original.IdleHeartbeat)

|| WouldBeChange(StartTime, original.StartTime)

|| WouldBeChange(FilterSubject, original.FilterSubject)
|| WouldBeChange(Description, original.Description)
|| WouldBeChange(SampleFrequency, original.SampleFrequency)
|| WouldBeChange(DeliverSubject, original.DeliverSubject)
|| WouldBeChange(DeliverGroup, original.DeliverGroup)
;

// do not need to check Durable because the original is retrieved by the durable name
// do not need to check FilterSubject because it's already validated against the original
}

private static bool WouldBeChange(string request, string original)
Expand Down
10 changes: 5 additions & 5 deletions src/NATS.Client/JetStream/IJetStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public interface IJetStream
/// <param name="subject">The subject on which to listen for messages.
/// The subject can have wildcards (partial: <c>*</c>, full: <c>&gt;</c>).</param>
/// <param name="options">Pull Subscribe options for this subscription.</param>
/// <returns>a JetStreamPullSubscription</returns>
/// <returns>An IJetStreamPullSubscription</returns>
IJetStreamPullSubscription PullSubscribe(string subject, PullSubscribeOptions options);

/// <summary>
Expand All @@ -168,7 +168,7 @@ public interface IJetStream
/// <param name="handler">The <see cref="EventHandler{MsgHandlerEventArgs}"/> invoked when messages are received
/// on the returned <see cref="IAsyncSubscription"/>.</param>
/// <param name="autoAck">Whether or not to auto ack the message</param>
/// <returns>An <see cref="IAsyncSubscription"/> to use to read any messages received
/// <returns>An <see cref="IJetStreamPushAsyncSubscription"/> to use to read any messages received
/// from the NATS Server on the given <paramref name="subject"/>.</returns>
/// <seealso cref="ISubscription.Subject"/>
/// <returns>A JetStream push subscription</returns>
Expand Down Expand Up @@ -215,7 +215,7 @@ public interface IJetStream
/// from the NATS Server on the given <paramref name="subject"/>.</returns>
/// <seealso cref="ISubscription.Subject"/>
/// <seealso cref="ISubscription.Queue"/>
/// <returns>A JetStream push subscription</returns>
/// <returns>An IJetStreamPushAsyncSubscription</returns>
IJetStreamPushAsyncSubscription PushSubscribeAsync(string subject, string queue, EventHandler<MsgHandlerEventArgs> handler, bool autoAck);

/// <summary>
Expand All @@ -238,8 +238,8 @@ public interface IJetStream
/// <seealso cref="ISubscription.Subject"/>
/// <seealso cref="ISubscription.Queue"/>
/// <param name="autoAck">Whether or not to auto ack the message</param>
/// <param name="options">JetStream pull subscription options.</param>
/// <returns>A JetStream push subscription</returns>
/// <param name="options">JetStream push subscription options.</param>
/// <returns>An IJetStreamPushAsyncSubscription</returns>
IJetStreamPushAsyncSubscription PushSubscribeAsync(string subject, string queue, EventHandler<MsgHandlerEventArgs> handler, bool autoAck, PushSubscribeOptions options);

/// <summary>
Expand Down
Loading

0 comments on commit d112848

Please sign in to comment.