Skip to content

Commit

Permalink
Parity changes (#498)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Oct 26, 2021
1 parent 26ce1eb commit 8622859
Show file tree
Hide file tree
Showing 111 changed files with 6,041 additions and 1,884 deletions.
2 changes: 1 addition & 1 deletion documentation/DoxyFile.NATS.Client
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ PROJECT_NAME = "NATS .NET Client"
# could be handy for archiving the generated documentation or if some version
# control system is used.

PROJECT_NUMBER = 0.12.0
PROJECT_NUMBER = 0.14.0

# Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a
Expand Down
Binary file added documentation/favicon.ico
Binary file not shown.
Binary file added documentation/large-logo.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
34 changes: 34 additions & 0 deletions src/NATS.Client/AckType.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2021 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Text;

namespace NATS.Client
{
public sealed class AckType
{
public static AckType AckAck = new AckType("+ACK");
public static AckType AckNak = new AckType("-NAK");
public static AckType AckProgress = new AckType("+WPI");
public static AckType AckTerm = new AckType("+TERM");

public string Text { get; }
public byte[] Bytes { get; }

public AckType(string text)
{
Text = text;
Bytes = Encoding.ASCII.GetBytes(text);
}
}
}
111 changes: 63 additions & 48 deletions src/NATS.Client/Conn.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
using System.Threading.Tasks;
using NATS.Client.Internals;
using NATS.Client.JetStream;
using static NATS.Client.Defaults;
using Timeout = System.Threading.Timeout;

namespace NATS.Client
{
Expand Down Expand Up @@ -105,15 +107,12 @@ public class Connection : IConnection, IDisposable

private readonly Nuid _nuid = new Nuid();

Options opts = new Options();
private readonly Options opts; // assigned in constructor

/// <summary>
/// Gets the configuration options for this instance.
/// </summary>
public Options Opts
{
get { return opts; }
}
public Options Opts => opts;

private readonly List<Thread> wg = new List<Thread>(2);

Expand Down Expand Up @@ -394,8 +393,8 @@ internal void open(Srv s, int timeoutMillis)

client.NoDelay = false;

client.ReceiveBufferSize = Defaults.defaultBufSize*2;
client.SendBufferSize = Defaults.defaultBufSize;
client.ReceiveBufferSize = defaultBufSize*2;
client.SendBufferSize = defaultBufSize;

stream = client.GetStream();

Expand Down Expand Up @@ -721,10 +720,6 @@ internal Channel<Msg> getMessageChannel()
internal Connection(Options options)
{
opts = new Options(options);
if (opts.ReconnectDelayHandler == null)
{
opts.ReconnectDelayHandler = DefaultReconnectDelayHandler;
}

PING_P_BYTES = Encoding.UTF8.GetBytes(IC.pingProto);
PING_P_BYTES_LEN = PING_P_BYTES.Length;
Expand All @@ -747,8 +742,12 @@ internal Connection(Options options)
callbackScheduler.Start();

globalRequestInbox = NewInbox();

BeforeQueueProcessor = msg => msg;
}

internal Func<Msg, Msg> BeforeQueueProcessor;

private void buildPublishProtocolBuffers(int size)
{
pubProtoBuf = new byte[size];
Expand Down Expand Up @@ -833,7 +832,7 @@ private bool createConn(Srv s, out Exception ex)
catch (Exception) { }
}

bw = conn.getWriteBufferedStream(Defaults.defaultBufSize);
bw = conn.getWriteBufferedStream(defaultBufSize);
br = conn.getReadBufferedStream();
}
catch (Exception e)
Expand All @@ -850,7 +849,7 @@ private void makeTLSConn()
{
conn.makeTLS(this.opts);

bw = conn.getWriteBufferedStream(Defaults.defaultBufSize);
bw = conn.getWriteBufferedStream(defaultBufSize);
br = conn.getReadBufferedStream();
}

Expand Down Expand Up @@ -1016,6 +1015,24 @@ public IPAddress ClientIP
}
}

/// <summary>
/// Gets the ID of client as known by the NATS server, otherwise <c>null</c>.
/// </summary>
/// <remarks>
/// May not be supported in all versions of the server. If the client is connected to
/// an older server or is in the process of connecting, 0 will be returned.
/// </remarks>
public int ClientID
{
get
{
lock (mu)
{
return info.ClientId;
}
}
}

