Skip to content

Commit

Permalink
Merge branch 'master' into 2095-error-while-authenticating-extended-a…
Browse files Browse the repository at this point in the history
…uthentication-handler-is-not-yet-supported-with-azure-event-grid-mqtt-broker
  • Loading branch information
chkr1011 authored Dec 25, 2024
2 parents 41fac37 + f13b3eb commit c96f2dd
Show file tree
Hide file tree
Showing 30 changed files with 198 additions and 58 deletions.
2 changes: 1 addition & 1 deletion Source/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="BenchmarkDotNet" Version="0.13.12"/>
<PackageReference Include="BenchmarkDotNet" Version="0.14.0" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ namespace MQTTnet.Server
{
public sealed class ClientAcknowledgedPublishPacketEventArgs : EventArgs
{
public ClientAcknowledgedPublishPacketEventArgs(string clientId, IDictionary sessionItems, MqttPublishPacket publishPacket, MqttPacketWithIdentifier acknowledgePacket)
public ClientAcknowledgedPublishPacketEventArgs(string clientId, string userName, IDictionary sessionItems, MqttPublishPacket publishPacket, MqttPacketWithIdentifier acknowledgePacket)
{
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
UserName = userName;
SessionItems = sessionItems ?? throw new ArgumentNullException(nameof(sessionItems));
PublishPacket = publishPacket ?? throw new ArgumentNullException(nameof(publishPacket));
AcknowledgePacket = acknowledgePacket ?? throw new ArgumentNullException(nameof(acknowledgePacket));
Expand All @@ -28,6 +29,11 @@ public ClientAcknowledgedPublishPacketEventArgs(string clientId, IDictionary ses
/// </summary>
public string ClientId { get; }

/// <summary>
/// Gets the user name of the client.
/// </summary>
public string UserName { get; }

/// <summary>
/// Gets whether the PUBLISH packet is fully acknowledged. This is the case for PUBACK (QoS 1) and PUBCOMP (QoS 2.
/// </summary>
Expand Down
6 changes: 6 additions & 0 deletions Source/MQTTnet.Server/Events/ClientConnectedEventArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Collections;
using System.Collections.Generic;
using System.Net;
using System.Text;
using MQTTnet.Formatter;
using MQTTnet.Packets;

Expand Down Expand Up @@ -56,6 +57,11 @@ public ClientConnectedEventArgs(MqttConnectPacket connectPacket, MqttProtocolVer
/// </summary>
public string UserName => _connectPacket.Username;

/// <summary>
/// Gets the password of the connected client.
/// </summary>
public string Password => Encoding.UTF8.GetString(_connectPacket.Password.AsSpan());

/// <summary>
/// Gets the user properties sent by the client.
/// <remarks>MQTT 5.0.0+ feature.</remarks>
Expand Down
18 changes: 15 additions & 3 deletions Source/MQTTnet.Server/Events/ClientDisconnectedEventArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,29 @@
using System.Collections;
using System.Collections.Generic;
using System.Net;
using System.Text;
using MQTTnet.Packets;
using MQTTnet.Protocol;

namespace MQTTnet.Server
{
public sealed class ClientDisconnectedEventArgs : EventArgs
{
readonly MqttConnectPacket _connectPacket;
readonly MqttDisconnectPacket _disconnectPacket;

public ClientDisconnectedEventArgs(
string clientId,
MqttConnectPacket connectPacket,
MqttDisconnectPacket disconnectPacket,
MqttClientDisconnectType disconnectType,
EndPoint remoteEndPoint,
IDictionary sessionItems)
{
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
DisconnectType = disconnectType;
RemoteEndPoint = remoteEndPoint;
SessionItems = sessionItems ?? throw new ArgumentNullException(nameof(sessionItems));

_connectPacket = connectPacket;
// The DISCONNECT packet can be null in case of a non clean disconnect or session takeover.
_disconnectPacket = disconnectPacket;
}
Expand All @@ -35,7 +37,17 @@ public ClientDisconnectedEventArgs(
/// Gets the client identifier.
/// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues.
/// </summary>
public string ClientId { get; }
public string ClientId => _connectPacket.ClientId;

/// <summary>
/// Gets the user name of the client.
/// </summary>
public string UserName => _connectPacket.Username;

/// <summary>
/// Gets the password of the client.
/// </summary>
public string Password => Encoding.UTF8.GetString(_connectPacket.Password.AsSpan());

public MqttClientDisconnectType DisconnectType { get; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ namespace MQTTnet.Server
{
public sealed class ClientSubscribedTopicEventArgs : EventArgs
{
public ClientSubscribedTopicEventArgs(string clientId, MqttTopicFilter topicFilter, IDictionary sessionItems)
public ClientSubscribedTopicEventArgs(string clientId, string userName, MqttTopicFilter topicFilter, IDictionary sessionItems)
{
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
UserName = userName;
TopicFilter = topicFilter ?? throw new ArgumentNullException(nameof(topicFilter));
SessionItems = sessionItems ?? throw new ArgumentNullException(nameof(sessionItems));
}
Expand All @@ -23,6 +24,11 @@ public ClientSubscribedTopicEventArgs(string clientId, MqttTopicFilter topicFilt
/// </summary>
public string ClientId { get; }

/// <summary>
/// Gets the user name of the client.
/// </summary>
public string UserName { get; }

/// <summary>
/// Gets or sets a key/value collection that can be used to share data within the scope of this session.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ namespace MQTTnet.Server
{
public sealed class ClientUnsubscribedTopicEventArgs : EventArgs
{
public ClientUnsubscribedTopicEventArgs(string clientId, string topicFilter, IDictionary sessionItems)
public ClientUnsubscribedTopicEventArgs(string clientId, string userName, string topicFilter, IDictionary sessionItems)
{
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
UserName = userName;
TopicFilter = topicFilter ?? throw new ArgumentNullException(nameof(topicFilter));
SessionItems = sessionItems ?? throw new ArgumentNullException(nameof(sessionItems));
}
Expand All @@ -22,6 +23,11 @@ public ClientUnsubscribedTopicEventArgs(string clientId, string topicFilter, IDi
/// </summary>
public string ClientId { get; }

/// <summary>
/// Gets the user name of the client.
/// </summary>
public string UserName { get; }

/// <summary>
/// Gets or sets a key/value collection that can be used to share data within the scope of this session.
/// </summary>
Expand Down
8 changes: 7 additions & 1 deletion Source/MQTTnet.Server/Events/InterceptingPacketEventArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ namespace MQTTnet.Server
{
public sealed class InterceptingPacketEventArgs : EventArgs
{
public InterceptingPacketEventArgs(CancellationToken cancellationToken, string clientId, EndPoint remoteEndPoint, MqttPacket packet, IDictionary sessionItems)
public InterceptingPacketEventArgs(CancellationToken cancellationToken, string clientId, string userName, EndPoint remoteEndPoint, MqttPacket packet, IDictionary sessionItems)
{
CancellationToken = cancellationToken;
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
UserName = userName;
RemoteEndPoint = remoteEndPoint;
Packet = packet ?? throw new ArgumentNullException(nameof(packet));
SessionItems = sessionItems;
Expand All @@ -32,6 +33,11 @@ public InterceptingPacketEventArgs(CancellationToken cancellationToken, string c
/// </summary>
public string ClientId { get; }

/// <summary>
/// Gets the user name of the client.
/// </summary>
public string UserName { get; }

/// <summary>
/// Gets the endpoint of the sending or receiving client.
/// </summary>
Expand Down
8 changes: 7 additions & 1 deletion Source/MQTTnet.Server/Events/InterceptingPublishEventArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ namespace MQTTnet.Server
{
public sealed class InterceptingPublishEventArgs : EventArgs
{
public InterceptingPublishEventArgs(MqttApplicationMessage applicationMessage, CancellationToken cancellationToken, string clientId, IDictionary sessionItems)
public InterceptingPublishEventArgs(MqttApplicationMessage applicationMessage, CancellationToken cancellationToken, string clientId, string userName, IDictionary sessionItems)
{
ApplicationMessage = applicationMessage ?? throw new ArgumentNullException(nameof(applicationMessage));
CancellationToken = cancellationToken;
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
UserName = userName;
SessionItems = sessionItems ?? throw new ArgumentNullException(nameof(sessionItems));
}

Expand All @@ -31,6 +32,11 @@ public InterceptingPublishEventArgs(MqttApplicationMessage applicationMessage, C
/// </summary>
public string ClientId { get; }

/// <summary>
/// Gets the user name of the client.
/// </summary>
public string UserName { get; }

public bool CloseConnection { get; set; }

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ public sealed class InterceptingSubscriptionEventArgs : EventArgs
public InterceptingSubscriptionEventArgs(
CancellationToken cancellationToken,
string clientId,
string userName,
MqttSessionStatus session,
MqttTopicFilter topicFilter,
List<MqttUserProperty> userProperties)
{
CancellationToken = cancellationToken;
ClientId = clientId;
UserName = userName;
Session = session;
TopicFilter = topicFilter;
UserProperties = userProperties;
Expand All @@ -37,6 +39,11 @@ public InterceptingSubscriptionEventArgs(
/// </summary>
public string ClientId { get; }

/// <summary>
/// Gets the user name of the client.
/// </summary>
public string UserName { get; }

/// <summary>
/// Gets or sets whether the broker should close the client connection.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ namespace MQTTnet.Server
{
public sealed class InterceptingUnsubscriptionEventArgs : EventArgs
{
public InterceptingUnsubscriptionEventArgs(CancellationToken cancellationToken, string clientId, IDictionary sessionItems, string topic, List<MqttUserProperty> userProperties)
public InterceptingUnsubscriptionEventArgs(CancellationToken cancellationToken, string clientId, string userName, IDictionary sessionItems, string topic, List<MqttUserProperty> userProperties)
{
CancellationToken = cancellationToken;
ClientId = clientId;
UserName = userName;
SessionItems = sessionItems;
Topic = topic;
UserProperties = userProperties;
Expand All @@ -32,6 +33,11 @@ public InterceptingUnsubscriptionEventArgs(CancellationToken cancellationToken,
/// </summary>
public string ClientId { get; }

/// <summary>
/// Gets the user name of the client.
/// </summary>
public string UserName { get; }

/// <summary>
/// Gets or sets whether the broker should close the client connection.
/// </summary>
Expand Down
8 changes: 7 additions & 1 deletion Source/MQTTnet.Server/Events/SessionDeletedEventArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ namespace MQTTnet.Server
{
public sealed class SessionDeletedEventArgs : EventArgs
{
public SessionDeletedEventArgs(string id, IDictionary sessionItems)
public SessionDeletedEventArgs(string id, string userName, IDictionary sessionItems)
{
Id = id ?? throw new ArgumentNullException(nameof(id));
UserName = userName;
SessionItems = sessionItems ?? throw new ArgumentNullException(nameof(sessionItems));
}

Expand All @@ -20,6 +21,11 @@ public SessionDeletedEventArgs(string id, IDictionary sessionItems)
/// </summary>
public string Id { get; }

/// <summary>
/// Gets the user name of the session.
/// </summary>
public string UserName { get; }

/// <summary>
/// Gets or sets a key/value collection that can be used to share data within the scope of this session.
/// </summary>
Expand Down
2 changes: 2 additions & 0 deletions Source/MQTTnet.Server/InjectedMqttApplicationMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@ public InjectedMqttApplicationMessage(MqttApplicationMessage applicationMessage)
public IDictionary CustomSessionItems { get; set; }

public string SenderClientId { get; set; } = string.Empty;

public string SenderUserName { get; set; }
}
33 changes: 29 additions & 4 deletions Source/MQTTnet.Server/Internal/MqttClientSessionsManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public async Task DeleteSessionAsync(string clientId)
{
if (_eventContainer.SessionDeletedEvent.HasHandlers && session != null)
{
var eventArgs = new SessionDeletedEventArgs(clientId, session.Items);
var eventArgs = new SessionDeletedEventArgs(clientId, session.UserName, session.Items);
await _eventContainer.SessionDeletedEvent.TryInvokeAsync(eventArgs, _logger).ConfigureAwait(false);
}
}
Expand All @@ -117,6 +117,7 @@ public async Task DeleteSessionAsync(string clientId)

public async Task<DispatchApplicationMessageResult> DispatchApplicationMessage(
string senderId,
string senderUserName,
IDictionary senderSessionItems,
MqttApplicationMessage applicationMessage,
CancellationToken cancellationToken)
Expand All @@ -130,7 +131,7 @@ public async Task<DispatchApplicationMessageResult> DispatchApplicationMessage(
// Allow the user to intercept application message...
if (_eventContainer.InterceptingPublishEvent.HasHandlers)
{
var interceptingPublishEventArgs = new InterceptingPublishEventArgs(applicationMessage, cancellationToken, senderId, senderSessionItems);
var interceptingPublishEventArgs = new InterceptingPublishEventArgs(applicationMessage, cancellationToken, senderId, senderUserName, senderSessionItems);
if (string.IsNullOrEmpty(interceptingPublishEventArgs.ApplicationMessage.Topic))
{
// This can happen if a topic alias us used but the topic is
Expand Down Expand Up @@ -310,6 +311,25 @@ public Task<IList<MqttClientStatus>> GetClientsStatus()
return Task.FromResult((IList<MqttClientStatus>)result);
}

public Task<MqttSessionStatus> GetSessionStatus(string id)
{
_sessionsManagementLock.EnterReadLock();
try
{
if (!_sessionsStorage.TryGetSession(id, out var session))
{
throw new InvalidOperationException($"Session with ID '{id}' not found.");
}

var sessionStatus = new MqttSessionStatus(session);
return Task.FromResult(sessionStatus);
}
finally
{
_sessionsManagementLock.ExitReadLock();
}
}

public Task<IList<MqttSessionStatus>> GetSessionsStatus()
{
var result = new List<MqttSessionStatus>();
Expand Down Expand Up @@ -408,7 +428,12 @@ public async Task HandleClientConnectionAsync(IMqttChannelAdapter channelAdapter
if (connectedClient.Id != null && !connectedClient.IsTakenOver && _eventContainer.ClientDisconnectedEvent.HasHandlers)
{
var disconnectType = connectedClient.DisconnectPacket != null ? MqttClientDisconnectType.Clean : MqttClientDisconnectType.NotClean;
var eventArgs = new ClientDisconnectedEventArgs(connectedClient.Id, connectedClient.DisconnectPacket, disconnectType, endpoint, connectedClient.Session.Items);
var eventArgs = new ClientDisconnectedEventArgs(
connectedClient.ConnectPacket,
connectedClient.DisconnectPacket,
disconnectType,
endpoint,
connectedClient.Session.Items);

await _eventContainer.ClientDisconnectedEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
}
Expand Down Expand Up @@ -592,7 +617,7 @@ async Task<MqttConnectedClient> CreateClientConnection(
if (_eventContainer.ClientDisconnectedEvent.HasHandlers)
{
var eventArgs = new ClientDisconnectedEventArgs(
oldConnectedClient.Id,
oldConnectedClient.ConnectPacket,
null,
MqttClientDisconnectType.Takeover,
oldConnectedClient.RemoteEndPoint,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public async Task<SubscribeResult> Subscribe(MqttSubscribePacket subscribePacket
{
foreach (var finalTopicFilter in finalTopicFilters)
{
var eventArgs = new ClientSubscribedTopicEventArgs(_session.Id, finalTopicFilter, _session.Items);
var eventArgs = new ClientSubscribedTopicEventArgs(_session.Id, _session.UserName, finalTopicFilter, _session.Items);
await _eventContainer.ClientSubscribedTopicEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
}
}
Expand Down Expand Up @@ -297,7 +297,7 @@ public async Task<UnsubscribeResult> Unsubscribe(MqttUnsubscribePacket unsubscri
{
foreach (var topicFilter in unsubscribePacket.TopicFilters)
{
var eventArgs = new ClientUnsubscribedTopicEventArgs(_session.Id, topicFilter, _session.Items);
var eventArgs = new ClientUnsubscribedTopicEventArgs(_session.Id, _session.UserName, topicFilter, _session.Items);
await _eventContainer.ClientUnsubscribedTopicEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
}
}
Expand Down Expand Up @@ -460,7 +460,7 @@ async Task<InterceptingSubscriptionEventArgs> InterceptSubscribe(
List<MqttUserProperty> userProperties,
CancellationToken cancellationToken)
{
var eventArgs = new InterceptingSubscriptionEventArgs(cancellationToken, _session.Id, new MqttSessionStatus(_session), topicFilter, userProperties);
var eventArgs = new InterceptingSubscriptionEventArgs(cancellationToken, _session.Id, _session.UserName, new MqttSessionStatus(_session), topicFilter, userProperties);

if (topicFilter.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
{
Expand Down Expand Up @@ -493,7 +493,7 @@ async Task<InterceptingUnsubscriptionEventArgs> InterceptUnsubscribe(
List<MqttUserProperty> userProperties,
CancellationToken cancellationToken)
{
var clientUnsubscribingTopicEventArgs = new InterceptingUnsubscriptionEventArgs(cancellationToken, _session.Id, _session.Items, topicFilter, userProperties)
var clientUnsubscribingTopicEventArgs = new InterceptingUnsubscriptionEventArgs(cancellationToken, _session.Id, _session.UserName, _session.Items, topicFilter, userProperties)
{
Response =
{
Expand Down
Loading

0 comments on commit c96f2dd

Please sign in to comment.