Skip to content

Commit

Permalink
Merge pull request #2 from Watfaq/master
Browse files Browse the repository at this point in the history
Support binary data
  • Loading branch information
Marfusios authored Mar 12, 2019
2 parents 8fcb270 + 4eb3a28 commit 7a5b01e
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 24 deletions.
5 changes: 2 additions & 3 deletions src/Websocket.Client/IWebsocketClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ public interface IWebsocketClient : IDisposable
/// <summary>
/// Stream with received message (raw format)
/// </summary>
IObservable<string> MessageReceived { get; }

/// <summary>
IObservable<MessageType> MessageReceived { get; }
/// <summary>
/// Stream for reconnection event (trigerred after the new connection)
/// </summary>
IObservable<ReconnectionType> ReconnectionHappened { get; }
Expand Down
12 changes: 12 additions & 0 deletions src/Websocket.Client/MessageType.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System.Net.WebSockets;

namespace Websocket.Client
{
public class MessageType
{
public byte[] RawData;
public string Data;

public WebSocketMessageType WebSocketMessageType { get; internal set; }
}
}
60 changes: 42 additions & 18 deletions src/Websocket.Client/WebsocketClient.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Net.WebSockets;
using System.Reactive.Linq;
using System.Reactive.Subjects;
Expand Down Expand Up @@ -28,7 +29,7 @@ public class WebsocketClient : IWebsocketClient
private CancellationTokenSource _cancellation;
private CancellationTokenSource _cancellationTotal;

private readonly Subject<string> _messageReceivedSubject = new Subject<string>();
private readonly Subject<MessageType> _messageReceivedSubject = new Subject<MessageType>();
private readonly Subject<ReconnectionType> _reconnectionSubject = new Subject<ReconnectionType>();
private readonly Subject<DisconnectionType> _disconnectedSubject = new Subject<DisconnectionType>();

Expand All @@ -49,7 +50,7 @@ public WebsocketClient(Uri url, Func<ClientWebSocket> clientFactory = null)
/// <summary>
/// Stream with received message (raw format)
/// </summary>
public IObservable<string> MessageReceived => _messageReceivedSubject.AsObservable();
public IObservable<MessageType> MessageReceived => _messageReceivedSubject.AsObservable();

/// <summary>
/// Stream for reconnection event (triggered after the new connection)
Expand Down Expand Up @@ -164,6 +165,11 @@ public Task SendInstant(string message)
return SendInternal(message);
}

public Task SendInstant(byte[] message)
{
return SendInternal(message);
}

/// <summary>
/// Force reconnection.
/// Closes current websocket stream and perform a new connection to the server.
Expand Down Expand Up @@ -227,6 +233,14 @@ private async Task SendInternal(string message)
await client.SendAsync(messageSegment, WebSocketMessageType.Text, true, _cancellation.Token).ConfigureAwait(false);
}

private async Task SendInternal(byte[] message)
{
var client = await GetClient().ConfigureAwait(false);
await client
.SendAsync(new ArraySegment<byte>(message), WebSocketMessageType.Binary, true, _cancellation.Token)
.ConfigureAwait(false);
}

private async Task StartClient(Uri uri, CancellationToken token, ReconnectionType type)
{
DeactivateLastChance();
Expand Down Expand Up @@ -283,24 +297,34 @@ private async Task Listen(ClientWebSocket client, CancellationToken token)
{
do
{
ArraySegment<byte> buffer = new ArraySegment<byte>(new byte[8192]);

WebSocketReceiveResult result = null;
var buffer = new byte[1000];
var message = new ArraySegment<byte>(buffer);
var resultMessage = new StringBuilder();
do

using (var ms = new MemoryStream())
{
result = await client.ReceiveAsync(message, token).ConfigureAwait(false);
var receivedMessage = Encoding.UTF8.GetString(buffer, 0, result.Count);
resultMessage.Append(receivedMessage);
if (result.MessageType != WebSocketMessageType.Text)
break;

} while (!result.EndOfMessage);

var received = resultMessage.ToString();
Logger.Trace(L($"Received: {received}"));
_lastReceivedMsg = DateTime.UtcNow;
_messageReceivedSubject.OnNext(received);
do
{
result = await _client.ReceiveAsync(buffer, CancellationToken.None);
ms.Write(buffer.Array, buffer.Offset, result.Count);
} while (!result.EndOfMessage);

ms.Seek(0, SeekOrigin.Begin);

var message = new MessageType();
message.WebSocketMessageType = result.MessageType;
if (result.MessageType == WebSocketMessageType.Text)
{
message.Data = Encoding.UTF8.GetString(ms.ToArray());
}
else
{
message.RawData = ms.ToArray();
}
Logger.Trace(L($"Received: {message}"));
_lastReceivedMsg = DateTime.UtcNow;
_messageReceivedSubject.OnNext(message);
}

} while (client.State == WebSocketState.Open && !token.IsCancellationRequested);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public async Task OnStarting_ShouldGetInfoResponse()

client.MessageReceived.Subscribe(msg =>
{
received = msg;
received = msg.Data;
receivedEvent.Set();
});

Expand Down Expand Up @@ -48,11 +48,11 @@ public async Task SendMessageBeforeStart_ShouldWorkAfterStart()

client
.MessageReceived
.Where(x => x.ToLower().Contains("pong"))
.Where(x => x.Data.ToLower().Contains("pong"))
.Subscribe(msg =>
{
receivedCount++;
received = msg;
received = msg.Data;

if(receivedCount >= 7)
receivedEvent.Set();
Expand Down

0 comments on commit 7a5b01e

Please sign in to comment.