/// <summary>
/// Gets the server ID of the NATS server to which this instance
/// is connected, otherwise <c>null</c>.
Expand Down Expand Up @@ -1139,9 +1156,7 @@ internal bool connect(Srv s, out Exception exToThrow)
if (!ex.IsAuthorizationViolationError() && !ex.IsAuthenticationExpiredError())
throw;

var aseh = opts.AsyncErrorEventHandler;
if (aseh != null)
callbackScheduler.Add(() => aseh(s, new ErrEventArgs(this, null, ex.Message)));
callbackScheduler.Add(() => opts.AsyncErrorEventHandlerOrDefault(s, new ErrEventArgs(this, null, ex.Message)));

if (natsAuthEx == null || !natsAuthEx.Message.Equals(ex.Message, StringComparison.OrdinalIgnoreCase))
{
Expand Down Expand Up @@ -1391,11 +1406,11 @@ private void sendConnect()
{
// TODO: Make this reader (or future equivalent) unbounded.
// we need the underlying stream, so leave it open.
sr = new StreamReader(br, Encoding.UTF8, false, Defaults.MaxControlLineSize, true);
sr = new StreamReader(br, Encoding.UTF8, false, MaxControlLineSize, true);
result = sr.ReadLine();

// If opts.verbose is set, handle +OK.
if (opts.Verbose == true && IC.okProtoNoCRLF.Equals(result))
if (opts.Verbose && IC.okProtoNoCRLF.Equals(result))
{
result = sr.ReadLine();
}
Expand Down Expand Up @@ -1441,7 +1456,7 @@ private Control readOp()
// the string directly using the buffered reader.
//
// Keep the underlying stream open.
using (StreamReader sr = new StreamReader(br, Encoding.ASCII, false, Defaults.MaxControlLineSize, true))
using (StreamReader sr = new StreamReader(br, Encoding.ASCII, false, MaxControlLineSize, true))
{
return new Control(sr.ReadLine());
}
Expand Down Expand Up @@ -1574,7 +1589,7 @@ private void doReconnect()
var errorForHandler = lastEx;
lastEx = null;

scheduleConnEvent(Opts.DisconnectedEventHandler, errorForHandler);
scheduleConnEvent(Opts.DisconnectedEventHandlerOrDefault, errorForHandler);

Srv cur;
int wlf = 0;
Expand Down Expand Up @@ -1667,7 +1682,7 @@ private void doReconnect()
srvPool.CurrentServer = cur;
status = ConnState.CONNECTED;

scheduleConnEvent(Opts.ReconnectedEventHandler);
scheduleConnEvent(Opts.ReconnectedEventHandlerOrDefault);

// Release lock here, we will return below
if (lockWasTaken)
Expand Down Expand Up @@ -1743,15 +1758,15 @@ private void processOpError(Exception e)
private void readLoop()
{
// Stack based buffer.
byte[] buffer = new byte[Defaults.defaultReadLength];
byte[] buffer = new byte[defaultReadLength];
var parser = new Parser(this);
int len;

while (true)
{
try
{
len = br.Read(buffer, 0, Defaults.defaultReadLength);
len = br.Read(buffer, 0, defaultReadLength);

// A length of zero can mean that the socket was closed
// locally by the application (Close) or the server
Expand Down Expand Up @@ -1827,7 +1842,7 @@ internal void deliverMsgs(Channel<Msg> ch)

// Roll our own fast conversion - we know it's the right
// encoding.
char[] convertToStrBuf = new char[Defaults.MaxControlLineSize];
char[] convertToStrBuf = new char[MaxControlLineSize];

// Since we know we don't need to decode the protocol string,
// just copy the chars into bytes. This increased
Expand Down Expand Up @@ -2179,8 +2194,16 @@ internal void processMsg(byte[] msgBytes, long length)
Msg msg = msgArgs.reply != null && msgArgs.reply.StartsWith(JetStreamConstants.JsAckSubjectPrefix)
? new JetStreamMsg(this, msgArgs, s, msgBytes, length)
: new Msg(msgArgs, s, msgBytes, length);

s.addMessage(msg, opts.subChanLen);

// BeforeQueueProcessor returns null if the message
// does not need to be queued, for instance heartbeats
// that are not flow control and are already seen by the
// auto status manager
msg = BeforeQueueProcessor.Invoke(msg);
if (msg != null)
{
s.addMessage(msg, opts.subChanLen);
}
} // maxreached == false

} // lock s.mu
Expand All @@ -2198,13 +2221,9 @@ internal void processSlowConsumer(Subscription s)
lastEx = new NATSSlowConsumerException();
if (!s.sc)
{
EventHandler<ErrEventArgs> aseh = opts.AsyncErrorEventHandler;
if (aseh != null)
{
callbackScheduler.Add(
() => { aseh(this, new ErrEventArgs(this, s, "Slow Consumer")); }
);
}
callbackScheduler.Add(
() => { opts.AsyncErrorEventHandlerOrDefault(this, new ErrEventArgs(this, s, "Slow Consumer")); }
);
}
s.sc = true;
}
Expand Down Expand Up @@ -2354,13 +2373,13 @@ internal void processInfo(string json, bool notify)
var serverAdded = srvPool.Add(discoveredUrls, true);
if (notify && serverAdded)
{
scheduleConnEvent(opts.ServerDiscoveredEventHandler);
scheduleConnEvent(opts.ServerDiscoveredEventHandlerOrDefault);
}
}

if (notify && info.LameDuckMode && opts.LameDuckModeEventHandler != null)
if (notify && info.LameDuckMode)
{
scheduleConnEvent(opts.LameDuckModeEventHandler);
scheduleConnEvent(opts.LameDuckModeEventHandlerOrDefault);
}
}

