Skip to content

Commit

Permalink
Optimize status code lookup and logging, increase timeouts of cloud to edge clients. (#2318)
Browse files Browse the repository at this point in the history

* Increase timeouts and refine exception handling
* Refactor status code handling and update shutdown logic
* Ensuring subscriptions are deleted on shutdown.
* Log publish errors only for the active session and return early for disconnected sessions.
  • Loading branch information
marcschier authored Aug 9, 2024
1 parent b9ef030 commit 36cf2e0
Show file tree
Hide file tree
Showing 27 changed files with 157 additions and 128 deletions.
15 changes: 3 additions & 12 deletions docs/opc-publisher/commandline.md
Original file line number Diff line number Diff line change
Expand Up @@ -703,9 +703,9 @@ Subscription settings
generating model change notifications.
Default: `12:00:00`.
--sqp, --sequentialpublishing, --EnableSequentialPublishing[=VALUE]
(Experimental) Explicitly disable or enable
sequential publishing in the protocol stack.
Default: `False` (disabled).
Set to false to disable sequential publishing in
the protocol stack.
Default: `True` (enabled).
--urc, --usereverseconnect, --DefaultUseReverseConnect[=VALUE]
(Experimental) Use reverse connect for all
endpoints that are part of the subscription
Expand Down Expand Up @@ -878,15 +878,6 @@ OPC UA Client configuration
want to ensure the complex types are never
loaded for an endpoint.
Default: `false`.
--peh, --activepublisherrorhandling, --ActivePublishErrorHandling[=VALUE]
Actively handle reconnecting a session when
publishing errors occur due to issues in the
underlying connectivity rather than letting the
stack and keep alive handling manage
reconnecting.
Note that the default will be `false` in future
releases.
Default: `True`.
--otl, --opctokenlifetime, --SecurityTokenLifetime=VALUE
OPC UA Stack Transport Secure Channel - Security
token lifetime in milliseconds.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public record class WriterGroupDiagnosticModel
public long OutgressIoTMessageCount { get; set; }

/// <summary>
/// ConnectionRetries
/// Connection Retries
/// </summary>
[DataMember(Name = "ConnectionRetries", Order = 23,
EmitDefaultValue = true)]
Expand Down Expand Up @@ -469,5 +469,12 @@ public record class WriterGroupDiagnosticModel
[DataMember(Name = "ActiveConditionCount", Order = 69,
EmitDefaultValue = true)]
public long ActiveConditionCount { get; set; }

/// <summary>
/// Connection Count - the number of connections counted for the
/// writer group. Serialized as "ConnectionCount" (order 70) and
/// always emitted, even when it holds the default value.
/// </summary>
[DataMember(Name = "ConnectionCount", Order = 70,
EmitDefaultValue = true)]
public long ConnectionCount { get; set; }
}
}
2 changes: 1 addition & 1 deletion src/Azure.IIoT.OpcUa.Publisher.Module/cli/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public static void Main(string[] args)
{
switch (args[i])
{
case "--dumpprofiles":
case "--dump-profiles":
Console.WriteLine();
Console.WriteLine();
Console.WriteLine("The following messaging profiles are supported (selected with --mm and --me):");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ public CommandLine(string[] args, CommandLineLogger? logger = null)
$"(Experimental) The default time to wait until the address space model is browsed again when generating model change notifications.\nDefault: `{OpcUaSubscriptionConfig.DefaultRebrowsePeriodDefault}`.\n",
(TimeSpan t) => this[OpcUaSubscriptionConfig.DefaultRebrowsePeriodKey] = t.ToString() },
{ $"sqp|sequentialpublishing:|{OpcUaSubscriptionConfig.EnableSequentialPublishingKey}:",
$"(Experimental) Explicitly disable or enable sequential publishing in the protocol stack.\nDefault: `{OpcUaSubscriptionConfig.EnableSequentialPublishingDefault}` (disabled).\n",
$"Set to false to disable sequential publishing in the protocol stack.\nDefault: `{OpcUaSubscriptionConfig.EnableSequentialPublishingDefault}` (enabled).\n",
(bool? b) => this[OpcUaSubscriptionConfig.EnableSequentialPublishingKey] = b?.ToString() ?? "True" },
{ $"urc|usereverseconnect:|{OpcUaSubscriptionConfig.DefaultUseReverseConnectKey}:",
"(Experimental) Use reverse connect for all endpoints that are part of the subscription configuration unless otherwise configured.\nDefault: `false`.\n",
Expand Down Expand Up @@ -440,9 +440,6 @@ public CommandLine(string[] args, CommandLineLogger? logger = null)
{ $"dcp|disablecomplextypepreloading:|{OpcUaClientConfig.DisableComplexTypePreloadingKey}:",
"Complex types (structures, enumerations) a server exposes are preloaded from the server after the session is connected. In some cases this can cause problems either on the client or server itself. Use this setting to disable pre-loading support.\nNote that since the complex type system is used for meta data messages it will still be loaded at the time the subscription is created, therefore also disable meta data support if you want to ensure the complex types are never loaded for an endpoint.\nDefault: `false`.\n",
(bool? b) => this[OpcUaClientConfig.DisableComplexTypePreloadingKey] = b?.ToString() ?? "True" },
{ $"peh|activepublisherrorhandling:|{OpcUaClientConfig.ActivePublishErrorHandlingKey}:",
$"Actively handle reconnecting a session when publishing errors occur due to issues in the underlying connectivity instead of letting the stack and keep alive handling manage reconnecting.\nNote that the default was `true` in previous releases. If you unexpectedly encounter issues with non-publishing subscriptions, enable this option.\nDefault: `{OpcUaClientConfig.ActivePublishErrorHandlingDefault}`.\n",
(bool? b) => this[OpcUaClientConfig.ActivePublishErrorHandlingKey] = b?.ToString() ?? "True" },

{ $"otl|opctokenlifetime=|{OpcUaClientConfig.SecurityTokenLifetimeKey}=",
"OPC UA Stack Transport Secure Channel - Security token lifetime in milliseconds.\nDefault: `3600000` (1h).\n",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ private async IAsyncEnumerable<PublisherModel> EnumeratePublishersAsync(string?
private readonly IMethodClient _client;
private readonly IJsonSerializer _serializer;
private readonly ILogger _logger;
private static readonly TimeSpan kTimeout = TimeSpan.FromSeconds(10);
private static readonly TimeSpan kTimeout = TimeSpan.FromSeconds(60);
private readonly ActivitySource _activitySource = Diagnostics.NewActivitySource();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,7 @@ private async ValueTask<EndpointRegistrationModel> GetEndpointAsync(string endpo
// Setting an expiration will cause entries in the cache to be evicted
// if they're not accessed within the expiration time allotment.
//
entry.SetSlidingExpiration(TimeSpan.FromSeconds(30));
entry.SetSlidingExpiration(TimeSpan.FromSeconds(60));
return ep.Registration;
}
catch
Expand All @@ -870,7 +870,7 @@ private async ValueTask<EndpointRegistrationModel> GetEndpointAsync(string endpo
return found!;
}

private static readonly TimeSpan kTimeout = TimeSpan.FromSeconds(10);
private static readonly TimeSpan kTimeout = TimeSpan.FromSeconds(60);
private readonly IEndpointRegistry _endpoints;
private readonly IMethodClient _client;
private readonly IJsonSerializer _serializer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public async ValueTask HandleAsync(string deviceId, string? moduleId, ReadOnlySe
DataType = type == BuiltInType.Null
? null : type.ToString(),
Status = (message.Value?.StatusCode.Code == StatusCodes.Good)
? null : StatusCode.LookupSymbolicId(message.Value?.StatusCode.Code ?? 0),
? null : (message.Value?.StatusCode).AsString(),
SourceTimestamp = (message.Value?.SourceTimestamp == DateTime.MinValue)
? null : message.Value?.SourceTimestamp,
SourcePicoseconds = (message.Value?.SourcePicoseconds == 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ public async ValueTask HandleAsync(string deviceId, string? moduleId, ReadOnlySe
DataSetClassId = message.DataSetClassId.ToString(),
DataSetWriterId = dataSetMessage.DataSetWriterName,
SequenceNumber = dataSetMessage.SequenceNumber,
Status = dataSetMessage.Status == null ? null :
StatusCode.LookupSymbolicId(dataSetMessage.Status.Value.Code),
Status = dataSetMessage.Status.AsString(),
MetaDataVersion = $"{dataSetMessage.MetaDataVersion?.MajorVersion ?? 1}" +
$".{dataSetMessage.MetaDataVersion?.MinorVersion ?? 0}",
Timestamp = dataSetMessage.Timestamp,
Expand All @@ -83,7 +82,7 @@ public async ValueTask HandleAsync(string deviceId, string? moduleId, ReadOnlySe
DataType = type == BuiltInType.Null
? null : type.ToString(),
Status = (dataValue.StatusCode.Code == StatusCodes.Good)
? null : StatusCode.LookupSymbolicId(dataValue.StatusCode.Code),
? null : dataValue.StatusCode.AsString(),
SourceTimestamp = (dataValue.SourceTimestamp == DateTime.MinValue)
? null : dataValue.SourceTimestamp,
SourcePicoseconds = (dataValue.SourcePicoseconds == 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ public async ValueTask HandleAsync(string deviceId, string? moduleId, ReadOnlySe
DataSetClassId = message.DataSetClassId.ToString(),
DataSetWriterId = dataSetMessage.DataSetWriterId.ToString(CultureInfo.InvariantCulture),
SequenceNumber = dataSetMessage.SequenceNumber,
Status = dataSetMessage.Status == null ? null :
StatusCode.LookupSymbolicId(dataSetMessage.Status.Value.Code),
Status = dataSetMessage.Status.AsString(),
MetaDataVersion = $"{dataSetMessage.MetaDataVersion?.MajorVersion ?? 1}" +
$".{dataSetMessage.MetaDataVersion?.MinorVersion ?? 0}",
Timestamp = dataSetMessage.Timestamp,
Expand All @@ -83,7 +82,7 @@ public async ValueTask HandleAsync(string deviceId, string? moduleId, ReadOnlySe
DataType = type == BuiltInType.Null
? null : type.ToString(),
Status = (dataValue.StatusCode.Code == StatusCodes.Good)
? null : StatusCode.LookupSymbolicId(dataValue.StatusCode.Code),
? null : dataValue.StatusCode.AsString(),
SourceTimestamp = (dataValue.SourceTimestamp == DateTime.MinValue)
? null : dataValue.SourceTimestamp,
SourcePicoseconds = (dataValue.SourcePicoseconds == 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1924,7 +1924,7 @@ await session.Services.BrowseNextAsync(header.ToRequestHeader(_timeProvider),
}
}
}
catch (Exception ex)
catch (Exception ex) when (ex is not OperationCanceledException)
{
_logger.LogInformation(ex, "Failed to obtain child information");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,10 @@ internal WriterGroupDiagnosticModel AggregateModel
MonitoredOpcNodesLateCount = MonitoredOpcNodesLateCount +
writers.Sum(w => w.MonitoredOpcNodesLateCount),
OpcEndpointConnected = NumberOfConnectedEndpoints != 0,
ConnectionCount = ConnectionCount +
writers.Sum(w => w.ConnectionCount),
ConnectionRetries = ConnectionRetries +
writers.Sum(w => w.ConnectionRetries),
PublishRequestsRatio = PublishRequestsRatio +
writers.Sum(w => w.PublishRequestsRatio),
BadPublishRequestsRatio = BadPublishRequestsRatio +
Expand Down Expand Up @@ -331,6 +335,8 @@ public WriterGroupDiagnosticModel Get(string dataSetWriterId, TimeProvider timeP
(d, i) => d.NumberOfDisconnectedEndpoints = (int)i,
["iiot_edge_publisher_connection_retries"] =
(d, i) => d.ConnectionRetries = (long)i,
["iiot_edge_publisher_connections"] =
(d, i) => d.ConnectionCount = (long)i,
["iiot_edge_publisher_subscriptions"] =
(d, i) => d.NumberOfSubscriptions = (long)i,
["iiot_edge_publisher_publish_requests_per_subscription"] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -623,8 +623,9 @@ static string Format(long changes, long lastMinute, double s)
.AppendFormat(CultureInfo.CurrentCulture, "{0,14:0}", info.NumberOfConnectedEndpoints).Append(" | ")
.AppendFormat(CultureInfo.CurrentCulture, "{0:0}", info.NumberOfDisconnectedEndpoints).Append(' ')
.AppendLine(connectivityState)
.Append(" # Connection retries : ")
.AppendFormat(CultureInfo.CurrentCulture, "{0,14:0}", info.ConnectionRetries)
.Append(" # Connections created/retries : ")
.AppendFormat(CultureInfo.CurrentCulture, "{0,14:0}", info.ConnectionCount).Append(" | ")
.AppendFormat(CultureInfo.CurrentCulture, "{0:0}", info.ConnectionRetries)
.AppendLine()
.Append(" # Subscriptions count : ")
.AppendFormat(CultureInfo.CurrentCulture, "{0,14:0}", info.NumberOfSubscriptions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,9 @@ public override int GetHashCode()
/// <summary>
/// Sum of the ReconnectCount values of all clients in UsedClients.
/// </summary>
private int ReconnectCount => UsedClients
.Sum(s => s.ReconnectCount);

/// <summary>
/// Sum of the ConnectCount values of all clients in UsedClients.
/// </summary>
private int ConnectCount => UsedClients
.Sum(s => s.ConnectCount);

/// <summary>
/// Number of clients in UsedClients whose State equals
/// EndpointConnectivityState.Ready.
/// </summary>
private int ConnectedClients => UsedClients
.Count(s => s.State == EndpointConnectivityState.Ready);

Expand Down Expand Up @@ -1236,7 +1239,10 @@ private void InitializeMetrics()
description: "Number of Writers/Subscriptions in the writer group.");
_meter.CreateObservableUpDownCounter("iiot_edge_publisher_connection_retries",
() => new Measurement<long>(ReconnectCount, _metrics.TagList),
description: "OPC UA connect retries.");
description: "OPC UA total connect retries.");
_meter.CreateObservableUpDownCounter("iiot_edge_publisher_connections",
() => new Measurement<long>(ConnectCount, _metrics.TagList),
description: "OPC UA total connection success count.");
_meter.CreateObservableGauge("iiot_edge_publisher_is_connection_ok",
() => new Measurement<int>(ConnectedClients, _metrics.TagList),
description: "OPC UA endpoints that are successfully connected.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ public static ServiceResultModel ToServiceResultModel(this ServiceResult sr)
Locale = sr.LocalizedText?.Locale,
AdditionalInfo = sr.AdditionalInfo,
NamespaceUri = sr.NamespaceUri,
SymbolicId = sr.SymbolicId ??
StatusCode.LookupSymbolicId(sr.Code),
SymbolicId = sr.SymbolicId ?? sr.StatusCode.AsString(),
Inner = sr.InnerResult == null ||
sr.InnerResult.StatusCode == StatusCodes.Good ?
null : sr.InnerResult.ToServiceResultModel()
Expand Down Expand Up @@ -59,12 +58,12 @@ public static ServiceResultModel ToServiceResultModel(this Exception e)
default:
return Create(StatusCodes.Bad, e.Message);
}
static ServiceResultModel Create(uint code, string message) =>
static ServiceResultModel Create(StatusCode code, string message) =>
new ServiceResultModel
{
ErrorMessage = message,
SymbolicId = StatusCode.LookupSymbolicId(code),
StatusCode = code
SymbolicId = code.AsString(),
StatusCode = code.Code
};
}

Expand All @@ -83,7 +82,7 @@ public static ServiceResultModel CreateResultModel(this StatusCode statusCode,
// The last operation result is the one that caused the service to fail.
StatusCode = statusCode.Code,
SymbolicId = stringTable?.GetStringFromTable(diagnostics?.SymbolicId) ??
StatusCode.LookupSymbolicId(statusCode.Code),
statusCode.AsString(),
ErrorMessage = stringTable?.GetStringFromTable(diagnostics?.LocalizedText),
NamespaceUri = stringTable?.GetStringFromTable(diagnostics?.NamespaceUri),
Locale = stringTable?.GetStringFromTable(diagnostics?.Locale),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,15 @@ public interface IOpcUaClientDiagnostics
EndpointConnectivityState State { get; }

/// <summary>
/// Connection attempts
/// Total connection attempts
/// </summary>
int ReconnectCount { get; }

/// <summary>
/// Total successful connections
/// </summary>
int ConnectCount { get; }

/// <summary>
/// Current min publish request count
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ public sealed class OpcUaClientConfig : PostConfigureOptionBase<OpcUaClientOptio
public const string ReverseConnectPortKey = "ReverseConnectPort";
public const string DisableComplexTypePreloadingKey = "DisableComplexTypePreloading";
public const string PublishRequestsPerSubscriptionPercentKey = "PublishRequestsPerSubscriptionPercent";
public const string ActivePublishErrorHandlingKey = "ActivePublishErrorHandling";
public const string MinPublishRequestsKey = "MinPublishRequests";
public const string MaxPublishRequestsKey = "MaxPublishRequests";
public const string MaxNodesPerBrowseOverrideKey = "MaxNodesPerBrowseOverride";
Expand Down Expand Up @@ -114,7 +113,6 @@ public sealed class OpcUaClientConfig : PostConfigureOptionBase<OpcUaClientOptio
public const bool RejectSha1SignedCertificatesDefault = false;
public const bool AddAppCertToTrustedStoreDefault = true;
public const bool RejectUnknownRevocationStatusDefault = true;
public const bool ActivePublishErrorHandlingDefault = false;
public const int MinPublishRequestsDefault = 2;
public const int MaxPublishRequestsDefault = 10;
public const int PublishRequestsPerSubscriptionPercentDefault = 100;
Expand Down Expand Up @@ -275,9 +273,6 @@ public override void PostConfigure(string? name, OpcUaClientOptions options)
options.PublishRequestsPerSubscriptionPercent ??= GetIntOrNull(
PublishRequestsPerSubscriptionPercentKey);

options.ActivePublishErrorHandling ??= GetBoolOrDefault(
ActivePublishErrorHandlingKey, ActivePublishErrorHandlingDefault);

options.MaxNodesPerReadOverride ??= GetIntOrNull(MaxNodesPerReadOverrideKey);
options.MaxNodesPerBrowseOverride ??= GetIntOrNull(MaxNodesPerBrowseOverrideKey);

Expand Down
Loading

0 comments on commit 36cf2e0

Please sign in to comment.