From 0e849b93e60c0edbb0e737040799b993e77e4272 Mon Sep 17 00:00:00 2001 From: Yuwei Ba Date: Fri, 8 Mar 2019 19:57:59 +1100 Subject: [PATCH 1/2] support binary data --- src/Websocket.Client/IWebsocketClient.cs | 5 +- src/Websocket.Client/MessageType.cs | 12 +++++ src/Websocket.Client/WebsocketClient.cs | 60 +++++++++++++++++------- 3 files changed, 56 insertions(+), 21 deletions(-) create mode 100644 src/Websocket.Client/MessageType.cs diff --git a/src/Websocket.Client/IWebsocketClient.cs b/src/Websocket.Client/IWebsocketClient.cs index 540579b..df32f6d 100644 --- a/src/Websocket.Client/IWebsocketClient.cs +++ b/src/Websocket.Client/IWebsocketClient.cs @@ -11,9 +11,8 @@ public interface IWebsocketClient : IDisposable /// /// Stream with received message (raw format) /// - IObservable MessageReceived { get; } - - /// + IObservable MessageReceived { get; } + /// /// Stream for reconnection event (trigerred after the new connection) /// IObservable ReconnectionHappened { get; } diff --git a/src/Websocket.Client/MessageType.cs b/src/Websocket.Client/MessageType.cs new file mode 100644 index 0000000..78951f6 --- /dev/null +++ b/src/Websocket.Client/MessageType.cs @@ -0,0 +1,12 @@ +using System.Net.WebSockets; + +namespace Websocket.Client +{ + public class MessageType + { + public byte[] RawData; + public string Data; + + public WebSocketMessageType WebSocketMessageType { get; internal set; } + } +} \ No newline at end of file diff --git a/src/Websocket.Client/WebsocketClient.cs b/src/Websocket.Client/WebsocketClient.cs index 3854e2a..8012aee 100644 --- a/src/Websocket.Client/WebsocketClient.cs +++ b/src/Websocket.Client/WebsocketClient.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Concurrent; +using System.IO; using System.Net.WebSockets; using System.Reactive.Linq; using System.Reactive.Subjects; @@ -28,7 +29,7 @@ public class WebsocketClient : IWebsocketClient private CancellationTokenSource _cancellation; private CancellationTokenSource _cancellationTotal; - private readonly Subject _messageReceivedSubject = new Subject(); + private readonly Subject _messageReceivedSubject = new Subject(); private readonly Subject _reconnectionSubject = new Subject(); private readonly Subject _disconnectedSubject = new Subject(); @@ -49,7 +50,7 @@ public WebsocketClient(Uri url, Func clientFactory = null) /// /// Stream with received message (raw format) /// - public IObservable MessageReceived => _messageReceivedSubject.AsObservable(); + public IObservable MessageReceived => _messageReceivedSubject.AsObservable(); /// /// Stream for reconnection event (triggered after the new connection) @@ -164,6 +165,11 @@ public Task SendInstant(string message) return SendInternal(message); } + public Task SendInstant(byte[] message) + { + return SendInternal(message); + } + /// /// Force reconnection. /// Closes current websocket stream and perform a new connection to the server. @@ -227,6 +233,14 @@ private async Task SendInternal(string message) await client.SendAsync(messageSegment, WebSocketMessageType.Text, true, _cancellation.Token).ConfigureAwait(false); } + private async Task SendInternal(byte[] message) + { + var client = await GetClient().ConfigureAwait(false); + await client + .SendAsync(new ArraySegment(message), WebSocketMessageType.Binary, true, _cancellation.Token) + .ConfigureAwait(false); + } + private async Task StartClient(Uri uri, CancellationToken token, ReconnectionType type) { DeactivateLastChance(); @@ -283,24 +297,34 @@ private async Task Listen(ClientWebSocket client, CancellationToken token) { do { + ArraySegment buffer = new ArraySegment(new byte[8192]); + WebSocketReceiveResult result = null; - var buffer = new byte[1000]; - var message = new ArraySegment(buffer); - var resultMessage = new StringBuilder(); - do + + using (var ms = new MemoryStream()) { - result = await client.ReceiveAsync(message, token).ConfigureAwait(false); - var receivedMessage = Encoding.UTF8.GetString(buffer, 0, result.Count); - resultMessage.Append(receivedMessage); - if (result.MessageType != WebSocketMessageType.Text) - break; - - } while (!result.EndOfMessage); - - var received = resultMessage.ToString(); - Logger.Trace(L($"Received: {received}")); - _lastReceivedMsg = DateTime.UtcNow; - _messageReceivedSubject.OnNext(received); + do + { + result = await _client.ReceiveAsync(buffer, CancellationToken.None); + ms.Write(buffer.Array, buffer.Offset, result.Count); + } while (!result.EndOfMessage); + + ms.Seek(0, SeekOrigin.Begin); + + var message = new MessageType(); + message.WebSocketMessageType = result.MessageType; + if (result.MessageType == WebSocketMessageType.Text) + { + message.Data = Encoding.UTF8.GetString(ms.ToArray()); + } + else + { + message.RawData = ms.ToArray(); + } + Logger.Trace(L($"Received: {message}")); + _lastReceivedMsg = DateTime.UtcNow; + _messageReceivedSubject.OnNext(message); + } } while (client.State == WebSocketState.Open && !token.IsCancellationRequested); } From 4eb3a28208e78094f35f7aabc615c842bf080248 Mon Sep 17 00:00:00 2001 From: Yuwei Ba Date: Sat, 9 Mar 2019 00:33:04 +1100 Subject: [PATCH 2/2] fix unit test --- .../WebsocketClientTests.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test_integration/Websocket.Client.Tests.Integration/WebsocketClientTests.cs b/test_integration/Websocket.Client.Tests.Integration/WebsocketClientTests.cs index 036e56f..1716215 100644 --- a/test_integration/Websocket.Client.Tests.Integration/WebsocketClientTests.cs +++ b/test_integration/Websocket.Client.Tests.Integration/WebsocketClientTests.cs @@ -19,7 +19,7 @@ public async Task OnStarting_ShouldGetInfoResponse() client.MessageReceived.Subscribe(msg => { - received = msg; + received = msg.Data; receivedEvent.Set(); }); @@ -48,11 +48,11 @@ public async Task SendMessageBeforeStart_ShouldWorkAfterStart() client .MessageReceived - .Where(x => x.ToLower().Contains("pong")) + .Where(x => x.Data.ToLower().Contains("pong")) .Subscribe(msg => { receivedCount++; - received = msg; + received = msg.Data; if(receivedCount >= 7) receivedEvent.Set();