Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/ratelimit refactor #197

Merged
merged 18 commits into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 132 additions & 84 deletions CryptoExchange.Net.UnitTests/RestClientTests.cs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,12 @@ public TestRestApi1Client(TestClientOptions options) : base(new TraceLogger(), n

public async Task<CallResult<T>> Request<T>(CancellationToken ct = default) where T : class
{
return await SendRequestAsync<T>(new Uri("http://www.test.com"), HttpMethod.Get, ct);
return await SendRequestAsync<T>(new Uri("http://www.test.com"), HttpMethod.Get, ct, requestWeight: 0);
}

public async Task<CallResult<T>> RequestWithParams<T>(HttpMethod method, Dictionary<string, object> parameters, Dictionary<string, string> headers) where T : class
{
return await SendRequestAsync<T>(new Uri("http://www.test.com"), method, default, parameters, additionalHeaders: headers);
return await SendRequestAsync<T>(new Uri("http://www.test.com"), method, default, parameters, requestWeight: 0, additionalHeaders: headers);
}

public void SetParameterPosition(HttpMethod method, HttpMethodParameterPosition position)
Expand Down Expand Up @@ -180,7 +180,7 @@ public TestRestApi2Client(TestClientOptions options) : base(new TraceLogger(), n

public async Task<CallResult<T>> Request<T>(CancellationToken ct = default) where T : class
{
return await SendRequestAsync<T>(new Uri("http://www.test.com"), HttpMethod.Get, ct);
return await SendRequestAsync<T>(new Uri("http://www.test.com"), HttpMethod.Get, ct, requestWeight: 0);
}

protected override Error ParseErrorResponse(int httpStatusCode, IEnumerable<KeyValuePair<string, IEnumerable<string>>> responseHeaders, IMessageAccessor accessor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public class TestSocket: IWebsocket
#pragma warning disable 0067
public event Func<Task> OnReconnected;
public event Func<Task> OnReconnecting;
public event Func<int, Task> OnRequestRateLimited;
#pragma warning restore 0067
public event Func<int, Task> OnRequestSent;
public event Action<WebSocketMessageType, ReadOnlyMemory<byte>> OnStreamMessage;
Expand Down Expand Up @@ -62,13 +63,13 @@ public TestSocket()
}
}

public Task<bool> ConnectAsync()
public Task<CallResult> ConnectAsync()
{
Connected = CanConnect;
ConnectCalls++;
if (CanConnect)
InvokeOpen();
return Task.FromResult(CanConnect);
return Task.FromResult(CanConnect ? new CallResult(null) : new CallResult(new CantConnectError()));
}

public void Send(int requestId, string data, int weight)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ internal IWebsocket CreateSocketInternal(string address)
protected override AuthenticationProvider CreateAuthenticationProvider(ApiCredentials credentials)
=> new TestAuthProvider(credentials);

public CallResult<bool> ConnectSocketSub(SocketConnection sub)
public CallResult ConnectSocketSub(SocketConnection sub)
{
return ConnectSocketAsync(sub).Result;
}
Expand Down
7 changes: 0 additions & 7 deletions CryptoExchange.Net/Clients/BaseApiClient.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
using CryptoExchange.Net.Authentication;
using CryptoExchange.Net.Converters;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Options;
using Microsoft.Extensions.Logging;

Expand Down
351 changes: 315 additions & 36 deletions CryptoExchange.Net/Clients/RestApiClient.cs

Large diffs are not rendered by default.

37 changes: 18 additions & 19 deletions CryptoExchange.Net/Clients/SocketApiClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Options;
using CryptoExchange.Net.Objects.Sockets;
using CryptoExchange.Net.RateLimiting.Interfaces;
using CryptoExchange.Net.Sockets;
using Microsoft.Extensions.Logging;
using System;
Expand Down Expand Up @@ -59,15 +60,15 @@ public abstract class SocketApiClient : BaseApiClient, ISocketApiClient
/// <summary>
/// The rate limiters
/// </summary>
protected internal IEnumerable<IRateLimiter>? RateLimiters { get; set; }
protected internal IRateLimitGate? RateLimiter { get; set; }

/// <summary>
/// The max size a websocket message size can be
/// </summary>
protected internal int? MessageSendSizeLimit { get; set; }

/// <summary>
/// Periodic task regisrations
/// Periodic task registrations
/// </summary>
protected List<PeriodicTaskRegistration> PeriodicTaskRegistrations { get; set; } = new List<PeriodicTaskRegistration>();

Expand Down Expand Up @@ -121,10 +122,6 @@ public SocketApiClient(ILogger logger, string baseAddress, SocketExchangeOptions
options,
apiOptions)
{
var rateLimiters = new List<IRateLimiter>();
foreach (var rateLimiter in apiOptions.RateLimiters)
rateLimiters.Add(rateLimiter);
RateLimiters = rateLimiters;
}

/// <summary>
Expand Down Expand Up @@ -344,20 +341,20 @@ protected virtual async Task<CallResult<T>> QueryAsync<T>(string url, Query<T> q
/// <param name="socket">The connection to check</param>
/// <param name="authenticated">Whether the socket should authenticated</param>
/// <returns></returns>
protected virtual async Task<CallResult<bool>> ConnectIfNeededAsync(SocketConnection socket, bool authenticated)
protected virtual async Task<CallResult> ConnectIfNeededAsync(SocketConnection socket, bool authenticated)
{
if (socket.Connected)
return new CallResult<bool>(true);
return new CallResult(null);

var connectResult = await ConnectSocketAsync(socket).ConfigureAwait(false);
if (!connectResult)
return new CallResult<bool>(connectResult.Error!);
return connectResult;

if (ClientOptions.DelayAfterConnect != TimeSpan.Zero)
await Task.Delay(ClientOptions.DelayAfterConnect).ConfigureAwait(false);

if (!authenticated || socket.Authenticated)
return new CallResult<bool>(true);
return new CallResult(null);

return await AuthenticateSocketAsync(socket).ConfigureAwait(false);
}
Expand All @@ -367,10 +364,10 @@ protected virtual async Task<CallResult<bool>> ConnectIfNeededAsync(SocketConnec
/// </summary>
/// <param name="socket">Socket to authenticate</param>
/// <returns></returns>
public virtual async Task<CallResult<bool>> AuthenticateSocketAsync(SocketConnection socket)
public virtual async Task<CallResult> AuthenticateSocketAsync(SocketConnection socket)
{
if (AuthenticationProvider == null)
return new CallResult<bool>(new NoApiCredentialsError());
return new CallResult(new NoApiCredentialsError());

_logger.AttemptingToAuthenticate(socket.SocketId);
var authRequest = GetAuthenticationRequest();
Expand All @@ -385,13 +382,13 @@ public virtual async Task<CallResult<bool>> AuthenticateSocketAsync(SocketConnec
await socket.CloseAsync().ConfigureAwait(false);

result.Error!.Message = "Authentication failed: " + result.Error.Message;
return new CallResult<bool>(result.Error)!;
return new CallResult(result.Error)!;
}
}

_logger.Authenticated(socket.SocketId);
socket.Authenticated = true;
return new CallResult<bool>(true);
return new CallResult(null);
}

/// <summary>
Expand Down Expand Up @@ -499,16 +496,17 @@ protected virtual void HandleUnhandledMessage(IMessageAccessor message)
/// </summary>
/// <param name="socketConnection">The socket to connect</param>
/// <returns></returns>
protected virtual async Task<CallResult<bool>> ConnectSocketAsync(SocketConnection socketConnection)
protected virtual async Task<CallResult> ConnectSocketAsync(SocketConnection socketConnection)
{
if (await socketConnection.ConnectAsync().ConfigureAwait(false))
var connectResult = await socketConnection.ConnectAsync().ConfigureAwait(false);
if (connectResult)
{
socketConnections.TryAdd(socketConnection.SocketId, socketConnection);
return new CallResult<bool>(true);
return connectResult;
}

socketConnection.Dispose();
return new CallResult<bool>(new CantConnectError());
return connectResult;
}

/// <summary>
Expand All @@ -521,7 +519,8 @@ protected virtual WebSocketParameters GetWebSocketParameters(string address)
{
KeepAliveInterval = KeepAliveInterval,
ReconnectInterval = ClientOptions.ReconnectInterval,
RateLimiters = RateLimiters,
RateLimiter = ClientOptions.RatelimiterEnabled ? RateLimiter : null,
RateLimitingBehaviour = ClientOptions.RateLimitingBehaviour,
Proxy = ClientOptions.Proxy,
Timeout = ApiOptions.SocketNoDataTimeout ?? ClientOptions.SocketNoDataTimeout
};
Expand Down
2 changes: 0 additions & 2 deletions CryptoExchange.Net/Converters/ArrayPropertyAttribute.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace CryptoExchange.Net.Converters
{
Expand Down
6 changes: 0 additions & 6 deletions CryptoExchange.Net/ExtensionMethods.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO.Compression;
using System.IO;
using System.Linq;
Expand All @@ -9,12 +8,7 @@
using System.Text;
using System.Web;
using CryptoExchange.Net.Objects;
using Microsoft.Extensions.Logging;
using System.Globalization;
using System.Collections;
using System.Net.Http;
using System.Data.Common;
using Newtonsoft.Json.Linq;

namespace CryptoExchange.Net
{
Expand Down
2 changes: 0 additions & 2 deletions CryptoExchange.Net/Interfaces/ICryptoRestClient.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
using CryptoExchange.Net.Interfaces.CommonClients;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Text;

namespace CryptoExchange.Net.Interfaces
{
Expand Down
6 changes: 1 addition & 5 deletions CryptoExchange.Net/Interfaces/ICryptoSocketClient.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
using CryptoExchange.Net.Interfaces.CommonClients;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Text;
using System;

namespace CryptoExchange.Net.Interfaces
{
Expand Down
1 change: 0 additions & 1 deletion CryptoExchange.Net/Interfaces/IMessageProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using CryptoExchange.Net.Sockets;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace CryptoExchange.Net.Interfaces
{
Expand Down
10 changes: 7 additions & 3 deletions CryptoExchange.Net/Interfaces/IWebsocket.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
using System;
using System.IO;
using CryptoExchange.Net.Objects;
using System;
using System.Net.WebSockets;
using System.Threading.Tasks;

Expand All @@ -23,6 +23,10 @@ public interface IWebsocket: IDisposable
/// </summary>
event Func<int, Task> OnRequestSent;
/// <summary>
/// Websocket query was ratelimited and couldn't be send
/// </summary>
event Func<int, Task>? OnRequestRateLimited;
/// <summary>
/// Websocket error event
/// </summary>
event Func<Exception, Task> OnError;
Expand Down Expand Up @@ -67,7 +71,7 @@ public interface IWebsocket: IDisposable
/// Connect the socket
/// </summary>
/// <returns></returns>
Task<bool> ConnectAsync();
Task<CallResult> ConnectAsync();
/// <summary>
/// Send data
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ internal static class CryptoExchangeWebSocketClientLoggingExtension
private static readonly Action<ILogger, int, Exception?> _closed;
private static readonly Action<ILogger, int, Exception?> _disposing;
private static readonly Action<ILogger, int, Exception?> _disposed;
private static readonly Action<ILogger, int, int, int, Exception?> _sendDelayedBecauseOfRateLimit;
private static readonly Action<ILogger, int, int, int, Exception?> _sentBytes;
private static readonly Action<ILogger, int, string, Exception?> _sendLoopStoppedWithException;
private static readonly Action<ILogger, int, Exception?> _sendLoopFinished;
Expand Down Expand Up @@ -74,7 +73,7 @@ static CryptoExchangeWebSocketClientLoggingExtension()
_addingBytesToSendBuffer = LoggerMessage.Define<int, int, int>(
LogLevel.Trace,
new EventId(1007, "AddingBytesToSendBuffer"),
"[Sckt {SocketId}] msg {RequestId} - Adding {NumBytes} bytes to send buffer");
"[Sckt {SocketId}] [Req {RequestId}] adding {NumBytes} bytes to send buffer");

_reconnectRequested = LoggerMessage.Define<int>(
LogLevel.Debug,
Expand Down Expand Up @@ -111,15 +110,10 @@ static CryptoExchangeWebSocketClientLoggingExtension()
new EventId(1014, "Disposed"),
"[Sckt {SocketId}] disposed");

_sendDelayedBecauseOfRateLimit = LoggerMessage.Define<int, int, int>(
LogLevel.Debug,
new EventId(1015, "SendDelayedBecauseOfRateLimit"),
"[Sckt {SocketId}] msg {RequestId} - send delayed {DelayMS}ms because of rate limit");

_sentBytes = LoggerMessage.Define<int, int, int>(
LogLevel.Trace,
new EventId(1016, "SentBytes"),
"[Sckt {SocketId}] msg {RequestId} - sent {NumBytes} bytes");
"[Sckt {SocketId}] [Req {RequestId}] sent {NumBytes} bytes");

_sendLoopStoppedWithException = LoggerMessage.Define<int, string>(
LogLevel.Warning,
Expand Down Expand Up @@ -267,12 +261,6 @@ public static void SocketDisposed(
_disposed(logger, socketId, null);
}

public static void SocketSendDelayedBecauseOfRateLimit(
this ILogger logger, int socketId, int requestId, int delay)
{
_sendDelayedBecauseOfRateLimit(logger, socketId, requestId, delay, null);
}

public static void SocketSentBytes(
this ILogger logger, int socketId, int requestId, int numBytes)
{
Expand Down
Loading
Loading