diff --git a/docs/opc-publisher/api.md b/docs/opc-publisher/api.md
index 7bb6d67ba0..371d70888f 100644
--- a/docs/opc-publisher/api.md
+++ b/docs/opc-publisher/api.md
@@ -862,23 +862,22 @@ This section lists the diagnostics APi provided by OPC Publisher providing
name.
-
-#### ResetAllClients
+
+#### GetConnectionDiagnostic
```
-GET /v2/reset
+GET /v2/diagnostics/connections
```
##### Description
-Can be used to reset all established connections causing a full reconnect and recreate of all subscriptions.
+Get connection diagnostic information for all connections. The first set of diagnostics are the diagnostics active for all connections, continue reading to get updates.
##### Responses
|HTTP Code|Description|Schema|
|---|---|---|
-|**200**|The operation was successful.|No Content|
-|**408**|The operation timed out.|[ProblemDetails](definitions.md#problemdetails)|
+|**200**|The operation was successful.|[ConnectionDiagnosticModelIAsyncEnumerable](definitions.md#connectiondiagnosticmodeliasyncenumerable)|
|**500**|An unexpected error occurred|[ProblemDetails](definitions.md#problemdetails)|
@@ -888,15 +887,15 @@ Can be used to reset all established connections causing a full reconnect and re
* `application/x-msgpack`
-
-#### SetTraceMode
+
+#### ResetAllClients
```
-GET /v2/tracemode
+GET /v2/reset
```
##### Description
-Can be used to set trace mode for all established connections. Call within a minute to keep trace mode up or else trace mode will be disabled again after 1 minute. Enabling and resetting tracemode will cause a reconnect of the client.
+Can be used to reset all established connections causing a full reconnect and recreate of all subscriptions.
##### Responses
@@ -904,6 +903,7 @@ Can be used to set trace mode for all established connections. Call within a min
|HTTP Code|Description|Schema|
|---|---|---|
|**200**|The operation was successful.|No Content|
+|**408**|The operation timed out.|[ProblemDetails](definitions.md#problemdetails)|
|**500**|An unexpected error occurred|[ProblemDetails](definitions.md#problemdetails)|
diff --git a/docs/opc-publisher/definitions.md b/docs/opc-publisher/definitions.md
index 3f82726180..272ee3b786 100644
--- a/docs/opc-publisher/definitions.md
+++ b/docs/opc-publisher/definitions.md
@@ -314,6 +314,11 @@ Condition handling options model
|**updateInterval**
*optional*|Time interval for sending pending interval updates in seconds.|integer (int32)|
+
+### ConnectionDiagnosticModelIAsyncEnumerable
+*Type* : object
+
+
### ConnectionModel
Connection model
diff --git a/docs/opc-publisher/media/keyset.png b/docs/opc-publisher/media/keyset.png
new file mode 100644
index 0000000000..9471e9e5eb
Binary files /dev/null and b/docs/opc-publisher/media/keyset.png differ
diff --git a/docs/opc-publisher/media/keyset2.png b/docs/opc-publisher/media/keyset2.png
new file mode 100644
index 0000000000..2cc6eb10b5
Binary files /dev/null and b/docs/opc-publisher/media/keyset2.png differ
diff --git a/docs/opc-publisher/media/keyset3.png b/docs/opc-publisher/media/keyset3.png
new file mode 100644
index 0000000000..650242b955
Binary files /dev/null and b/docs/opc-publisher/media/keyset3.png differ
diff --git a/docs/opc-publisher/openapi.json b/docs/opc-publisher/openapi.json
index d03354ef5d..fcbca4d922 100644
--- a/docs/opc-publisher/openapi.json
+++ b/docs/opc-publisher/openapi.json
@@ -1166,21 +1166,24 @@
}
}
},
- "/v2/tracemode": {
+ "/v2/diagnostics/connections": {
"get": {
"tags": [
"Diagnostics"
],
- "summary": "SetTraceMode",
- "description": "Can be used to set trace mode for all established connections. Call within a minute to keep trace mode up or else trace mode will be disabled again after 1 minute. Enabling and resetting tracemode will cause a reconnect of the client.",
- "operationId": "SetTraceMode",
+ "summary": "GetConnectionDiagnostic",
+ "description": "Get connection diagnostic information for all connections. The first set of diagnostics are the diagnostics active for all connections, continue reading to get updates.",
+ "operationId": "GetConnectionDiagnostic",
"produces": [
"application/json",
"application/x-msgpack"
],
"responses": {
"200": {
- "description": "The operation was successful."
+ "description": "The operation was successful.",
+ "schema": {
+ "$ref": "#/definitions/ConnectionDiagnosticModelIAsyncEnumerable"
+ }
},
"500": {
"description": "An unexpected error occurred",
@@ -4918,6 +4921,10 @@
},
"additionalProperties": false
},
+ "ConnectionDiagnosticModelIAsyncEnumerable": {
+ "type": "object",
+ "additionalProperties": false
+ },
"ConnectionModel": {
"description": "Connection model",
"type": "object",
@@ -9314,4 +9321,4 @@
"description": "\r\n\r\n This section contains the API to configure data set writers and writer\r\n groups inside OPC Publisher. It supersedes the configuration API.\r\n Applications should use one or the other, but not both at the same\r\n time.\r\n \r\n\r\n\r\n The method name for all transports other than HTTP (which uses the shown\r\n HTTP methods and resource uris) is the name of the subsection header.\r\n To use the version specific method append \"_V1\" or \"_V2\" to the method\r\n name.\r\n "
}
]
-}
\ No newline at end of file
+}
diff --git a/docs/opc-publisher/troubleshooting.md b/docs/opc-publisher/troubleshooting.md
index c12fb56327..49ce757ea8 100644
--- a/docs/opc-publisher/troubleshooting.md
+++ b/docs/opc-publisher/troubleshooting.md
@@ -16,6 +16,7 @@ In this document you find information about
- [IoT Hub Metrics](#iot-hub-metrics)
- [Use Azure IoT Explorer](#use-azure-iot-explorer)
- [Restart the module](#restart-the-module)
+- [Analyzing network capture files](#analyzing-network-capture-files)
- [Limits and contributing factors](#limits-and-contributing-factors)
- [Debugging Discovery](#debugging-discovery)
@@ -166,6 +167,26 @@ iotedge list
You can also troubleshoot the OPC Publisher module remotely through the Azure Portal. Select the module inside the respective IoT Edge device in the IoT Edge blade and click on the Troubleshooting button to see logs and restart the module remotely.
+## Analyzing network capture files
+
+The issue might be between the OPC Publisher OPC UA client and the OPC UA server you are using. A network capture provides the definitive answer as to where the issue lies. To capture network traffic you can use [Wireshark](https://www.wireshark.org/) or [tshark](https://tshark.dev/setup/install/) (aka. command line wireshark) and capture a .pcap file for analysis. An example of how to capture network traces in a docker environment can be found [here](../../deploy/docker/with-pcap-capture.yaml). To analyze OPC UA traffic, you must load the .pcap or .pcapng file you captured with Wireshark.
+
+Follow [these instructions](https://opcconnect.opcfoundation.org/2017/02/analyzing-opc-ua-communications-with-wireshark/#:~:text=Wireshark%20has%20a%20built-in%20filter%20for%20OPC%20UA%2C,fairly%20easy%20to%20capture%20and%20analyze%20the%20conversation.) to visualize the OPC UA traffic between OPC Publisher and your OPC UA server.
+
+If your connection to the OPC UA server is encrypted (using security) you must use Wireshark 4.3 (not the stable version!). You will also need to capture the client and server keys. You can start OPC Publisher with the `-ksf` [command line argument](./commandline.md), providing an optional folder path that can be accessed after running OPC Publisher (volume mount). The folder is structured like this:
+
+![Key Set Log Folder](./media/keyset.png)
+
+The folder path starts with the port number, because the port number needs to be configured in the OPC UA protocol page in Wireshark.
+
+![Wireshark configuration](./media/keyset2.png)
+
+Find the connection you want to trace. You can open the `opcua_debug.log` file in one of the sub folders to identify the connection. The log file shows the remote and local endpoints as well as a summary of the connection configuration that was used to connect OPC Publisher. Once you have found the right folder, load the `opcua_debug.txt` file using the protocol page, then save and Wireshark will be able to decrypt traffic.
+
+![Wireshark Analysis](./media/keyset3.png)
+
+> IMPORTANT: While the keys that are logged are scoped to the connection and cannot be re-used, it still presents a security risk, therefore, ensure to clean up the logs when you are done, and do not use the feature in production.
+
## Limits and contributing factors
IoT Edge runs on various distributions of Linux and leverages Docker containers for all workloads, including the built-in IoT Edge modules `edgeAgent` and `edgeHub`. Docker has settings for the maximum host resources that each container can consume. These can be displayed by the Docker command "stats" (run "docker stats" from the terminal on the gateway host):
diff --git a/src/Azure.IIoT.OpcUa.Publisher.Models/src/ChannelDiagnosticModel.cs b/src/Azure.IIoT.OpcUa.Publisher.Models/src/ChannelDiagnosticModel.cs
new file mode 100644
index 0000000000..0d2719c71e
--- /dev/null
+++ b/src/Azure.IIoT.OpcUa.Publisher.Models/src/ChannelDiagnosticModel.cs
@@ -0,0 +1,58 @@
+// ------------------------------------------------------------
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
+// ------------------------------------------------------------
+
+namespace Azure.IIoT.OpcUa.Publisher.Models
+{
+ using System;
+ using System.Runtime.Serialization;
+
+ ///
+ /// Channel token. Can be used to decrypt encrypted
+ /// capture files.
+ ///
+ [DataContract]
+ public record class ChannelDiagnosticModel
+ {
+ ///
+ /// The id assigned to the channel that the token
+ /// belongs to.
+ ///
+ [DataMember(Name = "channelId", Order = 0)]
+ public required uint ChannelId { get; init; }
+
+ ///
+ /// The id assigned to the token.
+ ///
+ [DataMember(Name = "tokenId", Order = 1)]
+ public required uint TokenId { get; init; }
+
+ ///
+ /// When the token was created by the server
+ /// (refers to the server's clock).
+ ///
+ [DataMember(Name = "createdAt", Order = 2)]
+ public required DateTime CreatedAt { get; init; }
+
+ ///
+ /// The lifetime of the token
+ ///
+ [DataMember(Name = "lifetime", Order = 3)]
+ public required TimeSpan Lifetime { get; init; }
+
+ ///
+ /// Client keys
+ ///
+ [DataMember(Name = "client", Order = 4,
+ EmitDefaultValue = false)]
+ public ChannelKeyModel? Client { get; init; }
+
+ ///
+ /// Server keys
+ ///
+ [DataMember(Name = "aerver", Order = 5,
+ EmitDefaultValue = false)]
+ public ChannelKeyModel? Server { get; init; }
+ }
+}
diff --git a/src/Azure.IIoT.OpcUa.Publisher.Models/src/ChannelKeyModel.cs b/src/Azure.IIoT.OpcUa.Publisher.Models/src/ChannelKeyModel.cs
new file mode 100644
index 0000000000..cdb1d9580c
--- /dev/null
+++ b/src/Azure.IIoT.OpcUa.Publisher.Models/src/ChannelKeyModel.cs
@@ -0,0 +1,35 @@
+// ------------------------------------------------------------
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
+// ------------------------------------------------------------
+
+namespace Azure.IIoT.OpcUa.Publisher.Models
+{
+ using System.Collections.Generic;
+ using System.Runtime.Serialization;
+
+ ///
+ /// Channel token key model.
+ ///
+ [DataContract]
+ public record class ChannelKeyModel
+ {
+ ///
+ /// Iv
+ ///
+ [DataMember(Name = "iv", Order = 0)]
+ public required IReadOnlyList Iv { get; init; }
+
+ ///
+ /// Key
+ ///
+ [DataMember(Name = "key", Order = 1)]
+ public required IReadOnlyList Key { get; init; }
+
+ ///
+ /// Signature length
+ ///
+ [DataMember(Name = "sigLen", Order = 2)]
+ public required int SigLen { get; init; }
+ }
+}
diff --git a/src/Azure.IIoT.OpcUa.Publisher.Models/src/ConnectionDiagnosticModel.cs b/src/Azure.IIoT.OpcUa.Publisher.Models/src/ConnectionDiagnosticModel.cs
new file mode 100644
index 0000000000..b80f93d48b
--- /dev/null
+++ b/src/Azure.IIoT.OpcUa.Publisher.Models/src/ConnectionDiagnosticModel.cs
@@ -0,0 +1,68 @@
+// ------------------------------------------------------------
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
+// ------------------------------------------------------------
+
+namespace Azure.IIoT.OpcUa.Publisher.Models
+{
+ using System;
+ using System.Runtime.Serialization;
+
+ ///
+ /// Connection / session diagnostics model
+ ///
+ [DataContract]
+ public record class ConnectionDiagnosticModel
+ {
+ ///
+ /// Timestamp of the diagnostic information
+ ///
+ [DataMember(Name = "timeStamp", Order = 0)]
+ public required DateTimeOffset TimeStamp { get; init; }
+
+ ///
+ /// The connection information specified by user.
+ ///
+ [DataMember(Name = "connection", Order = 1)]
+ public required ConnectionModel Connection { get; init; }
+
+ ///
+ /// Effective remote ip address used for the
+ /// connection if connected. Empty if disconnected.
+ ///
+ [DataMember(Name = "remoteIpAddress", Order = 2,
+ EmitDefaultValue = false)]
+ public string? RemoteIpAddress { get; init; }
+
+ ///
+ /// The effective remote port used when connected,
+ /// null if disconnected.
+ ///
+ [DataMember(Name = "remotePort", Order = 3,
+ EmitDefaultValue = false)]
+ public int? RemotePort { get; init; }
+
+ ///
+ /// Effective local ip address used for the connection
+ /// if connected. Empty if disconnected.
+ ///
+ [DataMember(Name = "localIpAddress", Order = 4,
+ EmitDefaultValue = false)]
+ public string? LocalIpAddress { get; init; }
+
+ ///
+ /// The effective local port used when connected,
+ /// null if disconnected.
+ ///
+ [DataMember(Name = "localPort", Order = 5,
+ EmitDefaultValue = false)]
+ public int? LocalPort { get; init; }
+
+ ///
+ /// Channel diagnostics
+ ///
+ [DataMember(Name = "channelDiagnostics", Order = 6,
+ EmitDefaultValue = false)]
+ public ChannelDiagnosticModel? ChannelDiagnostics { get; init; }
+ }
+}
diff --git a/src/Azure.IIoT.OpcUa.Publisher.Models/src/PublishedDataItemsModel.cs b/src/Azure.IIoT.OpcUa.Publisher.Models/src/PublishedDataItemsModel.cs
index ad3c64849c..52a23ab920 100644
--- a/src/Azure.IIoT.OpcUa.Publisher.Models/src/PublishedDataItemsModel.cs
+++ b/src/Azure.IIoT.OpcUa.Publisher.Models/src/PublishedDataItemsModel.cs
@@ -5,7 +5,6 @@
namespace Azure.IIoT.OpcUa.Publisher.Models
{
- using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
diff --git a/src/Azure.IIoT.OpcUa.Publisher.Models/src/PublishedNodesEntryModel.cs b/src/Azure.IIoT.OpcUa.Publisher.Models/src/PublishedNodesEntryModel.cs
index 68ae04953e..4ac20b57bf 100644
--- a/src/Azure.IIoT.OpcUa.Publisher.Models/src/PublishedNodesEntryModel.cs
+++ b/src/Azure.IIoT.OpcUa.Publisher.Models/src/PublishedNodesEntryModel.cs
@@ -400,10 +400,42 @@ public sealed record class PublishedNodesEntryModel
EmitDefaultValue = false)]
public bool? DataSetFetchDisplayNames { get; set; }
+ ///
+ /// Default time to live for messages sent through
+ /// the writer group if the transport supports it.
+ ///
+ [DataMember(Name = "WriterGroupMessageTtlTimepan", Order = 49,
+ EmitDefaultValue = false)]
+ public TimeSpan? WriterGroupMessageTtlTimepan { get; set; }
+
+ ///
+ /// Default message retention setting for messages sent
+ /// through the writer group if the transport supports it.
+ ///
+ [DataMember(Name = "WriterGroupMessageRetention", Order = 50,
+ EmitDefaultValue = false)]
+ public bool? WriterGroupMessageRetention { get; set; }
+
+ ///
+ /// Message time to live for messages sent by the
+ /// writer if the transport supports it.
+ ///
+ [DataMember(Name = "MessageTtlTimespan", Order = 52,
+ EmitDefaultValue = false)]
+ public TimeSpan? MessageTtlTimespan { get; set; }
+
+ ///
+ /// Message retention setting for messages sent by
+ /// the writer if the transport supports it.
+ ///
+ [DataMember(Name = "MessageRetention", Order = 53,
+ EmitDefaultValue = false)]
+ public bool? MessageRetention { get; set; }
+
///
/// The node to monitor in "ns=" syntax.
///
- [DataMember(Name = "NodeId", Order = 50,
+ [DataMember(Name = "NodeId", Order = 99,
EmitDefaultValue = false)]
public NodeIdModel? NodeId { get; set; }
}
diff --git a/src/Azure.IIoT.OpcUa.Publisher.Models/src/PublishingQueueSettingsModel.cs b/src/Azure.IIoT.OpcUa.Publisher.Models/src/PublishingQueueSettingsModel.cs
index 782b194860..cddb4c0b6e 100644
--- a/src/Azure.IIoT.OpcUa.Publisher.Models/src/PublishingQueueSettingsModel.cs
+++ b/src/Azure.IIoT.OpcUa.Publisher.Models/src/PublishingQueueSettingsModel.cs
@@ -6,6 +6,7 @@
namespace Azure.IIoT.OpcUa.Publisher.Models
{
using Furly.Extensions.Messaging;
+ using System;
using System.Runtime.Serialization;
///
@@ -24,10 +25,27 @@ public sealed record class PublishingQueueSettingsModel
///
/// Desired Quality of service to use in case of broker
- /// transport.
+ /// transport that supports configuring delivery guarantees.
///
[DataMember(Name = "requestedDeliveryGuarantee", Order = 2,
EmitDefaultValue = false)]
public QoS? RequestedDeliveryGuarantee { get; set; }
+
+ ///
+ /// Desired Time to live to use in case of using a broker
+ /// transport that supports ttl.
+ ///
+ [DataMember(Name = "ttl", Order = 3,
+ EmitDefaultValue = false)]
+ public TimeSpan? Ttl { get; set; }
+
+ ///
+ /// If the broker transport supports message retention this
+ /// setting determines if the messages should be retained
+ /// in the queue.
+ ///
+ [DataMember(Name = "retain", Order = 4,
+ EmitDefaultValue = false)]
+ public bool? Retain { get; set; }
}
}
diff --git a/src/Azure.IIoT.OpcUa.Publisher.Module/src/Controllers/DiagnosticsController.cs b/src/Azure.IIoT.OpcUa.Publisher.Module/src/Controllers/DiagnosticsController.cs
index dc898d7cc7..3013c0771c 100644
--- a/src/Azure.IIoT.OpcUa.Publisher.Module/src/Controllers/DiagnosticsController.cs
+++ b/src/Azure.IIoT.OpcUa.Publisher.Module/src/Controllers/DiagnosticsController.cs
@@ -6,6 +6,7 @@
namespace Azure.IIoT.OpcUa.Publisher.Module.Controllers
{
using Azure.IIoT.OpcUa.Publisher.Module.Filters;
+ using Azure.IIoT.OpcUa.Publisher.Models;
using Asp.Versioning;
using Furly;
using Furly.Tunnel.Router;
@@ -13,6 +14,7 @@ namespace Azure.IIoT.OpcUa.Publisher.Module.Controllers
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using System;
+ using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
@@ -68,27 +70,27 @@ public DiagnosticsController(IClientDiagnostics diagnostics)
[HttpGet("reset")]
public async Task ResetAllClientsAsync(CancellationToken ct = default)
{
- await _diagnostics.ResetAllClients(ct).ConfigureAwait(false);
+ await _diagnostics.ResetAllClientsAsync(ct).ConfigureAwait(false);
}
///
- /// SetTraceMode
+ /// GetConnectionDiagnostic
///
///
- /// Can be used to set trace mode for all established connections.
- /// Call within a minute to keep trace mode up or else trace mode
- /// will be disabled again after 1 minute. Enabling and resetting
- /// tracemode will cause a reconnect of the client.
+ /// Get connection diagnostic information for all connections.
+ /// The first set of diagnostics are the diagnostics active for
+ /// all connections, continue reading to get updates.
///
///
/// The operation was successful.
/// An unexpected error occurred
[ProducesResponseType(StatusCodes.Status200OK)]
[ProducesResponseType(typeof(ProblemDetails), StatusCodes.Status500InternalServerError)]
- [HttpGet("tracemode")]
- public async Task SetTraceModeAsync(CancellationToken ct = default)
+ [HttpGet("diagnostics/connections")]
+ public IAsyncEnumerable GetConnectionDiagnosticAsync(
+ CancellationToken ct = default)
{
- await _diagnostics.SetTraceModeAsync(ct).ConfigureAwait(false);
+ return _diagnostics.GetConnectionDiagnosticAsync(ct);
}
private readonly IClientDiagnostics _diagnostics;
diff --git a/src/Azure.IIoT.OpcUa.Publisher.Module/src/Filters/ControllerExceptionFilterAttribute.cs b/src/Azure.IIoT.OpcUa.Publisher.Module/src/Filters/ControllerExceptionFilterAttribute.cs
index d369a40418..4ad0c78982 100644
--- a/src/Azure.IIoT.OpcUa.Publisher.Module/src/Filters/ControllerExceptionFilterAttribute.cs
+++ b/src/Azure.IIoT.OpcUa.Publisher.Module/src/Filters/ControllerExceptionFilterAttribute.cs
@@ -41,7 +41,6 @@ public override void OnException(ExceptionContext context)
return;
}
-
if (context.Exception is AggregateException ae)
{
var root = ae.GetBaseException();
diff --git a/src/Azure.IIoT.OpcUa.Publisher.Module/src/Runtime/CommandLine.cs b/src/Azure.IIoT.OpcUa.Publisher.Module/src/Runtime/CommandLine.cs
index e685e2b210..e930ff08f1 100644
--- a/src/Azure.IIoT.OpcUa.Publisher.Module/src/Runtime/CommandLine.cs
+++ b/src/Azure.IIoT.OpcUa.Publisher.Module/src/Runtime/CommandLine.cs
@@ -7,7 +7,6 @@ namespace Azure.IIoT.OpcUa.Publisher.Module.Runtime
{
using Azure.IIoT.OpcUa.Publisher.Models;
using Azure.IIoT.OpcUa.Publisher.Stack.Runtime;
- using Azure.IIoT.OpcUa.Publisher.Stack.Services;
using Furly.Azure.IoT.Edge;
using Furly.Extensions.Messaging;
using Microsoft.Extensions.Configuration;
@@ -139,6 +138,12 @@ public CommandLine(string[] args, CommandLineLogger? logger = null)
{ $"qos|{PublisherConfig.DefaultQualityOfServiceKey}=",
$"The default quality of service to use for data set messages.\nThis does not apply to metadata messages which are always sent with `AtLeastOnce` semantics.\nAllowed values:\n `{string.Join("`\n `", Enum.GetNames(typeof(QoS)))}`\nDefault: `{nameof(QoS.AtLeastOnce)}`.\n",
(QoS q) => this[PublisherConfig.DefaultQualityOfServiceKey] = q.ToString() },
+ { $"ttl|{PublisherConfig.DefaultMessageTimeToLiveKey}=",
+ "The default time to live for all network message published in milliseconds if the transport supports it.\nThis does not apply to metadata messages which are always sent with a ttl of the metadata update interval or infinite ttl.\nDefault: `not set` (infinite).\n",
+ (uint k) => this[PublisherConfig.DefaultMessageTimeToLiveKey] = TimeSpan.FromMilliseconds(k).ToString() },
+ { $"retain:|{PublisherConfig.DefaultMessageRetentionKey}:",
+ "Whether by default to send messages with retain flag to a broker if the transport supports it.\nThis does not apply to metadata messages which are always sent as retained messages.\nDefault: `false'.\n",
+ (bool? b) => this[PublisherConfig.DefaultMessageRetentionKey] = b?.ToString() ?? "True" },
// TODO: Add ConfiguredMessageSize
@@ -554,6 +559,9 @@ public CommandLine(string[] args, CommandLineLogger? logger = null)
{ $"sl|opcstacklogging:|{OpcUaClientConfig.EnableOpcUaStackLoggingKey}:",
"Enable opc ua stack logging beyond logging at error level.\nDefault: `disabled`.\n",
(bool? b) => this[OpcUaClientConfig.EnableOpcUaStackLoggingKey] = b?.ToString() ?? "True" },
+ { $"ksf|keysetlogfolder:|{OpcUaClientConfig.OpcUaKeySetLogFolderNameKey}:",
+ "Writes negotiated symmetric keys for all running client connection to this file.\nThe file can be loaded by Wireshark 4.3 and used to decrypt encrypted channels when analyzing network traffic captures.\nNote that enabling this feature presents a security risk!\nDefault: `disabled`.\n",
+ (string? f) => this[OpcUaClientConfig.OpcUaKeySetLogFolderNameKey] = f ?? Directory.GetCurrentDirectory() },
{ $"ecw|enableconsolewriter:|{Configuration.ConsoleWriter.EnableKey}:",
"Enable writing encoded messages to standard error log through the filesystem transport (must enable via `-t FileSystem` and `-o` must be set to either `stderr` or `stdout`).\nDefault: `false`.\n",
(bool? b) => this[Configuration.ConsoleWriter.EnableKey] = b?.ToString() ?? "True" },
diff --git a/src/Azure.IIoT.OpcUa.Publisher/src/IClientDiagnostics.cs b/src/Azure.IIoT.OpcUa.Publisher/src/IClientDiagnostics.cs
index b62a178aa1..d06adeb945 100644
--- a/src/Azure.IIoT.OpcUa.Publisher/src/IClientDiagnostics.cs
+++ b/src/Azure.IIoT.OpcUa.Publisher/src/IClientDiagnostics.cs
@@ -5,6 +5,8 @@
namespace Azure.IIoT.OpcUa.Publisher
{
+ using Azure.IIoT.OpcUa.Publisher.Models;
+ using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
@@ -19,13 +21,14 @@ public interface IClientDiagnostics
///
///
///
- Task ResetAllClients(CancellationToken ct = default);
+ Task ResetAllClientsAsync(CancellationToken ct = default);
///
- /// Set all connections into trace mode for a minute.
+ /// Watch diagnostic information of all connections.
///
///
///
- Task SetTraceModeAsync(CancellationToken ct = default);
+ IAsyncEnumerable GetConnectionDiagnosticAsync(
+ CancellationToken ct = default);
}
}
diff --git a/src/Azure.IIoT.OpcUa.Publisher/src/Models/WriterGroupContext.cs b/src/Azure.IIoT.OpcUa.Publisher/src/Models/WriterGroupContext.cs
index 1563299373..34357758a4 100644
--- a/src/Azure.IIoT.OpcUa.Publisher/src/Models/WriterGroupContext.cs
+++ b/src/Azure.IIoT.OpcUa.Publisher/src/Models/WriterGroupContext.cs
@@ -14,7 +14,7 @@ namespace Azure.IIoT.OpcUa.Publisher.Models
public record class WriterGroupContext
{
///
- /// Topic for the message if not metadata message
+ /// Topic for the message
///
public required string Topic { get; init; }
@@ -23,6 +23,16 @@ public record class WriterGroupContext
///
public required QoS? Qos { get; init; }
+ ///
+ /// Requested Retain
+ ///
+ public bool? Retain { get; init; }
+
+ ///
+ /// Requested Time to live
+ ///
+ public TimeSpan? Ttl { get; init; }
+
///
/// Publisher id
///
diff --git a/src/Azure.IIoT.OpcUa.Publisher/src/Runtime/PublisherConfig.cs b/src/Azure.IIoT.OpcUa.Publisher/src/Runtime/PublisherConfig.cs
index 6810701eed..9bd537e821 100644
--- a/src/Azure.IIoT.OpcUa.Publisher/src/Runtime/PublisherConfig.cs
+++ b/src/Azure.IIoT.OpcUa.Publisher/src/Runtime/PublisherConfig.cs
@@ -70,6 +70,8 @@ public sealed class PublisherConfig : PostConfigureOptionBase
public const string RenewTlsCertificateOnStartupKey = "RenewTlsCertificateOnStartup";
public const string DefaultTransportKey = "DefaultTransport";
public const string DefaultQualityOfServiceKey = "DefaultQualityOfService";
+ public const string DefaultMessageTimeToLiveKey = "DefaultMessageTimeToLive";
+ public const string DefaultMessageRetentionKey = "DefaultMessageRetention";
public const string DefaultDataSetRoutingKey = "DefaultDataSetRouting";
public const string ApiKeyOverrideKey = "ApiKey";
public const string PublishMessageSchemaKey = "PublishMessageSchema";
@@ -338,6 +340,15 @@ public override void PostConfigure(string? name, PublisherOptions options)
options.DefaultQualityOfService = qos;
}
+ if (options.DefaultMessageTimeToLive == null)
+ {
+ var ttl = GetIntOrNull(DefaultMessageTimeToLiveKey);
+ options.DefaultMessageTimeToLive = ttl.HasValue ?
+ TimeSpan.FromMilliseconds(ttl.Value) : GetDurationOrNull(
+ DefaultMessageTimeToLiveKey);
+ }
+ options.DefaultMessageRetention = GetBoolOrNull(DefaultMessageRetentionKey);
+
if (options.MessageTimestamp == null)
{
if (!Enum.TryParse(GetStringOrDefault(MessageTimestampKey),
diff --git a/src/Azure.IIoT.OpcUa.Publisher/src/Runtime/PublisherOptions.cs b/src/Azure.IIoT.OpcUa.Publisher/src/Runtime/PublisherOptions.cs
index 7c0a230bc8..20ba1a3d18 100644
--- a/src/Azure.IIoT.OpcUa.Publisher/src/Runtime/PublisherOptions.cs
+++ b/src/Azure.IIoT.OpcUa.Publisher/src/Runtime/PublisherOptions.cs
@@ -157,6 +157,16 @@ public sealed class PublisherOptions
///
public QoS? DefaultQualityOfService { get; set; }
+ ///
+ /// Default message time to live
+ ///
+ public TimeSpan? DefaultMessageTimeToLive { get; set; }
+
+ ///
+ /// Default whether to set message retain flag
+ ///
+ public bool? DefaultMessageRetention { get; set; }
+
///
/// Default Max data set messages per published network
/// message.
diff --git a/src/Azure.IIoT.OpcUa.Publisher/src/Services/NetworkMessageEncoder.cs b/src/Azure.IIoT.OpcUa.Publisher/src/Services/NetworkMessageEncoder.cs
index 9c0b3d7d87..dc1791fbc0 100644
--- a/src/Azure.IIoT.OpcUa.Publisher/src/Services/NetworkMessageEncoder.cs
+++ b/src/Azure.IIoT.OpcUa.Publisher/src/Services/NetworkMessageEncoder.cs
@@ -122,16 +122,16 @@ public void Dispose()
.SetTimestamp(_timeProvider.GetUtcNow().UtcDateTime) // TODO: move to offsets
.SetContentEncoding(m.networkMessage.ContentEncoding)
.SetContentType(m.networkMessage.ContentType)
- .SetTopic(m.topic)
- .SetRetain(m.retain)
- .SetQoS(m.qos)
+ .SetTopic(m.queue.QueueName!)
+ .SetRetain(m.queue.Retain ?? false)
+ .SetQoS(m.queue.RequestedDeliveryGuarantee ?? QoS.AtLeastOnce)
.AddBuffers(chunks)
;
- if (m.ttl != default)
+ if (m.queue.Ttl.HasValue)
{
chunkedMessage = chunkedMessage
- .SetTtl(m.ttl);
+ .SetTtl(m.queue.Ttl.Value);
}
if (m.schema != null)
@@ -200,16 +200,13 @@ public void Dispose()
///
///
///
- ///
- ///
- ///
- ///
+ ///
///
///
///
private record struct EncodedMessage(int notificationsPerMessage,
- PubSubMessage networkMessage, string topic, bool retain,
- TimeSpan ttl, QoS qos, Action onSent, IEventSchema? schema,
+ PubSubMessage networkMessage, PublishingQueueSettingsModel queue,
+ Action onSent, IEventSchema? schema,
IServiceMessageContext? encodingContext = null);
///
@@ -224,18 +221,23 @@ private List GetNetworkMessages(IEnumerable();
- static QoS GetQos(WriterGroupContext context, QoS? defaultQos)
+ static PublishingQueueSettingsModel GetQueue(WriterGroupContext context, PublisherOptions options)
{
- return context.Qos ?? defaultQos ?? QoS.AtLeastOnce;
+ return new PublishingQueueSettingsModel
+ {
+ RequestedDeliveryGuarantee = context.Qos,
+ Retain = context.Retain,
+ Ttl = context.Ttl,
+ QueueName = context.Topic
+ };
}
// Group messages by topic and qos, then writer group and then by dataset class id
foreach (var topics in messages
.Select(m => (Notification: m, Context: (m.Context as WriterGroupContext)!))
.Where(m => m.Context != null)
- .GroupBy(m => (m.Context!.Topic,
- GetQos(m.Context, _options.Value.DefaultQualityOfService))))
+ .GroupBy(m => GetQueue(m.Context, _options.Value)))
{
- var (topic, qos) = topics.Key;
+ var queue = topics.Key;
foreach (var publishers in topics.GroupBy(m => m.Context.PublisherId))
{
var publisherId = publishers.Key;
@@ -463,7 +465,7 @@ void AddMessage(BaseDataSetMessage dataSetMessage)
if (maxMessagesToPublish != null && currentMessage.Messages.Count >= maxMessagesToPublish)
{
result.Add(new EncodedMessage(currentNotifications.Count, currentMessage,
- topic, false, default, qos, () => currentNotifications.ForEach(n => n.Dispose()),
+ queue, () => currentNotifications.ForEach(n => n.Dispose()),
schema, Notification.ServiceMessageContext));
#if DEBUG
currentNotifications.ForEach(n => n.MarkProcessed());
@@ -479,7 +481,7 @@ void AddMessage(BaseDataSetMessage dataSetMessage)
{
// Start a new message but first emit current
result.Add(new EncodedMessage(currentNotifications.Count, currentMessage,
- topic, false, default, qos, () => currentNotifications.ForEach(n => n.Dispose()),
+ queue, () => currentNotifications.ForEach(n => n.Dispose()),
schema, Notification.ServiceMessageContext));
#if DEBUG
currentNotifications.ForEach(n => n.MarkProcessed());
@@ -494,8 +496,7 @@ void AddMessage(BaseDataSetMessage dataSetMessage)
Notification.MetaData, namespaceFormat, standardsCompliant,
out var metadataMessage))
{
- result.Add(new EncodedMessage(0, metadataMessage, topic, true,
- Context.Writer.MetaDataUpdateTime ?? default, qos, Notification.Dispose,
+ result.Add(new EncodedMessage(0, metadataMessage, queue, Notification.Dispose,
schema, Notification.ServiceMessageContext));
}
#if DEBUG
@@ -506,8 +507,8 @@ void AddMessage(BaseDataSetMessage dataSetMessage)
}
if (currentMessage?.Messages.Count > 0)
{
- result.Add(new EncodedMessage(currentNotifications.Count, currentMessage, topic, false,
- default, qos, () => currentNotifications.ForEach(n => n.Dispose()),
+ result.Add(new EncodedMessage(currentNotifications.Count, currentMessage, queue,
+ () => currentNotifications.ForEach(n => n.Dispose()),
schema, currentNotifications.LastOrDefault()?.ServiceMessageContext));
#if DEBUG
currentNotifications.ForEach(n => n.MarkProcessed());
diff --git a/src/Azure.IIoT.OpcUa.Publisher/src/Services/PublisherConfigurationService.cs b/src/Azure.IIoT.OpcUa.Publisher/src/Services/PublisherConfigurationService.cs
index 0cbc90fdd0..37ec6a11b3 100644
--- a/src/Azure.IIoT.OpcUa.Publisher/src/Services/PublisherConfigurationService.cs
+++ b/src/Azure.IIoT.OpcUa.Publisher/src/Services/PublisherConfigurationService.cs
@@ -14,7 +14,6 @@ namespace Azure.IIoT.OpcUa.Publisher.Services
using Furly.Exceptions;
using Furly.Extensions.Serializers;
using Microsoft.Extensions.Logging;
- using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Diagnostics;
diff --git a/src/Azure.IIoT.OpcUa.Publisher/src/Services/WriterGroupDataSource.cs b/src/Azure.IIoT.OpcUa.Publisher/src/Services/WriterGroupDataSource.cs
index 87e5786010..027be367f1 100644
--- a/src/Azure.IIoT.OpcUa.Publisher/src/Services/WriterGroupDataSource.cs
+++ b/src/Azure.IIoT.OpcUa.Publisher/src/Services/WriterGroupDataSource.cs
@@ -419,8 +419,16 @@ public DataSetWriterSubscription(WriterGroupDataSource outer,
}, _variables);
_topic = builder.TelemetryTopic;
+
_qos = _dataSetWriter.Publishing?.RequestedDeliveryGuarantee
- ?? _outer._writerGroup.Publishing?.RequestedDeliveryGuarantee;
+ ?? _outer._writerGroup.Publishing?.RequestedDeliveryGuarantee
+ ?? _outer._options.Value.DefaultQualityOfService;
+ _ttl = _dataSetWriter.Publishing?.Ttl
+ ?? _outer._writerGroup.Publishing?.Ttl
+ ?? _outer._options.Value.DefaultMessageTimeToLive;
+ _retain = _dataSetWriter.Publishing?.Retain
+ ?? _outer._writerGroup.Publishing?.Retain
+ ?? _outer._options.Value.DefaultMessageRetention;
_metadataTopic = builder.DataSetMetaDataTopic;
if (string.IsNullOrWhiteSpace(_metadataTopic))
@@ -431,7 +439,8 @@ public DataSetWriterSubscription(WriterGroupDataSource outer,
_contextSelector = _routing == DataSetRoutingMode.None
? n => n.Context
: n => n.PathFromRoot == null || n.Context != null ? n.Context : new TopicContext(
- _topic, n.PathFromRoot, _qos, _routing != DataSetRoutingMode.UseBrowseNames);
+ _topic, n.PathFromRoot, _qos, _retain, _ttl,
+ _routing != DataSetRoutingMode.UseBrowseNames);
TagList = new TagList(outer._metrics.TagList.ToArray().AsSpan())
{
@@ -818,7 +827,8 @@ private void CallMessageReceiverDelegates(IOpcUaSubscriptionNotification subscri
#pragma warning disable CA2000 // Dispose objects before losing scope
var metadata = new MetadataNotificationModel(notification, _outer._timeProvider)
{
- Context = CreateMessageContext(_metadataTopic, _qos,
+ Context = CreateMessageContext(_metadataTopic, QoS.AtLeastOnce, true,
+ _metadataTimer?.Interval ?? _dataSetWriter.MetaDataUpdateTime,
() => Interlocked.Increment(ref _metadataSequenceNumber))
};
#pragma warning restore CA2000 // Dispose objects before losing scope
@@ -830,7 +840,7 @@ private void CallMessageReceiverDelegates(IOpcUaSubscriptionNotification subscri
if (!metaDataTimer)
{
Debug.Assert(notification.Notifications != null);
- notification.Context = CreateMessageContext(_topic, _qos,
+ notification.Context = CreateMessageContext(_topic, _qos, _retain, _ttl,
() => Interlocked.Increment(ref _dataSetSequenceNumber), itemContext);
_outer._logger.LogTrace("Enqueuing notification: {Notification}",
notification.ToString());
@@ -844,8 +854,8 @@ private void CallMessageReceiverDelegates(IOpcUaSubscriptionNotification subscri
_outer._logger.LogWarning(ex, "Failed to produce message.");
}
- WriterGroupContext CreateMessageContext(string topic, QoS? qos, Func sequenceNumber,
- MonitoredItemContext? item = null)
+ WriterGroupContext CreateMessageContext(string topic, QoS? qos, bool? retain, TimeSpan? ttl,
+ Func sequenceNumber, MonitoredItemContext? item = null)
{
_outer.GetWriterGroup(out var writerGroup, out var networkMessageSchema);
return new WriterGroupContext
@@ -855,6 +865,8 @@ WriterGroupContext CreateMessageContext(string topic, QoS? qos, Func seque
NextWriterSequenceNumber = sequenceNumber,
WriterGroup = writerGroup,
Schema = networkMessageSchema,
+ Retain = item?.Retain ?? retain,
+ Ttl = item?.Ttl ?? ttl,
Topic = item?.Topic ?? topic,
Qos = item?.Qos ?? qos
};
@@ -960,6 +972,16 @@ private abstract class MonitoredItemContext
/// Topic for the message if not metadata message
///
public abstract QoS? Qos { get; }
+
+ ///
+ /// Time to live
+ ///
+ public abstract TimeSpan? Ttl { get; }
+
+ ///
+ /// Retain
+ ///
+ public abstract bool? Retain { get; }
}
///
@@ -971,6 +993,10 @@ private sealed class TopicContext : MonitoredItemContext
public override string Topic { get; }
///
public override QoS? Qos { get; }
+ ///
+ public override TimeSpan? Ttl { get; }
+ ///
+ public override bool? Retain { get; }
///
/// Create
@@ -978,9 +1004,11 @@ private sealed class TopicContext : MonitoredItemContext
///
///
///
+ ///
+ ///
///
public TopicContext(string root, RelativePath subpath, QoS? qos,
- bool includeNamespaceIndex)
+ bool? retain, TimeSpan? ttl, bool includeNamespaceIndex)
{
var sb = new StringBuilder().Append(root);
foreach (var path in subpath.Elements)
@@ -993,6 +1021,8 @@ public TopicContext(string root, RelativePath subpath, QoS? qos,
sb.Append(TopicFilter.Escape(path.TargetName.Name));
}
Topic = sb.ToString();
+ Ttl = ttl;
+ Retain = retain;
Qos = qos;
}
@@ -1019,6 +1049,10 @@ private sealed class LazilyEvaluatedContext : MonitoredItemContext
public override string Topic => _topic.Value;
///
public override QoS? Qos => _settings.RequestedDeliveryGuarantee;
+ ///
+ public override TimeSpan? Ttl => _settings.Ttl;
+ ///
+ public override bool? Retain => _settings.Retain;
///
/// Create context
@@ -1064,6 +1098,8 @@ public override int GetHashCode()
private volatile uint _frameCount;
private readonly string _topic;
private readonly QoS? _qos;
+ private readonly TimeSpan? _ttl;
+ private readonly bool? _retain;
private readonly string _metadataTopic;
private readonly Dictionary _variables;
private readonly DataSetRoutingMode _routing;
diff --git a/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Extensions/ContainerBuilderEx.cs b/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Extensions/ContainerBuilderEx.cs
index 7d8a6c2851..5014215577 100644
--- a/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Extensions/ContainerBuilderEx.cs
+++ b/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Extensions/ContainerBuilderEx.cs
@@ -27,6 +27,8 @@ public static void AddOpcUaStack(this ContainerBuilder builder)
builder.RegisterType()
.AsImplementedInterfaces().SingleInstance().AutoActivate();
+ builder.RegisterType()
+ .AsImplementedInterfaces().SingleInstance().AutoActivate();
builder.RegisterType()
.AsImplementedInterfaces().SingleInstance();
builder.RegisterType()
diff --git a/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Runtime/OpcUaClientConfig.cs b/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Runtime/OpcUaClientConfig.cs
index 2a7c29b2e4..adadf10469 100644
--- a/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Runtime/OpcUaClientConfig.cs
+++ b/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Runtime/OpcUaClientConfig.cs
@@ -54,6 +54,7 @@ public sealed class OpcUaClientConfig : PostConfigureOptionBase
public bool? EnableOpcUaStackLogging { get; set; }
+ ///
+ /// Folder to write keysets to for later decryption
+ /// of wireshark traces.
+ ///
+ public string? OpcUaKeySetLogFolderName { get; set; }
+
///
/// Minimum number of publish requests to queue
/// at all times. Default is 2.
diff --git a/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaClient.cs b/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaClient.cs
index c9625a03c1..5cb5c2bd47 100644
--- a/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaClient.cs
+++ b/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaClient.cs
@@ -13,6 +13,7 @@ namespace Azure.IIoT.OpcUa.Publisher.Stack.Services
using Microsoft.Extensions.Logging;
using Nito.AsyncEx;
using Opc.Ua;
+ using Opc.Ua.Bindings;
using Opc.Ua.Client;
using System;
using System.Collections.Generic;
@@ -20,6 +21,7 @@ namespace Azure.IIoT.OpcUa.Publisher.Stack.Services
using System.Diagnostics.Metrics;
using System.Globalization;
using System.Linq;
+ using System.Net;
using System.Runtime.CompilerServices;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
@@ -107,6 +109,11 @@ internal sealed partial class OpcUaClient : DefaultSessionFactory, IOpcUaClient,
///
internal OperationLimits? LimitOverrides { get; set; }
+ ///
+ /// Last diagnostic information on this client
+ ///
+ internal ConnectionDiagnosticModel LastDiagnostics => _lastDiagnostics;
+
///
/// No complex type loading ever
///
@@ -170,6 +177,7 @@ public int MinPublishRequestCount
///
///
///
+ ///
///
///
///
@@ -179,6 +187,7 @@ public OpcUaClient(ApplicationConfiguration configuration,
Meter meter, IMetricsContext metrics,
EventHandler? notifier,
ReverseConnectManager? reverseConnectManager,
+ Action diagnosticsCallback,
TimeSpan? maxReconnectPeriod = null, string? sessionName = null)
{
_timeProvider = timeProvider;
@@ -188,6 +197,12 @@ public OpcUaClient(ApplicationConfiguration configuration,
}
_connection = connection.Connection;
+ _diagnosticsCb = diagnosticsCallback;
+ _lastDiagnostics = new ConnectionDiagnosticModel
+ {
+ Connection = _connection,
+ TimeStamp = _timeProvider.GetUtcNow()
+ };
Debug.Assert(_connection.GetEndpointUrls().Any());
_reverseConnectManager = reverseConnectManager;
@@ -219,7 +234,7 @@ public OpcUaClient(ApplicationConfiguration configuration,
_cts = new CancellationTokenSource();
_channel = Channel.CreateUnbounded<(ConnectionEvent, object?)>();
_disconnectLock = _lock.WriterLock(_cts.Token);
- _traceModeTimer = _timeProvider.CreateTimer(_ => OnTraceModeExpired(),
+ _channelMonitor = _timeProvider.CreateTimer(_ => OnUpdateConnectionDiagnostics(),
null, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
_sessionManager = ManageSessionStateMachineAsync(_cts.Token);
}
@@ -368,48 +383,6 @@ internal Task ResetAsync(CancellationToken ct)
return tcs.Task;
}
- ///
- /// Enable trace mode
- ///
- ///
- ///
- internal async Task SetTraceModeAsync(CancellationToken ct)
- {
- bool reset;
- lock (_lock)
- {
- reset = !_traceMode;
- _traceMode = true;
-
- _traceModeTimer.Change(TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1));
- }
-
- if (reset)
- {
- // Reset the client into trace mode
- await ResetAsync(ct).ConfigureAwait(false);
- }
- }
-
- ///
- /// Disable trace mode if necessary when watchdog expires
- ///
- private void OnTraceModeExpired()
- {
- bool reset;
- lock (_lock)
- {
- reset = _traceMode;
-
- _traceMode = false;
- _traceModeTimer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
- }
- if (reset)
- {
- TriggerConnectionEvent(ConnectionEvent.Reset);
- }
- }
-
///
/// Close client
///
@@ -454,8 +427,6 @@ internal async ValueTask CloseAsync()
finally
{
_cts.Dispose();
-
- await _traceModeTimer.DisposeAsync().ConfigureAwait(false);
}
}
@@ -1053,6 +1024,12 @@ async ValueTask ApplySubscriptionAsync(IReadOnlyList subscri
_logger.LogWarning(sre, "{Client}: Failed to fetch namespace table...", this);
}
+ if (!DisableComplexTypeLoading && !session.IsTypeSystemLoaded)
+ {
+ // Ensure type system is loaded
+ await session.GetComplexTypeSystemAsync(ct).ConfigureAwait(false);
+ }
+
await Task.WhenAll(subscriptions.Concat(extra).Select(async subscription =>
{
try
@@ -1246,11 +1223,6 @@ private async ValueTask TryConnectAsync(CancellationToken ct)
//
var securityMode = _connection.Endpoint.SecurityMode ?? SecurityMode.NotNone;
var securityProfile = _connection.Endpoint.SecurityPolicy;
- if (_traceMode)
- {
- securityMode = SecurityMode.None;
- securityProfile = null;
- }
var endpointDescription = await SelectEndpointAsync(endpointUrl,
connection, securityMode, securityProfile).ConfigureAwait(false);
@@ -1377,7 +1349,7 @@ internal void Session_HandlePublishError(ISession session, PublishErrorEventArgs
"{Client}: Too many publish request error: Limiting number of requests to {Limit}...",
this, limit);
return;
- default:
+ default:
if (session.Connected)
{
_logger.LogInformation("{Client}: Publish error: {Error} (Actively handled: {Active})...",
@@ -1554,30 +1526,140 @@ private async ValueTask UpdateSessionAsync(ISession session)
}
var oldTable = _session?.NamespaceUris.ToArray();
- try
+ Debug.Assert(_reconnectingSession == null);
+ if (ReferenceEquals(_session, session))
{
- Debug.Assert(_reconnectingSession == null);
- if (ReferenceEquals(_session, session))
+ // Not a new session
+ NotifyConnectivityStateChange(EndpointConnectivityState.Ready);
+ UpdateNamespaceTableAndSessionDiagnostics(session, oldTable);
+ return false;
+ }
+
+ await CloseSessionAsync().ConfigureAwait(false);
+ _session = (OpcUaSession)session;
+
+ NotifyConnectivityStateChange(EndpointConnectivityState.Ready);
+ UpdateNamespaceTableAndSessionDiagnostics(_session, oldTable);
+ kSessions.Add(1, _metrics.TagList);
+ return true;
+
+ void UpdateNamespaceTableAndSessionDiagnostics(ISession session,
+ string[]? oldTable)
+ {
+ if (oldTable != null)
{
- // Not a new session
- NotifyConnectivityStateChange(EndpointConnectivityState.Ready);
- return false;
+ var newTable = session.NamespaceUris.ToArray();
+ LogNamespaceTableChanges(oldTable, newTable);
}
- await CloseSessionAsync().ConfigureAwait(false);
- _session = (OpcUaSession)session;
+ lock (_channelLock)
+ {
+ UpdateConnectionDiagnosticsFromSession(session);
+ }
+ }
+ }
- NotifyConnectivityStateChange(EndpointConnectivityState.Ready);
- kSessions.Add(1, _metrics.TagList);
- return true;
+ ///
+ /// Update diagnostic if the channel has changed
+ ///
+ private void OnUpdateConnectionDiagnostics()
+ {
+ if (_session != null)
+ {
+ lock (_channelLock)
+ {
+ UpdateConnectionDiagnosticsFromSession(_session);
+ }
}
- finally
+ }
+
+ ///
+ /// Update session diagnostics
+ ///
+ ///
+ private void UpdateConnectionDiagnosticsFromSession(ISession session)
+ {
+ var channel = session.TransportChannel;
+ var token = channel?.CurrentToken;
+
+ // Get effective ip address and port
+ var socket = (channel as UaSCUaBinaryTransportChannel)?.Socket;
+ var remoteIpAddress = socket?.RemoteEndpoint?.GetIPAddress();
+ var remotePort = socket?.RemoteEndpoint?.GetPort();
+ var localIpAddress = socket?.LocalEndpoint?.GetIPAddress();
+ var localPort = socket?.LocalEndpoint?.GetPort();
+ var now = _timeProvider.GetUtcNow();
+
+ var elapsed = now - _lastDiagnostics.TimeStamp;
+ var lastChannel = _lastDiagnostics.ChannelDiagnostics;
+ if (lastChannel != null &&
+ lastChannel.ChannelId == token?.ChannelId &&
+ lastChannel.TokenId == token?.TokenId &&
+ lastChannel.CreatedAt == token?.CreatedAt)
{
- if (oldTable != null && _session != null)
+ //
+ // Token has not yet been updated, let's retry later
+ // It is also assumed that the port/ip are still the same
+ //
+ var lifetime = TimeSpan.FromMilliseconds(token.Lifetime);
+ if (lifetime > elapsed)
{
- var newTable = _session.NamespaceUris.ToArray();
- LogNamespaceTableChanges(oldTable, newTable);
+ _channelMonitor.Change(lifetime - elapsed,
+ Timeout.InfiniteTimeSpan);
}
+ else
+ {
+ _channelMonitor.Change(TimeSpan.FromSeconds(1),
+ Timeout.InfiniteTimeSpan);
+ }
+ return;
+ }
+
+ _lastDiagnostics = new ConnectionDiagnosticModel
+ {
+ Connection = _connection,
+ TimeStamp = now,
+ RemoteIpAddress = remoteIpAddress?.ToString(),
+ RemotePort = remotePort == -1 ? null : remotePort,
+ LocalIpAddress = localIpAddress?.ToString(),
+ LocalPort = localPort == -1 ? null : localPort,
+
+ ChannelDiagnostics = token != null ? new ChannelDiagnosticModel
+ {
+ ChannelId = token.ChannelId,
+ TokenId = token.TokenId,
+ CreatedAt = token.CreatedAt,
+ Lifetime = TimeSpan.FromMilliseconds(token.Lifetime),
+ Client = ToChannelKey(token.ClientInitializationVector,
+ token.ClientEncryptingKey, token.ClientSigningKey),
+ Server = ToChannelKey(token.ServerInitializationVector,
+ token.ServerEncryptingKey, token.ServerSigningKey)
+ } : null
+ };
+
+ _diagnosticsCb(_lastDiagnostics);
+ _logger.LogDebug("{Client}: Diagnostics information updated.", this);
+
+ if (token != null)
+ {
+ // Monitor channel's token lifetime and update diagnostics
+ var lifetime = TimeSpan.FromMilliseconds(token.Lifetime);
+ _channelMonitor.Change(lifetime, Timeout.InfiniteTimeSpan);
+ }
+
+ static ChannelKeyModel? ToChannelKey(byte[]? iv, byte[]? key, byte[]? signingKey)
+ {
+ if (iv == null || key == null || signingKey == null ||
+ iv.Length == 0 || key.Length == 0 || signingKey.Length == 0)
+ {
+ return null;
+ }
+ return new ChannelKeyModel
+ {
+ Iv = iv,
+ Key = key,
+ SigLen = signingKey.Length
+ };
}
}
@@ -2032,7 +2114,7 @@ private void InitializeMetrics()
private int _publishTimeoutCounter;
private int _keepAliveCounter;
private int _namespaceTableChanges;
- private bool _traceMode;
+ private ConnectionDiagnosticModel _lastDiagnostics;
private readonly ReverseConnectManager? _reverseConnectManager;
private readonly AsyncReaderWriterLock _lock = new();
private readonly ApplicationConfiguration _configuration;
@@ -2043,14 +2125,16 @@ private void InitializeMetrics()
private readonly ConnectionModel _connection;
private readonly IMetricsContext _metrics;
private readonly ILogger _logger;
-#pragma warning disable CA2213 // Disposable fields should be disposed
- private readonly ITimer _traceModeTimer;
private readonly TimeProvider _timeProvider;
+ private readonly object _channelLock = new();
+#pragma warning disable CA2213 // Disposable fields should be disposed
+ private readonly ITimer _channelMonitor;
private readonly SessionReconnectHandler _reconnectHandler;
private readonly CancellationTokenSource _cts;
#pragma warning restore CA2213 // Disposable fields should be disposed
private readonly TimeSpan _maxReconnectPeriod;
private readonly Channel<(ConnectionEvent, object?)> _channel;
+ private readonly Action _diagnosticsCb;
private readonly EventHandler? _notifier;
private readonly Dictionary<(string, TimeSpan), Sampler> _samplers = new();
private readonly Dictionary<(string, TimeSpan), Browser> _browsers = new();
diff --git a/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaClientManager.cs b/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaClientManager.cs
index 857da4c6e7..16ea782061 100644
--- a/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaClientManager.cs
+++ b/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaClientManager.cs
@@ -13,6 +13,7 @@ namespace Azure.IIoT.OpcUa.Publisher.Stack.Services
using Furly.Extensions.Utils;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
+ using Nito.AsyncEx;
using Opc.Ua;
using Opc.Ua.Client;
using System;
@@ -141,15 +142,43 @@ public async Task TestConnectionAsync(
}
///
- public Task ResetAllClients(CancellationToken ct)
+ public Task ResetAllClientsAsync(CancellationToken ct)
{
return Task.WhenAll(_clients.Values.Select(c => c.ResetAsync(ct)).ToArray());
}
///
- public Task SetTraceModeAsync(CancellationToken ct)
+ public async IAsyncEnumerable GetConnectionDiagnosticAsync(
+ [EnumeratorCancellation] CancellationToken ct)
{
- return Task.WhenAll(_clients.Values.Select(c => c.SetTraceModeAsync(ct)).ToArray());
+ var queue = new AsyncProducerConsumerQueue();
+ _listeners.TryAdd(queue, true);
+ try
+ {
+ // Get all items from buffer
+ var set = new HashSet(
+ _clients.Values.Select(c => c.LastDiagnostics));
+ foreach (var item in set)
+ {
+ yield return item;
+ }
+
+ // Dequeue items we have not yet sent from current state from queue
+ // until cancelled
+ while (!ct.IsCancellationRequested)
+ {
+ // Get updates - handle fact that we have already sent the reference
+ var item = await queue.DequeueAsync(ct).ConfigureAwait(false);
+ if (!set.Contains(item))
+ {
+ yield return item;
+ }
+ }
+ }
+ finally
+ {
+ _listeners.TryRemove(queue, out _);
+ }
}
///
@@ -532,7 +561,7 @@ private OpcUaClient GetOrAddClient(ConnectionModel connection)
{
var client = new OpcUaClient(_configuration.Value, id, _serializer,
_loggerFactory, _timeProvider, _meter, _metrics, OnConnectionStateChange,
- reverseConnect ? _reverseConnectManager : null,
+ reverseConnect ? _reverseConnectManager : null, OnClientConnectionDiagnosticChange,
_options.Value.MaxReconnectDelayDuration)
{
OperationTimeout = _options.Value.Quotas.OperationTimeout == 0 ? null :
@@ -595,6 +624,18 @@ private OpcUaClient GetOrAddClient(ConnectionModel connection)
}
}
+ ///
+ /// Called by clients when their connection information changed
+ ///
+ ///
+ private void OnClientConnectionDiagnosticChange(ConnectionDiagnosticModel model)
+ {
+ foreach (var listener in _listeners.Keys)
+ {
+ listener.Enqueue(model);
+ }
+ }
+
///
/// Create metrics
///
@@ -615,6 +656,8 @@ private void InitializeMetrics()
private readonly IJsonSerializer _serializer;
private readonly ReverseConnectManager _reverseConnectManager;
private readonly Lazy _reverseConnectStartException;
+ private readonly ConcurrentDictionary<
+ AsyncProducerConsumerQueue, bool> _listeners = new();
private readonly ConcurrentDictionary _clients = new();
private readonly IMetricsContext _metrics;
private readonly Meter _meter = Diagnostics.NewMeter();
diff --git a/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaStackKeySetLogger.cs b/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaStackKeySetLogger.cs
new file mode 100644
index 0000000000..bf9881035a
--- /dev/null
+++ b/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaStackKeySetLogger.cs
@@ -0,0 +1,153 @@
+// ------------------------------------------------------------
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
+// ------------------------------------------------------------
+
+namespace Azure.IIoT.OpcUa.Publisher.Stack.Services
+{
+ using Azure.IIoT.OpcUa.Publisher.Models;
+ using Microsoft.Extensions.Options;
+ using System;
+ using System.Globalization;
+ using System.IO;
+ using System.Linq;
+ using System.Threading;
+ using System.Threading.Tasks;
+
+ ///
+ ///
+ /// Wireshark 4.3 now allows decryption of the UA binary protocol using a keyset log
+ /// (opc ua debug file). The file contains records in the following format:
+ ///
+ ///
+ /// client_iv_%channel_id%_%token_id%: %hex-string%
+ /// client_key_%channel_id%_%token_id%: %hex-string%
+ /// client_siglen_%channel_id%_%token_id%: 32
+ /// server_iv_%channel_id%_%token_id%: %hex-string%
+ /// server_key_%channel_id%_%token_id%: %hex-string%
+ /// server_siglen_%channel_id%_%token_id%: 16|24|32
+ /// ...
+ ///
+ ///
+ /// This class writes the file to disk if a file was configured
+ /// See: https://gitlab.com/wireshark/wireshark/-/blob/master/plugins/epan/opcua/opcua.c#L232
+ ///
+ ///
+ public sealed class OpcUaStackKeySetLogger : IDisposable
+ {
+ ///
+ /// Create logger
+ ///
+ ///
+ ///
+ public OpcUaStackKeySetLogger(IOptions options,
+ IClientDiagnostics diagnostics)
+ {
+ _diagnostics = diagnostics;
+ _cts = new CancellationTokenSource();
+ if (options.Value.OpcUaKeySetLogFolderName == null)
+ {
+ _task = Task.CompletedTask;
+ return;
+ }
+ _task = WriteDebugFileAsync(options.Value.OpcUaKeySetLogFolderName, _cts.Token);
+ }
+
+ ///
+ public void Dispose()
+ {
+ try
+ {
+ _cts.Cancel();
+ _task.GetAwaiter().GetResult();
+ }
+ catch
+ {
+ _cts.Dispose();
+ }
+ }
+
+ ///
+ /// Log debug file to disk
+ ///
+ ///
+ ///
+ ///
+ public async Task WriteDebugFileAsync(string folderName, CancellationToken ct)
+ {
+ await foreach (var change in _diagnostics.GetConnectionDiagnosticAsync(
+ ct).ConfigureAwait(false))
+ {
+ var entry = change.ChannelDiagnostics;
+ if (entry?.Client == null || entry?.Server == null || change.RemotePort == null)
+ {
+ // Not a valid entry, channel without keys
+ continue;
+ }
+
+ var id = change.Connection.CreateConsistentHash();
+ var path = Path.Combine(folderName, "port",
+ change.RemotePort.Value.ToString(CultureInfo.InvariantCulture),
+ "connection", id.ToString("X", CultureInfo.InvariantCulture));
+ if (!Directory.Exists(path))
+ {
+ Directory.CreateDirectory(path);
+ }
+ var logFileName = Path.Combine(path, "opcua_debug.log");
+ var log = File.AppendText(logFileName);
+ await using (var _ = log.ConfigureAwait(false))
+ {
+ await log.WriteAsync($"Timestamp={change.TimeStamp};")
+ .ConfigureAwait(false);
+ await log.WriteAsync($"Connection={change.Connection};")
+ .ConfigureAwait(false);
+ await log.WriteAsync($"LocalEP={change.LocalIpAddress}:{change.LocalPort};")
+ .ConfigureAwait(false);
+ await log.WriteAsync($"RemoteEP={change.RemoteIpAddress}:{change.RemotePort};")
+ .ConfigureAwait(false);
+ await log.WriteLineAsync($"ChannelId={entry.ChannelId};TokenId={entry.TokenId}")
+ .ConfigureAwait(false);
+
+ await log.FlushAsync(ct).ConfigureAwait(false);
+ }
+
+ var keysetsFileName = Path.Combine(path, "opcua_debug.txt");
+ var keysets = File.AppendText(keysetsFileName);
+ await using (var _ = keysets.ConfigureAwait(false))
+ {
+ await keysets.WriteAsync($"client_iv_{entry.ChannelId}_{entry.TokenId}: ")
+ .ConfigureAwait(false);
+ await keysets.WriteLineAsync(Convert.ToHexString(entry.Client.Iv.ToArray()))
+ .ConfigureAwait(false);
+ await keysets.WriteAsync($"client_key_{entry.ChannelId}_{entry.TokenId}: ")
+ .ConfigureAwait(false);
+ await keysets.WriteLineAsync(Convert.ToHexString(entry.Client.Key.ToArray()))
+ .ConfigureAwait(false);
+ await keysets.WriteAsync($"client_siglen_{entry.ChannelId}_{entry.TokenId}: ")
+ .ConfigureAwait(false);
+ await keysets.WriteLineAsync(entry.Client.SigLen.ToString(CultureInfo.InvariantCulture))
+ .ConfigureAwait(false);
+
+ await keysets.WriteAsync($"server_iv_{entry.ChannelId}_{entry.TokenId}: ")
+ .ConfigureAwait(false);
+ await keysets.WriteLineAsync(Convert.ToHexString(entry.Server.Iv.ToArray()))
+ .ConfigureAwait(false);
+ await keysets.WriteAsync($"server_key_{entry.ChannelId}_{entry.TokenId}: ")
+ .ConfigureAwait(false);
+ await keysets.WriteLineAsync(Convert.ToHexString(entry.Server.Key.ToArray()))
+ .ConfigureAwait(false);
+ await keysets.WriteAsync($"server_siglen_{entry.ChannelId}_{entry.TokenId}: ")
+ .ConfigureAwait(false);
+ await keysets.WriteLineAsync(entry.Server.SigLen.ToString(CultureInfo.InvariantCulture))
+ .ConfigureAwait(false);
+
+ await keysets.FlushAsync(ct).ConfigureAwait(false);
+ }
+ }
+ }
+
+ private readonly Task _task;
+ private readonly CancellationTokenSource _cts;
+ private readonly IClientDiagnostics _diagnostics;
+ }
+}
diff --git a/src/Azure.IIoT.OpcUa.Publisher/src/Storage/PublishedNodesConverter.cs b/src/Azure.IIoT.OpcUa.Publisher/src/Storage/PublishedNodesConverter.cs
index 8a28c24d70..1ffbbe407a 100644
--- a/src/Azure.IIoT.OpcUa.Publisher/src/Storage/PublishedNodesConverter.cs
+++ b/src/Azure.IIoT.OpcUa.Publisher/src/Storage/PublishedNodesConverter.cs
@@ -114,6 +114,8 @@ public IEnumerable ToPublishedNodes(uint version, Date
MessageEncoding = item.WriterGroup.MessageType,
WriterGroupTransport = item.WriterGroup.Transport,
WriterGroupQualityOfService = item.WriterGroup.Publishing?.RequestedDeliveryGuarantee,
+ WriterGroupMessageTtlTimepan = item.WriterGroup.Publishing?.Ttl,
+ WriterGroupMessageRetention = item.WriterGroup.Publishing?.Retain,
WriterGroupPartitions = item.WriterGroup.PublishQueuePartitions,
WriterGroupQueueName = item.WriterGroup.Publishing?.QueueName,
SendKeepAliveDataSetMessages = item.Writer.DataSet?.SendKeepAlive ?? false,
@@ -121,6 +123,8 @@ public IEnumerable ToPublishedNodes(uint version, Date
MetaDataUpdateTimeTimespan = item.Writer.MetaDataUpdateTime,
QueueName = item.Writer.Publishing?.QueueName,
QualityOfService = item.Writer.Publishing?.RequestedDeliveryGuarantee,
+ MessageTtlTimespan = item.Writer.Publishing?.Ttl,
+ MessageRetention = item.Writer.Publishing?.Retain,
MetaDataQueueName = item.Writer.MetaData?.QueueName,
MetaDataUpdateTime = null,
BatchTriggerIntervalTimespan = item.WriterGroup.PublishingInterval,
@@ -369,7 +373,9 @@ public IEnumerable ToWriterGroups(IEnumerable ToWriterGroups(IEnumerable w.OpcNodes!)
.Distinct(OpcNodeModelEx.Comparer)
.Batch(_maxNodesPerDataSet)
- // Future: batch in service so it is centralized
+ // Future: batch in service so it is centralized
))
.SelectMany(b => b.WriterBatches // Do we need to materialize here?
.DefaultIfEmpty(kDummyEntry.YieldReturn())
@@ -398,12 +404,16 @@ public IEnumerable ToWriterGroups(IEnumerable op
? null : new PublishingQueueSettingsModel
{
QueueName = node.Topic,
- RequestedDeliveryGuarantee = node.QualityOfService
+ RequestedDeliveryGuarantee = node.QualityOfService,
+ Retain = null,
+ Ttl = null
},
Triggering = skipTriggering || node.TriggeredNodes == null
? null : new PublishedDataSetTriggerModel
@@ -659,7 +671,9 @@ static PublishedEventItemsModel ToPublishedEventItems(IEnumerable
? null : new PublishingQueueSettingsModel
{
QueueName = node.Topic,
- RequestedDeliveryGuarantee = node.QualityOfService
+ RequestedDeliveryGuarantee = node.QualityOfService,
+ Retain = null,
+ Ttl = null
},
Triggering = skipTriggering || node.TriggeredNodes == null
? null : new PublishedDataSetTriggerModel
diff --git a/src/Azure.IIoT.OpcUa.Publisher/tests/Services/Encoder/NetworkMessage.cs b/src/Azure.IIoT.OpcUa.Publisher/tests/Services/Encoder/NetworkMessage.cs
index 21e9c843e8..7678fc3e30 100644
--- a/src/Azure.IIoT.OpcUa.Publisher/tests/Services/Encoder/NetworkMessage.cs
+++ b/src/Azure.IIoT.OpcUa.Publisher/tests/Services/Encoder/NetworkMessage.cs
@@ -108,7 +108,7 @@ public static IList GenerateSampleSubscriptionNot
uint numOfMessages, bool eventList = false,
MessageEncoding encoding = MessageEncoding.Json,
NetworkMessageContentFlags extraNetworkMessageMask = 0,
- bool isSampleMode = false)
+ bool isSampleMode = false, bool randomTopic = false)
{
var messages = new List();
const string publisherId = "Publisher";
@@ -234,13 +234,16 @@ public static IList GenerateSampleSubscriptionNot
}
}
+#pragma warning disable CA5394 // Do not use insecure randomness
var message = new SubscriptionNotificationModel(DateTimeOffset.UtcNow, new ServiceMessageContext())
{
Context = new WriterGroupContext
{
NextWriterSequenceNumber = () => i,
Qos = null,
- Topic = string.Empty,
+ Topic = randomTopic ? Guid.NewGuid().ToString() : string.Empty,
+ Retain = false,
+ Ttl = randomTopic ? TimeSpan.FromSeconds(Random.Shared.Next(60)) : null,
PublisherId = publisherId,
Schema = null,
Writer = writer,
@@ -254,6 +257,7 @@ public static IList GenerateSampleSubscriptionNot
EndpointUrl = "EndpointUrl" + suffix,
ApplicationUri = "ApplicationUri" + suffix
};
+#pragma warning restore CA5394 // Do not use insecure randomness
messages.Add(message);
}
diff --git a/src/Azure.IIoT.OpcUa.Publisher/tests/Services/Encoder/NetworkMessageEncoderJsonTests.cs b/src/Azure.IIoT.OpcUa.Publisher/tests/Services/Encoder/NetworkMessageEncoderJsonTests.cs
index 6439095043..69f08304e8 100644
--- a/src/Azure.IIoT.OpcUa.Publisher/tests/Services/Encoder/NetworkMessageEncoderJsonTests.cs
+++ b/src/Azure.IIoT.OpcUa.Publisher/tests/Services/Encoder/NetworkMessageEncoderJsonTests.cs
@@ -108,6 +108,26 @@ public void EncodeJsonTest(bool encodeBatchFlag)
Assert.Equal(20, encoder.AvgNotificationsPerMessage);
}
+ [Theory]
+ [InlineData(true)]
+ [InlineData(false)]
+ public void EncodeJsonWithRandomTopicTest(bool encodeBatchFlag)
+ {
+ const int maxMessageSize = 256 * 1024;
+ var messages = NetworkMessage.GenerateSampleSubscriptionNotifications(20, false, MessageEncoding.Json, randomTopic: true);
+
+ using var encoder = GetEncoder();
+ var networkMessages = encoder.Encode(NetworkMessage.Create, messages, maxMessageSize, encodeBatchFlag);
+
+ // Batch or no batch, each notification has its own topic, so every single one generates a message
+
+ Assert.Equal(20, networkMessages.Sum(m => ((NetworkMessage)m.Event).Buffers.Count));
+ Assert.Equal(20, encoder.NotificationsProcessedCount);
+ Assert.Equal(0, encoder.NotificationsDroppedCount);
+ Assert.Equal(20, encoder.MessagesProcessedCount);
+ Assert.Equal(1, encoder.AvgNotificationsPerMessage);
+ }
+
[Theory]
[InlineData(true)]
[InlineData(false)]
@@ -129,12 +149,14 @@ public void EncodeChunkTest(bool encodeBatchFlag)
Assert.Equal(1, Math.Round(encoder.AvgNotificationsPerMessage));
}
- [Fact]
- public void EncodeJsonSingleMessageTest()
+ [Theory]
+ [InlineData(true)]
+ [InlineData(false)]
+ public void EncodeJsonSingleMessageTest(bool randomTopic)
{
const int maxMessageSize = 256 * 1024;
var messages = NetworkMessage.GenerateSampleSubscriptionNotifications(20, false, MessageEncoding.Json,
- NetworkMessageContentFlags.SingleDataSetMessage);
+ NetworkMessageContentFlags.SingleDataSetMessage, randomTopic: randomTopic);
using var encoder = GetEncoder();
var networkMessages = encoder.Encode(NetworkMessage.Create, messages, maxMessageSize, false);
diff --git a/src/Azure.IIoT.OpcUa.Publisher/tests/Services/PublisherConfigServicesTests.cs b/src/Azure.IIoT.OpcUa.Publisher/tests/Services/PublisherConfigServicesTests.cs
index 9d9785579c..6ead41f8b1 100644
--- a/src/Azure.IIoT.OpcUa.Publisher/tests/Services/PublisherConfigServicesTests.cs
+++ b/src/Azure.IIoT.OpcUa.Publisher/tests/Services/PublisherConfigServicesTests.cs
@@ -11,7 +11,6 @@ namespace Azure.IIoT.OpcUa.Publisher.Tests.Services
using Azure.IIoT.OpcUa.Publisher.Models;
using Azure.IIoT.OpcUa.Publisher.Stack.Runtime;
using Azure.IIoT.OpcUa.Publisher.Storage;
- using Avro.Generic;
using FluentAssertions;
using Furly.Exceptions;
using Furly.Extensions.Serializers;
diff --git a/src/Azure.IIoT.OpcUa.Publisher/tests/Stack/OpcUaApplicationTests.cs b/src/Azure.IIoT.OpcUa.Publisher/tests/Stack/OpcUaApplicationTests.cs
index e653859e28..db8bc83e06 100644
--- a/src/Azure.IIoT.OpcUa.Publisher/tests/Stack/OpcUaApplicationTests.cs
+++ b/src/Azure.IIoT.OpcUa.Publisher/tests/Stack/OpcUaApplicationTests.cs
@@ -408,6 +408,7 @@ private static IContainer Build()
var containerBuilder = new ContainerBuilder();
containerBuilder.AddLogging();
containerBuilder.AddOpcUaStack();
+ containerBuilder.AddNewtonsoftJsonSerializer();
return containerBuilder.Build();
}
}
diff --git a/src/Azure.IIoT.OpcUa/src/Encoders/ConsoleWriter.cs b/src/Azure.IIoT.OpcUa/src/Encoders/ConsoleWriter.cs
index fd500c07df..0060d6d08a 100644
--- a/src/Azure.IIoT.OpcUa/src/Encoders/ConsoleWriter.cs
+++ b/src/Azure.IIoT.OpcUa/src/Encoders/ConsoleWriter.cs
@@ -8,7 +8,6 @@ namespace Azure.IIoT.OpcUa.Encoders
using Furly;
using Furly.Extensions.Messaging;
using Furly.Extensions.Storage;
- using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Buffers;
diff --git a/src/Azure.IIoT.OpcUa/src/Publisher/Extensions/PublishedNodesEntryModelEx.cs b/src/Azure.IIoT.OpcUa/src/Publisher/Extensions/PublishedNodesEntryModelEx.cs
index 552db23152..3d4a1c9322 100644
--- a/src/Azure.IIoT.OpcUa/src/Publisher/Extensions/PublishedNodesEntryModelEx.cs
+++ b/src/Azure.IIoT.OpcUa/src/Publisher/Extensions/PublishedNodesEntryModelEx.cs
@@ -62,6 +62,14 @@ public static string GetUniqueWriterGroupId(this PublishedNodesEntryModel model)
{
id.Append(model.WriterGroupPartitions.Value);
}
+ if (model.WriterGroupMessageTtlTimepan != null)
+ {
+ id.Append(model.WriterGroupMessageTtlTimepan.Value);
+ }
+ if (model.WriterGroupMessageRetention == true)
+ {
+ id.AppendLine();
+ }
return id.ToString().ToSha1Hash();
}
@@ -116,6 +124,14 @@ public static bool HasSameWriterGroup(this PublishedNodesEntryModel model,
{
return false;
}
+ if (model.WriterGroupMessageRetention != that.WriterGroupMessageRetention)
+ {
+ return false;
+ }
+ if (model.WriterGroupMessageTtlTimepan != that.WriterGroupMessageTtlTimepan)
+ {
+ return false;
+ }
return true;
}
@@ -337,6 +353,14 @@ public static string GetUniqueDataSetWriterId(this PublishedNodesEntryModel mode
{
id.Append(model.DataSetFetchDisplayNames.Value);
}
+ if (model.MessageTtlTimespan != null)
+ {
+ id.Append(model.MessageTtlTimespan.Value);
+ }
+ if (model.MessageRetention == true)
+ {
+ id.AppendLine();
+ }
Debug.Assert(id.Length != 0); // Should always have an endpoint mixed in
return id.ToString().ToSha1Hash();
}
@@ -492,6 +516,14 @@ public static bool HasSameDataSet(this PublishedNodesEntryModel model,
{
return false;
}
+ if (model.MessageRetention != that.MessageRetention)
+ {
+ return false;
+ }
+ if (model.MessageTtlTimespan != that.MessageTtlTimespan)
+ {
+ return false;
+ }
return true;
}