Expand Down Expand Up @@ -4159,7 +4178,7 @@ private void close(ConnState closeState, bool invokeDelegates, Exception error =
// disconnect;
if (invokeDelegates && conn.isSetup())
{
scheduleConnEvent(Opts.DisconnectedEventHandler, error);
scheduleConnEvent(Opts.DisconnectedEventHandlerOrDefault, error);
}

// Go ahead and make sure we have flushed the outbound buffer.
Expand Down Expand Up @@ -4209,7 +4228,7 @@ private void close(ConnState closeState, bool invokeDelegates, Exception error =

if (invokeDelegates)
{
scheduleConnEvent(opts.ClosedEventHandler, error);
scheduleConnEvent(opts.ClosedEventHandlerOrDefault, error);
}

status = closeState;
Expand Down Expand Up @@ -4306,13 +4325,9 @@ private void checkDrained(Subscription s, int timeout)
// async error handler if registered.
internal void pushDrainException(Subscription s, Exception ex)
{
EventHandler<ErrEventArgs> aseh = opts.AsyncErrorEventHandler;
if (aseh != null)
{
callbackScheduler.Add(
() => { aseh(s, new ErrEventArgs(this, s, ex.Message)); }
);
}
callbackScheduler.Add(
() => { opts.AsyncErrorEventHandlerOrDefault(s, new ErrEventArgs(this, s, ex.Message)); }
);
}

private void drain(int timeout)
Expand Down Expand Up @@ -4392,7 +4407,7 @@ private void drain(int timeout)
/// <seealso cref="Close()"/>
public void Drain()
{
Drain(Defaults.DefaultDrainTimeout);
Drain(DefaultDrainTimeout);
}

/// <summary>
Expand Down Expand Up @@ -4431,7 +4446,7 @@ public void Drain(int timeout)
/// <returns>A task that represents the asynchronous drain operation.</returns>
public Task DrainAsync()
{
return DrainAsync(Defaults.DefaultDrainTimeout);
return DrainAsync(DefaultDrainTimeout);
}

/// <summary>
Expand Down Expand Up @@ -4679,7 +4694,7 @@ public IJetStream CreateJetStreamContext(JetStreamOptions options = null)

public IJetStreamManagement CreateJetStreamManagementContext(JetStreamOptions options = null)
{
return new JetStream.JetStreamManagement(this, options);
return new JetStreamManagement(this, options);
}

#endregion
Expand Down
Loading

0 comments on commit 8622859

Please sign in to comment.